当前位置:   article > 正文

RocketMQ(六)高级特性及原理_生产者 发送多次同一条数据的原因

生产者 发送多次同一条数据的原因

一、消息幂等性问题

保证消息幂等性就是保证多次相同的请求时,消费者只执行这些相同请求中的一个。

例如之前的订单系统中成功支付完订单后对相应用户赠送一张优惠卷,而因为没有进行幂等性的保证,此时就会出现支付一个订单收到多张优惠卷的情况。

此时我们来分析各个阶段可能会导致这种情况的原因

1、生产者重复发送相同消息
在重试机制中,订单系统在支付订单成功后发送消息到队列中,然后此时可能因为网络问题,此时订单系统始终没收到发送到broker的响应,则订单进行重试机制再次发送一条消息到broker中,这样就导致broker中有两条相同的消息。

所以这个重试机制是会导致消息重复发送的。这是导致发送多张优惠券的原因之一。

2、消费者重复消费一条消息

我们现在来讲讲第二个原因:优惠卷系统重复消费一条消息(上面重复发送消息是因为发送多条相同消息导致消费者消费了多次相同的消息,这里是只有一条消息但是被消费了多次)

当你在消费一个消息的时候,此时本地事务已经处理完了,要向MQ返回一个CONSUME_SUCCESS状态,然后提交消费进度offset到broker中,而在你处理完本地事务后,要将offset提交给broker了,而在没提交之前该系统进行重启了,所以导致本地事务执行了但offset没提交,从而在系统重启后会给该消费者重新发送这个消息。则就导致了重复消费同一个消息的问题。

这个两个问题就是我们保证幂等性所要解决的。

二、幂等性保证方案

1、发送消息的时候需要保证幂等性?
1)、业务判断法:在发送消息前,先看看MQ是否已经有这个消息了

RocketMQ有提供相应的接口使用,但可能性能会比较差

2)、基于redis缓存的幂等性机制(状态判断法)

如果你成功发送一条消息到MQ就往redis中写一条数据,标记这个消息已经发送过了。然后重复调用时看看redis中是否有这个缓存。(但这种情况还是存在没法做到幂等的问题,例如当消息发送成功到MQ还没写入redis就宕机了,这样下次还会重复发)

(这里有个疑问,发送时出现重复发送不是因为没接收到MQ的响应而导致的重发?那么状态判断法不就行不通?)

其实上面两种对于生产者重复发送消息阶段的幂等性是不太合适的,真实生产中一般都不会在发消息时保证幂等性,而是在消费消息的时候去保证幂等性。即我们可以允许你发送多个相同的消息,但是我在消费端限制多个相同的消息我只消费一次。

所以第一个原因我们不需要解决,我们只需要来解决第二个原因,即消费者端的幂等性

2、消费消息时怎么保证幂等性?
1)、业务判断法:基于数据库的某字段唯一性保证幂等性

2)、状态判断法:基于redis,消费一条数据就记录缓存(保证记录时的操作原子性,即使用setnx分布式锁,或者redis+lua),每次消费就先去redis看看有没有记录。

三、重试队列和死信队列

1、如果优惠券系统的数据库宕机了怎么办?

此时无法对消息进行处理,我们都知道在本地事务执行完成功后,消费者会给brok发送CONSUME_SUCCESS,如果执行失败则会返回RECONSUME_SUCCESS然后让MQ进行重发。

broker收到RECONSUME_SUCCESS状态后,在MQ会有你这个消费者组对应的一个重试队列,如果你返回RECONSUME_SUCCESS,MQ就会将这些消息放到这个消费组的重试队列中去。(例如你的消费组叫VouConsumerGroup,那么就会有这个消费组对应的重试队列%RETRY%VouConsumerGroup)。

然后过段时间,重试队列就会再将消息发送给我们,如果我们再次返回RECONSUME_SUCCESS,就会再过段时间再发过来,每次重试的时间间隔不一样,可以自行进行配置,比如:

依次类推,最多可以连续重试16次。

如果16次都没处理成功,此时这些消息就会被存到对应的死信队列(%DLQ%VouConsumerGroup)中,此时在MQ的管理后台就可以看到了。

