当前位置:   article > 正文

详细讲解!RabbitMQ如何防止数据丢失,看这篇就够了!_correlationdata

correlationdata

思维导图

在这里插入图片描述

文章已收录Github精选,欢迎Starhttps://github.com/yehongzhi/learningSummary

一、分析数据丢失的原因

分析RabbitMQ消息丢失的情况,不妨先看看一条消息从生产者发送到消费者消费的过程:

可以看出,一条消息整个过程要经历两次的网络传输:从生产者发送到RabbitMQ服务器,从RabbitMQ服务器发送到消费者

在消费者未消费前存储在队列(Queue)中

所以可以知道,有三个场景下是会发生消息丢失的:

  • 存储在队列中,如果队列没有对消息持久化,RabbitMQ服务器宕机重启会丢失数据。
  • 生产者发送消息到RabbitMQ服务器过程中,RabbitMQ服务器如果宕机停止服务,消息会丢失。
  • 消费者从RabbitMQ服务器获取队列中存储的数据消费,但是消费者程序出错或者宕机而没有正确消费,导致数据丢失。

针对以上三种场景,RabbitMQ提供了三种解决的方式,分别是消息持久化,confirm机制,ACK事务机制。

二、消息持久化

RabbitMQ是支持消息持久化的,消息持久化需要设置:Exchange为持久化和Queue持久化,这样当消息发送到RabbitMQ服务器时,消息就会持久化。

首先看Exchange交换机的类图:

看这个类图其实是要说明上一篇文章介绍的四种交换机都是AbstractExchange抽象类的子类,所以根据java的特性,创建子类的实例会先调用父类的构造器,父类也就是AbstractExchange的构造器是怎么样的呢?

从上面的注释可以看到durable参数表示是否持久化。默认是持久化(true)。创建持久化的Exchange可以这样写:

	@Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
   
        //Direct交换机
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

接着是Queue队列,我们先看看Queue的构造器是怎么样的:

也是通过durable参数设置是否持久化,默认是true。所以创建时可以不指定:

	@Bean
    public Queue fanoutExchangeQueueA() {
   
    	//只需要指定名称,默认是持久化的
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

这就完成了消息持久化的设置,接下来启动项目,发送几条消息,我们可以看到:


怎么证明是已经持久化了呢,实际上可以找到对应的文件:
在这里插入图片描述
找到对应磁盘中的目录:

消息持久化可以防止消息在RabbitMQ Server中不会因为宕机重启而丢失

三、消息确认机制

3.1 confirm机制

在生产者发送到RabbitMQ Server时有可能因为网络问题导致投递失败,从而丢失数据。我们可以使用confirm模式防止数据丢失。工作流程是怎么样的呢,看以下图解:
在这里插入图片描述
从上图中可以看到是通过两个回调函数**confirm()、returnedMessage()**进行通知。

一条消息从生产者发送到RabbitMQ,首先会发送到Exchange,对应回调函数confirm()。第二步从Exchange路由分配到Queue中,对应回调函数则是returnedMessage()

代码怎么实现呢,请看演示:

首先在application.yml配置文件中加上如下配置:

spring:
  rabbitmq:
    publisher-confirms: true
#    publisher-returns: true
    template:
      mandatory: true
# publisher-confirms:设置为true时。当消息投递到Exchange后,会回调confirm()方法进行通知生产者
# publisher-returns:设置为true时。当消息匹配到Queue并且失败时,会通过回调returnedMessage()方法返回消息
# spring.rabbitmq.template.mandatory: 设置为true时。指定消息在没有被队列接收时会通过回调returnedMessage()方法退回。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

有个小细节,publisher-returns和mandatory如果都设置的话,优先级是以mandatory优先。可以看源码:
在这里插入图片描述
接着我们需要定义回调方法:

@Component
public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
   
    private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class);

    /**
     * 监听消息是否到达Exchange
     *
     * @param correlationData 包含消息的唯一标识的对象
     * @param ack             true 标识 ack,false 标识 nack
     * @param cause           nack 投递失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
   
        if (ack) {
   
            logger.info("消息投递成功~消息Id:{}", correlationData.getId());
        } else {
   
            logger.error(
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/597442
推荐阅读
相关标签
  

闽ICP备14008679号