当前位置:   article > 正文

RocketMQ篇四(保证消息不丢失、保证消息顺序、快速处理消息积压、消息轨迹、消息位点)_mq 消息的有序性如何保证?如何保证 mq 消息不丢失?如何保证 mq 消息消费的幂等性?

mq 消息的有序性如何保证?如何保证 mq 消息不丢失?如何保证 mq 消息消费的幂等性?

1. RocketMQ如何保证消息不丢失

1.1 Producer端

1.1.1 同步发送、重试3次、集群部署

  • 采用send()同步发消息,如果能正常收到 MQ Broker 的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,这个阶段是不会出现消息丢失的。
  • 发送失败后可以重试,设置重试次数。默认3次。
  • 集群部署,比如发送失败了的原因可能是当前Broker宕机了,重试的时候会发送到其他Broker上。

1.1.2 生产者使用事务消息机制保证消息零丢失

RocketMQ的事务消息机制就是为了保证零丢失来设计的,并且经过阿里的验证,肯定是非常靠谱的。
Half消息保证消息不丢失
1、为什么要发送个half消息?有什么用?
这个half消息是在订单系统进行下单操作前发送,并且对下游服务的消费者是不可见的。那这个消息的作用更多的体现在确认RocketMQ的服务是否正常。相当于嗅探下RocketMQ服务是否正常,并且通知RocketMQ,我马上就要发一个很重要的消息了,你做好准备。

2.half消息如果写入失败了怎么办?
如果没有half消息这个流程,那我们通常是会在订单系统中先完成下单,再发送消息给MQ。这时候写入消息到MQ如果失败就会非常尴尬了。而half消息如果写入失败,我们就可以认为MQ的服务是有问题的,这时,就不能通知下游服务了。我们可以在下单时给订单一个状态标记,然后等待MQ服务正常后再进行补偿操作,等MQ服务正常后重新下单通知下游服务。

3.订单系统写数据库失败了怎么办?
如果下单时,写数据库失败(可能是数据库崩了,需要等一段时间才能恢复)。那我们可以另外找个地方把订单消息先缓存起来(Redis、文本或者其他方式),然后给RocketMQ返回一个UNKNOWN状态。这样RocketMQ就会过一段时间来回查事务状态。我们就可以在回查事务状态时再尝试把订单数据写入数据库,如果数据库这时候已经恢复了,那就能完整正常的下单,再继续后面的业务。这样这个订单的消息就不会因为数据库临时崩了而丢失。

4.half消息写入成功后RocketMQ挂了怎么办?
我们需要注意下,在事务消息的处理机制中,未知状态的事务状态回查是由RocketMQ的Broker主动发起的。也就是说如果出现了这种情况,那RocketMQ就不会回调到事务消息中回查事务状态的服务。这时,我们就可以将订单一直标记为"新下单"的状态。而等RocketMQ恢复后,只要存储的消息没有丢失,RocketMQ就会再次继续状态回查的流程。

5.下单成功后如何优雅的等待支付成功?
可以使用RocketMQ提供的延迟消息机制。往MQ发一个延迟1分钟的消息,消费到这个消息后去检查订单的支付状态,如果订单已经支付,就往下游发送下单的通知。而如果没有支付,就再发一个延迟1分钟的消息。最终在第十个消息时把订单回收。这个方案就不用对全部的订单表进行扫描,而只需要每次处理一个单独的订单消息。

可以用事务消息的状态回查机制来替代定时的任务。在下单时,给Broker返回一个UNKNOWN的未知状态。而在状态回查的方法中去查询订单的支付状态。这样整个业务逻辑就会简单很多。我们只需要配置RocketMQ中的事务消息回查次数(默认15次)和事务回查间隔时间(messageDelayLevel),就可以更优雅的完成这个支付状态检查的需求。

