当前位置:   article > 正文

rocketmq 源码阅读笔记_rocketmq消费者源码

rocketmq消费者源码

一、概念模型

RocketMQ源码解析(一)-架构原理-https://www.jianshu.com/p/6b833d01b249

二、通信模块

RocketMQ源码阅读(二)-通信模块:https://www.jianshu.com/p/cf0f41314a76

个人总结及脑图:
在这里插入图片描述

三、存储原理

参考资料
1.RocketMQ源码阅读(四)-消息存储: https://www.jianshu.com/p/6494e33c9b1f
2.从RocketMQ消息持久化设计看磁盘性能瓶颈的突破

个人整理思维图如下:
在这里插入图片描述

四、nameserver

具体内容可看“空挡”的博客:RocketMQ源码解析(二)-nameserv:https://www.jianshu.com/p/1686fdfc409b

补充:RouteInfoManager

一句话:nameserver的主要功能是负责服务发现。底层实际上就是维护着好几个HashMap,用于记录各种映射关系,比方说集群cluster与broker的关系,比方说topic下有哪些queue。每当有broker发起注册时,nameserver就会去更新相关的映射关系和心跳。

数据结构

RouteInfoManager主要由五个HashMap组成,缓存着topic、queue、broker和filterServer的关联关系。

本人对这五个map进行了理解与整理,整理结果如下图所示:
在这里插入图片描述

pickupTopicRouteData()方法总结:
  1. 根据topic从topicQueueTable中取得对应的所有队列list < QueueData>
  2. 将每个队列QueueData的brokerName存到Set<String> brokerNameSet
  3. 遍历 brokerNameSet,从brokerAddrTable中取出brokerName对应的BrokerData,复制一份存到List<BrokerData> brokerDataList
  4. 每个BrokerData内还有多个地址brokerAddr(因为broker有主从,它们地址不同但名字相同),遍历地址并从filterServerTable中取出List<String> filterServerList,将其存入到 filterServerMap中。
  5. 将brokerDataList和filterServerMap放入到新生成的TopicRouteData中,并返回。
注册broker:registerBroker()方法总结
  1. 更新cluster和broker的对应关系
  2. 更新brokername对应的brokerdata
  3. 如果是master broker,且是第一次注册或者topic信息发生变化,则更新topicQueueTable
  4. 更新broker的心跳
  5. 更新filter server table
  6. 如果是slave broker注册,如果master存在,则返回master broker信息

五、Producer

具体内容可看“空挡”的博客:RocketMQ源码解析(三)-Producer:https://www.jianshu.com/p/02dbc0710f80
在这里插入图片描述

补充:

MQClientInstance启动时,会开启client的定时任务:定时任务功能是依靠JUC的ScheduledExecutorService线程池来定时创建线程。


        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                	//定时从nameServer获取路由信息并更新
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

六、consumer

详见:RocketMQ源码分析之消费者https://blog.csdn.net/xiaoye319/article/details/102953592

rebalance相关部分:https://www.jianshu.com/p/dd2202cc22ea 这篇有提及。

队列分配策略:
Rocketmq之消息队列分配策略:https://blog.csdn.net/yewandemty/article/details/81989695

注:

consumer中的组件较多,刚开始看很混乱,花了大概4天慢慢理清关系,虚线部分是个人理解的组件间的关系图,实现部分和之前的一样,是代码的流程关系。
在这里插入图片描述

BroadCast和Cluster的具体区别:
  1. 广播模式下,消费者会负责topic的所有MessageQueue;集群模式下,消费者会根据负载策略负责topic的一部分MessageQueue。
  2. 广播模式下,StoreOffset存储在本地;而集群模式下,StoreOffset远程存在broker中,具体作用详见下文“问题3”部分

六.5、消费者的失败重试机制

重试机制如下:
图片名称
失败重试延时表:

重复次数12345678910111213141516
延迟时间10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h

TODO 补充到第六点的图里。

七、broker

broker的一大功能“存储功能”第二大点已经讲述,剩余的部分并不多,主要就是几个netty server处理producer和consumer请求的处理器。
在这里插入图片描述

(重要) RocketMQ如何保证消息不丢失

RocketMQ如何保证消息不丢失?

补充(重要)broker的主从节点如何选址/故障自动转移机制? 面试被问过

rocketmq原理_RocketMQ原理:RocketMQ高可用原理
使用的是DLedger机制,利用的是Raft算法(redis哨兵选举leader也用了这玩意)来实现的故障自动转移。
注意:另一个中间件kafka用的是partition leader选举和controller选举,不太一样。

八、顺序消息

具体内容可看“空挡”的博客:
RocketMQ源码解析(十二)-顺序消息:https://www.jianshu.com/p/57be402365ee

在这里插入图片描述
八股文总结:RocketMQ如何保证消息的顺序性?

总结:

  • producer -> broker: 发送到同一个队列里
  • broker -> consumer: 三把锁:
    • 锁定broker的MessageQueue,确保只有一个消费者能消费
    • 锁定consumer本地的MessageQueue,确保只有一个线程能消费。
    • 锁定broker的ProcessQueue,确保在rebalance的时候不会出现重复消费。(这是rocketmq为了解决重复消费问题而设置的机制,并不是为了顺序消息设计的,只是这个机制刚好也能帮助到顺序消息)

