当前位置:   article > 正文

rabbitmq重试机制_rabbitmq max-attempts

rabbitmq max-attempts

windows安装rabbitmq和启动
https://baijiahao.baidu.com/s?id=1720472084636520996&wfr=spider&for=pc

配置文件:

# Spring Boot connection and listener settings for a local RabbitMQ broker.
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        # Rejected messages are not put back on the queue.
        # NOTE(review): presumably this is what lets an exhausted message go to the dead-letter exchange - confirm.
        default-requeue-rejected: false
        acknowledge-mode: auto
        retry:
          # Original note: when this is false, consumer retry is off and a failing consumer keeps
          # receiving the same message repeatedly.
          # NOTE(review): with default-requeue-rejected: false above, that claim may not hold - verify.
          enabled: true # enable consumer-side retry
          max-attempts: 4 # maximum number of attempts (the console output in this article shows 4 listener invocations in total)
          initial-interval: 1000 # interval before the first retry, in milliseconds
          max-interval: 1200000 # upper bound for the retry interval, in milliseconds
          multiplier: 5 # multiplier applied to the previous retry interval
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

RabbitMQConfig:

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


import java.util.HashMap;
import java.util.Map;

/**
 * Declares the RabbitMQ topology used by the retry demo: one fanout business exchange with a
 * single business queue, plus one direct dead-letter exchange with a single dead-letter queue.
 *
 * <p>The business queue names the dead-letter exchange and routing key in its arguments, and the
 * dead-letter queue is bound to that exchange with the same routing key.
 */
@Configuration
public class RabbitMQConfig {

    /** Name of the fanout exchange business messages are published to. */
    public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";

    /** Name of the business queue consumed by the retrying listener. */
    public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";

    /** Name of the direct exchange configured as the business queue's dead-letter exchange. */
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";

    /** Routing key configured for dead-lettered messages of the business queue. */
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";

    /** Name of the queue bound to the dead-letter exchange. */
    public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";

    /**
     * Declares the business exchange.
     *
     * @return a fanout exchange named {@link #BUSINESS_EXCHANGE_NAME}
     */
    @Bean("businessExchange")
    public FanoutExchange businessExchange() {
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    /**
     * Declares the dead-letter exchange.
     *
     * @return a direct exchange named {@link #DEAD_LETTER_EXCHANGE}
     */
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * Declares business queue A as a durable queue whose arguments name the dead-letter exchange
     * and the dead-letter routing key.
     *
     * @return the business queue definition
     */
    @Bean("businessQueueA")
    public Queue businessQueueA() {
        // The original also carried a disabled "x-message-ttl" argument; it stays disabled here.
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME)
                // dead-letter exchange attached to this queue
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
                // routing key used for this queue's dead-lettered messages
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY)
                .build();
    }

    /**
     * Declares dead-letter queue A.
     *
     * @return a durable queue named {@link #DEAD_LETTER_QUEUEA_NAME}
     */
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA() {
        final boolean durable = true;
        return new Queue(DEAD_LETTER_QUEUEA_NAME, durable);
    }

    /**
     * Binds business queue A to the fanout business exchange.
     *
     * @param queue    the business queue bean
     * @param exchange the business exchange bean
     * @return the binding between the two
     */
    @Bean
    public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange);
    }

    /**
     * Binds dead-letter queue A to the dead-letter exchange using
     * {@link #DEAD_LETTER_QUEUEA_ROUTING_KEY}.
     *
     * @param queue    the dead-letter queue bean
     * @param exchange the dead-letter exchange bean
     * @return the binding between the two
     */
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

BusinessMessageSender :

/**
 * Publishes {@link MsgLog} records to the business exchange so that the retrying consumer can
 * process them.
 */
