赞
踩
因为公司的项目是由一个第三方的旧系统迁移过渡开发过来的,而且时间很急,所以有许多数据需要修正。为了不影响线上的业务,修复数据的逻辑是在另一个应用的,可以通过管理系统圈定数据范围,然后发送到kafka。
但是某一天kafka上的数据一直在堆积,高峰期一直下不去。查询了很多日志,发现很多消费者的消费速度异常地慢,而且存在重复消费的情况(业务上是允许重复消费)。最后发现kafka不断地在重平衡,导致数据一直不能尽快被消费。
那么,是什么导致了系统频繁重平衡呢?
要想知道什么是重平衡rebalance
,那就要先了解消费组consumer group
。
多个消费者consumer
组成一个消费组,它们共同消费一个topic
,一个topic
的一个parition
只能被一个consumer
消费。
kafka为消费组定义了5种状态,他们分别是:Empty,Dead, PreparingRebalance,CompletingRebalance,Stable.
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QZuZ09lF-1648635120578)(https://note.youdao.com/yws/res/22077/WEBRESOURCEbf97371f8e6740ae16c614478f5ccb52)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CkgDQMoR-1648635120580)(https://note.youdao.com/yws/res/22092/WEBRESOURCEad062645525be744596521507474549a)]
生产者、kafka broker、消费者
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ikHw5DXs-1648635120581)(https://note.youdao.com/yws/res/22095/WEBRESOURCE63d131b701c2004ff89546fe98ca5503)]
rebalance 其实就是对 partition 进行重新分配
重平衡时,消费者端会发出JoinGroup
请求加入组,发送SyncGroup
请求同步领导消费组(Leader Consumer)分配的方案。
当组内成员加入组时,会向将自己订阅的主题上报。协作者收集完组内的JoinGrop
后,会选择其中一个作为该消费者组的领导者。通常情况下,第一个发送 JoinGroup
请求的成员自动成为领导者。消费组领导者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案,然后开始发送`SyncGroup请求。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OT9wCAxC-1648635120581)(https://note.youdao.com/yws/res/22113/WEBRESOURCE13f5ab0821280f083cc42e48e269b482)]
消费组领导者和其他组员发送SyncGroup
请求同步分组消费信息。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IJUAtOYU-1648635120582)(https://note.youdao.com/yws/res/22115/WEBRESOURCEac31cee9b906f04b176d0ba67cf1875f)]
三种情况会触发rebalance:
相比起之前的两个情况,这种情况在实际情况中更加常见。因为订阅分区数、以及订阅 topic 数都是我们主动改变才会发生,而组内消费组成员个数发生变化,则是更加随机的。
「消费组内成员个数发生变化」的几种情况:
新成员入组是指组处于 Stable
状态后,有新成员加入。如果是全新启动一个消费者组,Kafka 是有一些自己的小优化的,流程上会有些许的不同。我们这里讨论的是,组稳定了之后有新成员加入的情形。
当协调者收到新的 JoinGroup
请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。具体的过程和之前的客户端重平衡流程是一样的
何谓主动离组?就是指消费者实例所在线程或进程调用 close()
方法主动通知协调者它要退出。这个场景就涉及到了第三类请求:LeaveGroup
请求。协调者收到 LeaveGroup
请求后,依然会以心跳响应的方式通知其他成员。
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数session.timeout.ms
控制的。
对于「新成员加入」、「组成员主动离开」都是我们主动触发的,能比较好地控制。但是「组成员崩溃」则是我们预料不到的,遇到问题的时候也比较不好排查。
re-balance问题与kafka消费组配置的四个参数有关:
session.timeout.ms
设置了超时时间
heartbeat.interval.ms
心跳时间间隔
max.poll.interval.ms
每次消费的处理时间
max.poll.records
每次消费的消息数
session.timeout.ms
表示 consumer 向 broker 发送心跳的超时时间。例如 session.timeout.ms = 180000 表示在最长 180 秒内 broker 没收到 consumer 的心跳,那么 broker 就认为该 consumer 死亡了,会启动 rebalance。
heartbeat.interval.ms
表示 consumer 每次向 broker 发送心跳的时间间隔。heartbeat.interval.ms = 60000 表示 consumer 每 60 秒向 broker 发送一次心跳。一般来说,session.timeout.ms
的值是heartbeat.interval.ms
值的 3 倍以上。
max.poll.interval.ms
表示 consumer 每两次 poll 消息的时间间隔。简单地说,其实就是 consumer 每次消费消息的时长。如果消息处理的逻辑很重,那么下次就要相应延长。否则如果时间到了 consumer 还么消费完,broker 会默认认为 consumer 死了,发起 rebalance。
max.poll.records
表示每次消费的时候,获取多少条消息。获取的消息条数越多,需要处理的时间越长。所以每次拉取的消息数不能太多,需要保证在 max.poll.interval.ms
设置的时间内能消费完,否则会发生 rebalance。
简单来说,会导致崩溃的几个点是:
消费者心跳超时,导致 rebalance。
消费者处理时间过长,导致 rebalance。
我们知道消费者是通过心跳和协调者保持通讯的,如果协调者收不到心跳,那么协调者会认为这个消费者死亡了,从而发起 rebalance。
而 kafka 的消费者参数设置中,跟心跳相关的两个参数为:
session.timeout.ms
设置了超时时间
heartbeat.interval.ms
心跳时间间隔
如果消费者处理时间过长,那么同样会导致协调者认为该 consumer 死亡了,从而发起重平衡。
而 kafka 的消费者参数设置中,跟消费处理的两个参数为:
max.poll.interval.ms
每次消费的处理时间
max.poll.records
每次消费的消息数
不过Kafka从0.10.1.0开始,heartbeat就由独立的线程处理了,不受poll影响。
顺着上面的思路,我们知道当消费者处理时间过长时,而项目里的消费逻辑的是很耗时和不可控的,所以可以做一下猜测:
消费后的数据,当offset还没有提交时,partition
就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。
并发很大,可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能导致reblance重平衡,导致一部分offset自动提交失败,然后重平衡后重复消费
回归到项目代码和配置中。
@KafkaListener(
topics = "xxxx"
containerFactory = "kafkaDataUpdateListenerContainerFactory"
)
public void loadDataUpdate(List<ConsumerRecord<?,?>> records, Acknowledgment ack){
// 反序列化收到的数据
// 任务分发,可能会分发到一些处理时间很长的方法中去
// 记录日志
}
从代码中发现消费者使用的是自动提交,而且是没有配置session.timeout.ms
的。因为消费kafka的方法内有耗时任务,导致了offset还没有提交,与partition
失联。
找到问题产生的原因,其实就很好解决。只要在消费者正确解析了收到的数据后,立刻调用Acknowledgment.ack.acknowledge()
方法提交offset就好了。
kafka自动提交导致重平衡,kafka堆积问题。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。