当前位置:   article > 正文

RabbitMQ消息中间件(二)SpringBoot集成RabbitMQ五种常用工作模式(简单模式-点对点、工作模式-消息轮询、发布订阅模式-消息群发/共享、路由模式、主题模式)_springboo整合rabbitmq的点对点通讯

springboo整合rabbitmq的点对点通讯

简介:RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。

RabbitMQ服务支持下列操作系统:

  • Linux
  • Windows
  • MacOS
  • Solaris
  • FreeBSD
  • TRU64
  • Vxworks

RabbitMQ支持下列编程语言:

  • Java
  • Python
  • Ruby
  • Php
  • C#
  • JavaScript
  • Go
  • Elixir
  • Objective-C
  • Swift

主要特性:

  • 可伸缩性:集群服务
  • 消息持久化:从内存持久化消息到硬盘,再从硬盘加载到内存

以上摘自百度百科

RabbitMQ核心:

  • Server:Broker,接收客户端的连接,实现AMQP实体服务。
  • Virtual host: 虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列。
  • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者。
  • Routing Key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息。
  • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
  • Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容。
  • Connection:连接,应用程序与Broker的网络连接。
  • Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务。

 

RabbitMQ常用的有5种工作模式,分别为简单模式、工作模式、发布/订阅模式、路由模式、主题模式。

在这里用到了消息发送确认和消息接收确认机制:

一般来讲当生产端发送消息给RabbitMQ之后,就默认消息发送成功,同理,RabbitMQ给消费者发送一条消息后也会默认消费端接收了消息,并从队列中删除已发送的消息,但是这样就存在消息丢失的可能。比如,消费端接收消息后,业务逻辑异常,那么消息就相当于未处理完全,需要重新处理,但是此刻RabbitMQ已在队列中删除了此消息,也就导致消息丢失;或者在加入了spring事务管理机制时,虽然消费端正确处理了消息,但是后续业务出错,导致事务出错而回滚,此时也就相当于消息丢失了。

这时候引入了消息确认机制,首先消息发送确认,在消息到达exchange时会给发送端返回一个发生成功或失败的结果,以及消息的标识,这个时候可以对消息发送成功或失败作一些处理,在消息到达queue时同样会给发送端返回结果。再说消息接收确认,当消息发送给消费端时,RabbitMQ会等待消费端回复确认处理才会从队列中删除掉消息,如果一直等不到回复,就一直标记消息为待确认状态,且不会再给该消费端发送消息,直到消费端确认消息才会继续给消费端发送消息,若消费端宕机或断开连接,RabbitMQ就会把此消费端未确认的消息发送给下一个消费端处理

RabbitMQ支持事务:

事务的实现还是在信道(channel)设置上,方法主要有三个:

1、channel.txSelect()开启事务

2、channel.txComment()提交事务

3、channel.txRollback()回滚事务

采用事务机制能保证消息到达RabbitMQ,但是采用事务机制也就导致了发送消息到RabbitMQ需要等待消息到达确认,性能上大大的降低了。

这里采用消息发送确认机制(publisher Confirm/publisher Return)和消息接收确认(ACK手动确认模式)

一、RabbitMQ常用五种工作模式之一简单模式:

  • 一个生产者对应一个消费者
  • 生产者将消息发送到指定队列,监听者监听指定队列并获取消息

1、创建一个Springboot项目,集成RabbitMQ,在pom.xml里添加如下代码,带注释的为添加rabbit依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.2.4.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.jx</groupId>
  12. <artifactId>rabbitmq</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>rabbitmq</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-web</artifactId>
  23. </dependency>
  24. <!--添加rabbitmq依赖-->
  25. <dependency>
  26. <groupId>org.springframework.boot</groupId>
  27. <artifactId>spring-boot-starter-amqp</artifactId>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.springframework.boot</groupId>
  31. <artifactId>spring-boot-starter-test</artifactId>
  32. <scope>test</scope>
  33. <exclusions>
  34. <exclusion>
  35. <groupId>org.junit.vintage</groupId>
  36. <artifactId>junit-vintage-engine</artifactId>
  37. </exclusion>
  38. </exclusions>
  39. </dependency>
  40. </dependencies>
  41. <build>
  42. <plugins>
  43. <plugin>
  44. <groupId>org.springframework.boot</groupId>
  45. <artifactId>spring-boot-maven-plugin</artifactId>
  46. </plugin>
  47. </plugins>
  48. </build>
  49. </project>

 

2、在application.properties里添加RabbitMQ配置信息

  1. #rabbitmq配置
  2. #访问地址为本机,端口号5672,账号/密码:guest
  3. spring.rabbitmq.host=127.0.0.1
  4. spring.rabbitmq.port=5672
  5. spring.rabbitmq.username=guest
  6. spring.rabbitmq.password=guest
  7. #消费数量
  8. spring.rabbitmq.listener.simple.concurrency=3
  9. #最大消费数量
  10. spring.rabbitmq.listener.simple.max-concurrency=20
  11. #消费者每次从队列获取的消息数量。
  12. spring.rabbitmq.listener.simple.prefetch=1
  13. #消费接收确认机制-手动确认
  14. spring.rabbitmq.listener.simple.acknowledge-mode=manual

 

3、编写RabbitmqConfig配置类,其中消费者监听器为自定义类,后面编写

  1. package com.jx.rabbitmq.queue;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  6. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  7. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.context.annotation.Configuration;
  12. @Configuration
  13. public class RabbitmqConfig {
  14. //创建日志
  15. private static final Logger loger = LoggerFactory.getLogger(RabbitmqConfig.class);
  16. //创建一个简单队列
  17. @Bean
  18. public Queue simpleQueue(){
  19. return new Queue("simple",true);
  20. }
  21. //注入连接工厂
  22. @Autowired
  23. private CachingConnectionFactory cachingConnectionFactory;
  24. //监听器容器配置
  25. @Autowired
  26. private SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer;
  27. //注入消费者监听器
  28. @Autowired
  29. private ConsumersListener consumersListener;
  30. @Bean
  31. public SimpleMessageListenerContainer comsumersListenerContainer(){
  32. //创建监听容器工厂
  33. SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
  34. //将监听容器配置和链接工厂注入监听容器工厂
  35. simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
  36. //监听容器工厂创建监听容器
  37. SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
  38. //监听容器指定监听器
  39. simpleMessageListenerContainer.setMessageListener(consumersListener);
  40. //指定监听器监听队列
  41. simpleMessageListenerContainer.setQueueNames("simple");
  42. return simpleMessageListenerContainer;
  43. }
  44. }

 

