赞
踩
原文网址:RabbitMQ--重试机制_IT利刃出鞘的博客-CSDN博客
说明
本文介绍RabbitMQ的重试机制。
问题描述
消费者默认是自动提交,如果消费时出现了RuntimException,会导致消息直接重新入队,再次投递(进入队首),进入死循环,继而导致后面的消息被阻塞。
消息阻塞带来的后果是:后边的消息无法被消费;RabbitMQ服务端继续接收消息,占内存和磁盘越来越多。
RabbitMQ的自动确认
自动确认分四种情况(第一就是正常消费,其他三种为异常情况)
我遇到的是第四种情况,导致mq消息阻塞,并且消费者一直在消费同一条消息,然后抛异常,此时就进入了死循环。
消息未被确认时如下图所示:
本处使用spring-rabbit中自带的重试功能解决上述问题。
注意
重试并不是RabbitMQ重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟mq没有任何关系。
不管消息被消费了之后是手动确认还是自动确认,代码中不能使用try/catch捕获异常,否则重试机制失效。
重试机制有2种情况
配置
application.yml
- spring:
- # RabbitMQ服务配置
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: guest
- password: guest
- listener:
- simple:
- # 重试机制
- retry:
- enabled: true #是否开启消费者重试
- max-attempts: 3 #最大重试次数
- initial-interval: 5000ms #重试间隔时间(单位毫秒)
- max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
- # 乘子。间隔时间*乘子=下一次的间隔时间,不能超过max-interval
- # 以本处为例:第一次间隔 5 秒,第二次间隔 10 秒,以此类推
- multiplier: 2
代码
- @RabbitListener(queues = "meat_queue")
- public void processMeatTwo(String message) throws InterruptedException {
- System.out.println("processMeatTwo消费了队列meat_queue的消息:" + message);
- Thread.sleep(1000);
- //模拟异常
- String is = null;
- is.toString();
- }
结果
可以看到,消息重试了5次,之后会抛出ListenerExecutionFailedException的异常。后面附带着Retry Policy Exhausted,提示我们重试次数已经用尽了。
消息重试次数用尽后,消息就会被抛弃。
消息在重试完之后,会调用MessageRecoverer接口的recover方法。MessageRecoverer接口有如下三个实现类(看它们名字即可知道含义):
默认情况下是RejectAndDontRequeueRecoverer:拒绝而且不把消息重新放入队列。我们可以使用RepublishMessageRecoverer,重新发布消息,将它发布到其他队列,后边对它进行补偿处理。
先创建一个异常队列,然后与交换机绑定进行绑定,绑定之后设置MessageRecoverer。
- @Configuration
- public class MQErrorConfig {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- private static String errorTopicExchange = "error-topic-exchange";
- private static String errorQueue = "error-queue";
- private static String errorRoutingKey = "error-routing-key";
-
- //创建异常交换机
- @Bean
- public TopicExchange errorTopicExchange(){
- return new TopicExchange(errorTopicExchange, true, false);
- }
-
- //创建异常队列
- @Bean
- public Queue errorQueue(){
- return new Queue(errorQueue, true);
- }
-
- //队列与交换机进行绑定
- @Bean
- public Binding BindingErrorQueueAndExchange(Queue errorQueue, TopicExchange errorTopicExchange){
- return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(errorRoutingKey);
- }
-
- //设置MessageRecoverer
- @Bean
- public MessageRecoverer messageRecoverer(){
- //AmqpTemplate和RabbitTemplate都可以
- return new RepublishMessageRecoverer(rabbitTemplate, errorTopicExchange, errorRoutingKey);
- }
- }
查看处理结果:
通过控制台可以看到,消息重试5次以后直接以新的routingKey发送到了配置的交换机中,此时再查看监控页面,可以看原始队列中已经没有消息了,但是配置的异常队列中存在一条消息:
上面的例子在测试中发现了一个问题,就是经过5次重试以后,控制台输出了一个异常的堆栈日志,然后队列中的数据也被ack掉了(自动ack模式),首先我们看一下这个异常日志是什么。
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted
出现消息被消费掉并且出现上述异常的原因是因为在构建SimpleRabbitListenerContainerFactoryConfigurer类时使用了MessageRecoverer接口,这个接口有一个cover方法,用来实现重试完成之后对消息的处理,源码如下:
- ListenerRetry retryConfig = configuration.getRetry();
- if (retryConfig.isEnabled()) {
- RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
- : RetryInterceptorBuilder.stateful();
- RetryTemplate retryTemplate = new RetryTemplateFactory(this.retryTemplateCustomizers)
- .createRetryTemplate(retryConfig, RabbitRetryTemplateCustomizer.Target.LISTENER);
- builder.retryOperations(retryTemplate);
- MessageRecoverer recoverer = (this.messageRecoverer != null) ? this.messageRecoverer
- : new RejectAndDontRequeueRecoverer(); // 1
- builder.recoverer(recoverer);
- factory.setAdviceChain(builder.build());
注意看1处的代码,默认使用的是RejectAndDontRequeueRecoverer实现类,根据实现类的名字我们就可以看出来该实现类的作用就是拒绝并且不会将消息重新发回队列,我们可以看一下这个实现类的具体内容:
- public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
- protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class); // NOSONAR protected
- @Override
- public void recover(Message message, Throwable cause) {
- if (this.logger.isWarnEnabled()) {
- this.logger.warn("Retries exhausted for message " + message, cause);
- }
- throw new ListenerExecutionFailedException("Retry Policy Exhausted",
- new AmqpRejectAndDontRequeueException(cause), message);
- }
- }
上述源码给出了异常的来源,但是未看到拒绝消息的代码,猜测应该是使用aop的方式实现的,此处不再继续深究。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。