赞
踩
下载地址:https://github.com/apache/rocketmq
git clone https://github.com/apache/rocketmq.git
1)在下载的rocketmq根目录创建新文件夹conf
2)把 rocketmq\distribution\conf 下的 broker.conf, broker-a.properties, logback_broker.xml 这几个文件copy到新建的conf目录下,并更新 broker-a.properties 文件的内容(红框部分为新添加):
用于nameserver启动时加载相应的配置。
在 namesrv 模块中找到 NamesrvStartup.java,运行前先配置run configurations, 在环境遍历中配置ROCKETMQ_HOME
nameserver启动成功后,会输出:
The Name Server boot success. serializeType=JSON
从broker模块中,找到 BrokerStartup.java 文件,配置其run configurations:
1)配置其Arguments启动配置
这个配置里,包括了上面修改的 namesrvAddr 等。
2)配置启动环境变量:
然后运行。运行结果输出:
The broker[broker-a, 10.91.199.184:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
在rocketmq-example模块中,找到Producer.java,添加nameserver的地址(在前面的broker-a.properties中配置过):
运行,输出结果:
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C696633809874997A0000, offsetMsgId=0A5BC7B800002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499E50001, offsetMsgId=0A5BC7B800002A9F00000000000000BE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499EB0002, offsetMsgId=0A5BC7B800002A9F000000000000017C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499F00003, offsetMsgId=0A5BC7B800002A9F000000000000023A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499F60004, offsetMsgId=0A5BC7B800002A9F00000000000002F8, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499F90005, offsetMsgId=0A5BC7B800002A9F00000000000003B6, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C69663380987499FC0006, offsetMsgId=0A5BC7B800002A9F0000000000000474, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C6966338098749A000007, offsetMsgId=0A5BC7B800002A9F0000000000000532, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C6966338098749A020008, offsetMsgId=0A5BC7B800002A9F00000000000005F0, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=7F0000013F6C6966338098749A060009, offsetMsgId=0A5BC7B800002A9F00000000000006AE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=2]
14:29:38.465 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:38.472 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[10.91.199.184:10911] result: true
在rocketmq-example模块中,找到 Consumer.java,添加NameServer地址:
然后运行,结果如下:
Consumer Started.
ConsumeMessageThread_please_rename_unique_group_name_4_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1648621778314, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778374, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C696633809874997A0000, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1648621778411, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778414, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F000000000000017C, commitLogOffset=380, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499EB0002, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_6 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=190, queueOffset=1, sysFlag=0, bornTimestamp=1648621778432, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778433, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F0000000000000532, commitLogOffset=1330, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C6966338098749A000007, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=1, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1648621778405, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778409, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F00000000000000BE, commitLogOffset=190, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499E50001, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_4 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1648621778416, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778420, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F000000000000023A, commitLogOffset=570, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499F00003, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_5 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=190, queueOffset=1, sysFlag=0, bornTimestamp=1648621778422, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778424, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F00000000000002F8, commitLogOffset=760, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499F60004, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_9 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=190, queueOffset=2, sysFlag=0, bornTimestamp=1648621778434, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778436, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F00000000000005F0, commitLogOffset=1520, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C6966338098749A020008, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_7 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=190, queueOffset=1, sysFlag=0, bornTimestamp=1648621778428, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778430, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F0000000000000474, commitLogOffset=1140, bodyCRC=1307562618, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499FC0006, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 54], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_10 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=1, storeSize=190, queueOffset=2, sysFlag=0, bornTimestamp=1648621778438, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778439, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F00000000000006AE, commitLogOffset=1710, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C6966338098749A060009, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57], transactionId='null'}]]
ConsumeMessageThread_please_rename_unique_group_name_4_8 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=1, storeSize=190, queueOffset=1, sysFlag=0, bornTimestamp=1648621778425, bornHost=/10.91.199.184:62230, storeTimestamp=1648621778427, storeHost=/10.91.199.184:10911, msgId=0A5BC7B800002A9F00000000000003B6, commitLogOffset=950, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1648622132214, UNIQ_KEY=7F0000013F6C69663380987499F90005, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53], transactionId='null'}]]
1)broker:broker模块
2)client:消息客户端,包括生产者、消费者等
3)common:通用包
4)dev:开发者信息 (非源码)
5)distribution:部署实例文件夹(非源码)
6)example:RocketMQ示例代码
7)filter:消息过滤相关
8)filtersrv:消息过滤服务器实现
9)logging:日志相关
10)namesrv:NameServer相关实现
11)openmessaging:消息开放标准
12)remoting:基于Netty的RPC通信
13)srvutil:服务端工具类
14)store:消息存储实现
15)test:测试相关类
17)tools:监控命令相关实现
NameServer替代了ZK, 负责RocketMQ的路由管理、服务注册与发现。
NameServer负责管理和维护broker集群的节点:
1)Broker在启动时,会向所有的NameServer进行注册。
2)Producer在发消息之前,先从NameServer中获取Broker集群节点列表,然后根据某种负载均衡算法,从列表中选一台broker对其发送消息。
3)NameServer与每台broker保持长连接,并间隔每30s检测一次broker是否存活,如果发现broker已经死机,则从路由注册表中将其移除。但移除后,并不会把变化马上同步给producer,这样设计是为了降低NameServer的实现复杂度。
4)NameServer本身的高可用,是通过部署多台NameServer来实现,但他们彼此互补通信,所以他们的数据也不是强一致,这样做也是为了追求设计上的简单高效。
入口:rocketmq-namesrv模块的NamesrvStartup.java main函数
NamesrvStartup.main0 --> NamesrvController controller = createNamesrvController(args) :
1、NamesrvConfig:NameServer的业务配置参数
其属性值包括:
1)rocketmqhome:rocketmq的主目录,可以通过 -Drocketmq.home.dir=path 或 设置环境变量 ROCKETMQ_HOME来配置RocketMQ的主目录
2)kvConfigPath:NameServer存储KV配置属性的持久化路径
3)configStorePath:NameServer默认配置文件路径,默认不生效。NameServer启动时如果要通过配置文件配置NameServer属性的话,可使用-c选项
4)orderMessageEnable:是否支持顺序消息,默认为false
2、NettyServerConfig:NameServer的网络配置参数
看其启动参数 -c 后面是否有带配置文件
1)listenPort:NameServer的监听端口,默认会被初始化为9876
2)serverWorkerThreads:Netty处理业务的线程池中的线程个数
3)serverCallbackExecutorThreads:Netty public 任务线程池中的线程个数,RocketMQ的netty会对不同的业务创建不同独立的线程池,如消息发送、消息消费、心跳检测等。但如果该业务类型(RequestCode)未注册线程池,则在public线程池中执行。
4)serverSelectorThreads:IO线程池中的线程个数,这些线程在netty中负责解析请求包,然后转发到各业务线程池(worker)中进行业务处理,然后再把结果返回给调用方。
5)serverOnewaySemaphoreValue:send oneway消息请求的并发度(broker的参数)
6)serverAsyncSemaphoreValue:异步消息发送最大并发度(Broker的参数)
7)serverChannelMaxIdelTimeSeconds:网络连接最大空闲时间,默认120s,如果连接空闲时间超过该参数设置的值,则连接会被关闭。
8)serverSocketSndBufSize:网络socket发送缓冲区大小,默认64k
9)serverSocketRcvBufSize:网络socket接收缓冲区大小,默认64k
10)serverPooledByteBufAllocatorEnable:ByteBuffer是否开启缓存,默认开启
11)useEpollNativeSelector:是否启用Epoll IO模型,Linux环境建议开启
NamesrvController实例作为NameServer的核心控制器
NamesrvStartup.main0 --> start(controller) --> boolean initResult = controller.initialize():
1、加载KV配置到内存HashMap,即
kvConfigManager.load()
,创建NettyServer网络处理对象,即
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
然后开启两个定时任务,用于做心跳检测:
定时任务1:每隔10s检测一次broker,如broker不心跳则摘除。即:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
定时任务2:每隔10分钟打印一次KV配置,即:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
controller.shutdown();
return null;
}));
controller.start();
controller的shutdown方法:
public void shutdown() {
this.remotingServer.shutdown();
this.remotingExecutor.shutdown();
this.scheduledExecutorService.shutdown();
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
}
通过钩子来关闭线程池,是一种优雅的方式。
NameServer的主要作用:
1、为Producer和Consumer提供关于topic的路由信息,所以NameServer首先要存储这些路由信息,这就包括了broker的路由注册。
2、管理broker节点,这就包括了broker的路由删除。
NameServer的路由实现类为:RouteInfoManager.java,其存储元数据的代码:
private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
1)topicQueueTable:存储 topic 到 队列 的映射,消息发送时,根据此进行负载均衡。
2)brokerAddrTable:存储 brokerName 到 broker基本信息 的映射,broker的基本信息包括:集群名称,集群内所有broker的地址。
3)clusterAddrTable:存储 集群名 到 集群中所有brokerName 的映射。
4)brokerLiveTable:存储 broker地址 到 broker的状态信息 的映射,NameServer每次收到心跳包后,会替换该信息。
5)filterServerTable:存储 broker地址 到 Filter Server 的映射,主要用于类模式消息过滤。
RocketMQ的部署架构模式:
1)一个topic拥有多个消息队列,一个broker为每个topic默认创建4个读队列,4个写队列。
2)多个broker组成一个集群,brokerName由相同的多个broker组成Master-Slave架构,brokerId为0代表master,大于0代表slave。
3)BrokerLiveInfo中的lastUpdateTimestamp存储该broker上次收到的心跳包时间。
NameServer路由关键类的类图:
如RocketMQ 两主两从,其部署图如下:
其对应的数据结构为:
broker启动时,会向NameServer集群中的所有节点发送心跳请求,之后每隔30s发送一次心跳包。
NameServer收到心跳包后,会更新brokerLiveTable缓存中的BrokerLiveInfo的lastUpdateTimstamp;然后NameServer每隔10s会扫描一次brokerLiveTable,如果连续120s没有收到某个broker的心跳包,则将该broker从路由信息中移除,并关闭对该broker的连接。
在broker模块的BrokerController中的start()方法:每隔10s发送一次心跳给NameServer:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
其关键的BrokerOuterAPI.registerBrokerAll()方法,把broker注册到了所有的NameServer上
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>(); List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); final byte[] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body); if (result != null) { registerBrokerResultList.add(result); } log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } }
对于向某个nameserver注册,采用oneway的方式发送:
private RegisterBrokerResult registerBroker( final String namesrvAddr, final boolean oneway, final int timeoutMills, final RegisterBrokerRequestHeader requestHeader, final byte[] body ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); request.setBody(body); if (oneway) { try { this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); } catch (RemotingTooMuchRequestException e) { // Ignore } return null; } RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class); RegisterBrokerResult result = new RegisterBrokerResult(); result.setMasterAddr(responseHeader.getMasterAddr()); result.setHaServerAddr(responseHeader.getHaServerAddr()); if (response.getBody() != null) { result.setKvTable(KVTable.decode(response.getBody(), KVTable.class)); } return result; } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr()); }
心跳包都包含哪些内容呢?在BrokerOuterAPI.java的registerBrokerAll()中:
1、封装了RegisterBrokerRequestHeader,具体包括:
1)brokerAddr:broker地址
2)brokerId:0表示master, 大于0表示slave
3)brokerName
4)clusterName
5)haServerAddr:master地址,初次请求时该值为空,slave向NameServer注册后返回。
2、封装了RegisterBrokerBody,具体包括:
1)filterServerList:消息过滤服务器列表
2)topicConfigWrapper:topic配置。用TopicConfigSerializeWrapper封装了TopicConfigManager中的topicConfigTable, 其中存储了broker启动时默认的一些topic,如MixAll.SELF_TEST_TOPIC, MixAll.DEFAULT_TOPIC, MixAll.BENCHMARK_TOPIC, MixAll.OFFSET_MOVED_EVENT, BrokerConfig#brokerClusterName, BrokerCOnfig#brokerName。
Broker中topic默认存储在 ${Rocket_Home}/store/config/topic.json中。
在namesrv模块,DefaultRequestProcessor.java的processRequest()方法,用于根据请求类型,转发到不同的地方处理:
switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); case RequestCode.QUERY_DATA_VERSION: return queryBrokerTopicConfig(ctx, request); case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } case RequestCode.UNREGISTER_BROKER: return this.unregisterBroker(ctx, request);
此处关于心跳包的请求类型为REGISTER_BROKER,所以转发到RoutelnfoManager.registerBroker:
先加写锁,防止改RouteInfoManager中的路由表时由于并发导致写入的数据丢失。如果该broker在路由表中不存在,则创建,然后将broker加入到路由信息的broker集合中。
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
维护BrokerData信息,如果该broker不存在,则创建。
registerFirst 为true,表示第一次创建
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
if (MixAll.MASTER_ID == brokerId) {
log.info("cluster [{}] brokerName [{}] master address change from {} to {}",
brokerData.getCluster(), brokerData.getBrokerName(), oldAddr, brokerAddr);
}
registerFirst = registerFirst || (null == oldAddr);
如果broker是Master,且broker topic配置信息发生变化或者初次注册,则需要创建或更新 topic路由元数据,即填充topicQueueTable,即为默认topic自动注册路由信息,其中包括MixAll.DEFAULT_TOPIC的路由信息。
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) { QueueData queueData = new QueueData(); queueData.setBrokerName(brokerName); queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); queueData.setReadQueueNums(topicConfig.getReadQueueNums()); queueData.setPerm(topicConfig.getPerm()); queueData.setTopicSysFlag(topicConfig.getTopicSysFlag()); Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topicConfig.getTopicName()); if (null == queueDataMap) { queueDataMap = new HashMap<>(); queueDataMap.put(queueData.getBrokerName(), queueData); this.topicQueueTable.put(topicConfig.getTopicName(), queueDataMap); log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData); } else { QueueData old = queueDataMap.put(queueData.getBrokerName(), queueData); if (old != null && !old.equals(queueData)) { log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), old, queueData); } } }
RoutelnfoManager#registerBroker
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
一个broker上会关联多个FilterServer消息过滤服务器。
1、broker与NameServer保持长连接
2、broker的状态,存储在NameServer的brokerLiveTable中,NameServer没收到一个心跳包,都会更新brokerLiveTable中关于broker的状态信息及路由表(topicQueueTable, brokerAddrTable, brokerLiveTable, filterServerTable)。
3、更新上述路由表采用了读写锁:允许多个发送者并发读,但写时只能串行写。
broker每隔30s向NameServer发送一个心跳包,心跳包中包含brokerId, broker地址,broker名称,broker所属集群名,broker关联的filterServer列表。
NameServer会每隔10s扫描一次brokerLiveTable表,如果BrokerLive的lastUpdateTimstamp的时间戳距离当前时间超过120s,则认为broker失效了,从而移除该broker,并关闭与该broker的连接,同时更新topicQueueTable, brokerAddrTable, brokerLiveTable,filterServerTable。
RocketMQ删除broker的触发场景有2个:
1)10s扫描120s没有心跳的broker,并删除
2)broker正常关闭,会执行unregisterBroker指令。
这两种方式删除broker的路由,都是走同样的代码,及从topicQueueTable, brokerAddrTable, brokerLiveTable, filterServerTable删除该broker的相关信息。
路由删除的相关代码:RouteInfoManager#scanNotActiveBroker
public int scanNotActiveBroker() { int removeCount = 0; Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); removeCount++; } } return removeCount; }
其中,RemotingUtil.closeChannel(next.getValue().getChannel());是用于关闭netty的nio长连接
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());用于删除该broker的路由表信息:
先加读锁,查此要删除的broker在brokerLiveTable中是否存在:
public void onChannelDestroy(String remoteAddr, Channel channel) { String brokerAddrFound = null; if (channel != null) { try { try { this.lock.readLock().lockInterruptibly(); Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable = this.brokerLiveTable.entrySet().iterator(); while (itBrokerLiveTable.hasNext()) { Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next(); if (entry.getValue().getChannel() == channel) { brokerAddrFound = entry.getKey(); break; } } } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error("onChannelDestroy Exception", e); } }
如存在,则通过加写锁删除其broker的路由信息表:
Step1:把brokerAddress 从brokerLiveTable,filterServerTable中 移除
this.lock.writeLock().lockInterruptibly(); this.brokerLiveTable.remove(brokerAddrFound); this.filterServerTable.remove(brokerAddrFound); String brokerNameFound = null; boolean removeBrokerName = false; Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator(); while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { BrokerData brokerData = itBrokerAddrTable.next().getValue(); Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> entry = it.next(); Long brokerId = entry.getKey(); String brokerAddr = entry.getValue(); if (brokerAddr.equals(brokerAddrFound)) { brokerNameFound = brokerData.getBrokerName(); it.remove(); log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed", brokerId, brokerAddr); break; } } if (brokerData.getBrokerAddrs().isEmpty()) { removeBrokerName = true; itBrokerAddrTable.remove(); log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName()); } }
Step2:维护brokerAddrTable
brokerAddrTable的数据结构:
HashMap<String /*brokerName*/, BrokerData> brokerAddrTable
BrokerData的数据结构:HashMap<Long /*brokerId*/, String /*broker address*/> broker Address
遍历brokerAddrTable,找到brokerAddrs中的broker,从BrokerData中删除。
Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator(); while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { BrokerData brokerData = itBrokerAddrTable.next().getValue(); Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> entry = it.next(); Long brokerId = entry.getKey(); String brokerAddr = entry.getValue(); if (brokerAddr.equals(brokerAddrFound)) { brokerNameFound = brokerData.getBrokerName(); it.remove(); log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed", brokerId, brokerAddr); break; } } if (brokerData.getBrokerAddrs().isEmpty()) { removeBrokerName = true; itBrokerAddrTable.remove(); log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName()); } }
Step3:根据brokerName,从clusterAddrTable中找到broker,并从集群中移除:
if (brokerNameFound != null && removeBrokerName) { Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, Set<String>> entry = it.next(); String clusterName = entry.getKey(); Set<String> brokerNames = entry.getValue(); boolean removed = brokerNames.remove(brokerNameFound); if (removed) { log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", brokerNameFound, clusterName); if (brokerNames.isEmpty()) { log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster", clusterName); it.remove(); } break; } } }
Step4:根据brokerName, 遍历所有topic的队列,如果队列中包含了当前broker,则移除:
if (removeBrokerName) { String finalBrokerNameFound = brokerNameFound; Set<String> needRemoveTopic = new HashSet<>(); topicQueueTable.forEach((topic, queueDataMap) -> { QueueData old = queueDataMap.remove(finalBrokerNameFound); log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, old); if (queueDataMap.size() == 0) { log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic); needRemoveTopic.add(topic); } }); needRemoveTopic.forEach(topicQueueTable::remove); }
Step5:最后释放锁,完成路由删除
} finally {
this.lock.writeLock().unlock();
}
当某broker宕机,如果NameServer通过心跳检测发现后,并不会实时通知每个RocketMQ客户端,而是由客户端定时拉取topic最新的路由时做数据同步。
根据topic拉取路由信息的命令编码为:
GET_ROUTEINTO_BY_TOPIC
代码在:DefaultRequestProcessor#getRouteInfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic()); if (topicRouteData != null) { if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) { String orderTopicConf = this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic()); topicRouteData.setOrderTopicConf(orderTopicConf); } byte[] content = topicRouteData.encode(); response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; }
上面代码重点在下面的方法:
public TopicRouteData pickupTopicRouteData(final String topic) { TopicRouteData topicRouteData = new TopicRouteData(); boolean foundQueueData = false; boolean foundBrokerData = false; Set<String> brokerNameSet = new HashSet<String>(); List<BrokerData> brokerDataList = new LinkedList<BrokerData>(); topicRouteData.setBrokerDatas(brokerDataList); HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>(); topicRouteData.setFilterServerTable(filterServerMap); try { try { this.lock.readLock().lockInterruptibly(); Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic); if (queueDataMap != null) { topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values())); foundQueueData = true; brokerNameSet.addAll(queueDataMap.keySet()); for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData .getBrokerAddrs().clone()); brokerDataList.add(brokerDataClone); foundBrokerData = true; // skip if filter server table is empty if (!filterServerTable.isEmpty()) { for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { List<String> filterServerList = this.filterServerTable.get(brokerAddr); // only add filter server list when not null if (filterServerList != null) { filterServerMap.put(brokerAddr, filterServerList); } } } } } } } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error("pickupTopicRouteData Exception", e); } log.debug("pickupTopicRouteData {} {}", topic, topicRouteData); if (foundBrokerData && foundQueueData) { return topicRouteData; } return null; }
首先通过从RouterInfoManager的topicQueueTable, brokerAddrTable, filterServerTable中分别填充TopicRouteData中的List<QueueData>, List<BrokerData>, flterServer地址表
然后如果找到topic对应的路由信息的topic为顺序消息,则从NameServer VKconfig中获取关于顺序消息相关的配置,来填充路由信息。
NameServer的路由注册、删除机制:
RocketMQ支持三种消息发送:
1)同步发送:Producer发消息时,同步等待,串行发送
2)异步发送:Producer发消息是,不阻塞,仅指定消息成功后的回调方法,然后立即返回,最后发送成功还是失败,在新的线程中执行返回。
3)单向(Oneway)发送:Producer发消息时,不等待服务器返回结果,直接返回,也不注册回调函数,即不在乎消息是否成功发到了服务端。
在common模块的Message.java
消息类基本属性包括:
1)String topic
2)消息 int flag
3)扩展属性 Map<String, String> properties
4)消息体 byte[] body
5)事务ID String transactionId
Message的构造器为:
public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
this.topic = topic;
this.flag = flag;
this.body = body;
if (tags != null && tags.length() > 0) {
this.setTags(tags);
}
if (keys != null && keys.length() > 0) {
this.setKeys(keys);
}
this.setWaitStoreMsgOK(waitStoreMsgOK);
}
消息的扩展属性,及存储在Map<String, String> properties中,主要包括:
1)tag:用于消息过滤
2)keys:message的索引key, 多个用空格分隔,可根据key快速查找到消息
3)waitStoreMsgOK:发消息时,是否等消息在broker存储完成后再返回
4)delayTimeLevel:消息延迟级别,用于定时消息或消息重试
Producer的代码都在client模块,对于RocketMQ来说,他就是客户端。
client模块中的DefaultMQProducer.java是默认的Producer实现类,他继承自MQProducer类,并实现了MQAdmin接口,其主要方法如下:
MQAdmin的方法:
1)创建topic
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException;
2)根据时间戳从队列中查找其偏移量
long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
3)查找该消息队列中最大的物理偏移量
long maxOffset(final MessageQueue mq) throws MQClientException;
4)查找该消息队列中最小的物理偏移量
long minOffset(final MessageQueue mq) throws MQClientException;
5)查找消息队列中最早的消息
long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
6)根据消息偏移量查找消息
MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
7)根据下面条件查找消息
topic, 消息索引key, 本次取出最多消息条数maxNum, 开始时间begin,结束时间end
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end) throws MQClientException, InterruptedException;
MQProducer的方法:
1)查找topic下所有消息队列:
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
2)同步发送消息:
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
3)同步发消息,超时抛异常
SendResult send(final Message msg, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
4)异步发消息,sendCallback为发送成功后的回调方法
void send(final Message msg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
5)异步发消息,超时抛异常
void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException;
6)单向发消息,不管是否接收成功,发出去后立即返回
void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException;
7)同步发消息到指定消息队列
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
8)异步发消息到指定消息队列
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
9)单向发消息到指定队列
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException;
10)同步发送消息,并指定负载均衡算法
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
11)同步批量发送消息
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
DefaultMQProducer的核心属性:
1)生产者组 private String producerGroup:
对于事务消息,消息服务器在回查事务时,会随机选择生产者组中的任何一个生产者发起事务回查请求
2)默认topic key private String createTopicKey:
仅用于测试或demo
3)topic在每个broker上默认的队列数量private volatile int defaultTopicQueueNums = 4;
4)发消息默认超时时间,单位毫秒:private int sendMsgTimeout = 3000;
5)消息体大小超过此阈值后启动压缩,默认4K:private int compressMsgBodyOverHowmuch = 1024 * 4;
6)同步发送消息重试次数,默认为2,即共执行3次:private int retryTimesWhenSendFailed = 2;
7)异步发送消息重试次数,默认为2,即共执行3次:private int retryTimesWhenSendAsyncFailed = 2;
8)消息重试时选择另一个broker时,是否不等待存储结果就返回,默认为false:private boolean retryAnotherBrokerWhenNotStoreOK = false;
9)允许发送的最大消息长度,默认为4M,最大值为2^32-1:private int maxMessageSize = 1024 * 1024 * 4;
Producer的启动入口在DefaultMQProducerImpl#start
Step1:检验producerGroup是否符合要求,并把生产者的instanceName改为进程ID
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
Step2:创建MQClientInstance实例,同一个clientId只有一个MQClientInstance。这是通过存到了缓存表:
ConconrrentMap<String /*clientId*/, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>()中。
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } return instance; }
生成clientId的方法:
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
clientId为 ip + instance + unitname(可选),为了避免同一个服务器上部署两个producer实例时,其clientId相同,所以RocketMQ设计时,自动将Instance设置为进程ID。
Step3:把producer加入到MQClientInstance管理中,方便后续调用网络请求、进行心跳检测等。
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
Step4:启动MQClientInstance:
if (startFactory) {
mQClientFactory.start();
}
消息发送流程包括:消息校验、查找路由、消息发送 三步
代码在DefaultMQProducer#send
校验消息的topic是否合法(不能跟系统的topic重复),消息体长度大于0且小于4M
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // topic Validators.checkTopic(msg.getTopic()); Validators.isNotAllowedSendTopic(msg.getTopic()); // body if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } if (0 == msg.getBody().length) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } }
向topic发送消息前,先要获取到topic的路由信息,即topic在哪个broker节点上。
代码在:DefaultMQProducerImpl#tryToFindTopicPublishInfo
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
如果producer中已经缓存了topic的路由信息,且路由信息中包含了消息队列,则直接返回路由信息;如果没有缓存或包含消息队列,则向NameServer查询该topic的路由信息,然后存到TopicPublishInfo。
下面看下TopicPublishInfo的属性:
1)是否是顺序消息
private boolean orderTopic = false
2)topic对应的消息队列
private List messageQueueList = new ArrayList();
3)是否有topic路由信息
private boolean haveTopicRouterInfo = false
4)topic路由信息
private TopicRouteData topicRouteData;
5)每选择一次消息队列,该值会自增1,如果Integer.MAX_VALUE,则重置为0。用于选择消息队列
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
TopicRouteData 类的属性:
1)topic队列源数据:
private List queueDatas;
2)topic对应的broker的元数据
private List brokerDatas;
3)broker上过滤服务器的地址列表
private HashMap<String/* brokerAddr /, List/ Filter Server */> filterServerTable;
发消息时,如果本地没有缓存topic的路由信息,则从NameServer查询,如果路由信息未找到,则尝试用默认topic DefaultMQProducerImpl#createTopicKey去NameServer模块查询,如果autoCreateTopicEnable为true, 则NameServer将返回路由信息;如果autoCreateTopicEnable为false,则抛出异常。
从NameServer获取路由信息的client模块的代码:
MQClientInstance#updateTopicRouteInfoFromNameServer
Step1: 如果使用默认topic去查询,如果查询到路由信息,则替换路由信息中读写队列的个数为producer的默认队列个数
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
clientConfig.getMqClientApiTimeout());
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}
Step2:如果找到路由信息,与本地缓存中的对比,如果发生了变化,则更新
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
Step3:更新MQClientInstance Broker地址缓存表
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
Step4:遍历从NameServer获取到的QueueData信息,如果队列没有写权限,则继续遍历下一个QueueData;根据brokerName找到brokerData信息,找不到或没有找到master节点,则遍历下一个DataQueue; 最后根据写队列的个数遍历,根据topic+序号创建MessageQueue,填充topicPublishInfo中的List<QueueMessage>, 从而完成消息发送的路由查找。
List<QueueData> qds = route.getQueueDatas(); Collections.sort(qds); for (QueueData qd : qds) { if (PermName.isWriteable(qd.getPerm())) { BrokerData brokerData = null; for (BrokerData bd : route.getBrokerDatas()) { if (bd.getBrokerName().equals(qd.getBrokerName())) { brokerData = bd; break; } } if (null == brokerData) { continue; } if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) { continue; } for (int i = 0; i < qd.getWriteQueueNums(); i++) { MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); info.getMessageQueueList().add(mq); } } } info.setOrderTopic(false);
消息的producer有发送重试机制,由DefaultMQProducer.retryTimesWhenSendFailed来指定同步发送重试次数。
由retryTimesWhenSendAsyncFailed指定异步重试次数。
选择消息队列有两种方式:
1)sendLatencyFaultEnable=false:默认不启用broker故障延迟机制
代码在TopicPublishInfo#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } } public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
lastBrokerName是上次选择执行发送失败的broker。第一次执行消息队列选择时,lastBrokerName为null, 此时用sendWhichQueue自增,再获取值,与当前路由表中消息队列个数取模,返回该位置的MessageQueue;如果消息发送失败,则下次进行消息队列选择时,规避掉上次MessageQueue所在的broker,这是为了避免再次失败。
之所以broker不可用,还没有从发送列表被清除,是因为NameServer发现broker不可用,摘除后,不会立即同步给Client, 直到client定时从NameServer同步时才能知道。为了解决这个问题,RocketMQ引入了一种机制,在broker宕机期间,如果一次发消息失败后,可以将该broker暂时排除在消息队列的选择范围中。
2)sendLatencyFauleEnable=true:启用broker故障延迟机制
代码在:MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
1)根据对消息队列轮训,获取到一个消息队列
2)验证该消息队列是否可用:latencyFaultTolerance.isAvailable(mq.getBrokerName())
如果可用,则从latencyFaultTolerance删除该topic,表示该broker已经恢复。
broker故障延迟机制的核心类:
消息发送的API核心入口:DefaultMQProducerImpl#sendKernelImpl
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
...
}
1)Message msg:待发送消息
2)MessageQueue mq:将消息发送到该消息队列
3)CommunicationMode communicationMode:发送模式:同步、异步、单向
4)SendCallback sendCallback:异步回调函数
5)TopicPublishInfo topicPublishInfo:topic路由信息
6)long timeout:消息发送超时时间
接下来看发送过程:
Step1:从消息队列中获取broker网络地址
如果MQClientInstance的brokerAddrTable未缓存该地址,则调用NameServer主动更新topic的路由信息。如果更新后还是找不到Broker信息,则抛出MQClientException,提示broker不存在。
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
Step2:为消息分配全局唯一ID,如果消息体超过4K,则会对消息体采用zip压缩,并设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED_TYPE。
//for MessageBatch,ID has been set in the generating process if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } boolean topicWithNamespace = false; if (null != this.mQClientFactory.getClientConfig().getNamespace()) { msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); topicWithNamespace = true; } int sysFlag = 0; boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); this.executeCheckForbiddenHook(checkForbiddenContext); } if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } this.executeSendMessageHookBefore(context); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。