4、编写消费者监听器,实现ChannelAwareMessageListener接口,这里先把获取tag和确认消息注释掉,后面打开

  1. package com.jx.rabbitmq.queue;
  2. import com.rabbitmq.client.Channel;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class ConsumersListener implements ChannelAwareMessageListener {
  10. //创建日志
  11. private static final Logger loger = LoggerFactory.getLogger(ConsumersListener.class);
  12. @Override
  13. public void onMessage(Message message, Channel channel) throws Exception {
  14. try {
  15. loger.info("消费者监听到一条消息-----------");
  16. // //获取tag
  17. // long tag = message.getMessageProperties().getDeliveryTag();
  18. //获取监听消息体
  19. String str = new String(message.getBody(),"utf-8");
  20. //打印消息
  21. System.out.println("消费者:"+str);
  22. // //确认消息
  23. // channel.basicAck(tag,true);
  24. }catch (Exception e){
  25. loger.error("程序异常------------");
  26. }
  27. }
  28. }

5、编写生产者类

  1. package com.jx.rabbitmq.queue;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.core.MessageBuilder;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Component;
  9. @Component
  10. public class Producers {
  11. //创建日志
  12. private static final Logger logger = LoggerFactory.getLogger(Producers.class);
  13. @Autowired
  14. private RabbitTemplate rabbitTemplate;
  15. public void send(){
  16. Message message = MessageBuilder.withBody(new String("生产者发送一条消息").getBytes()).build();
  17. rabbitTemplate.convertAndSend("simple",message);
  18. logger.info("消息发送完成");
  19. }
  20. }

6、编写ProducersTest测试类

  1. package com.jx.rabbitmq.queue;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. import static org.junit.jupiter.api.Assertions.*;
  6. @SpringBootTest
  7. class ProducersTest {
  8. @Autowired
  9. private Producers producers;
  10. @Test
  11. public void test(){
  12. producers.send();
  13. }
  14. }

7、启动springboot

7.1、执行测试类

连续执行2次测试类发现,消费者监听到2条消息,管理工具查看到2条未确认消息,因为配置信息为手动确认,在编写消息监听时注释了确认消息代码

关闭springboot,消息重回队列

7.2、重启springboot

将消息监听类中的注释代码放开,再次执行2次测试类

确认消息有三种模式:无需确认(none)、手动确认(manual)、自动确认(auto)。

在springboot的配置文件里设置spring.rabbitmq.listener.simple.acknowledge-mode

  • 当设置为none时,RabbitMQ只会向消费端发送消息,并不会关心消费端是否处理了消息
  • 当设置为manual时,RabbitMQ向消费端发送消息后,会等待消费端回应确认了消息后才会在队列中删除已发送给消费端且消费端已确认处理的消息
  • 当设置为auto时,RabbitMQ向消费端发送消息后消费端会自动确认消息,无需消费端手动处理确认

在配置文件里添加spring.rabbitmq.listener.simple.acknowledge-mode=manual即为手动确认消息

  • 手动确认消息需使用basicAck()方法确认消息
  • 使用basicNack()拒绝消息且让消息重回队列可能造成死循环,因为RabbitMQ不断重发消息给消费端,消费端不断的拒绝消息且重回队列。
  • 使用basicRject()也可以拒绝消息,区别在于basicRject()一次只能拒绝一条,而basicNack()一次可以批量拒绝多条消息

二、RabbitMQ常用五种工作模式之二工作模式:

  • 一个生产者对应多个消费者
  • 当一个生产者向同一队列发送多条消息后,会将消息轮询发送给多个同时监听此队列的消费者

2.1编辑WorkRabbitmqConfig类,其中三个消费端监听器在后面补充

  1. package com.jx.rabbitmq.work;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  6. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  7. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.context.annotation.Configuration;
  12. @Configuration
  13. public class WorkRabbitmqConfig {
  14. //创建日志
  15. private static Logger logger = LoggerFactory.getLogger(WorkRabbitmqConfig.class);
  16. //创建队列
  17. @Bean
  18. public Queue workQueue(){
  19. return new Queue("work_queue",true);
  20. }
  21. //注入链接工厂
  22. @Autowired
  23. private CachingConnectionFactory cachingConnectionFactory;
  24. //监听器容器配置
  25. @Autowired
  26. private SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer;
  27. //注入消费监听器
  28. @Autowired
  29. private ConsumersListenerA consumersListenerA;
  30. @Autowired
  31. private ConsumersListenerB consumersListenerB;
  32. @Autowired
  33. private ConsumersListenerC consumersListenerC;
  34. //注入监听器容器
  35. @Bean
  36. public SimpleMessageListenerContainer workListenerContainerA(){
  37. //创建监听器容器
  38. SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
  39. //将监听器容器配置和链接工厂注入监听器容器
  40. simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
  41. //监听器容器工厂创建监听器容器
  42. SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
  43. //监听器容器指定监听队列
  44. simpleMessageListenerContainer.setQueueNames("work_queue");
  45. //指定监听器
  46. simpleMessageListenerContainer.setMessageListener(consumersListenerA);
  47. return simpleMessageListenerContainer;
  48. }
  49. @Bean
  50. public SimpleMessageListenerContainer workListenerContainerB(){
  51. //创建监听器容器
  52. SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
  53. //将监听器容器配置和链接工厂注入监听器容器
  54. simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
  55. //监听器容器工厂创建监听器容器
  56. SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
  57. //监听器容器指定监听队列
  58. simpleMessageListenerContainer.setQueueNames("work_queue");
  59. //指定监听器
  60. simpleMessageListenerContainer.setMessageListener(consumersListenerB);
  61. return simpleMessageListenerContainer;
  62. }
  63. @Bean
  64. public SimpleMessageListenerContainer workListenerContainerC(){
  65. //创建监听器容器
  66. SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
  67. //将监听器容器配置和链接工厂注入监听器容器
  68. simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
  69. //监听器容器工厂创建监听器容器
  70. SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
  71. //监听器容器指定监听队列
  72. simpleMessageListenerContainer.setQueueNames("work_queue");
  73. //指定监听器
  74. simpleMessageListenerContainer.setMessageListener(consumersListenerC);
  75. return simpleMessageListenerContainer;
  76. }
  77. }

