(1)解决问题一,要做好 灰度发布。每次新功能上线前,选取一定比例的消费实例做灰度,若出现问题,及时回滚;若消费者消费正常,平稳运行一段时间后,再升级其它实例。如果需要按规则选出一部分账号做灰度,则需要做好消息过滤,让正常消费实例排除灰度消息,让灰度消费实例过滤出灰度消息。
(2)解决问题二,要做到 多活。极端情况下,当一个IDC内消费实例全部宕机时,需要做到让其他IDC内的消费实例正常消费消息。同时,若一个IDC内Broker全部宕机,需要支持生产者将消息发送至其它IDC的Broker。
(3)解决问题三,要 增强消费能力。增强消费能力,主要是增加消费者线程数或增加消费者实例个数。增加消费者线程数要注意消费者及其下游服务的消费能力,上线前就要将线程池参数调至最优状态。增加消费者实例个数,要注意Queue数量,消费实例的数量要与Queue数量相同,如果消费实例数量超过Queue数量,多出的消费实例分不到Queue,只增加消费实例是没用的,如果消费实例数量比Queue数量少,每个消费实例承载的流量是不同的。
(4)解决问题四,要做到 熔断与隔离。当一个Broker的队列出现消息积压时,要对其熔断,将其隔离,将新消息发送至其它队列,过一定的时间,再解除其隔离。
- @Override
- public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
- if (null == tagsCode || null == subscriptionData) {
- return true;
- }
- if (subscriptionData.isClassFilterMode()) {
- return true;
- }
- return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
- || subscriptionData.getCodeSet().contains(tagsCode.intValue());
- }
- public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
- final SubscriptionData subscriptionData) {
- PullResultExt pullResultExt = (PullResultExt) pullResult;
- this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
- if (PullStatus.FOUND == pullResult.getPullStatus()) {
- ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
- List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
- List<MessageExt> msgListFilterAgain = msgList;
- if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
- msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
- for (MessageExt msg : msgList) {
- if (msg.getTags() != null) {
- if (subscriptionData.getTagsSet().contains(msg.getTags())) {
- msgListFilterAgain.add(msg);
- }
- }
- }
- }
- //...
- return pullResult;
- }

- @Override
- public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
- //...
- Object ret = null;
- try {
- MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
- ret = realFilterData.getCompiledExpression().evaluate(context);
- } catch (Throwable e) {
- log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
- }
- //...
- return (Boolean) ret;
- }

