当前位置:   article > 正文

java中间件-rocketMq(学习笔记)_rocketmq长连接

rocketmq长连接

1.为什么要使⽤消息队列呢?

消息队列主要有三⼤⽤途,我们拿⼀个电商系统的下单举例:
解耦 :引⼊消息队列之前,下单完成之后,需要订单服务去调⽤库存服务减库存,调⽤
营销服务加营销数据 …… 引⼊消息队列之后,可以把订单完成的消息丢进队列⾥,下游服

务⾃⼰去调⽤就⾏了,这样就完成了订单服务和其它服务的解耦合。Version:0.9 

异步 :订单⽀付之后,我们要扣减库存、增加积分、发送消息等等,这样⼀来这个链路
就长了,链路⼀长,响应时间就变长了。引⼊消息队列,除了 更新订单状态 ,其它的都
可以 异步 去做,这样⼀来就来,就能降低响应时间。
 
削峰 :消息队列合⼀⽤来削峰,例如秒杀系统,平时流量很低,但是要做秒杀活动,秒
杀的时候流量疯狂怼进来,我们的服务器, Redis MySQL 各⾃的承受能⼒都不⼀样,直
接全部流量照单全收肯定有问题啊,严重点可能直接打挂了。
我们可以把请求扔到队列⾥⾯,只放出我们服务能处理的流量,这样就能抗住短时间的⼤
流量了。

总结⼀下
选择中间件的可以从这些维度来考虑:可靠性,性能,功能,可运维⾏,可拓展性,社区活
跃度。⽬前常⽤的⼏个中间件, ActiveMQ 作为 ⽼古董 ,市⾯上⽤的已经不多,其它⼏种:
RabbitMQ
优点:轻量,迅捷,容易部署和使⽤,拥有灵活的路由配置
缺点:性能和吞吐量不太理想,不易进⾏⼆次开发
RocketMQ
优点:性能好,⾼吞吐量,稳定可靠,有活跃的中⽂社区
缺点:兼容性上不是太好
Kafka
优点:拥有强⼤的性能及吞吐量,兼容性很好
缺点:由于 攒⼀波再处理 导致延迟⽐较⾼
我们的系统是⾯向⽤户的 C 端系统,具有⼀定的并发量,对性能也有⽐较⾼的要求,所以选择
了低延迟、吞吐量⽐较⾼,可⽤性⽐较好的 RocketMQ

 

3.RocketMQ 有什么优缺点?
RocketMQ 优点:
单机吞吐量:⼗万级
可⽤性:⾮常⾼,分布式架构
消息可靠性:经过参数优化配置,消息可以做到 0 丢失
功能⽀持: MQ 功能较为完善,还是分布式的,扩展性好
⽀持 10 亿级别的消息堆积,不会因为堆积导致性能下降
源码是 Java ,⽅便结合公司⾃⼰的业务⼆次开发
天⽣为⾦融互联⽹领域⽽⽣,对于可靠性要求很⾼的场景,尤其是电商⾥⾯的订单扣款,
以及业务削峰,在⼤量交易涌⼊时,后端可能⽆法及时处理的情况
RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿⾥双 11 已经经历了多次考验,
如果你的业务有上述并发场景,建议可以选择 RocketMQ

RocketMQ 缺点:
⽀持的客户端语⾔不多,⽬前是 Java c++ ,其中 c++ 不成熟
没有在 MQ 核⼼中去实现 JMS 等接⼜,有些系统要迁移需要修改⼤量代码

