当前位置:   article > 正文

RabbitMQ如何避免丢失消息

RabbitMQ如何避免丢失消息

消息丢失

消息从生产到消费,要经历三个阶段,分别是生产、队列转发与消费,每个环节都可能丢失消息。
在这里插入图片描述
以下以RabbitMQ为例,来说明各个阶段会产生的问题以及解决方式。在说明之前,先回顾一下RabbitMQ的一个基本架构图
在这里插入图片描述

1. 生产者生产消息到RabbitMQ Server 消息丢失场景

1. 网络问题

外界环境问题导致:发生网络丢包、网络故障等造成RabbitMQ Server端收不到消息,因为生产环境的网络是很复杂的,网络抖动,丢包现象很常见,下面会讲到针对这个问题是如何解决的。

2. 代码层面,配置层面,考虑不全导致消息丢失

一般情况下,生产者使用Confirm模式投递消息,如果方案不够严谨,比如RabbitMQ Server 接收消息失败后会发送nack消息通知生产者,生产者监听消息失败或者没做任何事情,消息存在丢失风险;
生产者发送消息到exchange后,发送的路由和queue没有绑定,消息会存在丢失情况,下面会讲到具体的例子,保证意外情况的发生,即使发生,也在可控范围内。

解决方案:开启confirm模式

首先生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一deliveryTag和multiple参数)。

其实Confirm模式有三种方式实现:
串行confirm模式:producer每发送一条消息后,调用waitForConfirms()方法,等待broker端confirm,如果服务器端返回false或者在超时时间内未返回,客户端进行消息重传。