微众在RocketMQ基础上增加了HealthyMessageQueueSelector, Broker Cluster及Broker均使用IDC-命名,Producer启动时,会启动一个调度任务将同一个IDC下的Broker缓存至 HealthyMessageQueueSelector,生产者通过HealthyMessageQueueSelector缓存的Broker来优先选择本IDC的Queue,逻辑如下:
- @Override
- @SuppressWarnings("unchecked")
- public MessageQueue select(List<MessageQueue> mqs, Message msg, final Object selectedResultRef) {
- //...
- boolean pub2local = MapUtils.getBoolean(sendNearbyMapping, msg.getTopic(), Boolean.TRUE);
- MessageQueue lastOne = ((AtomicReference<MessageQueue>) selectedResultRef).get();
- if (pub2local) {
- List<MessageQueue> localMQs = new ArrayList<>();
- List<MessageQueue> remoteMqs = new ArrayList<>();
- HashMap<String, Integer> localBrokerMQCount = separateLocalAndRemoteMQs(mqs, localBrokers, localMQs, remoteMqs);
- for (String brokerName : localBrokerMQCount.keySet()) {
- //if MQ num less than threshold, send msg to all broker
- if (localBrokerMQCount.get(brokerName) <= minMqCountWhenSendLocal) {
- localMQs.addAll(remoteMqs);
- }
- }
- //try select a mq from local idc first
- MessageQueue candidate = selectMessageQueue(localMQs, lastOne, msg);
- if (candidate != null) {
- ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
- LOGGER.debug("select local mq [{}], {}", candidate.toString(), msg);
- return candidate;
- }
- //try select a mq from other idc if cannot select one from local idc
- candidate = selectMessageQueue(remoteMqs, lastOne, msg);
- if (candidate != null) {
- ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
- LOGGER.debug("select remote mq [{}], {}", candidate.toString(), msg);
- return candidate;
- }
- } else {
- //try select a mq from all mqs
- MessageQueue candidate = selectMessageQueue(mqs, lastOne, msg);
- if (candidate != null) {
- ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
- LOGGER.debug("select global mq [{}], {}", candidate.toString(), msg);
- return candidate;
- }
- }
- //try select a mq which is not isolated if no mq satisfy all limits
- for (int j = 0; j < mqs.size(); j++) {
- int index = this.getSendIndex(msg.getTopic());
- int pos = Math.abs(index) % mqs.size();
- MessageQueue candidate = mqs.get(pos);
- if (isQueueHealthy(candidate)) {
- ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
- LOGGER.debug("select any available mq [{}], {}", candidate.toString(), msg);
- return candidate;
- }
- }
- //in case of retry, still try select a mq from another broker if all mq isolated
- if (lastOne != null) {
- for (int j = 0; j < mqs.size(); j++) {
- int index = this.getSendIndex(msg.getTopic());
- int pos = Math.abs(index) % mqs.size();
- MessageQueue candidate = mqs.get(pos);
- if (!lastOne.getBrokerName().equals(candidate.getBrokerName())) {
- ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
- LOGGER.debug("select another broker mq [{}], {}", candidate.toString(), msg);
- return candidate;
- }
- }
- }
- //select a mq from all mqs anyway if no mq satisfy any limits
- int index = this.getSendIndex(msg.getTopic());
- int pos = Math.abs(index) % mqs.size();
- MessageQueue candidate = mqs.get(pos);
- ((AtomicReference<MessageQueue>) selectedResultRef).set(candidate);
- LOGGER.debug("select any mq [{}], {}", candidate.toString(), msg);
- return candidate;
- }

- private void updateLocalBrokers(ClusterInfo clusterInfo) {
- if (clusterInfo != null) {
- String clusterPrefix = deFiBusProducer.getDeFiBusClientConfig().getClusterPrefix();
- HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
- Set<String> currentBrokers = new HashSet<String>();
- for (Map.Entry<String, Set<String>> entry : clusterAddrTable.entrySet()) {
- String clusterName = entry.getKey();
- String clusterIdc = StringUtils.split(clusterName, DeFiBusConstant.IDC_SEPERATER)[0];
- if (StringUtils.isNotEmpty(clusterPrefix) && StringUtils.equalsIgnoreCase(clusterIdc, clusterPrefix)) {
- currentBrokers.addAll(entry.getValue());
- }
- }
- if (!currentBrokers.equals(messageQueueSelector.getLocalBrokers())) {
- messageQueueSelector.setLocalBrokers(currentBrokers);
- LOGGER.info("localBrokers updated: {} , clusterPrefix :{} ", currentBrokers, clusterPrefix);
- }
- }
- }

- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- //...
- /**
- * step1: seperate mqs and cids by idc
- */
- Map<String /*idc*/, List<MessageQueue>> sepMqs = this.seperateMqsByIDC(mqAll);
- Map<String /*idc*/, List<String/*cid*/>> sepClients = this.seperateCidsByIDC(cidAll);
- /**
- * step2: allocate local mqs first
- */
- String clusterPrefix = extractIdcFromClientId(currentCID);
- if (clusterPrefix != null) {
- List<MessageQueue> nearbyMqs = sepMqs.get(clusterPrefix);
- List<String> nearbyCids = sepClients.get(clusterPrefix);
- if (nearbyMqs != null && nearbyCids != null && !nearbyMqs.isEmpty() && !nearbyCids.isEmpty()) {
- Collections.sort(nearbyCids);
- Collections.sort(nearbyMqs);
- int index = nearbyCids.indexOf(currentCID);
- for (int i = index; i < nearbyMqs.size(); i++) {
- if (i % nearbyCids.size() == index) {
- result.add(nearbyMqs.get(i));
- }
- }
- }
- }
- /**
- * step3: allocate mqs which no subscriber in the same idc
- */
- List<MessageQueue> mqsNoClientsInSameIdc = new ArrayList<>();
- for (String idc : sepMqs.keySet()) {
- if (!idc.equals(UNKNOWN_IDC) && (sepClients.get(idc) == null || sepClients.get(idc).isEmpty())) {
- mqsNoClientsInSameIdc.addAll(sepMqs.get(idc));
- }
- }
- if (!mqsNoClientsInSameIdc.isEmpty()) {
- Collections.sort(mqsNoClientsInSameIdc);
- Collections.sort(cidAll);
- int index = cidAll.indexOf(currentCID);
- for (int i = index; i < mqsNoClientsInSameIdc.size(); i++) {
- if (i % cidAll.size() == index) {
- result.add(mqsNoClientsInSameIdc.get(i));
- }
- }
- }
- /**
- * step4: allocate mqs which no matched any cluster and cannot determined idc.
- */
- if (sepMqs.get(UNKNOWN_IDC) != null && !sepMqs.get(UNKNOWN_IDC).isEmpty()) {
- log.warn("doRebalance: cannot determine idc of mqs. allocate all to myself. {}", sepMqs.get(UNKNOWN_IDC));
- result.addAll(sepMqs.get(UNKNOWN_IDC));
- }
- return result;
- }

- private Map<String, List<MessageQueue>> seperateMqsByIDC(List<MessageQueue> mqAll) {
- String topic = mqAll.get(0).getTopic();
- TopicRouteData topicRouteData = mqClientInstance.getTopicRouteTable().get(topic);
- if (topicRouteData == null) {
- mqClientInstance.updateTopicRouteInfoFromNameServer(topic);
- topicRouteData = mqClientInstance.getTopicRouteTable().get(topic);
- }
- HashMap<String/*brokerName*/, String/*idc*/> brokerIdcMap = new HashMap<>();
- ArrayList<BrokerData> brokerDatas = new ArrayList<>(topicRouteData.getBrokerDatas());
- for (BrokerData broker : brokerDatas) {
- String clusterName = broker.getCluster();
- String idc = clusterName.split(DeFiBusConstant.IDC_SEPERATER)[0];
- brokerIdcMap.put(broker.getBrokerName(), idc.toUpperCase());
- }
- Map<String/*IDC*/, List<MessageQueue>> result = new HashMap<>();
- for (MessageQueue mq : mqAll) {
- String idc = brokerIdcMap.get(mq.getBrokerName());
- if (idc == null) {
- idc = UNKNOWN_IDC;
- }
- if (result.get(idc) == null) {
- List<MessageQueue> mqList = new ArrayList<>();
- mqList.add(mq);
- result.put(idc, mqList);
- } else {
- result.get(idc).add(mq);
- }
- }
- return result;
- }
- private Map<String, List<String>> seperateCidsByIDC(List<String> cidAll) {
- Map<String/* idc */, List<String>> result = new HashMap<>();
- for (String cid : cidAll) {
- String cidIdc = extractIdcFromClientId(cid);
- if (cidIdc != null) {
- cidIdc = cidIdc.toUpperCase();
- if (result.get(cidIdc) != null) {
- result.get(cidIdc).add(cid);
- } else {
- List<String> cidList = new ArrayList<>();
- cidList.add(cid);
- result.put(cidIdc, cidList);
- }
- }
- }
- return result;
- }

