赞
踩
Windows 下安装并启动 RabbitMQ
https://baijiahao.baidu.com/s?id=1720472084636520996&wfr=spider&for=pc
配置文件:
# RabbitMQ connection and listener settings (Spring Boot application.yml).
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        # Rejected messages are not put back on the queue; since the business
        # queue declares a dead-letter exchange, they are dead-lettered instead.
        default-requeue-rejected: false
        acknowledge-mode: auto
        retry:
          # Enable consumer-side (in-process) retry. When false, a listener
          # exception is not retried locally; what happens to the message is
          # then decided by default-requeue-rejected above.
          enabled: true
          # Maximum number of delivery attempts (including the first one).
          max-attempts: 4
          # Delay before the first retry, in milliseconds.
          initial-interval: 1000
          # Upper bound for the delay between retries, in milliseconds.
          max-interval: 1200000
          # Multiplier applied to the previous retry delay (1s, 5s, 25s, ...).
          multiplier: 5
RabbitMQConfig:
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfig { public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange"; public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea"; public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey"; public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea"; // 声明业务Exchange @Bean("businessExchange") public FanoutExchange businessExchange() { return new FanoutExchange(BUSINESS_EXCHANGE_NAME); } // 声明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 声明业务队列A @Bean("businessQueueA") public Queue businessQueueA() { Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY); // args.put("x-message-ttl", 10000); return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build(); } // 声明死信队列A @Bean("deadLetterQueueA") public Queue deadLetterQueueA() { return new Queue(DEAD_LETTER_QUEUEA_NAME); } // 声明业务队列A绑定关系 @Bean public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } // 声明死信队列A绑定关系 @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") 
DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY); } }
BusinessMessageSender :
/**
 * Publishes {@link MsgLog} messages to the business exchange.
 */
@Component
public class BusinessMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes the given log entry to {@link RabbitMQConfig#BUSINESS_EXCHANGE_NAME}
     * with an empty routing key.
     *
     * @param msgLog the message payload to publish
     */
    public void sendMsg(MsgLog msgLog) {
        // Fire-and-forget publish. The previous convertSendAndReceive(...) is a
        // request/reply call: it adds a reply-to header and blocks the caller
        // waiting for a reply, but this method discards the result and the
        // business listener returns void, so no reply is ever produced.
        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "", msgLog);
    }
}
BusinessMessageReceiver :
@Slf4j @Component public class BusinessMessageReceiver { @Autowired private MsgLogMapper msgLogMapper; @Value("${spring.rabbitmq.listener.simple.retry.max-attempts}") private int max_attempts; @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUEA_NAME) public void receiveA(MsgLog msgLog, Message message, Channel channel) throws IOException { String msg = msgLog.toString(); log.info("收到业务消息:{}", msg); JSONObject retJson = new JSONObject(); String ret = null; // ret = HttpRequest.post(msgLog.getUrl()).body(msgLog.getMsg()).execute().body(); // retJson = JSONObject.parseObject(ret); // log.info("ret" + ret); retJson.put("resultCode", 301); msgLog.setTime(LocalDateTime.now()); msgLog.setResult(ret); int count = 1; Map<String, Object> headers = message.getMessageProperties().getHeaders(); log.info(headers.toString()); if (headers.get("failed_count_for_send_to_exchange") != null) { count = (int) headers.get("failed_count_for_send_to_exchange"); } if (count == max_attempts) { msgLog.setStatus(4); msgLog.setRetryCount(max_attempts); msgLogMapper.updateById(msgLog); } if (!retJson.get("resultCode").equals(200)) { headers.put("failed_count_for_send_to_exchange", count + 1); throw new RuntimeException("重试"); } else { msgLog.setRetryCount(count); msgLog.setStatus(3); msgLogMapper.updateById(msgLog); } } }
死信队列:
/**
 * Consumes messages from the dead-letter queue and logs them.
 */
@Slf4j
@Component
public class DeadLetterMessageReceiver {

