赞
踩
今天我司线上kafka消息代理出现错误日志,异常rebalance,而且平均间隔2到3分钟就会rebalance一次,分析日志发现比较严重。错误日志如下
这里就涉及到问题是消费者在创建时会有一个属性max.poll.interval.ms
,
该属性意思为kafka消费者在每一轮poll()
调用之间的最大延迟,消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()
没有被再次调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。
https://www.jianshu.com/p/271f88f06eb3
1.增加max.poll.interval.ms
处理时长
kafka消费者 默认此间隔时长为300s,本次故障是300s都没处理完成,于是改成500s。
max.poll.interval.ms=500000
2.设置分区拉取阈值
kafkaConsumer调用一次轮询方法只是拉取一次消息。客户端为了不断拉取消息,会用一个外部循环不断调用轮询方法poll()。每次轮询后,在处理完这一批消息后,才会继续下一次的轮询。
max.poll.records = 50
Unable to read additional data from client sessionid 0x00, likely client has closed socket
EndOfStreamException: Unable to read additional data from client sessionid 0x6362257b44e5068d, like
问题:
Unable to read additional data from client sessionid 0x0, likely client has closed socket
[2014-11-13 10:28:47,989] INFO Accepted socket connection from /192.168.50.33:2676 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-11-13 10:28:47,989] WARN EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn)
问题:
配置的超时时间太短,Zookeeper没有读完Consumer的数据,连接就被Consumer断开了!
解决:
在config/server.properties文件中将超时连接属性的值调大一点,如下:
zookeeper.session.timeout.ms=400000
客户端连接Zookeeper时,配置的超时时长过短。致使Zookeeper还没有读完Consumer的数据,连接就被Consumer断开了。
初始化Zookeeper连接时,将接收超时参数值调整大一些即可,默认是毫秒(ms)
在C++中,在设置第三个参数recv_timeout
时,设置大一些,比如10000ms就可以解决这里的问题。
是zoo.cfg里面的tickTime这个参数,后来我把tickTime2000改为10000之后就好了
对于目前版本的Kafka来说,consumer的rebalance的确有需要需要改进的部分,很容易想到的包括:
值得高兴的是,社区已经实现了第一个改进并将其集成进0.11.0.0版本中,也就是说用户在升级到0.11后便可以体验到这种延时rebalance的效果,主要表现为空消费组从EMPTY到STABLE的时间间隔应该显著缩短。本文将简要介绍一下该新功能以及实现原理。
新增参数:group.initial.rebalance.delay.ms
对于用户来说,这个改进最直接的效果就是新增了一个broker配置:group.initial.rebalance.delay.ms,默认是3秒钟。用户需要在server.properties文件中自行修改为想要配置的值。这个参数的主要效果就是让coordinator推迟空消费组接收到成员加入请求后本应立即开启的rebalance。在实际使用时,假设你预估你的所有consumer组成员加入需要在10s内完成,那么你就可以设置该参数=10000。目前来看,这个参数的使用还是很方便的~
WARN [Consumer clientId=consumer-1, groupId=console-consumer-950] Connection to node -1 could not be established. Broker may not be available.
这是因为你的配置文件中的PLAINTEXT跟你请求的内容不同。举例来说,我在配置文件里配置的listeners=PLAINTEXT://10.127.96.151:9092,但是我想测试的时候请求的是./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --from-beginning
正确的应该是./kafka-console-consumer.sh --bootstrap-server 10.127.96.151:9092 --topic topic1 --from-beginning
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group Notify_Notify
./kafka-consumer-groups.sh --bootstrap-server 10.0.22.11:9092 --describe --group Notify_Notify
因为配置 /etc/security/limits.conf 文件不生效的,需要在 kafka 的systemd脚本中添加 [Service] 中 LimitNOFILE=655360
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。