九、事物消息

详见:RocketMQ源码解析(十三)-事务消息:https://www.jianshu.com/p/8831a717edef
在这里插入图片描述

rocketmq事务消息机制常用于分布式事务中系统间实现最终一致性,具体实例:

  • 系统producer -> broker发送事务消息,broker同步返回发送是否成功
  • 若发送成功,系统开启本地事务,让用户去支付宝进行支付
  • 支付成功,支付宝 -> 系统,然后producer -> broker发送确认消息
  • broker -> consumer ,consumer收到消息后执行本地事务给用户充值。

rocketmq方案的缺点在于:

  1. 只能适用于双系统单节点这一最简单的场景(待确定)。
  2. 如果consumer失败了,producer的本地事务是无法回滚的。

十、延时消息

(重要)使用场景:订单系统,创建订单后,如果用户没有按时支付,就需要把之前的订单(支付单)给取消掉。
原理:面试常问Rocketmq延迟消息原理
进阶原理:弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!

在这里插入图片描述

十一、具体部署

RocketMQ4.3.0集群搭建和部署rocketMq监控平台

十二、tag:

https://blog.csdn.net/crazyman2010/article/details/80962430
https://www.jianshu.com/p/524ef06ce25a

个人所提出的疑问

问题1:不同消费者Group 消费同一个topic时会如何?

consumer group和topic的关系:
https://my.oschina.net/tantexian/blog/703804
结论:即便是在Cluster模式下,不同消费者Group 消费同一个topic,也都能获得消息。原因在于:
现假设消费者组groupA和groupB订阅了topicA,按照之前consumer的源码解析,会有以下几步:

  1. groupA和groupB都会先自己订阅的所有topic,因此都会查到topicA
  2. groupA和groupB的路由信息中存着topicA的所有queue。
  3. groupA和groupB的路由信息中topicA对应的broker信息,都会向broker查询topicA在自己group对应的所有consumer id。
  4. 现假设某一条信息进入了topicA的queueA中,无论是group A和group B中,都肯定会有某一个consumer负责消费queueA
  5. group A和group B的consumer各自的pullMessageService都会向broker请求获取这条消息,然后放到过各自的ProcesssQueue中进行处理,处理完毕后都会要求broker更新consume offset。
  6. broker中的ConsumerManageProcessor负责记录consumer的消费情况,里面的offsetTable以”topic@group”作为key,分别记录消费者group自己的消费情况,因此不同消费者组订阅相同topic时互不干扰
问题2:通讯模块里,处理器processor哪里有用到?

broker中的SendMessageProcessor、PullMessageProcessor、ConsumerManageProcessor和EndTransactionProcessor就是具体的例子。前者负责与producer通信,后者负责与consumer通信。

问题3:一个netty server只能注册一个处理器还是多个?

可以注册多个,broker就同时注册了多个,broker内含有remotingServer,而remotingServer中的processorTable负责管理processor,每个处理器都有自己的code,当client向server发出请求时,会先根据请求的code从processorTable中取出对应的processor,再由对应的processor进行处理。因此server可以处理不同client的不同请求。

问题4:同一个group下现有两个consumer A和B订阅了同一个topic,然后中途B挂了,这时发送了一些消息,之后B重启的话,这些消息会不会因为被A消费了而导致B收不到?

不会收不到对吧?但是实际上我自己实验的时候发现B并没有收到消息,是不是很神奇? 为此我又深入研究了一下消费的相关源码,发现原因出在了我是用的同一台机器开启的消费者A和消费者B,而广播模式下offset是本地持久化,所以A和B共享了同一个偏移量,B重启的时候,偏移量已经被A改变了,所以B收不到消息,所以需要注意如果采用广播模式的话,不同消费者一定不要部署在同一台容器里。

问题5:mq如何确保消息不丢失

producer:发消息失败重试机制。
broker:消息持久化,刷盘策略。
consumer:通过提交offset。

十三 八股文笔记总结

  • RocketMQ消息堆积了怎么解决?https://www.yuque.com/hollis666/fplypc/ewfswph69g1n2u8c
  • RocketMQ的事务消息是如何实现的?https://www.yuque.com/hollis666/fplypc/abxh7z
  • RocketMQ如何保证消息的顺序性?https://www.yuque.com/hollis666/fplypc/nt1ishhbunfo0g86
  • RocketMQ如何保证消息不丢失?https://www.yuque.com/hollis666/fplypc/txw2gxr6utxggu60
  • RocketMQ如何实现延时消息?
  • RocketMQ有几种集群方式?https://www.yuque.com/hollis666/fplypc/ng01aagqxqmlvamm
  • 介绍一下RocketMQ的工作流程?https://www.yuque.com/hollis666/fplypc/iagycgg9pxkt78rr
  • broker的主从节点如何选址/故障自动转移机制?
  • broker的消息存储在哪?存在commitLog。 存储原理是什么?
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/603725
推荐阅读
相关标签
  

闽ICP备14008679号