赞
踩
对 RabbitMQ 不是很了解的同学,可以看一下我的另一篇博文:RabbitMQ快速入门(MQ的概念、安装RabbitMQ、在 SpringBoot 项目中集成 RabbitMQ )
消息丢失的情况主要有以下三种:
那怎么保证消息的可靠性呢,我们可以从消息丢失的情况入手——从生产者、消息代理( RabbitMQ )、消费者三个方面来保证消息的可靠性
由于网络问题,可能会出现客户端连接 RabbitMQ 失败的情况,我们可以通过配置开启连接 RabbitMQ 失败后的重连机制
application.yml(将 host 更改为部署 RabbitMQ 的服务器的地址)
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /blog
username: CaiXuKun
password: T1rhFXMGXIOYCoyi
connection-timeout: 1s # 连接超时时间
template:
retry:
enabled: true # 开启连接超时重试机制
initial-interval: 1000ms # 连接失败后的初始等待时间
multiplier: 1 # 连接失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier
max-attempts: 3 # 最大重试次数
填写完配置信息后,我们手动停止 RabbitMQ ,模拟生产者连接 RabbitMQ 失败的情况
sudo docker stop rabbitmq
启动测试类
@Test
void testSendMessageToQueue() {
String queueName = "simple.queue";
String msg = "Hello, SpringAMQP!";
rabbitTemplate.convertAndSend(queueName, msg);
}
可以在控制台看到,总共有三次重新连接 RabbitMQ 的记录,三次连接都失败后,就直接抛异常了
注意事项:
- 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,但 SpringAMOP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,线程会被阻塞,影响业务性能
- 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长(比如 200 ms)和重试次数,也
可以考虑使用异步线程来执行发送消息的代码
RabbitMQ 提供了 Publisher Confirm
和 Publisher Return
两种确认机制。开启确机制认后,如果 MQ 成功收到消息后,会返回确认消息给生产者,返回的结果有以下几种情况:
在 publisher 服务中编写与生产者确认机制有关的配置信息( application.yml 文件)
spring:
rabbitmq:
publisher-returns: true
publisher-confirm-type: correlated
publisher-confirm-type 有三种模式:
每个 RabbitTemplate 只能配置一个 ReturnCallback
在 publisher 模块新增一个名为 RabbitMQConfig
的配置类,并让该类实现 ApplicationContextAware
接口
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 配置回调 rabbitTemplate.setReturnsCallback((returnedMessage) -> { System.out.println("收到消息的return callback, " + "exchange = " + returnedMessage.getExchange() + ", " + "routingKey = " + returnedMessage.getRoutingKey() + ", " + "replyCode = " + returnedMessage.getReplyCode() + ", " + "replyText = " + returnedMessage.getReplyText() + ", " + "message = " + returnedMessage.getMessage()); }); } }
测试前先运行 RabbitMQ
sudo docker start rabbitmq
在 publisher 模块添加一个测试类,测试 ReturnCallback 的效果
@Test void testConfirmCallback() throws InterruptedException { CorrelationData correlationData = new CorrelationData(); correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> { if (confirm.isAck()) { // 消息发送成功 System.out.println("消息发送成功,收到ack"); } else { // 消息发送失败 System.err.println("消息发送失败,收到nack,原因是" + confirm.getReason()); } if (throwable != null) { // 消息回调失败 System.err.println("消息回调失败"); } }); rabbitTemplate.convertAndSend("blog.direct", "red", "Hello, confirm callback", correlationData); // 测试方法执行结束后程序就结束了,所以这里需要阻塞线程,否则程序看不到回调结果 Thread.sleep(2000); }
发送成功后可以看到消息发送成功的回调信息
如果交换机不存在会怎么样呢,我们故意使用一个不存在的交换机,观察控制台的输出结果
如果 routingKey 不存在会怎么样呢,我们故意使用一个不存在的 routingKey ,观察控制台的输出结果
可以看到,confirmCallback 和 ReturnCallback 都返回了回调信息(deliveryTag
为 0
表示消息无法路由到队列)
在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟,这样会导致两个问题:
怎么理解 MQ 阻塞呢,当队列的空间被消息占满了之后,RabbitMQ 会先把老旧的信息存到磁盘,为新消息腾出空间,在这个过程中,整个 MQ 是被阻塞的,也就是说,在 MQ 完成这一系列工作之前,无法处理已有的消息和接收新的消息
我们来测试一下消息丢失的情况,在 RabbitMQ 的控制台中向 simple.queue 队列发送一条信息,发送后重启 RabbitMQ ,模拟 RabbitMQ 宕机后重启的情况
测试前,记得先把监听 simple.queue 队列的代码注释掉
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
System.out.println("消费者收到了simple.queue的消息:【" + message + "】");
}
第一步:先发送一条消息
第二步:查看消息的情况
第三步:重启 RabbitMQ ,模拟 RabbitMQ 宕机后重启的情况
sudo docker restart rabbitmq
第四步:查看消息的情况(可以看到,RabbitMQ 重启后,消息丢失了)
RabbitMQ 实现数据持久化包括 3 个方面:
注意事项:
- 利用 SpringAMQP 创建的交换机、队列、消息,默认都是持久化的
- 在 RabbitMQ 控制台创建的交换机、队列默认是持久化的,而消息默认是存在内存中( 3.12 版本之前默认存放在内存,3.12 版本及之后默认先存放在磁盘,消费者处理消息时才会将消息取出来放到内存中)
我们来演示一下 RabbitMQ 发生 Paged Out 现象(也就是队列的空间被消息占满了之后,将老旧消息移到磁盘,为新消息腾出空间的情况)
我们编写一个测试类,向 simple.queue 一次性发送一百万条消息
在发送消息之前,先把生产者确认机制关闭,提高消息发送的速度
spring:
rabbitmq:
publisher-returns: false
publisher-confirm-type: none
先测试发送非持久化信息
@Test
void testPagedOut() {
Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
for (int i = 0; i < 1; i++) {
rabbitTemplate.convertAndSend("simple.queue", message);
}
}
测试结果
再测试发送持久化信息
@Test
void testPagedOut() {
Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
for (int i = 0; i < 1; i++) {
rabbitTemplate.convertAndSend("simple.queue", message);
}
}
从 RabbitMQ 的 3.6.0
版本开始,增加了 Lazy Queue 的概念,也就是惰性队列,惰性队列的特征如下:
开启持久化和生产者确认时,RabbitMQ 只有在消息持久化完成后才会给生产者返回 ACK 回执
在 RabbitMQ 的控制台可以看到 RabbitMQ 的版本
在 RabbitMQ 控制台中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可
x-queue-mode
在 Java 代码中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可
编程式创建
@Bean
public org.springframework.amqp.core.Queue lazeQueue() {
return QueueBuilder.durable("lazy.queue1")
.lazy()
.build();
}
注解式创建
@RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue(
name = "lazy.queue2",
durable = "true",
arguments = @Argument(
name = "x-queue-mode",
value = "lazy"
)
))
public void listenLazeQueue(String message) {
System.out.println("消费者收到了 laze.queue2的消息: " + message);
}
为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。处理消息后,消费者应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 消息的处理状态,回执有三种可选值:
SpringAMQP 已经实现了消息确认功能,并允许我们通过配置文件选择 ACK 的处理方式,有三种方式:
开启消息确认机制,需要在 application.yml
文件中编写相关的配置
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: none
先测试处理模式为 none 的情况,向 simple.queue 队列发送一条消息,同时监听 simple.queue 队列的消息,监听到队列中的消息后手动抛出一个异常
publisher 服务
@Test
void testSendMessageToQueue() {
String queueName = "simple.queue";
String msg = "Hello, SpringAMQP!";
rabbitTemplate.convertAndSend(queueName, msg);
}
consumer 服务
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
System.out.println("消费者收到了simple.queue的消息:【" + message + "】");
throw new RuntimeException("故意抛出异常");
}
不出意外,程序报错了
但在 RabbitMQ 的控制台可以看到,消息也丢失了
再测试处理模式为 none 的情况
可以看到,控制台一直在报错,报错之后一直在尝试重新发送消息
在 RabbitMQ 的控制台可以看到,simple.queue 一直在收发消息,速率达到了 97 次每秒(状态为 running ,消息的状态为 Unacked )
此时,我们手动关闭 consumer 服务,查看 RabbitMQ 的控制台,可以看到消息恢复到正常的状态了
再来测试异常类型为 MessageConversionException 的情况
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
System.out.println("消费者收到了simple.queue的消息:【" + message + "】");
throw new MessageConversionException("故意抛出异常");
}
在控制台可以看到,消息被拒绝了,而且消息也没有重新发送
查看 RabbitMQ 的控制台,可以发现消息已经从队列中移除了
当消费者出现异常后,消息会不断重新入队,重新发送给消费者,然后再次发生异常,再次 requeue(重新入队),陷入 无限循环,给 RabbitMQ 带来不必要的压力
我们可以利用 Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制地重新入队
在 application.yml 配置文件中开启失败重试机制
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto
retry:
enabled: true # 开启消息消费失败重试机制
initial-interval: 1000ms # 消息消费失败后的初始等待时间
multiplier: 1 # 消息消费失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier
max-attempts: 3 # 最大重试次数
stateless: true # true表示无状态,false表示有状态,如果业务中包含事务,需要设置为false
我们将抛出的异常类型改回 RuntimeException
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
System.out.println("消费者收到了simple.queue的消息:【" + message + "】");
throw new RuntimeException("故意抛出异常");
}
在控制台可以看出,消息的重新发送次数已经耗尽了
查看 RabbitMQ 的控制台,发现消息也丢失了
正常情况下,消息丢失都不是我们想看到的,该怎么解决这个问题呢
开启重试模式后,如果重试次数耗尽后消息依然处理失败,则需要由 MessageRecoverer 接口来处理, MessageRecoverer 有三个实现类:
RejectAndDontRequeueRecoverer
:重试次数耗尽后,直接 reject,丢弃消息,默认就是这种方式ImmediateRequeueMessageRecoverer
:重试次数耗尽后,返回 nack,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机我们来演示一下使用 RepublishMessageRecoverer 类的情况
第一步:定义一个名为 blog.error 的交换机、一个名为 error.queue 的队列,并将队列和交换机进行绑定
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true") public class ErrorConfiguration { @Bean public DirectExchange errorExchange() { return new DirectExchange("error.direct", true, false); } @Bean public Queue errorQueue() { return new Queue("error.queue", true, false, false); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) { return BindingBuilder.bind(errorQueue).to(errorExchange).with("error"); } }
第二步:将失败处理策略改为 RepublishMessageRecoverer (开起了消费者重试机制才会生效)
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
在控制台中可以看到,消息的重试次数耗尽后,消息被放入了 error.queue 队列
在 RabbitMQ 的控制塔也可以看到, error.direct 交换机 和 error.queue 队列成功创建,消息也成功放入了 error.queue 队列
总结:消费者如何保证消息一定被消费?
- 开启消费者确认机制为 auto ,由 Spring 帮我们确认,消息处理成功后返回 ack,异常时返回 nack
- 开启消费者失败重试机制,并设置
MessageRecoverer
,多次重试失败后将消息投递到异常交换机,交由人工处理
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),绝对值函数具有幂等性
在程序开发中,幂等是指同一个业务,执行一次或多次对业务状态的影响是一致的
那么有什么方法能够确保业务的幂等性呢
给每个消息都设置一个唯一的 id,利用 id 区分是否是重复消息:
可以在指定 MessageConverter 的具体类型时,同时为 MessageConverter 设置自动创建一个 messageId
@Bean
public MessageConverter jacksonMessageConvertor() {
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
发送消息后,在 RabbitMQ 的控制台可以看到,消息的 properties 属性附带了 messageId 信息
但这种方式对业务有一定的侵入性
结合业务逻辑,基于业务本身做判断。以支付业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付,只有未支付订单才需要修改,其它状态的订单不做处理
总结:如何保证支付服务与交易服务之间的订单状态一致性?
- 首先,支付服务会正在用户支付成功以后利用 MQ 发送消息通知交易服务,完成订单状态同步
- 其次,为了保证 MQ 消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性,同时也开启了MQ的持久化,避免因服务宕机导致消息丢失
- 最后,我们还在交易服务更新订单状态时做了业务幕等判断,避免因消息重复消费导致订单状态异常
如果交易服务消息处理失败,支付服务和交易服务出现了数据不一致的情况,有没有什么兜底的解决方案?
我们可以在交易服务设置定时任务,定期查询订单支付状态,这样即便 MQ 通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性
延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息
延迟任务:一定时间之后才会执行的任务
当一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter):
如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)
利用死信交换机的特点,可以实现发送延迟消息的功能
RabbitMQ 的官方推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后,可以将消息暂存一段时间,时间到了之后再将消息投递到队列中
插件的下载地址:rabbitmq-delayed-message-exchange
下载完插件后,运行以下指令,在输出信息中找到 Mounts ,再找到 RabbitMQ 的插件的安装目录
sudo docker inspect rabbitmq
然后进入 RabbitMQ 的插件的安装目录,将刚才下载的插件上传到该目录下
一般与 docker 相关的目录只有 root 用户才有权限访问,所以我们需要先打开 docker 目录的部分权限(耗时可能较长)
sudo chmod +rx -R /var/lib/docker
接着打开/var/lib/docker/volumes/rabbitmq-plugins/_data
目录的写权限(如果修改权限不生效,请切换到 root 用户执行指令)
sudo chmod 777 /var/lib/docker/volumes/rabbitmq-plugins/_data
将刚才下载的插件上传到/var/lib/docker/volumes/rabbitmq-plugins/_data
目录
上传成功后将/var/lib/docker/volumes/rabbitmq-plugins/_data
目录的权限复原
sudo chmod 755 /var/lib/docker/volumes/rabbitmq-plugins/_data
最后进入容器内部,运行指令安装插件,安装完成后退出容器内部
sudo docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit
看到以下信息,说明插件安装成功了
如果你遇到了以下错误,在执行rabbitmq-plugins enable rabbitmq_delayed_message_exchange
指令前先执行以下指令
chmod 400 /var/lib/rabbitmq/.erlang.cookie
声明延迟交换机
@Bean
public DirectExchange delayExchange() {
return ExchangeBuilder.directExchange("delay.direct").delayed().build();
}
声明队列和延迟交换机,并将队列和延迟交换机绑定在一起
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue"),
exchange = @Exchange(name = "delay.direct", delayed = "true", type = ExchangeTypes.DIRECT),
key = "delay"
))
public void listenDelayQueue(String message) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");
System.out.println("消费者收到了 delay.queue的消息: " + message + ",时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}
编写测试方法,测试发送延迟消息
@Test
void testSendDelayMessage() {
rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10000); // 毫秒
return message;
}
});
SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");
System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}
发送延迟消息的本质是在消息头属性中添加 x-delay 属性
RabbitMQ 的延迟消息是怎么实现的呢?RabbitMQ 会自动维护一个时钟,这个时钟每隔一秒就跳动一次,如果对时钟的精度要求比较高的,可能还要精确到毫秒,甚至纳秒
RabbitMQ 会为发送到交换机的每一条延迟消息创建一个时钟,时钟运行的过程中需要 CPU 不断地进行计算。发送到交换机的延迟消息数越多,RabbitMQ 需要维护的时钟就越多,对 CPU 的占用率就越高(Spring 提供的定时任务的原理也是类似)
定时任务属于 CPU 密集型任务,中间涉及到的计算过程对 CPU 来说压力是很大的,所以说,采用延迟消息会给服务器的 CPU 带来更大的压力。当交换机中有非常多的延迟消息时,对 CPU 的压力就会特别大
所以说,延迟消息适用于延迟时间较短的场景
设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:
我们定义一个实体类,用于记录延迟消息的内容和延迟消息的延迟时间列表(该实体类也是延迟消息的类型)
import java.util.ArrayList; import java.util.Arrays; import java.util.List; public class MultipleDelayMessage<T> { private T data; private List<Long> delayMillis; public MultipleDelayMessage() { } public MultipleDelayMessage(T data, Long... delayMillis) { this.data = data; this.delayMillis = new ArrayList<>(Arrays.asList(delayMillis)); } public MultipleDelayMessage(T data, List<Long> delayMillis) { this.data = data; this.delayMillis = delayMillis; } public static <T> MultipleDelayMessage<T> of(T data, Long... delayMillis) { return new MultipleDelayMessage<>(data, new ArrayList<>(Arrays.asList(delayMillis))); } public static <T> MultipleDelayMessage<T> of(T data, List<Long> delayMillis) { return new MultipleDelayMessage<>(data, delayMillis); } public boolean hasNextDelay() { return !delayMillis.isEmpty(); } public Long removeNextDelay() { return delayMillis.remove(0); } public T getData() { return data; } public void setData(T data) { this.data = data; } public List<Long> getDelayMillis() { return delayMillis; } public void setDelayMillis(List<Long> delayMillis) { this.delayMillis = delayMillis; } @Override public String toString() { return "MultipleDelayMessage{" + "data=" + data + ", delayMillis=" + delayMillis + '}'; } }
我们再定义一个发送延迟消息的消息处理器,供所有服务使用
import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; public class DelayMessagePostProcessor implements MessagePostProcessor { private final Integer delay; public DelayMessagePostProcessor(Integer delay) { this.delay = delay; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(delay); return message; } }
改造后的发送延迟消息的测试方法
@Test
void testSendDelayMessage() {
rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new DelayMessagePostProcessor(10000));
SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");
System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。