赞
踩
Rebalance 的触发条件有三种:
组成员崩溃外,其它都是主动触发的,能比较好地控制。
组成员崩溃 则是预料不到、意外发生的,遇到问题的时候也不好排查。但对于组成员崩溃也是有一些通用的处理策略
Rebalance 如何通知其他 consumer 进程?
Rebalance 的通知机制是靠 Consumer 端的心跳线程
rebalance(重平衡)其实就是重新进行 partition 的分配,从而使得 partition 的分配重新达到平衡状态
Coordinator一般指的是运行在broker上的group Coordinator,用于管理Consumer Group中各个成员,每个KafkaServer都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理和Consumer Rebalance。
对于每个Consumer Group,Coordinator会存储以下信息:
简单来说分为两步:
Rebalance 本质上也是一组协议。Consumer Group 与 Coordinator 共同使用它来完成 Consumer Group 的 Rebalance
Coordinator 在 Rebalance 的时候主要用到了前面4种请求
Rebalance 一旦发生,必定会涉及到 Consumer Group 的状态流转,此时 Kafka 为我们设计了一套完整的状态机机制,来帮助 Broker Coordinator 完成整个重平衡流程
session.timeout.ms: consumer 向 broker 发送心跳的超时时间。例如 session.timeout.ms = 180000 表示在最长 180 秒内 broker 没收到 consumer 的心跳,那么 broker 就认为该 consumer 死亡了,会引发 rebalance。
heartbeat.interval.ms: consumer 每次向 broker 发送心跳的时间间隔。heartbeat.interval.ms = 60000 表示 consumer 每 60 秒向 broker 发送一次心跳。一般来说,session.timeout.ms 的值是 heartbeat.interval.ms 值的 3 倍以上。
max.poll.interval.ms : consumer 每两次 poll 消息的时间间隔。简单地说就是 consumer 每次消费消息的时长。如果消息处理的逻辑很重,那么此值就要相应延长。否则如果时间到了但 consumer 还么消费完,broker 会默认认为 consumer 死了,发起 rebalance。
max.poll.records: 每次消费的时候,获取多少条消息。获取的消息条数越多,需要处理的时间越长。所以每次拉取的消息数不能太多,需要保证在 max.poll.interval.ms 设置的时间内能消费完,否则会引发 rebalance。
所以消费者心跳超时、消费者处理时间过长都会引起rebalance。
消费者是通过心跳和协调者保持通讯的,如果协调者收不到心跳,那么协调者会认为这个消费者死亡了,从而发起 rebalance。
在kafka 的消费者参数设置中,与心跳相关的两个参数为:
需要调整 session.timeout.ms 和 heartbeat.interval.ms 参数,使得消费者与协调者能保持心跳。
一般来说,超时时间应该是心跳间隔的 3 倍时间,因为这样的话,在一个超时周期内就可以有多次心跳,避免网络问题导致偶发失败。
session.timeout.ms 如果设置为 180 秒,那么 heartbeat.interval.ms 最多设置为 60 秒。
如果消费者处理时间过长,那么同样会导致协调者认为该 consumer 死亡了,从而发起rebalance。
在 kafka 的消费者参数设置中,与消费处理的两个参数为:
对于这种情况,一般来说就是增加消费者处理的时间(即提高 max.poll.interval.ms 的值),减少每次处理的消息数(即减少 max.poll.records 的值)。
除此之外,超时时间参数(session.timeout.ms)与 消费者每次处理的时间(max.poll.interval.ms)也是有关联的。max.poll.interval.ms 时间不能超过 session.timeout.ms 时间。 因为在 kafka 消费者的实现中,是单线程去消费消息和执行心跳的,如果线程卡在处理消息,那么这时候即使到时间要心跳了,还是没有线程可以去执行心跳操作。
对于 rebalance 类问题的处理策略,简单来讲就是处理好心跳超时问题和消费处理超时问题
对于心跳超时问题。一般是调整超时时间(session.timeout.ms)和心跳间隔时间(heartbeat.interval.ms)的比例及数值。
阿里云官方文档建议超时时间(session.timeout.ms)设置成 25s,最长不超过 30s。那么心跳间隔时间(heartbeat.interval.ms)就不超过 10s。
对于消费处理超时问题。一般是增加消费者处理的时间(max.poll.interval.ms),减少每次处理的消息数(max.poll.records)。
阿里云官方文档建议 max.poll.records 参数要远小于当前消费组的消费能力(records < 单个线程每秒消费的条数 * 消费线程的个数 * session.timeout的秒数)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。