当前位置:   article > 正文

RabbitMQ——死信队列介绍和项目应用_rabbitmq 订单系统死信队列的应用

rabbitmq 订单系统死信队列的应用

前言

最近在做一个BI项目,用到了RabbitMQ异步化生成图表,同时还添加了死信队列处理无法被消费者正常消费的消息。于是便有了这篇文章,下面由我带大家介绍RabbitMQ的死信队列和其在项目中的应用吧。

死信和死信队列的概念

什么是死信?简单来说就是无法被消费和处理的消息。一般生产者将消息投递到broker或者queue,消费者直接从中取出消息进行消费。但有时因为某些原因导致消息不能被消费,导致消息积压在队列中,这样的消息如果没有后续的处理就会变成死信,那么专门存放死信的队列就是死信队列

注意:如果一个消息队列设置了过期时间,在队列过期后其中的消息并不会自动转发到死信队列中,而是会被系统丢弃或执行其他的操作。

什么是死信交换机?

那么什么是死信交换机呢?死信交换机是指专门将死信路由到死信队列的交换机

注意:死信交换机和普通交换机无任何区别,只是路由的消息类别不同。同理,死信队列和普通队列也无任何区别,仅仅是一个专门存放死信的队列而已。无论是死信交换机还是队列,它们的创建方法都和普通的交换机和队列一致,无非是名称和routingKey不同。

产生死信的原因

根据官方文档,我们发现一般有三种场景会产生死信。

  1. 消息超过TTL,即消息过期
  2. 消息被nack或reject,且不予重新入队
  3. 队列达到最大长度

死信队列实战和应用

死信队列的应用并不难,无非就是多定义了一个交换机、routingKey和队列罢了。在声明普通队列时传入Map参数,往Map中put死信队列名称、死信routingKey、消息TTL等参数即可完成死信自动投递到死信队列的流程。通过如下代码即可绑定普通队列和死信交换机了,而且还能设置routingKey和队列长度等参数,无需像传统的那样通过channel绑定。

  1. Map<String, Object> arguments = new HashMap<>();
  2. // 过期时间
  3. arguments.put("x-message-ttl", 10000);
  4. // 正常队列设置死信交换机
  5. arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  6. // 设置死信routingKey
  7. arguments.put("x-dead-letter-routing-key", "lisi");
  8. // 设置正常队列的长度限制
  9. arguments.put("x-max-length", 10);

流程图:

 

生产者Producer:

  1. public class Producer {
  2. // 普通交换机名称
  3. public static final String NORMAL_EXCHANGE = "normal_exchange";
  4. public static void main(String[] args) throws IOException {
  5. Channel channel = RabbitMQUtils.getChannel();
  6. //死信消息 设置TTL时间
  7. AMQP.BasicProperties properties = new AMQP.BasicProperties()
  8. .builder().expiration("10000").build();
  9. // 延迟消息
  10. for (int i = 0;i < 10;i++) {
  11. String message = i + "info";
  12. channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
  13. }
  14. }
  15. }

普通队列消费者C1:

  1. public class Consumer01 {
  2. // 普通交换机名称
  3. public static final String NORMAL_EXCHANGE = "normal_exchange";
  4. // 死信交换机名称
  5. public static final String DEAD_EXCHANGE = "dead_exchange";
  6. // 普通队列名称
  7. public static final String NORMAL_QUEUE = "normal_queue";
  8. // 死信队列名称
  9. public static final String DEAD_QUEUE = "dead_queue";
  10. public static void main(String[] args) throws IOException {
  11. Channel channel = RabbitMQUtils.getChannel();
  12. // 声明死信和普通交换机,类型为direct
  13. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  14. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  15. // 声明普通队列
  16. Map<String, Object> arguments = new HashMap<>();
  17. // 过期时间
  18. arguments.put("x-message-ttl", 10000);
  19. // 正常队列设置死信交换机
  20. arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  21. // 设置死信routingKey
  22. arguments.put("x-dead-letter-routing-key", "lisi");
  23. // 设置正常队列的长度限制
  24. arguments.put("x-max-length", 10);
  25. // 声明普通队列
  26. channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
  27. // 声明死信队列
  28. channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
  29. channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
  30. channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
  31. System.out.println("consumer01等待接收消息");
  32. DeliverCallback deliverCallback = (consumerTag, message) -> {
  33. String msg = new String(message.getBody(), "UTF-8");
  34. if (msg.equals("info5")) {
  35. System.out.println("consumer01接收的消息:" + new String(message.getBody()));
  36. System.out.println(msg + ":此消息是被拒绝的");
  37. channel.basicReject(message.getEnvelope().getDeliveryTag(), false); //拒绝此消息并不放回普通队列
  38. } else {
  39. System.out.println("consumer01接收的消息:" + new String(message.getBody()));
  40. channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  41. }
  42. };
  43. CancelCallback cancelCallback = consumerTag -> {
  44. System.out.println("C1取消消息");
  45. };
  46. channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
  47. }
  48. }