    /**
     * Logs a dead-lettered message.
     *
     * @param msgLog  the deserialized payload
     * @param message the raw AMQP message (unused here)
     * @param channel the channel the message arrived on (unused here)
     */
    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(MsgLog msgLog, Message message, Channel channel) {
        // Parameterized logging: no eager string building and no NPE on a null payload.
        log.info("收到死信消息:{}", msgLog);
    }
}
MsgLog :
/**
 * Message delivery log entry. It is both the persisted record and the payload
 * sent over RabbitMQ, hence {@link Serializable}.
 */
@Data
@Accessors(chain = true)
@ApiModel(value = "MsgLog对象", description = "消息投递日志")
public class MsgLog implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Unique identifier of the message (auto-generated id type). */
    @ApiModelProperty(value = "消息唯一标识")
    @TableId(type = IdType.AUTO)
    private Integer id;

    /** Message body, JSON formatted. */
    @ApiModelProperty(value = "消息体, json格式化")
    private String msg;

    /** Request URL. */
    @ApiModelProperty(value = "请求地址")
    private String url;

    /** Time of the request. */
    @ApiModelProperty(value = "请求时间")
    private LocalDateTime time;

    /** Result of the request. */
    @ApiModelProperty(value = "请求结果")
    private String result;

    /**
     * Status: 1 = request succeeded, 2 = retrying, 3 = succeeded after retry,
     * 4 = failed after retry.
     */
    @ApiModelProperty(value = "状态,1:请求成功 2:重试中 3:重试后成功 4:重试后失败")
    private Integer status;

