赞
踩
组 rebalance 触发的条件有以下 3 个:
真实应用场景中引发 rebalance 最常见的原因就是违背了第一个条件,特别是 consumer 崩溃的情况。这里的崩横不一定就是指 consumer 进程“挂掉”或 consumer 进程所在的机器岩机 。
当 consumer 无法在指定的时间内完成消息的处理,那么 coordinator 就认为该 consumer 己经崩溃,从而引发新一轮 rebalance。可以通过 [[消费者参数#max poll interval ms|max.poll.interval.ms]] 参数配置 consumer 处理逻辑最大时间。
Kafka consumer 默认提供了 3 种分配策略,分别是 range 策略、round-robin 策略和 sticky 策略。通过 consumer 参数 partition.assignment. strategy 来进行配置。
所谓的分配策略决定了订阅 topic 的每个分区会被分配给哪个 consumer 。 range 策略主要是基于范围的思想。它将单个 topic 的所有分区按照顺序排列,然后把这些分区划分成固定大小的分区段井依次分配给每个 consumer。假设有 1 个消费者线程订阅了 2 个 topic; round-robin 策略则会把所有 topic 的所有分区顺序摆开,然后轮询式地分配给各个 consumer。我们以 spring-kafka 举例,假设我们使用@KafkaListener 同时监听了 2 个 Topic,每个 topic 的分区为 3,concurrency 设置为 6。
如果是 range 策略的话,则是 3 个线程,负责 2 个 Topic 共 6 个分区。其他 3 个线程空闲。如果是 round-robin 策略,则是每个线程分配一个分区。
另外 Kafka 支持自定义的分配策略,用户可以创建自己的 consumer 分配器(assignor)。
某个 consumer group 可以执行任意次 rebalance。为了更好地隔离每次 rebalance 上的数据,新版本 consumer 设计了 rebalance generation 用于标识某次 rebalance, consumer 中它是一个整数,通常从 0 开始。
Kafka 引入 consumer generation 主要是为了保护 consumer group 的,特别是防止无效 offset 提交 。
比如上一届的 consumer 成员由于某些原因延迟提交了 offset,但 rebalance 之后该 group 产生了新一届的 group 成员,而这次延迟的 offset 提交携带的是旧的 generation 信息,因此这次提交会被 consumer group 拒绝。在使用 consumer 时经常碰到的 ILLEGAL GE 阳 RATION 异常就是这个原
因导致的。
事实上,每个 group 进行 rebalance 之后, generation 号都会加 1,表示 group 进入了一个新的版本。Generation 1 时 group 有 3 个成员,随后成员 2 退出组, coordinator 触发 rebalance, consumer group 进入到 Generation 2 时代,之后成员 4 加入,再次触发 rebalance, group 进入到 Generation 3 时代。
重平衡一旦开启,Broker 端的协调者组件就要开始忙了,主要涉及到控制消费者组的状态流转。当前,Kafka 设计了一套消费者组状态机(State Machine),来帮助协调者完成整个重平衡流程。
Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。
状态机的各个状态流转:
一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。
当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。
如果 Kafka 日志中出现 Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.
,这就是 Kafka 在尝试定期删除过期位移。只有 Empty 状态下的组,才会执行过期位移删除的操作。
上文提到过 rebalance 本质上是一组协议。Kafka 中提供了下面 5 个协议来处理 rebalance 相关事宜:
在 rebalance 过程中, coordinator 主要处理 consumer 发过来 JoinGroup 和 SyncGroup 请求 。当 consumer 主动离组时会发送 LeaveGroup 请求给 coordinator。在成功 rebalance 之后,组内所有 consumer 都需要定期地向 coordinator 发送[[心跳请求]]。
而每个 consumer 也是根据 Heartbeat 请求的响应中是否包含
REBALANCE_IN_PROGRESS 来判断当前 group 是否开启了新一轮 rebalance。
通过 [[消费者参数#heartbeat interval ms|heartbeat.interval.ms]] 可以控制重平衡的通知频率。
consumer group 在执行 rebalance 之前必须首先确定 coordinator 所在的 broker,并创建与该 broker 相互通信的 Socket 连接。确定 coordinator 的算法与确定 offset 被提交到 consumer offsets 目标分区的算法是相同的,算法如下:
成功连接 coordinator 之后便可以执行 rebalance 操作。目前 rebalance 主要分为两步:加入组和同步更新分配方案。
新成员入组是指组处于 Stable 状态后,有新成员加入。主要是组稳定了之后有新成员加入的情形。当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。
消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员。
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由 [[消费者参数#session timeout ms|session.timeout.ms]] 控制的。也就是说,Kafka 一般不会超过 session.timeout.ms 就能感知到这个崩溃。
正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送。
consumer 默认把位移提交到 [[__consumer_offsets]] 中,但是 Kafka 也支持用户把位移提交到外部存储中,比如数据库中。若要实现这个功能,用户就必须使用 rebalance 监听器 。 使用 rebalance 监听器的前提是用户使用 consumer group 。 如果使用的是独立 consumer 或是直接手动分配分区,那么 rebalance 监听器是无效的。
rebalance 监听器有一个主要的接口回调类
ConsumerRebalanceListener,里面就两个方法 onPartitionsRevoked 和 onPartitionAssigned。在 coordinator 开启新一轮 rebalance 前
onPartitionsRevoked 方法会被调用,而 rebalance 完成后会调用 onPartitionsAssigned 方法。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。