赞
踩
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。
那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout
Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
第一个 X - Q1 就有一个 binding key,名字为 orange; X - Q2 就有 2 个 binding key,名字为 black 和 green。当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。
Ps:为什么 X 到 Q2 要有 black,green,2个 binding key呢,一个不就行了吗? - 这个主要是因为可能又有 Q3,而Q3只接受 black 的信息,而Q2不仅接受black 的信息,还接受 green 的信息。
Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
在这种交换机模式下:
具体代码发送的时候还是一样,第一个参数表示交换机,第二个参数表示routing key,第三个参数即消息。如下:
rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is RabbitMQ!");
topic 和 direct 类似, 只是匹配上支持了”模式”, 在”点分”的 routing_key 形式中, 可以使用两个通配符:
headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.
Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略。
交换机类型
上面的 订阅发布模式
、路由模式
以及主题模式
使用到了不同的交换机,分别是:
Direct Exchange(直连)
直连交换机
被应用在路由模式
下,该交换机需要通过特定的routingKey
来绑定队列,交换机只有接收到了匹配的routingKey
才会将消息转发到对应的队列中,否则就不会转发消息。
路由模式
使用直连交换机
,该模式下根据routingKey
绑定特定的队列。
Fanout Exchange(扇形)
扇形交换机
没有路由键的概念,只需将队列绑定在交换机上,发送到交换机上的消息会转发到交换机所以绑定的队列里面,类似广播,只要打开收音机都能接收到广播消息。扇形交换机
应用于发布订阅模式
。
Topic Exchange(主题)
主题模式
是将路由键根据一个主题进行分类,和直连模式
不同的是,直连模式
绑定特定
的路由键,而主题模式
使用通配符绑定路由键,绑定键有两种:
*
表示可以匹配仅一个
。#
表示可以匹配零个或多个
。+ps:这里说下,截止到作者发稿,目前docker很多站点被封(由于特殊原因导致),博主目前已经研究了一套方法去实现快速拉取镜像,如果您有需要,可在博客下面留言,人数多的话博主会考虑出一期教程帮助大家解决问题。
docker pull rabbitmq:3.13.0
注意修改用户名和密码
docker run -d -p 15672:15672 -p 5672:5672 \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--hostname myRabbit \
--name smkj-rabbitmq \
rabbitmq:3.13.0
参数说明:
启动完成后可以通过 docker ps 命令来查看容器是否启动 还可以设置docker启动时自动启动
docker update smkj-rabbitmq --restart=always
//进入容器内部 我这里使用容器名字进入 也可以使用容器id
docker exec -it smkj-rabbitmq /bin/bash
----------------------------------
//开启web后台管理界面
rabbitmq-plugins enable rabbitmq_management
如果无法访问 可以尝试打开防火墙 如果是在阿里或者腾讯之类的服务器 要打开安全组的端口!! 到这里 我们docker安装RabbitMQ就完成了 接下来进行延迟插件的安装
输入账号密码 都是 admin 登录
插件下载
下载地址 https://www.rabbitmq.com/community-plugins.html
因为我刚才拉的版本是3.13.0 这里也要对应上
将插件上传至服务器,我是放在下面的路径
将刚刚上传的插件拷贝到容器内plugins目录下
docker cp /home/mydata/rabbitmq/rabbitmq_delayed_message_exchange-3.13.0.ez smkj-rabbitmq:/plugins
上传之后进入容器内部
//进入容器 我这里使用容器名字 也可以用容器id进入
docker exec -it smkj-rabbitmq /bin/bash
-------------------------------------
//移动到plugins目录下
cd plugins
-------------------------------------
//查看是否上传成功
ls
启动延时队列
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
退出容器,然后重启容器
//我这里还是使用容器名称 也可以使用容器id
docker restart smkj-rabbitmq
容器启动成功之后,登录RabbitMQ的管理界面(ip:15672 访问web界面),找到ExchangesTab页。点击Add a new exchange,在Type里面查看是否有x-delayed-message选项,如果存在就代表插件安装成功。
最简单的消息发送
特点
点对点模式
,生产者发送消息经过队列直接发送给消费者。Exchange
交换机。生产消息:
@GetMapping("/simple-send")
public String simpleSend() {
rabbitTemplate.convertAndSend("simple","this is news");
return "ok";
}
消费消息
@RabbitListener(queuesToDeclare = @Queue("simple"))
public void consume(String message) {
System.out.println(message);
}
输出:
this is news
无需创建交换机和绑定队列,只需要匹配发送端和消费端的队列名称就能成功发送消息。
在多个消费者之间分配任务
特点
工作模式
和简单模式
差不多,只需要生产端、消费端、队列。
不同在于一个生产者、一个队列对应多个消费者
,也就是一对多的关系。
在多个消费者之间分配消息,类似轮询发送消息,每个消息都只发给一个消费者。
生产消息:
@GetMapping("/work-send")
public String simpleSend() {
rabbitTemplate.convertAndSend("work","this is news");
return "ok";
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void consume(String message) {
System.out.println("first:" + message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void consumeSecond(String message) {
System.out.println("second:" + message);
}
创建一个生产者,两个消费者,发送两条消息,两个消费者分别接收到消息,输出:
first:this is news
second:this is news
两个消费者,轮流消费消息。类似nginx负载均衡
。
一次向多个消费者发送消息
特点
发布订阅类似广播消息,每个消息可以同时发送给订阅该消息的消费者,
上图中的X
表示交换机,使用的扇形交换机
(fanout),它将发送的消息发送到所有绑定交换机的队列。
创建队列、交换机以及绑定:
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE"); } @Bean public Queue psFirstQueue() { return new Queue("psFirstQueue"); } @Bean public Queue psSecondQueue() { return new Queue("psSecondQueue"); } @Bean public Queue psThirdQueue() { return new Queue("psThirdQueue"); } @Bean public Binding routingFirstBinding() { return BindingBuilder.bind(psFirstQueue()).to(fanoutExchange()); } @Bean public Binding routingSecondBinding() { return BindingBuilder.bind(psSecondQueue()).to(fanoutExchange()); } @Bean public Binding routingThirdBinding() { return BindingBuilder.bind(psThirdQueue()).to(fanoutExchange()); }
fanoutExchange
。psFirstQueue
、psSecondQueue
、psThirdQueue
。routingKey
,直接绑定即可。@GetMapping("/publish-sub-send")
public String publishSubSend() {
rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, "publish/subscribe hello");
return "ok";
}
无需指定routingKey
,设置为null
。
@RabbitListener(queues = "psFirstQueue")
public void pubsubQueueFirst(String message) {
System.out.println("【first】:" + message);
}
@RabbitListener(queues = "psSecondQueue")
public void pubsubQueueSecond(String message) {
System.out.println("【second】:" + message);
}
@RabbitListener(queues = "psThirdQueue")
public void pubsubQueueThird(String message) {
System.out.println("【third】:" + message);
}
【first】: publish/subscribe hello
【second】: publish/subscribe hello
【third】: publish/subscribe hello
发送一条消息,绑定的队列都能接收到消息。
根据
routingKey
有选择性的接收消息
特点
每个队列根据不同routingKey
绑定交换机
消息发送到交换机后通过routingKey
发送给特定的队列,然后传到消费者消费。
交换由扇形交换机
(fanout)改成直连交换机
(direct)。
创建队列、交换机以及绑定:
@Bean public Queue routingFirstQueue() { return new Queue("routingFirstQueue"); } @Bean public Queue routingSecondQueue() { return new Queue("routingSecondQueue"); } @Bean public Queue routingThirdQueue() { return new Queue("routingThirdQueue"); } @Bean public DirectExchange routingExchange() { return new DirectExchange("routingExchange"); } @Bean public Binding routingFirstBind() { return BindingBuilder.bind(routingFirstQueue()).to(routingExchange()).with("firstRouting"); } @Bean public Binding routingSecondBind() { return BindingBuilder.bind(routingSecondQueue()).to(routingExchange()).with("secondRouting"); } @Bean public Binding routingThirdBind() { return BindingBuilder.bind(routingThirdQueue()).to(routingExchange()).with("thirdRouting"); }
routingExchange
,根据不同的routingKey
绑定不同的队列:firstRouting
路由键绑定routingFirstQueue
队列。secondRouting
路由键绑定routingSecondQueue
队列。thirdRouting
路由键绑定routingThirdQueue
队列。@GetMapping("/routing-first")
public String routingFirst() {
// 使用不同的routingKey 转发到不同的队列
rabbitTemplate.convertAndSend("routingExchange","firstRouting"," first routing message");
rabbitTemplate.convertAndSend("routingExchange","secondRouting"," second routing message");
rabbitTemplate.convertAndSend("routingExchange","thirdRouting"," third routing message");
return "ok";
}
@RabbitListener(queues = "routingFirstQueue")
public void routingFirstListener(String message) {
System.out.println("【routing first】" + message);
}
@RabbitListener(queues = "routingSecondQueue")
public void routingSecondListener(String message) {
System.out.println("【routing second】" + message);
}
@RabbitListener(queues = "routingThirdQueue")
public void routingThirdListener(String message) {
System.out.println("【routing third】" + message);
}
输出:
【routing first】first routing message
【routing second】second routing message
【routing third】third routing message
分析:
rabbitTemplate.convertAndSend("routingExchange","firstRouting"," first routing message");
消息从生产者指定
firstRouting
路由键,找到对应的绑定队列routingFirstQueue
,就被routingFirstQueue
队列消费了。
基于某个主题接收消息
特点
路由模式
发送的消息,是需要指定固定的routingKey
,如果想要针对一类路由。
比如:
.com
结尾的消息。www.
开头的消息。主题模式
就派上场了,路由模式
和主题模式
类似,路由模式
是设置特定的routingKey
绑定唯一的队列,而主题模式
的是使用通配符
匹配一个或者多个
队列。
@Bean public Queue topicFirstQueue() { return new Queue("topicFirstQueue"); } @Bean public Queue topicSecondQueue() { return new Queue("topicSecondQueue"); } @Bean public Queue topicThirdQueue() { return new Queue("topicThirdQueue"); } @Bean public TopicExchange topicExchange() { return new TopicExchange("topicExchange"); }
通配符
绑定交换机和交换机:@Bean
public Binding topicFirstBind() {
// .com 为结尾
return BindingBuilder.bind(topicFirstQueue()).to(topicExchange()).with("*.com");
}
@Bean
public Binding topicSecondBind() {
// www.为开头
return BindingBuilder.bind(topicSecondQueue()).to(topicExchange()).with("www.#");
}
通配符
有两种,*
和#
,
*
表示可以匹配一个
。#
表示可以匹配多个
。比如:
#.com
表示接收多个
以.com
结尾的字段。
taobao.com
、www.taobao.com
、www.jd.com
。*.com
表示接收一个
以.com
结尾的字段。
taobao.com
、jd.com
。www.taobao.com
、cn.taobao.com
。www.#
可以匹配多个
以www
开头的字段。
www.taobao
、www.jd
。www.*
可以匹配一个
以www
开头的字段。
www.taobao
、www.jd
。www.taobao.com
、www.jd.com
。@GetMapping("/topic-first-send")
public String topicFirstSend() {
rabbitTemplate.convertAndSend("topicExchange","www.taobao.com","www.taobao.com");
rabbitTemplate.convertAndSend("topicExchange","taobao.com","taobao.com");
rabbitTemplate.convertAndSend("topicExchange","www.jd","www.jd");
return "topic ok";
}
@RabbitListener(queues = "topicFirstQueue")
public void topicFirstListener(String message) {
System.out.println("【topic first】" + message);
}
@RabbitListener(queues = "topicSecondQueue")
public void topicSecondListener(String message) {
System.out.println("【topic second】" + message);
}
【topic second】www.taobao.com
【topic first】taobao.com
【topic second】www.jd
www.#
可以匹配多个以www.
开头的路由键,例如www.taobao.com
、www.jd
。而*.com
只能匹配一个以.com
结尾的路由键,例如taobao.com
,而无法匹配www.taobao.com
。
消息有返回值
特点
PRC
模式和上面的几种模式唯一不同的点在于,该模式可以收到消费端的返回值
。
生成端接收消费端的返回值。
消费端添加返回值:
@RabbitListener(queuesToDeclare =@Queue("rpcQueue"))
public String rpcListener(String message) {
System.out.println("【rpc接收消息】" + message);
return "rpc 返回" + message;
}
@GetMapping("/rpc-send")
public void rpcSend() {
Object receive = rabbitTemplate.convertSendAndReceive("rpcQueue","rpc send message");
System.out.println("【发送消息消息】" + receive);
}
【rpc接收消息】rpc send message
【发送端接收消息】rpc 返回rpc send message
整合SpringBoot
实现RabbitMQ
六种工作模式,并详细讲解RabbitMQ
六种工作模式:
routingKey
即可。连接数
交换机
死信
顾名思义,就是死掉的信息,英文是Dead Letter。死信交换机(Dead-Letter-Exchange)
和普通交换机没有区别,都是可以接受信息并转发到与之绑定并能路由到的队列,区别在于死信交换机
是转发死信
的,而和该死信交换机
绑定的队列就是死信队列
。说的再通俗一点,死信交换机和死信队列其实都只是普通的交换机和队列,只不过接受、转发的信息是死信
,其他操作并没有区别。
称为死信
的信息,需要如下几个条件:
死信队列的应用:
代码如下:先创建死信队列和死信交换机 topic,普通队列和普通交换机,topic
package com.hjt.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /*** * 死信队列和普通队列配置 */ @Configuration public class DeadLetterQueueConfig { // 普通的交换机和队列 public static final String NORMAL_EXCHANGE = "normal-exchange"; public static final String NORMAL_QUEUE = "normal-queue"; public static final String NORMAL_ROUTING_KEY = "normal.#"; //死信队列和交换机 public static final String DEAD_EXCHANGE = "dead-exchange"; public static final String DEAD_QUEUE = "dead-queue"; public static final String DEAD_ROUTING_KEY = "dead.#"; /** * 创建普通的交换机 * @return */ @Bean public Exchange normalExchange(){ return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build(); } /** * 创建普通的队列,然后绑定死信队列和交换机 * @return */ @Bean public Queue normalQueue(){ //普通队列这里需要绑定死信交换机 return QueueBuilder .durable(NORMAL_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) //死信交换机 .deadLetterRoutingKey("dead.hello") //绑定的routing_key .build(); } /** * 绑定交换机和队列 * @param normalQueue 队列 * @param normalExchange 交换机 * @return */ @Bean public Binding normalBinding(Queue normalQueue,Exchange normalExchange){ return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs(); } /** * 创建死信交换机 * @return */ @Bean public Exchange deadExchange(){ return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build(); } /** * 创建死信队列 * @return */ @Bean public Queue deadQueue(){ return QueueBuilder.durable(DEAD_QUEUE).build(); } /** * 死信队列绑定死信交换机,并且设置死信topic * @param deadQueue 队列 * @param deadExchange 交换机 * @return */ @Bean public Binding deadBinding(Queue deadQueue,Exchange deadExchange){ return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs(); } }
创建普通生成者
package com.hjt.producer; import com.hjt.config.DeadLetterQueueConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /*** * 死信队列生成者发送端 */ @Component public class NormalProducer { @Resource private RabbitTemplate rabbitTemplate; public void publishDead(){ rabbitTemplate.convertAndSend(DeadLetterQueueConfig.NORMAL_EXCHANGE,"normal.topic","今天晚上有什么安排?"); System.out.println("消息发送成功"); } }
创建普通消费者
package com.hjt.consumer; import com.hjt.config.DeadLetterQueueConfig; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /*** * 创建死信消费队列 */ @Component public class NormalConsumer { @RabbitListener(queues = DeadLetterQueueConfig.NORMAL_QUEUE) public void consume(String msg, Channel channel, Message message) throws IOException { System.out.println("接收到normal-queue队列的消息:"+msg); //消息被消费者拒绝(通过basic.reject 或者 back.nack),并且设置 requeue=false。 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); /** * channel.basicNack(deliveryTag, multiple, requeue) 是 RabbitMQ 的 Java 客户端中用于拒绝(Nack)一条或多条消息的方法。下面是对该方法的参数进行解释: * deliveryTag:消息的交付标签(delivery tag),用于唯一标识一条消息。通过 message.getMessageProperties().getDeliveryTag() 获取消息的交付标签。 * multiple:是否拒绝多条消息。如果设置为 true,则表示拒绝交付标签小于或等于 deliveryTag 的所有消息;如果设置为 false,则只拒绝交付标签等于 deliveryTag 的消息。 * requeue:是否重新入队列。如果设置为 true,则被拒绝的消息会重新放回原始队列中等待重新投递;如果设置为 false,则被拒绝的消息会被丢弃。 */ //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); } }
创造死信队列消费者
package com.hjt.consumer; import com.hjt.config.DeadLetterQueueConfig; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.IOException; @Component public class DeadConsumer { @RabbitListener(queues = DeadLetterQueueConfig.DEAD_QUEUE) public void consume(String msg, Channel channel, Message message) throws IOException { System.out.println("我是死信队列:"+msg); //消息被消费者拒绝(通过basic.reject 或者 back.nack),并且设置 requeue=false。 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }
执行test测试
package com.hjt.controller; import com.hjt.producer.NormalProducer; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController @RequestMapping("/dead") public class DealController { @Resource private NormalProducer normalProducer; @GetMapping(value = "/test") public void test(){ normalProducer.publishDead(); } }
调用test的时候,运行结果如下所示
消息发送成功
接收到normal-queue队列的消息:今天晚上有什么安排?
我是死信队列:今天晚上有什么安排?
代码如下:
package com.hjt.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; @Configuration public class DelayedConfig { public static final String DELAYED_EXCHANGE = "delayed-exchange"; public static final String DELAYED_QUEUE = "delayed-queue"; public static final String DELAYED_ROUTING_KEY = "delayed.#"; /** * 创建一个延迟交换机(Exchange)并返回该交换机对象。 * @return */ @Bean public Exchange delayedExchange(){ //创建了一个HashMap对象,用于存储交换机的属性。然后,将一个名为x-delayed-type的属性和值为"topic"的键值对添加到HashMap中。 HashMap<String, Object> map = new HashMap<>(); map.put("x-delayed-type","topic"); /** * 使用CustomExchange类创建一个自定义交换机对象。CustomExchange是Spring AMQP库提供的一个类,用于创建自定义的交换机。构造方法的参数依次为交换机的名称、类型、是否持久化、是否自动删除和属性。 * ,交换机的名称为DELAYED_EXCHANGE,类型为"x-delayed-message",持久化为true,自动删除为false,属性为之前创建的HashMap对象。 */ return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map); } /** * 创建队列 * @return */ @Bean public Queue delayedQueue(){ return QueueBuilder.durable(DELAYED_QUEUE).build(); } /** * 绑定交换机和队列 * @param delayedQueue * @param delayedExchange * @return */ @Bean public Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){ return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
延迟队列生成者
package com.hjt.producer; import cn.hutool.core.date.DateUtil; import com.hjt.config.DelayedConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; @Component @Slf4j public class DelayProducer { @Resource private RabbitTemplate rabbitTemplate; public void publishDelay() { rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.hello", "大江东去浪淘尽", new MessagePostProcessor() { //创建了一个匿名内部类实现了MessagePostProcessor接口,并重写了postProcessMessage()方法。在该方法中,设置了消息的延迟时间为50,000毫秒(即50秒) @Override public Message postProcessMessage(Message message) throws AmqpException { log.error("延迟队列发送的时间{}", DateUtil.now()); // //设置消息的延迟时间,单位为毫秒。 // message.getMessageProperties().setDelay(); // 使用x-delay头设置延迟时间(单位为毫秒) Map<String, Object> headers = message.getMessageProperties().getHeaders(); headers.put("x-delay",5000); return message; } }); System.out.println("消息发送成功"); } }
输出结果
2024-08-08 11:19:18.416 ERROR 23148 --- [nio-8184-exec-1] com.hjt.producer.DelayProducer : 延迟队列发送的时间2024-08-08 11:19:18
消息发送成功
2024-08-08 11:19:23.434 ERROR 23148 --- [ntContainer#1-1] com.hjt.consumer.DelayConsumer : 延迟队列消费的时间2024-08-08 11:19:23
2024-08-08 11:19:23.434 ERROR 23148 --- [ntContainer#1-1] com.hjt.consumer.DelayConsumer : 我是延迟消费,消息为大江东去浪淘尽
这里需要主要,如果是 message.getMessageProperties().setDelay(); 这种设置方式的话,是int类型,因为单位是毫秒,最长时间不会超过
24.8天,如果是 Map<String, Object> headers = message.getMessageProperties().getHeaders(); headers.put(“x-delay”,5000);这种方式的话,因为单位是long,最长时间可以设置大约相当于 292,471,208 年。
ps:这里需要注意有一个问题:
就是队列的先进先出原则导致的问题,当先进入队列的消息的过期时间比后进入消息中的过期时间长的时候,消息是串行被消费的,所以必然是等到先进入队列的消息的过期时间结束, 后进入队列的消息的过期时间才会被监听,然而实际上这个消息早就过期了,这就导致了本来过期时间为3秒的消息,实际上过了13秒才会被处理,这在实际应用场景中肯定是不被允许的;
解决方法:下载延迟交换机插件(上面有介绍)
上述代码地址
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。