
A Brief Discussion on How to Solve RocketMQ Message Accumulation


        MQ message accumulation (backlog) occurs when messages sent by producers pile up on the Broker in a short period of time and cannot be consumed promptly by consumers, causing business functions to stop working properly.

        Message accumulation commonly arises in the following situations:

(1) A newly released consumer feature has a bug, so messages cannot be consumed.

(2) Consumer instances are down, or temporarily cannot connect to the Broker because of network problems.

(3) Producers push a large volume of messages to the Broker in a short time, and the consumers' consumption capacity is insufficient.

(4) Producers are unaware of the backlog on the Broker and keep pushing messages to it.

         Solving these problems requires the following:

(1) For problem one, use canary (gray) releases. Before each new feature goes live, select a certain proportion of consumer instances for the canary; if a problem appears, roll back promptly; if consumption works correctly and runs stably for a while, upgrade the remaining instances. If a subset of accounts must be selected by rule for the canary, message filtering is needed so that normal consumer instances exclude canary messages and canary consumer instances pick up only the canary messages.

(2) For problem two, build a multi-active (multi-IDC) deployment. In the extreme case where all consumer instances in one IDC go down, consumer instances in other IDCs must be able to keep consuming messages. Likewise, if all Brokers in one IDC go down, producers must be able to send messages to Brokers in other IDCs.

(3) For problem three, increase consumption capacity, mainly by raising the number of consumer threads or the number of consumer instances (see the sketch after this list). When increasing consumer threads, pay attention to the capacity of the consumer and its downstream services, and tune the thread-pool parameters to their optimal values before going live. When increasing consumer instances, pay attention to the number of Queues: the number of consumer instances should equal the number of Queues. If there are more instances than Queues, the extra instances are assigned no Queue, so adding instances alone is useless; if there are fewer instances than Queues, each instance carries a different share of the traffic.

(4) For problem four, apply circuit breaking and isolation. When a queue on a Broker accumulates a backlog, break the circuit for it and isolate it, sending new messages to other queues; after a certain period, lift the isolation.
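
        As a minimal sketch of point (3), the code below uses the standard Apache RocketMQ push consumer; the name server address, topic, group name and thread counts are illustrative placeholders, not values from the article.

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

    public class CapacityTunedConsumer {
        public static void main(String[] args) throws Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
            consumer.setNamesrvAddr("127.0.0.1:9876");
            // Raise the consuming thread pool; size it against the downstream services' capacity.
            consumer.setConsumeThreadMin(20);
            consumer.setConsumeThreadMax(40);
            // Optionally pull more messages per batch when the backlog is large.
            consumer.setPullBatchSize(32);
            consumer.subscribe("DEMO_TOPIC", "*");
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                // business processing ...
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
            // Scaling out: start more instances of this process, but only up to the Queue count of
            // DEMO_TOPIC; instances beyond that number will not be assigned any Queue.
        }
    }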

        Let's look at how WeBank solves these problems:

Canary Release

        RocketMQ supports two message filtering approaches. The first is the built-in Tag and SQL92 filtering, where a Tag or SQL92 expression is specified when the consumer subscribes to a Topic. The second is the FilterServer mechanism, which requires uploading a custom ClassFilter class to the server where the Broker runs; the Consumer pulls messages from the FilterServer, which forwards the request to the Broker, fetches the messages, filters them with the uploaded Java program, and returns the filtered messages to the Consumer. The FilterServer mechanism can make full use of the CPU of the Broker's host to reduce network traffic, and writing filter logic in Java is flexible, but pay attention to code quality to avoid excessive consumption of the Broker host's CPU and memory. Canary rules are usually not complicated, so Tag or SQL92 filtering is sufficient.
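
        As a minimal sketch of Tag-based canary routing with the standard Apache RocketMQ client (not the DeFiBus code quoted below); the name server address, topic, group names and the GRAY/NORMAL tags are illustrative placeholders:

    import java.nio.charset.StandardCharsets;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;

    public class TagBasedCanaryExample {
        public static void main(String[] args) throws Exception {
            // Producer side: tag each message according to the canary rule (e.g. by account).
            DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
            boolean isGrayAccount = true; // decided by your own canary rule
            Message msg = new Message("DEMO_TOPIC",
                isGrayAccount ? "GRAY" : "NORMAL",
                "hello".getBytes(StandardCharsets.UTF_8));
            producer.send(msg);
            producer.shutdown();

            // Canary consumer instances subscribe only to GRAY-tagged messages;
            // normal instances would subscribe to "NORMAL" instead.
            DefaultMQPushConsumer grayConsumer = new DefaultMQPushConsumer("demo_consumer_group_gray");
            grayConsumer.setNamesrvAddr("127.0.0.1:9876");
            grayConsumer.subscribe("DEMO_TOPIC", "GRAY");
            // ... register a MessageListenerConcurrently and call grayConsumer.start()
        }
    }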

        With the Tag filtering mechanism, the Broker first filters by the hash code of the Tag, dropping unnecessary messages so the Consumer does not pull them and network transfer pressure is reduced. To avoid incomplete filtering caused by hash collisions, the client then performs a second, content-based match.

        The Broker-side filtering logic is as follows:

    @Override
    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
        if (null == tagsCode || null == subscriptionData) {
            return true;
        }
        if (subscriptionData.isClassFilterMode()) {
            return true;
        }
        return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
            || subscriptionData.getCodeSet().contains(tagsCode.intValue());
    }

        The client-side filtering logic is as follows:

    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;
        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
            List<MessageExt> msgListFilterAgain = msgList;
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }
            //...
        }
        return pullResult;
    }

        The SQL92 filtering mechanism requires setting the Broker property enablePropertyFilter=true. Unlike Tag filtering, the SQL filtering logic runs only on the Broker. Part of the logic is as follows:

    @Override
    public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
        //...
        Object ret = null;
        try {
            MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
            ret = realFilterData.getCompiledExpression().evaluate(context);
        } catch (Throwable e) {
            log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
        }
        //...
        return (Boolean) ret;
    }
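
        For reference, a minimal client-side sketch of how SQL92 filtering is typically used, assuming the standard Apache RocketMQ client and that enablePropertyFilter=true is set in broker.conf; the name server address, topic, group names and the "grayFlag" property are illustrative placeholders:

    import java.nio.charset.StandardCharsets;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.MessageSelector;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;

    public class Sql92FilterExample {
        public static void main(String[] args) throws Exception {
            // Producer side: attach a user property that the SQL92 expression can evaluate.
            DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
            Message msg = new Message("DEMO_TOPIC", "hello".getBytes(StandardCharsets.UTF_8));
            msg.putUserProperty("grayFlag", "true");
            producer.send(msg);
            producer.shutdown();

            // Consumer side: subscribe with a SQL92 expression instead of a Tag.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group_gray");
            consumer.setNamesrvAddr("127.0.0.1:9876");
            consumer.subscribe("DEMO_TOPIC", MessageSelector.bySql("grayFlag = 'true'"));
            // ... register a MessageListenerConcurrently and call consumer.start()
        }
    }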

Multi-Active Deployment

        For a Producer to be able to send messages to Broker clusters in other IDCs after the Broker cluster in its own IDC goes down, every application instance (client) of each subsystem must be connected to Brokers in at least two different IDCs. To keep message delivery fast, under normal conditions the Producer by default only sends messages to Brokers in the same IDC as itself.

        WeBank added a HealthyMessageQueueSelector on top of RocketMQ. Broker Clusters and Brokers are both named with an IDC- prefix. When the Producer starts, it launches a scheduled task that caches the Brokers of its own IDC in the HealthyMessageQueueSelector, and the producer uses these cached Brokers to prefer Queues in the local IDC. The logic is as follows:

    @Override
    @SuppressWarnings("unchecked")
    public MessageQueue select(List<MessageQueue> mqs, Message msg, final Object selectedResultRef) {
        //...
        boolean pub2local = MapUtils.getBoolean(sendNearbyMapping, msg.getTopic(), Boolean.TRUE);
        MessageQueue lastOne = ((AtomicReference<MessageQueue>) selectedResultRef).get();
        if (pub2local) {
            List<MessageQueue> localMQs = new ArrayList<>();
            List<MessageQueue> remoteMqs = new ArrayList<>();
            HashMap<String, Integer> localBrokerMQCount = separateLocalAndRemoteMQs(mqs, localBrokers, localMQs, remoteMqs);
            for (String brokerName : localBrokerMQCount.keySet()) {
                //if MQ num less than threshold, send msg to all broker
                if (localBrokerMQCount.get(brokerName) <= minMqCountWhenSendLocal) {
                    localMQs.addAll(remoteMqs);
                }
            }
            //try select a mq from local idc first
            MessageQueue candidate = selectMessageQueue(localMQs, lastOne, msg);
            if (candidate != null) {
                ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
                LOGGER.debug("select local mq [{}], {}", candidate.toString(), msg);
                return candidate;
            }
            //try select a mq from other idc if cannot select one from local idc
            candidate = selectMessageQueue(remoteMqs, lastOne, msg);
            if (candidate != null) {
                ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
                LOGGER.debug("select remote mq [{}], {}", candidate.toString(), msg);
                return candidate;
            }
        } else {
            //try select a mq from all mqs
            MessageQueue candidate = selectMessageQueue(mqs, lastOne, msg);
            if (candidate != null) {
                ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
                LOGGER.debug("select global mq [{}], {}", candidate.toString(), msg);
                return candidate;
            }
        }
        //try select a mq which is not isolated if no mq satisfy all limits
        for (int j = 0; j < mqs.size(); j++) {
            int index = this.getSendIndex(msg.getTopic());
            int pos = Math.abs(index) % mqs.size();
            MessageQueue candidate = mqs.get(pos);
            if (isQueueHealthy(candidate)) {
                ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
                LOGGER.debug("select any available mq [{}], {}", candidate.toString(), msg);
                return candidate;
            }
        }
        //in case of retry, still try select a mq from another broker if all mq isolated
        if (lastOne != null) {
            for (int j = 0; j < mqs.size(); j++) {
                int index = this.getSendIndex(msg.getTopic());
                int pos = Math.abs(index) % mqs.size();
                MessageQueue candidate = mqs.get(pos);
                if (!lastOne.getBrokerName().equals(candidate.getBrokerName())) {
                    ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
                    LOGGER.debug("select another broker mq [{}], {}", candidate.toString(), msg);
                    return candidate;
                }
            }
        }
        //select a mq from all mqs anyway if no mq satisfy any limits
        int index = this.getSendIndex(msg.getTopic());
        int pos = Math.abs(index) % mqs.size();
        MessageQueue candidate = mqs.get(pos);
        ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
        LOGGER.debug("select any mq [{}], {}", candidate.toString(), msg);
        return candidate;
    }

The logic that caches the local Brokers when the Producer starts is as follows:

    private void updateLocalBrokers(ClusterInfo clusterInfo) {
        if (clusterInfo != null) {
            String clusterPrefix = deFiBusProducer.getDeFiBusClientConfig().getClusterPrefix();
            HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
            Set<String> currentBrokers = new HashSet<String>();
            for (Map.Entry<String, Set<String>> entry : clusterAddrTable.entrySet()) {
                String clusterName = entry.getKey();
                String clusterIdc = StringUtils.split(clusterName, DeFiBusConstant.IDC_SEPERATER)[0];
                if (StringUtils.isNotEmpty(clusterPrefix) && StringUtils.equalsIgnoreCase(clusterIdc, clusterPrefix)) {
                    currentBrokers.addAll(entry.getValue());
                }
            }
            if (!currentBrokers.equals(messageQueueSelector.getLocalBrokers())) {
                messageQueueSelector.setLocalBrokers(currentBrokers);
                LOGGER.info("localBrokers updated: {} , clusterPrefix :{} ", currentBrokers, clusterPrefix);
            }
        }
    }

        For messages on a Broker to be consumable by Consumer instances in other IDCs when all Consumer instances in one IDC go down, the Queues of an IDC must be listened to both by Consumer instances in the same IDC and by Consumer instances in other IDCs, while under normal conditions only Consumer instances in the local IDC are allowed to consume. Achieving this requires adjusting the client-side load-balancing (rebalance) algorithm so that, in the normal case, only Consumers in the local IDC consume the messages.

        WeBank added its own load-balancing algorithm, AllocateMessageQueueByIDC. Client IDs carry an -IDC suffix, and both clients and queues are grouped by IDC; clients in an IDC preferentially listen to the queues of the same IDC:

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        //...
        /**
         * step1: seperate mqs and cids by idc
         */
        Map<String /*idc*/, List<MessageQueue>> sepMqs = this.seperateMqsByIDC(mqAll);
        Map<String /*idc*/, List<String/*cid*/>> sepClients = this.seperateCidsByIDC(cidAll);
        /**
         * step2: allocate local mqs first
         */
        String clusterPrefix = extractIdcFromClientId(currentCID);
        if (clusterPrefix != null) {
            List<MessageQueue> nearbyMqs = sepMqs.get(clusterPrefix);
            List<String> nearbyCids = sepClients.get(clusterPrefix);
            if (nearbyMqs != null && nearbyCids != null && !nearbyMqs.isEmpty() && !nearbyCids.isEmpty()) {
                Collections.sort(nearbyCids);
                Collections.sort(nearbyMqs);
                int index = nearbyCids.indexOf(currentCID);
                for (int i = index; i < nearbyMqs.size(); i++) {
                    if (i % nearbyCids.size() == index) {
                        result.add(nearbyMqs.get(i));
                    }
                }
            }
        }
        /**
         * step3: allocate mqs which no subscriber in the same idc
         */
        List<MessageQueue> mqsNoClientsInSameIdc = new ArrayList<>();
        for (String idc : sepMqs.keySet()) {
            if (!idc.equals(UNKNOWN_IDC) && (sepClients.get(idc) == null || sepClients.get(idc).isEmpty())) {
                mqsNoClientsInSameIdc.addAll(sepMqs.get(idc));
            }
        }
        if (!mqsNoClientsInSameIdc.isEmpty()) {
            Collections.sort(mqsNoClientsInSameIdc);
            Collections.sort(cidAll);
            int index = cidAll.indexOf(currentCID);
            for (int i = index; i < mqsNoClientsInSameIdc.size(); i++) {
                if (i % cidAll.size() == index) {
                    result.add(mqsNoClientsInSameIdc.get(i));
                }
            }
        }
        /**
         * step4: allocate mqs which no matched any cluster and cannot determined idc.
         */
        if (sepMqs.get(UNKNOWN_IDC) != null && !sepMqs.get(UNKNOWN_IDC).isEmpty()) {
            log.warn("doRebalance: cannot determine idc of mqs. allocate all to myself. {}", sepMqs.get(UNKNOWN_IDC));
            result.addAll(sepMqs.get(UNKNOWN_IDC));
        }
        return result;
    }

The logic that groups queues and clients by IDC is as follows:

    private Map<String, List<MessageQueue>> seperateMqsByIDC(List<MessageQueue> mqAll) {
        String topic = mqAll.get(0).getTopic();
        TopicRouteData topicRouteData = mqClientInstance.getTopicRouteTable().get(topic);
        if (topicRouteData == null) {
            mqClientInstance.updateTopicRouteInfoFromNameServer(topic);
            topicRouteData = mqClientInstance.getTopicRouteTable().get(topic);
        }
        HashMap<String/*brokerName*/, String/*idc*/> brokerIdcMap = new HashMap<>();
        ArrayList<BrokerData> brokerDatas = new ArrayList<>(topicRouteData.getBrokerDatas());
        for (BrokerData broker : brokerDatas) {
            String clusterName = broker.getCluster();
            String idc = clusterName.split(DeFiBusConstant.IDC_SEPERATER)[0];
            brokerIdcMap.put(broker.getBrokerName(), idc.toUpperCase());
        }
        Map<String/*IDC*/, List<MessageQueue>> result = new HashMap<>();
        for (MessageQueue mq : mqAll) {
            String idc = brokerIdcMap.get(mq.getBrokerName());
            if (idc == null) {
                idc = UNKNOWN_IDC;
            }
            if (result.get(idc) == null) {
                List<MessageQueue> mqList = new ArrayList<>();
                mqList.add(mq);
                result.put(idc, mqList);
            } else {
                result.get(idc).add(mq);
            }
        }
        return result;
    }

    private Map<String, List<String>> seperateCidsByIDC(List<String> cidAll) {
        Map<String/* idc */, List<String>> result = new HashMap<>();
        for (String cid : cidAll) {
            String cidIdc = extractIdcFromClientId(cid);
            if (cidIdc != null) {
                cidIdc = cidIdc.toUpperCase();
                if (result.get(cidIdc) != null) {
                    result.get(cidIdc).add(cid);
                } else {
                    List<String> cidList = new ArrayList<>();
                    cidList.add(cid);
                    result.put(cidIdc, cidList);
                }
            }
        }
        return result;
    }

Dynamic Scale-Out / Scale-In

        To keep the number of consumer instances equal to the number of Queues, WeBank customized RocketMQ so that queues grow and shrink dynamically with the number of consumer instances. This is implemented by dynamically adjusting the Topic's ReadQueueNum and WriteQueueNum. When scaling out, the readable queue count is increased first, so Consumers start listening to the new queues before the writable queue count is increased, which then allows Producers to send messages to the newly added queues. Scaling in works in the opposite order: the writable queue count is reduced first, so no more messages are sent to the queues about to be removed; once the Consumers have drained those queues, the readable queue count is reduced, completing the scale-in.
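
        For comparison, here is a minimal manual equivalent of that scale-out order using the standard RocketMQ admin tool API (DefaultMQAdminExt) rather than DeFiBus's automatic adjustment; the name server address, broker address, topic name and queue counts are illustrative placeholders:

    import org.apache.rocketmq.common.TopicConfig;
    import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

    public class ManualQueueScaleOut {
        public static void main(String[] args) throws Exception {
            DefaultMQAdminExt admin = new DefaultMQAdminExt();
            admin.setNamesrvAddr("127.0.0.1:9876");
            admin.start();
            try {
                String brokerAddr = "192.168.0.1:10911"; // placeholder broker address
                // Step 1: raise readQueueNums first so consumers can rebalance onto the new queues.
                TopicConfig step1 = new TopicConfig("DEMO_TOPIC");
                step1.setReadQueueNums(16);
                step1.setWriteQueueNums(8);
                admin.createAndUpdateTopicConfig(brokerAddr, step1);
                // Step 2: once consumers are listening, raise writeQueueNums so producers use them too.
                TopicConfig step2 = new TopicConfig("DEMO_TOPIC");
                step2.setReadQueueNums(16);
                step2.setWriteQueueNums(16);
                admin.createAndUpdateTopicConfig(brokerAddr, step2);
                // Scale-in reverses the order: shrink writeQueueNums first, drain, then shrink readQueueNums.
            } finally {
                admin.shutdown();
            }
        }
    }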

        When a consumer registers on the Broker, the queue-adjustment logic is:

    @Override
    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        //...
        DeFiConsumerGroupInfo deFiConsumerGroupInfo = (DeFiConsumerGroupInfo) consumerGroupInfo;
        Set<String> oldSub = deFiConsumerGroupInfo.findSubscribedTopicByClientId(clientChannelInfo.getClientId());
        boolean r1 = super.registerConsumer(group, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, isNotifyConsumerIdsChangedEnable);
        boolean r2 = deFiConsumerGroupInfo.registerClientId(subList, clientChannelInfo.getClientId());
        if (r1 || r2) {
            adjustQueueNum(oldSub, subList);
            if (isNotifyConsumerIdsChangedEnable) {
                // this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
                asyncNotifyClientChange(group, deFiConsumerGroupInfo);
            }
        }
        //...
        return r1 || r2;
    }

    private void adjustQueueNum(final Set<String> oldSub, final Set<SubscriptionData> subList) {
        for (SubscriptionData subscriptionData : subList) {
            if (!oldSub.contains(subscriptionData.getTopic())) {
                //new sub topic, increase queue num
                adjustQueueNumStrategy.increaseQueueNum(subscriptionData.getTopic());
            }
        }
        //...
    }

        The queue-adjustment logic lives in the AdjustQueueNumStrategy class. When adding queues, ReadQueueNum is adjusted first and then WriteQueueNum; when shrinking queues the order is reversed. The logic is as follows:

    private void adjustQueueNumByConsumerCount(String topic, AdjustType scaleType) {
        //...
        switch (scaleType) {
            case INCREASE_QUEUE_NUM:
                adjustReadQueueNumByConsumerCount(topic, 0, scaleType);
                adjustWriteQueueNumByConsumerCount(topic, 10 * 1000, scaleType);
                break;
            case DECREASE_QUEUE_NUM:
                adjustWriteQueueNumByConsumerCount(topic, 0, scaleType);
                long delayTimeMinutes = Math.min(deFiBrokerController.getDeFiBusBrokerConfig().getScaleQueueSizeDelayTimeMinute(), 10);
                long delayTimeMillis = delayTimeMinutes * 60 * 1000;
                adjustReadQueueNumByConsumerCount(topic, delayTimeMillis, scaleType);
                break;
        }
    }

Circuit Breaking and Isolation

        WeBank also implemented a circuit-breaker mechanism for MQ. The logic is: once the backlog in a Broker queue reaches a preset threshold (or an exception occurs), a producer that pushes more messages to that queue receives an error response; its callback then puts the queue into a faultMap, isolating it for 60 seconds. On every send, the queueSelector removes from the isolation area any queues whose isolation period has expired.

    @Override
    public void onException(Throwable e) {
        try {
            MessageQueueHealthManager messageQueueHealthManager
                = ((HealthyMessageQueueSelector) messageQueueSelector).getMessageQueueHealthManager();
            MessageQueue messageQueue = ((AtomicReference<MessageQueue>) selectorArg).get();
            if (messageQueue != null) {
                messageQueueSelector.getMessageQueueHealthManager().markQueueFault(messageQueue); //isolate the queue
                if (messageQueueSelector.getMessageQueueHealthManager().isQueueFault(messageQueue)) { //once the isolation period expires, the queue is removed from the isolation area
                    LOGGER.warn("isolate send failed mq. {} cause: {}", messageQueue, e.getMessage());
                }
            }
            //logic of fuse
            if (e.getMessage().contains("CODE: " + DeFiBusResponseCode.CONSUME_DIFF_SPAN_TOO_LONG)) {
                //first retry initialize
                if (queueCount == 0) {
                    List<MessageQueue> messageQueueList = producer.getDefaultMQProducer().getDefaultMQProducerImpl().getTopicPublishInfoTable()
                        .get(msg.getTopic()).getMessageQueueList();
                    queueCount = messageQueueList.size();
                    String clusterPrefix = deFiBusProducer.getDeFiBusClientConfig().getClusterPrefix();
                    if (!StringUtils.isEmpty(clusterPrefix)) {
                        for (MessageQueue mq : messageQueueList) {
                            if (messageQueueHealthManager.isQueueFault(mq)) {
                                queueCount--;
                            }
                        }
                    }
                }
                int retryTimes = Math.min(queueCount, deFiBusProducer.getDeFiBusClientConfig().getRetryTimesWhenSendAsyncFailed());
                if (circuitBreakRetryTimes.get() < retryTimes) {
                    circuitBreakRetryTimes.incrementAndGet();
                    LOGGER.warn("fuse:send to [{}] circuit break, retry no.[{}] times, msgKey:[{}]", messageQueue.toString(), circuitBreakRetryTimes.intValue(), msg.getKeys());
                    producer.getDefaultMQProducer().send(msg, messageQueueSelector, selectorArg, this);
                    //no exception to client when retry
                    return;
                } else {
                    LOGGER.warn("fuse:send to [{}] circuit break after retry {} times, msgKey:[{}]", messageQueue.toString(), retryTimes, msg.getKeys());
                }
            } else {
                int maxRetryTimes = producer.getDeFiBusClientConfig().getRetryTimesWhenSendAsyncFailed();
                if (sendRetryTimes.getAndIncrement() < maxRetryTimes) {
                    LOGGER.info("send message fail, retry {} now, msgKey: {}, cause: {}", sendRetryTimes.get(), msg.getKeys(), e.getMessage());
                    producer.getDefaultMQProducer().send(msg, messageQueueSelector, selectorArg, this);
                    return;
                } else {
                    LOGGER.warn("send message fail, after retry {} times, msgKey:[{}]", maxRetryTimes, msg.getKeys());
                }
            }
            if (sendCallback != null) {
                sendCallback.onException(e);
            }
        } catch (Exception e1) {
            LOGGER.warn("onExcept fail", e1);
            if (sendCallback != null) {
                sendCallback.onException(e);
            }
        }
    }

        To track the backlog of each queue, WeBank introduced the concept of a ConsumeQueueWaterMark on the Broker. Every time a send request arrives from a producer, the Broker checks the current water mark: if it exceeds the preset threshold, the Broker returns a CONSUME_DIFF_SPAN_TOO_LONG response to the producer; if it has not yet exceeded the threshold but is already above the preset high-water warning line, an error log is printed as an alert. The logic is as follows:

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        SendMessageRequestHeader requestHeader = parseRequestHeader(request);
        String Topic = requestHeader.getTopic();
        int queueIdInt = requestHeader.getQueueId();
        if (deFiQueueManager.getBrokerController().getDeFiBusBrokerConfig().isRejectSendWhenMaxDepth()
            && Topic != null
            && !Topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
            && !Topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)
            && !Topic.contains(DeFiBusConstant.RR_REPLY_TOPIC)
            && !Topic.startsWith(DeFiBusConstant.RMQ_SYS)) {
            long maxQueueDepth = deFiQueueManager.getMaxQueueDepth(Topic);
            double highWatermark = deFiQueueManager.getBrokerController().getDeFiBusBrokerConfig().getQueueDepthHighWatermark();
            ConsumeQueueWaterMark minConsumeQueueWaterMark
                = deFiQueueManager.getMinAccumulated(Topic, queueIdInt);
            if (minConsumeQueueWaterMark != null) {
                long accumulate = minConsumeQueueWaterMark.getAccumulated();
                if (accumulate >= maxQueueDepth) {
                    if (System.currentTimeMillis() % 100 == 0) {
                        LOG.error("Quota exceed 100% for topic [{}] in queue [{}], current:[{}], max:[{}]", Topic, queueIdInt, accumulate, maxQueueDepth);
                    }
                    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
                    response.setCode(DeFiBusResponseCode.CONSUME_DIFF_SPAN_TOO_LONG);
                    response.setRemark(" consume span too long, maybe has slow consumer, so send rejected");
                    return response;
                } else if (accumulate >= maxQueueDepth * highWatermark) {
                    if (System.currentTimeMillis() % 100 == 0) {
                        LOG.error("Quota exceed {}% for topic [{}] in queue [{}], current:[{}], max:[{}]", highWatermark * 100, Topic, queueIdInt, accumulate, maxQueueDepth);
                    }
                }
            }
            //...
        }
        return super.processRequest(ctx, request);
    }

        The water-mark calculation logic is as follows:

    public ConsumeQueueWaterMark calculateMinAccumulated(String topic, int queueId) {
        Set<String> subscribedGroups = deFiBrokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
        Set<String> checkGroups = new HashSet<String>();
        DeFiBusTopicConfig deFiBusTopicConfig = this.getBrokerController().getExtTopicConfigManager().selectExtTopicConfig(topic);
        long maxDepth = deFiBusTopicConfig != null ? deFiBusTopicConfig.getMaxQueueDepth() : DeFiBusTopicConfig.DEFAULT_QUEUE_LENGTH;
        double highWatermark = deFiQueueManager.getBrokerController().getDeFiBusBrokerConfig().getQueueDepthHighWatermark();
        ConsumeQueueWaterMark minDepth = null;
        long maxOffset = this.deFiBrokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
        LOG.debug("calculateMinAccumulated topic:{},queueID:{},subscribedGroups{}", topic, queueId, subscribedGroups);
        //calculate accumulated depth for each consumer group
        for (String consumerGroup : subscribedGroups) {
            if (topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || consumerGroup.startsWith(DeFiBusConstant.EXT_CONSUMER_GROUP)) {
                continue;
            }
            DeFiConsumerManager consumerManager = (DeFiConsumerManager) this.deFiBrokerController.getConsumerManager();
            DeFiConsumerGroupInfo deFiConsumerGroupInfo = (DeFiConsumerGroupInfo) consumerManager.getConsumerGroupInfo(consumerGroup);
            //ignore offline consumer group
            if (deFiConsumerGroupInfo == null || deFiConsumerGroupInfo.getClientIdBySubscription(topic) == null
                || deFiConsumerGroupInfo.getClientIdBySubscription(topic).isEmpty()) {
                continue;
            }
            long ackOffset = queryOffset(consumerGroup, topic, queueId);
            long thisDepth = maxOffset - ackOffset;
            long lastDeliverOffset = queryDeliverOffset(consumerGroup, topic, queueId);
            if (lastDeliverOffset >= 0) {
                thisDepth = maxOffset - lastDeliverOffset;
            }
            checkGroups.add(consumerGroup);
            ConsumeQueueWaterMark depthOfThisGroup = new ConsumeQueueWaterMark(consumerGroup, topic, queueId, lastDeliverOffset, thisDepth);
            if (minDepth == null) {
                minDepth = depthOfThisGroup;
            } else if (depthOfThisGroup.getAccumulated() < minDepth.getAccumulated()) {
                minDepth = depthOfThisGroup;
            }
            LOG.debug("topic:{},queueID:{},depthOfThisGroup:{} ,minDepth:{}", topic, queueId, depthOfThisGroup, minDepth);
            if (depthOfThisGroup.getAccumulated() > maxDepth) {
                LOG.error("Quota exceed 100% for topic:{},queueID:{},depthOfThisGroup:{} ,maxDepth:{} maxOffset: {} ackOffset: {}"
                    , topic, queueId, depthOfThisGroup, maxDepth, maxOffset, ackOffset);
            } else if (depthOfThisGroup.getAccumulated() > maxDepth * highWatermark) {
                LOG.error("Quota exceed {}% for topic:{}, queueID:{}, depthOfThisGroup:{}, maxDepth:{} maxOffset: {} ackOffset: {}"
                    , highWatermark * 100, topic, queueId, depthOfThisGroup, maxDepth, maxOffset, ackOffset);
            }
        }
        if (checkGroups.isEmpty()) {
            minDepth = new ConsumeQueueWaterMark("NO_ONLINE_GROUP", topic, queueId, maxOffset, 0);
        }
        for (String consumerGroup : checkGroups) {
            long thisDepth = maxOffset - queryOffset(consumerGroup, topic, queueId);
            long lastDeliverOffset = queryDeliverOffset(consumerGroup, topic, queueId);
            if (lastDeliverOffset >= 0) {
                thisDepth = maxOffset - lastDeliverOffset;
            }
            if (thisDepth > maxDepth) {
                if (checkGroups.size() > 1 && minDepth.getAccumulated() < maxDepth * MIN_CLEAN_THRESHOLD) {
                    autoUpdateDepth(consumerGroup, topic, queueId, maxDepth, maxOffset);
                }
            }
        }
        return minDepth;
    }

        To solve the problems encountered when using MQ in different scenarios, WeBank's developers built many advanced features on top of RocketMQ and have open-sourced all of them. All source code quoted in this article comes from DeFiBus (DeFiBus = RPC + MQ, a secure and controllable distributed financial-grade message bus): https://github.com/WeBankFinTech/DeFiBus