2.2、编辑ConsumersListenerA、ConsumersListenerB、ConsumersListenerC分别实现ChannelAwareMessageListener接口

  1. package com.jx.rabbitmq.work;
  2. import com.rabbitmq.client.Channel;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class ConsumersListenerA implements ChannelAwareMessageListener {
  10. //创建日志
  11. private static Logger logger = LoggerFactory.getLogger(ConsumersListenerA.class);
  12. @Override
  13. public void onMessage(Message message, Channel channel) throws Exception {
  14. try {
  15. logger.info("监听到一条消息-------------------");
  16. //获取tag
  17. long tag = message.getMessageProperties().getDeliveryTag();
  18. //获取监听消息体
  19. String str = new String(message.getBody(),"utf-8");
  20. //打印消息
  21. System.out.println("消费者A:"+str+"-----"+tag);
  22. //确认消息
  23. channel.basicAck(tag,true);
  24. }catch (Exception e){
  25. logger.error("监听异常");
  26. }
  27. }
  28. }
  1. package com.jx.rabbitmq.work;
  2. import com.rabbitmq.client.Channel;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class ConsumersListenerB implements ChannelAwareMessageListener {
  10. //创建日志
  11. private static Logger logger = LoggerFactory.getLogger(ConsumersListenerB.class);
  12. @Override
  13. public void onMessage(Message message, Channel channel) throws Exception {
  14. try{
  15. logger.info("监听到一条消息-------------------");
  16. //获取tag
  17. long tag = message.getMessageProperties().getDeliveryTag();
  18. //获取消息体
  19. String str = new String(message.getBody(),"utf-8");
  20. //打印消息
  21. System.out.println("消费者B:"+str+"-----"+tag);
  22. //确认消息
  23. channel.basicAck(tag,true);
  24. }catch (Exception e){
  25. logger.error("监听异常");
  26. }
  27. }
  28. }
  1. package com.jx.rabbitmq.work;
  2. import com.rabbitmq.client.Channel;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class ConsumersListenerC implements ChannelAwareMessageListener {
  10. //创建日志
  11. private static Logger logger = LoggerFactory.getLogger(ConsumersListenerC.class);
  12. @Override
  13. public void onMessage(Message message, Channel channel) throws Exception {
  14. try {
  15. logger.info("监听到一条消息-------------------");
  16. //获取tag
  17. long tag = message.getMessageProperties().getDeliveryTag();
  18. //获取消息体
  19. String str = new String(message.getBody(),"utf-8");
  20. //打印消息
  21. System.out.println("消费者C:"+str+"-----"+tag);
  22. //确认消息
  23. channel.basicAck(tag,true);
  24. }catch (Exception e){
  25. logger.error("监听异常");
  26. }
  27. }
  28. }

2.3、编辑WorkProducers生产者类

  1. package com.jx.rabbitmq.work;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.core.MessageBuilder;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.BeanFactory;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Component;
  10. @Component
  11. public class WorkProducers {
  12. //创建日志
  13. private static final Logger logger = LoggerFactory.getLogger(WorkProducers.class);
  14. @Autowired
  15. private RabbitTemplate rabbitTemplate;
  16. public void send() throws Exception{
  17. for(int i=0;i<3;i++){
  18. System.out.println("-----"+rabbitTemplate+"-------\n");
  19. //创建消息
  20. Message message = MessageBuilder.withBody(new String("生产者发送了第"+i+"条消息").getBytes()).build();
  21. //发送消息
  22. rabbitTemplate.convertAndSend("work_queue",message);
  23. logger.info("消息发送完成");
  24. //线程睡眠100毫秒
  25. Thread.sleep(100);
  26. }
  27. }
  28. }

2.4、启动springboot

2.4.1、首先注释掉生产者代码里的Thread.sleep(100),执行test

在这里可以看到,消息好像并没有轮询分配给三个消费端,且消费端处理数据为什么是无序的呢?

首先看第一个问题,为什么没有轮询?看配置文件,在设置spring.rabbitmq.listener.simple.concurrency时设置了消费数量为3,正好在生产端发送了三条消息到队列,导致三条消息全部被消费者A获取。

再看第二个问题,为什么消息是无序的?因为在理想环境下RabbitMQ发送消息到队列是“有序”的,但实际上由于网络原因、或其他如发送异常等会造成消息实际发送到队列时已经是“无序”了,这里假设其他正常,把Tread.sleep(100)加上再次执行会发现,这个时候消费顺序就是“有序”的

再将生产端代码循环执行20次

三、RabbitMQ常用五种工作模式之三发布/订阅模式:

  • 生产端发送一条消息给"fanout"类型交换器(exchange),无需关心最终将消息发送到了哪个队列
  • 消息发送到这个"fanout"类型交换器之后,同一消息会被分发给不同的与此交换器绑定的队列(queue)
  • 消费端监听与此交换器绑定的队列获取消息
  • 这种工作模式类似于群发消息

3.1、编辑application.properties,这里我们用到了消息发送确认,所以需要在配置文件里开启相关配置,在末尾添加。

  1. #rabbitmq配置
  2. #访问地址为本机,端口号5672,账号/密码:guest
  3. spring.rabbitmq.host=127.0.0.1
  4. spring.rabbitmq.port=5672
  5. spring.rabbitmq.username=guest
  6. spring.rabbitmq.password=guest
  7. #消费数量
  8. spring.rabbitmq.listener.simple.concurrency=3
  9. #最大消费数量
  10. spring.rabbitmq.listener.simple.max-concurrency=20
  11. #消费者每次从队列获取的消息数量。
  12. spring.rabbitmq.listener.simple.prefetch=1
  13. #消费接收确认机制-手动确认
  14. spring.rabbitmq.listener.simple.acknowledge-mode=manual
  15. # 开启发送确认
  16. spring.rabbitmq.publisher-confirms=true

