当前位置:   article > 正文

RabbitMQ(五)死信队列与延时队列

死信队列

一、介绍死信队列、延时队列

1、死信队列

DLX,全称为 Dead-Letter-Exchange,可以称为死信交换机,也有人称之为死信邮箱,当消息在一个正常队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列

消息变成死信,可能由于以下原因:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何队列上被指定。实际上就是设置某一个队列的属性,当这个队列中存在死信时,rabbitMQ就会自动的将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。

想要使用死信队列,只需要在定义队列的时候设置队列参数【x-dead-letter-exchange 指定交换机】即可

2、延时队列

延时队列存储的对象是对应的延时消息。所谓 “延时消息” 是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

在RabbitMQ中并没有支持延时队列的功能,延时队列可以通过 【过期时间 + 死信队列】 来实现

注意:创建的延时队列千万不要有消费者,一旦有消费者进行这个队列的消费,那么就不会是特定延时了,目的就是要这个队列里的消息TTL过时

具体流程如下:

 

二、延时队列的实现

结构大致如下 

 

 

1、基本的springboot配置依赖

资源文件(rabbitConfig.properties)

从中获取交换机、队列、路由key信息名

  1. #正常的订单交换机(路由模式)
  2. learn_annotation_delay_OrderTopicExchange=learn_annotation_delay_OrderTopicExchange
  3. #订单延时队列(实现超时关闭订单功能)
  4. learn_annotation_delay_OrderTopictQueue_DelayCancelQueue=learn_annotation_delay_OrderTopictQueue_DelayCancelQueue
  5. #进入延时队列的路由key
  6. learn_annotation_delay_OrderTopictQueue_DelayCancelQueue_key=delay_OrderTopictQueue_DelayCancelQueue_key
  7. #死信交换机
  8. learn_annotation_delay_DirectExchange_DLX=learn_annotation_delay_DirectExchange_DLX
  9. #死信队列
  10. learn_annotation_delay_DirectQueue_DLX_Queue=learn_annotation_delay_DirectQueue_DLX_Queue
  11. #死信队列路由key
  12. learn_annotation_delay_DirectQueue_DLX_Queue_key=delay_DLX_key

2、绑定延时队列与交换机

目的:定义延时队列并设置对应的死性交换机与路由key;绑定死性队列与交换机

  1. package com.marvin.demo.common.rabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.beans.factory.annotation.Qualifier;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.context.annotation.PropertySource;
  8. import org.springframework.core.env.Environment;
  9. /**
  10. * 延迟队列配置【创建所需交换机、队列,并且绑定】
  11. */
  12. @Configuration
  13. @PropertySource("classpath:rabbitConfig.properties")
  14. public class RabbitMqDelayResourceConfig {
  15. //读取资源文件信息
  16. @Autowired
  17. private Environment env;
  18. //定义订单交换机【就是一个普通的交换机】
  19. @Bean("orderExchange")
  20. public Exchange orderExchange() {
  21. return ExchangeBuilder.topicExchange(env.getProperty("learn_annotation_delay_OrderTopicExchange")).build();
  22. }
  23. //定义延时队列【订单超时自动关闭队列】
  24. /**
  25. * 定义延时队列 And 设置死信消息的转发配置【方法一】
  26. *
  27. * 1、创建一个延时队列【该队列不能有消费者,队列消息才会是死信消息】
  28. * 2、设置该队列产生死信时发送到指定死信交换机
  29. * 3、死信队列的routing key
  30. *
  31. * @return
  32. */
  33. @Bean("delayOrderCancelQueue")
  34. public Queue delayOrderCancelQueue() {
  35. return QueueBuilder.durable(env.getProperty("learn_annotation_delay_OrderTopictQueue_DelayCancelQueue"))
  36. // x-dead-letter-exchange 声明了队列里的死信消息转发到的DLX名称,
  37. .withArgument("x-dead-letter-exchange", env.getProperty("learn_annotation_delay_DirectExchange_DLX"))
  38. // x-dead-letter-routing-key 声明了这些死信消息在转发时携带的 routing-key 名称。
  39. .withArgument("x-dead-letter-routing-key", env.getProperty("learn_annotation_delay_DirectQueue_DLX_Queue_key")).build();
  40. }
  41. // /**
  42. // * 定义延时队列 And 设置死信消息的转发配置【方法二】
  43. // */
  44. // @Bean("delayQueue")
  45. // public Queue delayQueue() {
  46. // //设置死信交换机和路由key
  47. // Map<String, Object> params = new HashMap<>();
  48. // // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
  49. // params.put("x-dead-letter-exchange", env.getProperty("learn_annotation_delay_DirectExchange_DLX"));
  50. // // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
  51. // params.put("x-dead-letter-routing-key", env.getProperty("learn_annotation_delay_DirectQueue_DLX_Queue_key"));
  52. // return new Queue(env.getProperty("learn_annotation_delay_OrderTopictQueue_DelayCancelQueue"), true, false, false, params);
  53. // }
  54. //绑定延时队列与订单交换机
  55. @Bean("delayOrderCancelBinding")
  56. public Binding delayOrderCancelBinding(@Qualifier("orderExchange")Exchange orderExchange, @Qualifier("delayOrderCancelQueue")Queue delayOrderCancelQueue) {
  57. return BindingBuilder.bind(delayOrderCancelQueue).to(orderExchange).with(env.getProperty("learn_annotation_delay_OrderTopictQueue_DelayCancelQueue_key")).noargs();
  58. }
  59. //定义死信交换机
  60. @Bean("dlxExchange")
  61. public Exchange dlxExchange() {
  62. return ExchangeBuilder.directExchange(env.getProperty("learn_annotation_delay_DirectExchange_DLX")).build();
  63. }
  64. //定义死信队列
  65. @Bean("dlxQueue")
  66. public Queue dlxQueue() {
  67. return QueueBuilder.durable(env.getProperty("learn_annotation_delay_DirectQueue_DLX_Queue")).build();
  68. }
  69. //绑定死信队列与交换机
  70. @Bean("dlxBinding")
  71. public Binding dlxBinding(@Qualifier("dlxExchange")Exchange dlxExchange, @Qualifier("dlxQueue")Queue dlxQueue) {
  72. return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(env.getProperty("learn_annotation_delay_DirectQueue_DLX_Queue_key")).noargs();
  73. }
  74. }

