赞
踩
简介:RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。
RabbitMQ服务支持下列操作系统:
RabbitMQ支持下列编程语言:
主要特性:
以上摘自百度百科
RabbitMQ核心:
RabbitMQ常用的有5种工作模式,分别为简单模式、工作模式、发布/订阅模式、路由模式、主题模式。
在这里用到了消息发送确认和消息接收确认机制:
一般来讲当生产端发送消息给RabbitMQ之后,就默认消息发送成功,同理,RabbitMQ给消费者发送一条消息后也会默认消费端接收了消息,并从队列中删除已发送的消息,但是这样就存在消息丢失的可能。比如,消费端接收消息后,业务逻辑异常,那么消息就相当于未处理完全,需要重新处理,但是此刻RabbitMQ已在队列中删除了此消息,也就导致消息丢失;或者在加入了spring事务管理机制时,虽然消费端正确处理了消息,但是后续业务出错,导致事务出错而回滚,此时也就相当于消息丢失了。
这时候引入了消息确认机制,首先消息发送确认,在消息到达exchange时会给发送端返回一个发生成功或失败的结果,以及消息的标识,这个时候可以对消息发送成功或失败作一些处理,在消息到达queue时同样会给发送端返回结果。再说消息接收确认,当消息发送给消费端时,RabbitMQ会等待消费端回复确认处理才会从队列中删除掉消息,如果一直等不到回复,就一直标记消息为待确认状态,且不会再给该消费端发送消息,直到消费端确认消息才会继续给消费端发送消息,若消费端宕机或断开连接,RabbitMQ就会把此消费端未确认的消息发送给下一个消费端处理
RabbitMQ支持事务:
事务的实现还是在信道(channel)设置上,方法主要有三个:
1、channel.txSelect()开启事务
2、channel.txComment()提交事务
3、channel.txRollback()回滚事务
采用事务机制能保证消息到达RabbitMQ,但是采用事务机制也就导致了发送消息到RabbitMQ需要等待消息到达确认,性能上大大的降低了。
这里采用消息发送确认机制(publisher Confirm/publisher Return)和消息接收确认(ACK手动确认模式)
一、RabbitMQ常用五种工作模式之一简单模式:
1、创建一个Springboot项目,集成RabbitMQ,在pom.xml里添加如下代码,带注释的为添加rabbit依赖
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.2.4.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>com.jx</groupId>
- <artifactId>rabbitmq</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>rabbitmq</name>
- <description>Demo project for Spring Boot</description>
-
- <properties>
- <java.version>1.8</java.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <!--添加rabbitmq依赖-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.junit.vintage</groupId>
- <artifactId>junit-vintage-engine</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
- </project>
2、在application.properties里添加RabbitMQ配置信息
-
- #rabbitmq配置
- #访问地址为本机,端口号5672,账号/密码:guest
- spring.rabbitmq.host=127.0.0.1
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
- #消费数量
- spring.rabbitmq.listener.simple.concurrency=3
- #最大消费数量
- spring.rabbitmq.listener.simple.max-concurrency=20
- #消费者每次从队列获取的消息数量。
- spring.rabbitmq.listener.simple.prefetch=1
- #消费接收确认机制-手动确认
- spring.rabbitmq.listener.simple.acknowledge-mode=manual
3、编写RabbitmqConfig配置类,其中消费者监听器为自定义类,后面编写
- package com.jx.rabbitmq.queue;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
-
- @Configuration
- public class RabbitmqConfig {
- //创建日志
- private static final Logger loger = LoggerFactory.getLogger(RabbitmqConfig.class);
- //创建一个简单队列
- @Bean
- public Queue simpleQueue(){
- return new Queue("simple",true);
- }
- //注入连接工厂
- @Autowired
- private CachingConnectionFactory cachingConnectionFactory;
- //监听器容器配置
- @Autowired
- private SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer;
- //注入消费者监听器
- @Autowired
- private ConsumersListener consumersListener;
- @Bean
- public SimpleMessageListenerContainer comsumersListenerContainer(){
- //创建监听容器工厂
- SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
- //将监听容器配置和链接工厂注入监听容器工厂
- simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
- //监听容器工厂创建监听容器
- SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
- //监听容器指定监听器
- simpleMessageListenerContainer.setMessageListener(consumersListener);
- //指定监听器监听队列
- simpleMessageListenerContainer.setQueueNames("simple");
- return simpleMessageListenerContainer;
- }
- }
4、编写消费者监听器,实现ChannelAwareMessageListener接口,这里先把获取tag和确认消息注释掉,后面打开
- package com.jx.rabbitmq.queue;
-
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class ConsumersListener implements ChannelAwareMessageListener {
- //创建日志
- private static final Logger loger = LoggerFactory.getLogger(ConsumersListener.class);
-
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- try {
- loger.info("消费者监听到一条消息-----------");
- // //获取tag
- // long tag = message.getMessageProperties().getDeliveryTag();
- //获取监听消息体
- String str = new String(message.getBody(),"utf-8");
- //打印消息
- System.out.println("消费者:"+str);
- // //确认消息
- // channel.basicAck(tag,true);
- }catch (Exception e){
- loger.error("程序异常------------");
- }
- }
- }
5、编写生产者类
- package com.jx.rabbitmq.queue;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageBuilder;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- @Component
- public class Producers {
- //创建日志
- private static final Logger logger = LoggerFactory.getLogger(Producers.class);
- @Autowired
- private RabbitTemplate rabbitTemplate;
- public void send(){
- Message message = MessageBuilder.withBody(new String("生产者发送一条消息").getBytes()).build();
- rabbitTemplate.convertAndSend("simple",message);
- logger.info("消息发送完成");
- }
- }
6、编写ProducersTest测试类
- package com.jx.rabbitmq.queue;
-
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
-
- import static org.junit.jupiter.api.Assertions.*;
- @SpringBootTest
- class ProducersTest {
- @Autowired
- private Producers producers;
- @Test
- public void test(){
- producers.send();
- }
-
- }
7、启动springboot
7.1、执行测试类
连续执行2次测试类发现,消费者监听到2条消息,管理工具查看到2条未确认消息,因为配置信息为手动确认,在编写消息监听时注释了确认消息代码
关闭springboot,消息重回队列
7.2、重启springboot
将消息监听类中的注释代码放开,再次执行2次测试类
确认消息有三种模式:无需确认(none)、手动确认(manual)、自动确认(auto)。
在springboot的配置文件里设置spring.rabbitmq.listener.simple.acknowledge-mode
在配置文件里添加spring.rabbitmq.listener.simple.acknowledge-mode=manual即为手动确认消息
二、RabbitMQ常用五种工作模式之二工作模式:
2.1编辑WorkRabbitmqConfig类,其中三个消费端监听器在后面补充
- package com.jx.rabbitmq.work;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
-
-
- @Configuration
- public class WorkRabbitmqConfig {
- //创建日志
- private static Logger logger = LoggerFactory.getLogger(WorkRabbitmqConfig.class);
- //创建队列
- @Bean
- public Queue workQueue(){
- return new Queue("work_queue",true);
- }
- //注入链接工厂
- @Autowired
- private CachingConnectionFactory cachingConnectionFactory;
-
- //监听器容器配置
- @Autowired
- private SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer;
- //注入消费监听器
- @Autowired
- private ConsumersListenerA consumersListenerA;
- @Autowired
- private ConsumersListenerB consumersListenerB;
- @Autowired
- private ConsumersListenerC consumersListenerC;
- //注入监听器容器
- @Bean
- public SimpleMessageListenerContainer workListenerContainerA(){
- //创建监听器容器
- SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
- //将监听器容器配置和链接工厂注入监听器容器
- simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
- //监听器容器工厂创建监听器容器
- SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
- //监听器容器指定监听队列
- simpleMessageListenerContainer.setQueueNames("work_queue");
- //指定监听器
- simpleMessageListenerContainer.setMessageListener(consumersListenerA);
- return simpleMessageListenerContainer;
- }
- @Bean
- public SimpleMessageListenerContainer workListenerContainerB(){
- //创建监听器容器
- SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
- //将监听器容器配置和链接工厂注入监听器容器
- simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
- //监听器容器工厂创建监听器容器
- SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
- //监听器容器指定监听队列
- simpleMessageListenerContainer.setQueueNames("work_queue");
- //指定监听器
- simpleMessageListenerContainer.setMessageListener(consumersListenerB);
- return simpleMessageListenerContainer;
- }
- @Bean
- public SimpleMessageListenerContainer workListenerContainerC(){
- //创建监听器容器
- SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
- //将监听器容器配置和链接工厂注入监听器容器
- simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
- //监听器容器工厂创建监听器容器
- SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
- //监听器容器指定监听队列
- simpleMessageListenerContainer.setQueueNames("work_queue");
- //指定监听器
- simpleMessageListenerContainer.setMessageListener(consumersListenerC);
- return simpleMessageListenerContainer;
- }
-
-
- }
2.2、编辑ConsumersListenerA、ConsumersListenerB、ConsumersListenerC分别实现ChannelAwareMessageListener接口
- package com.jx.rabbitmq.work;
-
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class ConsumersListenerA implements ChannelAwareMessageListener {
- //创建日志
- private static Logger logger = LoggerFactory.getLogger(ConsumersListenerA.class);
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- try {
- logger.info("监听到一条消息-------------------");
- //获取tag
- long tag = message.getMessageProperties().getDeliveryTag();
- //获取监听消息体
- String str = new String(message.getBody(),"utf-8");
- //打印消息
- System.out.println("消费者A:"+str+"-----"+tag);
- //确认消息
- channel.basicAck(tag,true);
-
- }catch (Exception e){
- logger.error("监听异常");
- }
- }
- }
- package com.jx.rabbitmq.work;
-
-
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class ConsumersListenerB implements ChannelAwareMessageListener {
- //创建日志
- private static Logger logger = LoggerFactory.getLogger(ConsumersListenerB.class);
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- try{
- logger.info("监听到一条消息-------------------");
- //获取tag
- long tag = message.getMessageProperties().getDeliveryTag();
- //获取消息体
- String str = new String(message.getBody(),"utf-8");
- //打印消息
- System.out.println("消费者B:"+str+"-----"+tag);
- //确认消息
- channel.basicAck(tag,true);
- }catch (Exception e){
- logger.error("监听异常");
- }
- }
- }
- package com.jx.rabbitmq.work;
-
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class ConsumersListenerC implements ChannelAwareMessageListener {
- //创建日志
- private static Logger logger = LoggerFactory.getLogger(ConsumersListenerC.class);
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- try {
- logger.info("监听到一条消息-------------------");
- //获取tag
- long tag = message.getMessageProperties().getDeliveryTag();
- //获取消息体
- String str = new String(message.getBody(),"utf-8");
- //打印消息
- System.out.println("消费者C:"+str+"-----"+tag);
- //确认消息
- channel.basicAck(tag,true);
- }catch (Exception e){
- logger.error("监听异常");
- }
-
- }
- }
2.3、编辑WorkProducers生产者类
- package com.jx.rabbitmq.work;
-
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageBuilder;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.BeanFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class WorkProducers {
-
- //创建日志
- private static final Logger logger = LoggerFactory.getLogger(WorkProducers.class);
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- public void send() throws Exception{
- for(int i=0;i<3;i++){
- System.out.println("-----"+rabbitTemplate+"-------\n");
- //创建消息
- Message message = MessageBuilder.withBody(new String("生产者发送了第"+i+"条消息").getBytes()).build();
- //发送消息
- rabbitTemplate.convertAndSend("work_queue",message);
- logger.info("消息发送完成");
- //线程睡眠100毫秒
- Thread.sleep(100);
- }
-
- }
- }
2.4、启动springboot
2.4.1、首先注释掉生产者代码里的Thread.sleep(100),执行test
在这里可以看到,消息好像并没有轮询分配给三个消费端,且消费端处理数据为什么是无序的呢?
首先看第一个问题,为什么没有轮询?看配置文件,在设置spring.rabbitmq.listener.simple.concurrency时设置了消费数量为3,正好在生产端发送了三条消息到队列,导致三条消息全部被消费者A获取。
再看第二个问题,为什么消息是无序的?因为在理想环境下RabbitMQ发送消息到队列是“有序”的,但实际上由于网络原因、或其他如发送异常等会造成消息实际发送到队列时已经是“无序”了,这里假设其他正常,把Tread.sleep(100)加上再次执行会发现,这个时候消费顺序就是“有序”的
再将生产端代码循环执行20次
三、RabbitMQ常用五种工作模式之三发布/订阅模式:
3.1、编辑application.properties,这里我们用到了消息发送确认,所以需要在配置文件里开启相关配置,在末尾添加。
- #rabbitmq配置
- #访问地址为本机,端口号5672,账号/密码:guest
- spring.rabbitmq.host=127.0.0.1
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
- #消费数量
- spring.rabbitmq.listener.simple.concurrency=3
- #最大消费数量
- spring.rabbitmq.listener.simple.max-concurrency=20
- #消费者每次从队列获取的消息数量。
- spring.rabbitmq.listener.simple.prefetch=1
- #消费接收确认机制-手动确认
- spring.rabbitmq.listener.simple.acknowledge-mode=manual
-
- # 开启发送确认
- spring.rabbitmq.publisher-confirms=true
3.2、编辑FanoutRabbitmqConfig类,在这里将RabbitTemplate统一配置,消息到达和未到达交换器的处理
- package com.jx.rabbitmq.fanout;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Scope;
-
-
- @Configuration
- public class FanoutRabbitmqConfig {
- private static final Logger logger = LoggerFactory.getLogger(FanoutRabbitmqConfig.class);
- //创建一个队列
- //第一个属性为队列名称,第二个属性为是否持久化队列
- @Bean
- public Queue queueA(){
- return new Queue("queueA",true);
- }
- @Bean
- public Queue queueB(){
- return new Queue("queueB",true);
- }
- //创建链接工厂
- @Autowired
- private CachingConnectionFactory cachingConnectionFactory;
- //创建监听器容器工厂配置
- @Autowired
- private SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer;
- //创建一个fanout类型交换器
- //第一个属性name:交换器名称,第二个属性durable是否持久化,第三个属性autoDelete是否自动删除
- @Bean
- public FanoutExchange fanoutExchange(){
- return new FanoutExchange("fanoutExchange",true,true);
- }
-
- //创建绑定关系
- @Bean
- public Binding bindingQueueAToFanoutExchange(Queue queueA,FanoutExchange fanoutExchange){
- return BindingBuilder.bind(queueA).to(fanoutExchange);
- }
- @Bean
- public Binding bindingQueueBToFanoutExchange(Queue queueB,FanoutExchange fanoutExchange){
- return BindingBuilder.bind(queueB).to(fanoutExchange);
- }
- @Bean
- @Scope(value = "prototype")
- public RabbitTemplate rabbitTemplate(){
- //创建RabbitMQTemplate对象
- RabbitTemplate rabbitTemplate = new RabbitTemplate();
- //设置链接工厂
- rabbitTemplate.setConnectionFactory(cachingConnectionFactory);
- //设置消息送达交换器的回调函数,使用此功能必须设置spring.rabbitmq.publisher-confirms=true
- //第一个属性CorrelationData为消息标识
- //第二个属性为是否送达
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- @Override
- public void confirm(CorrelationData correlationData, boolean b, String s) {
- if(b){
- logger.info("消息已送达至fanoutExchange交换器");
- logger.info("消息标识:"+correlationData.getId());
- }else {
- logger.info("消息未送达");
- logger.info("消息标识:"+correlationData.getId());
- }
- }
- });
- return rabbitTemplate;
- }
-
- //注入监听器容器相关配置
- @Bean
- public SimpleMessageListenerContainer FanoutListenerContainerA(Queue queueA,FanoutConsumersListenerA fanoutConsumersListenerA){
- //创建监听器容器
- SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
- //将监听器容器配置和链接工厂注入监听器容器
- simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
- //监听器容器工厂创建监听器容器
- SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
- //监听器容器指定监听队列
- simpleMessageListenerContainer.setQueues(queueA);
- //指定监听器
- simpleMessageListenerContainer.setMessageListener(fanoutConsumersListenerA);
- return simpleMessageListenerContainer;
- }
- @Bean
- public SimpleMessageListenerContainer FanoutListenerContainerB(Queue queueB,FanoutConsumersListenerB fanoutConsumersListenerB){
- //创建监听器容器
- SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
- //将监听器容器配置和链接工厂注入监听器容器
- simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
- //监听器容器工厂创建监听器容器
- SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
- //监听器容器指定监听队列
- simpleMessageListenerContainer.setQueues(queueB);
- //指定监听器
- simpleMessageListenerContainer.setMessageListener(fanoutConsumersListenerB);
- return simpleMessageListenerContainer;
- }
-
- }
3.3、编辑2个消费端类FanoutConsumersListenerA和FanoutConsumersListenerB,分别监听队列A和B
- package com.jx.rabbitmq.fanout;
-
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class FanoutConsumersListenerA implements ChannelAwareMessageListener {
- //创建日志
- private static Logger logger = LoggerFactory.getLogger(FanoutConsumersListenerA.class);
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- try {
- logger.info("监听到一条消息来自于queueA-------------------");
- //获取tag
- long tag = message.getMessageProperties().getDeliveryTag();
- //获取消息体
- String str = new String(message.getBody(),"utf-8");
- //打印消息
- System.out.println("消费者A:"+str);
- //确认消息
- channel.basicAck(tag,true);
- }catch (Exception e){
- logger.error("程序异常");
- }
- }
- }
- package com.jx.rabbitmq.fanout;
-
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class FanoutConsumersListenerB implements ChannelAwareMessageListener {
- //创建日志
- private static Logger logger = LoggerFactory.getLogger(FanoutConsumersListenerB.class);
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
-
- try {
- logger.info("监听到一条消息来自于queueB-------------------");
- //获取tag
- long tag = message.getMessageProperties().getDeliveryTag();
- //获取消息体
- String str = new String(message.getBody(),"utf-8");
- //打印消息
- System.out.println("消费者B:"+str);
- //确认消息
- channel.basicAck(tag,true);
- }catch (Exception e){
- logger.error("程序异常");
- }
- }
- }
3.4、编辑生产者类FanoutPruducers
- package com.jx.rabbitmq.fanout;
-
- import com.jx.rabbitmq.work.WorkProducers;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageBuilder;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.util.UUID;
-
-
- @Component
- public class FanoutPruducers {
- //创建日志
- private static final Logger logger = LoggerFactory.getLogger(WorkProducers.class);
- @Autowired
- private RabbitTemplate rabbitTemplate;
- public void send() throws Exception{
- for(int i=0;i<3;i++){
- //创建消息唯一标识
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- //创建消息
- Message message = MessageBuilder.withBody(new String("生产者发送了第"+i+"条消息").getBytes()).build();
- //发送消息
- //第一个属性exchange指定交换器
- //第二个属性路由键,这里为fanout类型交换器群发所以无需指定
- //第三个属性为需要发送的消息
- //第四个属性为消息唯一标识
- rabbitTemplate.convertAndSend("fanoutExchange","",message,correlationData);
- logger.info("消息发送完成");
- //线程睡眠100毫秒
- Thread.sleep(100);
- }
-
- }
- }
3.5、编辑测试类FanoutPruducersTest
- package com.jx.rabbitmq.fanout;
-
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- import static org.junit.jupiter.api.Assertions.*;
- @SpringBootTest
- class FanoutPruducersTest {
- @Autowired
- private FanoutPruducers fanoutPruducers;
- @Test
- public void send()throws Exception{
- fanoutPruducers.send();
- }
- }
3.6、启动springboot
3.6.1、执行test,消息发送到fanout类型交换器,会将消息群发到所有与之绑定的队列中
再看看消息发送确认回调,所有消息都是送达
现在修改一下FanoutPruducers类,将发送指定的exchange更改一下,使消息无法送达,再执行一次test
消息均未送达到exchange,这时我们就需要做相应处理了
四、RabbitMQ常用五种工作模式之一路由模式:
4.1、编辑application.properties,在末尾加入spring.rabbitmq.publisher-returns=true开启消息发送失败返回
-
-
- #rabbitmq配置
- #访问地址为本机,端口号5672,账号/密码:guest
- spring.rabbitmq.host=127.0.0.1
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
- #消费数量
- spring.rabbitmq.listener.simple.concurrency=3
- #最大消费数量
- spring.rabbitmq.listener.simple.max-concurrency=20
- #消费者每次从队列获取的消息数量。
- spring.rabbitmq.listener.simple.prefetch=1
- #消费接收确认机制-手动确认
- spring.rabbitmq.listener.simple.acknowledge-mode=manual
-
- # 开启发送确认
- spring.rabbitmq.publisher-confirms=true
-
- # 开启发送失败退回
- spring.rabbitmq.publisher-returns=true
-
4.2、编辑DirectRabbitmqConfig,在rabbitTemplate里设置消息发送确认回调和发送失败返回处理
- package com.jx.rabbitmq.direct;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Scope;
-
-
- @Configuration
- public class DirectRabbitmqConfig {
- //创建日志
- private static Logger logger = LoggerFactory.getLogger(DirectRabbitmqConfig.class);
-
- //链接工厂
- @Autowired
- private CachingConnectionFactory cachingConnectionFactory;
- //监听器容器工厂配置
- @Autowired
- private SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer;
-
- //创建队列
- //第一个属性队列名称,第二个属性是否持久化
- @Bean
- public Queue directQueueA(){
- return new Queue("directQueueA",true);
- }
- @Bean
- public Queue directQueueB(){
- return new Queue("directQueueB",true);
- }
- //创建direct类型交换器
- //第一个属性交换器名称,第二个属性是否持久化,第三个属性是否自动删除,当没有队列与之绑定时会自定删除此交换器
- @Bean
- public DirectExchange directExchange(){
- return new DirectExchange("directExchange",true,true);
- }
- //创建绑定关系
- @Bean
- public Binding bindingQueueAToDirectExchange(Queue directQueueA,DirectExchange directExchange){
- return BindingBuilder.bind(directQueueA).to(directExchange).with("A");
- }
- @Bean
- public Binding bindingQueueBToDirectExchange(Queue directQueueB,DirectExchange directExchange){
- return BindingBuilder.bind(directQueueB).to(directExchange).with("B");
- }
- //注入RabbitTemplate
- @Bean
- @Scope(value = "prototype")
- public RabbitTemplate directRabbitTemplate(){
- //创建RabbitTemplate
- RabbitTemplate rabbitTemplate = new RabbitTemplate();
- //设置链接工厂
- rabbitTemplate.setConnectionFactory(cachingConnectionFactory);
- //开启消息失败退回一定要将Mandatory设置为true,否则无效
- rabbitTemplate.setMandatory(true);
- //设置消息送达交换器的回调函数,使用此功能必须设置spring.rabbitmq.publisher-confirms=true
- //第一个属性CorrelationData为消息标识
- //第二个属性为是否送达
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- @Override
- public void confirm(CorrelationData correlationData, boolean b, String s) {
- if(b){
- logger.info("消息已送达至directExchange交换器");
- logger.info("消息标识:"+correlationData.getId());
- }else {
- logger.info("消息未送达");
- logger.info("消息标识:"+correlationData.getId());
- }
- }
- });
- //消息未送达队列时返回
- //第一个属性失败消息
- //第二个属性失败响应代码
- //第三个属性失败原因
- //第四个属性发送消息到队列失败的交换器类型
- //第五个属性为生产端发送消息时指定的路由规则routing key
- rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
- @Override
- public void returnedMessage(Message message, int i, String s, String s1, String s2) {
- System.out.println("发送失败的消息:"+new String(message.getBody()).toString());
- System.out.println(i);
- System.out.println(s);
- System.out.println(s1);
- System.out.println(s2);
- }
- });
- return rabbitTemplate;
- }
-
- //注入监听器容器
- @Bean
- public SimpleMessageListenerContainer directSimpleMessageListenerContainerA(Queue directQueueA,DirectConsumersListenerA directConsumersListenerA){
- //创建监听器容器工厂
- SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
- //设置监听器容器工厂配置
- simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
- //创建监听器容器
- SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
- //设置监听器容器监听队列
- simpleMessageListenerContainer.setQueues(directQueueA);
- //设置监听器
- simpleMessageListenerContainer.setMessageListener(directConsumersListenerA);
- return simpleMessageListenerContainer;
-
- }
- @Bean
- public SimpleMessageListenerContainer directSimpleMessageListenerContainerB(Queue directQueueB,DirectConsumersListenerB directConsumersListenerB){
- //创建监听器容器工厂
- SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
- //设置监听器容器工厂配置
- simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
- //创建监听器容器
- SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
- //设置监听器容器监听队列
- simpleMessageListenerContainer.setQueues(directQueueB);
- //设置监听器
- simpleMessageListenerContainer.setMessageListener(directConsumersListenerB);
- return simpleMessageListenerContainer;
-
- }
-
-
-
-
- }
注意:设置消息发送失败返回一定要设置rabbitTemplate.setMandatory(true);否则不会生效
4.3、编辑DirectConsumersListenerA和DirectConsumersListenerB
- package com.jx.rabbitmq.direct;
-
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class DirectConsumersListenerA implements ChannelAwareMessageListener {
- //创建日志
- private static Logger logger = LoggerFactory.getLogger(DirectRabbitmqConfig.class);
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- try {
- //获取消息
- String str = new String(message.getBody());
- //打印消息
- System.out.println("消费者A:"+str);
- //确认消息
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
- }catch (Exception e){
- logger.error("程序异常");
- }
- }
- }
- package com.jx.rabbitmq.direct;
-
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class DirectConsumersListenerB implements ChannelAwareMessageListener {
- //创建日志
- private static Logger logger = LoggerFactory.getLogger(DirectRabbitmqConfig.class);
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- try {
- //获取消息
- String str = new String(message.getBody());
- //打印消息
- System.out.println("消费者B:"+str);
- //确认消息
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
- }catch (Exception e){
- logger.error("程序异常");
- }
- }
- }
4.4、编辑DirectPruducers和DirectPruducersTest
- package com.jx.rabbitmq.direct;
-
- import com.jx.rabbitmq.work.WorkProducers;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageBuilder;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.util.UUID;
-
-
-
- @Component
- public class DirectPruducers {
- //创建日志
- private static final Logger logger = LoggerFactory.getLogger(WorkProducers.class);
- @Autowired
- private RabbitTemplate directRabbitTemplate;
- public void sendA() throws Exception{
- //创建消息唯一标识
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- //创建消息
- Message message = MessageBuilder.withBody(new String("生产者发送了一条消息到A").getBytes()).build();
- //发送消息
- //第一个属性exchange指定交换器
- //第二个属性路由键
- //第三个属性为需要发送的消息
- //第四个属性为消息唯一标识
- directRabbitTemplate.convertAndSend("directExchange","A",message,correlationData);
- logger.info("消息发送完成");
- }
- public void sendB() throws Exception{
- //创建消息唯一标识
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- //创建消息
- Message message = MessageBuilder.withBody(new String("生产者发送了一条消息到B").getBytes()).build();
- //发送消息
- //第一个属性exchange指定交换器
- //第二个属性路由键
- //第三个属性为需要发送的消息
- //第四个属性为消息唯一标识
- directRabbitTemplate.convertAndSend("directExchange","B",message,correlationData);
- System.out.println(message);
- logger.info("消息发送完成");
- }
- public void sendC() throws Exception{
- //创建消息唯一标识
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- //创建消息
- Message message = MessageBuilder.withBody(new String("生产者发送了一条消息到C").getBytes()).build();
- //发送消息
- //第一个属性exchange指定交换器
- //第二个属性路由键
- //第三个属性为需要发送的消息
- //第四个属性为消息唯一标识
- directRabbitTemplate.convertAndSend("directExchange","C",message,correlationData);
- System.out.println(message);
- logger.info("消息发送完成");
- }
- }
- package com.jx.rabbitmq.direct;
-
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- import static org.junit.jupiter.api.Assertions.*;
-
- @SpringBootTest
- class DirectPruducersTest {
- @Autowired
- private DirectPruducers directPruducers;
-
- @Test
- public void send()throws Exception{
- directPruducers.sendA();
- directPruducers.sendB();
- directPruducers.sendC();
- }
-
-
- }
4.5、启动springboot
4.5.1、执行test
生产端一共发送了三条消息,且三条消息都送到了交换器,但是消费端A和B各监听到一条,
第三条消息的routing key设置为C,但是directExchange交换器并没有匹配到该规则下绑定的队列,导致消息发送失败返回
五、RabbitMQ常用五种工作模式之一主题模式:
5.1、编辑application.properties
-
-
- #rabbitmq配置
- #访问地址为本机,端口号5672,账号/密码:guest
- spring.rabbitmq.host=127.0.0.1
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
- #消费数量
- spring.rabbitmq.listener.simple.concurrency=3
- #最大消费数量
- spring.rabbitmq.listener.simple.max-concurrency=20
- #消费者每次从队列获取的消息数量。
- spring.rabbitmq.listener.simple.prefetch=1
- #消费接收确认机制-手动确认
- spring.rabbitmq.listener.simple.acknowledge-mode=manual
-
- # 开启发送确认
- spring.rabbitmq.publisher-confirms=true
-
- # 开启发送失败退回
- spring.rabbitmq.publisher-returns=true
-
5.2、编辑TopicRabbitmqConfig
- package com.jx.rabbitmq.topic;
-
- import com.jx.rabbitmq.direct.DirectConsumersListenerA;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Scope;
-
- @Configuration
- public class TopicRabbitmqConfig {
- //创建日志
- private static Logger logger = LoggerFactory.getLogger(TopicRabbitmqConfig.class);
- //链接工厂
- @Autowired
- private CachingConnectionFactory cachingConnectionFactory;
- //监听器容器工厂配置
- @Autowired
- private SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer;
- //创建队列
- //第一个属性队列名称,第二个属性是否持久化
- @Bean
- public Queue topicQueueA(){
- return new Queue("topicQueueA",true);
- }
- @Bean
- public Queue topicQueueB(){
- return new Queue("topicQueueB",true);
- }
- @Bean
- public Queue topicQueueC(){
- return new Queue("topicQueueC",true);
- }
- //创建topic类型交换器
- //第一个属性交换器名称,第二个属性是否持久化,第三个属性是否自动删除,当没有队列与之绑定时会自定删除此交换器
- @Bean
- public TopicExchange topicExchange(){
- return new TopicExchange("topicExchange",true,true);
- }
- //创建绑定关系
- @Bean
- public Binding bindingTopicQueueAToTopicExchange(Queue topicQueueA, TopicExchange topicExchange){
- return BindingBuilder.bind(topicQueueA).to(topicExchange).with("topic.A");
- }
- @Bean
- public Binding bindingTopicQueueBToTopicExchange(Queue topicQueueB,TopicExchange topicExchange){
- return BindingBuilder.bind(topicQueueB).to(topicExchange).with("topic.*");
- }
- @Bean
- public Binding bindingTopicQueueCToTopicExchange(Queue topicQueueC,TopicExchange topicExchange){
- return BindingBuilder.bind(topicQueueC).to(topicExchange).with("topic.#");
- }
- //注入RabbitTemplate
- @Bean
- @Scope(value = "prototype")
- public RabbitTemplate topicRabbitTemplate(){
- //创建RabbitTemplate
- RabbitTemplate rabbitTemplate = new RabbitTemplate();
- //设置链接工厂
- rabbitTemplate.setConnectionFactory(cachingConnectionFactory);
- //开启消息失败退回一定要将Mandatory设置为true,否则无效
- rabbitTemplate.setMandatory(true);
- //设置消息送达交换器的回调函数,使用此功能必须设置spring.rabbitmq.publisher-confirms=true
- //第一个属性CorrelationData为消息标识
- //第二个属性为是否送达
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- @Override
- public void confirm(CorrelationData correlationData, boolean b, String s) {
- if(b){
- logger.info("消息已送达至topicExchange交换器");
- logger.info("消息标识:"+correlationData.getId());
- }else {
- logger.info("消息未送达");
- logger.info("消息标识:"+correlationData.getId());
- }
- }
- });
- //消息未送达队列时返回
- //第一个属性失败消息
- //第二个属性失败响应代码
- //第三个属性失败原因
- //第四个属性发送消息到队列失败的交换器类型
- //第五个属性为生产端发送消息时指定的路由规则routing key
- rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
- @Override
- public void returnedMessage(Message message, int i, String s, String s1, String s2) {
- System.out.println("发送失败的消息:"+new String(message.getBody()).toString());
- System.out.println(i);
- System.out.println(s);
- System.out.println(s1);
- System.out.println(s2);
- }
- });
- return rabbitTemplate;
- }
- //注入监听器容器
- @Bean
- public SimpleMessageListenerContainer TopicSimpleMessageListenerContainerA(Queue topicQueueA, TopicConsumersListenerA topicConsumersListenerA){
- //创建监听器容器工厂
- SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
- //设置监听器容器工厂配置
- simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
- //创建监听器容器
- SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
- //设置监听器容器监听队列
- simpleMessageListenerContainer.setQueues(topicQueueA);
- //设置监听器
- simpleMessageListenerContainer.setMessageListener(topicConsumersListenerA);
- return simpleMessageListenerContainer;
-
- }
- @Bean
- public SimpleMessageListenerContainer TopicSimpleMessageListenerContainerB(Queue topicQueueB, TopicConsumersListenerB topicConsumersListenerB){
- //创建监听器容器工厂
- SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
- //设置监听器容器工厂配置
- simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
- //创建监听器容器
- SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
- //设置监听器容器监听队列
- simpleMessageListenerContainer.setQueues(topicQueueB);
- //设置监听器
- simpleMessageListenerContainer.setMessageListener(topicConsumersListenerB);
- return simpleMessageListenerContainer;
-
- }
- @Bean
- public SimpleMessageListenerContainer TopicSimpleMessageListenerContainerC(Queue topicQueueC, TopicConsumersListenerC topicConsumersListenerC){
- //创建监听器容器工厂
- SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
- //设置监听器容器工厂配置
- simpleRabbitListenerContainerFactoryConfigurer.configure(simpleRabbitListenerContainerFactory,cachingConnectionFactory);
- //创建监听器容器
- SimpleMessageListenerContainer simpleMessageListenerContainer = simpleRabbitListenerContainerFactory.createListenerContainer();
- //设置监听器容器监听队列
- simpleMessageListenerContainer.setQueues(topicQueueC);
- //设置监听器
- simpleMessageListenerContainer.setMessageListener(topicConsumersListenerC);
- return simpleMessageListenerContainer;
-
- }
-
- }
5.3、编辑TopicConsumersListenerA、TopicConsumersListenerB和TopicConsumersListenerC
- package com.jx.rabbitmq.topic;
-
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
- import org.springframework.stereotype.Component;
-
- import java.util.Map;
- import java.util.Set;
-
-
- @Component
- public class TopicConsumersListenerA implements ChannelAwareMessageListener {
- //创建日志
- private static Logger logger = LoggerFactory.getLogger(TopicConsumersListenerA.class);
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- try {
- //获取消息所在队列名
- String queueName = message.getMessageProperties().getConsumerQueue();
- //获取生产端指定匹配规则
- String routingKey = message.getMessageProperties().getReceivedRoutingKey();
- //获取消息
- String str = new String(message.getBody());
- //打印消息
- System.out.println("消费者A监听到消息来自"+queueName+"队列绑定规则为:topic.A,生产端指定匹配规则为"+routingKey+":"+str);
- //确认消息
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
- }catch (Exception e){
- logger.error("程序异常");
- }
- }
- }
- package com.jx.rabbitmq.topic;
-
-
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class TopicConsumersListenerB implements ChannelAwareMessageListener {
- //创建日志
- private static Logger logger = LoggerFactory.getLogger(TopicConsumersListenerB.class);
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- try {
- //获取消息所在队列名
- String queueName = message.getMessageProperties().getConsumerQueue();
- //获取匹配规则
- String routingKey = message.getMessageProperties().getReceivedRoutingKey();
- //获取消息
- String str = new String(message.getBody());
- //打印消息
- System.out.println("消费者B监听到消息来自"+queueName+"队列绑定规则为:topic.*,生产端指定匹配规则为"+routingKey+":"+str);
- //确认消息
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
- }catch (Exception e){
- logger.error("程序异常");
- }
- }
- }
- package com.jx.rabbitmq.topic;
-
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class TopicConsumersListenerC implements ChannelAwareMessageListener {
- //创建日志
- private static Logger logger = LoggerFactory.getLogger(TopicConsumersListenerC.class);
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- try {
- //获取消息所在队列名
- String queueName = message.getMessageProperties().getConsumerQueue();
- //获取匹配规则
- String routingKey = message.getMessageProperties().getReceivedRoutingKey();
- //获取消息
- String str = new String(message.getBody());
- //打印消息
- System.out.println("消费者C监听到消息来自"+queueName+"队列绑定规则为:topic.#,生产端指定匹配规则为"+routingKey+":"+str);
- //确认消息
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
- }catch (Exception e){
- logger.error("程序异常");
- }
-
- }
- }
5.4、编辑TopicPruducers和TopicPruducersTest
- package com.jx.rabbitmq.topic;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageBuilder;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.util.UUID;
-
-
- @Component
- public class TopicPruducers {
- //创建日志
- private static final Logger logger = LoggerFactory.getLogger(TopicPruducers.class);
- @Autowired
- private RabbitTemplate rabbitTemplate;
- public void send(String routingKey) throws Exception{
- //创建消息唯一标识
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- //创建消息
- Message message = MessageBuilder.withBody(new String("生产者发送了一条消息").getBytes()).build();
- //发送消息
- //第一个属性exchange指定交换器
- //第二个属性路由键
- //第三个属性为需要发送的消息
- //第四个属性为消息唯一标识
- rabbitTemplate.convertAndSend("topicExchange",routingKey,message,correlationData);
- logger.info("消息发送完成");
- }
- }
- package com.jx.rabbitmq.topic;
-
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- import static org.junit.jupiter.api.Assertions.*;
-
- @SpringBootTest
- class TopicPruducersTest {
- @Autowired
- private TopicPruducers topicPruducers;
- @Test
- public void send()throws Exception{
- topicPruducers.send("topic.A");
- Thread.sleep(100);
- topicPruducers.send("topic.abc.aaa");
- }
-
- }
5.5、启动springboot
5.5.1、执行test
生产端发送消息指定路由为topic.A匹配到3个队列,绑定关系分别为topic.A、topic.*和topic.#。
生产端发送消息指定路由为topic.abc.aaa匹配到1个队列,绑定关系为topic.#。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。