死信队列消费者C2:

  1. public class Consumer02 {
  2. // 死信队列名称
  3. public static final String DEAD_QUEUE = "dead_queue";
  4. public static void main(String[] args) throws IOException {
  5. Channel channel = RabbitMQUtils.getChannel();
  6. System.out.println("consumer02等待接收消息");
  7. DeliverCallback deliverCallback = (consumerTag, message) -> {
  8. System.out.println("consumer02接收的消息:" + new String(message.getBody()));
  9. };
  10. CancelCallback cancelCallback = consumerTag -> {
  11. System.out.println("C2取消消息");
  12. };
  13. channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
  14. }
  15. }

依次启动生产者,和两个消费者,并停掉普通队列的消费者,我们发现生产者发送的消息被死信队列消费者C2给接收了。

在上面的代码中,我在普通队列中设置了消息的TTL为5s,但是我又在生产者设置发送的消息TTL为10s,那么RabbitMQ会以哪个为准呢?其实RabbitMQ会以较短的TTL为准

BI项目添加死信队列

声明交换机、队列和routingKey的配置类

  1. @Configuration
  2. public class TtlQueueConfig {
  3. private final String COMMON_EXCHANGE = "bi_common_exchange"; // 普通交换机名称
  4. private final String COMMON_QUEUE = "bi_common_queue"; // 普通队列名称
  5. private final String DEAD_LETTER_EXCHANGE = "bi_dead_letter_exchange"; // 死信交换机名称
  6. private final String DEAD_LETTER_QUEUE = "bi_dead_letter_queue"; // 死信队列名称
  7. private final String COMMON_ROUTINGKEY = "bi_common_routingKey"; // 普通routingKey
  8. private final String DEAD_LETTER_ROUTINGKEY = "bi_dead_letter_routingKey"; // 死信routingKey
  9. // 普通交换机
  10. @Bean("commonExchange")
  11. public DirectExchange commonExchange() {
  12. return new DirectExchange(COMMON_EXCHANGE);
  13. }
  14. // 死信交换机
  15. @Bean("deadLetterExchange")
  16. public DirectExchange deadLetterExchange() {
  17. return new DirectExchange(DEAD_LETTER_EXCHANGE);
  18. }
  19. // 普通队列
  20. @Bean("commonQueue")
  21. public Queue commonQueue() {
  22. Map<String, Object> map = new HashMap<>(3);
  23. map.put("x-message-ttl", 20000);
  24. map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
  25. map.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTINGKEY);
  26. return QueueBuilder.durable(COMMON_QUEUE).withArguments(map).build();
  27. }
  28. // 死信队列
  29. @Bean("deadLetterQueue")
  30. public Queue deadLetterQueue() {
  31. return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
  32. }
  33. @Bean
  34. public Binding commonQueueBindingCommonExchange(@Qualifier("commonQueue") Queue commonQueue,
  35. @Qualifier("commonExchange") DirectExchange commonExchange) {
  36. return BindingBuilder.bind(commonQueue).to(commonExchange).with(COMMON_ROUTINGKEY);
  37. }
  38. @Bean
  39. public Binding deadQueueBindingDeadExchange(@Qualifier("deadLetterQueue") Queue deadLetterQueue,
  40. @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange){
  41. return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTINGKEY);
  42. }
  43. }
  44. 普通消费者(负责异步生成图表)
  45. @Component
  46. @Slf4j
  47. public class BIMessageConsumer {
  48. @Resource
  49. private ChartService chartService;
  50. @Resource
  51. private RabbitTemplate rabbitTemplate;
  52. @Resource
  53. private AIManager aiManager;
  54. @Resource
  55. RedisTemplate<String, Object> redisTemplate;
  56. // 制定消费者监听哪个队列和消息确认机制
  57. @SneakyThrows
  58. @RabbitListener(queues = {"bi_common_queue"}, ackMode = "MANUAL")
  59. public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
  60. log.info("receiveMessage is {}", message);
  61. if(StringUtils.isBlank(message)) {
  62. // 如果失败,消息拒绝
  63. channel.basicNack(deliveryTag, false, false);
  64. log.info("消息为空拒绝接收");
  65. log.info("此消息正在被转发到死信队列中");
  66. }
  67. long chartId = Long.parseLong(message);
  68. Chart chart = chartService.getById(chartId);
  69. if (chart == null) {
  70. channel.basicNack(deliveryTag, false, false);
  71. log.info("图标为空拒绝接收");
  72. throw new BusinessException(ErrorCode.NOT_FOUND_ERROR, "图表为空");
  73. }
  74. // 先修改图表任务状态为“执行中”。等执行成功后,修改为“已完成”、保存执行结果;执行失败后,状态修改为“失败”,记录任务失败信息。
  75. Chart updateChart = new Chart();
  76. updateChart.setId(chart.getId());
  77. updateChart.setStatus("running");
  78. boolean b = chartService.updateById(updateChart);
  79. if (!b) {
  80. channel.basicNack(deliveryTag, false, false);
  81. handlerChartUpdateError(chart.getId(), "更新图表执行状态失败");
  82. return;
  83. }
  84. // 调用AI
  85. String result = aiManager.doChat(CommonConstant.BI_MODEL_ID, buildUserInput(chart));
  86. String[] splits = result.split("【【【【【");
  87. if (splits.length < 3) {
  88. channel.basicNack(deliveryTag, false, false);
  89. handlerChartUpdateError(chart.getId(), "AI生成错误");
  90. return;
  91. }
  92. String genChart = splits[1].trim();
  93. String genResult = splits[2].trim();
  94. Chart updateChartResult = new Chart();
  95. updateChartResult.setId(chart.getId());
  96. updateChartResult.setGenChart(genChart);
  97. updateChartResult.setGenResult(genResult);
  98. updateChartResult.setStatus("succeed");
  99. boolean updateResult = chartService.updateById(updateChartResult);
  100. if (!updateResult) {
  101. channel.basicNack(deliveryTag, false, false);
  102. handlerChartUpdateError(chart.getId(), "更新图表成功状态失败");
  103. }
  104. Long userId = chartService.queryUserIdByChartId(chartId);
  105. String myChartId = String.format("lingxibi:chart:list:%s", userId);
  106. redisTemplate.delete(myChartId);
  107. // 如果任务执行成功,手动执行ack
  108. channel.basicAck(deliveryTag, false);
  109. }
  110. private void handlerChartUpdateError(long chartId, String execMessage) {
  111. Chart updateChartResult = new Chart();
  112. updateChartResult.setId(chartId);
  113. updateChartResult.setStatus("failed");
  114. updateChartResult.setExecMessage(execMessage);
  115. boolean updateResult = chartService.updateById(updateChartResult);
  116. if (!updateResult) {
  117. log.error("更新图表失败状态失败" + chartId + "," + execMessage);
  118. }
  119. }
  120. /**
  121. * 构建用户输入
  122. * @param chart
  123. * @return
  124. */
  125. private String buildUserInput(Chart chart) {
  126. String goal = chart.getGoal();
  127. String chartType = chart.getChartType();
  128. String csvData = chart.getChartData();
  129. // 构造用户输入
  130. StringBuilder userInput = new StringBuilder();
  131. userInput.append("分析需求:").append("\n");
  132. // 拼接分析目标
  133. String userGoal = goal;
  134. if (StringUtils.isNotBlank(chartType)) {
  135. userGoal += ",请使用" + chartType;
  136. }
  137. userInput.append(userGoal).append("\n");
  138. userInput.append("原始数据:").append("\n");
  139. // 压缩后的数据
  140. userInput.append(csvData).append("\n");
  141. return userInput.toString();
  142. }
  143. }

