赞
踩
目录
在生产者这边,RabbitMQ 提供了 消息确认机制 来确保生产者的消息到达队列。
具体的,生产者将消息发送给 MQ 之后,会返回一个结果给生产者,表示消息是否处理成功,具体有以下两种响应:
最后生产者这边的回调接收到响应后,根据不同的 ack 执行不同的“策略”(类似于你去买书,然后拿到书以后具体要干啥,都由你决定).
Ps:确认机制发送消息时,需要给每一个消息设置一个全局唯一的 id, 以区分不同消息,避免 ack 冲突.
a)再 publisher 微服务的 application.yml 中添加配置:
- spring:
- rabbitmq:
- publisher-confirm-type: correlated
- publisher-returns: true
- template:
- mandatory: true
配置说明:
b)每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
- @Slf4j
- @Configuration
- public class CommonConfig implements ApplicationContextAware {
-
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- // 获取RabbitTemplate
- RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
- // 设置ReturnCallback
- rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
- log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
- replyCode, replyText, exchange, routingKey, message.toString());
- });
- }
- }
Ps:ApplicationContextAware 就是 Spring 容器启动时的要执行的通知接口,通过 setApplicationContext 方法实现具体的通知.
c)生产者发送消息,指定 ID,消息 ConfirmCallback
- @Test
- public void testSendMessage2SimpleQueue() throws InterruptedException {
- // 消息体
- String message = "hello, spring amqp!";
- // 消息ID,需要封装到CorrelationData中
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- // 添加callback
- correlationData.getFuture().addCallback(
- result -> {
- if(result.isAck()){
- // ack,消息成功
- log.debug("消息发送成功, ID:{}", correlationData.getId());
- }else{
- // nack,消息失败
- log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
- }
- },
- ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
- );
- // 发送消息
- rabbitTemplate.convertAndSend("amq.direct", "simple", message, correlationData);
- }
MQ 默认时内存存储消息,通过开启持久化功能(设置 durable = true),就可以将消息持久化到文件中,保证保证消息不丢失.
Ps:消息要持久化的前提是交换机(不一定,但最好是)和队列是持久化的.
1.2.2、实践
a)交换机持久化
- @Bean
- public DirectExchange simpleExchange(){
- // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
- return new DirectExchange("simple.direct", true, false);
- }
b)队列持久化
- @Bean
- public Queue simpleQueue(){
- // 使用QueueBuilder构建队列,durable就是持久化的
- return QueueBuilder.durable("simple.queue").build();
- }
c)消息持久化
- public void testDurableMessage() {
- //1.构造一个持久的消息
- Message message = MessageBuilder.withBody("hello".getBytes())
- .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
- .build();
- rabbitTemplate.convertAndSend("simple.queue", message);
- }
Ps:delivery_mode = 2 就表示消息要持久化.
在编程式的 RabbitMQ 中消费者有两种应答方式,自动应答和手动应答,而一定我们在消费消息时开启了手动应答,那么当消费消息完成后不会主动删除消息,而是需要通过客户端手动调用basicAck 方法才会进行应答(内部逻辑就是从内存和硬盘上删除消息)。
在注解式的 RabbitMQ 支持消费者确认机制,即:消费者处理消息后可以向 MQ 发送 ack 回执,MQ收到ack回执后才会删除该消息.
SpringAMQP 允许配置三种确认模式:
这里只需要配置以下 application.yml 文件,添加以下配置:
- spring:
- rabbitmq:
- listener:
- simple:
- prefetch: 1
- acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack
刚刚讲到,消费者消费确认,SpringAMQP 提供了三种确认模式,其中 auto 这种方式,在消费者执行消费代码遇到异常时,会重新将消息加入到队列中,然后发送给消费者,再次异常,无限循环,导致 mq 的消息处理飙升,带来不必要的压力.
假设消费任务如下:
- @Component
- public class SpringRabbitListener {
-
- @RabbitListener(queues = "simple.queue")
- public void listenSimpleQueue(String msg) {
- System.out.println("消费者接收到消息:" + msg);
- System.out.println("开始消费!");
- System.out.println(1/0);
- System.out.println("消费完成!");
- }
- }
我们可以利用 Spring 的 retry 机制,在消费者出现异常时,利用本地重试,而不是无限制的加入到 mq 队列,只需要对消费者的配置文件进行以下配置:
- spring:
- rabbitmq:
- listener:
- simple:
- prefetch: 1
- retry:
- enabled: true # 开启消费者失败重试
- initial-interval: 1000 # 初始的失败等待时长为1秒
- multiplier: 3 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
- max-attempts: 4 # 最大重试次数
- stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
在开启重试模式以后,若重试次数耗尽,并且消息依然失败,则需要有 MessageRecoverer 接口来处理,他包含三种不同的实现:
上述第三种方式比较推荐,如下图:
1.4.2、实践
这里就测试以下推荐方案 RepublishMessageRecoverer
a)首先要定义用来接收失败消息的交换机、队列、绑定关系,最后定义 RepublishMessageRecoverer(Bean 的方式注入,覆盖 Spring 默认的方案):
- @Configuration
- public class ErrorMessageConfig {
-
- @Bean
- public DirectExchange errorMessageExchange() {
- return new DirectExchange("error.direct");
- }
-
- @Bean
- public Queue errorQueue() {
- return new Queue("error.queue", true);
- }
-
- @Bean
- public Binding errorBinding() {
- return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
- }
-
- @Bean
- public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
- return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
- }
-
- }
b)定义消费者执行的消费任务
- @Component
- public class SpringRabbitListener {
-
- @RabbitListener(queues = "simple.queue")
- public void listenSimpleQueue(String msg) {
- System.out.println("消费者接收到消息:" + msg);
- System.out.println("开始消费!");
- System.out.println(1/0);
- System.out.println("消费完成!");
- }
- }
c)启动消费者,如下:
d)查看失败队列中具体信息(异常栈信息和信息信息)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。