在使用了最新版的 kafka-python 1.4.6 在 broker 对 topic 进行默认配置的情况下报出类似错误
CommitFailedError CommitFailedError: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max_poll_interval_ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the rebalance timeout with max_poll_interval_ms, or by reducing the maximum size of batches returned in poll() with max_poll_records.
这里要申明一点,在 1.4.0 以上的 kafka-python 版本使用了独立的心跳线程去上报心跳。
这里报错大概表达的意思是 无法在默认 300000ms 中完成处理操作。我们通常会一次性 poll 拉默认 500 条数据下来。我们需要在 300s 中完成 500 条数据的处理。如果不能完成的话就可能会触发这个问题。
因为这个报错的提示写得非常清楚,所以我们先按这个方向去尝试处理这个问题。首先调高了我们的 max_poll_interval_ms 的时间,但是无效。
然后 records 的条数减少,依然无效,该报错还是会报错。这不禁让我怀疑触发这个问题的是否并非这里报错建议的那些地方。
所以我把目光放到了 broker 日志那边去,想看下到底是因为什么原因导致爆出类似错误。
在日志上发现了一些日志,对应的 consumer 在反复的 rebalance:
[2019-08-18 09:19:29,556] INFO [GroupCoordinator 0]: Member kafka-python-1.4.6-05ed83f1-aa90-4950-b097-4cf467598082 in group sync_group_20180321 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [2019-08-18 09:19:29,556] INFO [GroupCoordinator 0]: Stabilized group sync_group_20180321 generation 1090 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator) [2019-08-18 09:19:39,556] INFO [GroupCoordinator 0]: Member kafka-python-1.4.6-f7826720-fef7-4b02-8104-d1f38065c2fe in group sync_group_20180321 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [2019-08-18 09:19:39,708] INFO [GroupCoordinator 0]: Preparing to rebalance group sync_group_20180321 with old generation 1090 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator) [2019-08-18 09:19:39,708] INFO [GroupCoordinator 0]: Member kafka-python-1.4.6-ac5f6aff-3600-4e67-a529-31674c72b1e4 in group sync_group_20180321 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [2019-08-18 09:19:39,716] INFO [GroupCoordinator 0]: Stabilized group sync_group_20180321 generation 1091 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator) [2019-08-18 09:19:39,721] INFO [GroupCoordinator 0]: Assignment received from leader for group sync_group_20180321 for generation 1091 (kafka.coordinator.group.GroupCoordinator)
参考 sentry 打出来的错误,我们可以认为这和 sentry 爆出来的问题有直接关系。因此我们要从另外一个角度去思考一下为什么我的 max_poll_interval_ms 已经调高并且每次拉取处理条数降低却依然会报出此问题,并且频繁触发 rebalance 。
kafka-python 在 1.4.0 版本分离了心跳线程和 poll 主线程。我的第一反应就是会不会因为 poll 线程阻塞了心跳线程的切换,或者引起了某种死锁从而导致心跳线程无法正确的发起心跳。最后导致 broker 认为 group 已经死亡然后主动触发了 rebalance .
然后我去 kafka-python 的 gihub 搜索了一下类似问题,马上就发现了有不少人都有这个问题。
https://github.com/dpkp/kafka-python/issues/1418
从中找到一些有趣的信息,比如来自
I am seeing consumer rebalances even if there is no messages to consume. Start three consumers in a group and send some messages to topic and after that stop the producer. The consumer will start seeing rebalances after 5-6mins. Sample code here: https://stackoverflow.com/questions/54908902/kafka-consumer-rebalance-occurs-even-if-there-is-no-message-to-consume
他说即使在没有消息可以消费的情况下,也可以看到 kafka consumer 在过了 5 - 6 mins 之后开启了 rebalance 。
这就跟我们的问题非常相似,我们并不是 process 的过程消耗的时间过长而触发了 rebalance 而是有可能是因为消费得太快,导致有些消费者处于 空 poll 的状态从而阻塞了心跳线程。客观来说,我目前还会报出这个问题的 topic 有多达 50 个partitions,我开启了5个消费者对其进行消费,平均一个消费者需要消费 10 个parititons 。如果有某个 partitions 长期没有消费过来我们可能会被阻塞在那里最终导致 heartbeat 超时。 1.4.6 的客户端默认 10s 没心跳就超时,而发送间隔仅为 3s 。也就是连续三个周期没有发送就超时了。
下面看到 dpkp 的一个回复,表达了有可能就是被 poll 主线程阻塞,的问题,并且有 workaround 可以用来避免这种情况:
vimal: thanks for posting. I believe you may be hitting lock contention between an idle client.poll -- which can block and hold the client lock for the entire request_timeout_ms -- and the attempt by the heartbeat thread to send a new request. It seems to me that we may need to use KafkaClient.wakeup() to make sure that the polling thread drops the lock if/when we need to send a request from a different thread. This shouldn't be an issue when messages are flowing through your topics at a steady rate. If this is just a test environment, and you expect your production environment to have more steady live data, then you could just ignore the error in testing. But if you are managing a topic w/ very low traffic -- delays of minutes between consecutive messages, for example -- you might try to reduce the request_timeout_ms to something closer to the heartbeat_interval_ms, which should prevent the read side from blocking for much longer than the heartbeat timeout. But note that other timeouts may also need to change (max_poll_interval_ms and session_timeout_ms perhaps). Another workaround might be to reduce metadata_max_age_ms to something close / equal to your heartbeat_timeout_ms. This will cause more frequent metadata requests, but should unblock the send side when there is no socket data available for reads.
dpkp 的观点在于,如果我们数据发送过来的频率是稳定的,消费者是正好可以消费完队列里面的信息的情况的时候,不应该出现这样的问题。出现这样的问题与我们预期和看到报错的情况可能恰恰相反,不是我们消费得太慢,而是我们消费得太快,并且生产者发送消息的频率过低导致的。在 poll 不到消息的时候,主线程可能会面临阻塞,而无法及时切换到心跳线程进行心跳的发送,最终导致了这个问题。
他给到一个 trick 的方法来解决这个问题,当面临这种情况的时候我们可以把 metadata_max_age_ms 调整到和心跳发送频率差不多 close / equal to our heartbeat_timeout_ms.
发送 metadata_request 会解除我们发送端的阻塞,从而达到抑制死锁的效果。
self.kafka = kafka.KafkaConsumer( auto_offset_reset=auto_offset_reset, bootstrap_servers=['10.171.97.1:9092', '10.163.13.219:9092', '10.170.249.122:9092'], group_id=group_id, metadata_max_age_ms=metadata_max_age_ms ) self.kafka.subscribe(topics)
尝试补充了 metadata_max_age_ms 大约 3000 ms ,这个问题得到了很大程度的解决和缓解。
既然确定了可能是因为消费太快,然后生产慢导致的主线程锁住的问题,剩下可以验证一下是否真的是这样。尝试打日志看一下切换线程发送心跳的情况可以来确认该问题是否如此。
另外看代码发现 poll 主线程在 poll 之前会默认会进行 wakeup() 但是在 1.4.6里面也因为之前的 某个 bug 而默认关闭了,不知道是否有影响,等后续测试之后补上。
Reference:
https://github.com/dpkp/kafka-python/issues/1418 Heartbeat failed for group xxxWorker because it is rebalancing
https://github.com/dpkp/kafka-python/issues/1760 [1.4.5] KafkaProducer raises KafkaTimeoutError when attempting wakeup()
https://www.cnblogs.com/huxi2b/p/6815797.html Kafka 0.11版本新功能介绍 —— 空消费组延时rebalance