普通消费者(负责异步生成图表信息)

  1. @Component
  2. @Slf4j
  3. public class BIMessageConsumer {
  4. @Resource
  5. private ChartService chartService;
  6. @Resource
  7. private RabbitTemplate rabbitTemplate;
  8. @Resource
  9. private AIManager aiManager;
  10. @Resource
  11. RedisTemplate<String, Object> redisTemplate;
  12. // 制定消费者监听哪个队列和消息确认机制
  13. @SneakyThrows
  14. @RabbitListener(queues = {"bi_common_queue"}, ackMode = "MANUAL")
  15. public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
  16. log.info("receiveMessage is {}", message);
  17. if(StringUtils.isBlank(message)) {
  18. // 如果失败,消息拒绝
  19. channel.basicNack(deliveryTag, false, false);
  20. log.info("消息为空拒绝接收");
  21. log.info("此消息正在被转发到死信队列中");
  22. }
  23. long chartId = Long.parseLong(message);
  24. Chart chart = chartService.getById(chartId);
  25. if (chart == null) {
  26. channel.basicNack(deliveryTag, false, false);
  27. log.info("图标为空拒绝接收");
  28. throw new BusinessException(ErrorCode.NOT_FOUND_ERROR, "图表为空");
  29. }
  30. // 先修改图表任务状态为“执行中”。等执行成功后,修改为“已完成”、保存执行结果;执行失败后,状态修改为“失败”,记录任务失败信息。
  31. Chart updateChart = new Chart();
  32. updateChart.setId(chart.getId());
  33. updateChart.setStatus("running");
  34. boolean b = chartService.updateById(updateChart);
  35. if (!b) {
  36. channel.basicNack(deliveryTag, false, false);
  37. handlerChartUpdateError(chart.getId(), "更新图表执行状态失败");
  38. return;
  39. }
  40. // 调用AI
  41. String result = aiManager.doChat(CommonConstant.BI_MODEL_ID, buildUserInput(chart));
  42. String[] splits = result.split("【【【【【");
  43. if (splits.length < 3) {
  44. channel.basicNack(deliveryTag, false, false);
  45. handlerChartUpdateError(chart.getId(), "AI生成错误");
  46. return;
  47. }
  48. String genChart = splits[1].trim();
  49. String genResult = splits[2].trim();
  50. Chart updateChartResult = new Chart();
  51. updateChartResult.setId(chart.getId());
  52. updateChartResult.setGenChart(genChart);
  53. updateChartResult.setGenResult(genResult);
  54. updateChartResult.setStatus("succeed");
  55. boolean updateResult = chartService.updateById(updateChartResult);
  56. if (!updateResult) {
  57. channel.basicNack(deliveryTag, false, false);
  58. handlerChartUpdateError(chart.getId(), "更新图表成功状态失败");
  59. }
  60. Long userId = chartService.queryUserIdByChartId(chartId);
  61. String myChartId = String.format("lingxibi:chart:list:%s", userId);
  62. redisTemplate.delete(myChartId);
  63. // 如果任务执行成功,手动执行ack
  64. channel.basicAck(deliveryTag, false);
  65. }
  66. private void handlerChartUpdateError(long chartId, String execMessage) {
  67. Chart updateChartResult = new Chart();
  68. updateChartResult.setId(chartId);
  69. updateChartResult.setStatus("failed");
  70. updateChartResult.setExecMessage(execMessage);
  71. boolean updateResult = chartService.updateById(updateChartResult);
  72. if (!updateResult) {
  73. log.error("更新图表失败状态失败" + chartId + "," + execMessage);
  74. }
  75. }
  76. /**
  77. * 构建用户输入
  78. * @param chart
  79. * @return
  80. */
  81. private String buildUserInput(Chart chart) {
  82. String goal = chart.getGoal();
  83. String chartType = chart.getChartType();
  84. String csvData = chart.getChartData();
  85. // 构造用户输入
  86. StringBuilder userInput = new StringBuilder();
  87. userInput.append("分析需求:").append("\n");
  88. // 拼接分析目标
  89. String userGoal = goal;
  90. if (StringUtils.isNotBlank(chartType)) {
  91. userGoal += ",请使用" + chartType;
  92. }
  93. userInput.append(userGoal).append("\n");
  94. userInput.append("原始数据:").append("\n");
  95. // 压缩后的数据
  96. userInput.append(csvData).append("\n");
  97. return userInput.toString();
  98. }
  99. }

死信队列消费者(负责处理死信)

收到死信后我是直接确认了,这种方式可能不好,你也可以换成其他方式比如重新入队,或者写入数据库并打上日志等等。

  1. @Component
  2. @Slf4j
  3. public class TtlQueueConsumer {
  4. @Resource
  5. BIMessageProducer biMessageProducer;
  6. @SneakyThrows
  7. @RabbitListener(queues = "bi_dead_letter_queue", ackMode = "MANUAL")
  8. public void doTTLMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
  9. log.info("已经接受到死信消息:{}", message);
  10. biMessageProducer.sendMessage(message);
  11. channel.basicAck(deliveryTag, false);
  12. }
  13. }

如果我的文章对你有帮助的话,不妨给我点个赞呗,我会持续带来不一样的内容。如果对Java相关知识感兴趣的话,可以关注我,带你走进Java的世界。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/569684
推荐阅读
相关标签
  

闽ICP备14008679号