消息队列有两种模型: 队列模型 发布 / 订阅模型
队列模型
这是最初的⼀种消息队列模型,对应着消息队列 - - 的模型。⽣产者往某个队列⾥
⾯发送消息,⼀个队列可以存储多个⽣产者的消息,⼀个队列也可以有多个消费者,但是
消费者之间是竞争关系,也就是说每条消息只能被⼀个消费者消费。
发布 / 订阅模型
如果需要将⼀份消息数据分发给多个消费者,并且每个消费者都要求收到全量的消息。很
显然,队列模型⽆法满⾜这个需求。解决的⽅式就是发布 / 订阅模型。
在发布 - 订阅模型中,消息的发送⽅称为发布者( Publisher ),消息的接收⽅称为订阅者
Subscriber ),服务端存放消息的容器称为主题( Topic )。发布者将消息发送到主题
中,订阅者在接收消息之前需要先 订阅主题 订阅 在这⾥既是⼀个动作,同时还可以
认为是主题在消费时的⼀个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消
息。
它和 队列模式 的异同:⽣产者就是发布者,队列就是主题,消费者就是订阅者,⽆本
质区别。唯⼀的不同点在于:⼀份消息数据是否可以被多次消费。
RocketMQ 使⽤的消息模型是标准的发布 - 订阅模型,在 RocketMQ 的术语表中,⽣产者、消费
者和主题,与发布 - 订阅模型中的概念是完全⼀样的。
RocketMQ 本⾝的消息是由下⾯⼏部分组成:
Message
Message (消息)就是要传输的信息。
⼀条消息必须有⼀个主题( Topic ),主题可以看做是你的信件要邮寄的地址。
⼀条消息也可以拥有⼀个可选的标签( Tag )和额处的键值对,它们可以⽤于设置⼀个业务
Key 并在 Broker 上查找此消息以便在开发期间查找问题。
Topic
Topic (主题)可以看做消息的归类,它是消息的第⼀级类型。⽐如⼀个电商系统可以分为:
交易消息、物流消息等,⼀条消息必须有⼀个 Topic
Topic 与⽣产者和消费者的关系⾮常松散,⼀个 Topic 可以有 0 个、 1 个、多个⽣产者向其发送
消息,⼀个⽣产者也可以同时向不同的 Topic 发送消息。
⼀个 Topic 也可以被 0 个、 1 个、多个消费者订阅。
Tag
Tag (标签)可以看作⼦主题,它是消息的第⼆级类型,⽤于为⽤户提供额外的灵活性。使⽤
标签,同⼀业务模块不同⽬的的消息就可以⽤相同 Topic ⽽不同的 Tag 来标识。⽐如交易消
息又可以分为:交易创建消息、交易完成消息等,⼀条消息可以没有 Tag
标签有助于保持你的代码⼲净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。
Group
RocketMQ 中,订阅者的概念是通过消费组( Consumer Group )来体现的。每个消费组都消费
主题中⼀份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,⼀条消息被
Consumer Group1 消费过,也会再给 Consumer Group2 消费。
消费组中包含多个消费者,同⼀个组内的消费者是竞争消费的关系,每个消费者负责消费组
内的⼀部分消息。默认情况,如果⼀条消息被消费者 Consumer1 消费了,那同组的其他消费者
就不会再收到这条消息。
Message Queue
Message Queue (消息队列),⼀个 Topic 下可以设置多个消息队列, Topic 包括多个
Message Queue ,如果⼀个 Consumer 需要获取 Topic 下所有的消息,就要遍历所有的 Message
Queue
RocketMQ 还有⼀些其它的 Queue—— 例如 ConsumerQueue
Offset
Topic 的消费过程中,由于消息需要被不同的组进⾏多次消费,所以消费完的消息并不会⽴
即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护⼀个消费位置( Consumer
Offset ),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费⼀条消
息,消费位置就加⼀。
也可以这么说, Queue 是⼀个长度⽆限的数组, Offset 就是下标。
RocketMQ 的消息模型中,这些就是⽐较关键的概念了。画张图总结⼀下:
6. 消息的消费模式了解吗?
消息消费模式有两种: Clustering (集群消费)和 Broadcasting (⼴播消费)。

 

 

默认情况下就是集群消费,这种模式下 ⼀个消费者组共同消费⼀个主题的多个队列,⼀个队列只
会被⼀个消费者消费 ,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消
费。
⽽⼴播消费消息会发给消费者组中的每⼀个消费者进⾏消费。

 

