当前位置:   article > 正文

RocketMQ你不得不了解的 Rebalance机制源码分析_rocketmq rebalance机制

rocketmq rebalance机制

RocketMQ版本

  • version: 5.1.0

RocketMQ中consumer消费模型

在了解RocketMQ的Rebalance机制之前,我们必须先简单了解下rocketmq的消费模型

我们知道在我们创建topic的时候需要指定一个参数就是读队列数

这里假设我们的topic是xiaozoujishu-topic,我们的读队列数 是4个,我们同一gid下的集群消费模式的消费者有两个,那么我们消费者是如何消费消息的呢 首先需要明确的是:

  1. 这里我们的消费模式是集群消费
  2. queue的负载均衡算法是使用默认的AllocateMessageQueueAveragely(平均分配) 假设我们项目刚开始只有一个消费者,那么我们的消费队列分配就如下:

四个队列分配给一个消费者

此时如果我们再启动一个消费者,那么这时候就会进行Rebalance,然后此时我们的队列分配就变成如下:

所以通过上面的队列分配我就知道Rebalance是个啥了,我们下面对Rebalance进行一些定义

RocketMQ的Rebalance是什么

Rebalance(重新平衡)机制指的是:将一个Topic下的多个队列(queue),在同一个消费者组(consumer group)(gid)下的多个消费者实例(consumer instance)之间进行重新分配

Rebalance的目的

从上面可以看出Rebalance的本意是把一个topic的queue分配给合适的consumer,本意其实是为了提升消息的并行处理能力

但是Rebalance也带来了一些危害,后面我们会重点分析下

Rebalance的触发原因

我们这里先说结论

  1. 订阅Topic的队列数量变化
  2. 消费者组信息变化

这里是最深层的原因,就是topic的队列数量、消费组信息 实际我们可以将这些归结为Rebalance的元数据,这些元数据的变更,就会引起clinet的Rebalance

注意RocketMQ的Rebalance是发生在client

这些元数据都在管broker管理 核心就是这三个类

  • TopicConfigManager
  • SubscriptionGroupManager
  • ConsumerManager

只要这个三个类的信息有变化,client就会进行Rebalance。 下面我们可以具体说下什么情况下会让这三个类变化

订阅Topic的队列数量变化

什么情况下订阅Topic的队列数量会变化呢?

  1. broker扩容
  2. broker缩容
  3. broker宕机(本质也是类似缩容)

消费者组信息变化

什么时候消费者组信息会变化呢?

核心就是consumer的上下线,具体细分又可以分为如下原因:

  1. 服务日常滚动升级
  2. 服务扩容
  3. 服务订阅消息发生变化

源码分析

上面大致介绍了Rebalance的触发原因,现在我们结合源码来具体分析下

我们就从consumer的启动开始分析吧

这里我们以最简单的demo为例

  1. java复制代码 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
  2. consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
  3. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  4. consumer.subscribe(TOPIC, "*");
  5. consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
  6. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
  7. // return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  8. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  9. });
  10. consumer.start();
  11. System.out.printf("Consumer Started.%n");

这里我们直接注意到 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); 这个方法,看名字就知道是client向所有的broker发送心跳

我们进入到sendHeartbeatToAllBrokerWithLock方法看看

  1. java复制代码 private void sendHeartbeatToAllBroker() {
  2. final HeartbeatData heartbeatData = this.prepareHeartbeatData();
  3. final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
  4. final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
  5. if (producerEmpty && consumerEmpty) {
  6. log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
  7. return;
  8. }
  9. if (this.brokerAddrTable.isEmpty()) {
  10. return;
  11. }
  12. long times = this.sendHeartbeatTimesTotal.getAndIncrement();
  13. for (Entry<String, HashMap<Long, String>> brokerClusterInfo : this.brokerAddrTable.entrySet()) {
  14. String brokerName = brokerClusterInfo.getKey();
  15. HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
  16. if (oneTable == null) {
  17. continue;
  18. }
  19. for (Entry<Long, String> singleBrokerInstance : oneTable.entrySet()) {
  20. Long id = singleBrokerInstance.getKey();
  21. String addr = singleBrokerInstance.getValue();
  22. if (addr == null) {
  23. continue;
  24. }
  25. if (consumerEmpty && MixAll.MASTER_ID != id) {
  26. continue;
  27. }
  28. try {
  29. int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
  30. if (!this.brokerVersionTable.containsKey(brokerName)) {
  31. this.brokerVersionTable.put(brokerName, new HashMap<>(4));
  32. }
  33. this.brokerVersionTable.get(brokerName).put(addr, version);
  34. if (times % 20 == 0) {
  35. log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
  36. log.info(heartbeatData.toString());
  37. }
  38. } catch (Exception e) {
  39. if (this.isBrokerInNameServer(addr)) {
  40. log.warn("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
  41. } else {
  42. log.warn("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
  43. id, addr, e);
  44. }
  45. }
  46. }
  47. }
  48. }

这段代码主要是通过this.brokerAddrTable.entrySet()获取到所有的master broker地址,然后进行心跳发送

具体的心跳发送代码实际是在下面代码中进行的

  1. java复制代码 int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());