那么我们此时 如何处理这些死信队列?  这个时候就可以根据场景去自定义了,例如我可以开启后台线程,然后订阅这个“%DLQ%VouConsumerGroup”的死信队列,然后做一些相应的处理。

四、消息有序性

首先我们先来看看消息乱序的问题。

之前我们说过一个背景,就是大数据团队需要获取我们订单数据库的全部数据,然后存放到自己的大数据存储系统中。

我们之前说过可以基于cannl这种模拟从机,去监听主数据库的binlog日志,然后把这些binlog发送到MQ中去。

而在大数据团队从MQ中拿数据时,发现有一些数据指标是错误的,比如某个订单的字段A在订单数据库的值是100,结果在大数据存储中为0。

最后经过排除,发现是订单数据库的binlog消息乱序,我们都知道binlog消息都是传输增删改查信息的,而此时就可能因为乱序而导致数据错误。

例如限制有两条sql语句:
insert  into  order(id,sum)  values(xx,0)

update  order  set  sum= 100 where  id =xx

第一句是插入一条数据,开始他的值是0,第二句是更新这个值为100。

这两条sql分别对应两个消息,此时正常情况我们两条信息先消费insert的再消费update的。

而如果此时消息乱序了,先更新再插入,此时不就造成了之前出现的数据错误的情况。

我们之前说过一个topic可以知道多个MessageQueue,然后再写入消息的时候会将消息均匀的分发到各个MessageQueue中,例如我这个insert binlog写到MessageQueue01中,update binlog写到MessageQueue02中,接着会有一个ConsumerGroup去负责消费这些MessageQueue的消息,所以insert binlog和update binlog的获取是没有顺序的。

这样就会导致上面的错误

这个时候消息乱序就是我们需要解决的问题。


此时如何解决消息乱序的问题?
此时我们可以让同一个订单的binlog进入同一个messageQueue中,这样同一个订单的各种操作就不会分发到不同的messageQueue中从而导致乱序问题。

(就是将insert binlog和update binlog放到同一个messageQueue中,这样消费的时候就会先取insert,在取update。)

那么我们怎么让这两个消息放进同一个messageQueue?
我们可以根据订单id来判断这些binlog是属于同一个订单的,此时同一订单的binlog发送到同一个MessageQueue,比如MessageQueue有15个,此时用事务id进行取模例如得到是第5个MessageQueue

(注意绝对有序需要cannl推过来的binlog有序,mq消费的时候有序)

不过这就完了吗?

如果消息处理失败了,这些消息能走重试队列吗?答案是不可以的,因为重试相当于又会出现乱序的情况了,所以在处理失败后不能返回RECONSUME_LATER(返回这个会进行重试,进入重试队列),而是要返回SUSPEND_CURRENT_QUEUE_A_MOMENT这个状态,这时,不会将消息放到重试队列中,然后让消费者先阻塞等待下,然后再次对这个消息进行消费。


此时有一个问题:
发送端零丢失的方案好像也会影响到消息的顺序性,比如先insert 后update 但是insert的时候发送到mq失败了,update的成功了。这样好像就没有办法了。哪怕用mq事务特性也没法解决这种情况。

————————在canal消费binlog日志的时候有binlog对应的position,当处理binlog的时候如果前面语句发送到mq的时候失败话要进行重试和告警了,如果insert发送不了后续的binlog也就不能发送到mq中,binlog中insert和update的顺序性通过配置row模式物理日志来保证


总结消息有序性的保证:消息到broker前是有序正确的+同一个订单的消息保证有序的到同一个MessageQueue中+有序性的消息消费失败时为了防止进入重试队列此时返回SUSPEND_CURRENT_QUEUE_A_MOMENT状态(阻塞一会再次消费,这里会一直重试,不会进入死信队列,这个时候就可能会出现卡死的状态)。

五、消息有序性的伪代码实现

1、怎么让一个订单的binlog进入同一个MessageQueue?

这里根据订单id取余然后返回MessageQueue的坐标,这样就会进去某个MessageQueue。

