当前位置:   article > 正文

RabbitMQ的死信队列详解及实现_获取死信队列中的信息

获取死信队列中的信息

死信的概念

死信的处理方式

演示

1.引入spring-boot-start-amqp依赖

2.配置application.yml文件

3.创建两个队列,一个是业务队列,一个是死信队列

4.在controller中模拟生产者发送信息

5.启动程序,用postman调用发送信息接口


死信的概念

死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;

以上是个人的通俗解释,专业术语解释的比较正规点大家可以参考,主要想搞清楚这个概念,不同的消息中间件大概都有自身对于死信或者死信队列的处理方式,下面重点要说说。

消息变成死信有以下几种情况

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

死信的处理方式

死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,

  • 丢弃,如果不是很重要,可以选择丢弃
  • 记录死信入库,然后做后续的业务分析或处理
  • 通过死信队列,由负责监听死信的应用程序进行处理

综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理

                         

演示死信队列处理,这里设定产生死信的场景是设置队列中的消息有效期为10s,超出时间未被消费者接收,就会自动添加到死信队列,消费者端监听死信队列,并进行业务处理。

演示

演示环境

RabbitMQ3.8.5+Spring Boot 2.3.0 RELEASE+JAVA 8

1.引入spring-boot-start-amqp依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

2.配置application.yml文件

  1. rabbitmq:
  2. host: localhost
  3. port: 5673
  4. username: guest
  5. password: guest
  6. # 开启消息确认机制
  7. publisher-confirm-type: correlated
  8. # 开启消息发送到队列失败返回
  9. publisher-returns: true

3.创建两个队列,一个是业务队列,一个是死信队列

  1. package com.tdrc.common.core.rabbitmq;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  4. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. /**
  11. * @author dpf
  12. * @version 1.0
  13. * @date 2020-6-22 9:52
  14. * @instruction ...
  15. */
  16. @Configuration
  17. public class RabbitExChangeConfig {
  18. /**
  19. * 业务交换机
  20. */
  21. public static final String DESTINATION_NAME = "rabbitMq_direct";
  22. /**
  23. * 业务队列名称
  24. */
  25. public static final String SMS_QUEUE = "Sms_msg";
  26. /**
  27. * 死信队列交换机名称
  28. */
  29. public static final String DEAD_LETTER_EXCHANGE_NAME="deadLetter_direct";
  30. /**
  31. * 死信队列名称
  32. */
  33. public static final String DEAD_LETTER_QUEUE = "deadLetter_queue";
  34. /**
  35. * RouteKey
  36. */
  37. public static final String SMS_ROUTING_KEY = "sms";
  38. /**
  39. * 配置死信交换机
  40. * @return
  41. */
  42. @Bean
  43. public DirectExchange deadLetterDirectExchange(){
  44. return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
  45. }
  46. /**
  47. * 配置死信队列
  48. * @return
  49. */
  50. @Bean
  51. public Queue deadLetterQueue(){
  52. return new Queue(DEAD_LETTER_QUEUE);
  53. }
  54. /**
  55. * 绑定死信队列和死信交换机
  56. * @return
  57. */
  58. @Bean
  59. Binding deadLetterBindingDirect() {
  60. return BindingBuilder.bind(deadLetterQueue()).to(deadLetterDirectExchange()).with(SMS_ROUTING_KEY);
  61. }
  62. /**
  63. * 配置队列
  64. * @return
  65. */
  66. @Bean
  67. public Queue smsDirectQueue() {
  68. Map<String, Object> args = new HashMap<>(16);
  69. // 队列消息过期时间
  70. args.put("x-message-ttl", 10000);
  71. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
  72. args.put("x-dead-letter-routing-key", SMS_ROUTING_KEY);
  73. // args.put("x-expires", 5000);队列过期时间
  74. // args.put("x-max-length",5 );
  75. return new Queue(SMS_QUEUE, true,false,false,args);
  76. }
  77. /**
  78. * 配置交换机
  79. * @return
  80. */
  81. @Bean
  82. public DirectExchange directExchange() {
  83. return new DirectExchange(DESTINATION_NAME);
  84. }
  85. /**
  86. * 交换机与队列绑定
  87. * @return
  88. */
  89. @Bean
  90. Binding smsBindingDirect() {
  91. return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with(SMS_ROUTING_KEY);
  92. }
  93. @Bean
  94. public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
  95. SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
  96. new SimpleRabbitListenerContainerFactory();
  97. //这个connectionFactory就是我们自己配置的连接工厂直接注入进来
  98. simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
  99. //这边设置消息确认方式由自动确认变为手动确认
  100. simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  101. //设置消息预取数量
  102. // simpleRabbitListenerContainerFactory.setPrefetchCount(1);
  103. return simpleRabbitListenerContainerFactory;
  104. }
  105. /**
  106. * 每个rabbitTemplate方法只可以有一个回调,不然会报错 only one ConfirmCallback is supported by each RabbitTemplate,解决办法是配成多利的
  107. *
  108. * @param connectionFactory
  109. * @return
  110. */
  111. @Bean
  112. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  113. RabbitTemplate template = new RabbitTemplate(connectionFactory);
  114. //成功回调
  115. template.setConfirmCallback(new Callback());
  116. // 开启mandatory模式(开启失败回调)
  117. template.setMandatory(true);
  118. //失败回调
  119. template.setReturnCallback(new Callback());
  120. return template;
  121. }
  122. }