- @Override
- public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
- //...
- DeFiConsumerGroupInfo deFiConsumerGroupInfo = (DeFiConsumerGroupInfo) consumerGroupInfo;
- Set<String> oldSub = deFiConsumerGroupInfo.findSubscribedTopicByClientId(clientChannelInfo.getClientId());
- boolean r1 = super.registerConsumer(group, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, isNotifyConsumerIdsChangedEnable);
- boolean r2 = deFiConsumerGroupInfo.registerClientId(subList, clientChannelInfo.getClientId());
- if (r1 || r2) {
- adjustQueueNum(oldSub, subList);
- if (isNotifyConsumerIdsChangedEnable) {
- // this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
- asyncNotifyClientChange(group, deFiConsumerGroupInfo);
- }
- }
- //...
- return r1 || r2;
- }
- private void adjustQueueNum(final Set<String> oldSub, final Set<SubscriptionData> subList) {
- for (SubscriptionData subscriptionData : subList) {
- if (!oldSub.contains(subscriptionData.getTopic())) {
- //new sub topic, increase queue num
- adjustQueueNumStrategy.increaseQueueNum(subscriptionData.getTopic());
- }
- }
- //...
- }

- private void adjustQueueNumByConsumerCount(String topic, AdjustType scaleType) {
- //...
- switch (scaleType) {
- adjustReadQueueNumByConsumerCount(topic, 0, scaleType);
- adjustWriteQueueNumByConsumerCount(topic, 10 * 1000, scaleType);
- break;
- adjustWriteQueueNumByConsumerCount(topic, 0, scaleType);
- long delayTimeMinutes = Math.min(deFiBrokerController.getDeFiBusBrokerConfig().getScaleQueueSizeDelayTimeMinute(), 10);
- long delayTimeMillis = delayTimeMinutes * 60 * 1000;
- adjustReadQueueNumByConsumerCount(topic, delayTimeMillis, scaleType);
- break;
- }
- }

- @Override
- public void onException(Throwable e) {
- try {
- MessageQueueHealthManager messageQueueHealthManager
- = ((HealthyMessageQueueSelector) messageQueueSelector).getMessageQueueHealthManager();
- MessageQueue messageQueue = ((AtomicReference<MessageQueue>) selectorArg).get();
- if (messageQueue != null) {
- messageQueueSelector.getMessageQueueHealthManager().markQueueFault(messageQueue);//隔离队列
- if (messageQueueSelector.getMessageQueueHealthManager().isQueueFault(messageQueue)) {//若超过隔离时间,将队列移除隔离区
- LOGGER.warn("isolate send failed mq. {} cause: {}", messageQueue, e.getMessage());
- }
- }
- //logic of fuse
- if (e.getMessage().contains("CODE: " + DeFiBusResponseCode.CONSUME_DIFF_SPAN_TOO_LONG)) {
- //first retry initialize
- if (queueCount == 0) {
- List<MessageQueue> messageQueueList = producer.getDefaultMQProducer().getDefaultMQProducerImpl().getTopicPublishInfoTable()
- .get(msg.getTopic()).getMessageQueueList();
- queueCount = messageQueueList.size();
- String clusterPrefix = deFiBusProducer.getDeFiBusClientConfig().getClusterPrefix();
- if (!StringUtils.isEmpty(clusterPrefix)) {
- for (MessageQueue mq : messageQueueList) {
- if (messageQueueHealthManager.isQueueFault(mq)) {
- queueCount--;
- }
- }
- }
- }
- int retryTimes = Math.min(queueCount, deFiBusProducer.getDeFiBusClientConfig().getRetryTimesWhenSendAsyncFailed());
- if (circuitBreakRetryTimes.get() < retryTimes) {
- circuitBreakRetryTimes.incrementAndGet();
- LOGGER.warn("fuse:send to [{}] circuit break, retry no.[{}] times, msgKey:[{}]", messageQueue.toString(), circuitBreakRetryTimes.intValue(), msg.getKeys());
- producer.getDefaultMQProducer().send(msg, messageQueueSelector, selectorArg, this);
- //no exception to client when retry
- return;
- } else {
- LOGGER.warn("fuse:send to [{}] circuit break after retry {} times, msgKey:[{}]", messageQueue.toString(), retryTimes, msg.getKeys());
- }
- } else {
- int maxRetryTimes = producer.getDeFiBusClientConfig().getRetryTimesWhenSendAsyncFailed();
- if (sendRetryTimes.getAndIncrement() < maxRetryTimes) {
- LOGGER.info("send message fail, retry {} now, msgKey: {}, cause: {}", sendRetryTimes.get(), msg.getKeys(), e.getMessage());
- producer.getDefaultMQProducer().send(msg, messageQueueSelector, selectorArg, this);
- return;
- } else {
- LOGGER.warn("send message fail, after retry {} times, msgKey:[{}]", maxRetryTimes, msg.getKeys());
- }
- }
- if (sendCallback != null) {
- sendCallback.onException(e);
- }
- } catch (Exception e1) {
- LOGGER.warn("onExcept fail", e1);
- if (sendCallback != null) {
- sendCallback.onException(e);
- }
- }
- }
- }