NameServer
NameServer 是⼀个⽆状态的服务器,⾓⾊类似于 Kafka 使⽤的 Zookeeper ,但⽐ Zookeeper
轻量。特点:
每个 NameServer 结点之间是相互独⽴,彼此没有任何信息交互。
Nameserver 被设计成⼏乎是⽆状态的,通过部署多个结点来标识⾃⼰是⼀个伪集群,
Producer 在发送消息前从 NameServer 中获取 Topic 的路由信息也就是发往哪个 Broker
Consumer 也会定时从 NameServer 获取 Topic 的路由信息, Broker 在启动时会向
NameServer 注册,并定时进⾏⼼跳连接,且定时同步维护的 Topic NameServer
功能主要有两个:
1 、和 Broker 结点保持长连接。
2 、维护 Topic 的路由信息。
Broker
消息存储和中转⾓⾊,负责存储和转发消息。
Broker 内部维护着⼀个个 Consumer Queue ,⽤来存储消息的索引,真正存储消息的地⽅
CommitLog (⽇志⽂件)。

 

单个 Broker 与所有的 Nameserver 保持着长连接和⼼跳,并会定时将 Topic 信息同步到
NameServer ,和 NameServer 的通信底层是通过 Netty 实现的。
Producer
消息⽣产者,业务端负责发送消息,由⽤户⾃⾏实现和分布式部署。
Producer 由⽤户进⾏分布式部署,消息由 Producer 通过多种负载均衡模式发送到 Broker
集群,发送低延时,⽀持快速失败。
RocketMQ 提供了三种⽅式发送消息:同步、异步和单向
同步发送 :同步发送指消息发送⽅发出数据后会在收到接收⽅发回响应之后才发下⼀
个数据包。⼀般⽤于重要通知消息,例如重要通知邮件、营销短信。
异步发送 :异步发送指发送⽅发出数据后,不等接收⽅发回响应,接着发送下个数据
包,⼀般⽤于可能链路耗时较长⽽对响应时间敏感的业务场景,例如⽤户视频上传后
通知启动转码服务。
单向发送 :单向发送是指只负责发送消息⽽不等待服务器回应且没有回调函数触发,
适⽤于某些耗时⾮常短但对可靠性要求并不⾼的场景,例如⽇志收集。
Consumer
消息消费者,负责消费消息,⼀般是后台系统负责异步消费。
Consumer 也由⽤户部署,⽀持 PUSH PULL 两种消费模式,⽀持 集群消费 和 ⼴播消
费 ,提供 实时的消息订阅机制 。
Pull :拉取型消费者( Pull Consumer )主动从消息服务器拉取信息,只要批量拉取到消
息,⽤户应⽤就会启动消费过程,所以 Pull 称为主动消费型。
Push :推送型消费者( Push Consumer )封装了消息的拉取、消费进度和其他的内部维护
⼯作,将消息到达时执⾏的回调接⼜留给⽤户应⽤程序来实现。所以 Push 称为被动消费
类型,但其实从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push ⾸先要注
册消费监听器,当监听器处触发后才开始消费消息。

 

⽣产
在⽣产阶段,主要 通过请求确认机制,来保证消息的可靠传递
1 、同步发送的时候,要注意处理响应结果和异常。如果返回响应 OK ,表⽰消息成功发送
到了 Broker ,如果响应失败,或者发⽣其它异常,都应该重试。
2 、异步发送的时候,应该在回调⽅法⾥检查,如果发送失败或者异常,都应该进⾏重
试。
3 、如果发⽣超时的情况,也可以通过查询⽇志的 API ,来检查是否在 Broker 存储成功。
存储
存储阶段,可以通过 配置可靠性优先的 Broker 参数来避免因为宕机丢消息 ,简单说就是可靠
性优先的场景都应该使⽤同步。
1 、消息只要持久化到 CommitLog (⽇志⽂件)中,即使 Broker 宕机,未消费的消息也能
重新恢复再消费。
2 Broker 的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息⼀定存储在
pagecache 中(内存中),但是同步刷盘更可靠,它是 Producer 发送消息后等数据持久化到
磁盘之后再返回响应给 Producer

