赞
踩
每个consumer 都会根据 heartbeat.interval.ms 参数指定的时间周期性地向group coordinator发送 hearbeat,group coordinator会给各个consumer响应,若发生了 rebalance,各个consumer收到的响应中会包含 REBALANCE_IN_PROGRESS 标识,这样各个consumer就知道已经发生了rebalance,同时 group coordinator也知道了各个consumer的存活情况
group coordinator检测consumer发生崩溃所需的时间。一个consumer group里面的某个consumer挂掉了,最长需要 session.timeout.ms 秒检测出来
如果在两次poll操作之间,超过了这个时间,会进行重平衡
session.timeout.ms=10,
heartbeat.interval.ms=3
session.timeout.ms是个"逻辑"指标,它指定了一个阈值—10秒,在这个阈值内如果coordinator未收到consumer的任何消息,那coordinator就认为consumer挂了。而heartbeat.interval.ms是个"物理"指标,它告诉consumer要每3秒给coordinator发一个心跳包,heartbeat.interval.ms越小,发的心跳包越多
设计的原因: 如果group coordinator在一个heartbeat.interval.ms周期内未收到consumer的心跳,就把该consumer移出group,这样设计显得不合理,有可能网络延时,有可能consumer出现了一次长时间GC,影响了心跳包的到达,就会造成误判,导致频繁的rebalance
在kafka0.10.1之前,发送心跳包和消息处理逻辑这2个过程是耦合在一起的,如果一条消息处理时长要5min,而session.timeout.ms=3000ms,那么等 kafka consumer处理完消息,group coordinator早就将consumer 移出group了,,因为只有一个线程,在消息处理过程中就无法向group coordinator发送心跳包,超过3000ms未发送心跳包,group coordinator就将该consumer移出group了
kafka0.10.1之后的版本中,new KafkaConsumer对象后,在while true循环中执行consumer.poll拉取消息这个过程中,存在两个线程:
两个线程设计的优点
项目中经常碰到的 频繁consumer rebalance 错误
分析: 虽然是两个线程,可能消息处理线程执行时间很长,但是心跳线程一直在发送信息,看起来应该不会发生重平衡,为什么group coordinator怎么还老是将consumer移出group,然后导致不断地rebalance呢?
主要是由max.poll.interval.ms这个参数引起的,消息处理逻辑花了太长的时间,超过了max.poll.interval.ms ,那么此consumer提交offset就会失败。此外,在用户线程中,一般会做一些失败的重试处理,比如通过线程池的 ThreadPoolExecutor#afterExecute()方法捕获到异常,再次提交Runnable任务重新订阅kafka topic。那么意味着有新消费者加入group,就会引发 rebalance,而可悲的是:新的消费者还是来不及处理完所有消息,又被移出group。如此循环,就发生了不停地 rebalance 的现象
前提:重平衡无法避免。只能通过合理配置减少
session.timeout.ms 和 heartbeat.interval.ms参数设置
max.poll.interval.ms 参数值的设置 ,默认是5分钟。可以根据业务的实际处理实际,比如业务处理时间是8分钟,可以将其调成10分钟,避免频繁的rebalance
调整JVM参数,避免频繁GC导致的重平衡(JVM高手根据实际情况合理设置)
Kafka session.timeout.ms heartbeat.interval.ms参数的区别以及对数据存储的一些思考
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。