当前位置:   article > 正文

Kafka rebalance 重平衡深度解析

kafka rebalance


consumer group 是用于实现高伸缩性、高容错性的 consumer 机制。组内多个 consumer 实例可以同时读取 Kafka 消息,而且一旦有某个 consumer “挂”了,consumer group 会立即将己崩溃 consumer 负责的分区转交给其他 consumer 来负责,从而保证整个 group 可以继续工作,不会丢失数据。
consumer group 的 rebalance 本质上是一组协议,它规定了一个 consumer group 是如何达成一致来分配订阅 topic 的所有分区的 。 假设某个组下有 20 个 consumer 实例,该组订阅了一个有着 100 个分区的 topic 。 正常情况下, Kafka 会为每个 consumer 平均分配 5 个分区。这个分
配过程就被称为 rebalance 。 当 consumer 成功地执行 rebalance 后,组订阅 topic 的每个分区只会分配给组内的一个 consumer 实例。
Kafka 内置的一个全新的组协调协议(group coordination protocol) 。对于每个组而言, Kafka 的某个 broker 会被选举为组协调者(group coordinator)。coordinator 负责对组的状态进行管理,它的主要职责就是当新成员到达时促成组内所有成员达成新的分区分配方案,即 coordinator 负责对组执行 rebalance 操作。

rebalance 触发条件

组 rebalance 触发的条件有以下 3 个:

  1. 组成员发生变更,比如新 consumer 加入组,或己有 consumer 主动离开组,再或是己有 consumer 崩溃时则触发 rebalance 。
  2. 组订阅 topic 数发生变更,比如使用基于正则表达式的订阅,当匹配正则表达式的新 topic 被创建时则会触发 rebalance。
  3. 组订阅 topic 的分区数发生变更,比如使用命令行脚本增加了订阅 topic 的分区数。

