赞
踩
Rebalance 本质上是一种协议,主要作用是为了保证消费者组(Consumer Group)下的所有消费者(Consumer)消费的主体分区达成均衡。
比如:我们有10个分区,当我们有一个消费者时,该消费者消费10个分区,当我们增加一个消费者,理论上每个消费者消费5个分区,这个分配的过程我们成为Rebalance(重平衡)
常见的有三种情况会触发Rebalance:
Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。
最简单粗暴的就是 : 减少组成员数量发生变化
每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。Consumer 端有个参数,叫 session.timeout.ms,就是被用来表征此事的。该参数的默认值是 10 秒,即如果 Coordinator 在 10 秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了。可以这么说,session.timout.ms 决定了 Consumer 存活性的时间间隔。
除了这个参数,Consumer 还提供了一个允许你控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。
除了以上两个参数,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。
当增加一个消费者进程时,broker server.log中GroupCoordinator 打印日志如下
- [2020-03-28 23:03:59,453] INFO [GroupCoordinator 0]: Preparing to rebalance group test-consumer in state PreparingRebalance with old generation 7 (__consumer_offsets-23) (reason: Adding new member consumer-1-b1c600af-a17a-465d-9b43-01fd82df0b8f with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
- [2020-03-28 23:04:02,005] INFO [GroupCoordinator 0]: Stabilized group test-consumer generation 8 (__consumer_offsets-23) (kafka.coordinator.group.GroupCoordinator)
- [2020-03-28 23:04:02,008] INFO [GroupCoordinator 0]: Assignment received from leader for group test-consumer for generation 8 (kafka.coordinator.group.GroupCoordinator)
在Consumer客户端Debug日志中有以下信息提示,说明已经该group产生了Rebalance
- 23:04:07,379 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer] Sending JoinGroup ((type: JoinGroupRequest, groupId=test-consumer, sessionTimeout=10000, rebalanceTimeout=10000, memberId=consumer-1-ccc4eca4-04a4-4547-910f-142c58506c14, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@442675e1)) to coordinator DESKTOP-I0EG1MJ.localdomain:9092 (id: 2147483647 rack: null)
- 23:04:07,379 DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test-consumer] Sending JOIN_GROUP {group_id=test-consumer,session_timeout=10000,rebalance_timeout=10000,member_id=consumer-1-ccc4eca4-04a4-4547-910f-142c58506c14,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]} with correlation id 146 to node 2147483647
当减少一个消费者组的进程时,broker server.log中GroupCoordinator 打印日志如下
- [2020-03-28 23:04:39,367] INFO [GroupCoordinator 0]: Member consumer-1-b1c600af-a17a-465d-9b43-01fd82df0b8f in group test-consumer has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
- [2020-03-28 23:04:39,367] INFO [GroupCoordinator 0]: Preparing to rebalance group test-consumer in state PreparingRebalance with old generation 8 (__consumer_offsets-23) (reason: removing member consumer-1-b1c600af-a17a-465d-9b43-01fd82df0b8f on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
- [2020-03-28 23:04:43,765] INFO [GroupCoordinator 0]: Stabilized group test-consumer generation 9 (__consumer_offsets-23) (kafka.coordinator.group.GroupCoordinator)
- [2020-03-28 23:04:43,768] INFO [GroupCoordinator 0]: Assignment received from leader for group test-consumer for generation 9 (kafka.coordinator.group.GroupCoordinator)
在未停止的Consumer客户端Debug日志中有以下信息提示,说明已经该group产生了Rebalance
- 23:04:56,507 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-consumer] Sending JoinGroup ((type: JoinGroupRequest, groupId=test-consumer, sessionTimeout=10000, rebalanceTimeout=10000, memberId=consumer-1-ccc4eca4-04a4-4547-910f-142c58506c14, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@49e202ad)) to coordinator DESKTOP-I0EG1MJ.localdomain:9092 (id: 2147483647 rack: null)
- 23:24:56,507 DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=test-consumer] Sending JOIN_GROUP {group_id=test-consumer,session_timeout=10000,rebalance_timeout=10000,member_id=consumer-1-ccc4eca4-04a4-4547-910f-142c58506c14,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]} with correlation id 321 to node 2147483647
注意:注意对于不同topic,使用相同consumer group,如果有一个消费者程序停止或新增,所有相同consumer group都会Rebalance
所以在我们日常开发中,不想干的业务也要避免Consumer Group 设置成不相同的
下面是我大概整理的Rebalance的知识点参考《Kafka核心技术与实战》
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。