In Kafka, a topic usually has multiple partitions, and each partition is assigned to a consumer in the group according to some strategy (round-robin, etc.; custom strategies are also possible). Once a consumer can no longer receive messages, its partitions must be reassigned so that consumption continues uninterrupted.
For example, suppose a topic has two partitions, each assigned to one of the two consumers in a consumer group. If consumer2 dies, a partition rebalance is needed so that consumer3 takes over consuming the data in partition2.
For the partition assignment strategies used during a rebalance, see the earlier dedicated article: Kafka三种分区分配策略比较 (a comparison of Kafka's three partition assignment strategies).
A partition rebalance in Kafka happens in two steps: JoinGroup first, then SyncGroup. The coordinator manages each consumer group's metadata and membership, including electing the group leader and handling JoinGroup and SyncGroup requests.
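From the application's point of view, both phases surface only through the ConsumerRebalanceListener callbacks. A minimal sketch of registering one (the broker address, group id, and topic name are placeholders, not from the original article):

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.put("group.id", "demo-group");              // hypothetical group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // invoked before the rebalance starts (see onJoinPrepare below)
        System.out.println("Revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // invoked after the rebalance completes (see onJoinComplete below)
        System.out.println("Assigned: " + partitions);
    }
});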
Let's now walk through the whole flow in the source code.
Entry method: poll
public boolean poll(Timer timer) {
    maybeUpdateSubscriptionMetadata();

    invokeCompletedOffsetCommitCallbacks();

    if (subscriptions.partitionsAutoAssigned()) {
        // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
        // group proactively due to application inactivity even if (say) the coordinator cannot be found.
        pollHeartbeat(timer.currentTimeMs());
        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
            return false;
        }

        if (rejoinNeededOrPending()) {
            // due to a race condition between the initial metadata fetch and the initial rebalance,
            // we need to ensure that the metadata is fresh before joining initially. This ensures
            // that we have matched the pattern against the cluster's topics at least once before joining.
            if (subscriptions.hasPatternSubscription()) {
                // For consumer group that uses pattern-based subscription, after a topic is created,
                // any consumer that discovers the topic after metadata refresh can trigger rebalance
                // across the entire consumer group. Multiple rebalances can be triggered after one topic
                // creation if consumers refresh metadata at vastly different times. We can significantly
                // reduce the number of rebalances caused by single topic creation by asking consumer to
                // refresh metadata before re-joining the group as long as the refresh backoff time has
                // passed.
                if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
                    this.metadata.requestUpdate();
                }

                if (!client.ensureFreshMetadata(timer)) {
                    return false;
                }

                maybeUpdateSubscriptionMetadata();
            }

            if (!ensureActiveGroup(timer)) {
                return false;
            }
        }
    } else {
        // For manually assigned partitions, if there are no ready nodes, await metadata.
        // If connections to all nodes fail, wakeups triggered while attempting to send fetch
        // requests result in polls returning immediately, causing a tight loop of polls. Without
        // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
        // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
        // When group management is used, metadata wait is already performed for this scenario as
        // coordinator is unknown, hence this check is not required.
        if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
            client.awaitMetadataUpdate(timer);
        }
    }

    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
    return true;
}
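Applications never call this poll directly; each KafkaConsumer.poll invocation drives it, so heartbeat bookkeeping, rejoining, and async auto-commit all piggyback on the regular consume loop. A minimal loop, reusing the consumer from the earlier sketch:

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

while (true) {
    // each call drives the coordinator's poll above before any records are fetched
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
    }
}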
ensureActiveGroup
boolean ensureActiveGroup(final Timer timer) {
    // always ensure that the coordinator is ready because we may have been disconnected
    // when sending heartbeats and does not necessarily require us to rejoin the group.
    if (!ensureCoordinatorReady(timer)) {
        return false;
    }

    startHeartbeatThreadIfNeeded();
    return joinGroupIfNeeded(timer);
}
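The heartbeat thread started here, together with the rebalance timeout used throughout the join, is controlled by a few consumer configs. A sketch with illustrative values only (not recommendations; defaults vary by client version), added to the props from the earlier sketch:

props.put("session.timeout.ms", "10000");    // coordinator evicts a member that misses heartbeats for this long
props.put("heartbeat.interval.ms", "3000");  // how often the background thread sends heartbeats
props.put("max.poll.interval.ms", "300000"); // upper bound between poll() calls; also used as the rebalance timeout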
If necessary, prepare to start the JoinGroup.
joinGroupIfNeeded
boolean joinGroupIfNeeded(final Timer timer) {
    while (rejoinNeededOrPending()) {
        if (!ensureCoordinatorReady(timer)) {
            return false;
        }

        // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
        // time if the client is woken up before a pending rebalance completes. This must be called
        // on each iteration of the loop because an event requiring a rebalance (such as a metadata
        // refresh which changes the matched subscription set) can occur while another rebalance is
        // still in progress.
        if (needsJoinPrepare) {
            // if auto-commit is enabled, offsets are committed before the rebalance;
            // if a ConsumerRebalanceListener with onPartitionsRevoked overridden is registered,
            // the currently assigned partitions can be obtained before the rebalance to handle offsets, etc.
            onJoinPrepare(generation.generationId, generation.memberId);
            needsJoinPrepare = false;
        }

        // send the JoinGroup request
        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future, timer);
        if (!future.isDone()) {
            // we ran out of time
            return false;
        }

        if (future.succeeded()) {
            // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
            ByteBuffer memberAssignment = future.value().duplicate();

            onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);

            // We reset the join group future only after the completion callback returns. This ensures
            // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
            resetJoinGroupFuture();
            needsJoinPrepare = true;
        } else {
            resetJoinGroupFuture();
            final RuntimeException exception = future.exception();
            if (exception instanceof UnknownMemberIdException ||
                    exception instanceof RebalanceInProgressException ||
                    exception instanceof IllegalGenerationException ||
                    exception instanceof MemberIdRequiredException)
                continue;
            else if (!future.isRetriable())
                throw exception;

            timer.sleep(retryBackoffMs);
        }
    }
    return true;
}
onJoinPrepare
protected void onJoinPrepare(int generation, String memberId) {
    // commit offsets prior to rebalance if auto-commit is enabled
    maybeAutoCommitOffsetsSync(time.timer(rebalanceTimeoutMs));

    // execute the user's callback before rebalance
    ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
    Set<TopicPartition> revoked = subscriptions.assignedPartitions();
    log.info("Revoking previously assigned partitions {}", revoked);
    try {
        // user-overridable callback, invoked before the partition rebalance starts
        listener.onPartitionsRevoked(revoked);
    } catch (WakeupException | InterruptException e) {
        throw e;
    } catch (Exception e) {
        log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
    }

    isLeader = false;
    subscriptions.resetGroupSubscription();
}
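Because onPartitionsRevoked runs here before ownership changes, it is the standard place to commit offsets manually when auto-commit is off; the effect mirrors what maybeAutoCommitOffsetsSync does above. A sketch extending the listener from the earlier example (currentOffsets is a hypothetical map the application maintains as it processes records):

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

// inside the ConsumerRebalanceListener:
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // flush the application's processed offsets before the partitions are handed to another member
    consumer.commitSync(currentOffsets);
}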
initiateJoinGroup
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    // we store the join future in case we are woken up by the user after beginning the
    // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
    // to rejoin before the pending rebalance has completed.
    if (joinFuture == null) {
        // fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
        // Note that this must come after the call to onJoinPrepare since we must be able to continue
        // sending heartbeats if that callback takes some time.
        disableHeartbeatThread();

        // the client is now starting the partition rebalance
        state = MemberState.REBALANCING;
        // send the JoinGroup request to the coordinator
        joinFuture = sendJoinGroupRequest();
        // callback invoked once the request completes
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // handle join completion in the callback so that the callback will be invoked
                // even if the consumer is woken up before finishing the rebalance
                synchronized (AbstractCoordinator.this) {
                    log.info("Successfully joined group with generation {}", generation.generationId);
                    // the client has joined the group and heartbeats can resume
                    state = MemberState.STABLE;
                    rejoinNeeded = false;

                    if (heartbeatThread != null)
                        heartbeatThread.enable();
                }
            }

            @Override
            public void onFailure(RuntimeException e) {
                // we handle failures below after the request finishes. if the join completes
                // after having been woken up, the exception is ignored and we will rejoin
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.UNJOINED;
                }
            }
        });
    }
    return joinFuture;
}
sendJoinGroupRequest
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();

    // send a join group request to the coordinator
    log.info("(Re-)joining group");
    JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
            new JoinGroupRequestData()
                    .setGroupId(groupId)
                    .setSessionTimeoutMs(this.sessionTimeoutMs)
                    .setMemberId(this.generation.memberId)
                    .setGroupInstanceId(this.groupInstanceId.orElse(null))
                    .setProtocolType(protocolType())
                    .setProtocols(metadata())
                    .setRebalanceTimeoutMs(this.rebalanceTimeoutMs)
    );

    log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);

    // Note that we override the request timeout using the rebalance timeout since that is the
    // maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays.
    int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
    // once the request completes, the response is handled by JoinGroupResponseHandler
    return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
            .compose(new JoinGroupResponseHandler());
}
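The Math.max in joinGroupTimeoutMs looks redundant at first glance, but it guards against int overflow: when rebalanceTimeoutMs is near Integer.MAX_VALUE, adding 5000 wraps to a negative value, and Math.max then falls back to the unpadded timeout. For example:

int rebalanceTimeoutMs = Integer.MAX_VALUE - 1000;
int padded = rebalanceTimeoutMs + 5000;                        // wraps to a negative int
int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, padded); // keeps rebalanceTimeoutMs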
JoinGroupResponseHandler
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
        Errors error = joinResponse.error();
        if (error == Errors.NONE) {
            log.debug("Received successful JoinGroup response: {}", joinResponse);
            sensors.joinLatency.record(response.requestLatencyMs());

            synchronized (AbstractCoordinator.this) {
                if (state != MemberState.REBALANCING) {
                    // if the consumer was woken up before a rebalance completes, we may have already left
                    // the group. In this case, we do not want to continue with the sync group.
                    future.raise(new UnjoinedGroupException());
                } else {
                    AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(),
                            joinResponse.data().memberId(), joinResponse.data().protocolName());
                    if (joinResponse.isLeader()) {
                        // the leader computes the assignment; both the leader and the followers then send SyncGroup requests
                        onJoinLeader(joinResponse).chain(future);
                    } else {
                        onJoinFollower().chain(future);
                    }
                }
            }
        } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
            log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator());
            // backoff and retry
            future.raise(error);
        } else if (error == Errors.UNKNOWN_MEMBER_ID) {
            // reset the member id and retry immediately
            resetGeneration();
            log.debug("Attempt to join group failed due to unknown member id.");
            future.raise(Errors.UNKNOWN_MEMBER_ID);
        } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                || error == Errors.NOT_COORDINATOR) {
            // re-discover the coordinator and retry with backoff
            markCoordinatorUnknown();
            log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
            future.raise(error);
        } else if (error == Errors.FENCED_INSTANCE_ID) {
            log.error("Received fatal exception: group.instance.id gets fenced");
            future.raise(error);
        } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                || error == Errors.INVALID_SESSION_TIMEOUT
                || error == Errors.INVALID_GROUP_ID
                || error == Errors.GROUP_AUTHORIZATION_FAILED
                || error == Errors.GROUP_MAX_SIZE_REACHED) {
            // log the error and re-throw the exception
            log.error("Attempt to join group failed due to fatal error: {}", error.message());
            if (error == Errors.GROUP_MAX_SIZE_REACHED) {
                future.raise(new GroupMaxSizeReachedException(groupId));
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(error);
            }
        } else if (error == Errors.UNSUPPORTED_VERSION) {
            log.error("Attempt to join group failed due to unsupported version error. Please unset field group.instance.id and retry" +
                    " to see if the problem resolves");
            future.raise(error);
        } else if (error == Errors.MEMBER_ID_REQUIRED) {
            // Broker requires a concrete member id to be allowed to join the group. Update member id
            // and send another join group request in next cycle.
            synchronized (AbstractCoordinator.this) {
                AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID,
                        joinResponse.data().memberId(), null);
                AbstractCoordinator.this.rejoinNeeded = true;
                AbstractCoordinator.this.state = MemberState.UNJOINED;
            }
            future.raise(Errors.MEMBER_ID_REQUIRED);
        } else {
            // unexpected error, throw the exception
            log.error("Attempt to join group failed due to unexpected error: {}", error.message());
            future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
        }
    }
}
onJoinLeader
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // perform the leader synchronization and send back the assignment for the group
        Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
                joinResponse.data().members());

        List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
        for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
            groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()
                    .setMemberId(assignment.getKey())
                    .setAssignment(Utils.toArray(assignment.getValue()))
            );
        }

        SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(
                        new SyncGroupRequestData()
                                .setGroupId(groupId)
                                .setMemberId(generation.memberId)
                                .setGroupInstanceId(this.groupInstanceId.orElse(null))
                                .setGenerationId(generation.generationId)
                                // the computed assignments for every member
                                .setAssignments(groupAssignmentList)
                );
        log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
        return sendSyncGroupRequest(requestBuilder);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}
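performAssignment runs the assignor that the group negotiated during JoinGroup, which the application selects through the partition.assignment.strategy config (the strategies themselves are compared in the article linked above). A sketch of choosing a built-in assignor, reusing the props from the earlier sketch:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

// pick round-robin assignment instead of the default RangeAssignor
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        RoundRobinAssignor.class.getName());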
onJoinFollower
private RequestFuture<ByteBuffer> onJoinFollower() {
    // send follower's sync group with an empty assignment
    SyncGroupRequest.Builder requestBuilder =
            new SyncGroupRequest.Builder(
                    new SyncGroupRequestData()
                            .setGroupId(groupId)
                            .setMemberId(generation.memberId)
                            .setGroupInstanceId(this.groupInstanceId.orElse(null))
                            .setGenerationId(generation.generationId)
                            // followers do not compute the assignment; only the leader does
                            .setAssignments(Collections.emptyList())
            );
    log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
    return sendSyncGroupRequest(requestBuilder);
}
Once the SyncGroup completes, control returns to the main joinGroupIfNeeded method, which then calls onJoinComplete.
protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer) {
    // only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
    if (!isLeader)
        assignmentSnapshot = null;

    PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
    if (assignor == null)
        throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

    Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
    if (!subscriptions.assignFromSubscribed(assignment.partitions())) {
        handleAssignmentMismatch(assignment);
        return;
    }

    Set<TopicPartition> assignedPartitions = subscriptions.assignedPartitions();

    // The leader may have assigned partitions which match our subscription pattern, but which
    // were not explicitly requested, so we update the joined subscription here.
    maybeUpdateJoinedSubscription(assignedPartitions);

    // give the assignor a chance to update internal state based on the received assignment
    assignor.onAssignment(assignment, generation);

    // reschedule the auto commit starting from now
    if (autoCommitEnabled)
        this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);

    // execute the user's callback after rebalance
    ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
    log.info("Setting newly assigned partitions: {}", Utils.join(assignedPartitions, ", "));
    try {
        // user-overridable callback, invoked after the partition rebalance completes
        listener.onPartitionsAssigned(assignedPartitions);
    } catch (WakeupException | InterruptException e) {
        throw e;
    } catch (Exception e) {
        log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e);
    }
}
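Since onPartitionsAssigned fires only after the new assignment is installed, applications that keep offsets outside Kafka typically seek to their saved positions there. A sketch assuming a hypothetical offsetStore map maintained by the application:

// inside the ConsumerRebalanceListener from the earlier sketches:
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition tp : partitions) {
        Long saved = offsetStore.get(tp); // offsetStore is a hypothetical external offset map
        if (saved != null)
            consumer.seek(tp, saved);     // resume from the externally stored position
    }
}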