LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条待写入消息的位移值。注意是下一条消息!也就是说,如果 LEO=11 且日志起始位移(log start offset)为 0,那么该副本保存了 11 条消息,位移值范围是 [0, 10]。
ReplicaManager 可以说是 Server 端最重要的组成部分之一,Server 端多种类型的请求都是通过调用 ReplicaManager 来处理的:
/**
 * Payload of a LeaderAndIsr request (excerpt: field list only, the rest is elided).
 * Carries the sending controller's id and epoch, the broker epoch, and per-topic /
 * per-partition leader and ISR state.
 */
public class LeaderAndIsrRequestData implements ApiMessage {
    int controllerId;
    int controllerEpoch;
    long brokerEpoch;
    byte type;
    // NOTE(review): presumably the pre-topic-grouping layout used by older request versions; confirm.
    List<LeaderAndIsrPartitionState> ungroupedPartitionStates;
    List<LeaderAndIsrTopicState> topicStates;
    List<LeaderAndIsrLiveLeader> liveLeaders;
    ...
}

// NOTE(review): `static` is only legal on a nested class in Java; in the full source this class is
// presumably declared inside LeaderAndIsrRequestData, and was pulled out here for readability.
/** Leader/ISR state for a single partition carried in the request. */
public static class LeaderAndIsrPartitionState implements Message {
    String topicName;
    int partitionIndex;
    int controllerEpoch;
    int leader;                       // presumably the broker id of the partition leader
    int leaderEpoch;
    List<Integer> isr;                // in-sync replica set (presumably broker ids)
    int zkVersion;
    List<Integer> replicas;
    List<Integer> addingReplicas;     // replicas being added by an in-progress reassignment (presumably)
    List<Integer> removingReplicas;   // replicas being removed by an in-progress reassignment (presumably)
}
/**
 * Handles a StopReplica request from the controller (excerpt: part of the body is elided).
 *
 * The request is rejected with STALE_CONTROLLER_EPOCH when `controllerEpoch` is older than the
 * epoch this broker last saw. Otherwise the new epoch is recorded, the selected partitions are
 * stopped, and every partition whose stop/delete failed is reported in the response map.
 * Runs under `replicaStateChangeLock`, so it is serialized with other code holding that lock.
 *
 * @return the per-partition error map and the top-level error (NONE or STALE_CONTROLLER_EPOCH)
 */
def stopReplicas(correlationId: Int,
                 controllerId: Int,
                 controllerEpoch: Int,
                 brokerEpoch: Long,
                 partitionStates: Map[TopicPartition, StopReplicaPartitionState]
                ): (mutable.Map[TopicPartition, Errors], Errors) = {
  replicaStateChangeLock synchronized {
    val responseMap = new collection.mutable.HashMap[TopicPartition, Errors]
    if (controllerEpoch < this.controllerEpoch) {
      stateChangeLogger.warn(s"Ignoring StopReplica request from " +
        s"controller $controllerId with correlation id $correlationId " +
        s"since its controller epoch $controllerEpoch is old. " +
        s"Latest known controller epoch is ${this.controllerEpoch}")
      (responseMap, Errors.STALE_CONTROLLER_EPOCH)
    } else {
      this.controllerEpoch = controllerEpoch
      // NOTE: elided in this excerpt; presumably builds `stoppedPartitions` from `partitionStates`.
      ...
      stopPartitions(stoppedPartitions).foreach { case (topicPartition, e) =>
        e match {
          case _: KafkaStorageException =>
            stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
              s"controller $controllerId with correlation id $correlationId " +
              s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
              "partition is in an offline log directory")
          case _ =>
            stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
              s"controller $controllerId with correlation id $correlationId " +
              s"epoch $controllerEpoch for partition $topicPartition due to an unexpected " +
              s"${e.getClass.getName} exception: ${e.getMessage}")
        }
        // Every failed partition is reported back exactly once, whatever the cause.
        responseMap.put(topicPartition, Errors.forException(e))
      }
      (responseMap, Errors.NONE)
    }
  }
}

/**
 * Stops the given partitions in three phases: stop their fetchers, drop the ones marked for
 * deletion from the in-memory partition map, then delete their logs in a single batch.
 *
 * @param partitionsToStop partition -> whether its local log should also be deleted
 * @return errors reported through `logManager.asyncDelete`'s callback, keyed by partition
 *         (empty when nothing was deleted). NOTE(review): assumes the callback runs before
 *         asyncDelete returns, since the map is returned immediately; confirm.
 */
protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = {
  // First stop fetchers for all partitions.
  val partitions = partitionsToStop.keySet
  replicaFetcherManager.removeFetcherForPartitions(partitions)
  replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)

  // Second remove deleted partitions from the partition map. Fetchers rely on the
  // ReplicaManager to get Partition's information so they must be stopped first.
  val partitionsToDelete = mutable.Set.empty[TopicPartition]
  partitionsToStop.forKeyValue { (topicPartition, shouldDelete) =>
    if (shouldDelete) {
      getPartition(topicPartition) match {
        case hostedPartition: NonOffline =>
          if (allPartitions.remove(topicPartition, hostedPartition)) {
            maybeRemoveTopicMetrics(topicPartition.topic)
            // Logs are not deleted here. They are deleted in a single batch later on.
            // This is done to avoid having to checkpoint for every deletions.
            hostedPartition.partition.delete()
          }
        case _ =>
          // Any other hosted state (presumably offline or not hosted): there is no in-memory
          // partition to remove. Without this case the match is non-exhaustive and throws
          // scala.MatchError. The partition is still queued for log deletion below.
      }
      partitionsToDelete += topicPartition
    }
    completeDelayedFetchOrProduceRequests(topicPartition)
  }

  // Third delete the logs and checkpoint.
  val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
  if (partitionsToDelete.nonEmpty) {
    // Delete the logs and checkpoint.
    logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e))
  }
  errorMap
}
UpdateMetadata 请求主要用于:controller 监听到集群元数据发生变化后,通知各个 broker 更新其本地缓存的元数据,从而使集群中所有 broker 的元数据保持一致。
/**
 * Appends produce records for several partitions to their leader logs (excerpt: parts are elided,
 * including the method's closing brace and the branch taken for invalid `requiredAcks`).
 *
 * After the local append, the response is either deferred by parking a DelayedProduce in
 * `delayedProducePurgatory` (keyed by every partition in the request), or sent immediately
 * through `responseCallback`.
 */
def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  origin: AppendOrigin,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Lock] = None,
                  recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      origin, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
    // (elided: presumably builds `produceStatus` from localProduceResults)
    ...
    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
      // Watch the operation under one key per partition in the request.
      val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    } else {
      // we can respond immediately
      val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
      responseCallback(produceResponseStatus)
    }
    ...
  }
  // NOTE: the rest of appendRecords (including its closing brace) is elided in this excerpt.

/** acks must be -1 (all in-sync replicas), 1 (leader only) or 0 (no acknowledgement). */
private def isValidRequiredAcks(requiredAcks: Short): Boolean = {
  requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0
}

/**
 * (DelayedOperationPurgatory) Tries to complete `operation` right away; if it cannot, the
 * operation is registered under every key in `watchKeys` and also added to the timeout timer.
 *
 * @return true if safeTryCompleteOrElse reports the operation completed, false otherwise
 */
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
  // The block passed here registers the watchers; presumably it only runs when the first
  // completion attempt fails (see safeTryCompleteOrElse).
  if (operation.safeTryCompleteOrElse {
    watchKeys.foreach(key => watchForOperation(key, operation))
    if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
  }) return true

  // if it cannot be completed by now and hence is watched, add to the expire queue also
  if (!operation.isCompleted) {
    if (timerEnabled)
      timeoutTimer.add(operation) // add to the timer (timing wheel)
    // The operation may have been completed concurrently while it was being added to the timer;
    // if so, cancel the timer task that was just added.
    if (operation.isCompleted) {
      // cancel the timer task
      operation.cancel()
    }
  }
  false
}
/**
 * Appends `records` to this partition's log as leader and reports whether the high watermark moved.
 * All work happens under the read side of `leaderIsrUpdateLock`.
 *
 * @param requiredAcks producer acks; with -1 the append is refused when the ISR is smaller than
 *                     the log's min.insync.replicas
 * @return the log's append info, with `leaderHwChange` set to Increased or Same
 * @throws NotLeaderOrFollowerException if this broker does not hold the leader log
 * @throws NotEnoughReplicasException   if requiredAcks == -1 and the ISR is below min.isr
 */
