When a broker is a replica node of a partition but not its leader, the broker adds the partition to (or starts) a fetch thread that issues replica-sync requests to the leader node.
After the node completes initialization and starts up, or when it observes a partition state change that turns it into a follower, it starts the replica fetch thread through the replicaFetcherManager.
Code snippet that starts the fetch thread:
partitionsToStartFetching.forKeyValue { (topicPartition, partition) =>
  // leaderReplicaIdOpt is the brokerId of the leader recorded by the partition.
  val nodeOpt = partition.leaderReplicaIdOpt
    .flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
    .flatMap(_.node(listenerName).asScala)
  nodeOpt match {
    case Some(node) =>
      val log = partition.localLogOrException
      partitionAndOffsets.put(topicPartition, InitialFetchState(
        log.topicId,
        new BrokerEndPoint(node.id, node.host, node.port), // connection info of the leader node
        partition.getLeaderEpoch,                          // latest leaderEpoch of the current leader
        initialFetchOffset(log)                            // the initial fetch offset is log.endOffset
      ))
    case None =>
      stateChangeLogger.trace(s"Unable to start fetching $topicPartition with topic ID ${partition.topicId} " +
        s"from leader ${partition.leaderReplicaIdOpt} because it is not alive.")
  }
}
// Start a new fetcher thread or join an existing one.
replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
Inside the brokerServer, this component manages the threads that synchronize the replicas of the partitions hosted on the current node with their leaders (it can be thought of as a thread pool).
The number of threads that fetch from each broker is controlled by num.replica.fetchers (default 1); the total number of fetch threads on the current broker is therefore numFetchers * brokerSize, where brokerSize is the number of leader brokers being fetched from.
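As an illustration of how partitions spread across those threads, here is a minimal sketch of the slot computation, assuming (as the time33-style comment in the addFetcherForPartitions snippet further below suggests) that the slot is derived from a hash of the topic name and partition id; fetcherSlot and demo-topic are illustrative names, not Kafka code.

import org.apache.kafka.common.TopicPartition

// Hypothetical sketch: map a partition to one of the numFetchers slots kept per leader broker.
def fetcherSlot(tp: TopicPartition, numFetchers: Int): Int = {
  val hash = 31 * tp.topic.hashCode + tp.partition
  ((hash % numFetchers) + numFetchers) % numFetchers // non-negative modulo
}

// With num.replica.fetchers = 3, six partitions of one topic spread over three threads per leader:
(0 until 6).foreach { p =>
  val tp = new TopicPartition("demo-topic", p)
  println(s"$tp -> fetcher slot ${fetcherSlot(tp, 3)}")
}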
The addFetcherForPartitions function is invoked when the current node observes new follower partitions; it starts the threads that fetch replica data from the target leader nodes (the ReplicaFetcherManager is locked during this process).
The flow in which the function creates the FetcherThread is described in the comments of the code snippet:
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]): Unit = {
  lock synchronized {
    // Step 1: use the topicPartition's hash (time33-style) to locate its slot among the numFetchers threads of the leader broker.
    val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicPartition, brokerAndInitialFetchOffset) =>
      BrokerAndFetcherId(brokerAndInitialFetchOffset.leader, getFetcherId(topicPartition))
    }

    // When no fetch thread exists for the slot, create one (createFetcherThread) and register it
    // in fetcherThreadMap keyed by brokerIdAndFetcherId.
    def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId,
                                 brokerIdAndFetcherId: BrokerIdAndFetcherId): T = {
      val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
      fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
      fetcherThread.start()
      fetcherThread
    }

    // Step 2: check whether a fetcherThread already exists for brokerIdAndFetcherId.
    // ==> if it exists, add the topicPartition directly to the thread's queue.
    // ==> if not, start a new fetcherThread via addAndStartFetcherThread, then add the topicPartition to its queue.
    for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
      val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
      val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {
        case Some(currentFetcherThread) if currentFetcherThread.leader.brokerEndPoint() == brokerAndFetcherId.broker =>
          // reuse the fetcher thread
          currentFetcherThread
        case Some(f) =>
          f.shutdown()
          addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
        case None =>
          addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
      }
      // Add the topicPartition to the fetcherThread's queue.
      addPartitionsToFetcherThread(fetcherThread, initialFetchOffsets)
    }
  }
}
As the implementation shows, the core logic of the function consists of two steps:
=>1, Call createFetcherThread to initialize and start a fetcherThread, and add the thread to the fetcherThreadMap container.
==>1.1, Initialize a BrokerBlockingSender instance, which provides the client-side network channel used to send requests to the target broker node.
==>1.2, Initialize a FetchSessionHandler instance, the client-side helper that implements Kafka's incremental fetch sessions. When a partition's data has not changed, the session reduces the number of partitions that need to be carried in each fetch request (a hypothetical sketch of this idea follows the createFetcherThread snippet below).
==>1.3, Initialize a RemoteLeaderEndPoint instance, which carries the concrete fetch implementation.
==>1.4, Initialize the ReplicaFetcherThread, which addFetcherForPartitions adds to the fetcherThreadMap container and starts.
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
  val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
  val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
  val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${sourceBroker.id}, " +
    s"fetcherId=$fetcherId] ")
  val endpoint = new BrokerBlockingSender(sourceBroker, brokerConfig, metrics, time, fetcherId,
    s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId", logContext)
  val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
  val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig,
    replicaManager, quotaManager, metadataVersionSupplier)
  new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,
    quotaManager, logContext.logPrefix, metadataVersionSupplier)
}
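To make the incremental-session idea from step 1.2 concrete, here is a hypothetical sketch; SessionSketch, PartitionPos and buildFetch are invented names for illustration and are not the FetchSessionHandler API. The first request carries every partition; later requests only carry partitions whose fetch position changed.

// Hypothetical model of an incremental fetch session (not Kafka's FetchSessionHandler API).
final case class PartitionPos(fetchOffset: Long)

final class SessionSketch {
  private var epoch = 0
  private var lastSent = Map.empty[String, PartitionPos]

  // Return only the partitions that must be included in the next fetch request.
  def buildFetch(current: Map[String, PartitionPos]): Map[String, PartitionPos] = {
    val toSend =
      if (epoch == 0) current // full fetch on the first request
      else current.filter { case (tp, pos) => !lastSent.get(tp).contains(pos) } // only changed partitions
    lastSent = current
    epoch += 1
    toSend
  }
}

When nothing has changed for the tracked partitions, buildFetch returns an empty map, which is exactly the saving an incremental fetch session provides for the real request.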
=>2, Call addPartitionsToFetcherThread to add the topicPartitions that are to be synchronized from the leader to the fetcherThread.
protected def addPartitionsToFetcherThread(fetcherThread: T,
                                           initialOffsetAndEpochs: collection.Map[TopicPartition, InitialFetchState]): Unit = {
  fetcherThread.addPartitions(initialOffsetAndEpochs)
  info(s"Added fetcher to broker ${fetcherThread.leader.brokerEndPoint().id} for partitions $initialOffsetAndEpochs")
}
As the implementation shows, it mainly calls the addPartitions function on the fetcherThread to register the topicPartitions with the thread.
==> Implementation of ReplicaFetcherThread.addPartitions:
def addPartitions(initialFetchStates: Map[TopicPartition, InitialFetchState]): Set[TopicPartition] = {
  partitionMapLock.lockInterruptibly()
  try {
    failedPartitions.removeAll(initialFetchStates.keySet)
    initialFetchStates.forKeyValue { (tp, initialFetchState) =>
      // Initially, `currentState = null`.
      val currentState = partitionStates.stateValue(tp)
      // Initialize the PartitionFetchState for this partition:
      // ==> if the partition's "leader-epoch-checkpoint" file on this node already records an epochAndOffset,
      // =====> the ReplicaState is Fetching, otherwise it is Truncating.
      // ==> when the ReplicaState is Truncating, the thread converts it to Fetching on its first pass
      //     and sets the initial offset to the high watermark.
      val updatedState = partitionFetchState(tp, initialFetchState, currentState)
      partitionStates.updateAndMoveToEnd(tp, updatedState)
    }
    partitionMapCond.signalAll()
    initialFetchStates.keySet
  } finally partitionMapLock.unlock()
}
At this point, the doWork function of the ReplicaFetcherThread begins handling replica synchronization with the leader. The fetch at this stage falls into two cases (a compact sketch of the decision follows the list):
=>1, The local replica of the partition already has epochAndOffset entries recorded in its "leader-epoch-checkpoint" file. In this case the follower first sends an OffsetsForLeaderEpoch request to the leader (meaning the initOffset of the node's fetch requests resumes from the endOffset of the epoch recorded at the last synchronization).
=>2, The local replica of the partition is a newly assigned replica and its "leader-epoch-checkpoint" file records no epochAndOffset. This means no replica synchronization has happened yet, so fetching starts directly from the logEndOffset recorded by the local replica.
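A compact, hypothetical sketch of this two-way decision; the helper and state names below are illustrative and mirror the description above rather than the exact Kafka code.

// Hypothetical sketch: choose how fetching starts for a partition, based on the local checkpoint.
sealed trait FetchStartPlan
final case class ResumeAfterEpochCheck(lastEpoch: Int) extends FetchStartPlan // case 1: OffsetsForLeaderEpoch first
final case class FetchFromLogEnd(logEndOffset: Long) extends FetchStartPlan   // case 2: new replica, start at logEndOffset

def planFetchStart(lastCheckpointedEpoch: Option[Int], localLogEndOffset: Long): FetchStartPlan =
  lastCheckpointedEpoch match {
    case Some(epoch) => ResumeAfterEpochCheck(epoch)
    case None        => FetchFromLogEnd(localLogEndOffset)
  }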
The OffsetsForLeaderEpoch request is typically issued when, while the follower is synchronizing with the leader, the local replica's ReplicaState changes to Truncating and the partition's "leader-epoch-checkpoint" file holds a last-updated epochAndOffset value (which indicates that the node has synchronized replicas before, or was previously the leader itself).
Input parameters (a sketch of how they map into the request entry follows the list):
topicPartition => the topicPartition whose lastEpoch is to be synchronized.
currentLeaderEpoch => the latest leaderEpoch of the leader node as recorded by the current node.
leaderEpoch => the leaderEpoch recorded at the last update of the node's "leader-epoch-checkpoint" file; in principle this value is less than or equal to currentLeaderEpoch.
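As a sketch of how these three parameters end up in the request, the per-partition entry passed to fetchEpochEndOffsets is roughly built as below; the setter calls on OffsetForLeaderPartition are an assumption against the generated protocol classes, so treat this as illustrative rather than authoritative.

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition

// Assumed sketch: build the per-partition entry that fetchEpochEndOffsets sends to the leader.
def epochEntry(tp: TopicPartition, currentLeaderEpoch: Int, lastCheckpointedEpoch: Int): OffsetForLeaderPartition =
  new OffsetForLeaderPartition()
    .setPartition(tp.partition)                  // topicPartition
    .setCurrentLeaderEpoch(currentLeaderEpoch)   // latest leaderEpoch known for the current leader
    .setLeaderEpoch(lastCheckpointedEpoch)       // last epoch recorded in "leader-epoch-checkpoint"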
On the follower node, sending the OffsetsForLeaderEpoch request is implemented by the fetchEpochEndOffsets function of RemoteLeaderEndPoint.
Its implementation:
val topics = new OffsetForLeaderTopicCollection(partitions.size)
partitions.forKeyValue { (topicPartition, epochData) =>
  var topic = topics.find(topicPartition.topic)
  if (topic == null) {
    topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic)
    topics.add(topic)
  }
  topic.partitions.add(epochData)
}
// Build the OffsetsForLeaderEpoch request.
val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower(
  metadataVersionSupplier().offsetForLeaderEpochRequestVersion, topics, brokerConfig.brokerId)
debug(s"Sending offset for leader epoch request $epochRequest")
// Send the request through blockingSender; the call blocks the thread until the leader responds.
try {
  val response = blockingSender.sendRequest(epochRequest)
  val responseBody = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse]
  debug(s"Received leaderEpoch response $response")
  // Return the result to the caller.
  responseBody.data.topics.asScala.flatMap { offsetForLeaderTopicResult =>
    offsetForLeaderTopicResult.partitions.asScala.map { offsetForLeaderPartitionResult =>
      val tp = new TopicPartition(offsetForLeaderTopicResult.topic, offsetForLeaderPartitionResult.partition)
      tp -> offsetForLeaderPartitionResult
    }
  }.toMap
}
The request issued by the follower is handled on the leader side by the handleOffsetForLeaderEpochRequest handler in KafkaApis, which ultimately delegates to the lastOffsetForLeaderEpoch function in ReplicaManager to produce the response the requester needs.
Code snippet of how lastOffsetForLeaderEpoch in the leader's ReplicaManager handles the request: it looks up the endOffset recorded for the supplied leaderEpoch and returns it to the requester.
case HostedPartition.Online(partition) =>
  val currentLeaderEpochOpt =
    if (offsetForLeaderPartition.currentLeaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH)
      Optional.empty[Integer]
    else
      Optional.of[Integer](offsetForLeaderPartition.currentLeaderEpoch)
  partition.lastOffsetForLeaderEpoch(
    currentLeaderEpochOpt,
    offsetForLeaderPartition.leaderEpoch,
    fetchOnlyFromLeader = true)
The possible return values of lastOffsetForLeaderEpoch (determined by looking up the "leader-epoch-checkpoint") fall into the following cases (a simplified sketch of the lookup follows the list):
=>1: leaderEpoch = endOffset = -1, meaning the epoch currently cached on the follower does not exist on the leader.
=>2: leaderEpoch = requestEpoch and endOffset = leaderLogEndOffset, meaning the epoch cached by the follower equals the leader's latest epoch; in this case the leader returns the logEndOffset of its local replica.
=>3: the epoch cached by the follower is smaller than the leader's latest epoch; the returned endOffset is then the startOffset of the first epoch on the leader that is greater than requestEpoch (the epoch cached by the follower), which is exactly the endOffset of requestEpoch.
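A simplified, hypothetical model of this lookup over the leader's (epoch, startOffset) checkpoint entries; endOffsetFor and EpochEntry are illustrative names, not the LeaderEpochFileCache API, and the entries are assumed to be sorted by epoch in ascending order.

// Hypothetical model of the leader-side lookup behind lastOffsetForLeaderEpoch.
final case class EpochEntry(epoch: Int, startOffset: Long)

def endOffsetFor(requestedEpoch: Int, entries: List[EpochEntry], leaderLogEndOffset: Long): (Int, Long) =
  entries match {
    case Nil =>
      (-1, -1L)                                  // case 1: the leader has no epoch entries at all
    case es if requestedEpoch == es.last.epoch =>
      (requestedEpoch, leaderLogEndOffset)       // case 2: follower is on the leader's latest epoch
    case es if es.exists(_.epoch <= requestedEpoch) && es.exists(_.epoch > requestedEpoch) =>
      val next = es.find(_.epoch > requestedEpoch).get
      (requestedEpoch, next.startOffset)         // case 3: startOffset of the next epoch = endOffset of requestedEpoch
    case _ =>
      (-1, -1L)                                  // case 1: requestedEpoch not present on the leader
  }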
The follower-side code that processes this response is shown below:
private def truncateToEpochEndOffsets(latestEpochsForPartitions: Map[TopicPartition, EpochData]): Unit = {
  // At this point endOffsets already holds the results returned by the leader.
  val endOffsets = leader.fetchEpochEndOffsets(latestEpochsForPartitions)
  // Ensure we hold a lock during truncation.
  inLock(partitionMapLock) {
    // Check no leadership and no leader epoch changes happened whilst we were unlocked, fetching epochs
    val epochEndOffsets = endOffsets.filter {