赞
踩
前端页面访问:http://localhost:15672/
MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常 见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不 用依赖其他服务。
MQ的优势:
MQ分类:
1.ActiveMQ
优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较
低的概率丢失数据
缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。
2.Kafka
大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件, 以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥 着举足轻重的作用。
优点: 性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。
缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消 息响应时间变长,使用短轮询方式,社区更新较慢;
3.RocketMQ
RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一 些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场 景。
优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,**MQ 功能较为完善,还是分布式的,扩展性好,**支持 10 亿级别的消息堆积。
缺点:支持的客户端语言不多,目前是 java 及 c++。
4.RabbitMQ
2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易 用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP等,社区活跃度高;更新频率相当高
缺点:商业版需要收费,学习成本较高
概念:
RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。
四大核心概念:
生产者
产生数据发送消息的程序是生产者
交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
RabbitMQ六大模式:
Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
Channel:。Channel 是在 connection 内部建立的逻辑连接,Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
brew install rabbitmq
//下载好是rabbitMQ 3.10.2版本
cd /opt/homebrew/Cellar/rabbitmq/3.10.2/
// 启用rabbitmq management插件
sudo sbin/rabbitmq-plugins enable rabbitmq_management
// 后台启动
sudo rabbitmq-server -detached //参数detatched表示以守护线程方式启动
// 查看状态
sudo rabbitmqctl status
// 访问可视化监控插件的界面
// 浏览器内输入 http://localhost:15672,默认的用户名密码都是guest,登录后可以在Admin那一列菜单内添加自己的用户
//关闭
sudo rabbitmqctl stop
添加用户
//查看当前用户列表
sudo rabbitmqctl list_users
//添加用户
sudo rabbitmqctl add_user sixcandy sixcandy
//将用户角色设置成为超级管理员
sudo rabbitmqctl set_user_tags sixcandy administrator
//设置用户权限
sudo rabbitmqctl set_permissions -p "/" sixcandy ".*" ".*" ".*"
“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代 表使用者保留的消息缓冲区
创建一个maven工程:
导入以下依赖:
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
消息生产者代码:
package com.candy.rabbit.hello; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * 消息生产者 */ public class Product { //队列名称 public static final String QUEUE_NAME = "hello"; //发消息 public static void main(String[] args) throws IOException, TimeoutException { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //工厂id:连接RabbitMQ队列 factory.setHost("127.0.0.1"); factory.setUsername("sixcandy"); factory.setPassword("sixcandy"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); /**创建一个队列( 队列名称, 队列里面的消息是否持久化 默认消息存储在内存中, 该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费, 是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除, 其他参数 ) **/ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //发消息 String message = "hello,sixcandy!"; /** * basicPublish( * 发送到哪个交换机上, * 路由的key * 其他参数, * 发送消息的消息体 * ) */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("嗖!fly!"); } }
消费者代码:
package com.candy.rabbit.hello; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 消费者代码 */ public class Consumer { //队列名称 public static final String QUEUE_NAME = "hello"; //接收消息 public static void main(String[] args) throws IOException, TimeoutException { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //工厂id:连接RabbitMQ队列 factory.setHost("127.0.0.1"); factory.setUsername("sixcandy"); factory.setPassword("sixcandy"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); System.out.println("我一直都在等待···"); //推送的消息如何进行消费的接口回调 DeliverCallback deliverCallback=(consumerTag, delivery)-> { //拿到消息体 String message= new String(delivery.getBody()); //打印消息体 System.out.println(message); }; //取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback=(consumerTag)-> { System.out.println("消息消费被中断"); }; /** * 消费者消费消息 * basicConsume( * 消费哪个队列, * 消费成功之后是否要自动应答 true 代表自动应答 false 手动应答, * 消费者未成功消费的回调, * 消费者取消消费的回调 * ) */ channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。
抽取RabbitMqUtils工具类:
/** * rabbitmq工具类 */ public class RabbitMqUtils { //得到一个连接的 channel public static Channel getChannel() throws Exception{ //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("sixcandy"); factory.setPassword("sixcandy"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
工作线程代码:
package com.candy.rabbit.WorkQueues; import com.candy.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; /** * 工作线程——消费者 */ public class Work1 { //队列名称 public static final String QUEUE_NAME = "hello"; //接收消息 public static void main(String[] args) throws Exception { //调取工具类,获得信道 Channel channel = RabbitMqUtils.getChannel(); //推送的消息如何进行消费的接口回调 DeliverCallback deliverCallback=(consumerTag, delivery)-> { //拿到消息体 String message= new String(delivery.getBody()); //打印消息体 System.out.println("接收到的消息:"+message); }; //取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback=(consumerTag)-> { System.out.println("消息消费被中断"); }; //消息的接收 System.out.println("c1等待····"); channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
启动两次main函数,就会有两个线程:
生产者代码:
package com.candy.rabbit.WorkQueues; import com.candy.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import java.nio.charset.StandardCharsets; import java.util.Scanner; /** * 生产者——发送大量消息 */ public class Task1 { //队列名称 public static final String QUEUE_NAME = "hello"; //发送消息 public static void main(String[] args) throws Exception { //调取工具类,获得信道 Channel channel = RabbitMqUtils.getChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //从控制台接收消息从而发送消息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){//是否还有下一个输入 String message = scanner.next();//获取到当前输入的字符串 channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("fly:"+message); } } }
消息应答:为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
分为自动应答(了解即可)、手动应答(以下都是讲手动应答):手动应答的好处是可以批量应答并且减少网络拥堵
应答的方法
1.Channel.basicAck(用于肯定确认) : RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
2.Channel.basicNack(用于否定确认)
3.Channel.basicReject(用于否定确认) :与 Channel.basicNack 相比少一个参数(批量应答) 不处理该消息了直接拒绝,可以将其丢弃了
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
multiple : true 代表批量应答 channel 上未应答的消息,false代表只应答当前消息、
消息生产者:
package com.candy.rabbit.WorkQueues; import com.candy.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import java.nio.charset.StandardCharsets; import java.util.Scanner; /** * 手动应答——生产者 */ public class Task2 { public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null); Scanner scanner= new Scanner(System.in); System.out.println("--请输入暗号:--"); while (scanner.hasNext()){ String anhao = scanner.nextLine(); channel.basicPublish("",TASK_QUEUE_NAME,null,anhao.getBytes(StandardCharsets.UTF_8)); System.out.println("暗号是:"+anhao); } } }
消息的两个消费者:
package com.candy.rabbit.WorkQueues; import com.candy.rabbit.utils.RabbitMqUtils; import com.candy.rabbit.utils.SleepUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; /** * 手动应答——消费者1 */ public class Work3 { public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("浩翔在等待···"); //消息消费的时候如何处理消息 DeliverCallback deliverCallback=(consumerTag, delivery)->{ String message= new String(delivery.getBody(),"utf-8"); SleepUtils.sleep(1); System.out.println("暗号是:"+message); /** * 1.消息标记 tag * 2.是否批量应答未应答消息 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; //采用手动应答 boolean ack = false; channel.basicConsume(TASK_QUEUE_NAME,ack,deliverCallback,(consumerTag ->{ System.out.println(consumerTag+"不听不听,王八念经"); })); } }
package com.candy.rabbit.WorkQueues; import com.candy.rabbit.utils.RabbitMqUtils; import com.candy.rabbit.utils.SleepUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; /** * 手动应答——消费者2 */ public class Work4 { public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("linlin在等待···"); //消息消费的时候如何处理消息 DeliverCallback deliverCallback=(consumerTag, delivery)->{ String message= new String(delivery.getBody(),"utf-8"); SleepUtils.sleep(7); System.out.println("暗号是:"+message); /** * 1.消息标记 tag * 2.是否批量应答未应答消息 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; //采用手动应答 boolean ack = false; channel.basicConsume(TASK_QUEUE_NAME,ack,deliverCallback,(consumerTag ->{ System.out.println(consumerTag+"不听不听,王八念经"); })); } }
睡眠工具类:
package com.candy.rabbit.utils;
/**
* 睡眠工具类
*/
public class SleepUtils {
public static void sleep(int second){
try { Thread.sleep(1000*second);
} catch (InterruptedException _ignored) { Thread.currentThread().interrupt();
}
}
}
确保消息不会丢失需要做两件事:我们需要将队列和消息都标 记为持久化。
队列持久化设置(生产者):
//设置持久化
boolean duration = true;
channel.queueDeclare(TASK_QUEUE_NAME,duration,false,false,null);
消息持久化设置(生产者):
//设置消息持久化
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,anhao.getBytes(StandardCharsets.UTF_8));
不公平分发:
//不公平分发(消费者)
int unfair = 1;
channel.basicQos(unfair);
一共发送了4个消息:
第一个消费者获取到3个消息:
第二个消费者只获得1个消息:
预取值:
第一个消费者:
//不公平分发,预期值:4
int unfair = 4;
channel.basicQos(unfair);
第二个消费者:
//不公平分发,预期值:2
int unfair = 2;
channel.basicQos(unfair);
一共有6个信息:
第一个消费者收到4个信息:
第二个消费者收到2个消息:
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的 消息都将会被指派一个唯一的 ID(从 1 开始)
开启发布确认的方法:
单个确认发布
//单个确认 public static void publishOne() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName,true,false,false,null); channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); //批量发消息 for (int i = 0;i<MSG_COUNT;i++){ String msg = "tnt" + i; channel.basicPublish("",queueName,null,msg.getBytes(StandardCharsets.UTF_8)); boolean isSucc = channel.waitForConfirms(); if (isSucc){ System.out.println("boom!!!"); } } //结束时间 long end = System.currentTimeMillis(); System.out.println("发布"+MSG_COUNT+"个消息,耗时:"+(end-begin)+"ms"); }
批量确认发布
//批量确认 public static void publishMany() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName,true,false,false,null); channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); //批量确认长度 int confirmSize = 2; //魏确认消息个数 int notConfirm= 0; //批量发消息 for (int i = 0;i<MSG_COUNT;i++){ String msg = "tnt" + i; channel.basicPublish("",queueName,null,msg.getBytes(StandardCharsets.UTF_8)); if (notConfirm == confirmSize) { channel.waitForConfirms(); notConfirm = 0; } } //为了确保还有剩余没有确认消息 再次确认 if (notConfirm > 0) { channel.waitForConfirms(); } //结束时间 long end = System.currentTimeMillis(); System.out.println("发布"+MSG_COUNT+"个消息,耗时:"+(end-begin)+"ms"); }
异步确认发布
//异步确认发布 public static void publishAsync() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName,true,false,false,null); channel.confirmSelect(); //保存未确认信息,线程安全有序的一个哈希表,适用于高并发的情况 // 1.轻松的将序号与消息进行关联 // 2.轻松批量删除条目 只要给到序列号 // 3.支持并发访问 ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>(); //消息确认成功回调函数 ConfirmCallback ack = (deliveryTag,multiple)->{ if (multiple){ ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag); confirmed.clear(); }else { outstandingConfirms.remove(deliveryTag); } System.out.println("确认的:"+deliveryTag); }; //消息确认失败回调函数 ConfirmCallback notack = (deliveryTag,multiple)->{ String notconfirmed = outstandingConfirms.get(deliveryTag); System.out.println("未确认:"+notconfirmed); }; /** * 添加一个异步确认的监听器 * 1.确认收到消息的回调 * 2.确认失败消息的回调 */ channel.addConfirmListener(ack,notack); //开始时间 long begin = System.currentTimeMillis(); //批量发送 for (int i = 0; i < MSG_COUNT; i++) { String msg = "tnt" + i; /** * channel.getNextPublishSeqNo()获取下一个消息的序列号 * 通过序列号与消息体进行一个关联 * 全部都是未确认的消息体 */ outstandingConfirms.put(channel.getNextPublishSeqNo(), msg); channel.basicPublish("",queueName,null,msg.getBytes(StandardCharsets.UTF_8)); } //结束时间 long end = System.currentTimeMillis(); System.out.println("发布"+MSG_COUNT+"个消息,耗时:"+(end-begin)+"ms"); }
三种方式效率对比:
// 1.单个确认 Publish1.publishOne();//发布7个消息,耗时:8ms // 2.批量确认 Publish1.publishMany();//发布7个消息,耗时:3ms // 3.异步批量确认 Publish1.publishAsync();//发布7个消息,耗时:耗时:2ms
- 1
- 2
- 3
- 4
- 5
- 6
生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机(exchange)
交换机类型:直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout),无名用空串表示,就是默认交换机
创建临时队列:
String queueName = channel.queueDeclare().getQueue();
- 1
绑定:
我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式 称为 ”发布/订阅”
Fanout(发布订阅模式/广播模式):Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的 所有队列中。不论routingkey是否相同,只要是绑定了一样的交换机都会收到消息。
消费者:
package com.candy.rabbit.exchange; import com.candy.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; /** * 交换机——消费者 */ public class ReLogs1 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //生成随机队列,队列名称是随机的,当断开连接时,会自动删除 String queue = channel.queueDeclare().getQueue(); //绑定交换机与队列(队列名,交换机,RoutingKey) channel.queueBind(queue,EXCHANGE_NAME,""); System.out.println("--dadada--"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("TNT消费了:"+message); }; channel.basicConsume(queue,true,deliverCallback,consumerTag -> { }); } }
生产者:
package com.candy.rabbit.exchange; import com.candy.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import java.nio.charset.StandardCharsets; import java.util.Scanner; /** * 交换机——生产者 */ public class EmitLog { private static final String EXCHAGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHAGE_NAME,"fanout"); System.out.println("请输入:"); Scanner sc = new Scanner(System.in); while (sc.hasNext()){ String msg = sc.nextLine(); channel.basicPublish(EXCHAGE_NAME,"",null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("发送了:"+msg); } } }
生产者发送了两个消息:
消费者1都收到了:
消费者2也都收到了:
Direct(路由模式):队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key, 创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);**绑定之后的 意义由其交换类型决定。
消费者1:
package com.candy.rabbit.exchange; import com.candy.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; /** * 直接交换机——消费者 */ public class ReDirect1 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"direct"); String queueName = "tnt"; channel.queueDeclare(queueName, false, false, false, null); //一个队列绑定不同的routingkey channel.queueBind(queueName, EXCHANGE_NAME, "boom1"); channel.queueBind(queueName, EXCHANGE_NAME, "boom2"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("TNT消费了:"+message); }; channel.basicConsume(queueName,deliverCallback,consumerTag -> { }); } }
消费者2:
package com.candy.rabbit.exchange; import com.candy.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; /** * 直接交换机——消费者 */ public class ReDirect2 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"direct"); String queueName = "baomihua"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "sdfj"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("baomihua消费了:"+message); }; channel.basicConsume(queueName,deliverCallback,consumerTag -> { }); } }
生产者:
package com.candy.rabbit.exchange; import com.candy.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Scanner; /** * 直接交换机——生产者 */ public class EmitDirect { private static final String EXCHAGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHAGE_NAME,"direct"); //创建多个绑定 Map<String,String> binding = new HashMap<>(); binding.put("boom1","boom"); binding.put("boom2","boomboom"); binding.put("sdfj","lf"); for (Map.Entry<String, String> bindingKeyEntry: binding.entrySet()) { String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHAGE_NAME, bindingKey, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息:" + message); } } }
生产者给不同的routingkey发送不同的消息:
消费者1:
消费者2:
Topic():比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候 就只能使用 topic 类型
必须是一个单 词列表,以点号分隔开。
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
当队列绑定关系是下列这种情况时需要引起注意:
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了 ;
如果队列绑定键当中没有出现#或者,那么该队列绑定类型就是* direct 了
消费者1:
package com.candy.rabbit.exchange; import com.candy.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; /** * 主题交换机——消费者 */ public class ReTopic1 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic"); String queueName = "tnt"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.tnt.*"); // channel.queueBind(queueName, EXCHANGE_NAME, "boom2"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("TNT消费了:"+message); }; channel.basicConsume(queueName,deliverCallback,consumerTag -> { }); } }
消费者2:
package com.candy.rabbit.exchange; import com.candy.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; /** * 主题交换机——消费者 */ public class ReTopic2 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic"); String queueName = "baomihua"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "baomihua.*.*"); // channel.queueBind(queueName, EXCHANGE_NAME, "boom2"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("baomihua消费了:"+message); }; channel.basicConsume(queueName,deliverCallback,consumerTag -> { }); } }
生产者:
package com.candy.rabbit.exchange; import com.candy.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import java.util.HashMap; import java.util.Map; /** * 主题交换机——生产者 */ public class EmitTopic { private static final String EXCHAGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHAGE_NAME,"topic"); //创建多个绑定 Map<String,String> binding = new HashMap<>(); binding.put("sdfj.tnt.baomihua","被tnt接收"); binding.put("baomihua.tnt.sdfj","被爆米花、tnt接收"); binding.put("candy.tnt.candy","只被tnt接收"); binding.put("baomihua.candy.candy","只被爆米花接收"); for (Map.Entry<String, String> bindingKeyEntry: binding.entrySet()) { String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHAGE_NAME, bindingKey, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息:" + message); } } }
概念:顾名思义就是无法被消费的消息
产生的原因:消息 TTL 过期
队列达到最大长度(队列满了,无法再添加数据到 mq 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
消息TTL过期
消费者1:
package com.candy.rabbit.dead; import com.candy.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.util.HashMap; import java.util.Map; /** * 死信队列——消费者1 */ public class Consumer1 { //普通交换机 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列 private static final String NORMAL_QUEUE = "normal_queue"; //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明普通队列、死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE,"direct"); channel.exchangeDeclare(DEAD_EXCHANGE,"direct"); //声明死信队列 channel.queueDeclare(DEAD_QUEUE,false,false,false,null); //绑定死信交换机与死信队列 channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"sdfj"); //声明普通队列,添加其他参数 Map<String, Object> params = new HashMap<>(); // //设置过期时间,10s,可以在生产者设置 // params.put("x-messge-ttl",1000); //正常队列绑定死信交换机,key为固定值 params.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置死信routingKey,key为固定值 params.put("x-dead-letter-routing-key","sdfj"); channel.queueDeclare(NORMAL_QUEUE,false,false,false,params); //绑定普通交换机与普通队列 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"tnt"); //接收消息 System.out.println("tnt正在接听···"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("TNT听到了:"+message); }; channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag -> { }); } }
生产者:
package com.candy.rabbit.dead; import com.candy.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import java.nio.charset.StandardCharsets; /** * 死信队列——生产者 */ public class Product1 { //普通交换机 private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //死信消息,设置TTL时间 AMQP.BasicProperties pros = new AMQP.BasicProperties().builder().expiration("10000").build(); //发10条消息 for (int i = 0; i < 10; i++) { String msg = "tnt"+i; channel.basicPublish(NORMAL_EXCHANGE,"tnt",pros,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("爆米花说"+msg); } } }
消费者2:
package com.candy.rabbit.dead; import com.candy.rabbit.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.util.HashMap; import java.util.Map; /** * 死信队列——消费者2 */ public class Consumer2 { //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明死信交换机 channel.exchangeDeclare(DEAD_EXCHANGE,"direct"); //声明死信队列 channel.queueDeclare(DEAD_QUEUE,false,false,false,null); //绑定死信交换机与死信队列 channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"sdfj"); //接收消息 System.out.println("sdfj正在接听···"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("sdfj听到了:"+message); }; channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> { }); } }
启动生产者,发送10条消息:
由于没有启动消费者1,过了10秒后消息过期,进入死信队列:
启动消费者2,消费死信队列:
队列达到最大长度
修改生产者:
修改消费者1:
//设置队列最大长度,超过的部分会转到死信队列
params.put("x-max-length",7);
运行前要将原来的队列删除,因为参数修改了
多出来的三条消息就被放入死信队列里面了:
消息被拒绝
修改消费者1:
//接收消息 System.out.println("tnt正在接听···"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); //如果消息等于tnt8,拒绝消费 if (message.equals("tnt8")){ System.out.println("TNT拒绝了:"+message); //拒绝消费 channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false); }else { System.out.println("TNT听到了:"+message); //接收 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } }; //开启手动应答 boolean autoAck = false; channel.basicConsume(NORMAL_QUEUE,autoAck,deliverCallback,consumerTag -> { });
消息被拒绝之后,放入死信队列:
死信队列消费被拒绝的消息:
设置TTL
- 消息设置TTL
- 队列设置TTL
创建一个springboot工程,导入依赖:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.76</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build>
修改配置文件:
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=sixcandy
spring.rabbitmq.password=sixcandy
添加Swagger配置类:
package com.candy.ttl.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.service.ApiInfo; import springfox.documentation.service.Contact; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig() { return new Docket(DocumentationType.SWAGGER_2) .groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo(){ return new ApiInfoBuilder() .title("rabbitmq接口文档") .description("本文档描述了rabbitmq微服务接口定义") .version("1.0") .contact(new Contact("candy","http://baidu.com","sixcandy@126.com")) .build(); } }
代码架构图:
声明交换机、队列配置文件
package com.candy.ttl.config; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * 声明交换机、队列配置文件 */ @EnableRabbit @Configuration public class TtlQueueConfig { //普通交换机 private static final String X_EXCHANGE = "x_exchange"; //死信交换机 private static final String Y_DEAD_EXCHANGE = "y_dead_exchange"; //普通队列 private static final String QUEUE_A = "queue_a"; private static final String QUEUE_B = "queue_b"; //死信队列 private static final String QUEUE_DEAD = "queue_dead"; //声明x、y交换机 @Bean("xExchange")//别名 public DirectExchange xExchange(){ return new DirectExchange(X_EXCHANGE); } @Bean("yExchange")//别名 public DirectExchange yExchange(){ return new DirectExchange(Y_DEAD_EXCHANGE); } //声明三个队列 //过期时间为10秒 @Bean("queueA")//别名 public Queue queueA(){ Map<String,Object> settings = new HashMap<>(3); //声明队列绑定的死信交换机 settings.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE); //声明当前队列死信路由routingkey settings.put("x-dead-letter-routing-key","sdfj"); //声明队列TTL settings.put("x-message-ttl",10000); return QueueBuilder.durable(QUEUE_A).withArguments(settings).build(); } //过期时间为40秒 @Bean("queueB")//别名 public Queue queueB(){ Map<String,Object> settings = new HashMap<>(3); //声明队列绑定的死信交换机 settings.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE); //声明当前队列死信路由routingkey settings.put("x-dead-letter-routing-key","sdfj"); //声明队列TTL settings.put("x-message-ttl",40000); return QueueBuilder.durable(QUEUE_B).withArguments(settings).build(); } @Bean("queueDead") public Queue queueDead(){ return QueueBuilder.durable(QUEUE_DEAD).build(); } //绑定关系 //队列A与交换机X @Bean public Binding queueABindX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //队列B与交换机X @Bean public Binding queueBBindX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueB).to(xExchange).with("XB"); } //死信队列与死信交换机 @Bean public Binding queueDeadBindX(@Qualifier("queueDead") Queue queueDead,@Qualifier("yExchange") DirectExchange yExchange){ return BindingBuilder.bind(queueDead).to(yExchange).with("sdfj"); } }
生产者:
package com.candy.ttl.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; /** * 生产者 */ @Slf4j @RequestMapping("ttl") @RestController public class sendMsg { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("sentMsg/{msg}") public void sendMsg(@PathVariable String msg){ log.info("当前时间:{},发送一条信息给两个队列:{}",new Date(),msg); //交换机、routingkey、消息对象 rabbitTemplate.convertAndSend("x_exchange","XA","10s秒消息:"+msg); rabbitTemplate.convertAndSend("x_exchange","XB","40s秒消息:"+msg); } }
消费者:
package com.candy.ttl.consumer; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; /** * 消费者 */ @Slf4j @Component public class DeadListener { @RabbitListener(queues = "queue_dead") public void receiveDead(Message message, Channel channel)throws Exception{ String msg = new String(message.getBody()); log.info("当前时间:{},收到死信队列信息:{}",new Date(),msg); } }
启动soringboot,访问:#http://localhost:8080/ttl/sentMsg/遇见
最终的演示成果:
延迟队列优化:新建一个新的队列,不设置这个队列的TTL
修改声明交换机、队列文件:
//新的普通队列 private static final String QUEUE_C = "queue_c"; //不设置TTL @Bean("queueC")//别名 public Queue queueC(){ Map<String,Object> settings = new HashMap<>(2); //声明队列绑定的死信交换机 settings.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE); //声明当前队列死信路由routingkey settings.put("x-dead-letter-routing-key","sdfj"); return QueueBuilder.durable(QUEUE_C).withArguments(settings).build(); } //队列C与交换机X @Bean public Binding queueCBindX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueC).to(xExchange).with("XC"); }
修改生产者:
@GetMapping("sentTtlMsg/{msg}/{ttl}")
public void sendMsg(@PathVariable String msg,@PathVariable String ttl){
rabbitTemplate.convertAndSend("x_exchange","XC",msg, (message)->{
//设置延长时间
message.getMessageProperties().setExpiration(ttl);
return message;
});
log.info("当前时间:{},发送一条时长{}毫秒的信息给TTL队列:{}",new Date(),ttl,msg);
}
分别访问:
http://localhost:8080/ttl/sentTtlMsg/爆米花/10000
http://localhost:8080/ttl/sentTtlMsg/侠/20000
结果演示:
延迟插件的配置文件
package com.candy.ttl.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * 延迟插件配置文件 */ @Configuration public class DelayConfig { //交换机 public static final String DELAYED_ENAME = "delayed.exchange"; //队列 public static final String DELAYED_QNAME = "delayed.queue"; //routingkey public static final String DELAYED_KEY = "delayed.key"; //声明交换机,自定义一个交换机 @Bean public CustomExchange delayedExchange(){ //设置交换机的其他参数 Map<String,Object> settings = new HashMap<>(1); settings.put("x-delayed-type","direct"); /** * new CustomExchange()参数:交换机的名称、交换机类型、是否需要持久化、是否需要自动删除、其他参数 */ return new CustomExchange(DELAYED_ENAME,"x-delayed-message",true,false,settings); } //声明队列 @Bean public Queue delayedQueue(){ return new Queue(DELAYED_QNAME); } //绑定交换机与队列 @Bean public Binding delayedEandQ(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange){ return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_KEY).noargs(); } }
生产者:
//基于插件延迟
@GetMapping("sentDelayMsg/{msg}/{time}")
public void sendMsg(@PathVariable String msg,@PathVariable Integer time){
rabbitTemplate.convertAndSend(DelayConfig.DELAYED_ENAME,DelayConfig.DELAYED_KEY,msg, (message)->{
//设置延迟时间
message.getMessageProperties().setDelay(time);
return message;
});
log.info("当前时间:{},发送一条时长{}毫秒的信息给延迟队列delayed.queue:{}",new Date(),time,msg);
}
消费者:
package com.candy.ttl.consumer; import com.candy.ttl.config.DelayConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; /** * 基于插件的延迟消息——消费者 */ @Slf4j @Component public class DelayConsumer { //监听消息 @RabbitListener(queues = DelayConfig.DELAYED_QNAME) public void reDelay(Message message){ String msg = new String(message.getBody()); log.info("当前时间:{},收到死信队列信息:{}",new Date(),msg); } }
分别访问:http://localhost:8080/ttl/sentDelayMsg/你要快乐/20000、
http://localhost:8080/ttl/sentDelayMsg/男儿歌/2000
演示效果:
确认机制:
代码架构:
需要在配置文件中添加下面这句话:
#none禁用发布确认模式,默认值
#correlated发布消息成功到交换器后会触发回调方法
#simple 和correlated一样的效果,并且发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果
spring.rabbitmq.publisher-confirm-type=correlated
配置文件:
package com.candy.ttl.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 发布确认的配置累 */ @Configuration public class ConfirmConfig { public static final String CONFIRM_ENAME = "confirm.exchange"; public static final String CONFIRM_QNAME = "confirm.queue"; public static final String CONFIRM_KEY = "confirm.key"; @Bean public DirectExchange confirmEx(){ return new DirectExchange(CONFIRM_ENAME); } @Bean public Queue confirmQue(){ return QueueBuilder.durable(CONFIRM_QNAME).build(); } @Bean public Binding queBindE(@Qualifier("confirmQue") Queue confirmQue, @Qualifier("confirmEx") DirectExchange confirmEx){ return BindingBuilder.bind(confirmQue).to(confirmEx).with(CONFIRM_KEY); } }
回调函数
package com.candy.ttl.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; /** * 回调接口 */ @Slf4j @Component public class MyCallBack implements RabbitTemplate.ConfirmCallback { /** * 交换机不管是否收到消息的一个回调方法 * CorrelationData * 1.交换机收到消息,回调:correlationData(回调消息、ID),true,null * 2.交换机没有收到消息,回调:correlationData(回调消息、ID),false,失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id=correlationData!=null?correlationData.getId():""; if(ack){ log.info("交换机已经收到 id 为{}的消息",id); } else{ log.info("交换机还未收到 id 为{}消息,由于原因:{}",id,cause); } } }
生产者:
package com.candy.ttl.controller; import com.candy.ttl.config.ConfirmConfig; import com.candy.ttl.config.MyCallBack; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; @Slf4j @RestController @RequestMapping("/confirm") public class ConfirmController { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MyCallBack myCallBack; //注入接口 @PostConstruct 一定要在上面两个上面注入才可以,要不然会报空指针异常 @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(myCallBack); } @GetMapping("/send/{msg}") public void send(@PathVariable String msg){ CorrelationData correlationData1=new CorrelationData("1"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_ENAME,ConfirmConfig.CONFIRM_KEY,msg,correlationData1); //错误的routingkey CorrelationData correlationData2=new CorrelationData("2"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_ENAME,ConfirmConfig.CONFIRM_KEY+"2",msg,correlationData2); log.info("发送消息:{}",msg); } }
消费者:
package com.candy.ttl.consumer;
import com.candy.ttl.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConfirmConsumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QNAME)
public void reDelay(Message message){
String msg = new String(message.getBody());
log.info("接收确认信息:{}",msg);
}
}
演示效果:
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如 果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。
在配置文件中添加一下代码:
spring.rabbitmq.publisher-returns=true
添加回退消息的方法:
/**
* 消息失败后回调
* returnedMessage(消息,失败编码,失败原因,交换机,routingkey)
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
log.error("消息:{},被交换机{}退回,退回原因:{},路由key:{}",message,s1,s,s2);
}
然后在生产者中注入接口:
效果:
代码架构图:
在原来的基本上添加一个备份交换机,一个备份队列和一个警报队列
修改配置文件:
package com.candy.ttl.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 发布确认的配置 */ @Configuration public class ConfirmConfig { public static final String CONFIRM_ENAME = "confirm.exchange"; public static final String CONFIRM_QNAME = "confirm.queue"; public static final String CONFIRM_KEY = "confirm.key"; //备份交换机 public static final String BACKUP_ENAME = "backup.exchange"; public static final String BACKUP_QNAME = "backup.queue"; public static final String WARNING_QNAME = "warning.queue"; @Bean public DirectExchange confirmEx(){ return ExchangeBuilder.directExchange(CONFIRM_ENAME).durable(true) //设置备份交换机 .withArgument("alternate-exchange",BACKUP_ENAME).build(); } @Bean public Queue confirmQue(){ return QueueBuilder.durable(CONFIRM_QNAME).build(); } @Bean public Binding queBindE(@Qualifier("confirmQue") Queue confirmQue, @Qualifier("confirmEx") DirectExchange confirmEx){ return BindingBuilder.bind(confirmQue).to(confirmEx).with(CONFIRM_KEY); } @Bean public FanoutExchange backupEx(){ return new FanoutExchange(BACKUP_ENAME); } @Bean public Queue backupQue(){ return QueueBuilder.durable(BACKUP_QNAME).build(); } @Bean public Queue warningQue(){ return QueueBuilder.durable(WARNING_QNAME).build(); } @Bean public Binding backupqueBindE(@Qualifier("backupQue") Queue backupQue, @Qualifier("backupEx") FanoutExchange backupEx){ return BindingBuilder.bind(backupQue).to(backupEx); } @Bean public Binding warningqueBindE(@Qualifier("warningQue") Queue warningQue, @Qualifier("backupEx") FanoutExchange backupEx){ return BindingBuilder.bind(warningQue).to(backupEx); } }
警报队列消费者:
/**
* 警报队列消费者
*/
@Slf4j
@Component
public class WarningConsumer {
@RabbitListener(queues = ConfirmConfig.WARNING_QNAME)
public void reDelay(Message message){
String msg = new String(message.getBody());
log.info("警报队列接收确认信息:{}",msg);
}
}
注:如果回退消息和备份交换机都开启使用,未被接收的消息会优先进入备份交换机,备份交换机的优先级高。
效果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。