当前位置:   article > 正文

RabbitMQ 死信队列应用_rabbitmq 死信队列场景

rabbitmq 死信队列场景

1. 概念

死信队列(Dead Letter Queue)是在消息队列系统中的一种特殊队列,用于存储无法被消费的消息。消息可能会因为多种原因变成“死信”,例如消息过期、消息被拒绝、消息队列长度超过限制等。当消息变成“死信”时,它们会被路由到死信队列中,以便进行进一步处理或分析。 死信队列能够帮助系统进行消息跟踪、监控和处理异常情况,是消息队列系统中的重要组成部分。

2. 应用场景

死信队列在消息队列系统中有多种应用场景,包括但不限于以下几个方面:

  • 延迟消息处理:实现延迟消息投递,例如实现消息的定时投递、消息重试机制等。

  • 任务调度:用于实现任务调度系统,例如延迟执行任务、失败重试任务等。

  • 异常处理:处理消息消费失败或超时的情况,对异常消息进行统一处理。

  • 业务流程控制:实现业务流程中的状态控制和超时处理,例如订单超时取消、支付超时处理等。

  • 监控和统计:对异常消息进行统计和分析,用于系统性能监控和问题排查。

这些应用场景展示了死信队列的灵活性和实用性,在实际系统开发中具有广泛的应用价值。

3. 造成消息进入死信队列的原因

消息成为死信的原因有以下几种:

  • 消息被拒绝(basic.reject或basic.nack),并且requeue标志被设置为false。若参数requeue为true,则表示还可以将此跳消息重新塞回普通队列,若为false则消息被拒绝后直接进入死信队列。

  • 消息过期。在生产者设置生产时设置,若消费者未在过期时间内消费消息,则消息被转发到死信队列中。("x-message-ttl")

  • 队列达到最大长度。当普通队列中消息堆积数量长度达到了maxLength,则会将新接收的消息转发到死信队列中去,从而避免消息丢失。

4. 死信队列工作流程图

5. 代码示例

5.1 引入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. <version>2.7.15</version>
  5. </dependency>

5.2 RabbitMQ配置

  1. @Configuration
  2. public class RabbitConfig {
  3. /**
  4. * 死信队列消息模型构建----------------------------------------------------------------------------------
  5. **/
  6. // 创建普通队列
  7. @Bean
  8. public Queue basicQueue() {
  9. Map<String, Object> params = new HashMap<>(8);
  10. // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
  11. params.put("x-dead-letter-exchange", Exchange.DEMO_DEAD_LETTER_EXCHANGE);
  12. // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
  13. params.put("x-dead-letter-routing-key", RoutingKey.DEMO_DEAD_ROUTING_KEY);
  14. // 注意这里是毫秒单位,这里我们给10秒
  15. params.put("x-message-ttl", 10*1000);
  16. return new Queue(MyQueue.DEMO_CONSUMER_QUEUE, true, false, false, params);
  17. }
  18. //创建“基本消息模型”的基本交换机,面向生产者
  19. @Bean
  20. public TopicExchange basicExchange() {
  21. //创建并返回基本交换机实例
  22. return new TopicExchange(Exchange.DEMO_BASIC_NORMAL_EXCHANGE, true, false);
  23. }
  24. //创建“基本消息模型”的基本绑定(基本交换机+基本路由),面向生产者
  25. @Bean
  26. public Binding basicBinding() {
  27. //创建并返回基本消息模型中的基本绑定(注意这里是正常交换机跟死信队列绑定在一定,不叫死信路由)
  28. return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(RoutingKey.DEMO_ROUTING_KEY);
  29. }
  30. // 创建死信交换机
  31. @Bean
  32. public TopicExchange deadLetterExchange() {
  33. //创建并返回死信交换机实例
  34. return new TopicExchange(Exchange.DEMO_DEAD_LETTER_EXCHANGE, true, false);
  35. }
  36. // 创建第二个中转站
  37. // 创建死信队列
  38. @Bean
  39. public Queue deadLetterQueue() {
  40. return new Queue(MyQueue.DEMO_DEAD_LETTER_QUEUE, true);
  41. }
  42. // 创建死信路由及其绑定
  43. @Bean
  44. public Binding deadLetterBinding() {
  45. return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(RoutingKey.DEMO_DEAD_ROUTING_KEY);
  46. }
  47. public static class Exchange {
  48. public static final String DEMO_BASIC_NORMAL_EXCHANGE = "demo.basic.exchange";
  49. public static final String DEMO_DEAD_LETTER_EXCHANGE = "demo.dead.letter.exchange";
  50. }
  51. public static class RoutingKey {
  52. //交换机与报表队列绑定的RoutingKey
  53. public static final String DEMO_ROUTING_KEY = "demo.basic.routing.key";
  54. public static final String DEMO_DEAD_ROUTING_KEY = "demo.dead.routing.key";
  55. }
  56. /**
  57. * 队列名称
  58. * @author peng.zhang
  59. * @date 2024/01/30
  60. */
  61. public static class MyQueue {
  62. //报表队列名称
  63. public static final String DEMO_CONSUMER_QUEUE = "demo.basic.queue";
  64. //死信队列名称
  65. public static final String DEMO_DEAD_LETTER_QUEUE = "demo.dead.letter.queue";
  66. }
  67. }