3、发送延时队列消息

 目的:发送有实效的消息到延时队列,从死信队列接收消息;从而实现消息的延时消费

  1. package com.marvin.demo.service.producer;
  2. import com.marvin.demo.entity.UserBean;
  3. import com.marvin.demo.model.UserRequestModel;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.AmqpException;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.core.MessagePostProcessor;
  8. import org.springframework.amqp.rabbit.connection.CorrelationData;
  9. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.context.annotation.PropertySource;
  12. import org.springframework.core.env.Environment;
  13. import org.springframework.stereotype.Component;
  14. import java.util.Date;
  15. import java.util.UUID;
  16. @Slf4j
  17. @Component
  18. @PropertySource("classpath:rabbitConfig.properties")
  19. public class ProducerService {
  20. @Autowired
  21. private Environment env;
  22. @Autowired
  23. private RabbitTemplate rabbitTemplate;
  24. /**
  25. * 发送消息到延时队列【订单延时关闭场景】
  26. */
  27. public void sendMsgToDelayQueue() {
  28. //订单交换机
  29. String orderExchange = env.getProperty("learn_annotation_delay_OrderTopicExchange");
  30. //延时队列路由key(消息路由到订单延时关闭队列)
  31. String routingKey = env.getProperty("learn_annotation_delay_OrderTopictQueue_DelayCancelQueue_key");
  32. UserBean userBean = new UserBean(1, "1_a", "aa");
  33. UserRequestModel userRequest = new UserRequestModel();
  34. userRequest.setType("junitTest");
  35. userRequest.setStatus("success");
  36. userRequest.setDesc(String.format("method【sendMsgToDelayQueue()】routingKey【%s】", routingKey));
  37. userRequest.setContent(userBean);
  38. //设置消息的唯一标识(根据情况可加可不加)
  39. String uuid = UUID.randomUUID().toString();
  40. CorrelationData correlationData = new CorrelationData(uuid);
  41. /**
  42. * 参数事例
  43. * rabbitTemplate.convertAndSend(String exchange, String routingKey, final Object message, final MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData)
  44. *
  45. * @param exchange 交换机
  46. * @param routingKey 路由key
  47. * @param message 消息
  48. * @param messagePostProcessor 消息处理器
  49. * @param correlationData 消息的唯一标识(根据情况可加可不加)
  50. */
  51. rabbitTemplate.convertAndSend(orderExchange, routingKey, userRequest, new MessagePostProcessor() {
  52. @Override
  53. public Message postProcessMessage(Message message) throws AmqpException {
  54. //设置消息过期时间 10 秒
  55. message.getMessageProperties().setExpiration("10000");
  56. return message;
  57. }
  58. }, correlationData);
  59. log.info("Send method【sendMsgToDelayQueue()】routingKey【{}】", routingKey);
  60. log.info("向延时队列发送时间:【{}】", new Date());
  61. }
  62. }

