There is too much important material in the Kafka consumer joinGroup path to cover in a single article, so this follow-up digs deeper from several angles.
From the previous article we know that a consumer group can be in one of several states: PreparingRebalance, CompletingRebalance, Stable, Dead and Empty.
The operations allowed in each state and the transitions between them, as summarized by Kafka, are shown in Figure 1.
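To make Figure 1 easier to read, here is a rough sketch of those transitions expressed as a lookup table. This is an approximation for illustration only; in the broker source each state object carries its own set of valid previous states.

// Sketch: "state -> states it may be reached from", mirroring Figure 1.
object GroupStateTransitions {
  sealed trait State
  case object PreparingRebalance extends State
  case object CompletingRebalance extends State
  case object Stable extends State
  case object Dead extends State
  case object Empty extends State

  // Approximate transition table; treat as an illustration, not the source of truth.
  val validPreviousStates: Map[State, Set[State]] = Map(
    PreparingRebalance  -> Set(Stable, CompletingRebalance, Empty),
    CompletingRebalance -> Set(PreparingRebalance),
    Stable              -> Set(CompletingRebalance),
    Empty               -> Set(PreparingRebalance),
    Dead                -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead)
  )

  def canTransition(from: State, to: State): Boolean =
    validPreviousStates(to).contains(from)
}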
Delayed operations in Kafka are implemented by the DelayedOperation class. For JoinGroup handling, the delayed operation is created in the prepareRebalance method shown below. Note the branch: if the group is in the Empty state, an InitialDelayedJoin is used; otherwise a DelayedJoin is used. When is a group Empty? When all its members have left, or when it has just been created. In other words, Kafka treats the very first rebalance of a group differently from a rebalance triggered after members go offline or leave.
private def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
  // if any members are awaiting sync, cancel their request and have them rejoin
  if (group.is(CompletingRebalance))
    resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)

  info(s"rebalanceTimeout:${group.rebalanceTimeoutMs}---config:${groupConfig.groupInitialRebalanceDelayMs}")
  val delayedRebalance = if (group.is(Empty))
    new InitialDelayedJoin(this,
      joinPurgatory,
      group,
      groupConfig.groupInitialRebalanceDelayMs,
      groupConfig.groupInitialRebalanceDelayMs,
      max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
  else
    new DelayedJoin(this, group, group.rebalanceTimeoutMs)

  group.transitionTo(PreparingRebalance)

  info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} with old generation " +
    s"${group.generationId} (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) (reason: $reason)")

  // start watching the delayed join in the purgatory
  val groupKey = GroupKey(group.groupId)
  joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
Let's first clarify what the three methods in DelayedJoin are for. tryComplete is the hook a delayed operation uses to finish early when some condition is already satisfied; onExpiration runs when the deadline is reached; and onComplete runs when the operation actually completes, whether it was forced through early or expired at the deadline.
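To make the relationship between the three hooks concrete, the skeleton below sketches the contract DelayedJoin inherits from DelayedOperation. This is a simplified sketch for illustration, not the real kafka.server.DelayedOperation, which additionally deals with locking, the timing wheel and metrics.

// Simplified sketch of the delayed-operation contract:
// - tryComplete: may finish the operation early when the condition already holds
// - onExpiration: side effects to run only when the deadline is reached
// - onComplete: the actual completion logic, run exactly once
abstract class DelayedOperationSketch(delayMs: Long) {
  private val completed = new java.util.concurrent.atomic.AtomicBoolean(false)

  def tryComplete(): Boolean   // attempt early completion
  def onExpiration(): Unit     // called only on timeout
  def onComplete(): Unit       // called exactly once on completion

  // forceComplete is what tryComplete calls when the condition holds:
  // it guarantees onComplete runs at most once even under races.
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false
}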
Now let's compare the two kinds of delayed join. First, as the prepareRebalance code shows, they are constructed with different parameters. Second, InitialDelayedJoin extends DelayedJoin and overrides both tryComplete and onComplete. Below we analyze the differences from these three angles to understand why they are handled this way.
private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
                                        purgatory: DelayedOperationPurgatory[DelayedJoin],
                                        group: GroupMetadata,
                                        configuredRebalanceDelay: Int,
                                        delayMs: Int,
                                        remainingMs: Int) extends DelayedJoin(coordinator, group, delayMs) {

  override def tryComplete(): Boolean = false

  override def onComplete(): Unit = {
    group.inLock {
      if (group.newMemberAdded && remainingMs != 0) {
        group.newMemberAdded = false
        val delay = min(configuredRebalanceDelay, remainingMs)
        val remaining = max(remainingMs - delayMs, 0)
        purgatory.tryCompleteElseWatch(new InitialDelayedJoin(coordinator,
          purgatory,
          group,
          configuredRebalanceDelay,
          delay,
          remaining
        ), Seq(GroupKey(group.groupId)))
      } else
        super.onComplete()
    }
  }
}
private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                 group: GroupMetadata,
                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
  override def onExpiration() = coordinator.onExpireJoin()
  override def onComplete() = coordinator.onCompleteJoin(group)
}
groupConfig.groupInitialRebalanceDelayMs comes from the broker-side group.initial.rebalance.delay.ms setting, which defaults to 3000 ms.
group.rebalanceTimeoutMs, for a consumer client, corresponds to max.poll.interval.ms, which defaults to 300000 ms, i.e. five minutes.
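For reference, this is roughly how the two knobs are set. group.initial.rebalance.delay.ms is a broker-side setting in server.properties, while max.poll.interval.ms is a consumer property; the bootstrap address and values below are assumed examples, not taken from the article's cluster.

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

// Broker side (server.properties), assumed example value:
//   group.initial.rebalance.delay.ms=3000   (default: 3000 ms)

// Consumer side: max.poll.interval.ms is sent as rebalance_timeout_ms in JoinGroup.
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")        // assumed address
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mykafka-group_4")
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000")             // default: 5 minutes
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
val consumer = new KafkaConsumer[String, String](props)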
A few constructor arguments of InitialDelayedJoin deserve attention: the first groupConfig.groupInitialRebalanceDelayMs is the configured delay (configuredRebalanceDelay), the second groupConfig.groupInitialRebalanceDelayMs is the delay for this round (delayMs), and max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0) is the remaining budget (remainingMs). Note that the remaining budget is the client-side group.rebalanceTimeoutMs minus the broker-side groupInitialRebalanceDelayMs. For comparison, here are the two constructor calls again:
new InitialDelayedJoin(this,
  joinPurgatory,
  group,
  groupConfig.groupInitialRebalanceDelayMs,
  groupConfig.groupInitialRebalanceDelayMs,
  max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))

new DelayedJoin(this, group, group.rebalanceTimeoutMs)
tryComplete in InitialDelayedJoin always returns false, i.e. the initial delayed join can never finish early. In DelayedJoin it looks like the code below: if all members are judged to have joined, the delayed operation can complete ahead of the deadline.
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
  group.inLock {
    if (group.hasAllMembersJoined)
      forceComplete()
    else false
  }
}
The check for early completion is shown below: the number of known members must equal the number of members awaiting a JoinGroup response (both members and numMembersAwaitingJoin are updated when a member is added or via updateMember), and pendingMembers must be empty. A member lands in pendingMembers when its first JoinGroup request carries no memberId and it is not a static member.
def hasAllMembersJoined: Boolean = {
  members.size == numMembersAwaitingJoin && pendingMembers.isEmpty
}
In InitialDelayedJoin's onComplete (repeated below), if a new member was added during the delay and there is still remaining budget, another InitialDelayedJoin is scheduled; otherwise DelayedJoin's onComplete is called to finish the operation. Given this behaviour, it can make sense to set group.initial.rebalance.delay.ms somewhat higher, e.g. 10000 ms, to reduce the number of rebalances when a group is first formed; a small worked example of this arithmetic follows the code below.
override def onComplete(): Unit = {
  group.inLock {
    if (group.newMemberAdded && remainingMs != 0) {
      group.newMemberAdded = false
      val delay = min(configuredRebalanceDelay, remainingMs)
      val remaining = max(remainingMs - delayMs, 0)
      purgatory.tryCompleteElseWatch(new InitialDelayedJoin(coordinator,
        purgatory,
        group,
        configuredRebalanceDelay,
        delay,
        remaining
      ), Seq(GroupKey(group.groupId)))
    } else
      super.onComplete()
  }
}
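To see how this re-arming arithmetic plays out, here is a small standalone sketch that chains the delay/remaining values with the same min/max logic under the defaults (group.initial.rebalance.delay.ms = 3000, rebalanceTimeoutMs = 300000), pretending a new member arrives in every round:

import scala.math.{max, min}

// Simulate how InitialDelayedJoin keeps re-arming itself while new members
// keep joining, using the same arithmetic as onComplete above.
val configuredRebalanceDelay = 3000   // group.initial.rebalance.delay.ms
val rebalanceTimeoutMs       = 300000 // max of the members' max.poll.interval.ms

var delayMs     = configuredRebalanceDelay
var remainingMs = max(rebalanceTimeoutMs - configuredRebalanceDelay, 0)
var round       = 1
while (remainingMs != 0 && round <= 5) { // pretend a new member arrived each round
  println(s"round $round: wait ${delayMs}ms, ${remainingMs}ms of budget left")
  val nextDelay     = min(configuredRebalanceDelay, remainingMs)
  val nextRemaining = max(remainingMs - delayMs, 0)
  delayMs = nextDelay
  remainingMs = nextRemaining
  round += 1
}
// round 1: wait 3000ms, 297000ms of budget left
// round 2: wait 3000ms, 294000ms of budget left
// ... so the initial join is extended 3 seconds at a time, but the total wait
// can never exceed the group's rebalance timeout.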
The official explanation is that once groupInstanceId is configured, a longer session timeout can be used, reducing the number of rebalances. Let's now analyze in detail how this is implemented.
From the source analysis in the previous article we know that Kafka handles groupInstanceId in only two ways:
1. When no memberId is provided, a memberId is generated and the mapping from groupInstanceId to memberId is recorded in staticMembers (a HashMap); if a memberId already exists for that groupInstanceId, a new memberId is generated and the mapping is updated.
2. When a memberId is provided, the request is handled the same way as for a consumer without groupInstanceId, via updateMemberAndRebalance.
In short, groupInstanceId only adds one extra layer of mapping, staticMembers. Why bother? memberId is the unique id the Kafka broker assigns to a consumer when it connects; it lives only in the consumer's memory, so once the consumer restarts the id is lost and it has to rejoin the group. In that case the consumer normally makes two requests: the first with an empty memberId to obtain a memberId from the broker, and a second carrying that memberId to actually join the group. With groupInstanceId configured, the broker records the groupInstanceId-to-memberId mapping on the first request, so the consumer does not need to come back with the memberId. This not only saves one network round trip on the initial join; if the client restarts and rejoins within the allowed window, the broker merely updates the staticMembers mapping instead of starting a rebalance, reducing the number of rebalances across the consumer group. That performance gain is well worth having.
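A minimal sketch of turning a consumer into a static member: give each instance a stable, unique group.instance.id and enough session.timeout.ms headroom to cover a restart. The id and timeout below are assumed examples; the id mirrors the groupInstanceId visible in the logs further down.

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")   // assumed address
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mykafka-group_4")
// Static membership: must be unique within the group and stable across restarts.
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "mykafka-group_4_1")
// Rejoining within session.timeout.ms with the same group.instance.id does not
// trigger a rebalance; the value must stay within the broker's
// group.min/max.session.timeout.ms range.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "600000")           // 10 minutes, assumed
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
val consumer = new KafkaConsumer[String, String](props)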
Two parameters matter a great deal for consumer group rebalances:
1. session.timeout.ms: the longest a consumer may be absent from the group. If our consumers typically take about 10 minutes to restart, we can set this to 10 minutes; a consumer with a groupInstanceId that rejoins within that window will not trigger a rebalance. One more point: session.timeout.ms must fall within the broker's [group.min.session.timeout.ms, group.max.session.timeout.ms] range, with both ends inclusive.
2. max.poll.interval.ms: the maximum interval between two poll calls. If the consumer does not poll within this interval, the coordinator considers it to have left the group and starts a rebalance. max.poll.interval.ms is sent as rebalance_timeout_ms in the JoinGroup request, and the group's rebalanceTimeoutMs is the maximum rebalanceTimeoutMs across its current members, which is also important. The code is as follows, followed by a worked example taken from the logs below:
// kafka.coordinator.group.GroupMetadata#rebalanceTimeoutMs
def rebalanceTimeoutMs = members.values.foldLeft(0) { (timeout, member) =>
  timeout.max(member.rebalanceTimeoutMs)
}
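For instance, the author's debug lines in the second log excerpt below show member mykafka-group_4_1 with rebalanceTimeoutMs 120000 and mykafka-group_4_2 with 60000, so the fold above picks 120000 ms, matching the operation delayMs:120000 in the logs and the roughly two-minute gap between "Preparing to rebalance" and "Stabilized group". A tiny sketch of the same fold with those two values:

// The same fold as above, plugged with the two member timeouts from the logs:
// the group-level rebalance timeout becomes the larger one, 120000 ms (2 minutes).
val memberRebalanceTimeouts = Seq(120000, 60000)
val groupRebalanceTimeoutMs = memberRebalanceTimeouts.foldLeft(0)((timeout, m) => timeout.max(m))
println(groupRebalanceTimeoutMs) // 120000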
// 1. Consumers 1, 2 and 3 have all started successfully
[data-plane-kafka-request-handler-7] INFO kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]:
Stabilized group mykafka-group_4 generation 22 (__consumer_offsets-37)–2022-02-05 15:54:47
[data-plane-kafka-request-handler-3] INFO kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Assignment received from leader for group mykafka-group_4 for generation 22
// 2. Consumer 3 is detected as gone first, so a rebalance is initiated
[executor-Heartbeat] INFO kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member mykafka-group_4_3-6086fe20-5371-4c5d-a818-91936cffbc16 in group mykafka-group_4 has failed, removing it from the group
[executor-Heartbeat] INFO kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Preparing to rebalance group mykafka-group_4 in state PreparingRebalance with old generation 22 (__consumer_offsets-37) (reason: removing member mykafka-group_4_3-6086fe20-5371-4c5d-a818-91936cffbc16 on heartbeat expiration)
[executor-Heartbeat] INFO kafka.server.DelayedOperationPurgatory - tryCompleteElseWatch----------start------2022-02-05 15:57:56
[executor-Heartbeat] INFO kafka.coordinator.group.GroupMetadata - hasAllMembersJoined----2—0—
[executor-Heartbeat] INFO kafka.coordinator.group.GroupMetadata - hasAllMembersJoined----2—0—
[executor-Heartbeat] INFO kafka.server.DelayedOperationPurgatory - operation—delayMs:120000
[data-plane-kafka-request-handler-4] INFO kafka.server.KafkaApis - [KafkaApi-1] receive joinGroupRequest: {group_id=mykafka-group_4,session_timeout_ms=10000,rebalance_timeout_ms=60000,member_id=mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a,group_instance_id=null,protocol_type=consumer,protocols=[{name=sticky,metadata=java.nio.HeapByteBuffer[pos=0 lim=48 cap=48],_tagged_fields={}}],_tagged_fields={}}
[data-plane-kafka-request-handler-4] INFO kafka.coordinator.group.GroupMetadata - hasAllMembersJoined----2—1—
[data-plane-kafka-request-handler-4] INFO kafka.server.KafkaApis - [KafkaApi-1] receive joinGroupRequest: {group_id=mykafka-group_4,session_timeout_ms=10000,rebalance_timeout_ms=60000,member_id=mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a,group_instance_id=null,protocol_type=consumer,protocols=[{name=sticky,metadata=java.nio.HeapByteBuffer[pos=0 lim=48 cap=48],_tagged_fields={}}],_tagged_fields={}}
[data-plane-kafka-request-handler-4] INFO kafka.coordinator.group.GroupMetadata - hasAllMembersJoined----2—1—
// 3. The rebalance timeout is 2 minutes, so the rebalance finishes 2 minutes later
[executor-Rebalance] INFO kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Stabilized group mykafka-group_4 generation 23 (__consumer_offsets-37)–2022-02-05 15:59:56
[data-plane-kafka-request-handler-3] INFO kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Assignment received from leader for group mykafka-group_4 for generation 23
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Joining group with current subscription: [topic_1]
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Sending JoinGroup request:JoinGroupRequestData(groupId=‘mykafka-group_4’, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, memberId=‘mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a’, groupInstanceId=null, protocolType=‘consumer’, protocols=[JoinGroupRequestProtocol(name=‘sticky’, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, 0, 0, 0, 25, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 22, 0, 0, 0, 0])])
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=23, protocolType=‘consumer’, protocolName=‘sticky’, leader=‘mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a’, memberId=‘mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a’, members=[JoinGroupResponseMember(memberId=‘mykafka-group_4_1-a16c0f93-1b71-44cf-b527-32bf8ab7a6e1’, groupInstanceId=‘mykafka-group_4_1’, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, 0, 0, 0, 25, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 21, 0, 0, 0, 0]), JoinGroupResponseMember(memberId=‘mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a’, groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, 0, 0, 0, 25, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 22, 0, 0, 0, 0])])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Finished assignment for group at generation 23: {mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a=Assignment(partitions=[topic_1-2, topic_1-1]), mykafka-group_4_1-a16c0f93-1b71-44cf-b527-32bf8ab7a6e1=Assignment(partitions=[topic_1-0])}
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Successfully joined group with generation 23
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Adding newly assigned partitions: topic_1-2, topic_1-1
// 1. Consumers 1, 2 and 3 have all started successfully
[data-plane-kafka-request-handler-7] INFO kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Stabilized group mykafka-group_4 generation 26 (__consumer_offsets-37)–2022-02-05 16:18:53
[data-plane-kafka-request-handler-3] INFO kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Assignment received from leader for group mykafka-group_4 for generation 26
// 2. Consumer 3 is detected as gone first, so a rebalance is initiated
[executor-Heartbeat] INFO kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member mykafka-group_4_3-42965fb6-3f33-4929-a6d6-c5eb961e743a in group mykafka-group_4 has failed, removing it from the group
[executor-Heartbeat] INFO kafka.coordinator.group.GroupMetadata - member----mykafka-group_4_1-b84d78ec-7a88-4d4b-ad6e-e1df6c5f8769—120000
[executor-Heartbeat] INFO kafka.coordinator.group.GroupMetadata - member----mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a—60000
[executor-Heartbeat] INFO kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Preparing to rebalance group mykafka-group_4 in state PreparingRebalance with old generation 26 (__consumer_offsets-37) (reason: removing member mykafka-group_4_3-42965fb6-3f33-4929-a6d6-c5eb961e743a on heartbeat expiration)
[executor-Heartbeat] INFO kafka.server.DelayedOperationPurgatory - tryCompleteElseWatch----------start------2022-02-05 16:22:26
[executor-Heartbeat] INFO kafka.coordinator.group.GroupMetadata - hasAllMembersJoined----2—0—
[executor-Heartbeat] INFO kafka.coordinator.group.GroupMetadata - hasAllMembersJoined----2—0—
[executor-Heartbeat] INFO kafka.server.DelayedOperationPurgatory - operation—delayMs:120000
[data-plane-kafka-request-handler-6] INFO kafka.server.KafkaApis - [KafkaApi-1] receive joinGroupRequest: {group_id=mykafka-group_4,session_timeout_ms=10000,rebalance_timeout_ms=60000,member_id=mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a,group_instance_id=null,protocol_type=consumer,protocols=[{name=sticky,metadata=java.nio.HeapByteBuffer[pos=0 lim=48 cap=48],_tagged_fields={}}],_tagged_fields={}}
[data-plane-kafka-request-handler-6] INFO kafka.coordinator.group.GroupMetadata - hasAllMembersJoined----2—1—
[data-plane-kafka-request-handler-0] INFO kafka.server.KafkaApis - [KafkaApi-1] receive joinGroupRequest: {group_id=mykafka-group_4,session_timeout_ms=10000,rebalance_timeout_ms=60000,member_id=mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a,group_instance_id=null,protocol_type=consumer,protocols=[{name=sticky,metadata=java.nio.HeapByteBuffer[pos=0 lim=48 cap=48],_tagged_fields={}}],_tagged_fields={}}
[data-plane-kafka-request-handler-0] INFO kafka.coordinator.group.GroupMetadata - hasAllMembersJoined----2—1—
// 3. The rebalance timeout is 2 minutes, so the rebalance finishes 2 minutes later
[executor-Rebalance] INFO kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Stabilized group mykafka-group_4 generation 27 (__consumer_offsets-37)–2022-02-05 16:24:26
[data-plane-kafka-request-handler-3] INFO kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Assignment received from leader for group mykafka-group_4 for generation 27
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Joining group with current subscription: [topic_1]
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Sending JoinGroup request:JoinGroupRequestData(groupId=‘mykafka-group_4’, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, memberId=‘mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a’, groupInstanceId=null, protocolType=‘consumer’, protocols=[JoinGroupRequestProtocol(name=‘sticky’, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, 0, 0, 0, 25, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 26, 0, 0, 0, 0])])
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=27, protocolType=‘consumer’, protocolName=‘sticky’, leader=‘mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a’, memberId=‘mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a’, members=[JoinGroupResponseMember(memberId=‘mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a’, groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, 0, 0, 0, 25, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 26, 0, 0, 0, 0])])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Finished assignment for group at generation 27: {mykafka-group_4_2-529ff7e3-87bd-4984-9dbe-e8afe7d1e12a=Assignment(partitions=[topic_1-2, topic_1-0, topic_1-1])}
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Successfully joined group with generation 27
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=mykafka-group_4_2, groupId=mykafka-group_4] Adding newly assigned partitions: topic_1-2, topic_1-1, topic_1-0