赞
踩
当我们消息消费失败的时候,可以进行重试,
什么情况下会重发消息
1、网络抖动
2、程序抛出异常没有try-catch
RabbitMQ自动补偿机制触发:(多用于调用第三方接口)
1.当我们的消费者在处理我们的消息的时候,程序抛出异常情况下(默认无限次数重试),如果这里的异常try-catch后自己配置的重试机制是不生效的
2.应该对我们的消息重试设置间隔重试时间,比如消费失败最多只能重试5次,间隔3秒(防止重复消费,幂等问题)
如果重试5次,也就是15秒内重试还是失败情况下应该如何处理
1.默认情况下,重试多次还是失败的话,会自动删除该消息(消息可能会丢失)
解决思路:
A:如果重试多次还是失败的情况下,最终存放到死信队列.
B:采用表日志记录,消费失败错误的日志记录 后期人工自动对消息实现补偿.
一、项目准备
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置类
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
spring.rabbitmq.addresses=192.168.23.145
spring.rabbitmq.virtual-host=/rabbit
二、案例重现
# 声明队列
@Configuration
public class HelloWorldConfig {
@Bean
public Queue setQueue() {
return new Queue("helloWorldqueue");
}
}
# 生产者 @Slf4j @RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; //helloWorld 直连模式 @ApiOperation(value="helloWorld发送接口",notes="直接发送到队列") @GetMapping(value="/helloWorldSend") public Object helloWorldSend(String message) throws AmqpException, UnsupportedEncodingException { //设置部分请求参数 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); //发消息 rabbitTemplate.send("helloWorldqueue",new Message(message.getBytes("UTF-8"),messageProperties)); return "message sended : "+message; } } # 消费者 @Component public class ConcumerReceiver { private int count = 1; //直连模式的多个消费者,会分到其中一个消费者进行消费。类似task模式 //通过注入RabbitContainerFactory对象,来设置一些属性,相当于task里的channel.basicQos @RabbitListener(queues="helloWorldqueue") public void helloWorldReceive(String message) { System.out.println("当前执行次数:" + count++); System.out.println("异常前,helloWorld模式 received message : " +message); int i = 1/0; System.out.println("异常后,helloWorld模式 received message : " +message); } }
启动测试:
无限循环报错
停止后,消息重回Ready状态
三、实现消息重试
实现重试
spring.rabbitmq.listener.simple.retry.enabled= true
spring.rabbitmq.listener.simple.retry.max-attempts= 5
spring.rabbitmq.listener.simple.retry.max-interval= 10000
spring.rabbitmq.listener.simple.retry.initial-interval= 2000
spring.rabbitmq.listener.simple.retry.multiplier= 2
重启测试
第一次执行时间2s,第二次4s,第三次8s,第四次16s,第五次由于设置了最大间隔为10s,所有变成了10s
最后查看retry_a队列,消息没有了,也就是说重试五次失败之后就会移除该消息
移除操作是由日志中的这个类处理:RejectAndDontRequeueRecoverer(拒绝和不要重新排队)
对重试失败的消息重新排队
使用下 ImmediateRequeueMessageRecoverer 重新排队在HelloWorldConfig中配置
@Bean
public MessageRecoverer messageRecoverer() {
return new ImmediateRequeueMessageRecoverer();
}
重启运行:
可以看出:重试5次之后,返回队列,然后再重试5次,周而复始直到不抛出异常为止,这样还是会影响后续的消息消费
把重试失败消息放入重试失败队列
接着使用 RepublishMessageRecoverer 重新发布在HelloWorldConfig中配置
@Configuration public class HelloWorldConfig { @Bean public Queue setQueue() { return new Queue("helloWorldqueue"); } /*@Bean public MessageRecoverer messageRecoverer() { return new ImmediateRequeueMessageRecoverer(); }*/ @Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) { // 需要配置交换机和绑定键 return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, RETRY_FAILURE_KEY); } //失败队列 public static final String RETRY_FAILURE_KEY = "retry.failure.key"; //失败交换机 public static final String RETRY_EXCHANGE = "retry_exchange"; @Bean public Queue setQueueFailure() { return new Queue(RETRY_FAILURE_KEY); } @Bean public FanoutExchange setFailureExchange() { return new FanoutExchange(RETRY_EXCHANGE); } @Bean public Binding bindFailureBind() { return BindingBuilder.bind(setQueueFailure()).to(setFailureExchange()); } }
在ConcumerReceiver中创建重试失败消息监听
@RabbitListener(queues="retry.failure.key")
public void retryFailure(String message) throws InterruptedException {
Thread.sleep(20000);
System.out.println(" [ 消费者@重试失败号 ] 接收到消息 ==> '" + message);
}
重启,运行结果:
重试5次之后,将消息 Republishing failed message to exchange ‘retry.exchange’ with routing key retry-key 转发到重试失败队列,由重试失败消费者消费
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。