当前位置:   article > 正文

RabbitMQ (三)消息重试_retries exhausted for message

retries exhausted for message

1 RabbitMQ自带的重试机制

1 示例代码

rabbitMQ为自带了消息重试机制:当消费者消费消息失败时,可以选择将消息重新“推送”给消费者,直至消息消费成功为止

开启自带的重试机制,需要如下几个配置:

1 开启消费者手动应答机制,对应的springboot配置项:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

2 消费异常时,设置消息重新入列

  1. boolean multiple = false; // 单条确认
  2. boolean requeue  = true; // 重新进入队列,谨慎设置!!!很容易导致死循环,cpu 100%
  3. channel.basicNack(tag, multiple, requeue);

以下是运行例子:

消费者代码如下:

  1. package com.fmi110.rabbitmq;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.amqp.support.AmqpHeaders;
  6. import org.springframework.messaging.handler.annotation.Header;
  7. import org.springframework.stereotype.Component;
  8. import java.util.concurrent.atomic.AtomicInteger;
  9. /**
  10. * @author fmi110
  11. * @description 消息消费者
  12. * @date 2021/7/1 16:08
  13. */
  14. @Component
  15. @Slf4j
  16. public class RabbitConsumer {
  17.    AtomicInteger count = new AtomicInteger();
  18.    @RabbitListener(queues="my-queue")
  19.    public void consumer1(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
  20.        log.info(">>>> consumer1 消费 tag = {},次数count={},消息内容 : {}",tag, count.incrementAndGet(),data);
  21.    
  22.        try {
  23.            Thread.currentThread().sleep(1000);
  24.            int i = 1/0;
  25.            channel.basicAck(tag,true); // 确认消息消费成功
  26.       } catch (Exception e) {
  27.            log.error(">>>>消费异常,消息重新进入队列并消费");
  28.            boolean multiple = false; // 单条确认
  29.            boolean requeue  = true; // 重新进入队列,谨慎设置!!!
  30.            channel.basicNack(tag, multiple, requeue);
  31.       }
  32.   }
  33.    @RabbitListener(queues="my-queue")
  34.    public void consumer2(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
  35.        log.info(">>>> consumer2 消费 tag = {},次数count={},消息内容 : {}",tag, count.incrementAndGet(),data);
  36.        try {
  37.            Thread.currentThread().sleep(1000);
  38.            int i = 1/0;
  39.            channel.basicAck(tag,true); // 确认消息消费成功
  40.       } catch (Exception e) {
  41.            log.error(">>>>消费异常,消息重新进入队列并消费");
  42.            boolean multiple = false; // 单条确认
  43.            boolean requeue  = true;
  44.            channel.basicNack(tag, multiple, requeue);
  45.       }
  46.   }
  47. }

这里模拟了两个消费者 consumer1consumer2 ,并在逻辑中人为设置异常 int 1/0 , 在异常捕获中通过

 channel.basicNack(tag, false, true);

设置消息重新进入队列,最终推给消费者再次消费。运行结果如下:

日志里包含了几个信息点:

  1. 消费者每次只消费一条消息,因为我设置了 spring.rabbitmq.listener.simple.prefetch=1

  2. 消息推送使用的 round-robin 算法

  3. rabbitMQ的消费方式有推和拉两种方式,springboot创建的消费者模式使用的推的方式消费 this.channel.basicConsume()

2 潜在问题

如运行日志所示,重进进入队列的消息,会在队列头部,直接再次推送给消费者消费,如果是因为代码逻辑问题,将会导致消息一直消费失败,导致死循环!!!

比较合理的做法是,重试一定次数消费后,如果仍然失败,则终止重试,将消费异常的消息保存,并上报异常,由人工介入处理。

2 结合spring-retry和死信队列实现消息重试

一个比较合理的重试机制如下:

  1. 消息消费出现异常时,借助springboot提供的重试机制进行重试

    因为使用的spring-retry,所以方法中必须抛出异常,否则spring-retry不会被触发!!!

  2. 重试仍然失败时,消息转发到死信队列,死信队列的消费者记录并上报异常信息

    要实现消息消费失败自动转发到死信队列,则rabbitmq在创建消息队列时,需要指定与之绑定的死信队列