2、怎么保证消费者顺序消费这个MessageQueue的消息?

注意:上面使用的时MessageListenerOrderly,也就是说消费者会对每一个ConsumerQueue

都使用一个线程来处理,例如订单号为1100的多个binlog,此时会交给一个线程进行顺序存储,这样可以避免多个线程处理带来的乱序问题。

(可进重试队列的,我们是使用ConsumeConsurrentlyContext.COMSUME_SUCCESS、ConsumeConsurrentlyContext.RECONSUME_SUCCESS来返回状态)

(有序性消费的状态用MessageListenerOrderly.SUCCESS、SUSPEND_CURRENT_QUEUE_A_MOMENT来返回状态)

六、RocketMQ的数据过滤机制

订单数据库同步的时候,会有一些混杂在一起的数据,此时需要对这些数据进行过滤。

我们利用cannl进行数据库binlog同步的时候,很可能把数据库的所有表的binlog都推送到MQ中去,这时我们可能只需要处理订单表A的binlog,可能我们一些其他表的binlog是不需要的。

如果过滤的工作在消费端进行判断是否是订单表的binlog,是再进行处理,这样的话是非常耗时性能低下的。


这个时候,我们就可以使用MQ支持的数据过滤机制(给消息设置tag和属性)

例如我们再发消息的时候:

这里给消息设置了tag、属性信息。

然后在消费数据的时候根据tag和属性进行过滤

根据tag进行过滤:

根据属性进行过滤:

RocketMQ还支持丰富的数据过滤语法:

七、延迟消息机制

这个机制我们配合订单退款扫描的场景:我们在购物时,提交订单后未支付时,此时订单的状态是待支付,此时有两种可能,一种是直接支付成功然后进行后面的发货流程,一种是用户没有进行支付或者忘了支付,不久之后就会堆积很多未支付的订单。

所以为了清除一些没用的订单,我们一般会有一个后台线程,不停的扫描数据库中所有未支付状态的订单,如果该订单超过30分钟还没支付,那么就把该订单状态改为“已关闭”。

但这里会有一个问题:我们要知道订单数量是巨大的,还要定时不断地去扫描,此时根本扫描不过来,即使你现在想要多个机器多进程去扫描这些订单,你也不知道哪些机器扫描哪些订单?怎么扫描?什么时候扫描?

此时MQ地一个延时消息就可以用到了,所谓地延时消息,就是我们订单系统创建一个订单后,就发送一条消息到MQ中去,我们指定这条消息为延迟消息,比如这个消息要在30分钟后才会被扫描订单的消费者扫描到。30分钟后扫描订单的消费者就会接收到这条信息,然后再去数据库查询该订单是否已经支付,这样每个订单只会被扫描服务扫描一次,而不用我们定时多次的去扫描一个订单多次。

而且这个扫描消费者完全可以部署多机器多消费者从而提高效率。


下面我们基于订单系统,说明下伪代码实现延迟消息:
订单消息的生产者:

这里发送延迟消息的核心就是设置消息的delayTimeLevel,也就是延迟级别,RocketMQ的默认延迟级别是:1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h。而上面的代码中设置延迟级别为3,即延迟10s的意思。

接下来我们看下消费者的伪代码:

消费者只需要监听这个Topic即可,然后进行扫描处理工作。

八、RocketMq的一些经验

1、灵活的运用tags来过滤数据

之前我们就讲过,可以用tags来区分是否是要进行处理的消息,就可以不用自己手动去区别这些消息,这样可以提高性能。

2、基于消息key来定位消息是否丢失

我们在发送消息的时候。可以给消息设置一个key,例如以订单号为key,message.setKeys(orderId),这样这个消息就具备一个key了,接着这个消息到broker上后,会基于key构建hash索引,这个hash索引会存放在indexFile索引文件中,然后我们需要知道MQ中是否有某个消息,就可以通过命令+key值去MQ中查询。MQ会提供类似的命令:mqadmin queryMsgByKey -n 127.0.0.1:9876 -t  SCANRECORD -k orderId(具体的命令可以去官网查)

3、消息零丢失的补充

