当前位置:   article > 正文

RabbitMQ-死信队列-普通队列满了添加消息会怎样?_rabbitmq队列满了怎么办

rabbitmq队列满了怎么办

1:死信队列架构图

 

2:业务描述:

发10条消息,到固定长度为6的普通队列,普通队列有哪些消息?死信队列有哪些消息?

3:结论

普通队列满了之后继续添加消息,前面的消息会挤出到死信队列中。如:
普通队列长度为6,添加10个进去后,先进去的4个会到死信队列中,最后6个消息还在普通队列中。
发送的消息
普通队列的消息
死信队列的消息
9876543210
987654
3210

4:代码实现

1、工具类代码

  1. public class RabbitmqUtils {
  2. private static Logger logger = LoggerFactory.getLogger(RabbitmqUtils.class);
  3. // 死信相关队列
  4. public static String NORMAL_EXCHANGE_NAME = "normal_exchange";
  5. public static String NORMAL_EXCHANGE_ROUTING_KEY = "normal_routing_key";
  6. public static String NORMAL_EXCHANGE_QUEUE_NAME = "normal_queue";
  7. public static String DEAD_EXCHANGE_NAME = "dead_exchange";
  8. public static String DEAD_EXCHANGE_QUEUE_NAME = "dead_queue";
  9. public static String DEAD_EXCHANGE_ROUTING_KEY = "dead_routing_key";
  10. // 队列持久化
  11. public static boolean DURABLE = true;
  12. public static Connection connection;
  13. public static Channel channel;
  14. static {
  15. logger.info("getChannel begin...");
  16. // 创建链接工厂
  17. ConnectionFactory factory = new ConnectionFactory();
  18. logger.info("getChannel factory:{}", factory.toString());
  19. factory.setHost("192.168.6.8");
  20. factory.setPort(5672);
  21. factory.setUsername("admin");
  22. factory.setPassword("123");
  23. // 创建链接
  24. try {
  25. connection = factory.newConnection();
  26. } catch (IOException e) {
  27. throw new RuntimeException(e);
  28. } catch (TimeoutException e) {
  29. throw new RuntimeException(e);
  30. }
  31. logger.info("getChannel connection:{}", connection.toString());
  32. try {
  33. channel = connection.createChannel();
  34. } catch (IOException e) {
  35. throw new RuntimeException(e);
  36. }
  37. logger.info("getChannel channel:{}", channel.toString());
  38. logger.info("getChannel success!");
  39. }
  40. /**
  41. * 死信队列实战
  42. *
  43. * @return
  44. * @throws IOException
  45. */
  46. public static Channel createDeadExchangeAndQueue() throws IOException {
  47. // 删除已存在的交换机
  48. channel.exchangeDelete(RabbitmqUtils.NORMAL_EXCHANGE_NAME);
  49. channel.exchangeDelete(RabbitmqUtils.DEAD_EXCHANGE_NAME);
  50. // 删除已存在的队列
  51. channel.queueDelete(RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME);
  52. channel.queueDelete(RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME);
  53. // 创建普通交换机
  54. channel.exchangeDeclare(
  55. RabbitmqUtils.NORMAL_EXCHANGE_NAME,
  56. BuiltinExchangeType.DIRECT,
  57. RabbitmqUtils.DURABLE);
  58. // 正常队列绑定死信队列信息并设置队列的长度
  59. HashMap<String, Object> arguments = new HashMap<>();
  60. arguments.put("x-dead-letter-exchange",RabbitmqUtils.DEAD_EXCHANGE_NAME);
  61. arguments.put("x-dead-letter-routing-key",RabbitmqUtils.DEAD_EXCHANGE_ROUTING_KEY);
  62. arguments.put("x-max-length",6);
  63. // 创建普通队列
  64. channel.queueDeclare(
  65. RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME,
  66. RabbitmqUtils.DURABLE,
  67. false,false,arguments);
  68. // 将不同队列绑定到普通交换机上
  69. channel.queueBind(
  70. RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME,
  71. RabbitmqUtils.NORMAL_EXCHANGE_NAME,
  72. RabbitmqUtils.NORMAL_EXCHANGE_ROUTING_KEY);
  73. // 创建死信交换机
  74. channel.exchangeDeclare(
  75. RabbitmqUtils.DEAD_EXCHANGE_NAME,
  76. BuiltinExchangeType.DIRECT,
  77. RabbitmqUtils.DURABLE);
  78. // 创建死信队列
  79. channel.queueDeclare(
  80. RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME,
  81. RabbitmqUtils.DURABLE,
  82. false,false,null);
  83. // 将死信队列绑定到交换机上
  84. channel.queueBind(
  85. RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME,
  86. RabbitmqUtils.DEAD_EXCHANGE_NAME,
  87. RabbitmqUtils.DEAD_EXCHANGE_ROUTING_KEY);
  88. return channel;
  89. }
  90. }

