赞
踩
说完消费者组,再来说说与消费者组息息相关的重平衡机制。重平衡可以说是kafka为人诟病最多的一个点了。
重平衡其实就是一个协议,它规定了如何让消费者组下的所有消费者来分配topic中的每一个分区。比如一个topic有100个分区,一个消费者组内有20个消费者,在协调者的控制下让组内每一个消费者分配到5个分区,这个分配的过程就是重平衡。
重平衡的触发条件主要有三个:
为什么说重平衡为人诟病呢?因为重平衡过程中,消费者无法从kafka消费消息,这对kafka的TPS影响极大,而如果kafka集内节点较多,比如数百个,那重平衡可能会耗时极多。数分钟到数小时都有可能,而这段时间kafka基本处于不可用状态。所以在实际环境中,应该尽量避免重平衡发生
。
就我个人的经验来看,在实际生产环境中,因命中第 1 个条件而引发的重平衡是最常见的。另外,消费者组中的消费者实例依次启动也属于第 1 种情况,也就是说,每次消费者组启动时,必然会触发重平衡过程。
这部分内容我在专栏中已经详细介绍过了,就不再赘述了。如果你不记得的话,可以先去复习一下。
今天,我真正想引出的是另一个话题:重平衡过程是如何通知到其他消费者实例的
?答案就是,靠消费者端的心跳线程(Heartbeat Thread)。
Kafka Java 消费者需要定期地发送心跳请求(Heartbeat Request)到 Broker 端的协调者,以表明它还存活着。在 Kafka 0.10.1.0 版本之前,发送心跳请求是在消费者主线程完成的,也就是你写代码调用 KafkaConsumer.poll 方法的那个线程。
这样做有诸多弊病,最大的问题在于,消息处理逻辑也是在这个线程中完成的。因此,一旦消息处理消耗了过长的时间,心跳请求将无法及时发到协调者那里,导致协调者“错误地”认为该消费者已“死”。自 0.10.1.0 版本开始,社区引入了一个单独的心跳线程来专门执行心跳请求发送,避免了这个问题。
但这和重平衡又有什么关系呢?其实,重平衡的通知机制正是通过心跳线程来完成的。当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制。
了解了什么是重平衡,重平衡的缺点和触发条件后,我们先来看看重平衡的三种不同策略,然后说说应该如何避免重平衡发生。
重平衡一旦开启,Broker 端的协调者组件就要开始忙了,主要涉及到控制消费者组的状态流转
。
当前,Kafka 设计了一套消费者组状态机(State Machine),来帮助协调者完成整个重平衡流程。严格来说,这套状态机属于非常底层的设计,Kafka 官网上压根就没有提到过,但你最好还是了解一下,因为它能够帮助你搞懂消费者组的设计原理,比如消费者组的过期位移(Expired Offsets)删除等。
目前,Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable
。那么,这 5 种状态的含义是什么呢?我们一起来看看下面这张表格。
了解了这些状态的含义之后,我们来看一张图片,它展示了状态机的各个状态流转。
我来解释一下消费者组启动时的状态流转过程。一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。
当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。当所有成员都退出组后,消费者组状态变更为 Empty。
Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。我相信,你在 Kafka 的日志中一定经常看到下面这个输出:
Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.
参考:【Kafka】kafka Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.
这就是 Kafka 在尝试定期删除过期位移。现在你知道了,只有 Empty 状态下的组,才会执行过期位移删除的操作。
有了上面的内容作铺垫,我们就可以开始介绍重平衡流程了。重平衡的完整流程需要消费者端和协调者组件共同参与才能完成。我们先从消费者的视角来审视一下重平衡的流程。
在消费者端,重平衡分为两个步骤:分别是加入组和等待领导者消费者(Leader Consumer)分配方案
。这两个步骤分别对应两类特定的请求:JoinGroup
请求和 SyncGroup
请求。
当组内成员加入组时,它会向协调者发送 JoinGroup
请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息
。一旦收集了全部成员的 JoinGroup
请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者
。
通常情况下,第一个发送 JoinGroup
请求的成员自动成为领导者。你一定要注意区分这里的领导者和之前我们介绍的领导者副本,它们不是一个概念。这里的领导者是具体的消费者实例,它既不是副本,也不是协调者
。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案
。
选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应体中,然后发给领导者,由领导者统一做出分配方案后,进入到下一步:发送 SyncGroup 请求
。
在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步的主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。
接下来,我用一张图来形象地说明一下 JoinGroup 请求的处理过程。
就像前面说的,JoinGroup 请求的主要作用是将组成员订阅信息发送给领导者消费者,待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段。
下面这张图描述的是 SyncGroup 请求的处理流程。
SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。
讲完这里,消费者端的重平衡流程我已经介绍完了。接下来,我们从协调者端来看一下重平衡是怎么执行的。
kafka提供了三种重平衡分配策略,这里顺便介绍一下:
具体实现位于,package org.apache.kafka.clients.consumer.RangeAssignor。
这种分配是基于每个主题的分区分配,如果主题的分区分区不能平均分配给组内每个消费者,那么对该主题,某些消费者会被分配到额外的分区。我们来看看具体的例子。
举例:目前有两个消费者C0和C1,两个主题t0和t1,每个主题三个分区,分别是t0p0,t0p1,t0p2,和t1p0,t1p1,t1p2。
那么分配情况会是:
C0:t0p0, t0p1, t1p0, t1p1
C1:t0p2, t1p2
我来大概解释一下,range这种模式,消费者被分配的单位是基于主题的,拿上面的例子来说,是主题t0的三个分区分配给2个消费者,t1三个分区分配给消费者。于是便会出现消费者c0分配到主题t0两个分区,以及t1两个分区的情况(一个主题有三个分区,三个分区无法匹配两个消费者,势必有一个消费者分到两个分区),而非每个消费者分配两个主题各三个分区。
具体实现位于,package org.apache.kafka.clients.consumer.RoundRobinAssignor。
RoundRobin是基于全部主题的分区来进行分配的,同时这种分配也是kafka默认的rebalance分区策略。还是用刚刚的例子来看,
举例:两个消费者C0和C1,两个主题t0和t1,每个主题三个分区,分别是t0p0,t0p1,t0p2,和t1p0,t1p1,t1p2。
由于是基于全部主题的分区,那么分配情况会是:
C0:t0p0, t0p1, t1p1
C1:t1p0, t0p2, t1p2
因为是基于全部主题的分区来平均分配给消费者,所以这种分配策略能更加均衡得分配分区给每一个消费者。
上面说的都是同一消费者组内消费组都订阅相同主题的情况。更复杂的情况是,同一组内的消费者订阅不同的主题,那么任然可能会导致分区不均衡的情况。
还是举例说明,有三个消费者C0,C1,C2 。三个主题t0,t1,t2,分别有1,2,3个分区 t0p0,t1p0,t1p1,t2p0,t2p1,t2p2。
其中,C0订阅t0,C1订阅t0,t1。C2订阅t0,t1,t2。最终订阅情况如下:
C0:t0p0
C1:t1p0
C2:t1p1,t2p0,t2p1,t2p2
这个结果乍一看有点迷,其实可以这样理解,按照序号顺序进行循环分配,t0只有一个分区,先碰到C0就分配给它了。t1有两个分区,被C1和C2订阅,那么会循环将两个分区分配出去,最后到t2,有三个分区,却只有C2订阅,那么就将三个分区分配给C2。
Sticky分配策略是最新的也是最复杂的策略,其具体实现位于package org.apache.kafka.clients.consumer.StickyAssignor。
这种分配策略是在0.11.0才被提出来的,主要是为了一定程度解决上面提到的重平衡非要重新分配全部分区的问题。称为粘性分配策略。
听名字就知道,主要是为了让目前的分配尽可能保持不变,只挪动尽可能少的分区来实现重平衡。
还是举例说明,有三个消费者C0,C1,C2 。三个主题t0,t1,t2,t3。每个主题各有两个分区, t0p0,t0p1,t1p0,t1p1,t2p0,t2p1,t3p0,t3p1。
现在订阅情况如下:
C0:t0p0,t1p1,t3p0
C1:t0p1,t2p0,t3p1
C2:t1p0,t2p1
假设现在C1挂掉了,如果是RoundRobin分配策略,那么会变成下面这样:
C0:t0p0,t1p0,t2p0,t3p0
C2:t0p1,t1p1,t2p1,t3p1
就是说它会全部重新打乱,再分配,而如何使用Sticky分配策略,会变成这样:
C0:t0p0,t1p1,t3p0,t2p0
C2:t1p0,t2p1,t0p1,t3p1
也就是说,尽可能保留了原来的分区情况,不去改变它,在这个基础上进行均衡分配,不过这个策略目前似乎还有些bug,所以实际使用也不多。
要剖析协调者端处理重平衡的全流程,我们必须要分几个场景来讨论。这几个场景分别是新成员加入组、组成员主动离组、组成员崩溃离组、组成员提交位移。接下来,我们一个一个来讨论。
新成员入组是指组处于 Stable 状态后,有新成员加入。如果是全新启动一个消费者组,Kafka 是有一些自己的小优化的,流程上会有些许的不同。我们这里讨论的是,组稳定了之后有新成员加入的情形。
当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。具体的过程和之前的客户端重平衡流程是一样的。现在,我用一张时序图来说明协调者一端是如何处理新成员入组的。
何谓主动离组?就是指消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。这个场景就涉及到了第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员,因此我就不再赘述了,还是直接用一张图来说明。
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制的。也就是说,Kafka 一般不会超过 session.timeout.ms 就能感知到这个崩溃。当然,后面处理崩溃离组的流程与之前是一样的,我们来看看下面这张图。
正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送。还是老办法,我们使用一张图来说明。
在消费者启动时,某些参数会影响重平衡机制的发生,不当导致频繁重平衡,严重影响消费速度,下面跟大家说说这几个参数的一些要点:
session.timeout.ms
该参数是 Coordinator 检测消费者失败的时间,即在这段时间内客户端是否跟 Coordinator 保持心跳,如果该参数设置数值小,可以更早发现消费者崩溃的信息,从而更快地开启重平衡,避免消费滞后,但是这也会导致频繁重平衡,这要根据实际业务来衡量。
max.poll.interval.ms
消费者处理消息逻辑的最大时间,对于某些业务来说,处理消息可能需要很长时间,比如需要 1分钟,那么该参数就需要设置成大于 1分钟的值,否则就会被 Coordinator 剔除消息组然后重平衡。
heartbeat.interval.ms
该参数跟 session.timeout.ms
紧密关联,前面也说过,只要在 session.timeout.ms
时间内与 Coordinator 保持心跳,就不会被 Coordinator 剔除,那么心跳间隔的时间就是 session.timeout.ms
,因此,该参数值必须小于 session.timeout.ms
,以保持 session.timeout.ms
时间内有心跳。
要说完全避免重平衡,那是不可能滴,因为你无法完全保证消费者不会故障。而消费者故障其实也是最常见的引发重平衡的地方,所以这里主要介绍如何尽力避免消费者故障。
而其他几种触发重平衡的方式,增加分区,或是增加订阅的主题,抑或是增加消费者,更多的是主动控制,这里也不多讨论。
首先要知道,如果消费者真正挂掉了,那我们是没有什么办法的,但实际中,会有一些情况,会让kafka错误地认为一个正常的消费者已经挂掉了,我们要的就是避免这样的情况出现。
当然要避免,那首先要知道哪些情况会出现错误判断挂掉的情况。在分布式系统中,通常是通过心跳来维持分布式系统的,kafka也不例外。对这部分内容有兴趣可以看看我之前的这篇分布式系统一致性问题与Raft算法(上)。这里要说的是,在分布式系统中,由于网络问题你不清楚没接收到心跳,是因为对方真正挂了还是只是因为负载过重没来得及发生心跳或是网络堵塞。所以一般会约定一个时间,超时即判定对方挂了。而在kafka消费者场景中,session.timout.ms参数就是规定这个超时时间是多少。
还有一个参数,heartbeat.interval.ms
,这个参数控制发送心跳的频率,频率越高越不容易被误判,但也会消耗更多资源。
此外,还有最后一个参数,max.poll.interval.ms
,我们都知道消费者poll数据后,需要一些处理,再进行拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取,然后处理,这个处理的时间不能超过max.poll.interval.ms
这个参数的值。这个参数的默认值是5分钟,而如果消费者接收到数据后会执行耗时的操作,则应该将其设置得大一些。
小结一下,其实主要就是三个参数,session.timout.ms
控制心跳超时时间,heartbeat.interval.ms控制心跳发送频率,以及max.poll.interval.ms
控制poll的间隔。这里给出一个相对较为合理的配置,如下:
session.timout.ms:设置为6s
heartbeat.interval.ms:设置2s
max.poll.interval.ms:推荐为消费者处理消息最长耗时再加1分钟
reblance可以参考:
【kafka】kafka rebalance generation
【kafka】Kafka 之 Group 状态变化分析及 Rebalance 过程
【kafka】kafka 报错 no brokers found when trying to rebalance、broker id 丢失
【Kafka】Kafka 2.6新功能:消费者主动触发Rebalance
【kafka】记一次线上kafka一直rebalance故障 消费慢 数据积压
【kafka】kafka 控制台 消费报错 cant rebalance afer 4 retries
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。