当前位置:   article > 正文

rabbitmq死信队列详解(亲手实践)_php rabbitmq 死信队列

php rabbitmq 死信队列

原文链接:rabbitmq死信队列详解(亲手实践) – 编程屋

目录

1 概念

2 成为死信队列的条件

2.1 队列指定长度 

2.2  消息ttl时间

2.3 消费者拒收消息


1 概念

死信队列:死信队列其实和普通的队列一样,只不过里面存放的消息都是普通队列过期没有消费的。所以,接收没有及时被消费消息的队列为死信队列。

2 成为死信队列的条件

以下条件只要满足一条,即可以成为死信队列。

  1. 队列长度满了:排在前面的消息会被拒收或者进入死信交换机
  2. 消息的ttl时间到了:消息超时未被消费
  3. 消息被拒收了:手动拒绝消息

一个队列设置了队列长度或者过期时间或被拒收,并且设置了死信队列的交换机和死信的路由key。那么消息满足条件就会进入死信队列。

例如:

@Bean("queueB")
public Queue queueB(){
    Map<String, Object> arguments  = new HashMap<>();
    //设置死信交换机
    arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_XCHANGE);
    //设置死信RoutingKey
    arguments.put("x-dead-letter-routing-key","YD");
    //设置ttl
    arguments.put("x-message-ttl",40000);

    return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}

以上只是声明了一个普通队列queueB,然后在该队列设置了过期时间40s,和死信交换机和死信路由key。

注意:死信队列就是一个普通的队列,只不过声明普通队列的时候指定了死信交换机,二者才产生了联系

2.1 队列指定长度 

配置相关队列和交换机

注意:声明一个队列分为3步(声明交换机、声明队列、将队列和交换机路由绑定)

  1. package com.liubujun.rabbitmqspringbootdemo.config;
  2. import com.rabbitmq.client.AMQP;
  3. import com.sun.javafx.collections.MappingChange;
  4. import jdk.nashorn.internal.objects.NativeUint8Array;
  5. import org.springframework.amqp.core.Binding;
  6. import org.springframework.amqp.core.Exchange;
  7. import org.springframework.amqp.core.Queue;
  8. import org.springframework.amqp.core.TopicExchange;
  9. import org.springframework.beans.factory.annotation.Qualifier;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.stereotype.Component;
  12. import java.util.HashMap;
  13. import java.util.Map;
  14. /**
  15. * @Author: liubujun
  16. * @Date: 2023/6/3 16:15
  17. */
  18. @Component
  19. public class DeadQueueConfig {
  20. //队列
  21. public static final String FORMAL_QUEUE = "formal_queue";
  22. public static final String DEAD_QUEUE = "dead_queue";
  23. //交换机
  24. public static final String FORMAL_EXCHANGE = "formal_exchange";
  25. public static final String DEAD_EXCHANGE = "dead_exchaneg";
  26. //路由key
  27. public static final String FORMAL_ROUNTE_KEY = "formal_rounte_key";
  28. public static final String DEAD_ROUNTE_KEY = "dead_route_key";
  29. /**
  30. * 普通队列交换机声明
  31. * 交换机类型:topic:处理路由键,按模式匹配,向符合规则的队列投递消息
  32. * name 交换机名称
  33. * durable 是否持久化
  34. * autoDelete 是否删除
  35. * arguments 用于设置其他参数
  36. * @return
  37. */
  38. @Bean
  39. public Exchange getFormalExchange(){
  40. return new TopicExchange(FORMAL_EXCHANGE,true,false,null);
  41. }
  42. /**
  43. * 声明普通队列,并设置与死信队列联系
  44. * name 队列名称
  45. * durable 是否持久化
  46. * exclusive 是否排外 如果是排外的,该队列 仅对首次声明它的连接(Connection)可见,是该Connection私有的,
  47. * 类似于加锁,并在连接断开connection.close()时自动删除
  48. * autoDelete 是否删除
  49. * arguments 用于设置其他参数
  50. * @return
  51. */
  52. @Bean
  53. public Queue getFormalQueue(){
  54. Map<String, Object> map = new HashMap<>();
  55. //设置队列最大长度
  56. map.put("x-max-length",5);
  57. //设置死信队列交换机
  58. map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
  59. //设置死信队列路由key
  60. map.put("x-dead-letter-routing-key",DEAD_ROUNTE_KEY);
  61. return new Queue(FORMAL_QUEUE,true,false,false,map);
  62. }
  63. /**
  64. * 将普通队列和交换机绑定
  65. * destination:目标队列或交换器
  66. * destinationType:DesdinationType指出目标是交换器还是对列
  67. * exchange:交换机
  68. * routingKey:路由key
  69. * arguments:参数设置
  70. * @return
  71. */
  72. @Bean
  73. public Binding bingFormalQueue(){
  74. return new Binding(FORMAL_QUEUE, Binding.DestinationType.QUEUE,FORMAL_EXCHANGE,FORMAL_ROUNTE_KEY,null);
  75. }
  76. /**
  77. * 声明死信队列交换机
  78. * @return
  79. */
  80. @Bean
  81. public Exchange getDeadExchange(){
  82. return new TopicExchange(DEAD_EXCHANGE,true,false,null);
  83. }
  84. /**
  85. * 声明死信队列
  86. * @return
  87. */
  88. @Bean
  89. public Queue getDeadQueue(){
  90. return new Queue(DEAD_QUEUE,true,false,false, null);
  91. }
  92. /**
  93. * 将死信队列和交换机绑定
  94. * @return
  95. */
  96. @Bean
  97. public Binding bingDeadQueue(){
  98. return new Binding(DEAD_QUEUE, Binding.DestinationType.QUEUE,DEAD_EXCHANGE,DEAD_ROUNTE_KEY,null);
  99. }
  100. }