当你MQ集群都宕机时,可以将之后的消息写入磁盘或者数据库中暂存,等MQ恢复后再将之前持久化的消息投递到MQ中去.

4、提供消费者的吞吐量

(注意一些场景下是不能进行并发消费的,例如需要保证消息消费的有序性)

5、要不要消费历史消息

消费者端是可以设置从哪里(哪个offset)开始消费的,常见的有两种:一个是从Topic的第一条数据开始消费,一个是从最后一次消费的offset位置开始消费。设置的参数分别对应:CONSUME_FROM_FIRST_OFFSET、CONSUME_FROM_LAST_OFFSET。一开始我们选择CONSUME_FROM_FIRST_OFFSET,从第一条开始消费,之后重启就可以使用CONSUME_FROM_LAST_OFFSET模式进行消费。

九、企业中MQ集群式如何进行权限机制的控制的?

在公司中一般会有很多的技术团队,每个技术团队会使用MQ集群中的部分Topic,那么此时就会出现一个问题,如果订单团队使用的Topic不小心被商品团队的写入错误的脏数据,这个时候可能就会导致订单团队的Topic里的数据出错。

此时就需要引入RocketMQ的权限功能,也就是规定好订单团队的用户只能只要订单相关的Topic,商品团队的用自己的Topic,大家之间互不干扰,不能混着用。

在RocketMQ中,实现权限控制并不难,首先我们需要在broker放一个额外的ACL权限控制配置文件,然后在该文件中配置好权限(例如什么用户对哪些Topic有什么操作权限)。

具体操作:
首先在每个Broker的配置文件中设置aclEnable=true这个配置,开启权限控制

其次,在每个Broker部署机器的${ROCKETMQ_HOME}/store/config目录下,放一个plain.acl.yml的配置文件,然后就可以在这个文件里进行权限配置,类似下面这样:

如果你的用户在上面的配置文件中没有说明,那么你的用户就会使用默认的Topic权限。

接着我们在创建生产者和消费者时,就可以指定你团队分配到的RocketMQ账户,这样你就只能访问到你有权限的Topic。

对于消费者也是同理的。

十、如何对线上生产环境的RocketMQ集群进行消息轨迹的追踪?

消息轨迹即查询一条消息的轨迹,即我们可以得到一个消息从哪个生产者来的,他存在哪个Broker的哪个Topic里的,他是被哪个消费者消费的。

查询一个消息的消息轨迹,可能对于我们分析消息的丢失等方面有很大的用处。那么我们怎么配置使得MQ支持消息轨迹功能?
首先我们需要在Broker的配置文件中开启tranceTopicEnable=true这个选项,即开启消息轨迹追踪的功能。

当我们开启了这个开关后,我们启动这个Broker时就会自动创建一个内部Topic处理即RMQ_SYS_TRACE_TOPIC,这个Topic就是用来存储所有的消息轨迹追踪的数据的。

然后我们还需要在发送消息的时候开启消息轨迹,例如创建一个Producer的时候,构造函数的第二个参数enableMsgTrace参数,设置为true就开启了对消息的轨迹追踪

消费者的开启也是同理的,在第二个参数设置为true即可开启。

这样我们在Producer、Broker、Consumer中都配置好了轨迹追踪之后。

在Producer发送消息的时候,就会上报这个消息的一些数据到存储消息轨迹数据的Topic中,其中的上报数据包括:Producer的信息、发送信息的时间、消息是否发送成功、发送消息的耗时。

消息到broker后,会上报的数据:消息存储的Topic、消息存储的位置、消息的key、消息的tags。

消息被消费者消费后,会上报的数据:Consumer的信息、投递消息的时间、这是第几轮投递消息、消息消费是否成功、消费这条消息的耗时。

接着我们要去查询某条消息的消息轨迹时,就可以在MQ的控制台里的消息轨迹中就可以创建查询任务,此时你可以根据messageId(消息id)、message key(消息的key)、topic等来进行查询,然后执行查询任务即可看到相应的消息轨迹了。

十一、消息堆积问题应该如何处理?