3.2、编辑FanoutRabbitmqConfig类,在这里将RabbitTemplate统一配置,消息到达和未到达交换器的处理

  1. package com.jx.rabbitmq.fanout;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.core.Binding;
  5. import org.springframework.amqp.core.BindingBuilder;
  6. import org.springframework.amqp.core.FanoutExchange;
  7. import org.springframework.amqp.core.Queue;
  8. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  9. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  10. import org.springframework.amqp.rabbit.connection.CorrelationData;
  11. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  12. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  15. import org.springframework.context.annotation.Bean;
  16. import org.springframework.context.annotation.Configuration;
  17. import org.springframework.context.annotation.Scope;
  18. @Configuration
  19. public class FanoutRabbitmqConfig {
  20. private static final Logger logger = LoggerFactory.getLogger(FanoutRabbitmqConfig.class);
  21. //创建一个队列
  22. //第一个属性为队列名称,第二个属性为是否持久化队列
  23. @Bean
  24. public Queue queueA(){
  25. return new Queue("queueA",true);
  26. }
  27. @Bean
  28. public Queue queueB(){
  29. return new Queue("queueB",true);
  30. }
  31. //创建链接工厂
  32. @Autowired
  33. private CachingConnectionFactory cachingConnectionFactory;
  34. //创建监听器容器工厂配置
  35. @Autowired
  36. private SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer;
  37. //创建一个fanout类型交换器
  38. //第一个属性name:交换器名称,第二个属性durable是否持久化,第三个属性autoDelete是否自动删除
  39. @Bean
  40. public FanoutExchange fanoutExchange(){
  41. return new FanoutExchange("fanoutExchange",true,true);
  42. }
  43. //创建绑定关系
  44. @Bean
  45. public Binding bindingQueueAToFanoutExchange(Queue queueA,FanoutExchange fanoutExchange){
  46. return BindingBuilder.bind(queueA).to(fanoutExchange);
  47. }
  48. @Bean
  49. public Binding bindingQueueBToFanoutExchange(Queue queueB,FanoutExchange fanoutExchange){
  50. return BindingBuilder.bind(queueB).to(fanoutExchange);
  51. }
  52. @Bean
  53. @Scope(value = "prototype")
  54. public RabbitTemplate rabbitTemplate(){
  55. //创建RabbitMQTemplate对象
  56. RabbitTemplate rabbitTemplate = new RabbitTemplate();
  57. //设置链接工厂
  58. rabbitTemplate.setConnectionFactory(cachingConnectionFactory);
  59. //设置消息送达交换器的回调函数,使用此功能必须设置spring.rabbitmq.publisher-confirms=true
  60. //第一个属性CorrelationData为消息标识
  61. //第二个属性为是否送达
  62. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  63. @Override
  64. public void confirm(CorrelationData correlationData, boolean b, String s) {
  65. if(b){
  66. logger.info("消息已送达至fanoutExchange交换器");
  67. logger.info("消息标识:"+correlationData.getId());
  68. }else {
  69. logger.info("消息未送达");
  70. logger.info("消息标识:"+correlationData.getId());
  71. }
  72. }
  73. });
  74. return rabbitTemplate;
  75. }
  76. //注入监听器容器相关配置
  77. @Bean
  78. public SimpleMessageListenerContainer FanoutListenerContainerA(Queue queueA,FanoutConsumersListenerA fanoutConsumersListenerA){
  79. //创建监听器容器
  80. SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
  81. //将监听器容器配置和链接工厂注入监听器容器
  82. simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
  83. //监听器容器工厂创建监听器容器
  84. SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
  85. //监听器容器指定监听队列
  86. simpleMessageListenerContainer.setQueues(queueA);
  87. //指定监听器
  88. simpleMessageListenerContainer.setMessageListener(fanoutConsumersListenerA);
  89. return simpleMessageListenerContainer;
  90. }
  91. @Bean
  92. public SimpleMessageListenerContainer FanoutListenerContainerB(Queue queueB,FanoutConsumersListenerB fanoutConsumersListenerB){
  93. //创建监听器容器
  94. SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
  95. //将监听器容器配置和链接工厂注入监听器容器
  96. simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
  97. //监听器容器工厂创建监听器容器
  98. SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
  99. //监听器容器指定监听队列
  100. simpleMessageListenerContainer.setQueues(queueB);
  101. //指定监听器
  102. simpleMessageListenerContainer.setMessageListener(fanoutConsumersListenerB);
  103. return simpleMessageListenerContainer;
  104. }
  105. }

3.3、编辑2个消费端类FanoutConsumersListenerA和FanoutConsumersListenerB,分别监听队列A和B

  1. package com.jx.rabbitmq.fanout;
  2. import com.rabbitmq.client.Channel;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class FanoutConsumersListenerA implements ChannelAwareMessageListener {
  10. //创建日志
  11. private static Logger logger = LoggerFactory.getLogger(FanoutConsumersListenerA.class);
  12. @Override
  13. public void onMessage(Message message, Channel channel) throws Exception {
  14. try {
  15. logger.info("监听到一条消息来自于queueA-------------------");
  16. //获取tag
  17. long tag = message.getMessageProperties().getDeliveryTag();
  18. //获取消息体
  19. String str = new String(message.getBody(),"utf-8");
  20. //打印消息
  21. System.out.println("消费者A:"+str);
  22. //确认消息
  23. channel.basicAck(tag,true);
  24. }catch (Exception e){
  25. logger.error("程序异常");
  26. }
  27. }
  28. }
  1. package com.jx.rabbitmq.fanout;
  2. import com.rabbitmq.client.Channel;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class FanoutConsumersListenerB implements ChannelAwareMessageListener {
  10. //创建日志
  11. private static Logger logger = LoggerFactory.getLogger(FanoutConsumersListenerB.class);
  12. @Override
  13. public void onMessage(Message message, Channel channel) throws Exception {
  14. try {
  15. logger.info("监听到一条消息来自于queueB-------------------");
  16. //获取tag
  17. long tag = message.getMessageProperties().getDeliveryTag();
  18. //获取消息体
  19. String str = new String(message.getBody(),"utf-8");
  20. //打印消息
  21. System.out.println("消费者B:"+str);
  22. //确认消息
  23. channel.basicAck(tag,true);
  24. }catch (Exception e){
  25. logger.error("程序异常");
  26. }
  27. }
  28. }

