MQ message backlog means that messages sent by producers accumulate in large volume on the Broker within a short time and cannot be consumed promptly, leaving the business functions that depend on them unable to work normally.
A message backlog commonly arises in the following situations:
(1) A newly released consumer feature has a bug and messages cannot be consumed.
(2) Consumer instances are down, or a network problem temporarily prevents them from establishing a connection to the Broker.
(3) Producers push a large volume of messages to the Broker in a short time and the consumers cannot keep up.
(4) Producers are unaware of the backlog on the Broker and keep pushing messages to it.
Solving these problems requires the following:
(1) For problem one, do gray (canary) releases well. Before each new feature goes live, pick a certain proportion of consumer instances as the canary; if problems appear, roll back promptly; if consumption is normal and runs stably for a while, upgrade the remaining instances. If a subset of accounts must be selected by rule for the canary, message filtering has to be done well: normal consumer instances must exclude the canary messages, and canary consumer instances must filter out exactly the canary messages.
(2) For problem two, build multi-datacenter redundancy. In the extreme case where every consumer instance in one IDC goes down, consumer instances in other IDCs must be able to consume the messages. Likewise, if all Brokers in one IDC go down, producers must be able to send messages to Brokers in other IDCs.
(3) For problem three, increase consumption capacity, mainly by adding consumer threads or consumer instances. When adding threads, mind the capacity of the consumer and its downstream services, and tune the thread-pool parameters to their optimum before going live. When adding consumer instances, mind the Queue count: the number of consumer instances should equal the number of Queues. With more instances than Queues, the surplus instances are assigned no Queue, so adding instances alone achieves nothing; with fewer instances than Queues, the traffic carried by each instance is uneven.
(4) For problem four, apply circuit breaking and isolation. When a backlog builds up on one of a Broker's queues, break the circuit on that queue and isolate it, sending new messages to other queues; after a set period, lift the isolation.
Now let's look at how WeBank solves these problems.
RocketMQ supports two message-filtering approaches. The first is the built-in Tag and SQL92 filtering: the consumer specifies a Tag or a SQL92 expression when subscribing to a Topic. The second is the FilterServer mechanism: a custom ClassFilter class is uploaded to the server hosting the Broker; the Consumer pulls messages from the FilterServer, which forwards the request to the Broker, fetches the messages, filters them with the uploaded Java code, and returns only the matching messages to the Consumer. FilterServer can use the spare CPU of the Broker's host to reduce network traffic, and writing filter logic in Java is flexible, but code quality matters: a badly written filter can consume excessive CPU and memory on the Broker's host. Canary rules are usually simple, so Tag or SQL92 filtering suffices for gray releases.
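As a hedged illustration of the two built-in modes (the group names, topic, and gray property below are made up), a consumer subscribes with a Tag expression or a SQL92 selector like this; listener registration and start() are omitted for brevity:

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;

public class SubscribeExamples {
    public static void main(String[] args) throws Exception {
        // Tag filtering: canary instances receive only GRAY-tagged messages
        DefaultMQPushConsumer grayConsumer = new DefaultMQPushConsumer("gray_consumer_group");
        grayConsumer.subscribe("DemoTopic", "GRAY");

        // Tag expressions support "||" but no negation, so the normal/canary
        // split is usually modeled with two distinct tags
        DefaultMQPushConsumer normalConsumer = new DefaultMQPushConsumer("normal_consumer_group");
        normalConsumer.subscribe("DemoTopic", "NORMAL");

        // SQL92 filtering (requires enablePropertyFilter=true on the Broker):
        // match on a user property that the producer sets on each message
        DefaultMQPushConsumer sqlConsumer = new DefaultMQPushConsumer("sql_consumer_group");
        sqlConsumer.subscribe("DemoTopic", MessageSelector.bySql("gray = 'true'"));
    }
}
```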
The Tag mechanism first filters on the Broker by the tag's hash code, dropping messages that are clearly unwanted so the Consumer never pulls them, which reduces network transfer pressure. Because a hash collision could let a wrong message through, the client then filters a second time by exact tag content.
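A tiny standalone demo (not RocketMQ code) of why that second pass is needed: the Broker compares the tag's Java hashCode (stored as the tagsCode), and distinct tags can share a hash:

```java
public class TagHashCollisionDemo {
    public static void main(String[] args) {
        // Two different strings with the same Java hash code: a hash-only
        // match on the Broker could deliver "BB"-tagged messages to a
        // consumer subscribed to "Aa", so the client re-checks the string.
        System.out.println("Aa".hashCode()); // 2112
        System.out.println("BB".hashCode()); // 2112
    }
}
```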
The Broker-side filter logic:
```java
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    if (null == tagsCode || null == subscriptionData) {
        return true;
    }

    if (subscriptionData.isClassFilterMode()) {
        return true;
    }

    return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
        || subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
```
The client-side filter logic:
```java
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    PullResultExt pullResultExt = (PullResultExt) pullResult;

    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

        List<MessageExt> msgListFilterAgain = msgList;
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
            msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    // second pass: exact string match on the tag
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }
        //...
    }

    return pullResult;
}
```

The SQL92 mechanism requires the Broker property enablePropertyFilter=true. Unlike Tag filtering, the SQL filter logic runs only on the Broker; part of it:
```java
@Override
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
    //...

    Object ret = null;
    try {
        MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);

        ret = realFilterData.getCompiledExpression().evaluate(context);
    } catch (Throwable e) {
        log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
    }
    //...

    return (Boolean) ret;
}
```
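On the producer side, the properties that the SQL92 expression evaluates are attached with putUserProperty; a minimal sketch (topic, group, and property names are made up):

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class GrayProducerSketch {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("gray_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876"); // placeholder address
        producer.start();

        Message msg = new Message("DemoTopic", "GRAY", "payload".getBytes());
        // evaluated by the Broker-side SQL92 filter, e.g. gray = 'true'
        msg.putUserProperty("gray", "true");
        producer.send(msg);

        producer.shutdown();
    }
}
```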

For producers to be able to send messages to Brokers in another IDC after the Broker cluster in their own IDC goes down, every application instance (client) of each subsystem must connect to at least two Brokers in different IDCs; and to keep delivery fast, under normal conditions a Producer by default sends only to Brokers in its own IDC.
WeBank adds a HealthyMessageQueueSelector on top of RocketMQ. Broker Clusters and Brokers are both named with an IDC- prefix. When a Producer starts, a scheduled task caches the Brokers of the Producer's own IDC into the HealthyMessageQueueSelector, and the producer uses this cache to prefer Queues in its local IDC. The logic:
```java
@Override
@SuppressWarnings("unchecked")
public MessageQueue select(List<MessageQueue> mqs, Message msg, final Object selectedResultRef) {
    //...
    boolean pub2local = MapUtils.getBoolean(sendNearbyMapping, msg.getTopic(), Boolean.TRUE);
    MessageQueue lastOne = ((AtomicReference<MessageQueue>) selectedResultRef).get();

    if (pub2local) {
        List<MessageQueue> localMQs = new ArrayList<>();
        List<MessageQueue> remoteMqs = new ArrayList<>();
        HashMap<String, Integer> localBrokerMQCount = separateLocalAndRemoteMQs(mqs, localBrokers, localMQs, remoteMqs);

        for (String brokerName : localBrokerMQCount.keySet()) {
            //if MQ num less than threshold, send msg to all broker
            if (localBrokerMQCount.get(brokerName) <= minMqCountWhenSendLocal) {
                localMQs.addAll(remoteMqs);
            }
        }

        //try select a mq from local idc first
        MessageQueue candidate = selectMessageQueue(localMQs, lastOne, msg);
        if (candidate != null) {
            ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
            LOGGER.debug("select local mq [{}], {}", candidate.toString(), msg);
            return candidate;
        }

        //try select a mq from other idc if cannot select one from local idc
        candidate = selectMessageQueue(remoteMqs, lastOne, msg);
        if (candidate != null) {
            ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
            LOGGER.debug("select remote mq [{}], {}", candidate.toString(), msg);
            return candidate;
        }
    } else {
        //try select a mq from all mqs
        MessageQueue candidate = selectMessageQueue(mqs, lastOne, msg);
        if (candidate != null) {
            ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
            LOGGER.debug("select global mq [{}], {}", candidate.toString(), msg);
            return candidate;
        }
    }

    //try select a mq which is not isolated if no mq satisfy all limits
    for (int j = 0; j < mqs.size(); j++) {
        int index = this.getSendIndex(msg.getTopic());
        int pos = Math.abs(index) % mqs.size();
        MessageQueue candidate = mqs.get(pos);
        if (isQueueHealthy(candidate)) {
            ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
            LOGGER.debug("select any available mq [{}], {}", candidate.toString(), msg);
            return candidate;
        }
    }

    //in case of retry, still try select a mq from another broker if all mq isolated
    if (lastOne != null) {
        for (int j = 0; j < mqs.size(); j++) {
            int index = this.getSendIndex(msg.getTopic());
            int pos = Math.abs(index) % mqs.size();
            MessageQueue candidate = mqs.get(pos);
            if (!lastOne.getBrokerName().equals(candidate.getBrokerName())) {
                ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
                LOGGER.debug("select another broker mq [{}], {}", candidate.toString(), msg);
                return candidate;
            }
        }
    }

    //select a mq from all mqs anyway if no mq satisfy any limits
    int index = this.getSendIndex(msg.getTopic());
    int pos = Math.abs(index) % mqs.size();
    MessageQueue candidate = mqs.get(pos);
    ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
    LOGGER.debug("select any mq [{}], {}", candidate.toString(), msg);
    return candidate;
}
```
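For context, this is roughly how a custom selector and its AtomicReference selector argument plug into the standard RocketMQ async-send API. DefaultMQProducer.send(msg, selector, arg, callback) is the real signature, but the trivial selector and all names below are stand-ins rather than DeFiBus code:

```java
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

public class SelectorUsageSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876"); // placeholder address
        producer.start();

        // The selector records its pick here, so a failed send knows which
        // queue to isolate and a retry can avoid the broker that just failed.
        AtomicReference<MessageQueue> selectedQueue = new AtomicReference<>();
        Message msg = new Message("DemoTopic", "TagA", "hello".getBytes());

        producer.send(msg,
            (mqs, m, arg) -> { // stand-in for HealthyMessageQueueSelector
                MessageQueue q = mqs.get(0);
                ((AtomicReference<MessageQueue>) arg).set(q); // remember the pick
                return q;
            },
            selectedQueue,
            new SendCallback() {
                @Override public void onSuccess(SendResult result) { }
                @Override public void onException(Throwable e) {
                    // DeFiBus marks selectedQueue.get() as faulty here and retries
                }
            });

        Thread.sleep(1000); // let the async send complete before shutting down
        producer.shutdown();
    }
}
```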

The logic that caches localBrokers when the Producer starts:
```java
private void updateLocalBrokers(ClusterInfo clusterInfo) {
    if (clusterInfo != null) {
        String clusterPrefix = deFiBusProducer.getDeFiBusClientConfig().getClusterPrefix();
        HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
        Set<String> currentBrokers = new HashSet<String>();
        for (Map.Entry<String, Set<String>> entry : clusterAddrTable.entrySet()) {
            String clusterName = entry.getKey();
            String clusterIdc = StringUtils.split(clusterName, DeFiBusConstant.IDC_SEPERATER)[0];
            if (StringUtils.isNotEmpty(clusterPrefix) && StringUtils.equalsIgnoreCase(clusterIdc, clusterPrefix)) {
                currentBrokers.addAll(entry.getValue());
            }
        }
        if (!currentBrokers.equals(messageQueueSelector.getLocalBrokers())) {
            messageQueueSelector.setLocalBrokers(currentBrokers);
            LOGGER.info("localBrokers updated: {} , clusterPrefix :{} ", currentBrokers, clusterPrefix);
        }
    }
}
```

For messages on a Broker to be consumable by Consumer instances in other IDCs once every Consumer instance in one IDC has gone down, each IDC's Queues must be listened to both by Consumer instances in that IDC and by Consumer instances in other IDCs, while under normal conditions only the local IDC's Consumers actually consume. This requires adjusting the client load-balancing algorithm so that, in the normal case, queues are allocated only to Consumers in the same IDC.
WeBank adds its own allocation algorithm, AllocateMessageQueueByIDC. Client IDs carry an -IDC suffix; clients and queues are grouped by IDC, and clients in an IDC prefer to listen to the queues of that same IDC:
```java
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
    //...

    /**
     * step1: seperate mqs and cids by idc
     */
    Map<String /*idc*/, List<MessageQueue>> sepMqs = this.seperateMqsByIDC(mqAll);
    Map<String /*idc*/, List<String/*cid*/>> sepClients = this.seperateCidsByIDC(cidAll);

    /**
     * step2: allocate local mqs first
     */
    String clusterPrefix = extractIdcFromClientId(currentCID);
    if (clusterPrefix != null) {
        List<MessageQueue> nearbyMqs = sepMqs.get(clusterPrefix);
        List<String> nearbyCids = sepClients.get(clusterPrefix);

        if (nearbyMqs != null && nearbyCids != null && !nearbyMqs.isEmpty() && !nearbyCids.isEmpty()) {
            Collections.sort(nearbyCids);
            Collections.sort(nearbyMqs);
            int index = nearbyCids.indexOf(currentCID);
            for (int i = index; i < nearbyMqs.size(); i++) {
                if (i % nearbyCids.size() == index) {
                    result.add(nearbyMqs.get(i));
                }
            }
        }
    }

    /**
     * step3: allocate mqs which no subscriber in the same idc
     */
    List<MessageQueue> mqsNoClientsInSameIdc = new ArrayList<>();
    for (String idc : sepMqs.keySet()) {
        if (!idc.equals(UNKNOWN_IDC) && (sepClients.get(idc) == null || sepClients.get(idc).isEmpty())) {
            mqsNoClientsInSameIdc.addAll(sepMqs.get(idc));
        }
    }
    if (!mqsNoClientsInSameIdc.isEmpty()) {
        Collections.sort(mqsNoClientsInSameIdc);
        Collections.sort(cidAll);
        int index = cidAll.indexOf(currentCID);
        for (int i = index; i < mqsNoClientsInSameIdc.size(); i++) {
            if (i % cidAll.size() == index) {
                result.add(mqsNoClientsInSameIdc.get(i));
            }
        }
    }

    /**
     * step4: allocate mqs which no matched any cluster and cannot determined idc.
     */
    if (sepMqs.get(UNKNOWN_IDC) != null && !sepMqs.get(UNKNOWN_IDC).isEmpty()) {
        log.warn("doRebalance: cannot determine idc of mqs. allocate all to myself. {}", sepMqs.get(UNKNOWN_IDC));
        result.addAll(sepMqs.get(UNKNOWN_IDC));
    }
    return result;
}
```

The grouping of queues and clients by IDC:
```java
private Map<String, List<MessageQueue>> seperateMqsByIDC(List<MessageQueue> mqAll) {
    String topic = mqAll.get(0).getTopic();
    TopicRouteData topicRouteData = mqClientInstance.getTopicRouteTable().get(topic);
    if (topicRouteData == null) {
        mqClientInstance.updateTopicRouteInfoFromNameServer(topic);
        topicRouteData = mqClientInstance.getTopicRouteTable().get(topic);
    }

    HashMap<String/*brokerName*/, String/*idc*/> brokerIdcMap = new HashMap<>();
    ArrayList<BrokerData> brokerDatas = new ArrayList<>(topicRouteData.getBrokerDatas());
    for (BrokerData broker : brokerDatas) {
        String clusterName = broker.getCluster();
        String idc = clusterName.split(DeFiBusConstant.IDC_SEPERATER)[0];
        brokerIdcMap.put(broker.getBrokerName(), idc.toUpperCase());
    }

    Map<String/*IDC*/, List<MessageQueue>> result = new HashMap<>();
    for (MessageQueue mq : mqAll) {
        String idc = brokerIdcMap.get(mq.getBrokerName());
        if (idc == null) {
            idc = UNKNOWN_IDC;
        }
        if (result.get(idc) == null) {
            List<MessageQueue> mqList = new ArrayList<>();
            mqList.add(mq);
            result.put(idc, mqList);
        } else {
            result.get(idc).add(mq);
        }
    }
    return result;
}

private Map<String, List<String>> seperateCidsByIDC(List<String> cidAll) {
    Map<String/* idc */, List<String>> result = new HashMap<>();
    for (String cid : cidAll) {
        String cidIdc = extractIdcFromClientId(cid);
        if (cidIdc != null) {
            cidIdc = cidIdc.toUpperCase();
            if (result.get(cidIdc) != null) {
                result.get(cidIdc).add(cid);
            } else {
                List<String> cidList = new ArrayList<>();
                cidList.add(cid);
                result.put(cidIdc, cidList);
            }
        }
    }
    return result;
}
```
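To make the modulo arithmetic of step2/step3 concrete, here is a self-contained toy (not DeFiBus code): queues and client IDs are sorted, and each consumer takes the queue positions congruent to its own index:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ModuloAllocationDemo {
    // the consumer at position `index` in the sorted cid list takes every
    // queue whose position i satisfies i % cids.size() == index
    static List<String> allocate(List<String> queues, List<String> cids, String currentCid) {
        List<String> result = new ArrayList<>();
        int index = cids.indexOf(currentCid);
        for (int i = index; i < queues.size(); i++) {
            if (i % cids.size() == index) {
                result.add(queues.get(i));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("A-q0", "A-q1", "A-q2", "A-q3");
        List<String> cids = Arrays.asList("cid-A-1", "cid-A-2");
        System.out.println(allocate(queues, cids, "cid-A-1")); // [A-q0, A-q2]
        System.out.println(allocate(queues, cids, "cid-A-2")); // [A-q1, A-q3]
    }
}
```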

To keep the number of consumer instances equal to the number of Queues, WeBank built a custom feature on top of RocketMQ that grows and shrinks queues as consumer instances come and go. It works by dynamically adjusting the Topic's readQueueNums and writeQueueNums. When scaling out, the readable queue count is raised first, so Consumers can start listening, and the writable queue count is raised afterwards, so Producers can send to the new queues. Scaling in runs in reverse: first shrink the writable queue count so no new messages reach the queues being removed, wait until Consumers have drained them completely, then shrink the readable queue count to finish.
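DeFiBus drives this adjustment automatically on the Broker (shown below), but the same read-first, write-later ordering can be sketched with RocketMQ's standard admin API; the addresses and topic name here are placeholders:

```java
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

public class QueueScaleOutSketch {
    public static void main(String[] args) throws Exception {
        DefaultMQAdminExt admin = new DefaultMQAdminExt();
        admin.setNamesrvAddr("127.0.0.1:9876"); // placeholder
        admin.start();
        try {
            String brokerAddr = "127.0.0.1:10911"; // placeholder
            TopicConfig config = new TopicConfig("DemoTopic");
            // step 1: raise read queues so consumers can rebalance onto them
            config.setReadQueueNums(8);
            config.setWriteQueueNums(4);
            admin.createAndUpdateTopicConfig(brokerAddr, config);
            // ... wait for consumers to pick up the new queues ...
            // step 2: then raise write queues so producers start using them
            config.setWriteQueueNums(8);
            admin.createAndUpdateTopicConfig(brokerAddr, config);
        } finally {
            admin.shutdown();
        }
    }
}
```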
The queue-adjustment logic that runs when a consumer registers on the Broker:
```java
@Override
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    //... (remaining parameters elided)
    DeFiConsumerGroupInfo deFiConsumerGroupInfo = (DeFiConsumerGroupInfo) consumerGroupInfo;

    Set<String> oldSub = deFiConsumerGroupInfo.findSubscribedTopicByClientId(clientChannelInfo.getClientId());
    boolean r1 = super.registerConsumer(group, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, isNotifyConsumerIdsChangedEnable);
    boolean r2 = deFiConsumerGroupInfo.registerClientId(subList, clientChannelInfo.getClientId());

    if (r1 || r2) {
        adjustQueueNum(oldSub, subList);
        if (isNotifyConsumerIdsChangedEnable) {
            // this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            asyncNotifyClientChange(group, deFiConsumerGroupInfo);
        }
    }
    //...
    return r1 || r2;
}

private void adjustQueueNum(final Set<String> oldSub, final Set<SubscriptionData> subList) {
    for (SubscriptionData subscriptionData : subList) {
        if (!oldSub.contains(subscriptionData.getTopic())) {
            //new sub topic, increase queue num
            adjustQueueNumStrategy.increaseQueueNum(subscriptionData.getTopic());
        }
    }
    //...
}
```

The adjustment itself lives in the AdjustQueueNumStrategy class. When adding queues it raises readQueueNums immediately and writeQueueNums after a short delay; when shrinking, the order is reversed, and the readable count is reduced only minutes after the writable count, leaving time for consumers to drain the queues:
```java
private void adjustQueueNumByConsumerCount(String topic, AdjustType scaleType) {
    //...
    switch (scaleType) {
        case INCREASE_QUEUE_NUM:
            adjustReadQueueNumByConsumerCount(topic, 0, scaleType);
            adjustWriteQueueNumByConsumerCount(topic, 10 * 1000, scaleType);
            break;

        case DECREASE_QUEUE_NUM:
            adjustWriteQueueNumByConsumerCount(topic, 0, scaleType);
            long delayTimeMinutes = Math.min(deFiBrokerController.getDeFiBusBrokerConfig().getScaleQueueSizeDelayTimeMinute(), 10);
            long delayTimeMillis = delayTimeMinutes * 60 * 1000;
            adjustReadQueueNumByConsumerCount(topic, delayTimeMillis, scaleType);
            break;
    }
}
```

WeBank also built an MQ circuit-breaker mechanism. When the backlog on one of a Broker's queues reaches a preset threshold (or an error occurs), a producer pushing to that queue receives an error response; the producer's callback then puts the queue into a faultMap, isolating it for 60s. On every send, the queue selector removes from the isolation set any queue whose isolation period has expired.
```java
@Override
public void onException(Throwable e) {
    try {
        MessageQueueHealthManager messageQueueHealthManager
            = ((HealthyMessageQueueSelector) messageQueueSelector).getMessageQueueHealthManager();
        MessageQueue messageQueue = ((AtomicReference<MessageQueue>) selectorArg).get();
        if (messageQueue != null) {
            messageQueueSelector.getMessageQueueHealthManager().markQueueFault(messageQueue); // isolate the queue
            if (messageQueueSelector.getMessageQueueHealthManager().isQueueFault(messageQueue)) { // the queue is now marked faulty
                LOGGER.warn("isolate send failed mq. {} cause: {}", messageQueue, e.getMessage());
            }
        }
        //logic of fuse
        if (e.getMessage().contains("CODE: " + DeFiBusResponseCode.CONSUME_DIFF_SPAN_TOO_LONG)) {
            //first retry initialize
            if (queueCount == 0) {
                List<MessageQueue> messageQueueList = producer.getDefaultMQProducer().getDefaultMQProducerImpl().getTopicPublishInfoTable()
                    .get(msg.getTopic()).getMessageQueueList();
                queueCount = messageQueueList.size();
                String clusterPrefix = deFiBusProducer.getDeFiBusClientConfig().getClusterPrefix();
                if (!StringUtils.isEmpty(clusterPrefix)) {
                    for (MessageQueue mq : messageQueueList) {
                        if (messageQueueHealthManager.isQueueFault(mq)) {
                            queueCount--;
                        }
                    }
                }
            }

            int retryTimes = Math.min(queueCount, deFiBusProducer.getDeFiBusClientConfig().getRetryTimesWhenSendAsyncFailed());
            if (circuitBreakRetryTimes.get() < retryTimes) {
                circuitBreakRetryTimes.incrementAndGet();
                LOGGER.warn("fuse:send to [{}] circuit break, retry no.[{}] times, msgKey:[{}]", messageQueue.toString(), circuitBreakRetryTimes.intValue(), msg.getKeys());
                producer.getDefaultMQProducer().send(msg, messageQueueSelector, selectorArg, this);
                //no exception to client when retry
                return;
            } else {
                LOGGER.warn("fuse:send to [{}] circuit break after retry {} times, msgKey:[{}]", messageQueue.toString(), retryTimes, msg.getKeys());
            }
        } else {
            int maxRetryTimes = producer.getDeFiBusClientConfig().getRetryTimesWhenSendAsyncFailed();
            if (sendRetryTimes.getAndIncrement() < maxRetryTimes) {
                LOGGER.info("send message fail, retry {} now, msgKey: {}, cause: {}", sendRetryTimes.get(), msg.getKeys(), e.getMessage());
                producer.getDefaultMQProducer().send(msg, messageQueueSelector, selectorArg, this);
                return;
            } else {
                LOGGER.warn("send message fail, after retry {} times, msgKey:[{}]", maxRetryTimes, msg.getKeys());
            }
        }

        if (sendCallback != null) {
            sendCallback.onException(e);
        }
    } catch (Exception e1) {
        LOGGER.warn("onExcept fail", e1);
        if (sendCallback != null) {
            sendCallback.onException(e);
        }
    }
}
```

To track each queue's backlog, WeBank adds a ConsumeQueueWaterMark concept on the Broker. On every send request from a producer, the Broker checks the current water mark: if it exceeds the preset threshold, it returns a CONSUME_DIFF_SPAN_TOO_LONG response to the producer; if it is still below the threshold but above the preset high-water warning line, it only logs an error as an alert. The logic:
```java
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    SendMessageRequestHeader requestHeader = parseRequestHeader(request);
    String Topic = requestHeader.getTopic();
    int queueIdInt = requestHeader.getQueueId();
    if (deFiQueueManager.getBrokerController().getDeFiBusBrokerConfig().isRejectSendWhenMaxDepth()
        && Topic != null
        && !Topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
        && !Topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)
        && !Topic.contains(DeFiBusConstant.RR_REPLY_TOPIC)
        && !Topic.startsWith(DeFiBusConstant.RMQ_SYS)) {
        long maxQueueDepth = deFiQueueManager.getMaxQueueDepth(Topic);
        double highWatermark = deFiQueueManager.getBrokerController().getDeFiBusBrokerConfig().getQueueDepthHighWatermark();
        ConsumeQueueWaterMark minConsumeQueueWaterMark
            = deFiQueueManager.getMinAccumulated(Topic, queueIdInt);
        if (minConsumeQueueWaterMark != null) {
            long accumulate = minConsumeQueueWaterMark.getAccumulated();
            if (accumulate >= maxQueueDepth) {
                if (System.currentTimeMillis() % 100 == 0) { // crude sampling to avoid flooding the log
                    LOG.error("Quota exceed 100% for topic [{}] in queue [{}], current:[{}], max:[{}]", Topic, queueIdInt, accumulate, maxQueueDepth);
                }
                final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
                response.setCode(DeFiBusResponseCode.CONSUME_DIFF_SPAN_TOO_LONG);
                response.setRemark(" consume span too long, maybe has slow consumer, so send rejected");
                return response;
            } else if (accumulate >= maxQueueDepth * highWatermark) {
                if (System.currentTimeMillis() % 100 == 0) {
                    LOG.error("Quota exceed {}% for topic [{}] in queue [{}], current:[{}], max:[{}]", highWatermark * 100, Topic, queueIdInt, accumulate, maxQueueDepth);
                }
            }
        }
        //...

    }
    return super.processRequest(ctx, request);
}
```

The water-mark calculation:
```java
public ConsumeQueueWaterMark calculateMinAccumulated(String topic, int queueId) {
    Set<String> subscribedGroups = deFiBrokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
    Set<String> checkGroups = new HashSet<String>();
    DeFiBusTopicConfig deFiBusTopicConfig = this.getBrokerController().getExtTopicConfigManager().selectExtTopicConfig(topic);
    long maxDepth = deFiBusTopicConfig != null ? deFiBusTopicConfig.getMaxQueueDepth() : DeFiBusTopicConfig.DEFAULT_QUEUE_LENGTH;
    double highWatermark = deFiQueueManager.getBrokerController().getDeFiBusBrokerConfig().getQueueDepthHighWatermark();
    ConsumeQueueWaterMark minDepth = null;
    long maxOffset = this.deFiBrokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
    LOG.debug("calculateMinAccumulated topic:{},queueID:{},subscribedGroups{}", topic, queueId, subscribedGroups);

    //calculate accumulated depth for each consumer group
    for (String consumerGroup : subscribedGroups) {
        if (topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || consumerGroup.startsWith(DeFiBusConstant.EXT_CONSUMER_GROUP)) {
            continue;
        }

        DeFiConsumerManager consumerManager = (DeFiConsumerManager) this.deFiBrokerController.getConsumerManager();
        DeFiConsumerGroupInfo deFiConsumerGroupInfo = (DeFiConsumerGroupInfo) consumerManager.getConsumerGroupInfo(consumerGroup);

        //ignore offline consumer group
        if (deFiConsumerGroupInfo == null || deFiConsumerGroupInfo.getClientIdBySubscription(topic) == null
            || deFiConsumerGroupInfo.getClientIdBySubscription(topic).isEmpty()) {
            continue;
        }
        long ackOffset = queryOffset(consumerGroup, topic, queueId);
        long thisDepth = maxOffset - ackOffset;
        long lastDeliverOffset = queryDeliverOffset(consumerGroup, topic, queueId);

        if (lastDeliverOffset >= 0) {
            thisDepth = maxOffset - lastDeliverOffset;
        }

        checkGroups.add(consumerGroup);
        ConsumeQueueWaterMark depthOfThisGroup = new ConsumeQueueWaterMark(consumerGroup, topic, queueId, lastDeliverOffset, thisDepth);
        if (minDepth == null) {
            minDepth = depthOfThisGroup;
        } else if (depthOfThisGroup.getAccumulated() < minDepth.getAccumulated()) {
            minDepth = depthOfThisGroup;
        }

        LOG.debug("topic:{},queueID:{},depthOfThisGroup:{} ,minDepth:{}", topic, queueId, depthOfThisGroup, minDepth);

        if (depthOfThisGroup.getAccumulated() > maxDepth) {
            LOG.error("Quota exceed 100% for topic:{},queueID:{},depthOfThisGroup:{} ,maxDepth:{} maxOffset: {} ackOffset: {}"
                , topic, queueId, depthOfThisGroup, maxDepth, maxOffset, ackOffset);
        } else if (depthOfThisGroup.getAccumulated() > maxDepth * highWatermark) {
            LOG.error("Quota exceed {}% for topic:{}, queueID:{}, depthOfThisGroup:{}, maxDepth:{} maxOffset: {} ackOffset: {}"
                , highWatermark * 100, topic, queueId, depthOfThisGroup, maxDepth, maxOffset, ackOffset);
        }
    }

    if (checkGroups.isEmpty()) {
        minDepth = new ConsumeQueueWaterMark("NO_ONLINE_GROUP", topic, queueId, maxOffset, 0);
    }

    for (String consumerGroup : checkGroups) {
        long thisDepth = maxOffset - queryOffset(consumerGroup, topic, queueId);
        long lastDeliverOffset = queryDeliverOffset(consumerGroup, topic, queueId);

        if (lastDeliverOffset >= 0) {
            thisDepth = maxOffset - lastDeliverOffset;
        }

        if (thisDepth > maxDepth) {
            if (checkGroups.size() > 1 && minDepth.getAccumulated() < maxDepth * MIN_CLEAN_THRESHOLD) {
                autoUpdateDepth(consumerGroup, topic, queueId, maxDepth, maxOffset);
            }
        }
    }
    return minDepth;
}
```

To solve these problems encountered when using MQ in different scenarios, WeBank's developers built many advanced features on top of RocketMQ, all of which are open source; every piece of source code in this article comes from DeFiBus (DeFiBus = RPC + MQ, a secure and controllable distributed financial-grade message bus): https://github.com/WeBankFinTech/DeFiBus