即大量的消息积压在MQ中,消费者消费慢,生产者生产快。

此时可以临时增加消费者系统的实例,加快消费者消费信息。那要是现在你的Topic里只要有限个MessageQueue,例如就只有四个,此时我们加再多的消费者也是没用的,因为此时只能有四个消费者系统去对应消费这四个MessageQueue,此时我们就可以去修改旧的消费这系统的代码,将接收的消息不先进行消费,而是转发到一个新的topic中,然后我们在这个topic中设置足够的MessageQueue,然后增加足够的消费者系统对这个新的topic进行消费,这样就可以提高消费消息的能力。

总结: 处理线上消费堆积的问题

1.先定位到消费者的问题,快速恢复

2.怎么去解决堆积消费问题

2.1:消息可丢失直接修改消费者系统代码进行丢弃消息

2.2:消息不能丢失,消息topic对应的messagequeue较多消费者consumer较少每个消费者消费多个messagequeue,此时可以consumer加机器来加快消费

2.3:消息不能丢失,topic指定的messagequeue过小,此时可新建一个topic调大messagequeue的数量,临时部署服务来消费堆积的topic(也可以让他重试16次失败后进入死信队列(加大死信队列的MessageQueue数量),然后我们再去死信队列中消费)

十一、设计高可用方案

1、金融系统怎么针对RocketMQ集群崩溃设计高可用方案?
一般在金融系统中,为了保证和钱相关的数据消息因为MQ系统崩溃出问题,一般需要设计一个高可用的降级方案,一般我们可以对发送消息到MQ的代码进行try catch,如果发送异常进行重试。如果连续发送多次还失败,此时就可能说明你的MQ系统已经崩溃了,此时就把这条消息写入本地的存储中,之后再不停的发送消息到MQ,一旦发现MQ集群恢复了,此时就需要建立一个后台线程去把之前持久化的消息查询出来,然后依次有序的发送到MQ集群中去。

这里有一个关键,就是将消息存储到本地时,需要保证他的存储有序性,然后在MQ恢复时,从磁盘拿消息到发送消息也要保证起有序性。(一些金融系统的消息有序性是比较严格的)

2、给MQ增加消息限流功能保证起高可用性。

本质上,限流功能是对系统的一个保护功能。对于你的MQ来言,可以改造下开源MQ的源码,然后在broker接收消息这一块引入一个限流机制,一般来说限流算法可以采用令牌算法。(当然这个要对RocketMQ的源码比较了解)

3、MQ技术改变时怎么进行迁移?
例如你公司原本用kafka,现在想改为使用RocketMQ了,那么这个时候怎么进行迁移?
此时MQ的激情群迁移过程可以使用双写+双读的技术方案。

即不能之间将prouducer和consumer的代码和MQ进行粗暴的迁移,而是让kafka和RocketMQ都接收同样的消息一段时间(同时发送到kafka和RocketMQ,Consumer也同时从两种MQ中消费消息,都用一样的逻辑,只不过通过kafka的消息走真实的处理后存到正式的数据库等存储,而通过RocketMQ的消息则存在临时的存储中,需要注意的是,我们需要统计每个MQ每日读取和处理的消息数),因为MQ一般都是实时性的数据,例如过了一周后,两种MQ里消息的基本一致了,此时如果两个MQ每天接收和处理的消数一致,执行结果一致,此时就可以进行正式的切换了。此时就可以停掉旧的Producer系统,修改后重新上线,全部接上RocketMQ,这个然后再下线Consumer改完后再上线。这就是双写+双读的技术流程

基本上对于一些重要中间件的迁移,往往都会采用双写的方法,双写一段时间后,然后观察到两个方案的结果一致了,此时就可以正式下线一些旧的东西。

 

 

至此,对于RocketMQ的原理及特性的分析系列文章就完了,后面会对于RocketMQ的特性使用进行一个小项目实战。即儒猿技术窝的《基于RocketMQ的互联网酒店预订系统项目实战》专栏。

之后还会对RocketMQ的源码、原理进行进一步的研究。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/618505
推荐阅读
相关标签
  

闽ICP备14008679号