赞
踩
基于 RocketMQ 4.2.0 版本进行的源码分析。
本文讲述 RocketMQ 发送一条普通消息的流程。
我们可以参考官方文档来启动服务:
sh bin/mqnamesrv
sh bin/mqbroker -n localhost:9876
一条消息体最少需要指定两个值:
如下就是创建了一条话题为 “Test”,消息体为 “Hello World” 的消息:
Message msg = new Message( “Test”, “Hello World”.getBytes() );
如果我们想要发送消息呢,我们还需要再启动一个 DefaultProducer (生产者) 类来发消息:
DefaultMQProducer producer = new DefaultMQProducer();
producer.start();
现在我们所启动的服务如下所示:
注意我们上述开启的是单个服务,也即一个 Broker 和一个 Name 服务器,但是实际上使用消息队列的时候,我们可能需要搭建的是一个集群,如下所示:
在 RocketMQ 的设计中,客户端需要首先询问 Name 服务器才能确定一个合适的 Broker 以进行消息的发送:
然而这么多 Name 服务器,客户端是如何选择一个合适的 Name 服务器呢?
首先,我们要意识到很重要的一点,Name 服务器全部都是处于相同状态的,保存的都是相同的信息。在 Broker 启动的时候,其会将自己在本地存储的话题配置文件 (默认位于 $HOME/store/config/topics.json 目录) 中的所有话题加载到内存中去,然后会将这些所有的话题全部同步到所有的 Name 服务器中。与此同时,Broker 也会启动一个定时任务,默认每隔 30 秒来执行一次话题全同步:
由于 Name 服务器每台机器存储的数据都是一致的。因此我们客户端任意选择一台服务器进行沟通即可。
其中客户端一开始选择 Name 服务器的源码如下所示:
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex()); private static int initValueIndex() { Random r = new Random(); return Math.abs(r.nextInt() % 999) % 999; } private Channel getAndCreateNameserverChannel() throws InterruptedException { // ... for (int i = 0; i < addrList.size(); i++) { int index = this.namesrvIndex.incrementAndGet(); index = Math.abs(index); index = index % addrList.size(); String newAddr = addrList.get(index); this.namesrvAddrChoosed.set(newAddr); Channel channelNew = this.createChannel(newAddr); if (channelNew != null) return channelNew; } // ... } }
以后,如果 namesrvAddrChoosed 选择的服务器如果一直处于连接状态,那么客户端就会一直与这台服务器进行沟通。否则的话,如上源代码所示,就会自动轮寻下一台可用服务器。
当客户端发送消息的时候,其首先会尝试寻找话题路由信息。即这条消息应该被发送到哪个地方去。
客户端在内存中维护了一份和话题相关的路由信息表 topicPublishInfoTable,当发送消息的时候,会首先尝试从此表中获取信息。如果此表不存在这条话题的话,那么便会从 Name 服务器获取路由消息。
public class DefaultMQProducerImpl implements MQProducerInner {
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// ...
}
}
当尝试从 Name 服务器获取路由信息的时候,其可能会返回两种情况:
这个话题是新创建的,Name 服务器不存在和此话题相关的信息:
话题之前创建过,Name 服务器存在此话题信息:
服务器返回的话题路由信息包括以下内容:
“broker-1”、”broker-2” 分别为两个 Broker 服务器的名称,相同名称下可以有主从 Broker,因此每个 Broker 又都有 brokerId 。默认情况下,BrokerId 如果为 MixAll.MASTER_ID (值为 0) 的话,那么认为这个 Broker 为 MASTER 主机,其余的位于相同名称下的 Broker 为这台 MASTER 主机的 SLAVE 主机。
public class MQClientInstance {
public String findBrokerAddressInPublish(final String brokerName) {
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
return map.get(MixAll.MASTER_ID);
}
return null;
}
}
每个 Broker 上面可以绑定多个可写消息队列和多个可读消息队列,客户端根据返回的所有 Broker 地址列表和每个 Broker 的可写消息队列列表会在内存中构建一份所有的消息队列列表。之后客户端每次发送消息,都会在消息队列列表上轮循选择队列 (我们假设返回了两个 Broker,每个 Broker 均有 4 个可写消息队列):
public class TopicPublishInfo {
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
}
在确定了 Master Broker 地址和这个 Broker 的消息队列以后,客户端才开始真正地发送消息给这个 Broker,也是从这里客户端才开始与 Broker 进行交互:
这里我们暂且先忽略消息体格式的具体编/解码过程,因为我们并不想一开始就卷入这些繁枝细节中,现在先从大体上了解一下整个消息的发送流程,后续会写专门的文章来说明。
刚才说到,如果话题信息在 Name 服务器不存在的话,那么会使用默认话题信息进行消息的发送。然而一旦这条消息到来之后,Broker 端还并没有这个话题。所以 Broker 需要检查话题的存在性:
public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor { protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, final RemotingCommand response) { // ... TopicConfig topicConfig = this.brokerController .getTopicConfigManager() .selectTopicConfig(requestHeader.getTopic()); if (null == topicConfig) { // ... topicConfig = this.brokerController .getTopicConfigManager() .createTopicInSendMessageMethod( ... ); } } }
如果话题不存在的话,那么便会创建一个话题信息存储到本地,并将所有话题再进行一次同步给所有的 Name 服务器:
public class TopicConfigManager extends ConfigManager { public TopicConfig createTopicInSendMessageMethod(final String topic, /** params **/) { // ... topicConfig = new TopicConfig(topic); this.topicConfigTable.put(topic, topicConfig); this.persist(); // ... this.brokerController.registerBrokerAll(false, true); return topicConfig; } }
话题检查的整体流程如下所示:
当 Broker 对消息的一些字段做过一番必要的检查之后,便会存储到磁盘中去:
发送消息的整体流程:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。