@Component
public class BusinessMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes the given record to {@link RabbitMQConfig#BUSINESS_EXCHANGE_NAME} with an empty
     * routing key and returns immediately.
     *
     * @param msgLog the record to publish; it is converted by the template's message converter
     */
    public void sendMsg(MsgLog msgLog) {
        // Fire-and-forget publish. The previous convertSendAndReceive call is a request/reply
        // operation: it set a replyTo header and blocked the caller waiting for a reply that the
        // void listener never sends.
        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "", msgLog);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

BusinessMessageReceiver :

/**
 * Consumes messages from the business queue and records the retry outcome in the database.
 *
 * <p>A failed attempt is signalled by throwing, which hands control back to the listener
 * container's retry handling. The attempt number is kept in a message header that this listener
 * increments before each failure.
 */
@Slf4j
@Component
public class BusinessMessageReceiver {

    /** Header holding the number of the next delivery attempt; absent on the first attempt. */
    private static final String ATTEMPT_HEADER = "failed_count_for_send_to_exchange";

    /** Result code treated as success. */
    private static final Integer SUCCESS_CODE = 200;

    @Autowired
    private MsgLogMapper msgLogMapper;

    @Value("${spring.rabbitmq.listener.simple.retry.max-attempts}")
    private int maxAttempts;

    /**
     * Handles one delivery attempt of a business message.
     *
     * <p>On success the record is stored with status 3 and the attempt number. On failure the
     * attempt header is incremented and an exception is thrown; if this was the last allowed
     * attempt, the record is first stored with status 4.
     *
     * @param msgLog  the converted message payload
     * @param message the raw message, used for its headers
     * @param channel the channel the message arrived on (unused)
     * @throws IOException declared for interface compatibility; not thrown by this body
     * @throws RuntimeException when the attempt failed, to trigger the next retry
     */
    @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUEA_NAME)
    public void receiveA(MsgLog msgLog, Message message, Channel channel) throws IOException {
        log.info("收到业务消息:{}", msgLog);

        // The real HTTP call to msgLog.getUrl() is stubbed out in this demo; a fixed non-200
        // result code forces the retry path.
        JSONObject retJson = new JSONObject();
        String ret = null;
        retJson.put("resultCode", 301);

        msgLog.setTime(LocalDateTime.now());
        msgLog.setResult(ret);

        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        log.info(headers.toString());
        int attempt = currentAttempt(headers);

        // Constant-first comparison: a missing resultCode counts as a failure instead of an NPE.
        if (SUCCESS_CODE.equals(retJson.get("resultCode"))) {
            msgLog.setRetryCount(attempt);
            msgLog.setStatus(3);
            msgLogMapper.updateById(msgLog);
            return;
        }

        // Only record the final failure once the outcome of the last attempt is known.
        if (attempt >= maxAttempts) {
            msgLog.setStatus(4);
            msgLog.setRetryCount(maxAttempts);
            msgLogMapper.updateById(msgLog);
        }
        headers.put(ATTEMPT_HEADER, attempt + 1);
        throw new RuntimeException("重试");
    }

