当前位置:   article > 正文

RabbitMQ高级:死信队列详解_rabbitmq dlx

rabbitmq dlx

应用场景:下单20分钟还未完成支付,订单自动取消(转移到死信队列,并更新订单状态)

什么是死信队列?

DLX (全称:Dead-Letter-Exchange 可称为死信交换机),如果一条消息成为死信 dead message,它不是直接丢弃掉,而是再转发到另外一个交换机,由这个交换机来处理这条死信。即DLX,绑定DLX的队列称之为死信队列。

DLX与其它正常交换机无区别,当队列中存在死信时,RabbitMQ自动会将此消息重新发布到设置的死信交换机中,进而被路由到死信队列

结论: 死信队列相当于接盘侠,如果某个队列设置了接盘的死信队列,当队列内的消息无法正常消费时,该条消息会被重新路由到指定的死信队列

如何判断一条消息是死信?

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

消息被应答机制拒绝时、队列是过期队列,队列中的消息过期时、队列满了无法接收新消息

接盘侠设置

这里将展示两种变更为死信消息的场景,一种是消息过期,一种是达到队列最大长度

过期队列:消息过期

1. 编写 Dead_Queue_DirectConfiguration 配置类,声明创建死信交换机、队列、绑定关系

  1. @Configuration
  2. public class Dead_Queue_DirectConfiguration {
  3. // 1.声明创建direct路由模式的交换机——死信交换机
  4. @Bean
  5. public DirectExchange getDead_DirectExchange(){
  6. return new DirectExchange("dead_Exchange",true,false);
  7. }
  8. // 2.声明创建队列——死信队列
  9. @Bean
  10. public Queue getDead_Queue1(){
  11. return new Queue("dead_Queue1",true);
  12. }
  13. // 3.绑定交换机与队列的关系,并设置交换机与队列之间的BindingKey
  14. @Bean
  15. public Binding getBinding_TTL(){
  16. // 只有投递消息时指定的RoutingKey与这个BindingKey(dead)匹配上,消息才会被投递到该队列
  17. return BindingBuilder.bind(getDead_Queue1()).to(getDead_DirectExchange()).with("dead");
  18. }
  19. }

!! 注意:死信交换机和死信队列,与普通交换机、队列的声明创建方式一样

2. 编写 TTL_Queue_DirectConfiguration 配置类,声明创建一个过期队列

关于TTL过期队列和消息的设置,代码和项目结构与上篇博文《RabbitMQ高级:TTL过期队列/消息设置》一致,详细讲解请移步翻阅。

  1. @Configuration
  2. public class TTL_Queue_DirectConfiguration {
  3. // 1.声明创建direct路由模式的交换机
  4. @Bean
  5. public DirectExchange getDirectExchange(){
  6. return new DirectExchange("direct_Exchange",true,false);
  7. }
  8. // 2.声明创建过期队列队列
  9. @Bean
  10. public Queue getTTL_Queue1(){
  11. // 2.1 设置过期队列——该队列内的所有消息过期时间为5秒
  12. Map<String,Object> map = new HashMap<>();
  13. map.put("x-message-ttl",5000);
  14. // 2.2 设置消息接盘侠:消息过期后,不自动删除,而是将消息重新路由到dead_Exchange交换机
  15. map.put("x-dead-letter-exchange","dead_Exchange");
  16. // 2.3 设置消息接盘侠的具体路由key
  17. map.put("x-dead-letter-routing-key","dead"); // 如果是fanout模式的死信队列,则这里不需要设置投递的RoutingKey
  18. return new Queue("ttl_Queue1",true,false,false,map);
  19. }
  20. // 3.绑定交换机与队列的关系,并设置交换机与队列之间的BindingKey
  21. @Bean
  22. public Binding getBinding_Queue_TTL(){
  23. // 只有投递消息时指定的RoutingKey与这个BindingKey(ttl)匹配上,消息才会被投递到ttl_Queue1队列
  24. return BindingBuilder.bind(getTTL_Queue1()).to(getDirectExchange()).with("ttl");
  25. }
  26. }

重点代码详解

1. 过期队列设置:表示设置该队列是个过期队列,队列内的消息5秒后将过期,消息会被自动删除

map.put("x-message-ttl",5000);

2. 设置死信交换机:也就是说,如果当前队列的消息无法正常消费,比如超过5秒到期了,消息不会自动删除,而是会被重新投递到dead_Exchange交换机里,死信消息的接盘侠。

map.put("x-dead-letter-exchange","dead_Exchange");

