赞
踩
消费者因为代码问题出现了异常,此时默认是自动提交的消息,这个RuntimException
会导致消息直接重新入队,再次投递(进入队首),此时会导致后面的消息被阻塞.
auto
自动确认分四种情况,第一种就是正常消费,其他三种则为异常情况
ack
。不涉及requeue
,毕竟已经成功了。requeue
是对被拒绝的消息生效。ImmediateAcknowledgeAmqpException
异常,则视为成功消费,确认该消息。AmqpRejectAndDontRequeueException
异常的时候,则消息会被拒绝,且requeue = false
(该异常会在重试超过限制后抛出)requeue = true
我遇到的是第四种情况,导致mq
消息阻塞,并且消费者一直在消费同一条消息,然后抛异常,此时就进入了死循环,cpu
和磁盘io
直接拉高.
1-在消费者消费逻辑外面套个catch
,把异常吃掉,然后把当前异常的消息再做额外处理
2-把mq
重新入队关闭
3-抛出ImmediateAcknowledgeAmqpException/AmqpRejectAndDontRequeueException
异常
使用spring-retry
重试3
次后还是失败就记录到mysql
中,作为后续补偿的记录.
需要依赖如下
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
</dependency>
启动类加上@EnableRetry
注解
实现自动重试代码demo
package com.fchan.mq.process; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpRejectAndDontRequeueException; import org.springframework.retry.annotation.Backoff; import org.springframework.retry.annotation.Recover; import org.springframework.retry.annotation.Retryable; import org.springframework.stereotype.Component; import java.util.Map; import java.util.Objects; /** * ClassName: MyRabbitConsume <br/> * Description: <br/> * date: 2022/7/18 17:07<br/> * * @author fchen<br /> */ @Component public class MyRabbitNormalMessageProcess{ Logger log = LoggerFactory.getLogger(MyRabbitNormalMessageProcess.class); //默认的重试就是3次,maxAttempts = 第一次正常请求 + 后续异常重试次数 @Retryable(value = AmqpRejectAndDontRequeueException.class, maxAttempts = 3,backoff = @Backoff(delay = 2000L, multiplier = 1.5), recover = "sendNormalRetryable") public Object processMessage(Map<String,Object> data){ log.info("处理消息中----------"); if(Objects.equals(data.get("data"), "exception")){ throw new AmqpRejectAndDontRequeueException("mq消费时出现异常"); } log.info("收到normal信息:{}",data); log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ"); return "success"; } //这里利用的就是spring aop,所以需要aspect依赖 //入参除了第一个throwable其余需要和原方法一直,返回值也需要一致 @Recover public Object sendNormalRetryable(Throwable throwable, Map<String,Object> data){ log.error("重试次数上限,异常信息:{}", throwable.getMessage()); log.info("入参map:{},插入db", data); return "exception process"; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。