for(int i = 0;i<50;i++){
    channel.basicPublish(
            exchange, routingKey,
            mandatory, immediate,
            messageProperties,
            message.getContent()
    );
    if (channel.waitForConfirms()) {
        System.out.println("发送成功");
    } else {
        //发送失败这里可进行消息重新投递的逻辑
        System.out.println("发送失败");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

批量confirm模式:producer每发送一批消息后,调用waitForConfirms()方法,等待broker端confirm。
问题:一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且当消息经常丢失时,批量confirm性能应该是不升反降的。

for(int i = 0;i<50;i++){
    channel.basicPublish(
            exchange, routingKey,
            mandatory, immediate,
            messageProperties,
            message.getContent()
    );
}
if (channel.waitForConfirms()) {
    System.out.println("发送成功");
} else {
    System.out.println("发送失败");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

异步confirm模式:提供一个回调方法,broker confirm了一条或者多条消息后producer端会回调这个方法。 我们分别来看看这三种confirm模式。

    public void sendQueue(String appId, String handleUserId, List<String> deviceIds) {
        List<Object> list = new ArrayList<>();
        JSONObject jsonObject = new JSONObject();
        jsonObject.put(DeviceConstant.COMMAND, DELETE);
        jsonObject.put(DeviceConstant.BODY, list );
        String topicExchange = RabbitMqConstant.EXCHANGE_TOPIC_DATA;
        String routingKey = RabbitMqConstant.ROUTING_KEY_LOCAL_DATA;
        //rabbitTemplate.convertAndSend(topicExchange, routingKey, jsonObject.toJSONString());
        try {
            Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
            channel.confirmSelect();
            channel.basicPublish(topicExchange, routingKey, null, jsonObject.toJSONString().getBytes());
            channel.addConfirmListener(new ConfirmListener() {
                //消息失败处理
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    log.info("sendQueue-ack-confirm-fail==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}--message:{}", topicExchange, routingKey, deliveryTag, multiple, jsonObject);
                    try {
                        Thread.sleep(3000l);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    //重发
                    channel.basicPublish(topicExchange, routingKey, null, jsonObject.toJSONString().getBytes());
                }
                //消息成功处理
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    log.info("sendQueue-ack-confirm-successs==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}", topicExchange, routingKey, deliveryTag, multiple);
                }
            });
        } catch (Exception e) {
            log.error("sendQueue-ack-发送消息失败:{}", ExceptionUtils.getStackTrace(e));
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

2. 队列本身可能丢失消息

1. 消息未完全持久化,当机器重启后,消息会全部丢失,甚至Queue也不见了

仅仅持久化了Message,而Exchange,Queue没有持久化,这个持久化是无效的。

解决方案:

交换机持久化:在声明交换器时将 durable 设为 true。
// 参数1:交换机的名字
// 参数2:交换机的类型,topic/direct/fanout/headers
// 参数3:是否持久化
channel.exchangeDeclare(exchangeName,exchangeType,true);
  • 1
  • 2
  • 3
  • 4
队列持久化:在声明队列的时候把 durable 参数设置为true。

在这里插入图片描述

消息持久化:

想让消息实现持久化需要在消息生产者推送消息的方法中修改参数,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性。
在这里插入图片描述

2. 单节点模式问题,节点挂了,消息只存在当前节点。硬盘坏了,那消息真的就无法恢复了

如果做了消息持久化方案,消息会持久化硬盘,机器重启后消息不会丢失;但是还有一个极端情况,这台服务器磁盘突然坏了(公司遇到过磁盘问题还是很多的),消息持久化不了,非高可用状态,这个模式生产环境慎重考虑。

3. 默认的集群模式,消息只会存在与当前节点中,并不会同步到其他节点,其他节点也仅只会同步该节点的队列结构

在这里插入图片描述
上图中的三个节点组成了一个RabbitMQ的集群。其中exchange是交换器,它的元数据信息(交换器名称、交换器属性、绑定键等)在所有节点上都是一致的,而队列中的实际消息数据则只会存在于所创建的那个节点上,其它节点只知道这个队列的元数据信息和一个指向拥有这个消息的队列的节点指针
RabbitMQ集群会同步四种类型的内部元数据:队列元数据(队列名和属性)、交换器元数据(交换器名和属性)、绑定键和虚拟机。在用户访问其中任何一个rabbitmq节点时查询到的queue、user、exchange和vhost等信息都是一致的。

那为什么普通集群只保持元数据同步,消息内容却没同步呢?这里涉及到存储空间和性能的问题,如果保持每个节点都有一份消息,那会导致每个节点的空间都非常大,消息的积压量会增加且无法通过扩容节点解决积压问题。另外如果要使每个节点存储一份消息,对于持久化的消息而言,内存和磁盘同步复制机制会导致性能受到很大影响。

工作原理

在这里插入图片描述
上图中的三个节点,其中节点1是数据节点(即实际存储消息内容的节点)。
如果客户端(生产者或消费者)与节点1建立了连接,那么关于消息的收发就只在节点1上进行(可以理解为简单的单机模式);

如果客户端(消费者)是与节点2或者节点3建立的连接,此时由于数据在节点1上,那么节点2或节点3只会起到一个消息转发的作用,例如此客户端是消费者,那么消息将由节点2或节点3从节点1中拉取,再经自身节点路由给消费者端;
如果客户端(生产者),那么消息先发给节点2或3,再路由到节点1的队列中存储。

一个节点可以是磁盘节点和内存节点,磁盘节点将元数据存储在磁盘,内存节点将元数据存储在内存。
这里需要注意的是,内存节点只是将元数据(比如队列名和属性、交换器名和属性和虚拟机等)存储在内存,因此在对资源管理(创建和删除队列、交换器和虚拟机等)时的性能有所提升,但是对发布和订阅的消息速率并没有提升。
RabbitMQ要求集群中至少有一个磁盘节点,当节点加入和离开集群时,必须通知磁盘节点(如果集群中唯一的磁盘节点崩溃了,则不能进行创建队列、创建交换器、创建绑定、添加用户、更改权限、添加和删除集群节点)。如果唯一磁盘的磁盘节点崩溃,集群是可以保持运行的,但不能更改任何东西。因此建议在集群中设置两个磁盘节点,只要一个即可正常操作。总之在无法得知它们如何使用才能保证最佳时建议最好都用磁盘节点。

总结:普通集群模式并不能保证服务的高可用,因为其它节点只复制了队列和交换器等元数据信息,并没有将真实的消息内容复制到自身节点。该部署模式只解决了单节点的压力问题,但是当数据节点宕机之后便无法提供服务了,消息的路由线路受到了阻隔,客户端则无法继续与服务交互。为了解决这个问题,就需要此消息数据也能被复制到集群的其它节点中,因此rabbitmq引入了镜像部署模式。

解决方案:镜像部署,消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。

在这里插入图片描述
Rabbitmq的镜像集群实际上是在普通集群的基础上增加了策略,它需要先按照普通集群的方式进行部署,部署完成之后再通过创建镜像队列的策略实现主备节点消息同步。也就是说,每个备用节点都有和主节点一样的队列,这个队列是由主节点通过创建镜像队列所产生的,且这些备用节点能及时的同步主节点中队列的入队消息。当消息设置了持久化时,每个节点都有属于自己的本地消息持久化存储机制。当消息入队和出队时,所有关于对主节点的操作都会同步给备用节点用来更新。此集群模式在主节点宕机之后备用节点所保留的消息与主节点完全一致,即可实现高可用。

工作原理

在这里插入图片描述
上图就是镜像集群模式的实现流程,其中有三个节点(主节点、备节点1、备节点2)和三个镜像队列queue(其中备节点上的queue是由主节点镜像生成的)。要注意的是,这里的主节点和备节点是针对某个队列而言的,并不能认为一个节点作为了所有队列的主节点,因为在整个镜像集群模式下,会存在多个节点和多个队列,这时候任何一个节点都能作为某一个队列的镜像主节点,其它节点则成了镜像备节点(例如:有A、B、C三个节点和Q1、Q2、Q3三个队列,如果A作为Q1的镜像主节点,那么B和C就作为了Q1的镜像备节点,在此基础上,如果B作为了Q2的镜像主节点,那么A和C就是Q2的镜像备节点)。

每一个队列都是由两部分组成的,一个是queue,用来接收消息和发布消息,另外还有一个BackingQueue,它是用来做本地消息持久化处理。客户端发送给主节点队列的消息和ack应答都将会同步到其它备节点上。

所有关于镜像主队列(mirror_queue_master)的操作,都会通过组播GM的方式同步到其它备用节点上,这里的GM负责消息的广播,mirror_queue_slave则负责回调处理(更新本次同步内容),因此当消息发送给备用节点时,则由mirror_queue_slave来做实际处理,将消息存储在queue中,如果是持久化消息则同时存储在BackingQueue中。master上的回调则由coordinator来处理(发布本次同步内容)。在主节点中,BackingQueue的存储则是由Queue进行调用。对于生产者而言,消息发送给queue之后,接着调用mirror_queue_master进行持久化处理,之后再通过GM广播发送本次同步消息给备用节点,备用节点通过回调mirror_queue_slave同步本次消息到queue和BackingQueue;对于消费者而言,从queue中获取消息之后,消息队列会等待消费者的ack应答,ack应答收到之后删除queue和BackingQueue中的该条消息,并将本次ack内容通过GM广播发送给备用节点同步本次操作。如果slave宕机了,那对于客户端的服务提供将不会有任何影响。如果master宕机了,则其它备用节点就提升为master继续服务消息不会丢失。那这其中多个备用节点是如何选择其中一个来作为master的呢?这里通过选取出“最年长的”节点作为master,因为这个备用节点相对于其它节点而言是同步时间最长、同步状态最好的一个节点,但如果存在没有任何一个slave与master完全同步的情况,那么master中未同步的消息将会丢失。

GM

GM模块实现的一种可靠的组播通讯协议,该协议能够保证组播消息的原子性,即保证组中活着的节点要么都收到消息要么都收不到。
它的实现大致为:将所有的节点形成一个循环链表,每个节点都会监控位于自己左右两边的节点,当有节点新增时,相邻的节点保证当前广播的消息会复制到新的节点上;当有节点失效时,相邻的节点会接管保证本次广播的消息会复制到下一个节点。在master节点和slave节点上的这些gm形成一个group,group(gm_group)的信息会记录在mnesia中。不同的镜像队列形成不同的group。消息从master节点对应的gm发出后,顺着链表依次传送到所有的节点,由于所有节点组成一个循环链表,master节点对应的gm最终会收到自己发送的消息,这个时候master节点就知道消息已经复制到所有的slave节点了。另外需要注意的是,每一个新节点的加入都会先清空这个节点原有数据,下图是新节点加入集群的一个简单模型:

消息的同步:
将新节点加入已存在的镜像队列,在默认情况下ha-sync-mode=manual,镜像队列中的消息不会主动同步到新节点,除非显式调用同步命令。当调用同步命令后,队列开始阻塞,无法对其进行操作,直到同步完毕。

总结

镜像集群模式通过从主节点拷贝消息的方式使所有节点都能保留一份数据,一旦主节点崩溃,备节点就能完成替换从而继续对外提供服务。这解决了节点宕机带来的困扰,提高了服务稳定性,但是它并不能实现负载均衡,因为每个操作都要在所有节点做一遍,这无疑降低了系统性能。再者当消息大量入队时,集群内部的网络带宽会因此时的同步通讯被大大消耗掉,因此对于可靠性要求高、性能要求不高且消息量并不多的场景比较适用。如果对高可用和负载均衡都有要求的场景则需要结合HAProxy(实现节点间负载均衡)和keepalived(实现HAproxy的主备模式)中间件搭配使用,下面我们将对这种场景的部署进行全流程概述。

3. 消费端可能丢失消息

消费端采用自动ack机制,还没有处理完毕,消费端宕机。

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会导致消息丢失。因为RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。

解决方案:改为手动ack,当消息正确处理完成后,再通知mq。消费端处理消息异常后,回传nack,这样mq会把这条消息投递到另外一个消费端上。

消息应答的方法
  1. Channel.basicAck(long deliveryTag, boolean multiple):用于肯定确认。RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

deliveryTag:该消息的index
multiple:是否批量.。true:将一次性ack所有小于deliveryTag的消息。

multiple参数解析
true 代表批量应答
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8
那么此时 5-8 的消息都会被确认收到消息应答
false 同上面相比
只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
在这里插入图片描述

  1. Channel.void basicNack(long deliveryTag, boolean multiple, boolean requeue) :用于否定确认

deliveryTag:该消息的index。
multiple:是否批量。true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列。

  1. Channel.basicReject(long deliveryTag, boolean requeue):用于否定确认 (推荐使用)

deliveryTag:该消息的index。
requeue:被拒绝的是否重新入队列。

basicNack()和basicReject()的区别在于:basicNack()可以批量拒绝,basicReject()一次只能拒接一条消息。

demo
    @RabbitHandler
    @RabbitListener(queues = RabbitMqConstant.xxx , concurrency = "1-1")
    public void receiveQueueCommonLocal(Channel channel, Message message) {
        String messageBody = new String(message.getBody());
        //System.out.println("messageBody===>"+messageBody);
        try {
            //todo 业务逻辑
            /*手动确认成功
             * 参数:
             * deliveryTag:该消息的index
             * multiple:是否批量处理.true:将一次性ack所有小于deliveryTag的消息
             * **/
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("receiveQueueCommonLocal=====>ERROR:{}--josn:{}", ExceptionUtil.getMessage(e), messageBody);
            try {
                //手动确认回滚 拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

文章来源:https://blog.51cto.com/u_15840568/5784352
https://zhuanlan.zhihu.com/p/79545722
集群:https://blog.csdn.net/weixin_43498985/article/details/122185972
消费者ack:https://blog.csdn.net/m0_64337991/article/details/122755297
https://zhuanlan.zhihu.com/p/483289106?utm_id=0

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/597451
推荐阅读
相关标签
  

闽ICP备14008679号