5.3 消息生产者

  1. @RestController
  2. @RequestMapping("/test")
  3. @Slf4j
  4. public class TestController {
  5. @Resource
  6. private RabbitTemplate rabbitTemplate;
  7. /**
  8. * 发送消息到死信队列
  9. */
  10. @PostMapping("/testDeadQueue")
  11. public String testDeadQueue() {
  12. // 设置生产者到交换机的确认回调
  13. rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  14. log.info("correlationData:{}, ack:{}, cause:{}", JSON.toJSONString(correlationData), ack, cause);
  15. });
  16. // 设置消息未被队列接收时的返回回调
  17. rabbitTemplate.setReturnCallback((message, replyCode, replyText, ex, routing) -> {
  18. log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}", JSON.toJSONString(message),
  19. replyCode, replyText, ex, routing);
  20. });
  21. // 生成关联数据并发送消息到交换机
  22. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  23. // 消息内容
  24. String messageBody = StrUtil.format("this message send at {}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"));
  25. rabbitTemplate.convertAndSend(RabbitConfig.Exchange.DEMO_BASIC_NORMAL_EXCHANGE, RabbitConfig.RoutingKey.DEMO_ROUTING_KEY, messageBody, correlationData);
  26. log.info(">>>>>{}, 发送消息:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), messageBody);
  27. return "OK";
  28. }
  29. }

5.4 消息消费者

  1. @Component
  2. @Slf4j
  3. public class DeadLetterConsumer {
  4. /**
  5. * 监听 DEMO_CONSUMER_QUEUE 并处理传入的消息。
  6. * 为测试目的抛出 IOException 以模拟异常。
  7. *
  8. * @param messageBody 消息负载
  9. * @param headers 消息头
  10. * @param channel 用于消息确认的通道
  11. * @throws IOException 如果抛出异常
  12. */
  13. @RabbitListener(queues = RabbitConfig.MyQueue.DEMO_CONSUMER_QUEUE)
  14. @RabbitHandler
  15. public void testBasicQueueAndThrowsException(@Payload String messageBody, @Headers Map<String, Object> headers, Channel channel) throws IOException {
  16. /**
  17. * Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,
  18. * 以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。
  19. * RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。
  20. */
  21. Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
  22. log.info(">>>>>{} 普通队列消费, tag = {}, 消息内容:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), tag, messageBody);
  23. /**
  24. * multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认
  25. * 如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认
  26. */
  27. // ACK,确认一条消息已经被消费
  28. // channel.basicAck(deliveryTag, false);
  29. // 对应的业务操作。。。。。
  30. // doBusiness();
  31. // 模拟消息拒绝
  32. channel.basicNack(tag, false, false);
  33. }
  34. /**
  35. * 处理业务逻辑
  36. */
  37. private void doBusiness() {
  38. System.out.println("here do some business code");
  39. }
  40. /**
  41. * 监听死信队列并处理消息。
  42. *
  43. * @param data 消息内容
  44. * @param tag 消息标签
  45. * @param channel 通道
  46. */
  47. @RabbitListener(queues = RabbitConfig.MyQueue.DEMO_DEAD_LETTER_QUEUE)
  48. @RabbitHandler
  49. public void fromDeadLetter(@Payload String data, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel) {
  50. log.info(">>>>>{} 死信队列消费, tag = {}, 消息内容:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), tag, data);
  51. // 对应的业务操作。。。。。
  52. }
  53. }

5.5 YML配置

  1. spring:
  2. rabbitmq:
  3. username: rabbitmq
  4. password: rabbitmq
  5. port: 5672
  6. host: 127.0.0.1
  7. #publisher-confirm-type参数有三个可选值:
  8. #SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。
  9. #CORRELATED:消息从生产者发送到交换机后触发回调方法。
  10. #NONE(默认):关闭发布确认模式。
  11. publisher-confirm-type: correlated
  12. template:
  13. receive-timeout: 1800000
  14. reply-timeout: 1800000
  15. retry:
  16. enabled: false
  17. listener:
  18. direct:
  19. retry:
  20. enabled: true
  21. default-requeue-rejected: false
  22. simple:
  23. retry:
  24. # 是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
  25. enabled: true
  26. # 最大重试次数
  27. max-attempts: 1
  28. # 重试间隔时间(单位毫秒)
  29. initial-interval: 10000
  30. # 重试最大时间间隔(单位毫秒)
  31. max-interval: 300000
  32. # 应用于前一重试间隔的乘法器
  33. multiplier: 5
  34. default-requeue-rejected: false

5.6 控制台输出

从控制台可以看出,消息被拒绝后,大概10秒后死信队列消息被消费。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/631273
推荐阅读
相关标签
  

闽ICP备14008679号