6、事务消息机制的作用
整体来说,在订单这个场景下,消息不丢失的问题实际上就还是转化成了下单这个业务与下游服务的业务的分布式事务一致性问题。而事务一致性问题一直以来都是一个非常复杂的问题。而RocketMQ的事务消息机制,实际上只保证了整个事务消息的一半,他保证的是订单系统下单和发消息这两个事件的事务一致性,而对下游服务的事务并没有保证。但是即便如此,也是分布式事务的一个很好的降级方案。目前来看,也是业内最好的降级方案。

1.2 Broker同步刷盘+Dledger主从架构保证MQ自身不会丢消息

  • 同步刷盘:修改刷盘策略 flushDiskType为同步刷盘,默认情况下是异步刷盘。
  • 集群部署,主从模式,高可用。

Dledger的文件同步:
Dledger文件同步
在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过Raft协议两阶段提交的方式保证文件在主从之间成功同步。

  • 简单来说,数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段。
  • Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。再接下来, Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步。

任何一台Broker突然宕机,RocketMQ服务仍然能继续使用,因为采用的是Broker主从架构以及多副本策略。

Master Broker挂了,会怎么样?

  • RocketMQ 4.5版本之前,用Slave Broker同步数据 ,尽量保证不丢失,但是一旦Master故障了,Slave是没有办法自动切换成Master的,需要手动做些运维操作,把Slave Broker修改配置,重启机器给调整为Master Broker,这会导致中间一段时间不可用。
  • RocketMQ 4.5之后支持的Dledger机制,基于Raft协议实现的一个机制,可以让一个Master Broker对应多个Slave Broker,一旦Master 宕机了,在多个Slave中通过Dledger技术将一个Slave选为新的Master对外提供服务。在生产环境中可以使用Dledger实现自动故障切换,只要10秒或者几十秒的时间就可以完成。

如果Slave Broker挂了,会怎么样?

  • 如果Slave Broker挂掉了,会导致所有读写压力都集中到Master Broker上。因为消息写入全部是发送到Master Broker的,获取消息也可以是Master获取。所以Slave Broker挂了影响不大。

1.3 Consumer消费者端消费确认

RocketMQ支持消费者发送消息消费确认。当消费者成功处理一条消息后,可以发送确认消息给Broker,告知Broker消息已经被成功消费。只有在接收到消费确认后,Broker才会将消息从队列中删除,确保消息不会重复消费。

1.4 RocketMQ特有的问题:NameServer挂了如何保证消息不丢失?

NameServer在RocketMQ中,是扮演的一个路由中心的角色,提供到Broker的路由功能。但是其实路由中心这样的功能,在所有的MQ中都是需要的。Kafka是用ZooKeeper和一个作为Controller的Broker一起来提供路由服务,整个功能是相当复杂纠结的。而RabbitMQ是由每一个Broker来提供路由服务。而只有RocketMQ把这个路由中心单独抽取了出来,并独立部署。

NameServer集群中任意多的节点挂掉,都不会影响他提供的路由功能。那如果集群中所有NameServer节点都挂了呢?

有很多人就会认为在生产者和消费者中都会有全部路由信息的缓存副本,那整个服务可以正常工作一段时间。其实这个问题大家可以做一下实验,当NameServer全部挂了后,生产者和消费者是立即就无法工作的。至于为什么,可以回顾一下我们之前的源码课程去源码中找找答案。

那再回到我们的消息不丢失的问题,在这种情况下,RocketMQ相当于整个服务都不可用了,那他本身肯定无法给我们保证消息不丢失了。我们只能自己设计一个降级方案来处理这个问题了。例如在订单系统中,如果多次尝试发送RocketMQ不成功,那就只能另外找给地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程定时的扫描这些失败的订单消息,尝试往RocketMQ发送。这样等RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。整个这套降级的机制,在大型互联网项目中,都是必须要有的。

1.5 RocketMQ消息零丢失方案总结

完整分析过后,整个RocketMQ消息零丢失的方案其实挺简单

  • 生产者使用事务消息机制/同步发送。
  • Broker配置同步刷盘+Dledger主从架构
  • 消费者不要使用异步消费。
  • 整个MQ挂了之后准备降级方案

