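1. What happens when sending a message fails (because of network problems or a broker going down)? How does the producer side achieve high availability?
2. How is the message queue chosen, that is, which message queue does the producer send a message to?
3. Why design a separate broker fault-latency mechanism at all?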
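The producer supports three communication modes: synchronous, asynchronous, and oneway. They do not share one retry mechanism. Synchronous sends are retried in a loop controlled by retryTimesWhenSendFailed (default 2, as the configuration below shows). Asynchronous sends are retried from the exception callback, controlled by retryTimesWhenSendAsyncFailed. Oneway sends are never retried.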
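Retries make it much more likely that a message is eventually sent and not lost, but they can also cause the same message to be delivered and consumed more than once. RocketMQ does not guarantee idempotency, so developers have to make message handling idempotent themselves.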
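Because deduplication is left to the application, a common pattern is to key each message on a business identifier and skip any key that has already been processed. The minimal sketch below assumes such a key exists. The `IdempotentHandler` class and the `order-1001` key are hypothetical, the in-memory set is for illustration only, and no RocketMQ API is used.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical consumer-side helper (not a RocketMQ class): runs a side effect at most
// once per business key, so a message redelivered after a producer retry is skipped.
class IdempotentHandler {
    // Keys that have already been processed; in-memory only, for illustration.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /** Returns true if the side effect ran, false if the key was a duplicate. */
    boolean handle(String businessKey, Runnable sideEffect) {
        // add() returns false when the key is already present, i.e. a duplicate delivery
        if (!processedKeys.add(businessKey)) {
            return false;
        }
        try {
            sideEffect.run();
        } catch (RuntimeException e) {
            // Processing failed: forget the key so a later redelivery can try again
            processedKeys.remove(businessKey);
            throw e;
        }
        return true;
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        StringBuilder log = new StringBuilder();
        // The same order arrives twice, e.g. because the producer retried after a timeout
        handler.handle("order-1001", () -> log.append("charge order-1001;"));
        handler.handle("order-1001", () -> log.append("charge order-1001;"));
        System.out.println(log); // prints: charge order-1001;
    }
}
```

In production the processed-key set usually lives in a database (for example behind a unique index) or a shared cache, so that deduplication survives restarts and works across consumer instances.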
Configuring message retries in RocketMQ:
// Number of retries when a synchronous send fails (default 2)
producer.setRetryTimesWhenSendFailed(2);
// Whether to retry on another broker when the message was not stored successfully
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
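When a synchronous send fails, it is retried, and the retry essentially means trying a different broker; this behavior exists only for synchronous sends. The logic lives in sendDefaultImpl, the main method for sending messages.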
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // code omitted
    // Only synchronous sends get 1 + retryTimesWhenSendFailed attempts; async and oneway get 1
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    // Names of the target brokers, one per attempt
    String[] brokersSent = new String[timesTotal];
    // Send loop: one iteration per attempt (initial send plus retries)
    for (; times < timesTotal; times++) {
        // Select a queue whose brokerName != lastBrokerName, so a retry goes to another broker
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        // Send the message
        // code omitted
        switch (communicationMode) {
            case ASYNC:
                return null;
            case ONEWAY:
                return null;
            case SYNC:
                // Result is not SEND_OK: if retryAnotherBrokerWhenNotStoreOK is on, retry on another broker
                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                        continue;
                    }
                }
                return sendResult;
            default:
                break;
        }
    }
    // code omitted
}
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // lastBrokerName == null: this is the first, normal send attempt
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        // lastBrokerName != null: this is a retry
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            // Take the next queue round-robin and skip it if it is on lastBrokerName
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            // Math.abs(Integer.MIN_VALUE) is negative, hence this guard
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        // No queue on another broker was found: fall back to a plain selection
        return selectOneMessageQueue();
    }
}
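To see how the synchronous retry loop and the lastBrokerName check work together, here is a simplified, self-contained simulation. `SyncRetrySketch`, `QueueRef`, `Router` and the `brokerUp` predicate are hypothetical stand-ins, not RocketMQ classes. Only the control flow is meant to mirror sendDefaultImpl and the selection method above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Simplified simulation of the synchronous retry loop (not RocketMQ code).
class SyncRetrySketch {
    // Mirrors sendDefaultImpl: 1 initial attempt + retryTimes retries, each retry passing the
    // previous broker so the selector can avoid it. `brokerUp` is a hypothetical stand-in for
    // the real network call. Returns the broker that accepted the message, or null.
    static String send(Router router, int retryTimes, Predicate<String> brokerUp, List<String> attempts) {
        int timesTotal = 1 + retryTimes;
        String lastBrokerName = null;
        for (int times = 0; times < timesTotal; times++) {
            QueueRef q = router.selectOne(lastBrokerName);
            lastBrokerName = q.brokerName;
            attempts.add(q.brokerName);
            if (brokerUp.test(q.brokerName)) {
                return q.brokerName;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Router router = new Router(List.of(
                new QueueRef("broker-a", 0), new QueueRef("broker-a", 1),
                new QueueRef("broker-b", 0), new QueueRef("broker-b", 1)));
        List<String> attempts = new ArrayList<>();
        // broker-a is down, so the retry should land on broker-b
        String accepted = send(router, 2, name -> !name.equals("broker-a"), attempts);
        System.out.println(attempts + " -> " + accepted); // prints: [broker-a, broker-b] -> broker-b
    }
}

// Hypothetical stand-in for MessageQueue.
final class QueueRef {
    final String brokerName;
    final int queueId;

    QueueRef(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }
}

// Hypothetical stand-in for TopicPublishInfo's selection logic.
final class Router {
    private final List<QueueRef> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    Router(List<QueueRef> queues) {
        this.queues = queues;
    }

    // Plain round-robin; floorMod never yields a negative index, unlike Math.abs(...) % n.
    QueueRef selectOne() {
        return queues.get(Math.floorMod(sendWhichQueue.incrementAndGet(), queues.size()));
    }

    // On a retry, scan up to queues.size() positions for a queue on another broker;
    // if every queue is on lastBrokerName, fall back to plain round-robin.
    QueueRef selectOne(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOne();
        }
        for (int i = 0; i < queues.size(); i++) {
            QueueRef q = selectOne();
            if (!q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        return selectOne();
    }
}
```

Using `Math.floorMod` instead of `Math.abs(index) % size` plus a `pos < 0` guard is a choice made for this sketch; both keep the index in range after the counter overflows.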
Retries for asynchronous sends: the number of retries is set on the producer through retryTimesWhenSendAsyncFailed, and the retry logic lives in MQClientAPIImpl, in the method onExceptionImpl.
private void onExceptionImpl(final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int timesTotal,
        final AtomicInteger curTimes,
        final Exception e,
        final SendMessageContext context,
        final boolean needRetry,
        final DefaultMQProducerImpl producer
) {
    // Increment the retry counter shared by all attempts of this message
    int tmp = curTimes.incrementAndGet();
    // Retry only if retrying is allowed and the counter has not exceeded timesTotal
    if (needRetry && tmp <= timesTotal) {
        // By default, retry on the same broker, i.e. the one that just failed
        String retryBrokerName = brokerName;
        // If topic route information is available
        if (topicPublishInfo != null) {
            // Select a message queue to determine which broker to send to
            MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
            // Retry on the chosen queue's broker
            retryBrokerName = mqChosen.getBrokerName();
        }
        // Resolve the broker address
        String addr = instance.findBrokerAddressInPublish(retryBrokerName);
        log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
            retryBrokerName);
        try {
            request.setOpaque(RemotingCommand.createNewRequestId());
            // Send asynchronously again
            sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                timesTotal, curTimes, context, producer);
        } catch (InterruptedException e1) {
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                timesTotal, curTimes, e1, context, false, producer);
        } catch (RemotingConnectException e1) {
            producer.updateFaultItem(brokerName, 3000, true);
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                timesTotal, curTimes, e1, context, true, producer);
        } catch (RemotingTooMuchRequestException e1) {
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                timesTotal, curTimes, e1, context, false, producer);
        } catch (RemotingException e1) {
            producer.updateFaultItem(brokerName, 3000, true);
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                timesTotal, curTimes, e1, context, true, producer);
        }
    } else {
        try {
            // Retries exhausted or not allowed: invoke the user's exception callback
            sendCallback.onException(e);
        } catch (Exception ignored) {
        }
    }
}
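Unlike the synchronous path, the asynchronous path has no loop: each failure re-enters the exception handler, and a shared counter decides whether to resend or give up. The sketch below reproduces only that shape. `AsyncRetrySketch`, its `Callback` interface and the `brokerUp` predicate are hypothetical, the "async" send completes immediately, and broker choice is simplified to "first broker that is not the one that failed". With `maxRetries = 2` a message gets at most three attempts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Simplified simulation of callback-driven async retries (not RocketMQ code).
class AsyncRetrySketch {
    // Hypothetical callback, loosely modeled on SendCallback (not the real interface).
    interface Callback {
        void onSuccess(String broker);
        void onException(Exception e);
    }

    private final List<String> brokers;       // hypothetical routing info: known broker names
    private final Predicate<String> brokerUp; // hypothetical stand-in for the network outcome
    private final int maxRetries;             // plays the role of the configured retry count
    final List<String> attempts = new ArrayList<>();

    AsyncRetrySketch(List<String> brokers, Predicate<String> brokerUp, int maxRetries) {
        this.brokers = brokers;
        this.brokerUp = brokerUp;
        this.maxRetries = maxRetries;
    }

    void send(String broker, Callback cb) {
        // One counter shared by every retry of this message, like curTimes
        sendAsync(broker, new AtomicInteger(0), cb);
    }

    // Simulated async send: "completes" immediately via the success or failure path.
    private void sendAsync(String broker, AtomicInteger curTimes, Callback cb) {
        attempts.add(broker);
        if (brokerUp.test(broker)) {
            cb.onSuccess(broker);
        } else {
            onException(broker, curTimes, cb, new Exception("send to " + broker + " failed"));
        }
    }

    // Same shape as onExceptionImpl: bump the shared counter, resend while it does not exceed
    // maxRetries, otherwise hand the last error to the user's callback.
    private void onException(String failedBroker, AtomicInteger curTimes, Callback cb, Exception e) {
        int tmp = curTimes.incrementAndGet();
        if (tmp <= maxRetries) {
            // Prefer a different broker; keep the same one if there is no other
            String retryBroker = failedBroker;
            for (String b : brokers) {
                if (!b.equals(failedBroker)) {
                    retryBroker = b;
                    break;
                }
            }
            sendAsync(retryBroker, curTimes, cb);
        } else {
            cb.onException(e);
        }
    }

    public static void main(String[] args) {
        AsyncRetrySketch sketch = new AsyncRetrySketch(List.of("broker-a", "broker-b"), b -> false, 2);
        sketch.send("broker-a", new Callback() {
            public void onSuccess(String broker) { System.out.println("sent to " + broker); }
            public void onException(Exception e) { System.out.println("gave up: " + e.getMessage()); }
        });
        // 1 initial attempt + 2 retries, alternating brokers
        System.out.println(sketch.attempts); // prints: [broker-a, broker-b, broker-a]
    }
}
```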
The queue-selection logic lives in selectOneMessageQueue of MQFaultStrategy; the overall flow is as follows.
latencyFaultTolerance
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // Whether the broker fault-latency mechanism is enabled (default false)
    if (this.sendLatencyFaultEnable) {
        try {
            // Round-robin over the queues, starting from the next index
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // Is this broker available? latencyFaultTolerance keeps a map of FaultItem entries
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    // Available: use this queue
                    return mq;
            }
            // No available broker: pick a "least bad" broker from the FaultItem map
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            // Number of writable queues on that broker
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            // The broker still has writable queues
            if (writeQueueNums > 0) {
                // Take a queue
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    // Point it at notBestBroker
                    mq.setBrokerName(notBestBroker);
                    // and reset the queue id to one of that broker's queues
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                // The broker has no writable queues: drop it from the fault table
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        // Fall back to plain round-robin selection
        return tpInfo.selectOneMessageQueue();
    }
    // Fault latency disabled: round-robin, avoiding lastBrokerName on retries
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
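The first half of this method (round-robin, skipping brokers still inside their back-off window) can be isolated into a small, testable sketch. `FaultAwareSelector` is a hypothetical class. A plain map from broker name to "available-after" time stands in for the FaultItem table, and the current time is passed in as a parameter instead of being read from the clock. When every broker is backing off, the sketch skips the pickOneAtLeast() step and goes straight to round-robin.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch of fault-aware queue selection (not RocketMQ code).
class FaultAwareSelector {
    // brokerName -> time (ms) before which the broker is treated as faulty,
    // playing the role of FaultItem.startTimestamp
    final Map<String, Long> availableAt = new ConcurrentHashMap<>();
    // Broker name of each queue, in queue order (hypothetical simplification of messageQueueList)
    private final List<String> queueBrokers;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    FaultAwareSelector(List<String> queueBrokers) {
        this.queueBrokers = queueBrokers;
    }

    boolean isAvailable(String broker, long now) {
        Long until = availableAt.get(broker);
        return until == null || now - until >= 0;
    }

    // Returns the index of the chosen queue.
    int select(long now) {
        int n = queueBrokers.size();
        int index = sendWhichQueue.incrementAndGet();
        // Round-robin from the next index, skipping queues on brokers still in back-off
        for (int i = 0; i < n; i++) {
            int pos = Math.floorMod(index++, n);
            if (isAvailable(queueBrokers.get(pos), now)) {
                return pos;
            }
        }
        // Every broker is backing off: simplified fallback to plain round-robin
        return Math.floorMod(sendWhichQueue.incrementAndGet(), n);
    }

    public static void main(String[] args) {
        FaultAwareSelector selector = new FaultAwareSelector(
                List.of("broker-a", "broker-a", "broker-b", "broker-b"));
        long now = 1_000_000L;
        selector.availableAt.put("broker-a", now + 30_000L); // broker-a backs off for 30 s
        List<Integer> picks = new ArrayList<>();
        for (int k = 0; k < 4; k++) {
            picks.add(selector.select(now));
        }
        System.out.println(picks); // prints: [2, 2, 3, 2] (queues 2 and 3 are on broker-b)
    }
}
```

Note the uneven [2, 2, 3, 2] pattern: as in the code above, the loop advances a local copy of the index, so the queue right after a skipped run is picked more often.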
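// LatencyFaultToleranceImpl
@Override
public boolean isAvailable(final String name) {
    final FaultItem faultItem = this.faultItemTable.get(name);
    if (faultItem != null) {
        return faultItem.isAvailable();
    }
    // No fault record: the broker is considered available
    return true;
}

// FaultItem
public boolean isAvailable() {
    // Available once the current time has reached startTimestamp
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}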
public String pickOneAtLeast() {
    final Enumeration<FaultItem> elements = this.faultItemTable.elements();
    List<FaultItem> tmpList = new LinkedList<FaultItem>();
    // Copy every entry of faultItemTable into a list
    while (elements.hasMoreElements()) {
        final FaultItem faultItem = elements.nextElement();
        tmpList.add(faultItem);
    }
    if (!tmpList.isEmpty()) {
        // Shuffle first, then sort (the sort is stable, so equal items stay in random order)
        Collections.shuffle(tmpList);
        Collections.sort(tmpList);
        final int half = tmpList.size() / 2;
        if (half <= 0) {
            // Only one entry
            return tmpList.get(0).getName();
        } else {
            // Round-robin over the first half of the sorted list
            final int i = this.whichItemWorst.getAndIncrement() % half;
            return tmpList.get(i).getName();
        }
    }
    return null;
}
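pickOneAtLeast depends on how FaultItem sorts, which this article does not show. The sketch below assumes an ordering of available items first, then lower currentLatency, then earlier startTimestamp. Treat that ordering as an assumption to check against the FaultItem.compareTo of your RocketMQ version. The `PickOneAtLeastSketch` class is hypothetical and takes the current time as a parameter.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch of pickOneAtLeast (not RocketMQ code).
class PickOneAtLeastSketch {
    // Simplified FaultItem with the three fields listed for FaultItem in this article.
    static final class FaultItem {
        final String name;
        final long currentLatency;
        final long startTimestamp;

        FaultItem(String name, long currentLatency, long startTimestamp) {
            this.name = name;
            this.currentLatency = currentLatency;
            this.startTimestamp = startTimestamp;
        }

        boolean isAvailable(long now) {
            return now - startTimestamp >= 0;
        }
    }

    private final AtomicInteger whichItemWorst = new AtomicInteger(0);

    // ASSUMED ordering: available items first, then lower latency, then earlier startTimestamp.
    static Comparator<FaultItem> order(long now) {
        return Comparator.comparing((FaultItem f) -> !f.isAvailable(now))
                .thenComparingLong(f -> f.currentLatency)
                .thenComparingLong(f -> f.startTimestamp);
    }

    String pickOneAtLeast(List<FaultItem> items, long now) {
        List<FaultItem> tmp = new ArrayList<>(items);
        if (tmp.isEmpty()) {
            return null;
        }
        Collections.shuffle(tmp); // randomize the order of equal items...
        tmp.sort(order(now));     // ...which List.sort keeps, because it is stable
        int half = tmp.size() / 2;
        if (half <= 0) {
            return tmp.get(0).name; // only one item
        }
        // Round-robin over the first half of the sorted list
        return tmp.get(Math.floorMod(whichItemWorst.getAndIncrement(), half)).name;
    }

    public static void main(String[] args) {
        long now = 0L;
        List<FaultItem> items = List.of(
                new FaultItem("broker-a", 3000L, 10_000L),  // still backing off, slowest
                new FaultItem("broker-b", 600L, 10_000L),   // still backing off
                new FaultItem("broker-c", 100L, -1L),       // back-off already over
                new FaultItem("broker-d", 2000L, 10_000L)); // still backing off
        PickOneAtLeastSketch picker = new PickOneAtLeastSketch();
        List<String> picks = new ArrayList<>();
        for (int k = 0; k < 4; k++) {
            picks.add(picker.pickOneAtLeast(items, now));
        }
        System.out.println(picks); // prints: [broker-c, broker-b, broker-c, broker-b]
    }
}
```

Under this ordering, the "at least" in the name means the producer keeps rotating among the better half of the faulty brokers rather than always hammering the single best one.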
updateFaultItem
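Looking back at the send flow: when sending a message throws an exception, updateFaultItem is called to update the broker's fault information. Let's look at it in detail.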
// DefaultMQProducerImpl: delegates to MQFaultStrategy
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    // Only when the fault-latency mechanism is enabled (default false)
    if (this.sendLatencyFaultEnable) {
        // Compute how long the broker is considered unavailable from the current latency;
        // isolation = true is treated as a latency of 30000 ms
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        // Update the fault record
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

// private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
// private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
private long computeNotAvailableDuration(final long currentLatency) {
    // Example: a send latency of 4000 ms matches the 3000 ms threshold,
    // so the broker is considered unavailable for 180000 ms
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }
    return 0;
}
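The threshold lookup is easy to check by hand. The sketch below copies the two arrays from the comments above into a standalone class (the name `BackoffTable` is ours). Latencies below 550 ms cost nothing. Also, isolation = true, which the async path passes after a RemotingException, is turned into a 30000 ms latency and therefore maps to the largest back-off, 600000 ms (10 minutes).

```java
// Standalone copy of the back-off lookup (class name is ours; arrays copied from the comments above).
class BackoffTable {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down; return the back-off of the first threshold reached.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // isolation = true is treated as a 30000 ms latency, as in updateFaultItem above.
    static long backoffFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000L : currentLatency);
    }

    public static void main(String[] args) {
        for (long latency : new long[] {40L, 300L, 600L, 4000L}) {
            System.out.println(latency + " ms -> " + computeNotAvailableDuration(latency) + " ms");
        }
        System.out.println("isolation -> " + backoffFor(0L, true) + " ms");
    }
}
```

Running it prints `40 ms -> 0 ms`, `300 ms -> 0 ms`, `600 ms -> 30000 ms`, `4000 ms -> 180000 ms` and `isolation -> 600000 ms`.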
From this analysis, the latency mechanism breaks down into two main steps.
LatencyFaultTolerance
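LatencyFaultTolerance decides whether a broker is available; its default implementation is LatencyFaultToleranceImpl. It maintains a map of FaultItem entries. When the fault-latency mechanism is enabled, a record is added with the brokerName as key and a FaultItem as value, meaning that the broker is marked as faulty until a certain point in time.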
FaultItem is an inner class with three main fields:
Field | Type | Description |
---|---|---|
name | String | brokerName |
currentLatency | long | Send latency, in ms |
startTimestamp | long | Time (ms) until which the broker is marked as faulty |
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    // Look up the fault record for this brokerName
    FaultItem old = this.faultItemTable.get(name);
    // No record yet
    if (null == old) {
        // Create a FaultItem
        final FaultItem faultItem = new FaultItem(name);
        // Record the current send latency
        faultItem.setCurrentLatency(currentLatency);
        // Next available time = now + unavailable duration
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        // Put it into the map; if another thread inserted first, update that record instead
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        // Record exists: update it in place
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}
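Putting the record-keeping and the availability check side by side makes the life cycle of a FaultItem visible. In this sketch (the class `FaultTableSketch` is hypothetical), the update follows the get / putIfAbsent / update-existing pattern above, but the current time is passed in rather than read from System.currentTimeMillis(), purely so the behavior can be tested.

```java
import java.util.concurrent.ConcurrentHashMap;

// Standalone sketch of the FaultItem table (not RocketMQ code).
class FaultTableSketch {
    static final class FaultItem {
        final String name;
        volatile long currentLatency;
        volatile long startTimestamp; // broker is treated as faulty until this time (ms)

        FaultItem(String name) {
            this.name = name;
        }

        boolean isAvailable(long now) {
            return now - startTimestamp >= 0;
        }
    }

    final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>();

    void updateFaultItem(String name, long currentLatency, long notAvailableDuration, long now) {
        FaultItem old = faultItemTable.get(name);
        if (old == null) {
            FaultItem item = new FaultItem(name);
            item.currentLatency = currentLatency;
            item.startTimestamp = now + notAvailableDuration;
            // Another thread may have inserted concurrently; if so, update its entry instead
            old = faultItemTable.putIfAbsent(name, item);
            if (old == null) {
                return;
            }
        }
        old.currentLatency = currentLatency;
        old.startTimestamp = now + notAvailableDuration;
    }

    boolean isAvailable(String name, long now) {
        FaultItem item = faultItemTable.get(name);
        return item == null || item.isAvailable(now); // no record means available
    }

    public static void main(String[] args) {
        FaultTableSketch table = new FaultTableSketch();
        // A 600 ms send maps to a 30000 ms back-off in the latencyMax table
        table.updateFaultItem("broker-a", 600L, 30_000L, 0L);
        System.out.println(table.isAvailable("broker-a", 10_000L)); // false: still backing off
        System.out.println(table.isAvailable("broker-a", 30_000L)); // true: window has elapsed
        System.out.println(table.isAvailable("broker-b", 0L));      // true: no record at all
    }
}
```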
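When the producer sends a message, it first looks up the TopicPublishInfo for the topic. With that routing information in hand, the RocketMQ client's selectOneMessageQueue() by default picks a queue (MessageQueue) from the messageQueueList in TopicPublishInfo and sends the message to it. The whole fault-tolerance strategy is defined in the MQFaultStrategy class, which has a switch, sendLatencyFaultEnable. When the switch is on, queues are still picked by incrementing an index modulo the queue count (round-robin), but brokers that are not available are additionally filtered out.
The "latencyFaultTolerance" mechanism backs off from brokers whose earlier requests failed or were slow, for a period that grows with the latency: for example, if the last request took at least 550 ms, the broker is avoided for 30000 ms; at least 1000 ms, for 60000 ms. When the switch is off, a queue is chosen purely by round-robin. The latencyFaultTolerance mechanism is the key to making message sending highly available.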
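For completeness, a producer that turns on the retry options discussed above together with the fault-latency switch might be configured as follows. This is a configuration sketch assuming a RocketMQ 4.x Java client (DefaultMQProducer). The group name and name-server address are placeholders, and the setters should be checked against the client version in use. The producer is only configured here, not started.

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;

class ProducerConfigExample {
    public static void main(String[] args) {
        // Placeholder group name and name-server address
        DefaultMQProducer producer = new DefaultMQProducer("example_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Synchronous sends: up to 2 retries, moving to another broker when not stored OK
        producer.setRetryTimesWhenSendFailed(2);
        producer.setRetryAnotherBrokerWhenNotStoreOK(true);
        // Asynchronous sends: retries handled from the exception callback
        producer.setRetryTimesWhenSendAsyncFailed(2);
        // Enable the broker fault-latency mechanism (off by default)
        producer.setSendLatencyFaultEnable(true);
    }
}
```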
Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.