3 Broker 通过主从模式来保证⾼可⽤, Broker ⽀持 Master Slave 同步复制、 Master Slave
异步复制模式,⽣产者的消息都是发送给 Master ,但是消费既可以从 Master 消费,也可以
Slave 消费。同步复制模式可以保证即使 Master 宕机,消息肯定在 Slave 中有备份,保证了
消息不会丢失。
消费
Consumer ⾓度分析,如何保证消息被成功消费?
Consumer 保证消息成功消费的关键在于确认的时机,不要在收到消息后就⽴即发送消费
确认,⽽是应该在执⾏完所有消费业务逻辑之后,再发送消费确认。因为消息队列维护了
消费的位置,逻辑执⾏失败了,没有确认,再去队列拉取消息,就还是之前的⼀条。

10. 如何处理消息重复的问题呢?
对分布式消息队列来说,同时做到确保⼀定投递和不重复投递是很难的,就是所谓的 有且仅
有⼀次 RocketMQ 择了确保⼀定投递,保证消息不丢失,但有可能造成消息重复。
处理消息重复问题,主要有业务端⾃⼰保证,主要的⽅式有两种: 业务幂等 消息去重
业务幂等 :第⼀种是保证消费逻辑的幂等性,也就是多次调⽤和⼀次调⽤的效果是⼀样的。
这样⼀来,不管消息消费多少次,对业务都没有影响。
消息去重 :第⼆种是业务端,对重复的消息就不再消费了。这种⽅法,需要保证每条消息都
有⼀个惟⼀的编号,通常是业务相关的,⽐如订单号,消费的记录需要落库,⽽且需要保证
和消息确认这⼀步的原⼦性。
具体做法是可以建⽴⼀个消费记录表,拿到这个消息做数据库的 insert 操作。给这个消息做⼀
个唯⼀主键( primary key )或者唯⼀约束,那么就算出现重复消费的情况,就会导致主键冲
突,那么就不再处理这条消息。
 
11. 怎么处理消息积压?
发⽣了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提⾼消费能⼒,⼀
般有两种办法:

12. 顺序消息如何实现?
顺序消息是指消息的消费顺序和产⽣顺序相同,在有些业务逻辑下,必须保证顺序,⽐如订
单的⽣成、付款、发货,这个消息必须按顺序处理才⾏。

 

消费端通过使⽤ MessageListenerOrderly 来解决单 Message Queue 的消息被并发处理的问题。
全局顺序消息
RocketMQ 默认情况下不保证顺序,⽐如创建⼀个 Topic ,默认⼋个写队列,⼋个读队列,这
时候⼀条消息可能被写⼊任意⼀个队列⾥;在数据的读取过程中,可能有多个 Consumer ,每
Consumer 也可能启动多个线程并⾏处理,所以消息被哪个 Consumer 消费,被消费的顺序
和写⼈的顺序是否⼀致是不确定的。
要保证全局顺序消息, 需要先把 Topic 的读写队列数设置为 ⼀,然后 Producer Consumer 的并
发设置,也要是⼀。简单来说,为了保证整个 Topic 全局消息有序,只能消除所有的并发处
理,各部分都设置成单线程处理 ,这时候就完全牺牲 RocketMQ 的⾼并发、⾼吞吐的特性了。

 

13. 如何实现消息过滤?
有两种⽅案:
⼀种是在 Broker 端按照 Consumer 的去重逻辑进⾏过滤,这样做的好处是避免了⽆⽤的消
息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂。
另⼀种是在 Consumer 端过滤,⽐如按照消息设置的 tag 去重,这样的好处是实现起来简
单,缺点是有⼤量⽆⽤的消息到达了 Consumer 端只能丢弃不处理。
⼀般采⽤ Cosumer 端过滤,如果希望提⾼吞吐量,可以采⽤ Broker 过滤。
对消息的过滤有三种⽅式:

 

14. 延时消息了解吗?
电商的订单超时⾃动取消,就是⼀个典型的利⽤延时消息的例⼦,⽤户提交了⼀个订单,就
可以发送⼀个延时消息, 1h 后去检查这个订单的状态,如果还是未付款就取消订单释放库
存。
RocketMQ 是⽀持延时消息的,只需要在⽣产消息的时候设置消息的延时级别:

 

 