2、生产者代码

  1. public class DeadProducer {
  2. private static Logger logger = LoggerFactory.getLogger(DeadProducer.class);
  3. public static void main(String[] args) throws IOException {
  4. Channel channel = RabbitmqUtils.createDeadExchangeAndQueue();
  5. // 发送消息
  6. for (int i=0; i< 10; i++) {
  7. String message = "info"+i;
  8. channel.basicPublish(
  9. RabbitmqUtils.NORMAL_EXCHANGE_NAME,
  10. RabbitmqUtils.NORMAL_EXCHANGE_ROUTING_KEY,
  11. null,
  12. message.getBytes());
  13. logger.info("发送的消息内容为:{}",message);
  14. }
  15. }
  16. }

生产者输出日志:

  1. 发送的消息内容为:info0
  2. 发送的消息内容为:info1
  3. 发送的消息内容为:info2
  4. 发送的消息内容为:info3
  5. 发送的消息内容为:info4
  6. 发送的消息内容为:info5
  7. 发送的消息内容为:info6
  8. 发送的消息内容为:info7
  9. 发送的消息内容为:info8
  10. 发送的消息内容为:info9

生产者发送消息后的结果

3、普通消费者1代码

  1. public class OrdinaryConsumer1 {
  2. private static Logger logger = LoggerFactory.getLogger(OrdinaryConsumer1.class);
  3. public static void main(String[] args) throws IOException {
  4. Channel channel = RabbitmqUtils.channel;
  5. DeliverCallback deliverCallback = (consumerTag, message) ->{
  6. String msg = new String(message.getBody(), "UTF-8");
  7. logger.info("普通队列接受到的消息是:{}", msg);
  8. };
  9. CancelCallback cancelCallback = (consumerTag)->{
  10. logger.info(consumerTag + "消息被中断");
  11. };
  12. channel.basicConsume(
  13. RabbitmqUtils.NORMAL_EXCHANGE_QUEUE_NAME,
  14. true,
  15. deliverCallback,
  16. cancelCallback);
  17. }
  18. }

普通消费者消费的日志:

  1. 普通队列接受到的消息是:info4
  2. 普通队列接受到的消息是:info5
  3. 普通队列接受到的消息是:info6
  4. 普通队列接受到的消息是:info7
  5. 普通队列接受到的消息是:info8
  6. 普通队列接受到的消息是:info9

4、死信消费者2代码

  1. public class DeadConsumer2 {
  2. private static Logger logger = LoggerFactory.getLogger(DeadConsumer2.class);
  3. public static void main(String[] args) throws IOException {
  4. Channel channel = RabbitmqUtils.channel;
  5. DeliverCallback deliverCallback = (consumerTag, message) ->{
  6. String msg = new String(message.getBody(), "UTF-8");
  7. logger.info("死信队列接受到的消息是:{}", msg);
  8. };
  9. CancelCallback cancelCallback = (consumerTag)->{
  10. logger.info(consumerTag + "消息被中断");
  11. };
  12. channel.basicConsume(
  13. RabbitmqUtils.DEAD_EXCHANGE_QUEUE_NAME,
  14. true,
  15. deliverCallback,
  16. cancelCallback);
  17. }
  18. }

死信消费者消费的日志:

  1. 死信队列接受到的消息是:info0
  2. 死信队列接受到的消息是:info1
  3. 死信队列接受到的消息是:info2
  4. 死信队列接受到的消息是:info3

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号