3.4、编辑生产者类FanoutPruducers

  1. package com.jx.rabbitmq.fanout;
  2. import com.jx.rabbitmq.work.WorkProducers;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.core.MessageBuilder;
  7. import org.springframework.amqp.rabbit.connection.CorrelationData;
  8. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.stereotype.Component;
  11. import java.util.UUID;
  12. @Component
  13. public class FanoutPruducers {
  14. //创建日志
  15. private static final Logger logger = LoggerFactory.getLogger(WorkProducers.class);
  16. @Autowired
  17. private RabbitTemplate rabbitTemplate;
  18. public void send() throws Exception{
  19. for(int i=0;i<3;i++){
  20. //创建消息唯一标识
  21. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  22. //创建消息
  23. Message message = MessageBuilder.withBody(new String("生产者发送了第"+i+"条消息").getBytes()).build();
  24. //发送消息
  25. //第一个属性exchange指定交换器
  26. //第二个属性路由键,这里为fanout类型交换器群发所以无需指定
  27. //第三个属性为需要发送的消息
  28. //第四个属性为消息唯一标识
  29. rabbitTemplate.convertAndSend("fanoutExchange","",message,correlationData);
  30. logger.info("消息发送完成");
  31. //线程睡眠100毫秒
  32. Thread.sleep(100);
  33. }
  34. }
  35. }

3.5、编辑测试类FanoutPruducersTest

  1. package com.jx.rabbitmq.fanout;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. import static org.junit.jupiter.api.Assertions.*;
  6. @SpringBootTest
  7. class FanoutPruducersTest {
  8. @Autowired
  9. private FanoutPruducers fanoutPruducers;
  10. @Test
  11. public void send()throws Exception{
  12. fanoutPruducers.send();
  13. }
  14. }

3.6、启动springboot

3.6.1、执行test,消息发送到fanout类型交换器,会将消息群发到所有与之绑定的队列中

再看看消息发送确认回调,所有消息都是送达

现在修改一下FanoutPruducers类,将发送指定的exchange更改一下,使消息无法送达,再执行一次test

消息均未送达到exchange,这时我们就需要做相应处理了

四、RabbitMQ常用五种工作模式之一路由模式:

  • 生产端发送消息时指定交换器和路由规则
  • RabbitMQ收到消息后根据生产端指定的规则将消息路由到相应的队列中
  • 消费端通过监听对应的队列获取消息

4.1、编辑application.properties,在末尾加入spring.rabbitmq.publisher-returns=true开启消息发送失败返回

  1. #rabbitmq配置
  2. #访问地址为本机,端口号5672,账号/密码:guest
  3. spring.rabbitmq.host=127.0.0.1
  4. spring.rabbitmq.port=5672
  5. spring.rabbitmq.username=guest
  6. spring.rabbitmq.password=guest
  7. #消费数量
  8. spring.rabbitmq.listener.simple.concurrency=3
  9. #最大消费数量
  10. spring.rabbitmq.listener.simple.max-concurrency=20
  11. #消费者每次从队列获取的消息数量。
  12. spring.rabbitmq.listener.simple.prefetch=1
  13. #消费接收确认机制-手动确认
  14. spring.rabbitmq.listener.simple.acknowledge-mode=manual
  15. # 开启发送确认
  16. spring.rabbitmq.publisher-confirms=true
  17. # 开启发送失败退回
  18. spring.rabbitmq.publisher-returns=true

4.2、编辑DirectRabbitmqConfig,在rabbitTemplate里设置消息发送确认回调和发送失败返回处理

  1. package com.jx.rabbitmq.direct;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.core.*;
  5. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  6. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  7. import org.springframework.amqp.rabbit.connection.CorrelationData;
  8. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  12. import org.springframework.context.annotation.Bean;
  13. import org.springframework.context.annotation.Configuration;
  14. import org.springframework.context.annotation.Scope;
  15. @Configuration
  16. public class DirectRabbitmqConfig {
  17. //创建日志
  18. private static Logger logger = LoggerFactory.getLogger(DirectRabbitmqConfig.class);
  19. //链接工厂
  20. @Autowired
  21. private CachingConnectionFactory cachingConnectionFactory;
  22. //监听器容器工厂配置
  23. @Autowired
  24. private SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer;
  25. //创建队列
  26. //第一个属性队列名称,第二个属性是否持久化
  27. @Bean
  28. public Queue directQueueA(){
  29. return new Queue("directQueueA",true);
  30. }
  31. @Bean
  32. public Queue directQueueB(){
  33. return new Queue("directQueueB",true);
  34. }
  35. //创建direct类型交换器
  36. //第一个属性交换器名称,第二个属性是否持久化,第三个属性是否自动删除,当没有队列与之绑定时会自定删除此交换器
  37. @Bean
  38. public DirectExchange directExchange(){
  39. return new DirectExchange("directExchange",true,true);
  40. }
  41. //创建绑定关系
  42. @Bean
  43. public Binding bindingQueueAToDirectExchange(Queue directQueueA,DirectExchange directExchange){
  44. return BindingBuilder.bind(directQueueA).to(directExchange).with("A");
  45. }
  46. @Bean
  47. public Binding bindingQueueBToDirectExchange(Queue directQueueB,DirectExchange directExchange){
  48. return BindingBuilder.bind(directQueueB).to(directExchange).with("B");
  49. }
  50. //注入RabbitTemplate
  51. @Bean
  52. @Scope(value = "prototype")
  53. public RabbitTemplate directRabbitTemplate(){
  54. //创建RabbitTemplate
  55. RabbitTemplate rabbitTemplate = new RabbitTemplate();
  56. //设置链接工厂
  57. rabbitTemplate.setConnectionFactory(cachingConnectionFactory);
  58. //开启消息失败退回一定要将Mandatory设置为true,否则无效
  59. rabbitTemplate.setMandatory(true);
  60. //设置消息送达交换器的回调函数,使用此功能必须设置spring.rabbitmq.publisher-confirms=true
  61. //第一个属性CorrelationData为消息标识
  62. //第二个属性为是否送达
  63. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  64. @Override
  65. public void confirm(CorrelationData correlationData, boolean b, String s) {
  66. if(b){
  67. logger.info("消息已送达至directExchange交换器");
  68. logger.info("消息标识:"+correlationData.getId());
  69. }else {
  70. logger.info("消息未送达");
  71. logger.info("消息标识:"+correlationData.getId());
  72. }
  73. }
  74. });
  75. //消息未送达队列时返回
  76. //第一个属性失败消息
  77. //第二个属性失败响应代码
  78. //第三个属性失败原因
  79. //第四个属性发送消息到队列失败的交换器类型
  80. //第五个属性为生产端发送消息时指定的路由规则routing key
  81. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  82. @Override
  83. public void returnedMessage(Message message, int i, String s, String s1, String s2) {
  84. System.out.println("发送失败的消息:"+new String(message.getBody()).toString());
  85. System.out.println(i);
  86. System.out.println(s);
  87. System.out.println(s1);
  88. System.out.println(s2);
  89. }
  90. });
  91. return rabbitTemplate;
  92. }
  93. //注入监听器容器
  94. @Bean
  95. public SimpleMessageListenerContainer directSimpleMessageListenerContainerA(Queue directQueueA,DirectConsumersListenerA directConsumersListenerA){
  96. //创建监听器容器工厂
  97. SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
  98. //设置监听器容器工厂配置
  99. simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
  100. //创建监听器容器
  101. SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
  102. //设置监听器容器监听队列
  103. simpleMessageListenerContainer.setQueues(directQueueA);
  104. //设置监听器
  105. simpleMessageListenerContainer.setMessageListener(directConsumersListenerA);
  106. return simpleMessageListenerContainer;
  107. }
  108. @Bean
  109. public SimpleMessageListenerContainer directSimpleMessageListenerContainerB(Queue directQueueB,DirectConsumersListenerB directConsumersListenerB){
  110. //创建监听器容器工厂
  111. SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
  112. //设置监听器容器工厂配置
  113. simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
  114. //创建监听器容器
  115. SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
  116. //设置监听器容器监听队列
  117. simpleMessageListenerContainer.setQueues(directQueueB);
  118. //设置监听器
  119. simpleMessageListenerContainer.setMessageListener(directConsumersListenerB);
  120. return simpleMessageListenerContainer;
  121. }
  122. }