真实应用场景中引发 rebalance 最常见的原因就是违背了第一个条件,特别是 consumer 崩溃的情况。这里的崩横不一定就是指 consumer 进程“挂掉”或 consumer 进程所在的机器岩机 。
当 consumer 无法在指定的时间内完成消息的处理,那么 coordinator 就认为该 consumer 己经崩溃,从而引发新一轮 rebalance。可以通过 [[消费者参数#max poll interval ms|max.poll.interval.ms]] 参数配置 consumer 处理逻辑最大时间。

分区分配策略

Kafka consumer 默认提供了 3 种分配策略,分别是 range 策略、round-robin 策略和 sticky 策略。通过 consumer 参数 partition.assignment. strategy 来进行配置。
所谓的分配策略决定了订阅 topic 的每个分区会被分配给哪个 consumer 。 range 策略主要是基于范围的思想。它将单个 topic 的所有分区按照顺序排列,然后把这些分区划分成固定大小的分区段井依次分配给每个 consumer。假设有 1 个消费者线程订阅了 2 个 topic; round-robin 策略则会把所有 topic 的所有分区顺序摆开,然后轮询式地分配给各个 consumer。我们以 spring-kafka 举例,假设我们使用@KafkaListener 同时监听了 2 个 Topic,每个 topic 的分区为 3,concurrency 设置为 6。
如果是 range 策略的话,则是 3 个线程,负责 2 个 Topic 共 6 个分区。其他 3 个线程空闲。如果是 round-robin 策略,则是每个线程分配一个分区。
另外 Kafka 支持自定义的分配策略,用户可以创建自己的 consumer 分配器(assignor)。

rebalance generation

某个 consumer group 可以执行任意次 rebalance。为了更好地隔离每次 rebalance 上的数据,新版本 consumer 设计了 rebalance generation 用于标识某次 rebalance, consumer 中它是一个整数,通常从 0 开始。
Kafka 引入 consumer generation 主要是为了保护 consumer group 的,特别是防止无效 offset 提交 。
比如上一届的 consumer 成员由于某些原因延迟提交了 offset,但 rebalance 之后该 group 产生了新一届的 group 成员,而这次延迟的 offset 提交携带的是旧的 generation 信息,因此这次提交会被 consumer group 拒绝。在使用 consumer 时经常碰到的 ILLEGAL GE 阳 RATION 异常就是这个原
因导致的。
事实上,每个 group 进行 rebalance 之后, generation 号都会加 1,表示 group 进入了一个新的版本。Generation 1 时 group 有 3 个成员,随后成员 2 退出组, coordinator 触发 rebalance, consumer group 进入到 Generation 2 时代,之后成员 4 加入,再次触发 rebalance, group 进入到 Generation 3 时代。
fuKwiS

消费者状态机

重平衡一旦开启,Broker 端的协调者组件就要开始忙了,主要涉及到控制消费者组的状态流转。当前,Kafka 设计了一套消费者组状态机(State Machine),来帮助协调者完成整个重平衡流程。
Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。
xXLVg5
状态机的各个状态流转:
kXTiLC
一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。
当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。
如果 Kafka 日志中出现 Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.,这就是 Kafka 在尝试定期删除过期位移。只有 Empty 状态下的组,才会执行过期位移删除的操作。

rebalance 协议

上文提到过 rebalance 本质上是一组协议。Kafka 中提供了下面 5 个协议来处理 rebalance 相关事宜:

  • Join Group 请求:consumer 请求加入组。
  • SyncGroup 请求:group leader 把分配方案同步更新到组内所有成员中 。
  • Heartbeat 请求: consumer 定期向 coordinator 汇报心跳表明自己依然存活。
  • LeaveGroup 请求: consumer 主动通知 coordinator 该 consumer 即将离组 。
  • DescribeGroup 请求:查看组的所有信息,包括成员信息、协议信息、分配方案以及订阅信息等。该请求类型主要供管理员使用。coordinator 不使用该请求执行 rebalance。

在 rebalance 过程中, coordinator 主要处理 consumer 发过来 JoinGroup 和 SyncGroup 请求 。当 consumer 主动离组时会发送 LeaveGroup 请求给 coordinator。在成功 rebalance 之后,组内所有 consumer 都需要定期地向 coordinator 发送[[心跳请求]]。
而每个 consumer 也是根据 Heartbeat 请求的响应中是否包含
REBALANCE_IN_PROGRESS 来判断当前 group 是否开启了新一轮 rebalance。
通过 [[消费者参数#heartbeat interval ms|heartbeat.interval.ms]] 可以控制重平衡的通知频率。

消费者端 rebalance 流程

consumer group 在执行 rebalance 之前必须首先确定 coordinator 所在的 broker,并创建与该 broker 相互通信的 Socket 连接。确定 coordinator 的算法与确定 offset 被提交到 consumer offsets 目标分区的算法是相同的,算法如下:

  • 计算 Math.abs(groupID.hashCode) % offsets.topic.num.partitions 参数值(默认是 50),假设是 10。
  • 寻找 [[__consumer_offsets]] 分区 10 的 leader 副本所在的 broker,该 broker 即为这个 group 的 coordinator。

成功连接 coordinator 之后便可以执行 rebalance 操作。目前 rebalance 主要分为两步:加入组和同步更新分配方案。

  • 加入组:这一步中组内所有 consumer (即 group.id 相同的所有 consumer 实例)向 coordinator 发送 JoinGroup 请求 。 当收集全 JoinGroup 请求后, coordinator 从中选择一个 consumer 担任 group 的 leader,并把所有成员信息以及它们的订阅信息发送给 leader。特别需要注意的是, group 的 leader 和 coordinator 不是一个概念。leader 是某个 consumer 实例, coordinator 通常是 Kafka 集群中的一个 broker 。 另外 leader 而非 coordinator 负责为整个 group 的所有成员制定分配方案。
    TGROfY
  • 同步更新分配方案:这一步中 leader 开始制定分配方案,即根据前面提到的分配策略决定每个 consumer 都负责哪些 topic 的哪些分区。 一旦分配完成, leader 会把这个分配方案封装进 SyncGroup 请求并发送给 coordinator。组内所有成员都会发送 SyncGroup 请求,不过只有 leader 发送的 SyncGroup 请求中包含了分配方案。coordinator 接收到分配方案后把属于每个 consumer 的方案单独抽取出来作为 SyncGroup 请求的 response 返还给各自的 consumer。
    BRrBSN

Broker 端重平衡场景解析

新成员入组

新成员入组是指组处于 Stable 状态后,有新成员加入。主要是组稳定了之后有新成员加入的情形。当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。
0PSZxa

组成员主动离场

消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员。
EQBxQj

组成员崩溃离场

崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由 [[消费者参数#session timeout ms|session.timeout.ms]] 控制的。也就是说,Kafka 一般不会超过 session.timeout.ms 就能感知到这个崩溃。
EDNoaO

重平衡时协调者对组内成员提交位移的处理

正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送。
Ie0rgb

rebalance 监听器

consumer 默认把位移提交到 [[__consumer_offsets]] 中,但是 Kafka 也支持用户把位移提交到外部存储中,比如数据库中。若要实现这个功能,用户就必须使用 rebalance 监听器 。 使用 rebalance 监听器的前提是用户使用 consumer group 。 如果使用的是独立 consumer 或是直接手动分配分区,那么 rebalance 监听器是无效的。
rebalance 监听器有一个主要的接口回调类
ConsumerRebalanceListener,里面就两个方法 onPartitionsRevoked 和 onPartitionAssigned。在 coordinator 开启新一轮 rebalance 前
onPartitionsRevoked 方法会被调用,而 rebalance 完成后会调用 onPartitionsAssigned 方法。

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号