    /** Reads the attempt number from the headers, defaulting to 1 when absent or not numeric. */
    private static int currentAttempt(Map<String, Object> headers) {
        Object stored = headers.get(ATTEMPT_HEADER);
        return stored instanceof Number ? ((Number) stored).intValue() : 1;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

死信队列

/**
 * Consumes messages from the dead-letter queue and logs them.
 */
@Slf4j
@Component
public class DeadLetterMessageReceiver {

    /**
     * Logs a dead-lettered message.
     *
     * @param msgLog  the converted message payload
     * @param message the raw message (unused)
     * @param channel the channel the message arrived on (unused)
     */
    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(MsgLog msgLog, Message message, Channel channel) {
        // Parameterized logging, matching BusinessMessageReceiver; also tolerates a null payload.
        log.info("收到死信消息:{}", msgLog);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

MsgLog :

/**
 * Message delivery log record: one outbound request together with its retry state.
 */
@Data
@Accessors(chain = true)
@ApiModel(value = "MsgLog对象", description = "消息投递日志")
public class MsgLog implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Unique identifier of the message; the id type is declared as AUTO. */
    @ApiModelProperty(value = "消息唯一标识")
    @TableId(type = IdType.AUTO)
    private Integer id;

    /** Message body, JSON formatted. */
    @ApiModelProperty(value = "消息体, json格式化")
    private String msg;

    /** Request address. */
    @ApiModelProperty(value = "请求地址")
    private String url;

    /** Request time. */
    @ApiModelProperty(value = "请求时间")
    private LocalDateTime time;

    /** Request result. */
    @ApiModelProperty(value = "请求结果")
    private String result;

    /** Status: 1 = request succeeded, 2 = retrying, 3 = succeeded after retry, 4 = failed after retry. */
    @ApiModelProperty(value = "状态,1:请求成功 2:重试中 3:重试后成功 4:重试后失败")
    private Integer status;

    /** Number of retries. */
    @ApiModelProperty(value = "重试次数")
    private Integer retryCount;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

ServiceImpl

    /**
     * Records an outbound request and, when the result code is not 200, hands the record to the
     * retry queue.
     *
     * <p>The external HTTP call is stubbed out in this demo: the result code is fixed at 301, so
     * the retry branch is always taken.
     *
     * @param msg the message body to store and, on failure, publish for retry
     */
    @Override
    public void send(String msg) {
        final JSONObject orderJson = new JSONObject();
        log.info("接口入参" + orderJson);

        // Stubbed response of the external call.
        final JSONObject retJson = new JSONObject();
        retJson.put("resultCode", 301);
        final String uri = "";
        final String ret = "";

        // MsgLog setters are chainable (@Accessors(chain = true)).
        final MsgLog msgLog = new MsgLog()
                .setMsg(msg)
                .setUrl(uri)
                .setResult(ret)
                .setTime(LocalDateTime.now());

        final boolean succeeded = retJson.get("resultCode").equals(200);
        if (succeeded) {
            // Status 1: the request succeeded on the first try, nothing to retry.
            msgLog.setStatus(1);
            msgLogMapper.insert(msgLog);
            return;
        }

        // Status 2: store the record as "retrying", then publish it for the retrying consumer.
        msgLog.setRetryCount(0).setStatus(2);
        msgLogMapper.insert(msgLog);
        businessMessageSender.sendMsg(msgLog);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

控制台结果:

2022-08-18 09:29:23.859  INFO 33992 --- [nio-9001-exec-1] .l.DirectReplyToMessageListenerContainer : Container initialized for queues: [amq.rabbitmq.reply-to]
2022-08-18 09:29:23.869  INFO 33992 --- [nio-9001-exec-1] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-8VFVj3Nm7FX4gnMtghuEsg identity=3a7c278d] started
2022-08-18 09:29:24.040  INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver     : 收到业务消息:MsgLog(id=41, msg=3, url=, time=2022-08-18T09:29:23.184, result=, status=2, retryCount=0)
2022-08-18 09:29:24.040  INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver     : {}
2022-08-18 09:29:25.057  INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver     : 收到业务消息:MsgLog(id=41, msg=3, url=, time=2022-08-18T09:29:23.184, result=, status=2, retryCount=0)
2022-08-18 09:29:25.057  INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver     : {failed_count_for_send_to_exchange=2}
2022-08-18 09:29:30.072  INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver     : 收到业务消息:MsgLog(id=41, msg=3, url=, time=2022-08-18T09:29:23.184, result=, status=2, retryCount=0)
2022-08-18 09:29:30.073  INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver     : {failed_count_for_send_to_exchange=3}
2022-08-18 09:29:55.080  INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver     : 收到业务消息:MsgLog(id=41, msg=3, url=, time=2022-08-18T09:29:23.184, result=, status=2, retryCount=0)
2022-08-18 09:29:55.080  INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver     : {failed_count_for_send_to_exchange=4}
2022-08-18 09:29:55.123  WARN 33992 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'[B@92cf7a6(byte[352])' MessageProperties [headers={failed_count_for_send_to_exchange=5}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g1h2AA5yZXBseUA3OTY0NjI5NwAABHEAAAAAYv2Rlw==.Hsw8oUWTg7ZabM+vBpqkgQ==, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dead.letter.demo.simple.business.exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-zIRXwiHfxwFbi6cfMGBJOQ, consumerQueue=dead.letter.demo.simple.business.queuea])

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.yull.testyu.config.BusinessMessageReceiver.receiveA(com.yull.testyu.entity.MsgLog,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.io.IOException' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:204) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:129) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1542) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1468) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_172]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_172]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_172]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_172]
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE]
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:91) ~[spring-retry-1.2.4.RELEASE.jar:na]
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) [spring-retry-1.2.4.RELEASE.jar:na]
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) [spring-retry-1.2.4.RELEASE.jar:na]
	at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:115) ~[spring-retry-1.2.4.RELEASE.jar:na]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE]
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.$Proxy108.invokeListener(Unknown Source) ~[na:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1456) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1451) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:875) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:859) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1142) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1048) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_172]
Caused by: java.lang.RuntimeException: 重试
	at com.yull.testyu.config.BusinessMessageReceiver.receiveA(BusinessMessageReceiver.java:56) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_172]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_172]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_172]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_172]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.1.12.RELEASE.jar:5.1.12.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.12.RELEASE.jar:5.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:50) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:196) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	... 26 common frames omitted