4、死信队列接收消息进行消费

  1. package com.marvin.demo.service.consumer;
  2. import com.marvin.demo.entity.UserBean;
  3. import com.marvin.demo.model.UserRequestModel;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.rabbit.annotation.*;
  6. import org.springframework.context.annotation.PropertySource;
  7. import org.springframework.messaging.handler.annotation.Payload;
  8. import org.springframework.stereotype.Component;
  9. import org.springframework.util.ObjectUtils;
  10. import java.util.Date;
  11. /**
  12. * 死信队列消息监听服务
  13. */
  14. @Slf4j
  15. @Component
  16. @PropertySource("classpath:rabbitConfig.properties")
  17. @RabbitListener(queues = "learn_annotation_delay_DirectQueue_DLX_Queue")
  18. public class ConsumerDLXService {
  19. @RabbitHandler
  20. public void processDLX(@Payload UserRequestModel userRequestModel){
  21. log.info("Enter ConsumerDLXService --> processDLX()~~~~~~~~~~~~~~~~~~~");
  22. log.info("死信队列消费时间:【{}】", new Date());
  23. System.out.println("ConsumerDLXService queue1 msg:" + userRequestModel);
  24. System.out.println("ConsumerDLXService Object1 UserRequestModel:" + userRequestModel.toString());
  25. //获取真正的数据对象
  26. UserBean userBean = userRequestModel.getContent();
  27. System.out.println("ConsumerDLXService Object1 UserBean:" + (ObjectUtils.isEmpty(userBean) ? null : userBean.toString()));
  28. }
  29. }

结果验证

打印日志如下,可以看到从发送到延时队列,到死信队列消费,经过的耗时刚好是我们设定的10秒(真实场景下需要考虑消费者的执行效率耗时) 

  1. 2022-06-11 00:00:48.065 INFO 11600 --- [nio-8004-exec-1] c.m.d.service.producer.ProducerService : Send method【sendMsgToDelayQueue()】routingKey【delay_OrderTopictQueue_DelayCancelQueue_key
  2. 2022-06-11 00:00:48.076 INFO 11600 --- [ 127.0.0.1:5672] c.m.d.common.rabbitmq.MessageCallBack : 【成功】【confirmCallback】Client消息发送到Exchange ==================== correlationData【CorrelationData [id=09745a87-5ada-4876-aa87-f59b654b19b6]】, cause: 【null
  3. 2022-06-11 00:00:48.082 INFO 11600 --- [nio-8004-exec-1] c.m.d.service.producer.ProducerService : 向延时队列发送时间:【Sat Jun 11 00:00:48 CST 2022
  4. 2022-06-11 00:00:58.243 INFO 11600 --- [ntContainer#0-1] c.m.d.s.consumer.ConsumerDLXService : Enter ConsumerDLXService --> processDLX()~~~~~~~~~~~~~~~~~~~
  5. 2022-06-11 00:00:58.243 INFO 11600 --- [ntContainer#0-1] c.m.d.s.consumer.ConsumerDLXService : 死信队列消费时间:【Sat Jun 11 00:00:58 CST 2022
  6. ConsumerDLXService queue1 msg:UserRequestModel{type='junitTest', status='success', desc='method【sendMsgToDelayQueue()】routingKey【delay_OrderTopictQueue_DelayCancelQueue_key】', content=UserBean{id=1, username='1_a', pwd='aa'}}
  7. ConsumerDLXService Object1 UserRequestModel:UserRequestModel{type='junitTest', status='success', desc='method【sendMsgToDelayQueue()】routingKey【delay_OrderTopictQueue_DelayCancelQueue_key】', content=UserBean{id=1, username='1_a', pwd='aa'}}
  8. ConsumerDLXService Object1 UserBean:UserBean{id=1, username='1_a', pwd='aa'}

rabbitMQ服务可以看到,消息是先进入延时队列中的 ,该队列消息超时后,会转发到设定的死信队列中

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/930150
推荐阅读
相关标签
  

闽ICP备14008679号