def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
  val (appendInfo, hwAdvanced) = inReadLock(leaderIsrUpdateLock) {
    val leaderLog = leaderLogIfLocal.getOrElse {
      throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"
        .format(topicPartition, localBrokerId))
    }

    val minIsr = leaderLog.config.minInSyncReplicas
    val inSyncSize = isrState.isr.size
    // With acks=-1 the write is only accepted while at least min.isr replicas are in sync.
    if (requiredAcks == -1 && inSyncSize < minIsr)
      throw new NotEnoughReplicasException(s"The size of the current ISR ${isrState.isr} " +
        s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")

    val appended = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin, interBrokerProtocolVersion)
    // Check right away whether this append lets the high watermark advance.
    appended -> maybeIncrementLeaderHW(leaderLog)
  }

  val hwChange = if (hwAdvanced) LeaderHwChange.Increased else LeaderHwChange.Same
  appendInfo.copy(leaderHwChange = hwChange)
}
/**
 * Tries to advance the leader's high watermark (HW); returns true if it moved.
 *
 * The candidate HW starts at the leader's own LEO and is lowered to the LEO of every remote
 * replica that is behind it AND either caught up within `replicaLagTimeMaxMs` of `curTime` or is
 * listed in the maximal ISR. The candidate is then offered to `Log.maybeIncrementHighWatermark`,
 * which yields the previous HW when it advanced and None otherwise.
 */
private def maybeIncrementLeaderHW(leaderLog: Log, curTime: Long = time.milliseconds): Boolean = {
  // Hot path: fold over the replicas directly so no intermediate collection is materialised.
  val newHighWatermark = remoteReplicasMap.values.foldLeft(leaderLog.logEndOffsetMetadata) { (lowest, replica) =>
    // Uses the "maximal" ISR (isrState.maximalIsr), not just the committed ISR.
    if (replica.logEndOffsetMetadata.messageOffset < lowest.messageOffset &&
        (curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs ||
          isrState.maximalIsr.contains(replica.brokerId)))
      replica.logEndOffsetMetadata
    else
      lowest
  }

  val previousHw = leaderLog.maybeIncrementHighWatermark(newHighWatermark)
  previousHw.foreach { oldHighWatermark =>
    debug(s"High watermark updated from $oldHighWatermark to $newHighWatermark")
  }

  if (previousHw.isEmpty && isTraceEnabled) {
    val describe: ((Int, LogOffsetMetadata)) => String = {
      case (brokerId, logEndOffsetMetadata) => s"replica $brokerId: $logEndOffsetMetadata"
    }
    val replicaInfo = remoteReplicas.map(replica => (replica.brokerId, replica.logEndOffsetMetadata)).toSet
    val localLogInfo = (localBrokerId, localLogOrException.logEndOffsetMetadata)
    trace(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old value. " +
      s"All current LEOs are ${(replicaInfo + localLogInfo).map(describe)}")
  }

  previousHw.isDefined
}
在上面介绍 LeaderAndIsr 请求时提到,ReplicaManager 接收到请求后,如果分区 leader 的 brokerId 和本地的不一样,则该 broker 就是这个分区的 follower 副本,此时会启动 ReplicaFetcherThread 线程,不断发送 fetch 请求来同步 leader 副本的数据。
// ---------------- ShutdownableThread ----------------

/** One unit of work; `run()` calls it repeatedly. Abstract here, implemented by subclasses such as AbstractFetcherThread. */
def doWork(): Unit

/**
 * Thread body: marks the thread as started and calls `doWork()` in a loop while `isRunning`.
 *
 * - A FatalExitError counts down both shutdown latches, logs "Stopped" and exits the process
 *   with the error's status code via `Exit.exit`.
 * - Any other Throwable ends the loop; it is only logged while `isRunning` is still true
 *   (presumably errors raised after shutdown has begun are expected).
 * - `shutdownComplete` is always counted down in `finally`.
 */
override def run(): Unit = {
  isStarted = true
  info("Starting")
  try {
    // Loop on doWork() until the thread is asked to stop.
    while (isRunning)
      doWork()
  } catch {
    case e: FatalExitError =>
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
      info("Stopped")
      Exit.exit(e.statusCode())
    case e: Throwable =>
      if (isRunning)
        error("Error due to", e)
  } finally {
    shutdownComplete.countDown()
  }
  info("Stopped")
}