注意:设置消息发送失败返回一定要设置rabbitTemplate.setMandatory(true);否则不会生效

4.3、编辑DirectConsumersListenerA和DirectConsumersListenerB

  1. package com.jx.rabbitmq.direct;
  2. import com.rabbitmq.client.Channel;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Component;
  10. @Component
  11. public class DirectConsumersListenerA implements ChannelAwareMessageListener {
  12. //创建日志
  13. private static Logger logger = LoggerFactory.getLogger(DirectRabbitmqConfig.class);
  14. @Autowired
  15. private RabbitTemplate rabbitTemplate;
  16. @Override
  17. public void onMessage(Message message, Channel channel) throws Exception {
  18. try {
  19. //获取消息
  20. String str = new String(message.getBody());
  21. //打印消息
  22. System.out.println("消费者A:"+str);
  23. //确认消息
  24. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
  25. }catch (Exception e){
  26. logger.error("程序异常");
  27. }
  28. }
  29. }
  1. package com.jx.rabbitmq.direct;
  2. import com.rabbitmq.client.Channel;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class DirectConsumersListenerB implements ChannelAwareMessageListener {
  10. //创建日志
  11. private static Logger logger = LoggerFactory.getLogger(DirectRabbitmqConfig.class);
  12. @Override
  13. public void onMessage(Message message, Channel channel) throws Exception {
  14. try {
  15. //获取消息
  16. String str = new String(message.getBody());
  17. //打印消息
  18. System.out.println("消费者B:"+str);
  19. //确认消息
  20. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
  21. }catch (Exception e){
  22. logger.error("程序异常");
  23. }
  24. }
  25. }

4.4、编辑DirectPruducers和DirectPruducersTest

  1. package com.jx.rabbitmq.direct;
  2. import com.jx.rabbitmq.work.WorkProducers;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.core.MessageBuilder;
  7. import org.springframework.amqp.rabbit.connection.CorrelationData;
  8. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.stereotype.Component;
  11. import java.util.UUID;
  12. @Component
  13. public class DirectPruducers {
  14. //创建日志
  15. private static final Logger logger = LoggerFactory.getLogger(WorkProducers.class);
  16. @Autowired
  17. private RabbitTemplate directRabbitTemplate;
  18. public void sendA() throws Exception{
  19. //创建消息唯一标识
  20. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  21. //创建消息
  22. Message message = MessageBuilder.withBody(new String("生产者发送了一条消息到A").getBytes()).build();
  23. //发送消息
  24. //第一个属性exchange指定交换器
  25. //第二个属性路由键
  26. //第三个属性为需要发送的消息
  27. //第四个属性为消息唯一标识
  28. directRabbitTemplate.convertAndSend("directExchange","A",message,correlationData);
  29. logger.info("消息发送完成");
  30. }
  31. public void sendB() throws Exception{
  32. //创建消息唯一标识
  33. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  34. //创建消息
  35. Message message = MessageBuilder.withBody(new String("生产者发送了一条消息到B").getBytes()).build();
  36. //发送消息
  37. //第一个属性exchange指定交换器
  38. //第二个属性路由键
  39. //第三个属性为需要发送的消息
  40. //第四个属性为消息唯一标识
  41. directRabbitTemplate.convertAndSend("directExchange","B",message,correlationData);
  42. System.out.println(message);
  43. logger.info("消息发送完成");
  44. }
  45. public void sendC() throws Exception{
  46. //创建消息唯一标识
  47. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  48. //创建消息
  49. Message message = MessageBuilder.withBody(new String("生产者发送了一条消息到C").getBytes()).build();
  50. //发送消息
  51. //第一个属性exchange指定交换器
  52. //第二个属性路由键
  53. //第三个属性为需要发送的消息
  54. //第四个属性为消息唯一标识
  55. directRabbitTemplate.convertAndSend("directExchange","C",message,correlationData);
  56. System.out.println(message);
  57. logger.info("消息发送完成");
  58. }
  59. }
  1. package com.jx.rabbitmq.direct;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. import static org.junit.jupiter.api.Assertions.*;
  6. @SpringBootTest
  7. class DirectPruducersTest {
  8. @Autowired
  9. private DirectPruducers directPruducers;
  10. @Test
  11. public void send()throws Exception{
  12. directPruducers.sendA();
  13. directPruducers.sendB();
  14. directPruducers.sendC();
  15. }
  16. }

4.5、启动springboot

4.5.1、执行test

生产端一共发送了三条消息,且三条消息都送到了交换器,但是消费端A和B各监听到一条,

第三条消息的routing key设置为C,但是directExchange交换器并没有匹配到该规则下绑定的队列,导致消息发送失败返回