我们进入到该方法会发现和我们之前分析的一样,就是发送一个请求到broker,请求码是RequestCode.HEART_BEAT

我们看看RequestCode.HEART_BEAT的调用找到`broker的处理逻辑

很快我们通过方法名就能定位到处理client的请求的方法是ClientManageProcessor类的processRequest

我们具体进去看看这个方法

可以看到具体的逻辑被封装在return this.heartBeat(ctx, request);这个方法中,所以我们需要再进去看看

进去这个方法我们能看到一个比较核心的方法 registerConsumer

很明显这个方法就是注册consumer的方法

这个方法里面和Rebalance相关比较核心的方法就是这三个

  1. consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);

这里我们可以看看clientChannelInfo里面是个啥玩意

具体深入到updateChannel方法里面就是判断是否为新的client,是就更新channelInfoTable

2.updateSubscription

这个方法就是判断订阅关系是否发生了变化并更新订阅关系

  1. callConsumerIdsChangeListener
  • callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); 这个方法就是通知client进行Rebalance,具体的实现是参考了类似事件订阅的方式去实现的,这里是发送了一个CHANGE事件

这里我们可以简单看看事件定义的类型有哪些

我们直接看看具体的事件处理类

可以看到实现类有多个,我们直接看broker模块的DefaultConsumerIdsChangeListener类即可

可以看到这里是给该group所有的client发送Rebalance消息

具体的消息状态码是 RequestCode.NOTIFY_CONSUMER_IDS_CHANGED

client Rebalance

通过上面我们大致找到了整个通信过程,但是实际的Rebalance是发生在client,所以我们还是需要继续回到client的代码

我们通过状态码RequestCode.NOTIFY_CONSUMER_IDS_CHANGED 找到client的处理类ClientRemotingProcessor

实际处理方法就是

java复制代码this.mqClientFactory.rebalanceImmediately();

我们进入这个方法看看这里最终就是唤醒阻塞的Rebalance线程

所以实际的方法调用还是在RebalanceServicerun方法

最终还是调用的是MQConsumerInner接口中的doRebalance方法

这里有个细节,为什么不是直接调用一个静态方法,要搞这么多花里胡哨的唤醒线程操作?

原来是cleint也会定时去Rebalance 默认是20s一次,可以配置

可以通过参数rocketmq.client.rebalance.waitInterval去配置

那么为什么client还要自己去循环Rebalance

原来这里是防止因为网络等其他原因丢失了broker的请求,后续网络回复了,也能进行进行Rebalance

下面我们继续看看Rebalance的实现细节

这里我们以常用的DefaultMQPushConsumerImpl为例

实际这里最终调用的还是抽象类RebalanceImpldoRebalance方法

可以看到这里的Rebalance是按照topic的维度

我们先理解订阅单个topic的原理

这里的就是先对topic的queue排序,然后对consumer排序, 然后调用AllocateMessageQueueStrategy的allocate方法 这里我们暂时只分析默认的平均分配算法(AllocateMessageQueueAveragely),也就是我们最先说的分配算法。其他算法可以详细分析

这里的分配方式就是我们前面画图的,比如4个queue,2个consumer,那么就是每个consumer2个queue。
简单举例就是我们的 queue有q1、q2、q3、q4
consumer有 c1、c2

那么就是 c1:q1、q2 c2:q2、q3

需要注意的是如果consumer大于queue数量,多出的consumer就不会被分配到queue

client什么时候触发Rebalance

上面分析了这么多原理,这里我们总结下client什么时候会触发Rebalance

  1. consumer启动时会向所有master broker发送心跳,然后broker发送信息通知所有consumer触发Rebalance
  2. 启动完成后consumer会周期的触发Rebalance,防止因为网络等问题丢失broker的通知而没有Rebalance
  3. 当consumer停止时,也会通过之前分析的事件机制,触发注销comsuer事件然后通知所有的comsuer触发Rebalance

总结

这里我们详细介绍了client是如何触发Rebalance的,以及触发Rebalance的时机,也介绍了Rebalance的好处。 实际还有很多细节我们限于篇幅暂未分析。 后面我们会继续分析Rebalance的坏处和一些详细的Rebalance算法

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/925342
推荐阅读
相关标签
  

闽ICP备14008679号