赞
踩
RocketMQ源码解析(一)-架构原理-https://www.jianshu.com/p/6b833d01b249
RocketMQ源码阅读(二)-通信模块:https://www.jianshu.com/p/cf0f41314a76
个人总结及脑图:
参考资料:
1.RocketMQ源码阅读(四)-消息存储: https://www.jianshu.com/p/6494e33c9b1f
2.从RocketMQ消息持久化设计看磁盘性能瓶颈的突破
个人整理思维图如下:
具体内容可看“空挡”的博客:RocketMQ源码解析(二)-nameserv:https://www.jianshu.com/p/1686fdfc409b
一句话:nameserver的主要功能是负责服务发现。底层实际上就是维护着好几个HashMap,用于记录各种映射关系,比方说集群cluster与broker的关系,比方说topic下有哪些queue。每当有broker发起注册时,nameserver就会去更新相关的映射关系和心跳。
RouteInfoManager主要由五个HashMap组成,缓存着topic、queue、broker和filterServer的关联关系。
本人对这五个map进行了理解与整理,整理结果如下图所示:
list < QueueData>
Set<String> brokerNameSet
中List<BrokerData> brokerDataList
中List<String> filterServerList
,将其存入到 filterServerMap
中。具体内容可看“空挡”的博客:RocketMQ源码解析(三)-Producer:https://www.jianshu.com/p/02dbc0710f80
MQClientInstance启动时,会开启client的定时任务:定时任务功能是依靠JUC的ScheduledExecutorService线程池来定时创建线程。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//定时从nameServer获取路由信息并更新
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
详见:RocketMQ源码分析之消费者https://blog.csdn.net/xiaoye319/article/details/102953592
rebalance相关部分:https://www.jianshu.com/p/dd2202cc22ea 这篇有提及。
队列分配策略:
Rocketmq之消息队列分配策略:https://blog.csdn.net/yewandemty/article/details/81989695
consumer中的组件较多,刚开始看很混乱,花了大概4天慢慢理清关系,虚线部分是个人理解的组件间的关系图,实现部分和之前的一样,是代码的流程关系。
重试机制如下:
失败重试延时表:
重复次数 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
延迟时间 | 10s | 30s | 1m | 2m | 3m | 4m | 5m | 6m | 7m | 8m | 9m | 10m | 20m | 30m | 1h | 2h |
TODO 补充到第六点的图里。
broker的一大功能“存储功能”第二大点已经讲述,剩余的部分并不多,主要就是几个netty server处理producer和consumer请求的处理器。
rocketmq原理_RocketMQ原理:RocketMQ高可用原理
使用的是DLedger机制,利用的是Raft算法(redis哨兵选举leader也用了这玩意)来实现的故障自动转移。
注意:另一个中间件kafka用的是partition leader选举和controller选举,不太一样。
具体内容可看“空挡”的博客:
RocketMQ源码解析(十二)-顺序消息:https://www.jianshu.com/p/57be402365ee
八股文总结:RocketMQ如何保证消息的顺序性?
总结:
详见:RocketMQ源码解析(十三)-事务消息:https://www.jianshu.com/p/8831a717edef
rocketmq事务消息机制常用于分布式事务中系统间实现最终一致性,具体实例:
rocketmq方案的缺点在于:
(重要)使用场景:订单系统,创建订单后,如果用户没有按时支付,就需要把之前的订单(支付单)给取消掉。
原理:面试常问Rocketmq延迟消息原理
进阶原理:弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
RocketMQ4.3.0集群搭建和部署rocketMq监控平台
https://blog.csdn.net/crazyman2010/article/details/80962430
https://www.jianshu.com/p/524ef06ce25a
consumer group和topic的关系:
https://my.oschina.net/tantexian/blog/703804
结论:即便是在Cluster模式下,不同消费者Group 消费同一个topic,也都能获得消息。原因在于:
现假设消费者组groupA和groupB订阅了topicA,按照之前consumer的源码解析,会有以下几步:
broker中的SendMessageProcessor、PullMessageProcessor、ConsumerManageProcessor和EndTransactionProcessor就是具体的例子。前者负责与producer通信,后者负责与consumer通信。
可以注册多个,broker就同时注册了多个,broker内含有remotingServer,而remotingServer中的processorTable负责管理processor,每个处理器都有自己的code,当client向server发出请求时,会先根据请求的code从processorTable中取出对应的processor,再由对应的processor进行处理。因此server可以处理不同client的不同请求。
不会收不到对吧?但是实际上我自己实验的时候发现B并没有收到消息,是不是很神奇? 为此我又深入研究了一下消费的相关源码,发现原因出在了我是用的同一台机器开启的消费者A和消费者B,而广播模式下offset是本地持久化,所以A和B共享了同一个偏移量,B重启的时候,偏移量已经被A改变了,所以B收不到消息,所以需要注意如果采用广播模式的话,不同消费者一定不要部署在同一台容器里。
producer:发消息失败重试机制。
broker:消息持久化,刷盘策略。
consumer:通过提交offset。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。