    /** Number of retries. */
    @ApiModelProperty(value = "重试次数")
    private Integer retryCount;
}
@Override public void send(String msg) { JSONObject orderJson = new JSONObject(); log.info("接口入参" + orderJson); String uri = ""; JSONObject retJson = new JSONObject(); String ret = ""; // String ret = HttpRequest.post(msgLog.getUrl()).body(msgLog.getMsg()).execute().body(); // retJson = JSONObject.parseObject(ret); // log.error("ret" + ret); retJson.put("resultCode", 301); MsgLog msgLog = new MsgLog(); // msgLog.setMsg(JSONUtil.toJsonPrettyStr(orderJson)); msgLog.setMsg(msg); msgLog.setUrl(uri); msgLog.setResult(ret); msgLog.setTime(LocalDateTime.now()); if (!retJson.get("resultCode").equals(200)) { msgLog.setRetryCount(0); msgLog.setStatus(2); msgLogMapper.insert(msgLog); businessMessageSender.sendMsg(msgLog); } else { msgLog.setStatus(1); msgLogMapper.insert(msgLog); } }
控制台结果:
2022-08-18 09:29:23.859 INFO 33992 --- [nio-9001-exec-1] .l.DirectReplyToMessageListenerContainer : Container initialized for queues: [amq.rabbitmq.reply-to] 2022-08-18 09:29:23.869 INFO 33992 --- [nio-9001-exec-1] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-8VFVj3Nm7FX4gnMtghuEsg identity=3a7c278d] started 2022-08-18 09:29:24.040 INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver : 收到业务消息:MsgLog(id=41, msg=3, url=, time=2022-08-18T09:29:23.184, result=, status=2, retryCount=0) 2022-08-18 09:29:24.040 INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver : {} 2022-08-18 09:29:25.057 INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver : 收到业务消息:MsgLog(id=41, msg=3, url=, time=2022-08-18T09:29:23.184, result=, status=2, retryCount=0) 2022-08-18 09:29:25.057 INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver : {failed_count_for_send_to_exchange=2} 2022-08-18 09:29:30.072 INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver : 收到业务消息:MsgLog(id=41, msg=3, url=, time=2022-08-18T09:29:23.184, result=, status=2, retryCount=0) 2022-08-18 09:29:30.073 INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver : {failed_count_for_send_to_exchange=3} 2022-08-18 09:29:55.080 INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver : 收到业务消息:MsgLog(id=41, msg=3, url=, time=2022-08-18T09:29:23.184, result=, status=2, retryCount=0) 2022-08-18 09:29:55.080 INFO 33992 --- [ntContainer#0-1] c.y.t.config.BusinessMessageReceiver : {failed_count_for_send_to_exchange=4} 2022-08-18 09:29:55.123 WARN 33992 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'[B@92cf7a6(byte[352])' MessageProperties [headers={failed_count_for_send_to_exchange=5}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g1h2AA5yZXBseUA3OTY0NjI5NwAABHEAAAAAYv2Rlw==.Hsw8oUWTg7ZabM+vBpqkgQ==, 
contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dead.letter.demo.simple.business.exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-zIRXwiHfxwFbi6cfMGBJOQ, consumerQueue=dead.letter.demo.simple.business.queuea]) org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.yull.testyu.config.BusinessMessageReceiver.receiveA(com.yull.testyu.entity.MsgLog,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.io.IOException' threw exception at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:204) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:129) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1542) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1468) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_172] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_172] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_172] at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_172] at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE] at 
org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE] at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:91) ~[spring-retry-1.2.4.RELEASE.jar:na] at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) [spring-retry-1.2.4.RELEASE.jar:na] at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) [spring-retry-1.2.4.RELEASE.jar:na] at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:115) ~[spring-retry-1.2.4.RELEASE.jar:na] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE] at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.$Proxy108.invokeListener(Unknown Source) ~[na:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1456) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1451) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:875) 
~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:859) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1142) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1048) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_172] Caused by: java.lang.RuntimeException: 重试 at com.yull.testyu.config.BusinessMessageReceiver.receiveA(BusinessMessageReceiver.java:56) ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_172] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_172] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_172] at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_172] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.1.12.RELEASE.jar:5.1.12.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.12.RELEASE.jar:5.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:50) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at 
org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:196) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] ... 26 common frames omitted 2022-08-18 09:29:55.124 WARN 33992 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:45) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.lambda$getObject$0(StatelessRetryOperationsInterceptorFactoryBean.java:64) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:141) ~[spring-retry-1.2.4.RELEASE.jar:na] at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:512) ~[spring-retry-1.2.4.RELEASE.jar:na] at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:351) ~[spring-retry-1.2.4.RELEASE.jar:na] at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.4.RELEASE.jar:na] at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:115) ~[spring-retry-1.2.4.RELEASE.jar:na] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE] at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.$Proxy108.invokeListener(Unknown Source) ~[na:2.1.12.RELEASE] at 
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1456) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1451) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:875) [spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:859) [spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1142) [spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1048) [spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_172] Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: null ... 
19 common frames omitted Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.yull.testyu.config.BusinessMessageReceiver.receiveA(com.yull.testyu.entity.MsgLog,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.io.IOException' threw exception at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:204) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:129) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1542) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1468) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_172] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_172] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_172] at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_172] at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.1.12.RELEASE.jar:5.1.12.RELEASE] at 
org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:91) ~[spring-retry-1.2.4.RELEASE.jar:na] at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.4.RELEASE.jar:na] ... 14 common frames omitted Caused by: java.lang.RuntimeException: 重试 at com.yull.testyu.config.BusinessMessageReceiver.receiveA(BusinessMessageReceiver.java:56) ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_172] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_172] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_172] at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_172] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.1.12.RELEASE.jar:5.1.12.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.12.RELEASE.jar:5.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:50) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:196) ~[spring-rabbit-2.1.12.RELEASE.jar:2.1.12.RELEASE] ... 26 common frames omitted 2022-08-18 09:29:55.131 INFO 33992 --- [ntContainer#1-1] c.y.t.config.DeadLetterMessageReceiver : 收到死信消息:MsgLog(id=41, msg=3, url=, time=2022-08-18T09:29:23.184, result=, status=2, retryCount=0)
消息流转的流程:
重试次数可以保存在消息(Message)的 header 中。由于每次重试时收到的消息,以及最终进入死信队列的消息,都是最初发送到业务队列的那条原始消息(消息体不会改变),因此选择在业务队列的消费者中判断是否已达到最大重试次数,并据此更新数据库。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。