赞
踩
问题和现象:
某个程序在消费kafka数据时,总是重复消费相关数据,仿佛在数据消费完毕之后,没有提交相应的偏移量。然而在程序中设置了自动提交:enable.auto.commit为true
检查日志,发现日志提示:
2020-03-26 17:20:21.414 WARN 28800 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1,
groupId=test-consumer-group] Synchronous auto-commit of offsets
{E2C-GDFS-0=OffsetAndMetadata{offset=9632, leaderEpoch=8, metadata=''}} failed:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.
You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
日志显示时当前消费者组中某个消费者自动提交出现错误,查看对应消费者信息:
可以看到,最后两个topic对应的消费者处于离线状态,程序虽然在消费数据,但无法和kafka同步偏移量等信息。
原因:
这里就涉及到问题是消费者在创建时会有一个属性max.poll.interval.ms,
该属性意思为kafka消费者在每一轮poll()调用之间的最大延迟,消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。
通俗点讲,就是消费者消费一次数据,对于该数据的处理过程太慢,导致消费下一条消息的时间延后,超过这个门限,默认300秒,消费者就会呗认为失败,被踢出消费者组。
解决
1、修改提交方式,改为手动提交(默认为自动提交);
2、根据实际情况,修改提交时间;
max.poll.interval.ms=300
3、kafkaConsumer调用一次轮询方法只是拉取一次消息。客户端为了不断拉取消息,会用一个外部循环不断调用轮询方法poll()。每次轮询后,在处理完这一批消息后,才会继续下一次的轮询。
max.poll.records = 50
其他问题
如果一个消费者组有多个消费者,因某些原因其中一个消费者被踢出,当重启程序继续消费时,如果刚好kafka 消费者组在rebalance状态,很可能报JVM运行时错误。解决办法是,尽量将不相干的数据处理设置在不同的消费者组。
sh kafka-consumer-groups.sh --bootstrap-server node5:9092 --describe --group test-consumer-group
Warning: Consumer group 'test-consumer-group' is rebalancing.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。