赞
踩
Kafka 为了保证集群节点宕机情况下消息生产消费功能的可用性,实现了一套失败转移机制,这套机制运行的流程如下图所示,实现的关键则在于以下两个请求:
节点注册请求 BrokerRegistration
broker 节点启动时会向集群 Leader 发送注册请求,集群 Leader 处理请求并将节点状态维护在本地列表中,请求完成后 broker 节点会定时向集群 Leader 报告心跳节点心跳请求 BrokerHeartbeat
Leader 节点收到节点心跳时将检查本地注册列表中是否有节点超时,如有超时则需要进行下线处理,将 Leader 副本分布在失败节点上的分区重新进行选主,同时生成元数据的变动记录,关于元数据变动传播生效的机制可参考 Kafka 3.0 源码笔记(9)-Kafka 服务端元数据的主从同步
以下为 kafka 实现分区副本 Leader 失败选举机制的源码时序示意,可以看到重要处理分为两个部分:
- 节点注册请求 BrokerRegistration 的处理
- 节点心跳请求 BrokerHeartbeat 的交互
通过process.roles
属性配置了 broker 角色的 Kafka 节点会在启动的时候创建 BrokerServer
对象,并执行 BrokerServer.scala#startup()
方法进行基本的初始化工作
这个方法的源码比较长,笔者在 Kafka 3.0 源码笔记(2)-Kafka 服务端的启动与请求处理源码分析 中进行过相关分析,其中和本文相关的部分是调用
BrokerLifecycleManager.scala#start()
方法启动 broker 生命周期管理器
def startup(): Unit = { if (!maybeChangeStatus(SHUTDOWN, STARTING)) return try { info("Starting broker") ...... lifecycleManager.start(() => metadataListener.highestMetadataOffset(), BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config, "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong), metaProps.clusterId, networkListeners, supportedFeatures) // Register a listener with the Raft layer to receive metadata event notifications raftManager.register(metadataListener) ...... // Block until we've caught up with the latest metadata from the controller quorum. lifecycleManager.initialCatchUpFuture.get() // Apply the metadata log changes that we've accumulated. metadataPublisher = new BrokerMetadataPublisher(config, metadataCache, logManager, replicaManager, groupCoordinator, transactionCoordinator, clientQuotaMetadataManager, featureCache, dynamicConfigHandlers.toMap) // Tell the metadata listener to start publishing its output, and wait for the first // publish operation to complete. This first operation will initialize logManager, // replicaManager, groupCoordinator, and txnCoordinator. The log manager may perform // a potentially lengthy recovery-from-unclean-shutdown operation here, if required. metadataListener.startPublishing(metadataPublisher).get() // Log static broker configurations. new KafkaConfig(config.originals(), true) // Enable inbound TCP connections. socketServer.startProcessingRequests(authorizerFutures) // We're now ready to unfence the broker. This also allows this broker to transition // from RECOVERY state to RUNNING state, once the controller unfences the broker. lifecycleManager.setReadyToUnfence() maybeChangeStatus(STARTING, STARTED) } catch { case e: Throwable => maybeChangeStatus(STARTING, STARTED) fatal("Fatal error during broker startup. Prepare to shutdown", e) shutdown() throw e } }
BrokerLifecycleManager.scala#start()
方法看上去比较简单,关键在于创建 StartupEvent
事件对象,并将该事件投递到异步队列 KafkaEventQueue
中。有关于这个事件队列,笔者在 Kafka 3.0 源码笔记(8)-Kafka 服务端集群 Leader 对 CreateTopics 请求的处理 中详细分析了其运行原理,此处不再赘述,读者只需要知道事件被消费处理时会执行 StartupEvent#run()
方法
def start(highestMetadataOffsetProvider: () => Long,
channelManager: BrokerToControllerChannelManager,
clusterId: String,
advertisedListeners: ListenerCollection,
supportedFeatures: util.Map[String, VersionRange]): Unit = {
eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
channelManager, clusterId, advertisedListeners, supportedFeatures))
}
StartupEvent#run()
方实现如下,可以看到核心处理是向异步队列 KafkaEventQueue
中投递一个延时任务,这个任务的核心在于执行 BrokerLifecycleManager.scala#sendBrokerRegistration()
方法
private class StartupEvent(highestMetadataOffsetProvider: () => Long, channelManager: BrokerToControllerChannelManager, clusterId: String, advertisedListeners: ListenerCollection, supportedFeatures: util.Map[String, VersionRange]) extends EventQueue.Event { override def run(): Unit = { _highestMetadataOffsetProvider = highestMetadataOffsetProvider _channelManager = channelManager _channelManager.start() _state = BrokerState.STARTING _clusterId = clusterId _advertisedListeners = advertisedListeners.duplicate() _supportedFeatures = new util.HashMap[String, VersionRange](supportedFeatures) eventQueue.scheduleDeferred("initialRegistrationTimeout", new DeadlineFunction(time.nanoseconds() + initialTimeoutNs), new RegistrationTimeoutEvent()) sendBrokerRegistration() info(s"Incarnation ${incarnationId} of broker ${nodeId} in cluster ${clusterId} " + "is now STARTING.") } }
BrokerLifecycleManager.scala#sendBrokerRegistration()
方法源码如下,关键处理其实分为两步:
- 组装 BrokerRegistration 请求对象
- 调用
BrokerToControllerChannelManager.scala#sendRequest()
方法向集群 Leader 发起网络请求,并设置请求响应的处理器为BrokerRegistrationResponseHandler
。此处发起网络请求的部分主要依赖底层NetworkClient
,笔者在Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析 有提及,此处不再赘述
private def sendBrokerRegistration(): Unit = { val features = new BrokerRegistrationRequestData.FeatureCollection() _supportedFeatures.asScala.foreach { case (name, range) => features.add(new BrokerRegistrationRequestData.Feature(). setName(name). setMinSupportedVersion(range.min()). setMaxSupportedVersion(range.max())) } val data = new BrokerRegistrationRequestData(). setBrokerId(nodeId). setClusterId(_clusterId). setFeatures(features). setIncarnationId(incarnationId). setListeners(_advertisedListeners). setRack(rack.orNull) if (isTraceEnabled) { trace(s"Sending broker registration ${data}") } _channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data), new BrokerRegistrationResponseHandler()) }
集群 Leader 处理完 BrokerRegistration 请求,将响应送达发起请求的节点时,将触发请求发起节点的BrokerRegistrationResponseHandler#onComplete()
方法对响应数据进行处理。这个方法比较简单,可以看到无论 broker 节点是否成功注册到 Leader 节点,核心都是定时发起下一次的交互请求
private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { if (response.authenticationException() != null) { error(s"Unable to register broker ${nodeId} because of an authentication exception.", response.authenticationException()); scheduleNextCommunicationAfterFailure() } else if (response.versionMismatch() != null) { error(s"Unable to register broker ${nodeId} because of an API version problem.", response.versionMismatch()); scheduleNextCommunicationAfterFailure() } else if (response.responseBody() == null) { warn(s"Unable to register broker ${nodeId}.") scheduleNextCommunicationAfterFailure() } else if (!response.responseBody().isInstanceOf[BrokerRegistrationResponse]) { error(s"Unable to register broker ${nodeId} because the controller returned an " + "invalid response type.") scheduleNextCommunicationAfterFailure() } else { val message = response.responseBody().asInstanceOf[BrokerRegistrationResponse] val errorCode = Errors.forCode(message.data().errorCode()) if (errorCode == Errors.NONE) { failedAttempts = 0 _brokerEpoch = message.data().brokerEpoch() registered = true initialRegistrationSucceeded = true info(s"Successfully registered broker ${nodeId} with broker epoch ${_brokerEpoch}") scheduleNextCommunicationImmediately() // Immediately send a heartbeat } else { info(s"Unable to register broker ${nodeId} because the controller returned " + s"error ${errorCode}") scheduleNextCommunicationAfterFailure() } } }
Kafka 集群的 Leader 节点收到 BrokerRegistration 请求,经过底层网络组件的协议解析后会将其分发到 ControllerApis.scala#handleBrokerRegistration()
方法进行处理,此处核心是调用 QuorumController.java#registerBroker()
方法完成节点注册的业务处理
def handleBrokerRegistration(request: RequestChannel.Request): Unit = { val registrationRequest = request.body[BrokerRegistrationRequest] authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) controller.registerBroker(registrationRequest.data).handle[Unit] { (reply, e) => def createResponseCallback(requestThrottleMs: Int, reply: BrokerRegistrationReply, e: Throwable): BrokerRegistrationResponse = { if (e != null) { new BrokerRegistrationResponse(new BrokerRegistrationResponseData(). setThrottleTimeMs(requestThrottleMs). setErrorCode(Errors.forException(e).code)) } else { new BrokerRegistrationResponse(new BrokerRegistrationResponseData(). setThrottleTimeMs(requestThrottleMs). setErrorCode(NONE.code). setBrokerEpoch(reply.epoch)) } } requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createResponseCallback(requestThrottleMs, reply, e)) } }
QuorumController.java#registerBroker()
方法的处理完全遵照 Kafka 3.0 源码笔记(8)-Kafka 服务端集群 Leader 对 CreateTopics 请求的处理 中提到的异步处理框架,核心业务被封装在 ControllerWriteEvent
事件中,可以看到有以下关键几步:
- 调用
ClusterControlManager.java#registerBroker()
方法将发起请求节点添加到本地列表中,维护其状态- 调用
QuorumController.java#rescheduleMaybeFenceStaleBrokers()
方法检查本地列表中是否有超时的节点,超时节点将被置为FENCED 状态
,并进行分区副本 Leader 的重新选举,关于这个方法下文将详细分析
@Override
public CompletableFuture<BrokerRegistrationReply>
registerBroker(BrokerRegistrationRequestData request) {
return appendWriteEvent("registerBroker", () -> {
ControllerResult<BrokerRegistrationReply> result = clusterControl.
registerBroker(request, writeOffset + 1, featureControl.
finalizedFeatures(Long.MAX_VALUE));
rescheduleMaybeFenceStaleBrokers();
return result;
});
}
ClusterControlManager.java#registerBroker()
方法的关键分为如下几步,至此 broker 节点注册到了 Leader 本地列表,BrokerRegistration 请求处理基本结束
- 首先根据请求携带的 brokerId 查找本地列表中的
BrokerRegistration
节点注册信息,如果这个节点已经注册了,则通过BrokerHeartbeatManager.java#hasValidSession()
方法检查该节点是否超时未报告心跳- 生成节点注册元数据记录
RegisterBrokerRecord
对象,这个记录对象将在ControllerWriteEvent
事件处理完成后在异步处理框架中写入到元数据 topic( __cluster_metadata),后续会重放到内存当中- 调用
BrokerHeartbeatManager.java#touch()
方法维护节点状态BrokerHeartbeatState
public ControllerResult<BrokerRegistrationReply> registerBroker( BrokerRegistrationRequestData request, long brokerEpoch, FeatureMapAndEpoch finalizedFeatures) { if (heartbeatManager == null) { throw new RuntimeException("ClusterControlManager is not active."); } int brokerId = request.brokerId(); BrokerRegistration existing = brokerRegistrations.get(brokerId); if (existing != null) { if (heartbeatManager.hasValidSession(brokerId)) { if (!existing.incarnationId().equals(request.incarnationId())) { throw new DuplicateBrokerRegistrationException("Another broker is " + "registered with that broker id."); } } else { if (!existing.incarnationId().equals(request.incarnationId())) { // Remove any existing session for the old broker incarnation. heartbeatManager.remove(brokerId); existing = null; } } } RegisterBrokerRecord record = new RegisterBrokerRecord().setBrokerId(brokerId). setIncarnationId(request.incarnationId()). setBrokerEpoch(brokerEpoch). setRack(request.rack()); for (BrokerRegistrationRequestData.Listener listener : request.listeners()) { record.endPoints().add(new BrokerEndpoint(). setHost(listener.host()). setName(listener.name()). setPort(listener.port()). setSecurityProtocol(listener.securityProtocol())); } for (BrokerRegistrationRequestData.Feature feature : request.features()) { Optional<VersionRange> finalized = finalizedFeatures.map().get(feature.name()); if (finalized.isPresent()) { if (!finalized.get().contains(new VersionRange(feature.minSupportedVersion(), feature.maxSupportedVersion()))) { throw new UnsupportedVersionException("Unable to register because " + "the broker has an unsupported version of " + feature.name()); } } record.features().add(new BrokerFeature(). setName(feature.name()). setMinSupportedVersion(feature.minSupportedVersion()). setMaxSupportedVersion(feature.maxSupportedVersion())); } if (existing == null) { heartbeatManager.touch(brokerId, true, -1); } else { heartbeatManager.touch(brokerId, existing.fenced(), -1); } List<ApiMessageAndVersion> records = new ArrayList<>(); records.add(new ApiMessageAndVersion(record, REGISTER_BROKER_RECORD.highestSupportedVersion())); return ControllerResult.of(records, new BrokerRegistrationReply(brokerEpoch)); }
在 2.1.1 节第5步中,节点注册成功则触发 BrokerLifecycleManager.scala#scheduleNextCommunicationImmediately()
执行,可以看到这个方法只是入口,核心其实是通过 BrokerLifecycleManager.scala#scheduleNextCommunication()
方法向异步队列 KafkaEventQueue
中投递延时事件 CommunicationEvent
,这个事件被消费时将触发 CommunicationEvent#run()
执行
private def scheduleNextCommunicationImmediately(): Unit = scheduleNextCommunication(0)
private def scheduleNextCommunication(intervalNs: Long): Unit = {
trace(s"Scheduling next communication at ${MILLISECONDS.convert(intervalNs, NANOSECONDS)} " +
"ms from now.")
val deadlineNs = time.nanoseconds() + intervalNs
eventQueue.scheduleDeferred("communication",
new DeadlineFunction(deadlineNs),
new CommunicationEvent())
}
CommunicationEvent#run()
方法如下,可以看到这里会对当前节点的注册状态进行判断,进而决定是否发送心跳,这也就是其上游触发点不需要关心节点是否注册成功的原因
private class CommunicationEvent extends EventQueue.Event {
override def run(): Unit = {
if (registered) {
sendBrokerHeartbeat()
} else {
sendBrokerRegistration()
}
}
}
BrokerLifecycleManager.scala#sendBrokerHeartbeat()
方法发送 BrokerHeartbeat 请求 的处理流程其实和 BrokerRegistration 请求 类似,关键如下:
- 组装 BrokerHeartbeat 请求对象
- 调用
BrokerToControllerChannelManager.scala#sendRequest()
方法向集群 Leader 发起网络请求,并设置请求响应的处理器为BrokerHeartbeatResponseHandler
private def sendBrokerHeartbeat(): Unit = {
val metadataOffset = _highestMetadataOffsetProvider()
val data = new BrokerHeartbeatRequestData().
setBrokerEpoch(_brokerEpoch).
setBrokerId(nodeId).
setCurrentMetadataOffset(metadataOffset).
setWantFence(!readyToUnfence).
setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN)
if (isTraceEnabled) {
trace(s"Sending broker heartbeat ${data}")
}
_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data),
new BrokerHeartbeatResponseHandler())
}
BrokerHeartbeatResponseHandler#onComplete()
方法负责对 BrokerHeartbeat请求
的响应数据进行处理,可以看到这个方法的核心也是定时发起下一次交互,要么重新注册,要么向集群 Leader 报告心跳,不再赘述
private class BrokerHeartbeatResponseHandler extends ControllerRequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { if (response.authenticationException() != null) { error(s"Unable to send broker heartbeat for ${nodeId} because of an " + "authentication exception.", response.authenticationException()); scheduleNextCommunicationAfterFailure() } else if (response.versionMismatch() != null) { error(s"Unable to send broker heartbeat for ${nodeId} because of an API " + "version problem.", response.versionMismatch()); scheduleNextCommunicationAfterFailure() } else if (response.responseBody() == null) { warn(s"Unable to send broker heartbeat for ${nodeId}. Retrying.") scheduleNextCommunicationAfterFailure() } else if (!response.responseBody().isInstanceOf[BrokerHeartbeatResponse]) { error(s"Unable to send broker heartbeat for ${nodeId} because the controller " + "returned an invalid response type.") scheduleNextCommunicationAfterFailure() } else { val message = response.responseBody().asInstanceOf[BrokerHeartbeatResponse] val errorCode = Errors.forCode(message.data().errorCode()) if (errorCode == Errors.NONE) { failedAttempts = 0 _state match { case BrokerState.STARTING => if (message.data().isCaughtUp()) { info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.") _state = BrokerState.RECOVERY initialCatchUpFuture.complete(null) } else { debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.") } // Schedule the heartbeat after only 10 ms so that in the case where // there is no recovery work to be done, we start up a bit quicker. scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS)) case BrokerState.RECOVERY => if (!message.data().isFenced()) { info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.") _state = BrokerState.RUNNING } else { info(s"The broker is in RECOVERY.") } scheduleNextCommunicationAfterSuccess() case BrokerState.RUNNING => debug(s"The broker is RUNNING. Processing heartbeat response.") scheduleNextCommunicationAfterSuccess() case BrokerState.PENDING_CONTROLLED_SHUTDOWN => if (!message.data().shouldShutDown()) { info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, still waiting " + "for the active controller.") if (!gotControlledShutdownResponse) { // If this is the first pending controlled shutdown response we got, // schedule our next heartbeat a little bit sooner than we usually would. // In the case where controlled shutdown completes quickly, this will // speed things up a little bit. scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS)) } else { scheduleNextCommunicationAfterSuccess() } } else { info(s"The controlled has asked us to exit controlled shutdown.") beginShutdown() } gotControlledShutdownResponse = true case BrokerState.SHUTTING_DOWN => info(s"The broker is SHUTTING_DOWN. Ignoring heartbeat response.") case _ => error(s"Unexpected broker state ${_state}") scheduleNextCommunicationAfterSuccess() } } else { warn(s"Broker ${nodeId} sent a heartbeat request but received error ${errorCode}.") scheduleNextCommunicationAfterFailure() } } }
ControllerApis.scala#handleBrokerRegistration()
方法负责BrokerHeartbeat请求的处理,其源码如下,可以看到和处理节点注册的流程高度一致,此处将触发 QuorumController.java#processBrokerHeartbeat()
方法执行
def handleBrokerHeartBeatRequest(request: RequestChannel.Request): Unit = { val heartbeatRequest = request.body[BrokerHeartbeatRequest] authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) controller.processBrokerHeartbeat(heartbeatRequest.data).handle[Unit] { (reply, e) => def createResponseCallback(requestThrottleMs: Int, reply: BrokerHeartbeatReply, e: Throwable): BrokerHeartbeatResponse = { if (e != null) { new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData(). setThrottleTimeMs(requestThrottleMs). setErrorCode(Errors.forException(e).code)) } else { new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData(). setThrottleTimeMs(requestThrottleMs). setErrorCode(NONE.code). setIsCaughtUp(reply.isCaughtUp). setIsFenced(reply.isFenced). setShouldShutDown(reply.shouldShutDown)) } } requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createResponseCallback(requestThrottleMs, reply, e)) } }
QuorumController.java#processBrokerHeartbeat()
方法核心的业务逻辑如下:
- 调用
ReplicationControlManager#processBrokerHeartbeat()
方法依据请求数据更新节点信息- 调用
QuorumController.java#rescheduleMaybeFenceStaleBrokers()
方法检查本地列表中是否有超时的节点,超时节点将被置为FENCED 状态
,并需要进行分区副本 Leader 的重新选举
@Override public CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(BrokerHeartbeatRequestData request) { return appendWriteEvent("processBrokerHeartbeat", new ControllerWriteOperation<BrokerHeartbeatReply>() { private final int brokerId = request.brokerId(); private boolean inControlledShutdown = false; @Override public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() { ControllerResult<BrokerHeartbeatReply> result = replicationControl. processBrokerHeartbeat(request, lastCommittedOffset); inControlledShutdown = result.response().inControlledShutdown(); rescheduleMaybeFenceStaleBrokers(); return result; } @Override public void processBatchEndOffset(long offset) { if (inControlledShutdown) { clusterControl.heartbeatManager(). updateControlledShutdownOffset(brokerId, offset); } } }); }
ReplicationControlManager#processBrokerHeartbeat()
方法实现如下,关键处理分为以下几步:
- 调用
BrokerHeartbeatManager.java#calculateNextBrokerState()
方法计算报告心跳的节点接下来的状态- 如果节点当前状态和接下来的状态不一致,则说明节点即将发生状态变化,需要进行对应的处理,生成元数据变动记录,后续将其封装到事件处理结果对象
- 更新维护节点状态到本地列表
ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat( BrokerHeartbeatRequestData request, long lastCommittedOffset) { int brokerId = request.brokerId(); long brokerEpoch = request.brokerEpoch(); clusterControl.checkBrokerEpoch(brokerId, brokerEpoch); BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager(); BrokerControlStates states = heartbeatManager.calculateNextBrokerState(brokerId, request, lastCommittedOffset, () -> brokersToIsrs.hasLeaderships(brokerId)); List<ApiMessageAndVersion> records = new ArrayList<>(); if (states.current() != states.next()) { switch (states.next()) { case FENCED: handleBrokerFenced(brokerId, records); break; case UNFENCED: handleBrokerUnfenced(brokerId, brokerEpoch, records); break; case CONTROLLED_SHUTDOWN: generateLeaderAndIsrUpdates("enterControlledShutdown[" + brokerId + "]", brokerId, NO_LEADER, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); break; case SHUTDOWN_NOW: handleBrokerFenced(brokerId, records); break; } } heartbeatManager.touch(brokerId, states.next().fenced(), request.currentMetadataOffset()); boolean isCaughtUp = request.currentMetadataOffset() >= lastCommittedOffset; BrokerHeartbeatReply reply = new BrokerHeartbeatReply(isCaughtUp, states.next().fenced(), states.next().inControlledShutdown(), states.next().shouldShutDown()); return ControllerResult.of(records, reply); }
回到本节步骤2第2步,QuorumController.java#rescheduleMaybeFenceStaleBrokers()
方法的核心在于延时执行 ReplicationControl.java#maybeFenceOneStaleBroker()
方法检测心跳超时节点,并递归调用自身以便处理多个节点下线的情况
private void rescheduleMaybeFenceStaleBrokers() {
long nextCheckTimeNs = clusterControl.heartbeatManager().nextCheckTimeNs();
if (nextCheckTimeNs == Long.MAX_VALUE) {
cancelMaybeFenceReplicas();
return;
}
scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs, () -> {
ControllerResult<Void> result = replicationControl.maybeFenceOneStaleBroker();
// This following call ensures that if there are multiple brokers that
// are currently stale, then fencing for them is scheduled immediately
rescheduleMaybeFenceStaleBrokers();
return result;
});
}
ReplicationControlManager.java#maybeFenceOneStaleBroker()
方法的关键处理如下:
- 首先调用
BrokerHeartbeatManager.java#findOneStaleBroker()
检测是否存在心跳超时的节点- 如果存在则触发失败转移机制,取列表中超时时间最长的一个节点出来调用
ReplicationControlManager.java#handleBrokerFenced()
进行下线处理
ControllerResult<Void> maybeFenceOneStaleBroker() {
List<ApiMessageAndVersion> records = new ArrayList<>();
BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
// Even though multiple brokers can go stale at a time, we will process
// fencing one at a time so that the effect of fencing each broker is visible
// to the system prior to processing the next one
log.info("Fencing broker {} because its session has timed out.", brokerId);
handleBrokerFenced(brokerId, records);
heartbeatManager.fence(brokerId);
});
return ControllerResult.of(records, null);
}
BrokerHeartbeatManager.java#findOneStaleBroker()
方法比较简单,可以看到就是从头遍历 unfenced
节点列表,通过 BrokerHeartbeatManager.java#hasValidSession()
方法判断节点是否超时
Optional<Integer> findOneStaleBroker() { BrokerHeartbeatStateIterator iterator = unfenced.iterator(); if (iterator.hasNext()) { BrokerHeartbeatState broker = iterator.next(); // The unfenced list is sorted on last contact time from each // broker. If the first broker is not stale, then none is. if (!hasValidSession(broker)) { return Optional.of(broker.id); } } return Optional.empty(); } private boolean hasValidSession(BrokerHeartbeatState broker) { if (broker.fenced()) { return false; } else { return broker.lastContactNs + sessionTimeoutNs >= time.nanoseconds(); } }
ReplicationControlManager.java#handleBrokerFenced()
开始进行失败转移处理,核心在于 ReplicationControlManager.java#generateLeaderAndIsrUpdates()
方法调用
void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
if (brokerRegistration == null) {
throw new RuntimeException("Can't find broker registration for broker " + brokerId);
}
generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
setId(brokerId).setEpoch(brokerRegistration.epoch()),
FENCE_BROKER_RECORD.highestSupportedVersion()));
}
ReplicationControlManager.java#generateLeaderAndIsrUpdates()
方法主要负责为副本 Leader 分布在失败节点上的分区重新选举 Leader 副本,此处关键处理为 PartitionChangeBuilder.java#build()
方法调用
void generateLeaderAndIsrUpdates(String context, int brokerToRemove, int brokerToAdd, List<ApiMessageAndVersion> records, Iterator<TopicIdPartition> iterator) { int oldSize = records.size(); // If the caller passed a valid broker ID for brokerToAdd, rather than passing // NO_LEADER, that node will be considered an acceptable leader even if it is // currently fenced. This is useful when handling unfencing. The reason is that // while we're generating the records to handle unfencing, the ClusterControlManager // still shows the node as fenced. // // Similarly, if the caller passed a valid broker ID for brokerToRemove, rather // than passing NO_LEADER, that node will never be considered an acceptable leader. // This is useful when handling a newly fenced node. We also exclude brokerToRemove // from the target ISR, but we need to exclude it here too, to handle the case // where there is an unclean leader election which chooses a leader from outside // the ISR. Function<Integer, Boolean> isAcceptableLeader = r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.unfenced(r)); while (iterator.hasNext()) { TopicIdPartition topicIdPart = iterator.next(); TopicControlInfo topic = topics.get(topicIdPart.topicId()); if (topic == null) { throw new RuntimeException("Topic ID " + topicIdPart.topicId() + " existed in isrMembers, but not in the topics map."); } PartitionRegistration partition = topic.parts.get(topicIdPart.partitionId()); if (partition == null) { throw new RuntimeException("Partition " + topicIdPart + " existed in isrMembers, but not in the partitions map."); } PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicIdPart.topicId(), topicIdPart.partitionId(), isAcceptableLeader, () -> configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)); // Note: if brokerToRemove was passed as NO_LEADER, this is a no-op (the new // target ISR will be the same as the old one). builder.setTargetIsr(Replicas.toList( Replicas.copyWithout(partition.isr, brokerToRemove))); builder.build().ifPresent(records::add); } if (records.size() != oldSize) { if (log.isDebugEnabled()) { StringBuilder bld = new StringBuilder(); String prefix = ""; for (ListIterator<ApiMessageAndVersion> iter = records.listIterator(oldSize); iter.hasNext(); ) { ApiMessageAndVersion apiMessageAndVersion = iter.next(); PartitionChangeRecord record = (PartitionChangeRecord) apiMessageAndVersion.message(); bld.append(prefix).append(topics.get(record.topicId()).name).append("-"). append(record.partitionId()); prefix = ", "; } log.debug("{}: changing partition(s): {}", context, bld.toString()); } else if (log.isInfoEnabled()) { log.info("{}: changing {} partition(s)", context, records.size() - oldSize); } } }
PartitionChangeBuilder.java#build()
方法中和副本 Leader 选举相关的重点如下:
- 调用
PartitionChangeBuilder.java#shouldTryElection()
方法判断当前分区副本 Leader 是否在 ISR 列表中,不在则需要重新选举副本 Leader- 调用
PartitionChangeBuilder.java#tryElection()
方法执行重新选举分区副本 Leader 的动作- 生成
PARTITION_CHANGE_RECORD
元数据变动记录,记录分区信息的变化。该元数据变动在集群节点上重放时会检测 Leader 变动,分区 Leader 每变动一次版本号自增1。分区版本号主要和 Kafka 的异常恢复机制有关,感兴趣的读者可参考 Kafka 3.0 源码笔记(12)-Kafka 服务端分区异常恢复机制的源码分析
public Optional<ApiMessageAndVersion> build() { PartitionChangeRecord record = new PartitionChangeRecord(). setTopicId(topicId). setPartitionId(partitionId); completeReassignmentIfNeeded(); if (shouldTryElection()) { tryElection(record); } triggerLeaderEpochBumpIfNeeded(record); if (!targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) { record.setIsr(targetIsr); } if (!targetReplicas.isEmpty() && !targetReplicas.equals(Replicas.toList(partition.replicas))) { record.setReplicas(targetReplicas); } if (!targetRemoving.equals(Replicas.toList(partition.removingReplicas))) { record.setRemovingReplicas(targetRemoving); } if (!targetAdding.equals(Replicas.toList(partition.addingReplicas))) { record.setAddingReplicas(targetAdding); } if (changeRecordIsNoOp(record)) { return Optional.empty(); } else { return Optional.of(new ApiMessageAndVersion(record, PARTITION_CHANGE_RECORD.highestSupportedVersion())); } }
PartitionChangeBuilder.java#tryElection()
方法实际完成分区副本 Leader 的选举,此处分为两个步骤:
- 新建 BestLeader 对象,在它的构造方法中完成 Leader 选举
- 判断新选举的 Leader 是否和分区的当前 Leader 相同,不相同才需要将其更新到元数据变动记录中传播出去
private void tryElection(PartitionChangeRecord record) {
BestLeader bestLeader = new BestLeader();
if (bestLeader.node != partition.leader) {
record.setLeader(bestLeader.node);
if (bestLeader.unclean) {
// If the election was unclean, we have to forcibly set the ISR to just the
// new leader. This can result in data loss!
record.setIsr(Collections.singletonList(bestLeader.node));
}
}
}
BestLeader
选举比较简单,可以看到其实就是遍历本地分区的副本列表选取合适的副本即可,不过这里分为以下两种模式,至此分区副本失败选主的流程基本结束
- 从 ISR 列表中选取
第一个合适的节点(UNFENCED 状态)
,默认模式- 从所有节点中选取
第一个合适的节点(UNFENCED 状态)
,以上模式选举失败才进入,默认关闭
class BestLeader { final int node; final boolean unclean; BestLeader() { for (int replica : targetReplicas) { if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) { this.node = replica; this.unclean = false; return; } } if (uncleanElectionOk.get()) { for (int replica : targetReplicas) { if (isAcceptableLeader.apply(replica)) { this.node = replica; this.unclean = true; return; } } } this.node = NO_LEADER; this.unclean = false; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。