完整的实例代码如下:

1 配置文件 application.properties:

这里注释掉了 spring.rabbitmq.listener.simple.acknowledge-mode=manual ,这样在消息消费失败时,会自动转到死信队列,如果开启手动确认机制,必须调用 chanel.basicNack(tag,false,false) 消息才会进入死信队列!!!

  1. # 应用名称
  2. spring.application.name=rabbitmq
  3. server.port=8080
  4. server.servlet.context-path=/
  5. spring.rabbitmq.host=localhost
  6. spring.rabbitmq.port=5672
  7. # 指定连接的虚拟主机,可以在rabbitMQ控制台查看对应的虚拟主机的名字
  8. spring.rabbitmq.virtual-host=my_vhost
  9. spring.rabbitmq.username=admin
  10. spring.rabbitmq.password=admin
  11. spring.rabbitmq.listener.simple.prefetch=1
  12. # 开启 publish-comfirm 机制和消息路由匹配失败退回机制
  13. spring.rabbitmq.publisher-returns=true
  14. spring.rabbitmq.publisher-confirm-type=correlated
  15. # 开启消费者应答 ack 机制
  16. # spring.rabbitmq.listener.simple.acknowledge-mode=manual
  17. # 开启spring提供的retry
  18. spring.rabbitmq.listener.simple.retry.enabled=true
  19. spring.rabbitmq.listener.simple.retry.max-attempts=3
  20. spring.rabbitmq.listener.simple.retry.initial-interval=3000

2 RabbitConfig

主要在程序启动时,做如下设置:

  1. 创建死信队列和死信交换器,并将死信队列绑定到死信交换器。

  2. 创建普通队列和普通交换器,并将普通队列绑定到普通交换器,同时将死信队列与普通队列关联,这样当消息消费失败时,消息会进入死信队列(使用了自动 ack模式)。

  1. package com.fmi110.rabbitmq.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.amqp.rabbit.retry.MessageRecoverer;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import java.util.HashMap;
  8. /**
  9. * @author fmi110
  10. * @description rabbitMQ 配置类
  11. * @date 2021/7/1 15:08
  12. */
  13. @Configuration
  14. @Slf4j
  15. public class RabbitConfig {
  16.    String dlQueueName  = "my-queue-dl"; // 普通队列名称
  17.    String dlExchangeName = "my-exchange-dl"; // 死信交换器名称
  18.    String dlRoutingKey   = "rabbit.test";
  19.    String queueName = "retry-queue";
  20.    String exchangeName = "my-exchange"; // 普通交换器名称
  21.    /**
  22.     * 创建死信队列
  23.     *
  24.     * @return
  25.     */
  26.    @Bean
  27.    public Queue queueDL() {
  28.        return QueueBuilder
  29.               .durable(dlQueueName) // 持久化队列
  30.               .build();
  31.   }
  32.    /**
  33.     * 创建死信交换机
  34.     *
  35.     * @return
  36.     */
  37.    @Bean
  38.    public TopicExchange exchangeDL() {
  39.        return new TopicExchange(dlExchangeName, true, false);
  40.   }
  41.    /**
  42.     * 绑定操作
  43.     */
  44.    @Bean
  45.    public Binding bindQueueDL2ExchangeDL(Queue queueDL, TopicExchange exchangeDL) {
  46.        log.info(">>>> 队列与交换器绑定");
  47.        return BindingBuilder.bind(queueDL).to(exchangeDL).with(dlRoutingKey);
  48.   }
  49.    /**
  50.     * 创建持久化队列,同时绑定死信交换器
  51.     *
  52.     * @return
  53.     */
  54.    @Bean
  55.    public Queue queue() {
  56.        log.info(">>>> 创建队列 retry-queue");
  57.        HashMap<String, Object> params = new HashMap<>();
  58.        params.put("x-dead-letter-exchange", dlExchangeName);
  59.        params.put("x-dead-letter-routing-key", dlRoutingKey);
  60.        return QueueBuilder
  61.               .durable(queueName) // 持久化队列
  62.               .withArguments(params) // 关联死信交换器
  63.               .build();
  64.   }
  65.    /**
  66.     * 创建交换机
  67.     *
  68.     * @return
  69.     */
  70.    @Bean
  71.    public TopicExchange exchange() {
  72.        log.info(">>>> 创建交换器 my-exchange");
  73.        boolean durable    = true; // 持久化
  74.        boolean autoDelete = false; // 消费者全部解绑时不自动删除
  75.        return new TopicExchange(exchangeName, durable, autoDelete);
  76.   }
  77.    /**
  78.     * 绑定队列到交换机
  79.     *
  80.     * @param queue
  81.     * @param exchange
  82.     * @return
  83.     */
  84.    @Bean
  85.    public Binding bindQueue2Exchange(Queue queue, TopicExchange exchange) {
  86.        log.info(">>>> 队列与交换器绑定");
  87.        return BindingBuilder.bind(queue).to(exchange).with("rabbit.test");
  88.   }
  89. //   /**
  90. //     * spring-retry重试机制:当重试次数达到最大,消息仍然消费失败时回调。
  91. //     * 如果开启这个类,则死信队列失效,消息消费失败,即使配置了死信队列,消息也不会进入死信队列。
  92. //     * 重试失败回调和死信队列只能二选一!!!spring 提供回调实现类有如下几个:
  93. //     * RejectAndDontRequeueRecoverer :消费失败,并且消息不再入列,spring默认使用。
  94. //     * ImmediateRequeueMessageRecoverer :将消息重新入列
  95. //     * RepublishMessageRecoverer:转发消息到指定的队列,
  96. //     * @return
  97. //     */
  98. //   @Bean
  99. //   public MessageRecoverer messageRecoverer(){
  100. //       return new MessageRecoverer() {
  101. //           @Override
  102. //           public void recover(Message message, Throwable cause) {
  103. //               log.info(message.toString());
  104. //               log.info("spring-retry重试次数达到最大,消息仍然失败的回调");
  105. //               // TODO: 记录错误信息并上报
  106. //           }
  107. //       };
  108. //   }
  109. }

