赞
踩
文章已收录Github精选,欢迎Star:https://github.com/yehongzhi/learningSummary
分析RabbitMQ消息丢失的情况,不妨先看看一条消息从生产者发送到消费者消费的过程:
可以看出,一条消息整个过程要经历两次的网络传输:从生产者发送到RabbitMQ服务器,从RabbitMQ服务器发送到消费者。
在消费者未消费前存储在队列(Queue)中。
所以可以知道,有三个场景下是会发生消息丢失的:
针对以上三种场景,RabbitMQ提供了三种解决的方式,分别是消息持久化,confirm机制,ACK事务机制。
RabbitMQ是支持消息持久化的,消息持久化需要设置:Exchange为持久化和Queue持久化,这样当消息发送到RabbitMQ服务器时,消息就会持久化。
首先看Exchange交换机的类图:
看这个类图其实是要说明上一篇文章介绍的四种交换机都是AbstractExchange抽象类的子类,所以根据java的特性,创建子类的实例会先调用父类的构造器,父类也就是AbstractExchange的构造器是怎么样的呢?
从上面的注释可以看到durable参数表示是否持久化。默认是持久化(true)。创建持久化的Exchange可以这样写:
@Bean
public DirectExchange rabbitmqDemoDirectExchange() {
//Direct交换机
return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
}
接着是Queue队列,我们先看看Queue的构造器是怎么样的:
也是通过durable参数设置是否持久化,默认是true。所以创建时可以不指定:
@Bean
public Queue fanoutExchangeQueueA() {
//只需要指定名称,默认是持久化的
return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A);
}
这就完成了消息持久化的设置,接下来启动项目,发送几条消息,我们可以看到:
怎么证明是已经持久化了呢,实际上可以找到对应的文件:
找到对应磁盘中的目录:
消息持久化可以防止消息在RabbitMQ Server中不会因为宕机重启而丢失。
在生产者发送到RabbitMQ Server时有可能因为网络问题导致投递失败,从而丢失数据。我们可以使用confirm模式防止数据丢失。工作流程是怎么样的呢,看以下图解:
从上图中可以看到是通过两个回调函数**confirm()、returnedMessage()**进行通知。
一条消息从生产者发送到RabbitMQ,首先会发送到Exchange,对应回调函数confirm()。第二步从Exchange路由分配到Queue中,对应回调函数则是returnedMessage()。
代码怎么实现呢,请看演示:
首先在application.yml配置文件中加上如下配置:
spring:
rabbitmq:
publisher-confirms: true
# publisher-returns: true
template:
mandatory: true
# publisher-confirms:设置为true时。当消息投递到Exchange后,会回调confirm()方法进行通知生产者
# publisher-returns:设置为true时。当消息匹配到Queue并且失败时,会通过回调returnedMessage()方法返回消息
# spring.rabbitmq.template.mandatory: 设置为true时。指定消息在没有被队列接收时会通过回调returnedMessage()方法退回。
有个小细节,publisher-returns和mandatory如果都设置的话,优先级是以mandatory优先。可以看源码:
接着我们需要定义回调方法:
@Component public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class); /** * 监听消息是否到达Exchange * * @param correlationData 包含消息的唯一标识的对象 * @param ack true 标识 ack,false 标识 nack * @param cause nack 投递失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { logger.info("消息投递成功~消息Id:{}", correlationData.getId()); } else { logger.error(
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。