赞
踩
目录
首先来说一下什么是MQ,顾名思义它就是队列,用来存放消息的队列,互联网架构中常见的一种服务与服务之间通信的方式。那么为什么要使用它,主要有三大好处:削峰、解耦、异步
常见的购物秒杀、高铁抢票,在同一时间点发生的请求实在是太多了,服务器处理不过来,这时就可以把请求放到 MQ 里面缓冲一下。把一秒内收到的 1 万个请求放到队列里面,然后我们可以花 10 分钟去消费队列里的请求。
比如有一个服务 A 每天都采集数据并计算各种数据,服务 B 需要调用服务 A 的接口获取数据,就在 A 开一个接口获取。那如果出现新的服务 C,D,E 它们都需要服务 A 的数据呢,如果和之前一样都直接调用 A 的接口去获取数据,那么这几个服务就耦合了。如果 A 把数据放到 MQ 里,B,C,D,E 服务它们谁要数据就自己去 MQ 里面拿,这样就达到了解耦的目的。
服务 A 需要调用服务 B 的接口,并需要知道服务 B 的处理结果。一般要么就 A 那边同步等待接口的响应,要么就是 A 那边过一会再调用 B 的一个接口查询,要么就是 B 完成之后调用 A 的一个接口通知。 这样要异步的话,至少要写两个接口,如果使用 MQ,自动就带有回调功能。很好的满足了异步的需求。
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
开发语言 | java | erlang | java | scala |
单机吞吐量 | 万级 | 万级 | 10万级 | 100万级 |
时效性 | ms | us | ms | ms级以内 |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) |
优点 | 成熟的产品、较全的文档、各种协议支持好 | 1、并发能力强、性能好、延迟低、文档齐全,开源提供管理界面、社区活跃高、更新频率高。 2、功能比较完善,稳定、易用、跨平台 | MQ功能比较完善,扩展性佳 | 有优秀的第三方管理界面,并且在大数据领域的实时计算以及日志采集被大规模使用 |
缺点 | 官方社区现在对 ActiveMQ 5.X 维护越来越少,高吞吐量场景较少使用 | 商业版需要收费,学习成本较高。 | 支持的客户端语言不多,目前是 java 及 c++,其中 c++ 不成熟;社区活跃度一般 | 1、单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长。 2、使用短轮询方式,实时性取决于轮询间隔时间; 3、消费失败不支持重试; 4、支持消息顺序,但是一台代理宕机后,就会产生消息乱序; 5、社区更新较慢 |
Kafka 主要是基于 pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,首选 Kafka 。
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务消峰,在大量交易涌入时,后端可能无法计时处理的情况。RocketMQ 在稳定性上可能更值得信赖。这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以使用 RocketMQ。
结合 erlang 语言本身的并发优势,性能好,时效性微秒级、社区活跃度也较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。
RabbitMQ 是一个消息中间件:它用来收并转发消息。通俗来讲,我们可以把它当做是一个快递站点,当你要发送一个包裹时,只需要将包裹放到快递站,快递员最终会把你的快递送到收件人那里。只不过RabbitMQ 与快递站的主要区别在于,它不处理快件而只是接收、存储和转发消息数据。
(1)生产者(Producer):产生并发送消息的程序是生产者(Producer )
(2)交换机(exchange):一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切的知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得由交换机类型决定。
(3)队列(Queue):消息只能存储在队列中。队列仅受主机的内存和磁盘限制约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。
(4)消费者(Consumer):消费者大多时候是一个等待接收消息的程序。注意:生产者,消费者和消息中间件很多时候并不在一个机器上。同一个应用程序既可以是生产者又可以是消费者。
(1)Broker:接收和分发新消息的应用,也就是消息队列服务器实体。
(2)Virtual host:虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。
(3)Connection:publisher/consumer 和 broker 之间的 TCP 连接。
(4)Channel:信道,多路复用连接中的一条独立的双向数据流通道。Channel 是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。
(5)Exchange:Message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发到 队列(queue )中去。常用的类型有:direct(point-to-point), topic (publish-subscribe) and fanout (multicast)
(6)Queue:队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
(7)Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含有 routing key,binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。
到此为止,基础知识我们已经学习的差不多了,接下来就开始安装RabbitMQ
=========================================================================
官网下载:Downloading and Installing RabbitMQ — RabbitMQ
1、将下载的文件上传到Linux服务器
2、安装文件(按照以下顺序安装)
- rpm -ivh erlang-21.3-1.el7.x86_64.rpm
- yum install socat -y
- rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
3、常用命令(按照以下顺序执行)
(1)添加开机启动 RabbitMQ 服务: chkconfig rabbitmq-server on
(2)启动服务:/sbin/service rabbitmq-server start
(3)查看服务状态:/sbin/service rabbitmq-server status
(4)停止服务(选择执行) :/sbin/service rabbitmq-server stop
(5)开启 web 管理插件: rabbitmq-plugins enable rabbitmq_management
(6)用默认账号(guest)密码(guest)访问地址 http://47.105.220.36:15672/出现权限问题
4、添加一个新的用户
(1)创建账号:rabbitmqctl add_user admin admin
(2)设置用户角色:rabbitmqctl set_user_tags admin administrator
(3)设置用户权限:
(4)当前用户和角色:rabbitmqctl list_users
(5)重新启动服务:/sbin/service rabbitmq-server restart
5、利用新创建的用户 admin 进行登录
6. 重置命令
(1)关闭应用命令:rabbitmqctl stop_app
(2)清除命令:rabbitmqctl reset
(3)启动应用命令:rabbitmqctl start_app
安装完成后,接下来开始学习使用RabbitMQ
=========================================================================
编写生产者(Producer)和消费者(Consumer)两个程序。生产者用来发送消息,消费者用来接收并打印消息内容。
如下图所示:“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代 表消息缓冲区。
踩过的坑:
- <dependencies>
- <!--rabbitmq 依赖客户端-->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.8.0</version>
- </dependency>
- <!--操作文件流的一个依赖-->
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.6</version>
- </dependency>
- </dependencies>
- public class Producer {
-
- //队列名称
- private static final String QUEUE_NAME = "hello";
- //发消息
- public static void main(String[] args) throws Exception {
-
- //创建一个连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //工厂ip 连接RabbitMQ的队列
- factory.setHost("47.105.220.36");
- //用户名
- factory.setUsername("admin");
- //密码
- factory.setPassword("admin");
-
- //创建连接
- Connection connection = factory.newConnection();
- //获取信道
- Channel channel = connection.createChannel();
-
- /**
- * 生成一个队列
- * 1.队列名称
- * 2.队列里面的消息是否持久化(磁盘) 默认情况下存储在内存中
- * 3.该队列是否只供一个消费者进行消费,是否可以消息共享,true可以多个消费者消费, false只能一个消费者消费
- * 4.是否自动删除 最后一个消费者断开连接之后,该队列是否自动删除 true自动删除 false不自动删除
- * 5.其他参数(Map)
- */
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
- //发消息
- /**
- * 发送一个消息
- * 1.发送到哪个交换机,"" 代表默认交换机
- * 2.路由的key值是那一个 本次是队列名称
- * 3.其他参数信息
- * 4.发送消息的消息体,需要转换成二进制
- */
- String message = "hello world";
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
-
- System.out.println("消息发送完毕");
- }
- }
- public class Consumer {
- //队列名称
- private static final String QUEUE_NAME = "hello";
-
- //接收消息
- public static void main(String[] args) throws Exception {
- //创建一个连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //工厂ip 连接RabbitMQ的队列
- factory.setHost("47.105.220.36");
- //用户名
- factory.setUsername("admin");
- //密码
- factory.setPassword("admin");
-
- //创建连接
- Connection connection = factory.newConnection();
- //获取信道
- Channel channel = connection.createChannel();
-
- //声明接收消息
- DeliverCallback deliverCallback = (consumerTag,message) -> {
- System.out.println(new String(message.getBody()));
- };
- //取消消息时的回调
- CancelCallback cancelCallback = consumerTag ->{
- System.out.println("消息消费被中断");
- };
- /**
- * 消费者消费消息
- * 1.消费哪个队列
- * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
- * 3.消费者未成功消费的回调
- * 4.消费者取消消费的回调
- */
- channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
- }
- }
工作队列(又称任务队列),主要思想是避免立即执行资源密集型任务,我们可以在安排任务之后再执行。我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务,并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
工作队列(work queue)是采用多个工作的线程,接收处理从队列中出来的大量消息,避免一个工作线程(一个消费者)接收处理大量的消息,因为单一处理速度肯定慢。
轮询消费消息指的是轮流消费消息,即每个工作队列都会获取一个消息进行消费,并且获取的消息按照顺序依次往下轮流。
5.1.1、首先把 RabbitMQ 的配置参数封装为一个工具类:RabbitMQUtils
- public class RabbitMQUtils {
-
- public static Channel getChannel() throws Exception {
-
- //创建一个连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //工厂IP 连接RabbitMQ对列
- factory.setHost("47.105.220.36");
- //用户名
- factory.setUsername("admin");
- //密码
- factory.setPassword("admin");
-
- //创建连接
- Connection connection = factory.newConnection();
- //获取信道
- Channel channel = connection.createChannel();
-
- return channel;
- }
- }
5.1.2、创建两个工作队列,并且启动
- public class Work01 {
-
- //队列的名称
- public static final String QUEUE_NAME="hello";
-
- //接收消息
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
-
- //消息被接收时,执行下面的内容
- DeliverCallback deliverCallback = (consumerTag, message) ->{
- System.out.println("接收到的消息:"+new String(message.getBody()));
- };
-
- //消息接受被取消时,执行下面的内容
- CancelCallback cancelCallback = consumerTag -> {
- System.out.println(consumerTag+"消息被消费者取消消费接口回调逻辑");
- };
-
- //消息的接收
- channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
- }
- }
创建好一个工作队列,只需要以多线程方式启动两次该 main 函数即可,以 c1、c2 区别两个消息队列。多线程开启方式如下图所示,首先将work01启动:
5.1.3、创建一个生产者,发送消息进程
- public class Task01 {
-
- //队列名称
- public static final String QUEUE_NAME = "hello";
-
- //发送大量消息
- public static void main(String[] args) throws Exception {
-
- Channel channel = RabbitMQUtils.getChannel();
- //队列的声明
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
- //发送消息
- //从控制台当中接受信息
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()) {
- String message = scanner.next();
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println("消息发送完成:" + message);
- }
- }
- }
5.1.4、结果演示
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况?
这时候我们将丢失正在处理的以及后续发送给该消费者的消息,因为它无法接收到。
怎么办呢?为了保证消息在发送过程中不丢失,rabbitMQ 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitMQ 它已经处理了,rabbitMQ 才可以对该消息执行删除操作。
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。
除此之外,这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,这样就有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死。
所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
Channel.basicAck
(肯定确认应答):- /**
- * 1.消息标记
- * 2.是否应用于多消息
- * */
- basicAck(long deliveryTag, boolean multiple);
Channel.basicReject
(否定确认应答)- /**
- * 该方法 reject 后,该消费者还是会消费到该条被 reject 的消息。
- * 1.消息标记,表示拒绝 deliveryTag 对应的消息
- * 2.表示是否 requeue:true 则重新入队列,false 则丢弃或者进入死信队列
- * */
- basicReject(long deliveryTag, boolean requeue);
Channel.basicNack
(用于否定确认):示己拒绝处理该消息,可以将其丢弃了- /**
- * 该方法 reject 后,该消费者还是会消费到该条被 reject 的消息。
- * 1.消息标记,表示拒绝 deliveryTag 对应的消息
- * 2.示否应用于多消息
- * 3.表示是否 requeue,与 basicReject 区别就是同时支持多个消息,可以拒绝签收该消费者先前接收未 ack 的所有消息。拒绝签收后的消息也会被自己消费到。
- * */
- basicNack(long deliveryTag, boolean multiple, boolean requeue);
手动应答的好处是可以批量应答并且减少网络拥堵
true 代表批量应答 channel 上未应答的消息
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答
false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
不建议使用批量应答
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK (应答)确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者 可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息。
消息默认采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。
实现效果:消费者启用两个线程,消费者1: 1s消费一个消息,消费者 2: 10s消费一个消息,然后在消费者 2 消费消息的时候,停止运行,这时正在消费的消息是否会重新进入队列,而后给消费者 1 消费呢?
1、工具类
- public class SleepUtils {
- public static void sleep(int second){
- try {
- Thread.sleep(1000*second);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
2、消息生产者
- /**
- * desc:消息在手动应答是不丢失、放回队列中重新消费
- */
- public class Task02 {
-
- //队列名称
- public static final String TASK_QUEUE_NAME = "ACK_QUEUE";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
-
- //声明队列
- channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
- //在控制台中输入信息
- Scanner scanner = new Scanner(System.in);
- System.out.println("请输入信息:");
- while (scanner.hasNext()){
- String message = scanner.next();
- channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
- System.out.println("生产者发出消息:"+ message);
- }
- }
- }
3、消费者1
- /**
- * desc:消息在手动应答是不丢失、放回队列中重新消费
- */
- public class Work03 {
-
- //队列名称
- public static final String TASK_QUEUE_NAME = "ACK_QUEUE";
-
- //接受消息
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
- System.out.println("C1等待接受消息处理时间较短");
-
- DeliverCallback deliverCallback =(consumerTag, message) ->{
- //沉睡1S
- SleepUtils.sleep(1);
- System.out.println("接受到的消息:"+new String(message.getBody(),"UTF-8"));
- //手动应答
- /**
- * 1.消息的标记Tag
- * 2.是否批量应答 false表示不批量应答信道中的消息
- */
- channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
- };
-
- CancelCallback cancelCallback = (consumerTag -> {
- System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
-
- });
- //采用手动应答
- boolean autoAck = false;
- channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
- }
- }
4、消费者 2,修改沉睡时间即可
- //沉睡10S
- SleepUtils.sleep(10);
5、效果演示
生产者发送消息 DD 到队列,此时是 消费者2来消费该消息,但是由于它处理时间较长,在还未处理完时间里停止运行,也就是说 消费者2 还没有执行到 ack 代码的时候就被停掉了,此时会看到消息被 消费者1 接收到,说明消息 DD 被重新入队,然后分配给能处理消息的 消费者1处理。
当 RabbitMQ 服务停掉以后,消息生产者发送过来的消息不丢失要如何保障?默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。
之前我们创建的队列都是非持久化的,RabbitMQ 如果重启的化,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把 durable 参数设置为true,代表开启持久化。
在消息生产者开启持久化:
注意:如果之前声明的队列不是持久化的,那么需要把原先队列先删除,或者重新创建一个持久化的队列,不然会报错。
需要在消息生产者发布消息的时候,开启消息的持久化
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没 有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。
在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮询分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮询分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实并不友好。
为了避免这种情况,我们在消费者消费消息之前,设置参数 channel.basicQos(1);
开启成功后,启动消费者,可以看到如下结果:
不公平分发思想:如果一个工作队列还没有处理完或者没有应答签收一个消息,则拒绝 RabbitMQ 分配新的消息到该工作队列。此时 RabbitMQ 会优先分配给其他已经处理完消息或者空闲的工作队列。如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker (工作队列)或者改变其他存储任务的策略。
换句话说就是如果这个任务我还没有处理完或者我还没有应答你,你先别给我,我不要。
效果演示:
预取值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 11,22,33,44,55,并且通道的预取计数设置为 5,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=55 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。
通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器),应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。
注意:不公平分发和预取值分发都用到 basicQos 方法,如果取值为 1,代表不公平分发,取值不为1,代表预取值分发
效果演示:设置预取值,生产者发送 6条消息到 MQ 中
生产者发布消息到 RabbitMQ 后,需要 RabbitMQ 返回「ACK(已收到)」给生产者,这样生产者才知道自己生产的消息成功发布出去。
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理(一般不这么办)。
confirm 模式最大的好处在于是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者应用程序同样可以在回调方法中处理该 nack 消息。
发布确认方法默认是关闭的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法。
- //开启发布确认
- channel.confirmSelect();
注意:确认发布指的是成功发送到了队列,并不是消费者消费了消息。
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)
这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,会阻塞所有后续消息的发因为如果没有确认发布的消息就布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。
- public class ConfirmMessage {
-
- //单个发消息的个数
- public static final int MESSAGE_COUNT = 1000; //Ctrl+Shift+U 变大写
-
- public static void main(String[] args) throws Exception {
- publishMessageIndividually();//发布1000个单独确认消息,耗时:599ms
- }
- //单个确认
- public static void publishMessageIndividually() throws Exception{
- Channel channel = RabbitMQUtils.getChannel();
- //队列的声明
- String queueName = UUID.randomUUID().toString();
- channel.queueDeclare(queueName,false,true,false,null);
-
- //开启发布确认
- channel.confirmSelect();
- //开始时间
- long begin = System.currentTimeMillis();
-
- //批量发消息
- for (int i = 0; i < 1000; i++) {
- String message = i+"";
- channel.basicPublish("",queueName,null,message.getBytes());
- //单个消息就马上进行发布确认
- boolean flag = channel.waitForConfirms();
- if(flag){
- System.out.println("消息发送成功");
- }
- }
- //结束时间
- long end = System.currentTimeMillis();
- System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时:"+(end-begin)+"ms");
- }
- }
单个确认发布方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量。
当然这种方式缺点也很明显:当发生故障导致发布出现问题时,不知道是哪个消息出问题了。与此同时这种方案在发生故障时也一样会阻塞消息的发布。
- public class ConfirmMessage2 {
-
- //批量发消息的个数
- public static final int MESSAGE_COUNT = 1000; //Ctrl+Shift+U 变大写
-
- public static void main(String[] args) throws Exception {
- publishMessageBatch(); //发布1000个批量确认消息,耗时:111ms
- }
-
- //批量发布确认
- public static void publishMessageBatch() throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
- //队列的声明
- String queueName = UUID.randomUUID().toString();
- channel.queueDeclare(queueName, false, true, false, null);
-
- //开启发布确认
- channel.confirmSelect();
- //开始时间
- long begin = System.currentTimeMillis();
-
- //批量确认消息大小
- int batchSize =100;
-
- //批量发送消息,批量发布确认
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String message=i+"";
- channel.basicPublish("",queueName,null,message.getBytes());
-
- //判断达到100条消息的时候,批量确认一次
- if((i+1)%batchSize==0){
- //发布确认
- channel.waitForConfirms();
- }
- }
- //结束时间
- long end = System.currentTimeMillis();
- System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时:"+(end-begin)+"ms");
- }
- }
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都很好,利用了回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面来看一下异步确认是如何实现的:
添加回调函数,在回调函数里进行确认发布:
- public class ConfirmMessage3 {
-
- public static final int MESSAGE_COUNT = 1000; //Ctrl+Shift+U 变大写
-
- public static void main(String[] args) throws Exception {
- publishMessageAsync(); //发布1000个异步发布确认消息,耗时:43ms
- }
-
- //异步发布确认
- public static void publishMessageAsync() throws Exception{
-
- Channel channel = RabbitMQUtils.getChannel();
- //队列的声明
- String queueName = UUID.randomUUID().toString();
- channel.queueDeclare(queueName, false, true, false, null);
-
- //开启发布确认
- channel.confirmSelect();
- //开始时间
- long begin = System.currentTimeMillis();
-
- /**
- * 1.消息的标记
- * 2.是否为批量确认
- */
- //消息确认回调的函数
- ConfirmCallback ackCallback = (deliveryTag, multiple) ->{
- System.out.println("确认的消息:"+deliveryTag);
- };
- //消息确认失败回调函数
- ConfirmCallback nackCallback= (deliveryTag,multiple) ->{
- System.out.println("未确认的消息:"+deliveryTag);
- };
-
- //准备消息的监听器 监听哪些消息成功了,哪些消息失败了
- /**
- * 1.监听哪些消息成功了
- * 2.监听哪些消息失败了
- */
- channel.addConfirmListener(ackCallback,nackCallback);//异步通知
-
- //批量发送消息
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String message=i+"消息";
- channel.basicPublish("",queueName,null,message.getBytes());
- }
-
- //结束时间
- long end = System.currentTimeMillis();
- System.out.println("发布"+MESSAGE_COUNT+"个异步发布确认消息,耗时:"+(end-begin)+"ms");
- }
- }
在实际案例中,我们需要将发布的消息存入 Map 里,方便获取。headMap
方法用于将已发布的消息存入Map 缓存区里,然后清除该缓存区已确认的内容。因为 headMap
方法是浅拷贝,所以清除了缓存区,相当于清除了内容的地址,也就清除了队列的确认的消息,剩下的就是未确认的。
如何处理异步未确认消息?
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。
- public class ConfirmMessage3 {
-
- public static final int MESSAGE_COUNT = 1000; //Ctrl+Shift+U 变大写
-
- public static void main(String[] args) throws Exception {
- publishMessageAsync(); //发布1000个异步发布确认消息,耗时:43ms
- }
-
- //异步发布确认
- public static void publishMessageAsync() throws Exception{
-
- Channel channel = RabbitMQUtils.getChannel();
- //队列的声明
- String queueName = UUID.randomUUID().toString();
- channel.queueDeclare(queueName, false, true, false, null);
-
- //开启发布确认
- channel.confirmSelect();
-
- /**
- * 线程安全有序的一个哈希表,适用于高并发的情况下
- * 1.轻松的将序号与消息进行关联
- * 2.轻松批量删除条目 只要给到序号
- * 3.支持高并发(多线程)
- */
- ConcurrentSkipListMap<Long,String> outstandingConfirms=
- new ConcurrentSkipListMap<>();
-
- /**
- * 消息确认回调的函数
- * 消息确认成功
- * 1.消息的标记
- * 2.是否为批量确认
- * */
- ConfirmCallback ackCallback = (deliveryTag,multiple) ->{
- if(multiple) {
- //2.删除掉已经确认的消息 剩下的就是未确认的消息
- ConcurrentNavigableMap<Long, String> confirmed =
- outstandingConfirms.headMap(deliveryTag);
- confirmed.clear();
- }else {
- outstandingConfirms.remove(deliveryTag);
- }
- //单个删除已确认的消息
- System.out.println("确认的消息:" + deliveryTag);
- };
- /**
- * 消息确认失败回调函数
- * 1.消息的标记
- * 2.是否为批量确认
- */
- ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
- //处理未确认消息 打印
- String message = outstandingConfirms.get(deliveryTag);
- System.out.println("未确认的消息是:"+message+"---------未确认的消息标记:"+deliveryTag);
- };
-
- /**
- * 准备消息的监听器 监听那些消息成功了,哪些消息失败了
- * 1.监听哪些消息成功了
- * 2.监听哪些消息失败了
- */
- channel.addConfirmListener(ackCallback,nackCallback);//异步通知
-
- //开始时间
- long begin = System.currentTimeMillis();
-
- //批量发送消息
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String message=i+"消息";
- channel.basicPublish("",queueName,null,message.getBytes());
- //1.此处记录下所有要发送的消息 消息的总和
- outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
- }
-
- //结束时间
- long end = System.currentTimeMillis();
- System.out.println("发布"+MESSAGE_COUNT+"个异步发布确认消息,耗时:"+(end-begin)+"ms");
- }
- }
以上 3 种发布确认速度对比:
(1)单独发布消息:同步等待确认,简单,但吞吐量非常有限。
(2)批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。
(3)异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微有那么一丢丢的复杂。
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。生产者只能将消息发送到交换机(exchange)。
交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们放到许多队列中还是说应该丢弃它们,这些都需要交换机的类型来决定。
直接(direct)
主题(topic)
扇出(fanout)
在此之前我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的 原因是因为我们使用的是默认交换,我们通过空字符串("")进行标识。
第一个参数是交换机的名称。空字符串 表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey) 绑定指定的 key。
在此之前我们使用的是具有特定名称的队列(比如 hello 和 ack_queue )。队列的名称对于我们来说至关重要,我们需要指定我们的消费者去消费哪个队列的消息。
每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。
创建临时队列的方式如下:
String queueName = channel.queueDeclare().getQueue();
什么是 bingding 呢,它其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和哪个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定。
.
一个发送,多个接受,相当于发布/订阅模式。
Fanout 这种类型非常简单,它不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout 交换机转发消息是最快的。
为了说明这种模式,这里构建了一个简单的日志系统。它由两个程序组成:
ReceiveLogs01
接收消息打印在控制台
- //消费者1
- public class ReceiveLogs01 {
-
- //交换机名称
- private static final String EXCHANGE_NAME = "logs";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
- //声明一个交换机
- channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
- //声明一个队列 临时队列
- /**
- * 生成一个临时的队列,队列的名称是随机的
- * 当消费者断开与队列的连接的时候 队列就自动删除
- */
- String queueName = channel.queueDeclare().getQueue();
- /**
- * 绑定交换机与队列
- */
- channel.queueBind(queueName,EXCHANGE_NAME,"");
- System.out.println("等待接收消息,把接收到的消息打印在屏幕上...");
- //接收消息
- //消费者成功接收消息时回调接口
- DeliverCallback deliverCallback = (consumerTag, message) ->{
- System.out.println("控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
- };
- channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});
- }
- }
ReceiveLogs02
把消息写出到文件
- public class ReceiveLogs02 {
- private static final String EXCHANGE_NAME = "logs";
- public static void main(String[] argv) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- /**
- * 生成一个临时的队列 队列的名称是随机的
- * 当消费者断开和该队列的连接时 队列自动删除
- */
- String queueName = channel.queueDeclare().getQueue();
- /**
- * 把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
- */
- channel.queueBind(queueName, EXCHANGE_NAME, "");
- System.out.println("等待接收消息,把接收到的消息写到文件.....");
- //接收消息
- //消费者成功接收消息时回调接口
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- File file = new File("C:\\work\\rabbitmq_info.txt");
- FileUtils.writeStringToFile(file,message,"UTF-8");
- System.out.println("数据写入文件成功");
- };
- channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
- }
- }
生产者EmitLog
发送消息给两个消费者进行消费
- /**
- * desc:负责进行发消息给交换机
- */
- public class EmitLog {
-
- //交换机名称
- public static final String EXCHANGE_NAME = "logs";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
- /**
- * 声明一个exchange
- * 1.exchange的名称
- * 2.exchange的类型
- */
- channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String message = scanner.next();
- channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
- System.out.println("生产者发出消息:"+message);
- }
- }
- }
通过Fanout 交换机我们能够向许多消费者发送日志消息。在这一模块中我们向其中添加一些特别的功能——让某个消费者订阅发布的部分消息。例如我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志 消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去它绑定的 routingKey 队列。
这里就需要用到bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解: 队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey
- //创建绑定代码
- channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");
我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black 和 green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
如上图,如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的效果和 fanout 相同。
生产者:将消息到 direct_logs
交换机里,该交换机会检测消息的 routingKey 条件,然后分配到满足条件的队列里,最后由消费者从队列消费消息。
- public class DirectLogs {
-
- //交换机名称
- public static final String EXCHANGE_NAME = "direct_logs";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
-
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String message = scanner.next();
- channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));
- System.out.println("生产者发出消息:"+message);
- }
- }
- }
C1 消费者:绑定 console 队列,routingKey 为 info、warning
- public class ReceiveLogsDirect01 {
-
- public static final String EXCHANGE_NAME="direct_logs";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
- //声明一个direct交换机
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
- //声明一个队列
- channel.queueDeclare("console",false,false,false,null);
-
- channel.queueBind("console",EXCHANGE_NAME,"info");
- channel.queueBind("console",EXCHANGE_NAME,"warning");
- //接收消息
- DeliverCallback deliverCallback = (consumerTag, message) -> {
- System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
- };
- //消费者取消消息时回调接口
- channel.basicConsume("console",true,deliverCallback,consumerTag -> {});
-
- }
- }
C2 消费者:绑定 disk 队列,routingKey 为 error
- public class ReceiveLogsDirect02 {
-
- public static final String EXCHANGE_NAME="direct_logs";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
- //声明一个direct交换机
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
- //声明一个队列
- channel.queueDeclare("disk",false,false,false,null);
- channel.queueBind("disk",EXCHANGE_NAME,"error");
-
- //接收消息
- DeliverCallback deliverCallback = (consumerTag, message) -> {
- System.out.println("ReceiveLogsDirect02控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
- };
- //消费者取消消息时回调接口
- channel.basicConsume("disk",true,deliverCallback,consumerTag -> {});
- }
- }
演示效果:此时生产者发出的消息被消费者1接收,消费者2不接收消息。
上述我们通过direct交换机,实现了有选择性地接收日志。尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性。
比如我们想接收的日志类型有 info.base 和 info.advantage,而某个队列只想 info.base 的消息,这时候direct 就办不到了,只能使用 topic 类型。
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表(最多不能超过 255 个字节),以点号分隔开。这些单词可以是任意单词。
注意:在这个规则列表中,有两个常见的替换符需要注意
例子 | 说明 |
---|---|
quick.orange.rabbit | 被队列 Q1Q2 接收到 |
lazy.orange.elephant | 被队列 Q1Q2 接收到 |
quick.orange.fox | 被队列 Q1 接收到 |
lazy.brown.fox | 被队列 Q2 接收到 |
lazy.pink.rabbit | 虽然满足两个绑定但只被队列 Q2 接收一次 |
quick.brown.fox | 不匹配任何绑定不会被任何队列接收到会被丢弃 |
quick.orange.male.rabbit | 是四个单词不匹配任何绑定会被丢弃 |
lazy.orange.male.rabbit | 是四个单词但匹配 Q2 |
可以发现:
- 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
- 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了
生产者发送多个消息到交换机,交换机按照通配符分配消息到不同的队列中,队列由消费者进行消费。
生产者 EmitLogTopic
- public class EmitLogTopic {
-
- //交换机的名称
- public static final String EXCHANGE_NAME = "topic_logs";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
- /**
- * Q1-->绑定的是
- * 中间带 orange 带 3 个单词的字符串(*.orange.*)
- * Q2-->绑定的是
- * 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
- * 第一个单词是 lazy 的多个单词(lazy.#)
- */
- HashMap<String, String> bindingKeyMap = new HashMap<>();
- bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
- bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
- bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
- bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
- bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
- bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
- bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
- bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
- for (Map.Entry<String,String> bindingKeyEntry : bindingKeyMap.entrySet()){
- String routingKey = bindingKeyEntry.getKey();
- String message = bindingKeyEntry.getValue();
- channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
- System.out.println("生产者发出消息:"+message);
- }
- }
- }
消费者C1:
- public class ReceiveLogsTopic01 {
-
- //交换机的名称
- public static final String EXCHANGE_NAME = "topic_logs";
-
- //接收消息
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
- //声明交换机
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
- //声明队列
- String queueName = "Q1";
- channel.queueDeclare(queueName,false,false,false,null);
- channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
- System.out.println("等待接收消息...");
-
- DeliverCallback deliverCallback = (consumerTag, message) -> {
- System.out.println(new String(message.getBody(),"UTF-8"));
- System.out.println("接收队列:"+queueName+" 绑定键:"+message.getEnvelope().getRoutingKey());
- };
- //接收消息
- channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
- }
- }
消费者C2:
- public class ReceiveLogsTopic02 {
-
- //交换机的名称
- public static final String EXCHANGE_NAME = "topic_logs";
-
- //接收消息
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
- //声明交换机
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
- //声明队列
- String queueName = "Q2";
- channel.queueDeclare(queueName,false,false,false,null);
- channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
- channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
-
- System.out.println("等待接收消息...");
-
- DeliverCallback deliverCallback = (consumerTag,message) -> {
- System.out.println(new String(message.getBody(),"UTF-8"));
- System.out.println("接收队列:"+queueName+" 绑定键:"+message.getEnvelope().getRoutingKey());
- };
- //接收消息
- channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
- }
- }
测试结果: 和7.6.3中的匹配案例进行对比,结果一致。
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列,造成的后果就是队列阻塞。
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。有一个比较常见的情境就是,用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
消息 TTL 过期:TTL是 Time To Live 的缩写, 也就是生存时间
队列达到最大长度:队列满了,无法再添加数据到 MQ 中
消息被拒绝:(basic.reject 或 basic.nack) 并且 requeue = false(表示不重新入队)
交换机类型是 direct,两个消费者C1,C2,一个生产者,两个队列:消息队列和死信队列。
1、生产者(Producer):
- /**
- * desc:死信队列之生产者
- */
- public class Producer {
- //普通交换机的名称
- public static final String NORMAL_EXCHANGE = "normal_exchange";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
-
- //死信消息 设置ttl时间 live to time 单位是ms
- AMQP.BasicProperties properties =
- new AMQP.BasicProperties().builder().expiration("10000").build();
- for (int i = 1; i <11 ; i++) {
- String message = "info"+i;
- channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
- }
- }
- }
2、消费者C1(启动之后关闭该消费者 模拟其接收不到消息)
- /**
- * 消费者C1
- */
- public class Consumer01 {
-
- //普通交换机的名称
- public static final String NORMAL_EXCHANGE = "normal_exchange";
- //死信交换机的名称
- public static final String DEAD_EXCHANGE = "dead_exchange";
-
- //普通队列的名称
- public static final String NORMAL_QUEUE = "normal_queue";
- //死信队列的名称
- public static final String DEAD_QUEUE = "dead_queue";
-
- public static void main(String[] args) throws Exception {
-
- Channel channel = RabbitMQUtils.getChannel();
-
- //声明死信和普通交换机,类型为direct
- channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
- channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
-
- //声明普通队列
- Map<String,Object> arguments = new HashMap<>();
- //过期时间 10s 由生产者指定 更加灵活
- arguments.put("x-message-ttl",10000);
- //正常的队列设置死信交换机
- arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//图中红箭头
- //设置死信routingKey
- arguments.put("x-dead-letter-routing-key","lisi");
-
-
- channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
- /
- //声明死信队列
- channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
-
- //绑定普通的交换机与队列
- channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
-
- //绑定死信的交换机与死信的队列
- channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
- System.out.println("等待接收消息...");
-
- DeliverCallback deliverCallback = (consumerTag,message) ->{
- System.out.println("Consumer01接受的消息是:"+new String(message.getBody(),"UTF-8"));
- };
-
- channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag -> {});
- }
-
- }
先启动消费者 C1,创建出队列,然后停止该 C1 的运行,则 C1 将无法收到队列的消息,无法收到的消息 10 秒后进入死信队列。启动生产者 producer 生产消息。
生产者未启动发送消息时
生产者发送了10条消息,此时正常消息队列有10条消息未消费
时间过去10秒,正常队列里面的消息由于没有被消费,消息进入死信队列
消费者 C2 (以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)
- public class Consumer02 {
- private static final String DEAD_EXCHANGE = "dead_exchange";
- public static void main(String[] argv) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
- channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
- String deadQueue = "dead-queue";
- channel.queueDeclare(deadQueue, false, false, false, null);
- channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
- System.out.println("等待接收死信队列消息.....");
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println("Consumer02 接收死信队列的消息" + message);
- };
- channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
- });
- }
- }
效果演示:启动C2,此时C2将死信队列中的消息消费完成。
1、消息生产者代码去掉 TTL 属性,basicPublish
的第三个参数改为 null
- /**
- * desc:死信队列之生产者
- */
- public class Producer {
- //普通交换机的名称
- public static final String NORMAL_EXCHANGE = "normal_exchange";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
-
- //死信消息 设置ttl时间 live to time 单位是ms
- // AMQP.BasicProperties properties =
- // new AMQP.BasicProperties().builder().expiration("10000").build();
- for (int i = 1; i <11 ; i++) {
- String message = "info"+i;
- channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());
- }
- }
- }
2、C1 消费者修改以下代码(启动之后关闭该消费者 模拟其接收不到消息)
(注意:因为参数改变了,所以需要把原先队列删除)
- //设置正常队列长度的限制,例如发送10个消息,6个为正常,4个为死信
- arguments.put("x-max-length",6);
生产者未启动发送消息时
生产者发送了10条消息,此时正常消息队列有6条消息未消费,死信队列有4条消息未消费
效果演示:启动C2消费者,此时死信队列中的4条消息被成功消费
1、消息生产者代码同上述生产者代码一致
- public class Producer {
- //普通交换机的名称
- public static final String NORMAL_EXCHANGE = "normal_exchange";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMQUtils.getChannel();
-
- //死信消息 设置ttl时间 live to time 单位是ms
- /*AMQP.BasicProperties properties =
- new AMQP.BasicProperties().builder().expiration("10000").build();*/
- for (int i = 1; i <11 ; i++) {
- String message = "info"+i;
- channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());
- }
- }
- }
2、消费者 C1 拒收消息 "info5",开启手动应答(保持开启不用关闭)
- public class Consumer01 {
-
- //普通交换机的名称
- public static final String NORMAL_EXCHANGE = "normal_exchange";
- //死信交换机的名称
- public static final String DEAD_EXCHANGE = "dead_exchange";
-
- //普通队列的名称
- public static final String NORMAL_QUEUE = "normal_queue";
- //死信队列的名称
- public static final String DEAD_QUEUE = "dead_queue";
-
- public static void main(String[] args) throws IOException, TimeoutException {
-
- Channel channel = RabbitMQUtils.getChannel();
-
- //声明死信和普通交换机,类型为direct
- channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
- channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
-
- //声明普通队列
- Map<String,Object> arguments = new HashMap<>();
- //过期时间 10s 由生产者指定 更加灵活
- //arguments.put("x-message-ttl",10000);
- //正常的队列设置死信交换机
- arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//图中红箭头
- //设置死信routingKey
- arguments.put("x-dead-letter-routing-key","lisi");
- //设置正常队列长度的限制,例如发送10个消息,6个为正常,4个为死信
- //arguments.put("x-max-length",6);
-
- channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
- /
- //声明死信队列
- channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
-
- //绑定普通的交换机与队列
- channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
-
- //绑定死信的交换机与死信的队列
- channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
- System.out.println("等待接收消息...");
-
- DeliverCallback deliverCallback = (consumerTag,message) ->{
- String msg = new String(message.getBody(), "UTF-8");
- if(msg.equals("info5")){
- System.out.println("Consumer01接受的消息是:"+msg+": 此消息是被C1拒绝的");
- //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
- channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
- }else {
- System.out.println("Consumer01接受的消息是:"+msg);
- channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
- }
-
- };
- //开启手动应答,也就是关闭手动应答
- channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> {});
- }
-
- }
生产者未启动发送消息时
生产者发送了10条消息,可以发现死信队列有1条消息未消费
效果演示:启动C2消费者,此时死信队列中的1条消息被成功消费
队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列。
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;那我们一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?
如果数据量比较少,确实可以这样做,比如:对于「如果账单一周内未支付则进行自动结算」这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:「订单十分钟内未支付则关闭」,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间(也就是我们前面所说的过期时间),单位是毫秒。
目前有两种方法设置消息的TTL,第一种方法是通过队列的属性设置,队列中的所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的TTL可以不同。如果两种方法一起使用那么消息的TTL以两者之间较小的那个数值为准。下面先来简单认识一下这两种队列:
在创建队列的时候设置队列的 x-message-ttl 属性,这个我们前面已经操作过了。
- Map<String, Object> params = new HashMap<>();
- params.put("x-message-ttl",5000);
- return QueueBuilder.durable("QA").withArguments(args).build(); // QA 队列的最大存活时间位 5000 毫秒
消息设置TTL,其实就是针对每条消息设置 TTL。
1、消息在队列中的生存时间一旦超过设置的TTL值就会变成死信,消费者将无法再收到该消息
2、如果不设置 TTL ,则表示此消息不会过期
3、如果TTL设置为 0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃
- rabbitTemplate.converAndSend("X","XC",message,correlationData -> {
- correlationData.getMessageProperties().setExpiration("5000");
- });
1、创建一个 Maven 工程或者 Spring Boot工程
2、添加依赖
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.5.5</version>
- <relativePath/>
- </parent>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.47</version>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- <!--RabbitMQ 依赖-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- </dependencies>
3、创建或修改 application.yml
文件
- server:
- port: 8280
- spring:
- rabbitmq:
- host: 47.105.220.36
- port: 5672
- username: admin
- password: admin
4、Maven 项目新建主启动类
- @SpringBootApplication
- public class RabbitmqSpringbootApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(RabbitmqSpringbootApplication.class, args);
- }
- }
创建两个队列 QA 和 QB,两个队列的 TTL 分别设置为 10S 和 40S,然后再创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
之前我们将配置队列的信息都写在了生产者和消费者代码中,现在可写在配置类中。而此时的生产者只负责发消息,消费者只负责接收消息。
- /**
- * desc:TTL队列 配置文件类代码
- */
- @Configuration
- public class TtlQueueConfig {
-
- //普通交换机的名称
- public static final String X_EXCHANGE="X";
- //死信交换机的名称
- public static final String Y_DEAD_LETTER_EXCHANGE="Y";
- //普通队列的名称
- public static final String QUEUE_A="QA";
- public static final String QUEUE_B="QB";
- //死信队列的名称
- public static final String DEAD_LETTER_QUEUE="QD";
-
- //声明xExchange 别名
- @Bean("xExchange")
- public DirectExchange xExchange(){
- return new DirectExchange(X_EXCHANGE);
- }
-
- //声明yExchange 别名
- @Bean("yExchange")
- public DirectExchange yExchange(){
- return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
- }
-
- //声明普通队列 要有ttl 为10s
- @Bean("queueA")
- public Queue queueA(){
- Map<String,Object> arguments = new HashMap<>(3);
- //设置死信交换机
- arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
- //设置死信RoutingKey
- arguments.put("x-dead-letter-routing-key","YD");
- //设置TTL 10s 单位是ms
- arguments.put("x-message-ttl",10000);
- return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
- }
-
- //声明普通队列 要有ttl 为40s
- @Bean("queueB")
- public Queue queueB(){
- Map<String,Object> arguments = new HashMap<>(3);
- //设置死信交换机
- arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
- //设置死信RoutingKey
- arguments.put("x-dead-letter-routing-key","YD");
- //设置TTL 10s 单位是ms
- arguments.put("x-message-ttl",40000);
- return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
- }
-
- //声明死信队列
- @Bean("queueD")
- public Queue queueD(){
- return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
- }
-
- //声明队列 QA 绑定 X 交换机
- @Bean
- public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
- @Qualifier("xExchange") DirectExchange xExchange){
- return BindingBuilder.bind(queueA).to(xExchange).with("XA");
- }
-
- //声明队列 QB 绑定 X 交换机
- @Bean
- public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
- @Qualifier("xExchange") DirectExchange xExchange){
- return BindingBuilder.bind(queueB).to(xExchange).with("XB");
- }
-
- //声明队列 QD 绑定 Y 交换机
- @Bean
- public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
- @Qualifier("yExchange") DirectExchange yExchange){
- return BindingBuilder.bind(queueD).to(yExchange).with("YD");
- }
-
- }
Controller 层代码,获取消息,放到 RabbitMQ里面。
- /**
- * desc:发送延迟消息
- */
- @Slf4j
- @RestController
- @RequestMapping("/ttl")
- public class SendMsgController {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
- //开始发消息
- @GetMapping("/sendMsg/{message}")
- public void sendMsg(@PathVariable("message") String message){
- log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);
- rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);
- rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);
- }
- }
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.util.Date;
-
- @Slf4j
- @Component
- public class DeadLetterQueueConsumer {
- @RabbitListener(queues = "QD")
- public void receiveD(Message message, Channel channel) throws Exception {
- String msg = new String(message.getBody());
- log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
- }
- }
发起一个请求:http://localhost:8280/ttl/sendMsg/嘻嘻嘻
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。
就现在的情况来看,延时队列功能是实现了,但是如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?显然这样处理是不合适的。
我们可以新增了一个队列 QC,该队列不设置 TTL 时间,而是根据前端的请求确定 TTL 时间,绑定关系如下:
新增一个配置文件类(或者放在之前的配置文件中),用于新增队列 QC。
- @Configuration
- public class MsgTtlQueueConfig {
-
- //普通队列的名称
- public static final String QUEUE_C = "QC";
-
- //死信交换机的名称
- public static final String Y_DEAD_LETTER_EXCHANGE="Y";
-
- //声明QC
- @Bean("queueC")
- public Queue QueueC(){
- Map<String,Object> arguments = new HashMap<>(3);
- //设置死信交换机
- arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
- //设置死信RoutingKey
- arguments.put("x-dead-letter-routing-key","YD");
- return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
- }
- //声明队列 QC 绑定 X 交换机
- @Bean
- public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
- @Qualifier("xExchange") DirectExchange xExchange){
- return BindingBuilder.bind(queueC).to(xExchange).with("XC");
- }
- }
在Controller 中新增一个方法用来接收的请求,要求带有 TTL 时间。
- //开始发消息 发TTL
- @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
- public void sendMsg(@PathVariable("message") String message,
- @PathVariable("ttlTime") String ttlTime){
- log.info("当前时间:{},发送一条时长是{}毫秒TTL信息给队列QC:{}",
- new Date().toString(),ttlTime,message);
- rabbitTemplate.convertAndSend("X","XC",message,msg -> {
- //发送消息的时候的延迟时长
- msg.getMessageProperties().setExpiration(ttlTime);
- return msg;
- });
- }
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.util.Date;
-
- @Slf4j
- @Component
- public class DeadLetterQueueConsumer {
- @RabbitListener(queues = "QD")
- public void receiveD(Message message, Channel channel) throws Exception {
- String msg = new String(message.getBody());
- log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
- }
- }
重启,然后发送请求:
http://localhost:8280/ttl/sendExpirationMsg/你好1/20000
http://localhost:8280/ttl/sendExpirationMsg/你好2/2000
表面看起来似乎没什么问题,仔细观看时间,我们可以发现 你好1 延时20秒是正确的,但是你好2,延时2秒,却是等你好1消费完再进行消费的。由此可以发现,如果属性上设置TTL的方式,消息可能并不会按时死亡。
因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
接下来我们通过插件来解决这一问题。
1、官网下载 (opens new window)
2、上传插件到Linux上RabbitMQ 的安装目录下的 plgins 目录下(如下是我的安装目录):
- # RabbitMQ 安装目录
- /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8
- # RabbitMQ 的 plgins 所在目录
- /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
3、进入目录后执行下面命令让该插件生效,然后重启 RabbitMQ
注意:
(1)安装命令不能出现插件版本和后缀,如
rabbitmq-plugins enable rabbitmq_delayed_message_exchange-3.8.0.ez
会报错。(2)必须是
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
,后面不允许填入版本和文件后缀。(3)在下载时注意要尽量保持一致,不然在装插件时容易报错。
- # 安装
- rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- # 重启服务
- systemctl restart rabbitmq-server
4、查看是否安装成功。
打开 Web 界面,查看交换机的新增功能列表,如果多出了如图所示,代表成功添加插件
添加插件之前: 添加插件之后:
1、代码架构图
创建一个队列 delayed.queue,自定义一个交换机 delayed.exchange,绑定关系如下:
2、配置类代码
新增一个配置类 DelayedQueueConfig
,也可以放在原来的配置文件里:
- @Configuration
- public class DelayedQueueConfig {
-
- //交换机
- public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
- //队列
- public static final String DELAYED_QUEUE_NAME = "delayed.queue";
- //routingKey
- public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
-
-
- //声明队列
- @Bean
- public Queue delayedQueue(){
- return new Queue(DELAYED_QUEUE_NAME);
- }
-
- //声明自定义交换机,基于插件的交换机
- @Bean
- public CustomExchange delayedExchange(){
-
- Map<String,Object> arguments = new HashMap<>();
- arguments.put("x-delayed-type","direct"); //延时类型,因为这里的routingkey是一个固定的值
- /**
- * 1.交换机的名称
- * 2.交换机的类型 x-delayed-message
- * 3.是否需要持久化
- * 4.是否需要自动删除
- * 5.其他的参数
- */
- return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",
- true,false,arguments);
- }
-
- //绑定
- @Bean
- public Binding delayedQueueBindingDelayedExchange(
- @Qualifier("delayedQueue") Queue delayedQueue,
- @Qualifier("delayedExchange")CustomExchange delayedExchange){
- return BindingBuilder.bind(delayedQueue).to(delayedExchange)
- .with(DELAYED_ROUTING_KEY).noargs();
- }
- }
3、生产者代码
- //开始发消息,基于插件的 消息及 延迟的时间
- @GetMapping("/sendDelayMsg/{message}/{delayTime}")
- public void sendMsg(@PathVariable("message") String message,
- @PathVariable("delayTime") Integer delayTime){
- log.info("当前时间:{},发送一条时长是{}毫秒TTL信息给延迟队列delayed.queue:{}",
- new Date().toString(),delayTime,message);
-
- rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
- DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -> {
- //发送消息的时候的延迟时长 单位ms
- msg.getMessageProperties().setDelay(delayTime);
- return msg;
- });
- }
4、消费者代码:监听延时队列,如果有消息进入该队列,则打印到控制台。
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.util.Date;
-
- @Slf4j
- @Component
- public class DeadLetterQueueConsumer {
- @RabbitListener(queues = "QD")
- public void receiveD(Message message, Channel channel) throws Exception {
- String msg = new String(message.getBody());
- log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
- }
- }
5、效果演示:
http://localhost:8280/ttl/sendDelayMsg/hello1/20000
http://localhost:8280/ttl/sendDelayMsg/hello2/2000
此时我们可以看到hello1 需要20秒才进入延时队列,而hello2 用时2 秒后直接进入延时队列且无需等待 hello1。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。