赞
踩
生产者发送消息时,需要进行队列选择,不同的重试策略,选择队列方式不一样。
(sendWhichQueueNum++) %(消息队列数量)
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); // 1、验证消息 Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; // 2、尝试查找Topic 路由信息:如果本地没有,查询NameServer找到路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; // 3、若是同步消息,需要重试 for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); // 4、根据上次发送失败的Broker名字,选择Message 的发送Queue MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { // 5、记录发送前时间 beginTimestampPrev = System.currentTimeMillis(); long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } // 6、发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); // 7、记录消息发送后的时间戳 endTimestamp = System.currentTimeMillis(); // 8、故障转移机制时生效 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); } } } } }
当sendLatencyFaultEnable
为false
时,走负载均衡模式。
TopicPublishInfo.selectOneMessageQueue
lastBrokerName: 若有重试,lastBorkerName为上次失败的MqName
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { // 1、第一次发送消息,根据自增id%队列数 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { // 2-1、不是第一次发送消息,自增id+1 int index = this.sendWhichQueue.getAndIncrement(); // 2-2、获取所有的消息队列 for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); // 2-3、遍历过程中,如果拿到了上次brocker,跳过,换一个broker if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } // 3、自增取模 return selectOneMessageQueue(); } } public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
1、在一次消息发送过程中,可能会多次执行选择消息队列这个方法,lastBrokerName就是上一次选择的执行发送消息失败的Broker。第一次发送是为null
2、第一次执行消息队列选择时,lastBrokerName为null,此时直接用sendWhichQueue自增再获取值,与当前路由表中消息队列个数取模
3、如果消息发送再失败的话,下次进行消息队列选择时规避上次故障的Broker,否则还是很有可能再次失败。
该算法在一次消息发送时,能规避故障的Broker。
考虑以下场景:
该算法在一次消息发送过程中能成功规避故障的Broker,但如果Broker宕机,由于路由算法中的消息队列是按Broker排序的。
如果上一次根据路由算法选择的是宕机的Broker的第一个队列,那么随后的下次选择的是宕机Broker的第二个队列,消息发送很有可能会失败,再次引发重试,带来不必要的性能损耗。
因为每次发送消息,需要通过网络发送给Broker,因此损耗。
该模式引入,就是为了解决负载均衡模式不足的。
MQFaultStrategy:如果能引入一种机制,在Broker宕机期间,如果一次消息发送失败后,可以将该Broker暂时排除在消息队列的选择范围中,就可以避免负载均衡模式
的问题。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // 1、开启故障转移机制 if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); /** * - null == lastBrokerName,说明没问题,直接根据退出 * - broker故障了,但是【可能】已经恢复了。 */ if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } // 2、使用正常模式下的队列选择方式 return tpInfo.selectOneMessageQueue(lastBrokerName); }
消息发送端采用重试机制,默认重试两次。
1、在一次消息发送过程中,可能会多次执行选择消息队列这个方法,lastBrokerName就是上一次选择的执行发送消息失败的Broker。
2、第一次执行消息队列选择时,lastBrokerName为null,此时直接用sendWhichQueue自增再获取值,与当前路由表中消息队列个数取模
3、如果消息发送再失败的话,该消息进行重试时,重新选择消息队列选择时规避上次故障的Broker。
4、假设有一个BrokerA故障了,还没恢复。准备第二条消息时,进行队列选择,又选择了故障的BrokerA,然后往这个故障的BrokerA发送消息,结果又失败了,然后继续进行重试…。
第一条消息次已经失败了,第二条消息其实可以把BrokerA规避掉的。
RocketMQ的故障转移机制就是在BrokerA 已经故障了,发送不了消息了的时候,给BrokerA一个故障恢复时间, 在这段时间内,发送消息就不选择发送到这台Broker上。
String[] brokersSent = new String[timesTotal]; // 重试 for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 发送前的时间 beginTimestampPrev = System.currentTimeMillis(); // 往Broker发发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); // 发送后的时间, endTimestamp = System.currentTimeMillis(); // (endTimestamp - beginTimestampPrev)算出花费时间 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); // 其他流程 }
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
// 如果开启了故障转移,才会有下面的逻辑
if (this.sendLatencyFaultEnable) {
// 根据特定值,给出一个【延迟时间】
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
/** * * @param name :brokerName * @param currentLatency:发送消息到Broker的花费时间,毫秒 * @param notAvailableDuration:多久不可用时间,根据【经验值】算出来的,毫秒 */ @Override public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { // 1、根据BrokerName在Map中查找是否有,若不为空,说明之前就故障过 FaultItem old = this.faultItemTable.get(name); // 2、第一次故障 if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); // 3、开始aviable的时间戳 faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); // 4、返回为空,说明里面没有值 old = this.faultItemTable.putIfAbsent(name, faultItem); // 5、双重检验锁,解决并发问题。不为空,说明被其他线程更新了值。这里需要重新设置 if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { // 2、之前故障过,重新设置值 old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } }
判断Avaible
@Override
public boolean isAvailable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
return faultItem.isAvailable();
}
return true;
}
// 当前时间超过延迟级别窗口
public boolean isAvailable() {
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。