当前位置:   article > 正文

springboot:整合rabbitmq之重试机制_springboot整合rabbitmq出现异常限制重试次数

springboot整合rabbitmq出现异常限制重试次数

当我们消息消费失败的时候,可以进行重试,
什么情况下会重发消息
1、网络抖动
2、程序抛出异常没有try-catch
RabbitMQ自动补偿机制触发:(多用于调用第三方接口)
1.当我们的消费者在处理我们的消息的时候,程序抛出异常情况下(默认无限次数重试),如果这里的异常try-catch后自己配置的重试机制是不生效的
2.应该对我们的消息重试设置间隔重试时间,比如消费失败最多只能重试5次,间隔3秒(防止重复消费,幂等问题)

如果重试5次,也就是15秒内重试还是失败情况下应该如何处理
1.默认情况下,重试多次还是失败的话,会自动删除该消息(消息可能会丢失)
解决思路:
A:如果重试多次还是失败的情况下,最终存放到死信队列.
B:采用表日志记录,消费失败错误的日志记录 后期人工自动对消息实现补偿.

一、项目准备
依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

配置类

spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
spring.rabbitmq.addresses=192.168.23.145
spring.rabbitmq.virtual-host=/rabbit

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

二、案例重现

# 声明队列
@Configuration
public class HelloWorldConfig {
	@Bean
	public Queue setQueue() {
		return new Queue("helloWorldqueue");
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
# 生产者
@Slf4j
@RestController
public class ProducerController {

	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	//helloWorld 直连模式
	@ApiOperation(value="helloWorld发送接口",notes="直接发送到队列")
	@GetMapping(value="/helloWorldSend")
	public Object helloWorldSend(String message) throws AmqpException, UnsupportedEncodingException {
		//设置部分请求参数
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

		//发消息
		rabbitTemplate.send("helloWorldqueue",new Message(message.getBytes("UTF-8"),messageProperties));
		return "message sended : "+message;
	}
}

# 消费者
@Component
public class ConcumerReceiver {

	private int count = 1;

	//直连模式的多个消费者,会分到其中一个消费者进行消费。类似task模式
	//通过注入RabbitContainerFactory对象,来设置一些属性,相当于task里的channel.basicQos
	@RabbitListener(queues="helloWorldqueue")
	public void helloWorldReceive(String message) {
		System.out.println("当前执行次数:" + count++);
	     System.out.println("异常前,helloWorld模式 received message : " +message);
	     int i = 1/0;
		System.out.println("异常后,helloWorld模式 received message : " +message);
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

启动测试:
无限循环报错
停止后,消息重回Ready状态
在这里插入图片描述
三、实现消息重试
实现重试

spring.rabbitmq.listener.simple.retry.enabled= true
spring.rabbitmq.listener.simple.retry.max-attempts= 5
spring.rabbitmq.listener.simple.retry.max-interval= 10000
spring.rabbitmq.listener.simple.retry.initial-interval= 2000
spring.rabbitmq.listener.simple.retry.multiplier= 2
  • 1
  • 2
  • 3
  • 4
  • 5

重启测试
第一次执行时间2s,第二次4s,第三次8s,第四次16s,第五次由于设置了最大间隔为10s,所有变成了10s
最后查看retry_a队列,消息没有了,也就是说重试五次失败之后就会移除该消息
移除操作是由日志中的这个类处理:RejectAndDontRequeueRecoverer(拒绝和不要重新排队)
在这里插入图片描述

对重试失败的消息重新排队
使用下 ImmediateRequeueMessageRecoverer 重新排队在HelloWorldConfig中配置

    @Bean
    public MessageRecoverer messageRecoverer() {
        return new ImmediateRequeueMessageRecoverer();
    }

  • 1
  • 2
  • 3
  • 4
  • 5

重启运行:

可以看出:重试5次之后,返回队列,然后再重试5次,周而复始直到不抛出异常为止,这样还是会影响后续的消息消费
在这里插入图片描述
把重试失败消息放入重试失败队列
接着使用 RepublishMessageRecoverer 重新发布在HelloWorldConfig中配置

@Configuration
public class HelloWorldConfig {

	@Bean
	public Queue setQueue() {
		return new Queue("helloWorldqueue");
	}

	/*@Bean
	public MessageRecoverer messageRecoverer() {
		return new ImmediateRequeueMessageRecoverer();
	}*/
	@Bean
	public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
		// 需要配置交换机和绑定键
		return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, RETRY_FAILURE_KEY);
	}
	//失败队列
	public static final String RETRY_FAILURE_KEY = "retry.failure.key";
	//失败交换机
	public static final String RETRY_EXCHANGE = "retry_exchange";

	@Bean
	public Queue setQueueFailure() {
		return new Queue(RETRY_FAILURE_KEY);
	}

	@Bean
	public FanoutExchange setFailureExchange() {
		return new FanoutExchange(RETRY_EXCHANGE);
	}
	@Bean
	public Binding bindFailureBind() {
		return BindingBuilder.bind(setQueueFailure()).to(setFailureExchange());
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

在ConcumerReceiver中创建重试失败消息监听

@RabbitListener(queues="retry.failure.key")
	public void retryFailure(String message) throws InterruptedException {
		Thread.sleep(20000);
		System.out.println(" [ 消费者@重试失败号 ] 接收到消息 ==> '" + message);
	}
  • 1
  • 2
  • 3
  • 4
  • 5

重启,运行结果:

重试5次之后,将消息 Republishing failed message to exchange ‘retry.exchange’ with routing key retry-key 转发到重试失败队列,由重试失败消费者消费
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/603947
推荐阅读
相关标签
  

闽ICP备14008679号