赞
踩
RocketMQ是阿里开源的分布式消息中间件。具有高性能、低延时和高可靠等特性。主要用来提升性能、系统解耦、流量削峰等。
1. 灵活可扩展性
RocketMQ天然支持集群,其核心四组件(Name Server、Broker、Producer、Consumer)每一个都可以在没有单点故障的情况下进行水平扩展。
2. 海量消息堆积能力
RocketMQ采用零拷贝原理实现超大的消息的堆积能力,据说单机已可以支持亿级消息堆积,而且在堆积了这么多消息后依然保持写入低延迟。
3. 支持顺序消息
可以保证消息消费者按照消息发送的顺序对消息进行消费。顺序消息分为全局有序和局部有序,一般推荐使用局部有序,即生产者通过将某一类消息按顺序发送至同一个队列来实现。
4. 多种消息过滤方式
消息过滤分为在服务器端过滤和在消费端过滤。服务器端过滤时可以按照消息消费者的要求做过滤,优点是减少不必要消息传输,缺点是增加了消息服务器的负担,实现相对复杂。消费端过滤则完全由具体应用自定义实现,这种方式更加灵活,缺点是很多无用的消息会传输给消息消费者。
5. 支持事务消息
RocketMQ除了支持普通消息,顺序消息之外还支持事务消息,这个特性对于分布式事务来说提供了又一种解决思路。
6. 回溯消费
回溯消费是指消费者已经消费成功的消息,由于业务上需求需要重新消费,RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。
MQ的作用很简单,削峰填谷。以电商交易下单的场景来说,正向交易的过程可能涉及到创建订单、扣减库存、扣减活动预算、扣减积分等等。每个接口的耗时如果是100ms,那么理论上整个下单的链路就需要耗费400ms,这个时间显然是太长了。
如果这些操作全部同步处理的话,首先调用链路太长影响接口性能,其次分布式事务的问题很难处理,这时候像扣减预算和积分这种对实时一致性要求没有那么高的请求,完全就可以通过MQ异步的方式去处理了。同时,考虑到异步带来的不一致的问题,我们可以通过job去重试保证接口调用成功,而且一般公司都会有核对的平台,比如下单成功但是未扣减积分的这种问题可以通过核对作为兜底的处理方案。
使用MQ之后我们的链路变简单了,同时异步发送消息我们的整个系统的抗压能力也上升了。
基于以上几个考虑,我们最终选择了RocketMQ。
Kafka | RocketMQ | RabbitMQ | ActiveMQ | |
单机吞吐量 | 10万级 | 10万级 | 万级 | 万级 |
开发语言 | Scala | Java | Erlang | Java |
高可用 | 分布式架构 | 分布式架构 | 主从架构 | 主从架构 |
性能 | ms级 | ms级 | us级 | ms级 |
功能 | 只支持主要的MQ功能 | 顺序消息、事务消息等功能完善 | 并发强、性能好、延时低 | 成熟的社区产品、文档丰富 |
消息丢失可能发生在生产者发送消息、MQ本身丢失消息、消费者丢失消息3个方面。
生产者丢失
生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断MQ没收到,消息就丢失了。
由于同步发送的一般不会出现这样使用方式,所以我们就不考虑同步发送的问题,我们基于异步发送的场景来说。
异步发送分为两个方式:异步有回调和异步无回调,无回调的方式,生产者发送完后不管结果可能就会造成消息丢失,而通过异步发送+回调通知+本地消息表的形式我们就可以做出一个解决方案。以下单的场景举例。
一般而言,对于大部分场景来说异步回调的形式就可以了,只有那种需要完全保证不能丢失消息的场景我们做一套完整的解决方案。
MQ丢失
如果生产者保证消息发送到MQ,而MQ收到消息后还在内存中,这时候宕机了又没来得及同步给从节点,就有可能导致消息丢失。比如RocketMQ:
RocketMQ分为同步刷盘和异步刷盘两种方式,默认的是异步刷盘,就有可能导致消息还未刷到硬盘上就丢失了,可以通过设置为同步刷盘的方式来保证消息可靠性,这样即使MQ挂了,恢复的时候也可以从磁盘中去恢复消息。
消费者丢失
消费者丢失消息的场景:消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,消息丢失。
RocketMQ默认是需要消费者回复ack确认。
消费方不返回ack确认,重发的机制根据MQ类型的不同发送时间间隔、次数都不尽相同,如果重试超过次数之后会进入死信队列,需要手工来处理了。
因为考虑到时消费者消费一直出错的问题,那么我们可以从以下几个角度来考虑:
消费者出错,肯定是程序或者其他问题导致的,如果容易修复,先把问题修复,让consumer恢复正常消费
如果时间来不及处理很麻烦,做转发处理,写一个临时的consumer消费方案,先把消息消费,然后再转发到一个新的topic和MQ资源,这个新的topic的机器资源单独申请,要能承载住当前积压的消息
处理完积压数据后,修复consumer,去消费新的MQ和现有的MQ数据,新MQ消费完成后恢复原状
最初,我们发送的消息记录是落库保存了的,而转发发送的数据也保存了,那么我们就可以通过这部分数据来找到丢失的那部分数据,再单独跑个脚本重发就可以了。如果转发的程序没有落库,那就和消费方的记录去做对比,只是过程会更艰难一点。
不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。
满足如下条件,默认48小时后会删除不再使用的CommitLog文件
RocketMQ由NameServer注册中心集群、Producer生产者集群、Consumer消费者集群和若干Broker(RocketMQ进程)组成
它的架构原理是这样的:
RocketMQ主要的存储文件包括commitlog文件、consumequeue文件、indexfile文件。
Broker在收到消息之后,会把消息保存到commitlog的文件当中,而同时在分布式的存储当中,每个broker都会保存一部分topic的数据,同时,每个topic对应的messagequeue下都会生成consumequeue文件用于保存commitlog的物理位置偏移量offset,indexfile中会保存key和offset的对应关系。
CommitLog文件保存于${Rocket_Home}/store/commitlog目录中,从图中我们可以明显看出来文件名的偏移量,每个文件默认1G,写满后自动生成一个新的文件。
特点:
顺序写
我们知道,操作系统每次从磁盘读写数据的时候,都需要找到数据在磁盘上的地址,再进行读写。而如果是机械硬盘,寻址需要的时间往往会比较长而一般来说,如果把数据存储在内存上面,少了寻址的过程,性能会好很多;但Kafka的数据存储在磁盘上面,依然性能很好,这是为什么呢?这是因为,Kafka采用的是顺序写,直接追加数据到末尾。实际上,磁盘顺序写的性能极高,在磁盘个数一定,转数一定的情况下,基本和内存速度一致因此,磁盘的顺序写这一机制,极大地保证了Kafka本身的性能。
零拷贝
比如:读取文件,再用socket发送出去这一过程
- buffer = File.read
- Socket.send(buffer)
传统方式实现:
先读取、再发送,实际会经过以下四次复制
传统方式,读取磁盘文件并进行网络发送,经过的四次数据copy是非常繁琐的
那么采用零拷贝的方式发送消息,必定会大大减少读取的开销,使得RocketMQ读取消息的性能有一个质的提升
此外,还需要再提一点,零拷贝技术采用了MappedByteBuffer内存映射技术,采用这种技术有一些限制,其中有一条就是传输的文件不能超过2G,这也就是为什么RocketMQ的存储消息的文件CommitLog的大小规定为1G的原因。
消息在master和slave之间的同步是根据raft协议来进行的:
是因为使用了顺序存储、Page Cache和异步刷盘。
事务消息就是MQ提供的类似XA的分布式事务能力,通过事务消息可以达到分布式事务的最终一致性。
半事务消息就是MQ收到了生产者的消息,但是没有收到二次确认,不能投递的消息。
实现原理如下:
最终,如果MQ收到二次确认commit,就可以把消息投递给消费者,反之如果是rollback,消息会保存下来并且在3天后被删除。
基于Dledger实现RocketMQ高可用自动切换
RocketMQ 4.5之后支持了一种叫做Dledger机制,基于Raft协议实现的一个机制。
我们可以让一个Master Broker对应多个Slave Broker,一旦Master Broker宕机了,在多个Slave中通过Dledger技术将一个Slave Broker选为新的Master Broker对外提供服务。
在生产环境中可以是用Dledger机制实现自动故障切换,只要10秒或者几十秒的时间就可以完成。
在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的Broker。
如果开启了容错策略,会通过RocketMQ的预测机制来预测一个Broker是否可用。
如果上次失败的Broker可用那么还是会选择该Broker的队列。
如果上述情况失败,则随机选择一个进行发送。
在发送消息的时候会记录一下调用的时间与是否报错,根据该时间去预测broker的可用时间。
一般是负载均衡做随机选择,但也可以走其他策略,比如根据某个字段来hash。
同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。
异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。
单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。
1. 集群消费
消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。
2. 广播消费
消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次。
消费者获取消息有两种模式:推送模式和拉取模式。
1. PushConsumer
推送模式(虽然RocketMQ使用的是长轮询)的消费者。消息的能及时被消费。使用非常简单,内部已处理如线程池消费、流控、负载均衡、异常处理等等的各种场景。
2. PullConsumer
拉取模式的消费者。应用主动控制拉取的时机,怎么拉取,怎么消费等。主动权更高。但要自己处理各种场景。
首先多个queue只能保证单个queue里的顺序,queue是典型的FIFO,天然顺序。多个queue同时消费是无法绝对保证消息的有序性的。
可以使用同一topic,同一个queue,发消息的时候一个线程去发送消息,消费的时候一个线程去消费一个queue里的消息。
开发
运维
消息的发送流程
一条消息从生产到被消费,将会经历三个阶段:
生产阶段,Producer新建消息,然后通过网络将消息投递给MQ Broker
存储阶段,消息将会存储在Broker端磁盘中
消息阶段,Consumer将会从Broker拉取消息
以上任一阶段都可能会丢失消息,我们只要找到这三个阶段丢失消息原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题。
生产阶段
生产者(Producer)通过网络发送消息给Broker,当Broker收到之后,将会返回确认响应信息给Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。
send方法是一个同步操作,只要这个方法不抛出任何异常,就代表消息已经发送成功。
消息发送成功仅代表消息已经到了Broker端,Broker在不同配置下,可能会返回不同响应状态:
引用官方状态说明:
消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。
消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度。
如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType = SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。
消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。
如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。
消息发送成功,但是此时Slave不可用。
如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。
另外RocketMQ还提供异步的发送的方式,适合于链路耗时较长,对响应时间较为敏感的业务场景。
异步发送消息一定要注意重写回调方法,在回调方法中检查发送结果。
不管是同步还是异步的方式,都会碰到网络问题导致发送失败的情况。针对这种情况,我们可以设置合理的重试次数,当出现网络问题,可以自动重试。
Broker存储阶段
默认情况下,消息只要到了Broker端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后Broker定期批量的将一组消息从内存异步刷入磁盘。
这种方式减少I/O次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。
若想保证Broker端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。
修改Broker端配置如下:
- # 默认情况为ASYNC_FLUSH
- flushDiskType = SYNC_FLUSH
若Broker未在同步刷盘时间内(默认为5s)完成刷盘,将会返回SendStatus.FLUSH_DISK_TIMEOUT状态给生产者。
集群部署
默认方式下,消息写入master成功,就可以返回确认响应给生产者,接着消息将会异步复制到slave节点。
注:master配置:flushDiskType = SYNC_FLUSH
此时若master突然宕机且不可恢复,那么还未复制到slave的消息将会丢失。
为了进一步提高消息的可靠性,我们可以采用同步的复制方式,master节点将会同步等待slave节点复制完成,才会返回确认响应。
Broker master节点同步复制配置如下:
- # 默认情况为ASYNC_FLUSH
- flushDiskType = SYNC_FLUSH
如果slave节点未在指定时间内同步返回响应,生产者将会收到SendStatus.FLUSH_SLAVE_TIMEOUT返回状态。
消费阶段
消费者从broker拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态给Broker。
如果Broker未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。
以上消费消息过程的,我们需要注意返回消息状态。只有当业务逻辑真正执行成功,我们才能返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS。否则需返回ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后再重试。
总结
最后我们还可以说出我们的思考,虽然提高消息可靠性,但是可能导致消息重发,重复消费。所以对于消费客户端,需要注意保证幂等性。
造成重复消费的原因
通过幂等性来保证,只要保证重复消息不对结果产生影响,就完美地解决这个问题。
解决方法
缺点
方法一:RocketMQ消息查询的性能不是特别好,如果在高并发的场景下,每条消息在发送到RocketMQ时都去查询一下,可能会影响接口的性能;
方法二:在一些极端的场景下,Redis也无法保证消息发送成功之后,就一定能写入Redis成功,比如写入消息成功而Redis此时宕机,那么再次查询Redis判断消息是否已经发送过,是无法得到正确结果的;
消费者端
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。