当前位置:   article > 正文

Rabbitmq中的死信队列

Rabbitmq中的死信队列

背景

        RabbitMQ死信队列俗称,备胎队列;消息中间件因为某原因拒收消息后,可以转移到死信队列中存放,死信队列也可以交换机和路由key

原理

        死信队列和普通队列区别不是很大

        普通与死信队列都有自己独立的交换机和路由key、队列和消费者。

区别:

        1.生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到

普通队列中缓存起来,普通队列对应有自己独立普通消费者。

        2.如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费

的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机

对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。

普通队列中,普通队列发现该消息一直没有被消费者消费

的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机

对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。

产生死信队列的原因

工具类:

  1. public class RabbitMqUtils {
  2. //得到一个连接的 channel
  3. public static Channel getChannel() throws Exception{
  4. //创建一个连接工厂
  5. ConnectionFactory factory = new ConnectionFactory();
  6. factory.setHost("127.0.0.1");
  7. factory.setUsername("guest");
  8. factory.setPassword("guest");
  9. //创建连接
  10. Connection connection = factory.newConnection();
  11. //创建通道
  12. Channel channel = connection.createChannel();
  13. return channel;
  14. }
  15. }

  1. 消息投递到MQ中存放 消息已经过期  消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。

消息 TTL 过期

生产者代码:

  1. public class Producer {
  2. private static final String NORMAL_EXCHANGE = "normal_exchange";
  3. public static void main(String[] argv) throws Exception {
  4. try (Channel channel = RabbitMqUtils.getChannel()) {
  5. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  6. //设置消息的 TTL 时间
  7. AMQP.BasicProperties properties = new
  8. AMQP.BasicProperties().builder().expiration("10000").build();
  9. //该信息是用作演示队列个数限制
  10. for (int i = 1; i <11 ; i++) {
  11. String message="info"+i;
  12. channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,
  13. message.getBytes());
  14. System.out.println("生产者发送消息:"+message);
  15. }
  16. }
  17. }
  18. }

消费端:

普通消费:

  1. public class Consumer01 {
  2. //普通交换机名称
  3. private static final String NORMAL_EXCHANGE = "normal_exchange";
  4. //死信交换机名称
  5. private static final String DEAD_EXCHANGE = "dead_exchange";
  6. public static void main(String[] argv) throws Exception {
  7. Channel channel = RabbitMqUtils.getChannel();
  8. //声明死信和普通交换机 类型为 direct
  9. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  10. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  11. //声明死信队列
  12. String deadQueue = "dead-queue";
  13. channel.queueDeclare(deadQueue, false, false, false, null);
  14. //死信队列绑定死信交换机与 routingkey
  15. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
  16. //正常队列绑定死信队列信息
  17. Map<String, Object> params = new HashMap<>();
  18. //正常队列设置死信交换机 参数 key 是固定值
  19. params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  20. //正常队列设置死信 routing-key 参数 key 是固定值
  21. params.put("x-dead-letter-routing-key", "lisi");
  22. String normalQueue = "normal-queue";
  23. channel.queueDeclare(normalQueue, false, false, false, params);
  24. channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
  25. System.out.println("等待接收消息.....");
  26. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  27. String message = new String(delivery.getBody(), "UTF-8");
  28. System.out.println("Consumer01 接收到消息"+message);
  29. };
  30. channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
  31. });
  32. }
  33. }

死信消费:

  1. public class Consumer02 {
  2. private static final String DEAD_EXCHANGE = "dead_exchange";
  3. public static void main(String[] argv) throws Exception {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  6. String deadQueue = "dead-queue";
  7. channel.queueDeclare(deadQueue, false, false, false, null);
  8. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
  9. System.out.println("等待接收死信队列消息.....");
  10. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  11. String message = new String(delivery.getBody(), "UTF-8");
  12. System.out.println("Consumer02 接收死信队列的消息" + message);
  13. };
  14. channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
  15. });
  16. }
  17. }

运行结果查询:

  1.启动消费端->启动死信队列->启动生产者 查看结果

  2.关闭消费端->启动生产者 再次查看结果    结果原因:消费端队列关闭,消息时间过期会进入死信队列

2. 队列达到最大的长度 (队列容器已经满了)

生产者代码:

  1. public class Producer {
  2. private static final String NORMAL_EXCHANGE = "normal_exchange";
  3. public static void main(String[] argv) throws Exception {
  4. try (Channel channel = RabbitMqUtils.getChannel()) {
  5. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  6. //该信息是用作演示队列个数限制
  7. for (int i = 1; i <11 ; i++) {
  8. String message="info"+i;
  9. channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null,
  10. message.getBytes());
  11. System.out.println("生产者发送消息:"+message);
  12. }
  13. }
  14. }
  15. }

消费端:

普通消费:

  1. public class Consumer01 {
  2. //普通交换机名称
  3. private static final String NORMAL_EXCHANGE = "normal_exchange";
  4. //死信交换机名称
  5. private static final String DEAD_EXCHANGE = "dead_exchange";
  6. public static void main(String[] argv) throws Exception {
  7. Channel channel = RabbitMqUtils.getChannel();
  8. //声明死信和普通交换机 类型为 direct
  9. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  10. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  11. //声明死信队列
  12. String deadQueue = "dead-queue";
  13. channel.queueDeclare(deadQueue, false, false, false, null);
  14. //死信队列绑定死信交换机与 routingkey
  15. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
  16. //正常队列绑定死信队列信息
  17. Map<String, Object> params = new HashMap<>();
  18. //正常队列设置死信交换机 参数 key 是固定值
  19. params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  20. //正常队列设置死信 routing-key 参数 key 是固定值
  21. params.put("x-dead-letter-routing-key", "lisi");
  22. params.put("x-max-length",6); //设置队列长度为6
  23. String normalQueue = "normal-queue";
  24. channel.queueDeclare(normalQueue, false, false, false, params);
  25. channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
  26. System.out.println("等待接收消息.....");
  27. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  28. String message = new String(delivery.getBody(), "UTF-8");
  29. System.out.println("Consumer01 接收到消息"+message);
  30. };
  31. channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
  32. });
  33. }
  34. }

死信消费:

  1. public class Consumer02 {
  2. private static final String DEAD_EXCHANGE = "dead_exchange";
  3. public static void main(String[] argv) throws Exception {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  6. String deadQueue = "dead-queue";
  7. channel.queueDeclare(deadQueue, false, false, false, null);
  8. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
  9. System.out.println("等待接收死信队列消息.....");
  10. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  11. String message = new String(delivery.getBody(), "UTF-8");
  12. System.out.println("Consumer02 接收死信队列的消息" + message);
  13. };
  14. channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
  15. });
  16. }
  17. }

运行结果查询:

  1.启动消费端->启动死信队列->启动生产者 查看结果

         结果原因:消费端队列只能存储6条消息,剩下的消息进入死信队列

3. 消费者消费多次消息失败,就会转移存放到死信队列中

注意:与其他不同的是,需要开启手动应答 

生产者代码:

  1. public class Producer {
  2. private static final String NORMAL_EXCHANGE = "normal_exchange";
  3. public static void main(String[] argv) throws Exception {
  4. try (Channel channel = RabbitMqUtils.getChannel()) {
  5. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  6. //该信息是用作演示队列个数限制
  7. for (int i = 1; i <11 ; i++) {
  8. String message="info"+i;
  9. channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null,
  10. message.getBytes());
  11. System.out.println("生产者发送消息:"+message);
  12. }
  13. }
  14. }
  15. }

消费端:

普通消费:

  1. public class Consumer01 {
  2. //普通交换机名称
  3. private static final String NORMAL_EXCHANGE = "normal_exchange";
  4. //死信交换机名称
  5. private static final String DEAD_EXCHANGE = "dead_exchange";
  6. public static void main(String[] argv) throws Exception {
  7. Channel channel = RabbitMqUtils.getChannel();
  8. //声明死信和普通交换机 类型为 direct
  9. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  10. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  11. //声明死信队列
  12. String deadQueue = "dead-queue";
  13. channel.queueDeclare(deadQueue, false, false, false, null);
  14. //死信队列绑定死信交换机与 routingkey
  15. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
  16. //正常队列绑定死信队列信息
  17. Map<String, Object> params = new HashMap<>();
  18. //正常队列设置死信交换机 参数 key 是固定值
  19. params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  20. //正常队列设置死信 routing-key 参数 key 是固定值
  21. params.put("x-dead-letter-routing-key", "lisi");
  22. String normalQueue = "normal-queue";
  23. channel.queueDeclare(normalQueue, false, false, false, params);
  24. channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
  25. System.out.println("等待接收消息.....");
  26. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  27. String message = new String(delivery.getBody(), "UTF-8");
  28. if(message.equals("info5")){
  29. System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
  30. //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
  31. channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
  32. }else {
  33. System.out.println("Consumer01 接收到消息"+message);
  34. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  35. }
  36. };
  37. boolean autoAck = false;
  38. channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
  39. });
  40. }
  41. }

死信消费:

  1. public class Consumer02 {
  2. private static final String DEAD_EXCHANGE = "dead_exchange";
  3. public static void main(String[] argv) throws Exception {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  6. String deadQueue = "dead-queue";
  7. channel.queueDeclare(deadQueue, false, false, false, null);
  8. channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
  9. System.out.println("等待接收死信队列消息.....");
  10. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  11. String message = new String(delivery.getBody(), "UTF-8");
  12. System.out.println("Consumer02 接收死信队列的消息" + message);
  13. };
  14. channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
  15. });
  16. }
  17. }

运行结果查询:

  1.启动消费端->启动死信队列->启动生产者 查看结果

         结果原因:消费端队列消息中为info5时,消费端拒绝签收,只能进入死信队列

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/928393
推荐阅读
相关标签
  

闽ICP备14008679号