// ---------------- AbstractFetcherThread ----------------

/** One fetcher iteration: truncate the local log where needed, then fetch from the leader. */
override def doWork(): Unit = {
  maybeTruncate() // truncate the local log if required
  maybeFetch()    // fetch log data from the remote leader partition
}
/**
 * Truncates partitions that are pending truncation. Partitions with epoch information go
 * through the leader-epoch path; the rest are truncated to their high watermark.
 */
private def maybeTruncate(): Unit = {
  val (epochAware, epochless) = fetchTruncatingPartitions()
  if (epochAware.nonEmpty) truncateToEpochEndOffsets(epochAware)
  if (epochless.nonEmpty) truncateToHighWatermark(epochless)
}

/**
 * Fetches, from the remote leader, the end offset of each requested partition's latest epoch.
 * Then, holding `partitionMapLock`, truncates only the partitions whose fetch state still exists
 * and whose current leader epoch equals the one sent in the request.
 *
 * @throws IllegalStateException if the leader answers for a partition that was not requested
 */
private def truncateToEpochEndOffsets(latestEpochsForPartitions: Map[TopicPartition, EpochData]): Unit = {
  // Remote round trip to the leader, made before the lock is taken.
  val endOffsets = fetchEpochEndOffsets(latestEpochsForPartitions)

  inLock(partitionMapLock) {
    // Discard answers for partitions removed, or whose leader epoch changed, while we were unlocked.
    val epochEndOffsets = endOffsets.filter { case (tp, _) =>
      val currentState = partitionStates.stateValue(tp)
      val requested = latestEpochsForPartitions.getOrElse(tp, {
        throw new IllegalStateException(
          s"Leader replied with partition $tp not requested in OffsetsForLeaderEpoch request")
      })
      Option(currentState).exists(state => requested.currentLeaderEpoch == state.currentLeaderEpoch)
    }

    val ResultWithPartitions(fetchOffsets, partitionsWithError) =
      maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
    handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")
    updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
  }
}
/**
 * Builds the FetchRequest for the given protocol version (excerpt: parts are elided with "......").
 * For versions below 3 the top-level maxBytes is forced to DEFAULT_RESPONSE_MAX_BYTES
 * (presumably because those versions have no top-level max_bytes field).
 */