3 消息生产者 RabbitProducer

这里为了保证消息能确保消息发送,配置了 confirm 确认机制

  1. package com.fmi110.rabbitmq;
  2. import com.rabbitmq.client.AMQP;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. import javax.annotation.PostConstruct;
  8. /**
  9. * @author fmi110
  10. * @description 消息生产者
  11. * @date 2021/7/1 15:08
  12. */
  13. @Component
  14. @Slf4j
  15. public class RabbitProducer {
  16.    @Autowired
  17.    RabbitTemplate rabbitTemplate;
  18.    /**
  19.     * 1 设置 confirm 回调,消息发送到 exchange 时回调
  20.     * 2 设置 return callback ,当路由规则无法匹配到消息队列时,回调
  21.     *
  22.     * correlationData:消息发送时,传递的参数,里边只有一个id属性,标识消息用
  23.     */
  24.    @PostConstruct
  25.    public void enableConfirmCallback(){
  26.        // #1
  27.        /**
  28.         * 连接不上 exchange或exchange不存在时回调
  29.         */
  30.        rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
  31.            if (!ack) {
  32.                log.error("消息发送失败");
  33.                // TODO 记录日志,发送通知等逻辑
  34.           }
  35.       });
  36.        // #2
  37.        /**
  38.         * 消息投递到队列失败时,才会回调该方法
  39.         * message:发送的消息
  40.         * exchange:消息发往的交换器的名称
  41.         * routingKey:消息携带的路由关键字信息
  42.         */
  43.        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) ->{
  44.            log.error("消息路由失败");
  45.            // TODO 路由失败后续处理逻辑
  46.       });
  47.   }
  48.    public void send(String msg){
  49.        String exchangeName = "my-exchange";
  50.        // String routingKey   = "aaa.xxx";
  51.        String routingKey   = "rabbit.test";
  52.        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
  53.   }
  54. }

