赞
踩
目录
4.1.1首先我们来测试一下acknowledge-mode: none不做处理的场景:
4.1.2 测试acknowledge-mode: auto自动处理情况下
如下图是消息从生产者到消费者的关系图。通过图片我们可以分析,消息从生产者,MQ,消费者这三个环节任一个都有可能丢失。那么下面我们就从这三点进行分析。
生产者发送消息时连接MQ失败
生产发生消息到达MQ后未找到Exchange
生产者发送消息到达MQ的 Exchange 后,未找到合适的 Queue
消息到达MQ后,处理消息的进程发生异常
消息到达MQ,保存到队列后,尚未消费就突然宕机
消息接收后尚未处理突然宕机
消息接收后处理过程中抛出异常
综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:
确保生产者一定把消息发送到MQ
确保MQ不会将消息弄丢
确保消费者一定要处理消息
生产者发送消息时,出现了网络故障,导致与MQ的连接中断。为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。当 RabbitTemplate与MQ连接超时后,多次重试。
我们可以在生产者对应的yml配置中配置:
- spring:
- rabbitmq:
- connection-timeout: 1s # 设置MQ的连接超时时间
- template:
- retry:
- enabled: true # 开启超时重试机制
- initial-interval: 1000ms # 失败后的初始等待时间
- multiplier: 2 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval *multiplier
- max-attempts: 3 # 最大重试次数
我这边故意把URL地址写错:
- spring:
- rabbitmq:
- host: 601.204.203.40
我们可以发现总共重试了3次。如图所示:
但是SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。如果对于业务性能有要求,建议禁用重试机制。
其实一般我们生产者与MQ网络连接比较稳定,所以基本上不用考虑第一种场景。但是还有一些到达MQ之后可能会丢失的场景,比如:
生产者发送的消息到达MQ没有找到Exchange
生产者发送的消息到达MQ找到Exchange,但是没有找到Queue
MQ内部处理消息进程异常
基于上面几种情况,RabbitMQ提供了生产者消息确认机制,包括 Publisher Confirm 和 Publisher Return 两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。具体主要有以下几种情况:
当消息发送到MQ上,但是路由失败了,会返回会通过Publisher Return返回返回信息。同时会返回ack确认信息,表示投递成功。
当非持久化消息发送到MQ上,并且入队成功,会返回ACK确认信息,表示投递成功。
当持久化消息发送到MQ上,入队成功并且持久化到磁盘,会返回ACK确认信息,表示投递成功。
其它情况都会返回NACK,告知投递失败
其中 ack 和 nack 属于Publisher Confirm机制, ack 是投递成功; nack 是投递失败。而 return 则属于Publisher Return机制。默认情况,这两种都是关闭的,需要通过配置开启。
我们在生产者对应yml配置中加入
- spring:
- rabbitmq:
- publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
- publisher-returns: true # 开启publisher return机制
其中publisher-confirm-type一共有三种模式:
none :关闭confirm机制
simple :同步阻塞等待MQ的回执
correlated :MQ异步回调返回回执
一般我们都是开启correlated模式。
每个 RabbitTemplate 只能配置一个 ReturnCallback,我们可以定义一个配置类统一配置。下面我们在生产者中定义配置类ReturnsCallbackConfig:
- package com.chenwen.producer.config;
-
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.ReturnedMessage;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.context.annotation.Configuration;
-
- import javax.annotation.PostConstruct;
-
- @Slf4j
- @AllArgsConstructor
- @Configuration
- public class ReturnsCallbackConfig {
- private final RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void init(){
- rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
- @Override
- public void returnedMessage(ReturnedMessage returned) {
- log.error("触发return callback,");
- log.debug("交换机exchange: {}", returned.getExchange());
- log.debug("路由键routingKey: {}", returned.getRoutingKey());
- log.debug("message: {}", returned.getMessage());
- log.debug("replyCode: {}", returned.getReplyCode());
- log.debug("replyText: {}", returned.getReplyText());
- }
- });
- }
- }
因为每个消息处理逻辑不同,所以我们需要每个消息单独定义ConfirmCallback。其实简单来说,就是是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数CorrelationData。
CorrelationData中包含两个核心的东西:
id :消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
SettableListenableFuture :回执结果的Future对象
将来MQ的回执就会通过这个 Future 来返回,我们可以提前给 CorrelationData 中的 Future 添加回调函数来处理消息回执:
下面我们定义一个测试生产者ConfirmCallback方法:
- @Test
- void testProducerConfirmCallback() throws InterruptedException {
- // 创建CorrelationData
- CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
- cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
- @Override
- public void onFailure(Throwable ex) {
- log.error("消息回调失败", ex);
- }
-
- @Override
- public void onSuccess(CorrelationData.Confirm result) {
- log.info("收到confirm callback回执");
- if (result.isAck()) {
- log.info("消息发送成功,收到ack");
- } else {
- // 消息发送失败
- log.error("消息发送失败,收到nack, 原因:{}", result.getReason());
- }
- }
- });
- rabbitTemplate.convertAndSend("test.queue", "chenwen", "hello", cd);
- }
-
- rabbitTemplate.convertAndSend("test.direct", "chenwen1", "hello", cd);
目前存在交换机test.direct,并且正确的路由键是chenwen。首先我这边故意将路由键我写错成chenwen1。执行测试方法,通过控制台日志可以看到,路由失败后,会通过Publisher Return返回异常信息,并且会返回ACK。
我们修改成功正确的路由键chenwen,执行测试方法,可以看不会返回Publisher Return信息,只返回了ACK。
注意:
开启生产者确认模式比较消耗MQ性能,一般不建议开启。我们分析一下几种场景:
路由失败:这个其实是人为因素。由于我们编程错误导致的。
交换机名称错误:同样是编程错误导致
MQ内部故障:这种需要处理,但概率往往较低。所以一般只有对消息可靠性极高的场景才需要开启,这种的我们只需要开启Publisher Confirm模式通过处理nack就可以。
MQ的可靠性,其实就是当消息到达MQ,还没有被消费者消费,MQ就由于某些情况出现重启,导致的消息丢失。主要是这几个方面:
交换机Exchange持久化
队列Queue持久化
消息本身的持久化
下面我们就以控制台展示的为例子:
Durability 就是表示设置交换机持久化的参数, Durable 就是持久化模式, Transient 就是临时模式。
同样队列持久化可以在控制台Queues那边设置,根据Durability设置,Durable 就是持久化模式, Transient 就是临时模式。
根据Delivery mode参数设置成2,就是持久化。
注意:如果开启消息持久化,并且也开启了生产者确认模式。需要等消息持久化到磁盘,才会发送ACK回执,来保证消息的可靠性。
不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。
在默认的情况下,生产者发送消息是存放在内存中,以提高收发消息的效率。但是由于某些情况会导致消息堆积。比如:
消费者消费者宕机或出现网络故障
生产者生产消息过快,超过了消费者处理消息的能力
消费者处理业务发生了堵塞
一旦消息堆积,会导致占用的内存越来越大,直到触发内存预警。此时的RabbitMQ会将内存上的消息持久化到磁盘中。这个行为成为PageOut 。 PageOut会耗费一段时间,并且会阻塞队列进程。所以此时RabbitMQ不会再处理新的消息,生产者的所有的请求都会被阻塞。
RabbitMQ为了解决这个问题,从3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特性主要有如下:
接收到消息后直接存入磁盘而非内存
消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
支持数百万条的消息存储
而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。
当RabbitMQ向消费者投递消息的时候,可能由于某些因素会导致消息丢失,比如:
消息的投递过程中出现网络故障
消费者接受到消息后突然宕机
消息者已经接受到消息了,但是由于消费者处理报错导致异常
...............
一旦发生上面几种情况,都会导致消息丢失。那我RabbitMQ肯定需要知道消费者处理消息的状态,如果失败了,可以再次进行投递。下面我们就来学习一下消费者如何进行消息确认机制的。
消费者处理消息之后,应该向RabbitMQ发送一个回执。告知RabbitMQ自己的消息处理状态。主要有三个:
ack:处理消息成功,RabbitMQ从队列中把消息删除
nack:处理消息失败,RabbitMQ需要重新投递消息
reject:消息处理失败并拒绝该消息,并且RabbitMQ会从队列中删除该消息
一般我们可以使用try catch 成功即返回ack,失败返回nack。但是SpringAMQP帮我们实现了,我们只需要通过配置对应acknowledge-mode参数即可实现。主要有三种模式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除,这种不建议使用
manual:手动模式。需要自己在业务代码中调用api,发送 ack 或 reject ,存在业务入侵,但更灵活
auto:自动模式。SpringAMQP利用AOP切面对我们方法进行环绕增强。正常执行返回ack,失败则根据异常返回nack或者reject
如果是业务异常,会自动返回 nack;
如果是消息处理或校验异常,自动返回 reject ;
- spring:
- rabbitmq:
- listener:
- simple:
- acknowledge-mode: none # 不做处理
我们先向队列test.queue里面发送一条消息,可以看到test.queue队列现在有一条消息。
消费者这边去监听这个消息,对应代码如下:
- @Slf4j
- @Component
- public class ConsumeMqListener {
- @RabbitListener(queues = "test.queue")
- public void listenWorkQueue2(String msg) throws InterruptedException {
- log.info("spring 消费者接收到消息:【" + msg + "】");
- if (true) {
- throw new MessageConversionException("测试没有开启消费者确认");
- }
- log.info("消息处理完成");
- }
- }
dubug断点,还未抛出异常之前,此时我们先去刷下一下UI控制台页面,可以看到test.queue的一条消息已经不存在了。
现在我们在异常点打上断点,然后看看UI后台消息的状态,我们看到现在消息状态变成了Unacked。
断点执行完之后,我们看到此时UI页面的消息数量为0。说明抛出MessageConversionException异常,是将消息直接reject的。
我们将消费者内部逻辑抛出RuntimeException异常,在抛出异常之前打上断点,然后观察看UI页面的消息状态也是为Unacked状态。
异常抛出之后我们去看UI页面队列中消息的状态又回到了Ready状态了,这样就可以确保消费者业务异常之后,消息还能够再次投递
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。为了解决这个问题,Spring又提供了消费者重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
在消费者的application.yml配置下面参数:
- spring:
- rabbitmq:
- listener:
- simple:
- retry:
- enabled: true # 开启消费者失败重试
- initial-interval: 1000ms # 初识的失败等待时长为1秒
- multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
- max-attempts: 3 # 最大重试次数
- stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
可以看到:
消费者在消息失败之后消息没有重新回到队列,而是在本地重试了三次
在本地重试三次以后,抛出了 AmqpRejectAndDontRequeueException 异常。我们查看UI页面看到消息被删除了,说明返回的消息回执是reject。
我们可以看到,当失败重试3次,消息会被从队列中删除。这样对于一些要求消息可靠性比较高的情况下,肯定是不符合的。因此Spring有提供了失败处理的策略。这个策略是由 MessageRecovery 接口来定义的,它有3个不同实现:
RejectAndDontRequeueRecoverer:重试次数耗尽之后,返回reject。直接将消息丢弃,这个是默认模式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack ,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
最好的策略是RepublishMessageRecoverer,重试次数耗尽之后,将消息投递到指定的交换机中,后续由人工来处理。下面我们就来演示一下这个场景,我们在消费者服务这边新增配置类ErrorConfiguration,声明交换机和队列,并绑定:
- package com.chenwen.consumer.config;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.retry.MessageRecoverer;
- import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Slf4j
- @Configuration
- @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
- public class ErrorConfiguration {
-
-
- @Bean
- public DirectExchange errorExchange(){
- return new DirectExchange("error.direct");
- }
-
- @Bean
- public Queue errorQueue(){
- return new Queue("error.queue");
- }
-
- @Bean
- public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){
- return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
- }
-
- @Bean
- public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
- log.debug("加载RepublishMessageRecoverer");
- return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
- }
- }
可以看到,当重试3次数耗尽之后,会将MQ信息放在error.queue队列中,此时error.queue队列多了一条数据,后续我们人为去处理,或者单独使用一个监听去处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。