五、RabbitMQ常用五种工作模式之一主题模式:

  • 该模式下的队列与交换器绑定必须是以“ . ”分割的单词列表,单词可以为任意词,一般使用带有具体业务含义的单词,绑定单词可以为任意数量单词,最大可达255字节,该模式下绑定规则有2个特殊含义字符:
  • “ . ”可以匹配(替代)任意一个单词
  • “ # ”可以匹配(替代)零或更多的单词
  • 生产端发送一条消息到topic交换器,指定路由规则为topic.A则可以路由到绑定关系为topic.A、topic.*或topic.#的队列中
  • 生产端发送一条消息到topic交换器,指定路由规则为topic.A.B则可以匹配到绑定关系为topic.A.B、topic.A.*、topic.*.B或topic.#的队列中

5.1、编辑application.properties

  1. #rabbitmq配置
  2. #访问地址为本机,端口号5672,账号/密码:guest
  3. spring.rabbitmq.host=127.0.0.1
  4. spring.rabbitmq.port=5672
  5. spring.rabbitmq.username=guest
  6. spring.rabbitmq.password=guest
  7. #消费数量
  8. spring.rabbitmq.listener.simple.concurrency=3
  9. #最大消费数量
  10. spring.rabbitmq.listener.simple.max-concurrency=20
  11. #消费者每次从队列获取的消息数量。
  12. spring.rabbitmq.listener.simple.prefetch=1
  13. #消费接收确认机制-手动确认
  14. spring.rabbitmq.listener.simple.acknowledge-mode=manual
  15. # 开启发送确认
  16. spring.rabbitmq.publisher-confirms=true
  17. # 开启发送失败退回
  18. spring.rabbitmq.publisher-returns=true

5.2、编辑TopicRabbitmqConfig

  1. package com.jx.rabbitmq.topic;
  2. import com.jx.rabbitmq.direct.DirectConsumersListenerA;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.*;
  6. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  7. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  8. import org.springframework.amqp.rabbit.connection.CorrelationData;
  9. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  10. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  13. import org.springframework.context.annotation.Bean;
  14. import org.springframework.context.annotation.Configuration;
  15. import org.springframework.context.annotation.Scope;
  16. @Configuration
  17. public class TopicRabbitmqConfig {
  18. //创建日志
  19. private static Logger logger = LoggerFactory.getLogger(TopicRabbitmqConfig.class);
  20. //链接工厂
  21. @Autowired
  22. private CachingConnectionFactory cachingConnectionFactory;
  23. //监听器容器工厂配置
  24. @Autowired
  25. private SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer;
  26. //创建队列
  27. //第一个属性队列名称,第二个属性是否持久化
  28. @Bean
  29. public Queue topicQueueA(){
  30. return new Queue("topicQueueA",true);
  31. }
  32. @Bean
  33. public Queue topicQueueB(){
  34. return new Queue("topicQueueB",true);
  35. }
  36. @Bean
  37. public Queue topicQueueC(){
  38. return new Queue("topicQueueC",true);
  39. }
  40. //创建topic类型交换器
  41. //第一个属性交换器名称,第二个属性是否持久化,第三个属性是否自动删除,当没有队列与之绑定时会自定删除此交换器
  42. @Bean
  43. public TopicExchange topicExchange(){
  44. return new TopicExchange("topicExchange",true,true);
  45. }
  46. //创建绑定关系
  47. @Bean
  48. public Binding bindingTopicQueueAToTopicExchange(Queue topicQueueA, TopicExchange topicExchange){
  49. return BindingBuilder.bind(topicQueueA).to(topicExchange).with("topic.A");
  50. }
  51. @Bean
  52. public Binding bindingTopicQueueBToTopicExchange(Queue topicQueueB,TopicExchange topicExchange){
  53. return BindingBuilder.bind(topicQueueB).to(topicExchange).with("topic.*");
  54. }
  55. @Bean
  56. public Binding bindingTopicQueueCToTopicExchange(Queue topicQueueC,TopicExchange topicExchange){
  57. return BindingBuilder.bind(topicQueueC).to(topicExchange).with("topic.#");
  58. }
  59. //注入RabbitTemplate
  60. @Bean
  61. @Scope(value = "prototype")
  62. public RabbitTemplate topicRabbitTemplate(){
  63. //创建RabbitTemplate
  64. RabbitTemplate rabbitTemplate = new RabbitTemplate();
  65. //设置链接工厂
  66. rabbitTemplate.setConnectionFactory(cachingConnectionFactory);
  67. //开启消息失败退回一定要将Mandatory设置为true,否则无效
  68. rabbitTemplate.setMandatory(true);
  69. //设置消息送达交换器的回调函数,使用此功能必须设置spring.rabbitmq.publisher-confirms=true
  70. //第一个属性CorrelationData为消息标识
  71. //第二个属性为是否送达
  72. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  73. @Override
  74. public void confirm(CorrelationData correlationData, boolean b, String s) {
  75. if(b){
  76. logger.info("消息已送达至topicExchange交换器");
  77. logger.info("消息标识:"+correlationData.getId());
  78. }else {
  79. logger.info("消息未送达");
  80. logger.info("消息标识:"+correlationData.getId());
  81. }
  82. }
  83. });
  84. //消息未送达队列时返回
  85. //第一个属性失败消息
  86. //第二个属性失败响应代码
  87. //第三个属性失败原因
  88. //第四个属性发送消息到队列失败的交换器类型
  89. //第五个属性为生产端发送消息时指定的路由规则routing key
  90. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  91. @Override
  92. public void returnedMessage(Message message, int i, String s, String s1, String s2) {
  93. System.out.println("发送失败的消息:"+new String(message.getBody()).toString());
  94. System.out.println(i);
  95. System.out.println(s);
  96. System.out.println(s1);
  97. System.out.println(s2);
  98. }
  99. });
  100. return rabbitTemplate;
  101. }
  102. //注入监听器容器
  103. @Bean
  104. public SimpleMessageListenerContainer TopicSimpleMessageListenerContainerA(Queue topicQueueA, TopicConsumersListenerA topicConsumersListenerA){
  105. //创建监听器容器工厂
  106. SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
  107. //设置监听器容器工厂配置
  108. simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
  109. //创建监听器容器
  110. SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
  111. //设置监听器容器监听队列
  112. simpleMessageListenerContainer.setQueues(topicQueueA);
  113. //设置监听器
  114. simpleMessageListenerContainer.setMessageListener(topicConsumersListenerA);
  115. return simpleMessageListenerContainer;
  116. }
  117. @Bean
  118. public SimpleMessageListenerContainer TopicSimpleMessageListenerContainerB(Queue topicQueueB, TopicConsumersListenerB topicConsumersListenerB){
  119. //创建监听器容器工厂
  120. SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
  121. //设置监听器容器工厂配置
  122. simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
  123. //创建监听器容器
  124. SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
  125. //设置监听器容器监听队列
  126. simpleMessageListenerContainer.setQueues(topicQueueB);
  127. //设置监听器
  128. simpleMessageListenerContainer.setMessageListener(topicConsumersListenerB);
  129. return simpleMessageListenerContainer;
  130. }
  131. @Bean
  132. public SimpleMessageListenerContainer TopicSimpleMessageListenerContainerC(Queue topicQueueC, TopicConsumersListenerC topicConsumersListenerC){
  133. //创建监听器容器工厂
  134. SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
  135. //设置监听器容器工厂配置
  136. simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
  137. //创建监听器容器
  138. SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
  139. //设置监听器容器监听队列
  140. simpleMessageListenerContainer.setQueues(topicQueueC);
  141. //设置监听器
  142. simpleMessageListenerContainer.setMessageListener(topicConsumersListenerC);
  143. return simpleMessageListenerContainer;
  144. }
  145. }

