赞
踩
DLX,全称为 Dead-Letter-Exchange,可以称为死信交换机,也有人称之为死信邮箱,当消息在一个正常队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列。
消息变成死信,可能由于以下原因:
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何队列上被指定。实际上就是设置某一个队列的属性,当这个队列中存在死信时,rabbitMQ就会自动的将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
想要使用死信队列,只需要在定义队列的时候设置队列参数【x-dead-letter-exchange 指定交换机】即可。
延时队列存储的对象是对应的延时消息。所谓 “延时消息” 是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
在RabbitMQ中并没有支持延时队列的功能,延时队列可以通过 【过期时间 + 死信队列】 来实现
注意:创建的延时队列千万不要有消费者,一旦有消费者进行这个队列的消费,那么就不会是特定延时了,目的就是要这个队列里的消息TTL过时
具体流程如下:
结构大致如下
资源文件(rabbitConfig.properties)
从中获取交换机、队列、路由key信息名
-
- #正常的订单交换机(路由模式)
- learn_annotation_delay_OrderTopicExchange=learn_annotation_delay_OrderTopicExchange
- #订单延时队列(实现超时关闭订单功能)
- learn_annotation_delay_OrderTopictQueue_DelayCancelQueue=learn_annotation_delay_OrderTopictQueue_DelayCancelQueue
- #进入延时队列的路由key
- learn_annotation_delay_OrderTopictQueue_DelayCancelQueue_key=delay_OrderTopictQueue_DelayCancelQueue_key
-
-
- #死信交换机
- learn_annotation_delay_DirectExchange_DLX=learn_annotation_delay_DirectExchange_DLX
- #死信队列
- learn_annotation_delay_DirectQueue_DLX_Queue=learn_annotation_delay_DirectQueue_DLX_Queue
- #死信队列路由key
- learn_annotation_delay_DirectQueue_DLX_Queue_key=delay_DLX_key
目的:定义延时队列并设置对应的死性交换机与路由key;绑定死性队列与交换机
- package com.marvin.demo.common.rabbitmq.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.PropertySource;
- import org.springframework.core.env.Environment;
-
- /**
- * 延迟队列配置【创建所需交换机、队列,并且绑定】
- */
- @Configuration
- @PropertySource("classpath:rabbitConfig.properties")
- public class RabbitMqDelayResourceConfig {
-
- //读取资源文件信息
- @Autowired
- private Environment env;
-
-
- //定义订单交换机【就是一个普通的交换机】
- @Bean("orderExchange")
- public Exchange orderExchange() {
- return ExchangeBuilder.topicExchange(env.getProperty("learn_annotation_delay_OrderTopicExchange")).build();
- }
-
- //定义延时队列【订单超时自动关闭队列】
- /**
- * 定义延时队列 And 设置死信消息的转发配置【方法一】
- *
- * 1、创建一个延时队列【该队列不能有消费者,队列消息才会是死信消息】
- * 2、设置该队列产生死信时发送到指定死信交换机
- * 3、死信队列的routing key
- *
- * @return
- */
- @Bean("delayOrderCancelQueue")
- public Queue delayOrderCancelQueue() {
- return QueueBuilder.durable(env.getProperty("learn_annotation_delay_OrderTopictQueue_DelayCancelQueue"))
- // x-dead-letter-exchange 声明了队列里的死信消息转发到的DLX名称,
- .withArgument("x-dead-letter-exchange", env.getProperty("learn_annotation_delay_DirectExchange_DLX"))
- // x-dead-letter-routing-key 声明了这些死信消息在转发时携带的 routing-key 名称。
- .withArgument("x-dead-letter-routing-key", env.getProperty("learn_annotation_delay_DirectQueue_DLX_Queue_key")).build();
- }
- // /**
- // * 定义延时队列 And 设置死信消息的转发配置【方法二】
- // */
- // @Bean("delayQueue")
- // public Queue delayQueue() {
- // //设置死信交换机和路由key
- // Map<String, Object> params = new HashMap<>();
- // // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
- // params.put("x-dead-letter-exchange", env.getProperty("learn_annotation_delay_DirectExchange_DLX"));
- // // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
- // params.put("x-dead-letter-routing-key", env.getProperty("learn_annotation_delay_DirectQueue_DLX_Queue_key"));
- // return new Queue(env.getProperty("learn_annotation_delay_OrderTopictQueue_DelayCancelQueue"), true, false, false, params);
- // }
-
- //绑定延时队列与订单交换机
- @Bean("delayOrderCancelBinding")
- public Binding delayOrderCancelBinding(@Qualifier("orderExchange")Exchange orderExchange, @Qualifier("delayOrderCancelQueue")Queue delayOrderCancelQueue) {
- return BindingBuilder.bind(delayOrderCancelQueue).to(orderExchange).with(env.getProperty("learn_annotation_delay_OrderTopictQueue_DelayCancelQueue_key")).noargs();
- }
-
-
-
- //定义死信交换机
- @Bean("dlxExchange")
- public Exchange dlxExchange() {
- return ExchangeBuilder.directExchange(env.getProperty("learn_annotation_delay_DirectExchange_DLX")).build();
- }
-
- //定义死信队列
- @Bean("dlxQueue")
- public Queue dlxQueue() {
- return QueueBuilder.durable(env.getProperty("learn_annotation_delay_DirectQueue_DLX_Queue")).build();
- }
-
- //绑定死信队列与交换机
- @Bean("dlxBinding")
- public Binding dlxBinding(@Qualifier("dlxExchange")Exchange dlxExchange, @Qualifier("dlxQueue")Queue dlxQueue) {
- return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(env.getProperty("learn_annotation_delay_DirectQueue_DLX_Queue_key")).noargs();
- }
- }
目的:发送有实效的消息到延时队列,从死信队列接收消息;从而实现消息的延时消费
- package com.marvin.demo.service.producer;
-
- import com.marvin.demo.entity.UserBean;
- import com.marvin.demo.model.UserRequestModel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.AmqpException;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessagePostProcessor;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.PropertySource;
- import org.springframework.core.env.Environment;
- import org.springframework.stereotype.Component;
-
- import java.util.Date;
- import java.util.UUID;
-
- @Slf4j
- @Component
- @PropertySource("classpath:rabbitConfig.properties")
- public class ProducerService {
-
- @Autowired
- private Environment env;
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
-
- /**
- * 发送消息到延时队列【订单延时关闭场景】
- */
- public void sendMsgToDelayQueue() {
-
- //订单交换机
- String orderExchange = env.getProperty("learn_annotation_delay_OrderTopicExchange");
- //延时队列路由key(消息路由到订单延时关闭队列)
- String routingKey = env.getProperty("learn_annotation_delay_OrderTopictQueue_DelayCancelQueue_key");
-
- UserBean userBean = new UserBean(1, "1_a", "aa");
- UserRequestModel userRequest = new UserRequestModel();
- userRequest.setType("junitTest");
- userRequest.setStatus("success");
- userRequest.setDesc(String.format("method【sendMsgToDelayQueue()】routingKey【%s】", routingKey));
- userRequest.setContent(userBean);
-
- //设置消息的唯一标识(根据情况可加可不加)
- String uuid = UUID.randomUUID().toString();
- CorrelationData correlationData = new CorrelationData(uuid);
-
- /**
- * 参数事例
- * rabbitTemplate.convertAndSend(String exchange, String routingKey, final Object message, final MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData)
- *
- * @param exchange 交换机
- * @param routingKey 路由key
- * @param message 消息
- * @param messagePostProcessor 消息处理器
- * @param correlationData 消息的唯一标识(根据情况可加可不加)
- */
- rabbitTemplate.convertAndSend(orderExchange, routingKey, userRequest, new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- //设置消息过期时间 10 秒
- message.getMessageProperties().setExpiration("10000");
- return message;
- }
- }, correlationData);
- log.info("Send method【sendMsgToDelayQueue()】routingKey【{}】", routingKey);
- log.info("向延时队列发送时间:【{}】", new Date());
-
- }
- }
- package com.marvin.demo.service.consumer;
-
- import com.marvin.demo.entity.UserBean;
- import com.marvin.demo.model.UserRequestModel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.*;
- import org.springframework.context.annotation.PropertySource;
- import org.springframework.messaging.handler.annotation.Payload;
- import org.springframework.stereotype.Component;
- import org.springframework.util.ObjectUtils;
- import java.util.Date;
-
- /**
- * 死信队列消息监听服务
- */
- @Slf4j
- @Component
- @PropertySource("classpath:rabbitConfig.properties")
- @RabbitListener(queues = "learn_annotation_delay_DirectQueue_DLX_Queue")
- public class ConsumerDLXService {
-
- @RabbitHandler
- public void processDLX(@Payload UserRequestModel userRequestModel){
- log.info("Enter ConsumerDLXService --> processDLX()~~~~~~~~~~~~~~~~~~~");
- log.info("死信队列消费时间:【{}】", new Date());
- System.out.println("ConsumerDLXService queue1 msg:" + userRequestModel);
- System.out.println("ConsumerDLXService Object1 UserRequestModel:" + userRequestModel.toString());
- //获取真正的数据对象
- UserBean userBean = userRequestModel.getContent();
- System.out.println("ConsumerDLXService Object1 UserBean:" + (ObjectUtils.isEmpty(userBean) ? null : userBean.toString()));
- }
-
-
-
-
- }
打印日志如下,可以看到从发送到延时队列,到死信队列消费,经过的耗时刚好是我们设定的10秒(真实场景下需要考虑消费者的执行效率耗时)
- 2022-06-11 00:00:48.065 INFO 11600 --- [nio-8004-exec-1] c.m.d.service.producer.ProducerService : Send method【sendMsgToDelayQueue()】routingKey【delay_OrderTopictQueue_DelayCancelQueue_key】
- 2022-06-11 00:00:48.076 INFO 11600 --- [ 127.0.0.1:5672] c.m.d.common.rabbitmq.MessageCallBack : 【成功】【confirmCallback】Client消息发送到Exchange ==================== correlationData【CorrelationData [id=09745a87-5ada-4876-aa87-f59b654b19b6]】, cause: 【null】
- 2022-06-11 00:00:48.082 INFO 11600 --- [nio-8004-exec-1] c.m.d.service.producer.ProducerService : 向延时队列发送时间:【Sat Jun 11 00:00:48 CST 2022】
- 2022-06-11 00:00:58.243 INFO 11600 --- [ntContainer#0-1] c.m.d.s.consumer.ConsumerDLXService : Enter ConsumerDLXService --> processDLX()~~~~~~~~~~~~~~~~~~~
- 2022-06-11 00:00:58.243 INFO 11600 --- [ntContainer#0-1] c.m.d.s.consumer.ConsumerDLXService : 死信队列消费时间:【Sat Jun 11 00:00:58 CST 2022】
- ConsumerDLXService queue1 msg:UserRequestModel{type='junitTest', status='success', desc='method【sendMsgToDelayQueue()】routingKey【delay_OrderTopictQueue_DelayCancelQueue_key】', content=UserBean{id=1, username='1_a', pwd='aa'}}
- ConsumerDLXService Object1 UserRequestModel:UserRequestModel{type='junitTest', status='success', desc='method【sendMsgToDelayQueue()】routingKey【delay_OrderTopictQueue_DelayCancelQueue_key】', content=UserBean{id=1, username='1_a', pwd='aa'}}
- ConsumerDLXService Object1 UserBean:UserBean{id=1, username='1_a', pwd='aa'}
从rabbitMQ服务可以看到,消息是先进入延时队列中的 ,该队列消息超时后,会转发到设定的死信队列中
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。