生产者:发送6条消息,看rabbitmq中队列变化

  1. @GetMapping("/sendMessageTtl/{message}")
  2. public void sendMessageTtl(@PathVariable String message){
  3. log.info("当前时间发送:{},发送5条消息给两个TTL队列:{}",new Date().toString(),message);
  4. for (int i = 0; i < 6; i++) {
  5. rabbitTemplate.convertAndSend(DeadQueueConfig.FORMAL_EXCHANGE,DeadQueueConfig.FORMAL_ROUNTE_KEY,message);
  6. }
  7. }

rabbitmq控制台:

普通队列有5条消息,而死信队列有1条消息。

因为在声明普通队列的时候,已经说明了队列最大长度为5,那么多余的消息就会根据配置的参数找到对应的交换机进而找到对应的路由,然后路由到对应的队列(死信队列) 。

2.2  消息ttl时间

继续沿用上面的配置,只不过修改下普通队列的参数。

  1. @Bean
  2. public Queue getFormalQueue(){
  3. Map<String, Object> map = new HashMap<>();
  4. //设置队列超时时间
  5. map.put("x-message-ttl",5000);
  6. //设置死信队列交换机
  7. map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
  8. //设置死信队列路由key
  9. map.put("x-dead-letter-routing-key",DEAD_ROUNTE_KEY);
  10. return new Queue(FORMAL_QUEUE,true,false,false,map);
  11. }

发送消息:

可以发现,发送给普通队列的消息,超时没有被消费,都进入到了死信队列中。 

2.3 消费者拒收消息

沿用上面的配置,并在声明普通队列的时候去掉消息的过期时间。

注意:需要在rabbitmq控制台删除队列,不然项目启动会报错。

添加消费者:

  1. @Slf4j
  2. @Component
  3. public class DeadQueueConsumer {
  4. /**
  5. * 监听死信队列
  6. */
  7. // @RabbitListener(queues = DeadQueueConfig.DEAD_QUEUE)
  8. // public void listenDeadQueue(Message message, Channel channel){
  9. // log.info("接收到死信队列消息:{}",message.getBody());
  10. // }
  11. /**
  12. * 监听普通队列
  13. */
  14. @RabbitListener(queues = "formal_queue")
  15. public void listenFormalQueue(Message message, Channel channel) throws IOException {
  16. log.info("接收到普通队列消息:{}",message.getBody());
  17. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  18. //拒绝消息
  19. channel.basicReject(deliveryTag,false);
  20. }
  21. }

rabbitmq控制台结果: 

消息在消费者端被拒收后,直接被放进了死信队列。

以上只是部分内容,为了维护方便,本文已迁移到新地址:rabbitmq死信队列详解(亲手实践) – 编程屋

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/603234
推荐阅读
相关标签
  

闽ICP备14008679号