5.3、编辑TopicConsumersListenerA、TopicConsumersListenerB和TopicConsumersListenerC

  1. package com.jx.rabbitmq.topic;
  2. import com.rabbitmq.client.Channel;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  7. import org.springframework.stereotype.Component;
  8. import java.util.Map;
  9. import java.util.Set;
  10. @Component
  11. public class TopicConsumersListenerA implements ChannelAwareMessageListener {
  12. //创建日志
  13. private static Logger logger = LoggerFactory.getLogger(TopicConsumersListenerA.class);
  14. @Override
  15. public void onMessage(Message message, Channel channel) throws Exception {
  16. try {
  17. //获取消息所在队列名
  18. String queueName = message.getMessageProperties().getConsumerQueue();
  19. //获取生产端指定匹配规则
  20. String routingKey = message.getMessageProperties().getReceivedRoutingKey();
  21. //获取消息
  22. String str = new String(message.getBody());
  23. //打印消息
  24. System.out.println("消费者A监听到消息来自"+queueName+"队列绑定规则为:topic.A,生产端指定匹配规则为"+routingKey+":"+str);
  25. //确认消息
  26. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
  27. }catch (Exception e){
  28. logger.error("程序异常");
  29. }
  30. }
  31. }
  1. package com.jx.rabbitmq.topic;
  2. import com.rabbitmq.client.Channel;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class TopicConsumersListenerB implements ChannelAwareMessageListener {
  10. //创建日志
  11. private static Logger logger = LoggerFactory.getLogger(TopicConsumersListenerB.class);
  12. @Override
  13. public void onMessage(Message message, Channel channel) throws Exception {
  14. try {
  15. //获取消息所在队列名
  16. String queueName = message.getMessageProperties().getConsumerQueue();
  17. //获取匹配规则
  18. String routingKey = message.getMessageProperties().getReceivedRoutingKey();
  19. //获取消息
  20. String str = new String(message.getBody());
  21. //打印消息
  22. System.out.println("消费者B监听到消息来自"+queueName+"队列绑定规则为:topic.*,生产端指定匹配规则为"+routingKey+":"+str);
  23. //确认消息
  24. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
  25. }catch (Exception e){
  26. logger.error("程序异常");
  27. }
  28. }
  29. }
  1. package com.jx.rabbitmq.topic;
  2. import com.rabbitmq.client.Channel;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class TopicConsumersListenerC implements ChannelAwareMessageListener {
  10. //创建日志
  11. private static Logger logger = LoggerFactory.getLogger(TopicConsumersListenerC.class);
  12. @Override
  13. public void onMessage(Message message, Channel channel) throws Exception {
  14. try {
  15. //获取消息所在队列名
  16. String queueName = message.getMessageProperties().getConsumerQueue();
  17. //获取匹配规则
  18. String routingKey = message.getMessageProperties().getReceivedRoutingKey();
  19. //获取消息
  20. String str = new String(message.getBody());
  21. //打印消息
  22. System.out.println("消费者C监听到消息来自"+queueName+"队列绑定规则为:topic.#,生产端指定匹配规则为"+routingKey+":"+str);
  23. //确认消息
  24. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
  25. }catch (Exception e){
  26. logger.error("程序异常");
  27. }
  28. }
  29. }

5.4、编辑TopicPruducers和TopicPruducersTest

  1. package com.jx.rabbitmq.topic;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.core.MessageBuilder;
  6. import org.springframework.amqp.rabbit.connection.CorrelationData;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Component;
  10. import java.util.UUID;
  11. @Component
  12. public class TopicPruducers {
  13. //创建日志
  14. private static final Logger logger = LoggerFactory.getLogger(TopicPruducers.class);
  15. @Autowired
  16. private RabbitTemplate rabbitTemplate;
  17. public void send(String routingKey) throws Exception{
  18. //创建消息唯一标识
  19. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  20. //创建消息
  21. Message message = MessageBuilder.withBody(new String("生产者发送了一条消息").getBytes()).build();
  22. //发送消息
  23. //第一个属性exchange指定交换器
  24. //第二个属性路由键
  25. //第三个属性为需要发送的消息
  26. //第四个属性为消息唯一标识
  27. rabbitTemplate.convertAndSend("topicExchange",routingKey,message,correlationData);
  28. logger.info("消息发送完成");
  29. }
  30. }
  1. package com.jx.rabbitmq.topic;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. import static org.junit.jupiter.api.Assertions.*;
  6. @SpringBootTest
  7. class TopicPruducersTest {
  8. @Autowired
  9. private TopicPruducers topicPruducers;
  10. @Test
  11. public void send()throws Exception{
  12. topicPruducers.send("topic.A");
  13. Thread.sleep(100);
  14. topicPruducers.send("topic.abc.aaa");
  15. }
  16. }

5.5、启动springboot

5.5.1、执行test

生产端发送消息指定路由为topic.A匹配到3个队列,绑定关系分别为topic.A、topic.*和topic.#。

生产端发送消息指定路由为topic.abc.aaa匹配到1个队列,绑定关系为topic.#。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/971704
推荐阅读
相关标签