赞
踩
consumer group的rebalance本质上是一组协议,它规定了一个consumer group是如何达成一致来分配订阅topic的所有分区的。假设某个组下有20个consumer实例,该组订阅了一个有着l00个分区的topic。正常情况下,Kafka会为每个consumer平均分配5个分区。这个分配过程就被称为rebalance。当consumer成功地执行rebalance后,组订阅topic的每个分区只会分配给组内的一个consumer实例。
和旧版本consumer依托于ZooKeeper进行rebalance不同,新版本consumer使用了Kafka内置的一个全新的组协调协议(group coordination protocol)。对于每个组而言,Kafka的某个broker会被选举为组协调者(group coordinator)。coordinator负责对组的状态进行管理,它的主要职责就是当新成员到达时促成组内所有成员达成新的分区分配方案,即coordinator负责对组执行rebalance操作。
真实应用场景中引发rebalance最常见的原因就是违背了第一个条件,特别是consumer崩溃的情况。这里的崩溃不一定就是指consumer进程“挂掉”或consumer进程所在的机器宕机。当consumer无法在指定的时间内完成消息的处理,那么coordinator就认为该consumer已经崩溃,从而引发新一轮rebalance。生产环境中用户一定要结合自身业务特点仔细调优consumer参数request.timeout.ms
、max.poll.records
和max.poll.interval.ms
,以避免不必要的rebalance出现。
在rebalance时group下所有的consumer都会协调在一起共同参与分区分配。Kafka新版本consumer默认提供了3种分配策略,分别是range策略、round-robin策略和sticky策略。
所谓的分配策略决定了订阅topic的每个分区会被分配给哪个consumer。
通常意义上认为,如果group下所有consumer实例的订阅是相同,那么使用round-robin会带来更公平的分配方案,否则使用range策略的效果更好。此外,sticky策略在0.11.0.0版本才被引入,故目前使用的用户并不多。新版本consumer默认的分配策略是range。用户根据consumer参数partition.assignment.strategy
来进行设置。另外Kafka支持自定义的分配策略,用户可以创建自己的consumer分配器(assignor)。
针对rebalance过程中的分区分配,下面举一个简单的例子,加以说明。假设目前某个consumer group下有两个consumer:A和B。当第3个成员C加入时,满足了前面谈到的第一个触发条件,因此coordinator会执行rebalance,并根据range分配策略重新为A、B和C分配分区,如图所示。
由此可见,原先A和B分别处理3个分区的数据,rebalance之后A、B和C各自承担2个分区的消费,可以说这个分配方案非常公平,每个consumer上的负载是相同的。
某个consumer group可以执行任意次rebalance。为了更好地隔离每次rebalance上的数据,新版本consumer设计了rebalance generation用于标识某次rebalance。generation这个词类似于JVM分代垃圾收集器中“分代”(严格来说,JVM GC使用的是generational)的概念。表示rebalance之后的一届成员,在consumer中它是一个整数,通常从0开始。
Kafka引入consumer generation主要是为了保护consumer group的,特别是防止无效offset提交。比如上一届的consumer成员由于某些原因延迟提交了offset,但rebalance之后该group产生了新一届的group成员,而这次延迟的offset提交携带的是旧的generation信息,因此这次提交会
被consumer group拒绝。很多Kafka用户在使用consumer时经常碰到的ILLEGAL GENERATION异常就是这个原
因导致的。事实上,每个group进行rebalance之后,generation号都会加l,表示group进入了一个新的版本。如图所示,Generation1时group有3个成员,随后成员2退出组,coordinator触发rebalance,consumer group进入到Generation2时代,之后成员4加入,再次触发rebalance,group进入到Generation3时代。
前面提到过rebalance本质上是一组协议。group与coordinator共同使用这组协议完成group的rebalance。最新版本Kafka中提供了下面5个协议来处理rebalance相关事宜。
JoinGroup请求
:consumer请求加入组SyncGroup请求
:group leader把分配方案同步更新到组内所有成员中Heartbeat请求
:consumer定期向coordinator汇报心跳表明自己依然存货LeaveGroup请求
:consumer主动通知coordinator该consumer即将离组DescribeGroup请求
:查看组的所有信息,包括成员信息、协议信息、分配方案以及订阅信息等。该请求类型主要供管理员使用。coordinator不使用该请求执行rebalance。在rebalance过程中,coordinator主要处理consumer发过来的JoinGroup和SyncGroup请求。当consumer主动离组时会发送LeaveGroup请求给coordinator。
在成功rebalance之后,组内所有consumer都需要定期地向coordinator发送Heartbeat请求。而每个consumer也是根据Heartbeat请求的响应中是否包含REBALANCE IN PROGRESS来判断当前group是否开启了新一轮rebalance。
consumer group在执行rebalance之前必须首先确定coordinator所在的broker,并创建与该broker相互通信的Socket连接。确定coordinator的算法与确定offset被提交到__consumer_offsets
目标分区的算法是相同的。算法如下:
Math.abs(groupID.hashCode) % offsets.topic.num.partitions
参数值(默认是50),假设是10__consumer_offsets
分区10的leader副本所在的broker,该即为这个group的coordinator。成功连接coordinator之后便可以执行rebalance操作。目前rebalance主要分为两步:加入组和同步更新分配方案。
加入组
:这一步中组内所有consumer(即group.id相同的所有consumer实例)向coordinator发送JoinGroup请求。当收集全JoinGroup请求后,coordinator从中选择一个consumer担任group的leader,并把所有成员信息以及它们的订阅信息发送给leader。特别需要注意的是,group的leader和coordinator不是一个概念。leader是某个同步更新分配方案
:这一步中leader开始制定分配方案,即根据前面提到的分配策略决定每个consumer都负责哪些topic的哪些分区。一旦分配完成,leader会把这个分配方案封装进SyncGroup请求并发送给coordinator。比较有意思的是,组内所有成员都会发送SyncGroup请求,不过只有leader发送的SyncGroup请求中包含了分配方案。consumer group分配方案是在consumer端执行的。Kafka将这个权力下放给客户端主要是因为这样做可以有更好的灵活性。比如在这种机制下用户可以自行实现类似于Hadoop那样的机架感知(rack-aware)分配方案。同一个机架上的分区数据被分配给相同机架上的consumer,减少网络传输的开销。而且,即使以后分区策略发生了变更,也只需要重启consumer应用即可,不必重启Kafka服务器。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。