半消息:是指暂时还不能被 Consumer 消费的消息, Producer 成功发送到 Broker 端的消息,
但是此消息被标记为 暂不可投递 状态,只有等 Producer 端执⾏完本地事务后经过⼆次确认
了之后, Consumer 才能消费此条消息。
依赖半消息,可以实现分布式消息事务,其中的关键在于⼆次确认以及消息回查:
1 Producer broker 发送半消息
2 Producer 端收到响应,消息发送成功,此时消息是半消息,标记为 不可投递 状态,
Consumer 消费不了。
3 Producer 端执⾏本地事务。
4 、正常情况本地事务执⾏完成, Producer Broker 发送 Commit/Rollback ,如果是
Commit Broker 端将半消息标记为正常消息, Consumer 可以消费,如果是 Rollback
Broker 丢弃此消息。
5 、异常情况, Broker 端迟迟等不到⼆次确认。在⼀定时间后,会查询所有的半消息,然
后到 Producer 端查询半消息的执⾏情况。
6 Producer 端查询本地事务的状态
7 、根据事务的状态提交 commit/rollback broker 端。( 5 6 7 是消息回查)
8 、消费者段消费到消息之后,执⾏本地事务,执⾏本地事务。
16. 死信队列知道吗?
死信队列⽤于处理⽆法被正常消费的消息,即死信消息。
当⼀条消息初次消费失败, 消息队列 RocketMQ 会⾃动进⾏消息重试 ;达到最⼤重试次数
后,若消费依然失败,则表明消费者在正常情况下⽆法正确地消费该消息,此时,消息队列
RocketMQ 不会⽴刻将消息丢弃,⽽是将其发送到该 消费者对应的特殊队列中 ,该特殊队列
称为 死信队列
死信消息的特点
不会再被消费者正常消费。
有效期与正常消息相同,均为 3 天, 3 天后会被⾃动删除。因此,需要在死信消息产⽣后
3 天内及时处理。
死信队列的特点
⼀个死信队列对应⼀个 Group ID , ⽽不是对应单个消费者实例。
如果⼀个 Group ID 未产⽣死信消息,消息队列 RocketMQ 不会为其创建相应的死信队
列。
⼀个死信队列包含了对应 Group ID 产⽣的所有死信消息,不论该消息属于哪个 Topic
RocketMQ 控制台提供对死信消息的查询、导出和重发的功能。

 

17. 如何保证 RocketMQ 的⾼可⽤?
NameServer 因为是⽆状态,且不相互通信的,所以只要集群部署就可以保证⾼可⽤。
RocketMQ 的⾼可⽤主要是在体现在 Broker 的读和写的⾼可⽤, Broker 的⾼可⽤是通过 集群
主从 实现的。

 

Broker 可以配置两种⾓⾊: Master Slave Master ⾓⾊的 Broker ⽀持读和写, Slave ⾓⾊的
Broker 只⽀持读, Master 会向 Slave 同步消息。
也就是说 Producer 只能向 Master ⾓⾊的 Broker 写⼊消息, Cosumer 可以从 Master Slave ⾓⾊的
Broker 读取消息。
Consumer 的配置⽂件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可⽤或者
繁忙的时候, Consumer 的读请求会被⾃动切换到从 Slave 。有了⾃动切换 Consumer 这种机
制,当⼀个 Master ⾓⾊的机器出现故障后, Consumer 仍然可以从 Slave 读取消息,不影响
Consumer 读取消息,这就实现了读的⾼可⽤。
如何达到发送端写的⾼可⽤性呢?在创建 Topic 的时候,把 Topic 的多个 Message Queue 创建
在多个 Broker 组上(相同 Broker 名称,不同 brokerId 机器组成 Broker 组),这样当 Broker
组的 Master 不可⽤后,其他组 Master 仍然可⽤, Producer 仍然可以发送消息 RocketMQ ⽬前
还不⽀持把 Slave ⾃动转成 Master ,如果机器资源不⾜,需要把 Slave 转成 Master ,则要⼿动
停⽌ Slave ⾊的 Broker ,更改配置⽂件,⽤新的配置⽂件启动 Broker