那这套方案是不是就很完美呢?其实很明显,这整套的消息零丢失方案,在各个环节都大量的降低了系统的处理性能以及吞吐量。在很多场景下,这套方案带来的性能损失的代价可能远远大于部分消息丢失的代价。所以,我们在设计RocketMQ使用方案时,要根据实际的业务情况来考虑。例如,如果针对所有服务器都在同一个机房的场景,完全可以把Broker配置成异步刷盘来提升吞吐量。而在有些对消息可靠性要求没有那么高的场景,在生产者端就可以采用其他一些更简单的方案来提升吞吐,而采用定时对账、补偿的机制来提高消息的可靠性。而如果消费者不需要进行消息存盘,那使用异步消费的机制带来
的性能提升也是非常显著的。

总之,这套消息零丢失方案的总结是为了在设计RocketMQ使用方案时的一个很好的参考。

2. 使用RocketMQ如何保证消息顺序

MQ的顺序问题分为全局有序和局部有序。

  • 全局有序:整个MQ系统的所有消息严格按照队列先入先出顺序进行消费。
  • 局部有序:只保证一部分关键消息的消费顺序。

首先 我们需要分析下这个问题,在通常的业务场景中,全局有序和局部有序哪个更重要?其实在大部分的MQ业务场景,我们只需要能够保证局部有序就可以了。例如我们用QQ聊天,只需要保证一个聊天窗口里的消息有序就可以了。而对于电商订单场景,也只要保证一个订单的所有消息是有序的就可以了。至于全局消息的顺序,并不会太关心。而通常意义下,全局有序都可以压缩成局部有序的问题。例如以前我们常用的聊天室,就是个典型的需要保证消息全局有序的场景。但是这种场景,通常可以压缩成只有一个聊天窗口的QQ来理解。即整个系统只有一个聊天通道,这样就可以用QQ那种保证一个聊天
窗口消息有序的方式来保证整个系统的全局消息有序。

然后 落地到RocketMQ。通常情况下,发送者发送消息时,会通过MessageQueue轮询的方式保证消息尽量均匀的分布到所有的MessageQueue上,而消费者也就同样需要从多个MessageQueue上消费消息。而MessageQueue是RocketMQ存储消息的最小单元,他们之间的消息都是互相隔离的,在这种情况下,是无法保证消息全局有序的。

对于局部有序的要求,只需要将有序的一组消息都存入同一个MessageQueue里,这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。RocketMQ中,可以在发送者发送消息时指定一个MessageSelector对象,让这个对象来决定消息发入哪一个MessageQueue。这样就可以保证一组有序的消息能够发到同一个MessageQueue里

另外,通常所谓的保证Topic全局消息有序的方式,就是将Topic配置成只有一个MessageQueue队列(默认是4个)。这样天生就能保证消息全局有序了。这个说法其实就是我们将聊天室场景压缩成只有一个聊天窗口的QQ一样的理解方式。而这种方式对整个Topic的消息吞吐影响是非常大的,如果这样用,基本上就没有用MQ的必要了。

3. RocketMQ如何快速处理积压消息

背景:在正常情况下,使用MQ都会要尽量保证他的消息生产速度和消费速度整体上是平衡的,但是如果部分消费者系统出现故障,就会造成大量的消息积累。这类问题通常在实际工作中会出现得比较隐蔽。例如某一天一个数据库突然挂了,大家大概率就会集中处理数据库的问题。等好不容易把数据库恢复过来了,这时基于这个数据库服务的消费者程序就会积累大量的消息。或者网络波动等情况,也会导致消息大量的积累。这在一些大型的互联网项目中,消息积压的速度是相当恐怖的。所以消息积压是个需要时时关注的问题。

在Web控制台的主题页面,可以通过 Consumer管理 按钮实时看到消息的积压情况。
RocketMQ消息积压
另外,也可以通过mqadmin指令在后台检查各个Topic的消息延迟情况。还有RocketMQ也会在他的 ${storePathRootDir}/config 目录下落地一系列的json文件,也可以用来跟踪消息积压情况。