- @Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx,
- RemotingCommand request) throws RemotingCommandException {
- SendMessageRequestHeader requestHeader = parseRequestHeader(request);
- String Topic = requestHeader.getTopic();
- int queueIdInt = requestHeader.getQueueId();
- if (deFiQueueManager.getBrokerController().getDeFiBusBrokerConfig().isRejectSendWhenMaxDepth()
- && Topic != null
- && !Topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
- && !Topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)
- && !Topic.contains(DeFiBusConstant.RR_REPLY_TOPIC)
- && !Topic.startsWith(DeFiBusConstant.RMQ_SYS)) {
- long maxQueueDepth = deFiQueueManager.getMaxQueueDepth(Topic);
- double highWatermark = deFiQueueManager.getBrokerController().getDeFiBusBrokerConfig().getQueueDepthHighWatermark();
- ConsumeQueueWaterMark minConsumeQueueWaterMark
- = deFiQueueManager.getMinAccumulated(Topic, queueIdInt);
- if (minConsumeQueueWaterMark != null) {
- long accumulate = minConsumeQueueWaterMark.getAccumulated();
- if (accumulate >= maxQueueDepth) {
- if (System.currentTimeMillis() % 100 == 0) {
- LOG.error("Quota exceed 100% for topic [{}] in queue [{}], current:[{}], max:[{}]", Topic, queueIdInt, accumulate, maxQueueDepth);
- }
- final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
- response.setCode(DeFiBusResponseCode.CONSUME_DIFF_SPAN_TOO_LONG);
- response.setRemark(" consume span too long, maybe has slow consumer, so send rejected");
- return response;
- } else if (accumulate >= maxQueueDepth * highWatermark) {
- if (System.currentTimeMillis() % 100 == 0) {
- LOG.error("Quota exceed {}% for topic [{}] in queue [{}], current:[{}], max:[{}]", highWatermark * 100, Topic, queueIdInt, accumulate, maxQueueDepth);
- }
- }
- }
- //...
- }
- return super.processRequest(ctx, request);
- }