3. 接盘侠的RoutingKey:如果创建的死信交换机的类型是direct路由,或者topics主题模式,那么需要指定消息的RoutingKey,将被投递到哪个队列中,因为交换机下可能绑定了多个队列,交换机与队列之间绑定了一个key ,即BindingKey,投递消息时指定一个RoutingKey,只有BindingKey与RoutingKey匹配成功,该条消息才会被交换机投递到相应BindingKey的队列

map.put("x-dead-letter-routing-key","dead"); 

!! 注意:如果是fanout模式的死信交换机,就无需设置 x-dead-letter-routing-key 参数!!

上述map中的三个参数key是固定的,需要去图形化界面中复制

3. 编写 OrderService 类,模拟用户下单,通过MQ进行消息的分发

  1. @Service
  2. public class OrderService {
  3. @Autowired
  4. private RabbitTemplate template;
  5. /**
  6. * 模拟用户创建订单
  7. * @param userId 客户ID
  8. * @param productId 产品ID
  9. * @param num 数量
  10. */
  11. public void createOrder_ttl_queue(String userId, String productId, int num){
  12. // 1.根据商品ID查询库存是否充足
  13. // 2.生成订单
  14. String orderId = UUID.randomUUID().toString();
  15. System.out.println("订单生成成功....");
  16. // 3.将订单id封装成MQ消息,投递到交换机
  17. /**@params1 :交换机名称
  18. * @params2 :RoutingKey路由键/队列名称
  19. * @params3 :消息内容
  20. */
  21. template.convertAndSend("direct_Exchange","ttl",orderId);
  22. }
  23. }
  24. }

4. 运行测试类,调用orderService 创建订单方法

5. 图形化查看队列结果

6. 查看五秒后的结果

我们也可以点进去队列,查看队列详情

结果分析:由上述结果可知,针对过期队列,队列里的消息到期后,会自动删除。如果该队列设置了死信队列,那么过期的消息会被重新投递到死信队列。

注意:如果是对单条消息设置过期时间,而非设置过期队列,消息过期会自动删除,不会进入到死信队列!

达到队列最大长度

代码与上述例子一致,唯一不同点是,在声明队列时,需要设置队列的长度,如下2.1,如果在原有代码上新增或修改队列属性,需要删除之前的队列和交换机,否则启动异常!

  1. /**
  2. * 消息死信原因
  3. * 1.消息被拒绝(消息消费时,应答拒绝)
  4. * 2.消息过期 (仅过期队列内的消息才会投递到死信队列,针对单条消息过期不会进入)
  5. * 3.队列达到最大长度
  6. */
  7. @Bean
  8. public Queue getTTL_Queue1(){
  9. // 2.1 设置当前队列最大长度5
  10. Map<String,Object> map = new HashMap<>();
  11. map.put("x-max-length",5);
  12. // 2.2 设置消息接盘侠:消息过期后,不自动删除,而是将消息重新路由到dead_Exchange交换机
  13. map.put("x-dead-letter-exchange","dead_Exchange");
  14. // 2.3 设置消息接盘侠的具体路由key:死信交换机是路由模式的交换机
  15. map.put("x-dead-letter-routing-key","dead"); // 如果是fanout模式的死信队列,则这里不需要设置投递的RoutingKey
  16. return new Queue("ttl_Queue1",true,false,false,map);
  17. }

2. 测试发送10条消息

  1. // 往队列中投递10条消息,使其超出队列最大长度5
  2. @Test
  3. void createOrder_ttl_queue_maxLength() {
  4. for (int i = 1; i <= 10; i++) {
  5. orderService.createOrder_direct("1001", "1", 12);
  6. }
  7. }

3. 运行测试

4. 查看图形化结果

由上图可知,超出过期队列设置的最大长度5,则剩下的消息都会自动被投递到死信队列。 

创建队列注意事项

假设队列已经存在的情况,去重新修改队列的属性,或者新增属性,启动会报异常

  1. @Bean
  2. public Queue ttlQueue() {
  3.  
  4.    // 问题描述:队列已经存在,设置了5秒过期,此时我修改为6秒,重启项目后会报错
  5.    Map<String,Object> map = new HashMap<>();
  6.    map.put("x-message-ttl",5000);
  7.  
  8.    // ttl_queue已经存在了
  9.    return new Queue("ttl_queue", true,false,false,args);
  10. }

解决方案

  1. 删除该队列
  2. 创建新的队列

一般选择第二种方式,因为实际线上高速运转的开发场景,还有消费者监听队列,删除队列是非常危险的行为,可能存在消息还未消费完或消费中,可以创建新的队列,将线上新产生的消息修改路由到新队列中

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/569749
推荐阅读
相关标签
  

闽ICP备14008679号