2022-08-18 09:29:55.124  WARN 33992 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted
	at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:45) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.lambda$getObject$0(StatelessRetryOperationsInterceptorFactoryBean.java:64) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:141) ~[spring-retry-1.2.4.RELEASE.jar:na]
	at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:512) ~[spring-retry-1.2.4.RELEASE.jar:na]
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:351) ~[spring-retry-1.2.4.RELEASE.jar:na]
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.4.RELEASE.jar:na]
	at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:115) ~[spring-retry-1.2.4.RELEASE.jar:na]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE]
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.$Proxy108.invokeListener(Unknown Source) ~[na:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1456) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1451) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:875) [spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:859) [spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1142) [spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1048) [spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_172]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: null
	... 19 common frames omitted
Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.yull.testyu.config.BusinessMessageReceiver.receiveA(com.yull.testyu.entity.MsgLog,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.io.IOException' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:204) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:129) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1542) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1468) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_172]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_172]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_172]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_172]
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE]
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:91) ~[spring-retry-1.2.4.RELEASE.jar:na]
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.4.RELEASE.jar:na]
	... 14 common frames omitted
Caused by: java.lang.RuntimeException: 重试
	at com.yull.testyu.config.BusinessMessageReceiver.receiveA(BusinessMessageReceiver.java:56) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_172]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_172]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_172]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_172]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.1.12.RELEASE.jar:5.1.12.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.12.RELEASE.jar:5.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:50) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:196) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE]
	... 26 common frames omitted

2022-08-18 09:29:55.131  INFO 33992 --- [ntContainer#1-1] c.y.t.config.DeadLetterMessageReceiver   : 收到死信消息:MsgLog(id=41, msg=3, url=, time=2022-08-18T09:29:23.184, result=, status=2, retryCount=0)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105

消息流转的流程:

  1. 当请求外部地址返回的code不是成功的code时,将消息封装成MsgLog对象,发送到重试队列;
  2. 在重试队列中进行重新请求外部地址,如果还是失败,会抛出异常,进行重试,重试次数和间隔时间由配置文件决定;
  3. 到达最大重试次数后,重试队列会将消息发送到死信队列,由死信队列的消费者消费死信消息。

重试次数可以存放在message的header中。由于每次重试时收到的消息以及最终进入死信队列的消息,都是最初发送到普通队列的那条原始消息(消息体中的MsgLog内容不会随重试而变化),所以选择在重试队列的消费者中判断是否已达到最大重试次数,并据此更新数据库。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/996808
推荐阅读
相关标签
  

闽ICP备14008679号