前提
本文的分析基于kafka 0.9版本的client, 0.10.1.0
中已经修改心跳线程为后台线程,并支持设置max.poll.records
,参见ChangeLog。
使用场景
Kafka是一个高吞吐量的分布式消息系统,在APM的移动端请求数据的处理中,使用了Kafka。Kafka数据使用多线程阻塞的方式进行消费,即每个线程通过poll()
的形式消费一个或者多个partition
, 每次得到的消息集处理完成之后才会继续进行下一次poll()
操作,同时使用了自动提交offset
的模式。Rebalance发生的原因有可能是集群的问题,但大部分都在客户端,一旦服务端在设定的超时时间内没有收到消费者发起的心跳,则认为这个消费者已经死掉,就会执行Rebalance动作。
从源码上,我们一路从KafkaConsumer.poll(timeout)
跟进来可以看到
- /**
- * Do one round of polling. In addition to checking for new data, this does any needed
- * heart-beating, auto-commits, and offset updates.
- * @param timeout The maximum time to block in the underlying poll
- * @return The fetched records (may be empty)
- */
- private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
- ...
- // 上面是一些检查动作
- fetcher.initFetches(cluster);
- client.poll(timeout);
- return fetcher.fetchedRecords();
- }
从注释中,我们可以看出poll动作会发出一些列的心跳、自动offset提交和更新的动作。这是我们设定了自动提交的时候,我们的消费者发出心跳和offset的地方。
再进client.poll(timeout)
方法中可以看到
- //ConsumerNetworkClient.java
- private void poll(long timeout, long now, boolean executeDelayedTasks) {
- ...
- //一些前置的判断
-
- // execute scheduled tasks
- if (executeDelayedTasks)
- delayedTasks.poll(now);
-
- ...
- //其他动作
- }
从源码里面可以看到会吧delayedTask里面的所有任务执行掉,其中就有我们的心跳任务。 那么,很明显,如果我们在两次poll()
调用的间隔做了太多的事情,也就是消费拉取下来的数据花了过长的时间,而没有及时发出心跳,则我们会被判定为死掉的节点,这个时候集群就会发起Rebalance。
Rebalance有什么影响
Rebalance本身是Kafka集群的一个保护设定,用于剔除掉无法消费或者过慢的消费者,然后由于我们的数据量较大,同时后续消费后的数据写入需要走网络IO,很有可能存在依赖的第三方服务存在慢的情况而导致我们超时。
Rebalance对我们数据的影响主要有以下几点:
- 数据重复消费: 消费过的数据由于提交offset任务也会失败,在partition被分配给其他消费者的时候,会造成重复消费,数据重复且增加集群压力
- Rebalance扩散到整个ConsumerGroup的所有消费者,因为一个消费者的退出,导致整个Group进行了Rebalance,并在一个比较慢的时间内达到稳定状态,影响面较大
- 频繁的Rebalance反而降低了消息的消费速度,大部分时间都在重复消费和Rebalance
- 数据不能及时消费,会累积lag,在Kafka的TTL之后会丢弃数据
上面的