- public ConsumeQueueWaterMark calculateMinAccumulated(String topic, int queueId) {
- Set<String> subscribedGroups = deFiBrokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
- Set<String> checkGroups = new HashSet<String>();
- DeFiBusTopicConfig deFiBusTopicConfig = this.getBrokerController().getExtTopicConfigManager().selectExtTopicConfig(topic);
- long maxDepth = deFiBusTopicConfig != null ? deFiBusTopicConfig.getMaxQueueDepth() : DeFiBusTopicConfig.DEFAULT_QUEUE_LENGTH;
- double highWatermark = deFiQueueManager.getBrokerController().getDeFiBusBrokerConfig().getQueueDepthHighWatermark();
- ConsumeQueueWaterMark minDepth = null;
- long maxOffset = this.deFiBrokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
- LOG.debug("calculateMinAccumulated topic:{},queueID:{},subscribedGroups{}", topic, queueId, subscribedGroups);
- //calculate accumulated depth for each consumer group
- for (String consumerGroup : subscribedGroups) {
- if (topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || consumerGroup.startsWith(DeFiBusConstant.EXT_CONSUMER_GROUP)) {
- continue;
- }
- DeFiConsumerManager consumerManager = (DeFiConsumerManager) this.deFiBrokerController.getConsumerManager();
- DeFiConsumerGroupInfo deFiConsumerGroupInfo = (DeFiConsumerGroupInfo) consumerManager.getConsumerGroupInfo(consumerGroup);
- //ignore offline consumer group
- if (deFiConsumerGroupInfo == null || deFiConsumerGroupInfo.getClientIdBySubscription(topic) == null
- || deFiConsumerGroupInfo.getClientIdBySubscription(topic).isEmpty()) {
- continue;
- }
- long ackOffset = queryOffset(consumerGroup, topic, queueId);
- long thisDepth = maxOffset - ackOffset;
- long lastDeliverOffset = queryDeliverOffset(consumerGroup, topic, queueId);
- if (lastDeliverOffset >= 0) {
- thisDepth = maxOffset - lastDeliverOffset;
- }
- checkGroups.add(consumerGroup);
- ConsumeQueueWaterMark depthOfThisGroup = new ConsumeQueueWaterMark(consumerGroup, topic, queueId, lastDeliverOffset, thisDepth);
- if (minDepth == null) {
- minDepth = depthOfThisGroup;
- } else if (depthOfThisGroup.getAccumulated() < minDepth.getAccumulated()) {
- minDepth = depthOfThisGroup;
- }
- LOG.debug("topic:{},queueID:{},depthOfThisGroup:{} ,minDepth:{}", topic, queueId, depthOfThisGroup, minDepth);
- if (depthOfThisGroup.getAccumulated() > maxDepth) {
- LOG.error("Quota exceed 100% for topic:{},queueID:{},depthOfThisGroup:{} ,maxDepth:{} maxOffset: {} ackOffset: {}"
- , topic, queueId, depthOfThisGroup, maxDepth, maxOffset, ackOffset);
- } else if (depthOfThisGroup.getAccumulated() > maxDepth * highWatermark) {
- LOG.error("Quota exceed {}% for topic:{}, queueID:{}, depthOfThisGroup:{}, maxDepth:{} maxOffset: {} ackOffset: {}"
- , highWatermark * 100, topic, queueId, depthOfThisGroup, maxDepth, maxOffset, ackOffset);
- }
- }
- if (checkGroups.isEmpty()) {
- minDepth = new ConsumeQueueWaterMark("NO_ONLINE_GROUP", topic, queueId, maxOffset, 0);
- }
- for (String consumerGroup : checkGroups) {
- long thisDepth = maxOffset - queryOffset(consumerGroup, topic, queueId);
- long lastDeliverOffset = queryDeliverOffset(consumerGroup, topic, queueId);
- if (lastDeliverOffset >= 0) {
- thisDepth = maxOffset - lastDeliverOffset;
- }
- if (thisDepth > maxDepth) {
- if (checkGroups.size() > 1 && minDepth.getAccumulated() < maxDepth * MIN_CLEAN_THRESHOLD) {
- autoUpdateDepth(consumerGroup, topic, queueId, maxDepth, maxOffset);
- }
- }
- }
- return minDepth;
- }

微众银行开发者们为解决MQ在不同场景下使用中的问题,基于RocketMQ定制化开发了许多高级特性,并已经全部开源,本文所述全部源码均为DeFiBus源码。GitHub - WeBankFinTech/DeFiBus: DeFiBus=RPC+MQ,安全可控的分布式金融级消息总线。DeFiBus=RPC+MQ,安全可控的分布式金融级消息总线。. Contribute to WeBankFinTech/DeFiBus development by creating an account on GitHub.https://github.com/WeBankFinTech/DeFiBus
