赞
踩
RabbitMQ默认是auto模式,当监听消费者方法正常执行完毕后,由Spring自动向RabbitMQ返回ack确认;如果出现异常,就给RabbitMQ返回nack消费失败。
application.yml配置RabbitMQ消费者ACK应答模式
spring:
rabbitmq:
listener:
simple:
# none(无应答模式) auto(自动应答模式) manual(手动应答模式)
acknowledge-mode: auto
RabbitMQ认为所有消息都会被成功消费,所以RabbitMQ投递消息后会立即删除消息
application.yml配置RabbitMQ消费者ACK应答模式
spring:
rabbitmq:
listener:
simple:
# none(无应答模式) auto(自动应答模式) manual(手动应答模式)
acknowledge-mode: none
开发人员在处理完业务后,调用RabbitMQ封装好的API,向RabbitMQ返回ack确认消费成功或者消费失败
application.yml配置RabbitMQ消费者ACK应答模式
spring:
rabbitmq:
listener:
simple:
# none(无应答模式) auto(自动应答模式) manual(手动应答模式)
acknowledge-mode: manual
可以利用Spring本身自动重试的机制,当消费者出现异常后,在消费者内部进行本地重试;而不是让消息立刻重新回到队列,然后让RabbitMQ重新投递,会导致CPU飙升。(这样会导致无限循环 -> 消息一旦出现异常会不断投放回到队列,再重新发送给消费者)。
application.yml配置
定义队列和交换机
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RetryQueueConfig { //定义direct类型交换机 @Bean public DirectExchange retryExchange() { return ExchangeBuilder.directExchange("retry.exchange").build(); } //定义持久化队列 @Bean public Queue retryQueue() { return new Queue("retry.queue",true,false,false); } @Bean public Binding retryQueueBinding(Queue retryQueue, DirectExchange retryExchange) { return BindingBuilder.bind(retryQueue).to(retryExchange).with("retry"); } }
模拟生产者
import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest public class RetryQueueTest { @Autowired private RabbitTemplate rabbitTemplate; //模拟生产者 @Test public void test(){ rabbitTemplate.convertAndSend("retry.exchange","retry","模拟消息异常"); } }
控制台输出
重试三次后就把消息删除了
重试失败后的恢复策略
在刚刚的本地重试中,在达到最大次数后,消息会被丢弃,这是Spring内部机制决定的。
但是,其实在重试多次消费仍然失败后,SpringAMQP提供了MessageRecoverer接口,定义了不同的恢复策略可以用来进一步处理消息:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢失消息。是默认的处理策略
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
实际开发中,比较优雅的一个方案是RepublishMessageRecoverer,将失败消息重新投递到一个专门用于存储异常消息的队列中,等待后续人工处理。
RepublishMessageRecoverer策略代码
import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RepublishMessageRecovererConfig { /* * 消息消费失败后的恢复策略:使用RepublishMessageRecoverer策略:重试次数耗尽后,将失败消息投递到指定的交换机 */ @Bean public MessageRecoverer republishMsgRecoverer(RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error"); } //定义Topic类型交换机 @Bean public TopicExchange errorExchange() { return ExchangeBuilder.topicExchange("error.exchange").build(); } //定义队列 @Bean public Queue errorQueue() { return QueueBuilder.durable("error.queue").build(); } //队列和交换机绑定 @Bean public Binding errorQueueBinding(TopicExchange errorExchange, Queue errorQueue) { return BindingBuilder.bind(errorQueue).to(errorExchange).with("error.#"); } }
这样就实现了异常消息重试耗尽后,就会投递到指定的异常队列中去,等待人工处理了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。