18. 说⼀下 RocketMQ 的整体⼯作流程?
简单来说, RocketMQ 是⼀个分布式消息队列,也就是 消息队列 + 分布式系统
作为消息队列,它是 - - 的⼀个模型,对应的就是 Producer Broker Cosumer ;作为
分布式系统,它要有服务端、客户端、注册中⼼,对应的就是 Broker Producer/Consumer
NameServer
所以我们看⼀下它主要的⼯作流程: RocketMQ NameServer 注册中⼼集群、 Producer ⽣产者
集群、 Consumer 消费者集群和若⼲ Broker RocketMQ 进程)组成:
1. Broker 在启动的时候去向所有的 NameServer 注册,并保持长连接,每 30s 发送⼀次⼼跳
2. Producer 在发送消息的时候从 NameServer 获取 Broker 服务器地址,根据负载均衡算法选择
⼀台服务器来发送消息
3. Conusmer 消费消息的时候同样从 NameServer 获取 Broker 地址,然后主动拉取消息来消费

 

 

CommitLog :消息主体以及元数据的存储主体,存储 Producer 端写⼊的消息主体内容 ,
息内容不是定长的。单个⽂件⼤⼩默认 1G, ⽂件名长度为 20 位,左边补零,剩余为起始偏
移量,⽐如 00000000000000000000 代表了第⼀个⽂件,起始偏移量为 0 ,⽂件⼤⼩为
1G=1073741824 ;当第⼀个⽂件写满了,第⼆个⽂件为 00000000001073741824 ,起始偏移
量为 1073741824 ,以此类推。消息主要是顺序写⼊⽇志⽂件,当⽂件满了,写⼊下⼀个⽂
件。
CommitLog ⽂件保存于 ${Rocket_Home}/store/commitlog ⽬录中,从图中我们可以明显看出
来⽂件名的偏移量,每个⽂件默认 1G ,写满后⾃动⽣成⼀个新的⽂件。
ConsumeQueue :消息消费队列,引⼊的⽬的主要是提⾼消息消费的性能,由于
RocketMQ 是基于主题 topic 的订阅模式,消息消费是针对主题进⾏的,如果要遍历
commitlog ⽂件中根据 topic 检索消息是⾮常低效的。
Consumer 即可根据 ConsumeQueue 来查找待消费的消息。其中, ConsumeQueue (逻辑消费
队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理
偏移量 offset ,消息⼤⼩ size 和消息 Tag HashCode 值。
ConsumeQueue ⽂件可以看成是基于 Topic CommitLog 索引⽂件,故 ConsumeQueue ⽂件夹
的组织⽅式如下: topic/queue/file 三层组织结构,具体存储路径为:
$HOME/store/consumequeue/{topic}/{queueId}/{fileName} 。同样 ConsumeQueue ⽂件采取定
长设计,每⼀个条⽬共 20 个字节,分别为 8 字节的 CommitLog 物理偏移量、 4 字节的消息长
度、 8 字节 tag hashcode ,单个⽂件由 30W 个条⽬组成,可以像数组⼀样随机访问每⼀个条
⽬,每个 ConsumeQueue ⽂件⼤⼩约 5.72M

 

IndexFile IndexFile (索引⽂件)提供了⼀种可以通过 key 或时间区间来查询消息的⽅
法。 Index ⽂件的存储位置是: $HOME \store\index${fileName} ,⽂件名 fileName 是以创建
时的时间戳命名的,固定的单个 IndexFile ⽂件⼤⼩约为 400M ,⼀个 IndexFile 可以保存
2000W 个索引, IndexFile 的底层存储设计为在⽂件系统中实现 HashMap 结构,故
RocketMQ 的索引⽂件其底层实现为 hash 索引。

总结⼀下: RocketMQ 采⽤的是混合型的存储结构,即为 Broker 单个实例下所有的队列共⽤⼀
个⽇志数据⽂件(即为 CommitLog )来存储。
RocketMQ 的混合型存储结构 ( 多个 Topic 的消息实体内容都存储于⼀个 CommitLog ) 针对
Producer Consumer 分别采⽤了数据和索引部分相分离的存储结构, Producer 发送消息⾄
Broker 端,然后 Broker 端使⽤同步或者异步的⽅式对消息刷盘持久化,保存⾄ CommitLog 中。
只要消息被刷盘持久化⾄磁盘⽂件 CommitLog 中,那么 Producer 发送的消息就不会丢失。正因
为如此, Consumer 也就肯定有机会去消费这条消息。当⽆法拉取到消息后,可以等下⼀次消
息拉取,同时服务端也⽀持长轮询模式,如果⼀个消息拉取请求未拉取到消息, Broker 允许等
30s 的时间,只要这段时间内有新消息到达,将直接返回给消费端。
这⾥, RocketMQ 的具体做法是,使⽤ Broker 端的后台服务线程 —ReputMessageService 不停地
分发请求并异步构建 ConsumeQueue (逻辑消费队列)和 IndexFile (索引⽂件)数据。

 

 

Version:0.9 StartHTML:0000000105 EndHTML:0000012747 StartFragment:0000000141 EndFragment:0000012707

21. 说说 RocketMQ 怎么对⽂件进⾏读写的?
RocketMQ 对⽂件的读写巧妙地利⽤了操作系统的⼀些⾼效⽂件读写⽅式 —— PageCache
顺序读写 零拷⻉
PageCache 、顺序读取
RocketMQ 中, ConsumeQueue 逻辑消费队列存储的数据较少,并且是顺序读取,在 page
cache 机制的预读取作⽤下, Consume Queue ⽂件的读性能⼏乎接近读内存,即使在有消息堆
积情况下也不会影响性能。⽽对于 CommitLog 消息存储的⽇志数据⽂件来说,读取消息内容
时候会产⽣较多的随机访问读取,严重影响性能。如果选择合适的系统 IO 调度算法,⽐如设
置调度算法为 “Deadline” (此时块存储采⽤ SSD 的话),随机读的性能也会有所提升。
页缓存( PageCache) OS 对⽂件的缓存,⽤于加速对⽂件的读写。⼀般来说,程序对⽂件进
⾏顺序读写的速度⼏乎接近于内存的读写速度,主要原因就是由于 OS 使⽤ PageCache 机制对读
写访问操作进⾏了性能优化,将⼀部分的内存⽤作 PageCache 。对于数据的写⼊, OS 会先写⼊
Cache 内,随后通过异步的⽅式由 pdflush 内核线程将 Cache 内的数据刷盘⾄物理磁盘上。对
于数据的读取,如果⼀次读取⽂件时出现未命中 PageCache 的情况, OS 从物理磁盘上访问读取
⽂件的同时,会顺序对其他相邻块的数据⽂件进⾏预读取。
零拷贝
另外, RocketMQ 主要通过 MappedByteBuffer 对⽂件进⾏读写操作。其中,利⽤了 NIO 中的
FileChannel 模型将磁盘上的物理⽂件直接映射到⽤户态的内存地址中(这种 Mmap 的⽅式减少
了传统 IO ,将磁盘⽂件数据在操作系统内核地址空间的缓冲区,和⽤户应⽤程序地址空间的
缓冲区之间来回进⾏拷贝的性能开销),将对⽂件的操作转化为直接对内存地址进⾏操作,
从⽽极⼤地提⾼了⽂件的读写效率(正因为需要使⽤内存映射机制,故 RocketMQ 的⽂件存储
都使⽤定长结构来存储,⽅便⼀次将整个⽂件映射⾄内存)。
说说什么是零拷贝 ?
在操作系统中,使⽤传统的⽅式,数据需要经历⼏次拷贝,还要经历⽤户态 / 内核态切换。

 

1. 从磁盘复制数据到内核态内存;
2. 从内核态内存复制到⽤户态内存;
3. 然后从⽤户态内存复制到⽹络驱动的内核态内存;
4. 最后是从⽹络驱动的内核态内存复制到⽹卡中进⾏传输。
所以,可以通过零拷贝的⽅式, 减少⽤户态与内核态的上下⽂切换 内存拷贝的次数 ,⽤来
提升 I/O 的性能。零拷贝⽐较常见的实现⽅式是 mmap ,这种机制在 Java 中是通过
MappedByteBuffer 实现的。
RocketMQ 提供了两种刷盘策略:同步刷盘和异步刷盘
同步刷盘:在消息达到 Broker 的内存之后,必须刷到 commitLog ⽇志⽂件中才算成功,然
后返回 Producer 数据已经发送成功。
异步刷盘:异步刷盘是指消息达到 Broker 内存后就返回 Producer 数据已经发送成功,会唤
醒⼀个线程去将数据持久化到 CommitLog ⽇志⽂件中。
Broker 在消息的存取时直接操作的是内存(内存映射⽂件),这可以提供系统的吞吐量,但
是⽆法避免机器掉电时数据丢失,所以需要持久化到磁盘中。
刷盘的最终实现都是使⽤ NIO 中的 MappedByteBuffer.force() 将映射区的数据写⼊到磁盘,如
果是同步刷盘的话,在 Broker 把消息写到 CommitLog 映射区后,就会等待写⼊完成。
异步⽽⾔,只是唤醒对应的线程,不保证执⾏的时机,流程如图所⽰。
22. 能说下 RocketMQ 的负载均衡是如何实现的?
RocketMQ 中的负载均衡都在 Client 端完成,具体来说的话,主要可以分为 Producer 端发送消息
时候的负载均衡和 Consumer 端订阅消息的负载均衡。
Producer 的负载均衡
Producer 端在发送消息的时候,会先根据 Topic 找到指定的 TopicPublishInfo ,在获取了
TopicPublishInfo 路由信息后, RocketMQ 的客户端在默认⽅式下 selectOneMessageQueue() ⽅法
会从 TopicPublishInfo 中的 messageQueueList 中选择⼀个队列( MessageQueue )进⾏发送消息。
具这⾥有⼀个 sendLatencyFaultEnable 开关变量,如果开启,在随机递增取模的基础上,再过
滤掉 not available Broker 代理。

Consumer 的负载均衡
RocketMQ 中, Consumer 端的两种消费模式( Push/Pull )都是基于拉模式来获取消息的,⽽
Push 模式只是对 pull 模式的⼀种封装,其本质实现为消息拉取线程在从服务器拉取到⼀批消
息后,然后提交到消息消费线程池后,又 马不停蹄 的继续向服务器再次尝试拉取消息。如
果未拉取到消息,则延迟⼀下又继续拉取。在两种基于拉模式的消费⽅式( Push/Pull )中,均
需要 Consumer 端知道从 Broker 端的哪⼀个消息队列中去获取消息。因此,有必要在 Consumer
端来做负载均衡,即 Broker 端中多个 MessageQueue 分配给同⼀个 ConsumerGroup 中的哪些
Consumer 消费。
1. Consumer 端的⼼跳包发送
Consumer 启动后,它就会通过定时任务不断地向 RocketMQ 集群中的所有 Broker 实例发送⼼
跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端 id 的值等信
息)。 Broker 端在收到 Consumer 的⼼跳消息后,会将它维护在 ConsumerManager 的本地缓存变
—consumerTable ,同时并将封装后的客户端⽹络通道信息保存在本地缓存变量
channelInfoTable 中,为之后做 Consumer 端的负载均衡提供可以依据的元数据信息。
2. Consumer 端实现负载均衡的核⼼类 —RebalanceImpl
Consumer 实例的启动流程中的启动 MQClientInstance 实例部分,会完成负载均衡服务线
—RebalanceService 的启动(每隔 20s 执⾏⼀次)。
通过查看源码可以发现, RebalanceService 线程的 run() ⽅法最终调⽤的是 RebalanceImpl
rebalanceByTopic() ⽅法,这个⽅法是实现 Consumer 端负载均衡的核⼼。
rebalanceByTopic() ⽅法会根据消费者通信类型为 ⼴播模式 还是 集群模式 做不同的逻辑
处理。这⾥主要来看下集群模式下的主要处理流程:

23.RocketMQ 消息长轮询了解吗?
所谓的长轮询,就是 Consumer 拉取消息,如果对应的 Queue 如果没有数据, Broker 不会⽴即
返回,⽽是把 PullReuqest hold 起来,等待 queue 有了消息后,或者长轮询阻塞时间到了,再
重新处理该 queue 上的所有 PullRequest

 

PullMessageProcessor#processRequest

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/712523
推荐阅读
相关标签
  

闽ICP备14008679号