赞
踩
在消息发送过程中,生产者从NameServer
中获取到了指定Topic
对应的Broker
信息,在同步发送消息的代码中,如果消息发送失败,生产者默认是会重试两次的。那么Broker
有问题的情况下,无论重试多少次都是没有意义的,消息生产者是如何规避这些故障Broker
的呢?
我们在所有的发送消息源码中都可以找到这样一段代码,可在DefaultMQProducerImpl
类中查找:
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
-
- 复制代码
无论是发送成功还是失败,RocketMQ生产者客户端都会做这一步操作:
- // 发送成功的话,isolation传false,失败isolation传true
- public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
- if (this.sendLatencyFaultEnable) {
- long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
- this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
- }
- }
-
- private long computeNotAvailableDuration(final long currentLatency) {
- for (int i = latencyMax.length - 1; i >= 0; i--) {
- if (currentLatency >= latencyMax[i])
- return this.notAvailableDuration[i];
- }
- return 0;
- }
-
- private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
- 复制代码
如果Broker
产生故障,那么会创建一个FaultItem
对象记录故障的Broker
,并把结果放进故障规避表faultItemTable
中,数据格式如下:
- "broker-a": {
- // broker名称
- "name": "broker-a",
- "currentLatency": 发送消息消耗的时间,毫秒值之差,
- // 解除规避的时间,绝对时间
- "startTimestamp": 时间戳毫秒值
- },
- "broker-b": {
- // broker名称
- "name": "broker-b",
- "currentLatency": 发送消息消耗的时间,毫秒值之差,
- // 解除规避的时间,绝对时间
- "startTimestamp": 时间戳毫秒值
- }
- 复制代码
发送成功的Broker
设置的故障规避时间为0,发送失败的Broker
将被设置为规避30秒;
在MQFaultStrategy.selectOneMessageQueue()
方法中,我们分三部分来分析如何选择Broker。
- // 轮询的基本套路,一个自增变量
- int index = tpInfo.getSendWhichQueue().incrementAndGet();
- for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
- // 通过对队列数量取模,获取选定的Broker所在的位置
- int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
- if (pos < 0)
- pos = 0;
- MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
- // 判断Broker是否在规避时间内,如果不在规避时间内,就选择这个Broker,否则继续循环直至所有Broker都在规避时间内
- if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
- return mq;
- }
- 复制代码
1.轮询的基本套路都是通过一个自增变量来对所有的Broker数量取模,这样就可以命中一个Broker;
2.针对命中的Broker判断是否在规避时间范围内,不在规避时间内就可以返回;否则只能进入第二个方案;
- // 把所有规避列表中的Broker按延迟高低排序,并从延迟低的Broker中选择一个
- final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
- int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
- // 判断该Broker是否允许写消息
- if (writeQueueNums > 0) {
- final MessageQueue mq = tpInfo.selectOneMessageQueue();
- if (notBestBroker != null) {
- mq.setBrokerName(notBestBroker);
- mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
- }
- // 返回选中的Broker
- return mq;
- }
- 复制代码
1.从规避列表中找到延时比较低的Broker;
2.判断该Broker是否允许写消息,允许写消息的话就直接返回,否则再进入下一个方案;
- return tpInfo.selectOneMessageQueue();
- 复制代码
最后直接轮询一个Broker
直接返回:
- public MessageQueue selectOneMessageQueue() {
- int index = this.sendWhichQueue.incrementAndGet();
- int pos = Math.abs(index) % this.messageQueueList.size();
- if (pos < 0)
- pos = 0;
- return this.messageQueueList.get(pos);
- }
- 复制代码
该方案是默认方案,没有开启故障规避配置的话,所有Broker的选择都是使用的该方案;
RocketMQ
通过设置故障规避表的方式,把所有的Broker
的延迟数据都保留在故障规避表中,根据该列表制定了以下几种策略:
1.优先选择不在规避时间范围内的
Broker
;2.如果所有
Broker
都在规避时间内,优先选择延迟低的Broker
;3.如果依然没有选中合适的
Broker
,那么就直接挑一个Broker
来用;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。