public FetchRequest build(short version) {
    if (version < 3) {
        maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
    }

    FetchRequestData fetchRequestData = new FetchRequestData();
    fetchRequestData.setReplicaId(replicaId);
    fetchRequestData.setMaxWaitMs(maxWait);   // max wait time: if the leader has no data, it waits up to this long before responding
    fetchRequestData.setMinBytes(minBytes);   // min bytes: lets the leader accumulate data into a batch before responding, for fetch efficiency
    fetchRequestData.setMaxBytes(maxBytes);   // max bytes: if the follower is far behind, catching up is split across several fetches
    ......
    // (elided above: presumably a loop over topics/partitions that this closing brace belongs to)
        FetchRequestData.FetchPartition fetchPartition = new FetchRequestData.FetchPartition()
            .setPartition(topicPartition.partition())
            .setCurrentLeaderEpoch(partitionData.currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH)) // current leader epoch
            .setLastFetchedEpoch(partitionData.lastFetchedEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
            .setFetchOffset(partitionData.fetchOffset)        // offset to fetch from
            .setLogStartOffset(partitionData.logStartOffset)  // the fetcher's log start offset (not the "LSO"/last stable offset)
            .setPartitionMaxBytes(partitionData.maxBytes);
        fetchTopic.partitions().add(fetchPartition);
    }
    .....
    return new FetchRequest(fetchRequestData, version);
}
// NOTE: the dashed line below is a section label from the notes, not code. What follows is a
// fragment from inside AbstractFetcherThread.processFetchRequest; it is truncated at the end.
--------------processFetchRequest------------------------------------
if (responseData.nonEmpty) {
  // process fetched data
  inLock(partitionMapLock) {
    responseData.forKeyValue { (topicPartition, partitionData) =>
      Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
        // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
        // In this case, we only want to process the fetch response if the partition state is ready for fetch and
        // the current offset is the same as the offset requested.
        val fetchPartitionData = sessionPartitions.get(topicPartition)
        if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
          partitionData.error match {
            case Errors.NONE =>
              try {
                // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                // If records were fetched, hand them to the subclass, which writes (appends) them.
                val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset, partitionData)
                logAppendInfoOpt.foreach { logAppendInfo =>
                  val validBytes = logAppendInfo.validBytes
                  // Next fetch offset: one past the last appended offset, or unchanged if nothing valid was appended.
                  val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
                  // Lag = leader's high watermark minus next fetch offset, floored at 0.
                  val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
                  fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
                  // (excerpt truncated here in the notes)
/**
 * Records a follower's fetch position on the leader. Afterwards it tries to advance the
 * leader's low/high watermarks and completes delayed requests if either moved.
 *
 * @param followerId                  broker id of the fetching follower
 * @param followerFetchOffsetMetadata follower's fetch offset, passed to `updateFetchState`
 * @param followerStartOffset         follower's log start offset, passed to `updateFetchState`
 * @param followerFetchTimeMs         fetch time; also used as "now" for the HW eligibility check
 * @param leaderEndOffset             leader's LEO at fetch time, passed to `updateFetchState`
 * @return false if `followerId` is not a replica known to this partition, true otherwise
 */
def updateFollowerFetchState(followerId: Int,
                             followerFetchOffsetMetadata: LogOffsetMetadata,
                             followerStartOffset: Long,
                             followerFetchTimeMs: Long,
                             leaderEndOffset: Long): Boolean = {
  getReplica(followerId) match {
    case Some(followerReplica) =>
      // The low watermark is only computed when there are delayed delete operations waiting.
      val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
      val prevFollowerEndOffset = followerReplica.logEndOffset
      followerReplica.updateFetchState(
        followerFetchOffsetMetadata,
        followerStartOffset,
        followerFetchTimeMs,
        leaderEndOffset)

      val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
      val leaderLWIncremented = newLeaderLW > oldLeaderLW

      // Presumably adds the follower to the ISR if it has caught up.
      maybeExpandIsr(followerReplica, followerFetchTimeMs)

      // Only recompute the HW when this follower's LEO actually changed; done under the ISR read lock.
      val leaderHWIncremented = if (prevFollowerEndOffset != followerReplica.logEndOffset) {
        inReadLock(leaderIsrUpdateLock) {
          leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
        }
      } else {
        false
      }

      // A moved LW or HW may unblock delayed operations.
      if (leaderLWIncremented || leaderHWIncremented)
        tryCompleteDelayedRequests()
      true

    case None =>
      false
  }
}

/**
 * Tries to advance the leader's high watermark (HW); returns true if it moved.
 *
 * The candidate HW is the leader's LEO, lowered to the LEO of any remote replica that is behind
 * it and either caught up within `replicaLagTimeMaxMs` of `curTime` or is in the maximal ISR.
 */
private def maybeIncrementLeaderHW(leaderLog: Log, curTime: Long = time.milliseconds): Boolean = {
  // maybeIncrementLeaderHW is in the hot path, the following code is written to
  // avoid unnecessary collection generation
  var newHighWatermark = leaderLog.logEndOffsetMetadata
  remoteReplicasMap.values.foreach { replica =>
    // Uses the "maximal" ISR (isrState.maximalIsr), not just the committed ISR.
    if (replica.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
      (curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || isrState.maximalIsr.contains(replica.brokerId))) {
      newHighWatermark = replica.logEndOffsetMetadata
    }
  }

  leaderLog.maybeIncrementHighWatermark(newHighWatermark) match {
    case Some(oldHighWatermark) =>
      debug(s"High watermark updated from $oldHighWatermark to $newHighWatermark")
      true

    case None =>
      // Fixed: this literal was broken across two lines in the excerpt (unterminated string).
      def logEndOffsetString: ((Int, LogOffsetMetadata)) => String = {
        case (brokerId, logEndOffsetMetadata) => s"replica $brokerId: $logEndOffsetMetadata"
      }

      if (isTraceEnabled) {
        val replicaInfo = remoteReplicas.map(replica => (replica.brokerId, replica.logEndOffsetMetadata)).toSet
        val localLogInfo = (localBrokerId, localLogOrException.logEndOffsetMetadata)
        trace(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old value. " +
          s"All current LEOs are ${(replicaInfo + localLogInfo).map(logEndOffsetString)}")
      }
      false
  }
}
