当前位置:   article > 正文

rabbitmq消息阻塞情况分析;自动重试处理异常_amqprejectanddontrequeueexception

amqprejectanddontrequeueexception

现象描述

消费者因为代码问题出现了异常,此时默认是自动提交的消息,这个RuntimException会导致消息直接重新入队,再次投递(进入队首),此时会导致后面的消息被阻塞.

分析

auto自动确认分四种情况,第一种就是正常消费,其他三种则为异常情况

  1. 消息成功被消费,没有抛出异常,则自动确认,回复ack。不涉及requeue,毕竟已经成功了。requeue是对被拒绝的消息生效。
  2. 当抛出ImmediateAcknowledgeAmqpException异常,则视为成功消费,确认该消息。
  3. 当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue = false(该异常会在重试超过限制后抛出)
  4. 其他的异常,则消息会被拒绝,且requeue = true

我遇到的是第四种情况,导致mq消息阻塞,并且消费者一直在消费同一条消息,然后抛异常,此时就进入了死循环,cpu磁盘io直接拉高.

解决方案

1-在消费者消费逻辑外面套个catch,把异常吃掉,然后把当前异常的消息再做额外处理
2-把mq重新入队关闭
3-抛出ImmediateAcknowledgeAmqpException/AmqpRejectAndDontRequeueException异常

我的解决方案

使用spring-retry重试3次后还是失败就记录到mysql中,作为后续补偿的记录.

需要依赖如下

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aspects</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

启动类加上@EnableRetry注解
在这里插入图片描述

实现自动重试代码demo

package com.fchan.mq.process;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Objects;

/**
 * ClassName: MyRabbitConsume <br/>
 * Description: <br/>
 * date: 2022/7/18 17:07<br/>
 *
 * @author fchen<br />
 */
@Component
public class MyRabbitNormalMessageProcess{
    
    Logger log = LoggerFactory.getLogger(MyRabbitNormalMessageProcess.class);

	//默认的重试就是3次,maxAttempts = 第一次正常请求 + 后续异常重试次数
    @Retryable(value = AmqpRejectAndDontRequeueException.class, maxAttempts = 3,backoff = @Backoff(delay = 2000L, multiplier = 1.5), recover = "sendNormalRetryable")
    public Object processMessage(Map<String,Object> data){
        log.info("处理消息中----------");
        if(Objects.equals(data.get("data"), "exception")){
            throw new AmqpRejectAndDontRequeueException("mq消费时出现异常");
        }
        log.info("收到normal信息:{}",data);
        log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ");

        return "success";
    }

	//这里利用的就是spring aop,所以需要aspect依赖
	//入参除了第一个throwable其余需要和原方法一直,返回值也需要一致
    @Recover
    public Object sendNormalRetryable(Throwable throwable, Map<String,Object> data){
        log.error("重试次数上限,异常信息:{}", throwable.getMessage());
        log.info("入参map:{},插入db", data);
        return "exception process";
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/543524
推荐阅读
相关标签
  

闽ICP备14008679号