治理措施如下:

  • 生产端检测消息积压,不再继续发送/将消息发送到其他队列,过一定的时间再继续发送。
  • 增强消费能力,增加消费者实例个数,注意Queue个数,实例个数要与Queue数量相同,如果实例个数 > Queue实例,多出的消费者分不到Queue。如果实例个数 < Queue实例,那么每个消费实例的承载的流量是不同的。
  • 可以创建一个新的Topic,配置足够多的MessageQueue。然后把所有消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,并转储到新的Topic中,这个速度是可以很快的。然后在新的Topic上,就可以通过增加消费者个数来提高消费速度了。之后再根据情况恢复成正常情况。

在官网中,还分析了一个特殊的情况。就是如果RocketMQ原本是采用的普通方式搭建主从架构,而现在想要中途改为使用Dledger高可用集群,这时候如果不想历史消息丢失,就需要先将消息进行对齐,也就是要消费者把所有的消息都消费完,再来切换主从架构。因为Dledger集群会接管RocketMQ原有的CommitLog日志,所以切换主从架构时,如果有消息没有消费完,这些消息是存在旧的CommitLog中的,就无法再进行消费了。这个场景下也是需要尽快的处理掉积压的消息。

4. RocketMQ的消息轨迹

RocketMQ默认提供了消息轨迹的功能,其中提供了MQ消息轨迹数据的关键属性:

Producer端Consumer端Broker端
发送实例消息消费实例消息消息的Topic
发送消息时间投递时间、投递轮次消息存储位置
消息是否发送成功消息是否消费成功消息的Key值
发送耗时消费耗时消息的Tag值

打开消息轨迹功能,需要在broker.conf中打开一个关键配置,这个配置的默认值是false。也就是说默认是关闭的。

traceTopicEnable=true
  • 1

默认情况下,消息轨迹数据是存于一个系统级别的Topic ,RMQ_SYS_TRACE_TOPIC。这个Topic在Broker节点启动时,会自动创建出来。
RocketMQ消息轨迹
另外,也支持客户端自定义轨迹数据存储的Topic。在客户端的两个核心对象 DefaultMQProducer和DefaultMQPushConsumer,他们的构造函数中,都有两个可选的参数来打开消息轨迹存储。

  • enableMsgTrace:是否打开消息轨迹。默认是false。
  • customizedTraceTopic:配置将消息轨迹数据存储到用户指定的Topic 。

5. 消息位点

RocketMQ 定义队列中最早一条消息的位点为最小消息位点(MinOffset);最新一条消息的位点为最大消息位点(MaxOffset)。虽然消息队列逻辑上是无限存储,但由于服务端物理节点的存储空间有限, Apache RocketMQ 会滚动删除队列中存储最早的消息。因此,消息的最小消费位点和最大消费位点会一直递增变化。

同时RocketMQ 通过消费位点管理消息的消费进度。每条消息被某个消费者消费完成后不会立即在队列中删除,RocketMQ 会基于每个消费者分组维护一份消费记录,该记录指定消费者分组消费某一个队列时,消费过的最新一条消息的位点,即消费位点

当消费者客户端离线,又再次重新上线时,会严格按照服务端保存的消费进度继续处理消息。如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点。

5.1 重置消费位点

适用场景

  • 初始消费位点不符合需求:因初始消费位点为当前队列的最大消息位点,即客户端会直接从最新消息开始消费。若业务上线时需要消费部分历史消息,您可以通过重置消费位点功能消费到指定时刻前的消息。
  • 消费堆积快速清理:当下游消费系统性能不足或消费速度小于生产速度时,会产生大量堆积消息。若这部分堆积消息可以丢弃,您可以通过重置消费位点快速将消费位点更新到指定位置,绕过这部分堆积的消息,减少下游处理压力。
  • 业务回溯,纠正处理:由于业务消费逻辑出现异常,消息被错误处理。若您希望重新消费这些已被处理的消息,可以通过重置消费位点快速将消费位点更新到历史指定位置,实现消费回溯。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/696540
推荐阅读
相关标签
  

闽ICP备14008679号