4 消息消费者 RabbitConsumer

  1. package com.fmi110.rabbitmq;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.amqp.support.AmqpHeaders;
  6. import org.springframework.messaging.handler.annotation.Header;
  7. import org.springframework.stereotype.Component;
  8. import java.util.concurrent.atomic.AtomicInteger;
  9. /**
  10. * @author fmi110
  11. * @description 消息消费者
  12. * @date 2021/7/1 16:08
  13. */
  14. @Component
  15. @Slf4j
  16. public class RabbitConsumer {
  17.    AtomicInteger count = new AtomicInteger();
  18.    /**
  19.     * 普通队列消费者
  20.     * @param data
  21.     * @param channel
  22.     * @param tag
  23.     * @throws Exception
  24.     */
  25.    @RabbitListener(queues="retry-queue")
  26.    public void consumer(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
  27.        log.info(">>>> consumer 消费 tag = {},次数count={},消息内容 : {}",tag, count.incrementAndGet(),data);
  28.        // TODO 消息处理逻辑
  29.        throw new RuntimeException("抛出异常,模拟消费失败,触发spring-retry");
  30.   }
  31.    /**
  32.     * 死信队列消费者
  33.     * @param data
  34.     * @param channel
  35.     * @param tag
  36.     * @throws Exception
  37.     */
  38.    @RabbitListener(queues="my-queue-dl")
  39.    public void consumeDL(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
  40.        log.info(">>>> 死信队列消费 tag = {},消息内容 : {}",tag,data);
  41. //       channel.basicNack(tag, false, false);
  42.   }
  43. }

5 Controller

用于触发发送消息

  1. package com.fmi110.rabbitmq.controller;
  2. import com.fmi110.rabbitmq.RabbitProducer;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Controller;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.ResponseBody;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import java.util.HashMap;
  10. @RestController
  11. public class TestController {
  12.    @Autowired
  13.    RabbitProducer rabbitProducer;
  14.    @GetMapping("/test")
  15.    public Object test() {
  16.        rabbitProducer.send("this is a message");
  17.        HashMap<String, Object> result = new HashMap<>();
  18.        result.put("code", 0);
  19.        result.put("msg", "success");
  20.        return result;
  21.   }
  22. }

6 运行结果

运行日志如下:

  1. : >>>> consumer 消费 tag = 1,次数count=1,消息内容 : this is a message
  2. : >>>> consumer 消费 tag = 1,次数count=2,消息内容 : this is a message
  3. : >>>> consumer 消费 tag = 1,次数count=3,消息内容 : this is a message
  4. o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message
  5. (Body:'this is a message' MessageProperties
  6. [headers={spring_listener_return_correlation=2840e95b-8544-4ed8-b3ed-8ba02aee2729},
  7. contentType=text/plain, contentEncoding=UTF-8, contentLength=0,
  8. receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false,
  9. receivedExchange=my-exchange, receivedRoutingKey=rabbit.test,
  10. deliveryTag=1, consumerTag=amq.ctag-a5AZEb9AYpOzL6mQJQIvaQ,
  11. consumerQueue=retry-queue])
  12. ...
  13. Caused by: java.lang.RuntimeException: 抛出异常,模拟消费失败,触发spring-retry
  14. at com.fmi110.rabbitmq.RabbitConsumer.consumer(RabbitConsumer.java:36) ~[classes/:na]
  15. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181]
  16. ....
  17. : >>>> 死信队列消费 tag = 1,消息内容 : this is a message

从日志可看出,普通队列的消费者一共消费了三次仍然失败,最后回调spring提供 RejectAndDontRequeueRecoverer ,然后消息进入死信队列被消费。

7 pom依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-amqp</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.projectlombok</groupId>
  12. <artifactId>lombok</artifactId>
  13. <scope>compile</scope>
  14. </dependency>
  15. </dependencies>

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/996797
推荐阅读
相关标签
  

闽ICP备14008679号