4.在controller中模拟生产者发送信息

  1. @Resource
  2. private RabbitTemplate rabbitTemplate;
  3. @GetMapping("/sendSms")
  4. private void sendSms() throws InterruptedException {
  5. String msg = "HelloWorld rabbitmq";
  6. for(Integer i=0;i<10;i++){
  7. CorrelationData correlationData = new CorrelationData(i.toString());
  8. rabbitTemplate.convertAndSend(RabbitExChangeConfig.DESTINATION_NAME, RabbitExChangeConfig.SMS_ROUTING_KEY, msg+i ,correlationData);
  9. }
  10. }

5.启动程序,用postman调用发送信息接口

  1. @GetMapping("/sendSms")
  2. private void sendSms() throws InterruptedException {
  3. String msg = "HelloWorld rabbitmq";
  4. for(Integer i=0;i<10;i++){
  5. CorrelationData correlationData = new CorrelationData(i.toString());
  6. rabbitTemplate.convertAndSend(RabbitExChangeConfig.DESTINATION_NAME, RabbitExChangeConfig.SMS_ROUTING_KEY, msg+i ,correlationData);
  7. }
  8. }

启动程序前消息对列中无程序内创建的业务队列和死信队列。

消息发送后会产生两个交换机和两个队列,一个队列是Sms_msg,一个是deadLetter_queue,消息记录为10条

10s后的队列结果如图,由于生产端发送消息时指定了消息的过期时间为10s,而此时没有消费端进行消费,消息便被路由到死信队列中。 

程序中添加死信队列消费者监控代码,重新启动程序

  1. @RabbitListener(queues = RabbitExChangeConfig.DEAD_LETTER_QUEUE, containerFactory = "simpleRabbitListenerContainerFactory")
  2. public void reciveDeadLetter(Message message, Channel channel, @Headers Map<String, Object> headers) throws IOException {
  3. long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
  4. try {
  5. System.out.println("死信队列消费者收到消息 : " + new String(message.getBody(), "UTF-8"));
  6. /**
  7. * 手动ack
  8. * deliveryTag:该消息的index
  9. * multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
  10. */
  11. channel.basicAck(deliveryTag, false);
  12. } catch (Exception e) {
  13. //消息退回 (可以在可视化界面看到)
  14. //批量退回 退回之后重回消息队列 true false的话就是丢弃这条信息,如果配置了死信队列,那这条消息会进入死信队列
  15. channel.basicNack(deliveryTag, false, true);
  16. //单条退回 channel.basicReject();
  17. }
  18. }

程序启动后消费端接收到死信对列里的信息

测试查看死信消息队列中没有需要接收的消息。

实际环境我们还需要对死信队列进行一个监听和处理,当然具体的处理逻辑和业务相关,这里只是简单演示死信队列是否生效。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/509800
推荐阅读
相关标签
  

闽ICP备14008679号