
Kafka Source Code Walkthrough (5): Analysis of the ReplicaManager Module


1. Overview of the ReplicaManager Module

ReplicaManager manages the replicas that each topic has on the local broker, and all log read and write requests are routed through it.
Every ReplicaManager instance holds an allPartitions variable of type Pool[TopicPartition, Partition]; Pool is essentially a thin wrapper around a Map. Given a key (a TopicPartition) we can find the partition number and the corresponding Partition state.
The Partition state in turn holds the information of each Replica of that partition. From the Replica hosted on the local broker we can reach the Log instance that backs it, and that Log instance performs the actual log reads and writes.
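As a rough illustration of that lookup chain, the sketch below uses placeholder types named after Kafka's (TopicPartition, Partition, Log); it is a simplified model for orientation, not the real ReplicaManager API.

    // Minimal conceptual sketch (not Kafka source): routing a write from a
    // TopicPartition to the local Log through an allPartitions-style map.
    case class TopicPartition(topic: String, partition: Int)

    class Log {
      def append(records: Array[Byte]): Unit =
        println(s"appended ${records.length} bytes")
    }

    class Partition(val topicPartition: TopicPartition, val localLog: Option[Log])

    class ReplicaManagerSketch(allPartitions: Map[TopicPartition, Partition]) {
      // Look up the partition, then its local log, then perform the write.
      def appendToLocalLog(tp: TopicPartition, records: Array[Byte]): Unit =
        allPartitions.get(tp).flatMap(_.localLog) match {
          case Some(log) => log.append(records)
          case None      => throw new IllegalStateException(s"no local replica for $tp")
        }
    }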

2. ReplicaManager Initialization

ReplicaManager is initialized through startup(), which mainly registers three periodic tasks (a minimal scheduling sketch follows the list):

  • isr-expiration: periodically checks whether any replica in a topic-partition's ISR has fallen behind the leader (because of lag or failure) and has to be removed from the ISR;
  • isr-change-propagation: when a topic-partition's ISR changes, the change is written to ZooKeeper, which in turn notifies the controller to carry out the corresponding ISR handling;
  • shutdown-idle-replica-alter-log-dirs-thread: periodically shuts down idle fetcher threads that copy replicas between log directories.
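A minimal sketch of that scheduling, using a plain ScheduledExecutorService instead of Kafka's internal scheduler. The task names match the ones above; the periods are illustrative assumptions, since Kafka derives the real values from broker configuration (for example replica.lag.time.max.ms).

    import java.util.concurrent.{Executors, TimeUnit}

    // Simplified sketch of what startup() does: register the three periodic tasks.
    object ReplicaManagerStartupSketch {
      private val scheduler = Executors.newSingleThreadScheduledExecutor()

      def schedule(name: String, periodMs: Long)(task: () => Unit): Unit =
        scheduler.scheduleAtFixedRate(() => task(), periodMs, periodMs, TimeUnit.MILLISECONDS)

      def startup(): Unit = {
        schedule("isr-expiration", periodMs = 5000L) { () =>
          println("check for lagging ISR replicas")
        }
        schedule("isr-change-propagation", periodMs = 2500L) { () =>
          println("propagate pending ISR changes via ZooKeeper")
        }
        schedule("shutdown-idle-replica-alter-log-dirs-thread", periodMs = 10000L) { () =>
          println("shut down idle log-dir mover threads")
        }
      }
    }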

2.1 maybeShrinkIsr

This task does two things: it checks whether the current topic-partition has any out-of-sync replicas, and if it does, it updates the partition's ISR, high watermark (HW) and related state. The main flow is as follows:

def maybeShrinkIsr(): Unit = {
  val needsIsrUpdate = inReadLock(leaderIsrUpdateLock) {
    // Decide whether the ISR needs to be updated.
    // The out-of-sync check is only performed when the local replica is the leader.
    // Out-of-sync test: followerReplica.logEndOffset != leaderEndOffset && (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs
    needsShrinkIsr()
  }
  val leaderHWIncremented = needsIsrUpdate && inWriteLock(leaderIsrUpdateLock) {
    leaderLogIfLocal match {
      case Some(leaderLog) =>
        // Collect the ids of the out-of-sync replicas
        val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
        // A non-empty out-of-sync list means the ISR has to shrink
        if (outOfSyncReplicaIds.nonEmpty) {
          // Compute the new ISR
          val newInSyncReplicaIds = inSyncReplicaIds -- outOfSyncReplicaIds
          assert(newInSyncReplicaIds.nonEmpty)
          info("Shrinking ISR from %s to %s. Leader: (highWatermark: %d, endOffset: %d). Out of sync replicas: %s."
            .format(inSyncReplicaIds.mkString(","),
              newInSyncReplicaIds.mkString(","),
              leaderLog.highWatermark,
              leaderLog.logEndOffset,
              outOfSyncReplicaIds.map { replicaId =>
                s"(brokerId: $replicaId, endOffset: ${getReplicaOrException(replicaId).logEndOffset})"
              }.mkString(" ")
            )
          )

          // update ISR in zk and in cache
          shrinkIsr(newInSyncReplicaIds)

          // we may need to increment high watermark since ISR could be down to 1
          // Check whether the HW can advance: the new HW is the smallest log offset among the partition's replicas
          maybeIncrementLeaderHW(leaderLog)
        } else {
          false
        }

      case None => false // do nothing if no longer leader
    }
  }

  // some delayed operations may be unblocked after HW changed
  // If the HW advanced, try to complete pending delayed operations right away,
  // mainly DelayedProduce, DelayedFetch and DelayedDeleteRecords
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
}
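For clarity, the out-of-sync condition quoted in the comment at the top of the method can be restated as a standalone predicate. Parameter names below are illustrative, not the actual Partition fields.

    // Restatement of the out-of-sync check: a follower is out of sync when it is not at the
    // leader's log end offset and has not caught up within replica.lag.time.max.ms.
    def isOutOfSync(followerLogEndOffset: Long,
                    leaderLogEndOffset: Long,
                    lastCaughtUpTimeMs: Long,
                    nowMs: Long,
                    maxLagMs: Long): Boolean =
      followerLogEndOffset != leaderLogEndOffset && (nowMs - lastCaughtUpTimeMs) > maxLagMs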

2.2 maybePropagateIsrChanges

This method propagates the set of topic-partitions whose ISR has changed (isrChangeSet) to ZooKeeper through zkClient.propagateIsrChanges(); only after that can the controller learn which topic-partitions' ISRs have changed.

def maybePropagateIsrChanges(): Unit = {
  val now = System.currentTimeMillis()
  isrChangeSet synchronized {
    // There are pending ISR changes to propagate
    if (isrChangeSet.nonEmpty &&
      // and either no new ISR change has been recorded for at least IsrChangePropagationBlackOut ms
      (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
        // or the last propagation happened more than IsrChangePropagationInterval ms ago
        lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
      // Propagate the ISR changes through ZooKeeper
      zkClient.propagateIsrChanges(isrChangeSet)
      // Clear the pending change set
      isrChangeSet.clear()
      // Record the propagation time
      lastIsrPropagationMs.set(now)
    }
  }
}
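The two timing conditions are easier to read as a single predicate. The sketch below restates them with illustrative names; the default values correspond to ReplicaManager.IsrChangePropagationBlackOut and ReplicaManager.IsrChangePropagationInterval, which are roughly 5 s and 60 s in the versions this walkthrough appears to follow (treat the exact numbers as an assumption).

    // Condensed restatement of the propagation gate (illustrative, not Kafka source):
    // propagate when there are pending changes AND either no new change has arrived for
    // the blackout period (the batch has quiesced) or a full interval has passed since
    // the last propagation.
    def shouldPropagateIsrChanges(hasPendingChanges: Boolean,
                                  lastChangeMs: Long,
                                  lastPropagationMs: Long,
                                  nowMs: Long,
                                  blackoutMs: Long = 5000L,
                                  intervalMs: Long = 60000L): Boolean =
      hasPendingChanges &&
        (lastChangeMs + blackoutMs < nowMs || lastPropagationMs + intervalMs < nowMs)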

3. Main Request-Handling Methods

ReplicaManager handles several kinds of requests dispatched from KafkaApis, chiefly LeaderAndIsr, StopReplica, UpdateMetadata, Produce, Fetch and ListOffset.

3.1 becomeLeaderOrFollower() handling

def becomeLeaderOrFollower(correlationId: Int,
                           leaderAndIsrRequest: LeaderAndIsrRequest,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
  if (stateChangeLogger.isTraceEnabled) {
    leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
      stateChangeLogger.trace(s"Received LeaderAndIsr request $partitionState " +
        s"correlation id $correlationId from controller ${leaderAndIsrRequest.controllerId} " +
        s"epoch ${leaderAndIsrRequest.controllerEpoch}")
    }
  }
  replicaStateChangeLock synchronized {
    // Compare the request's controllerEpoch with the local one; an older epoch is rejected
    if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
      stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller ${leaderAndIsrRequest.controllerId} with " +
        s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
        s"Latest known controller epoch is $controllerEpoch")
      leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
    } else {
      val responseMap = new mutable.HashMap[TopicPartition, Errors]
      val controllerId = leaderAndIsrRequest.controllerId
      controllerEpoch = leaderAndIsrRequest.controllerEpoch

      // First check partition's leader epoch
      val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
      val updatedPartitions = new mutable.HashSet[Partition]

      // Walk the partition states in the request, skipping partitions whose local log directory is offline
      leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
        val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
        // Look up the local state of this topic-partition
        val partitionOpt = getPartition(topicPartition) match {
          // The partition's local replica is in an offline log directory
          case HostedPartition.Offline =>
            stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
              s"controller $controllerId with correlation id $correlationId " +
              s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
              "partition is in an offline log directory")
            responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
            None

          // The partition is online
          case HostedPartition.Online(partition) =>
            updatedPartitions.add(partition)
            Some(partition)

          // The partition does not exist locally yet: create it and register it in allPartitions as online
          case HostedPartition.None =>
            val partition = Partition(topicPartition, time, this)
            allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
            updatedPartitions.add(partition)
            Some(partition)
        }

        partitionOpt.foreach { partition =>
          val currentLeaderEpoch = partition.getLeaderEpoch
          val requestLeaderEpoch = partitionState.leaderEpoch
          // Accept the state only if the request carries a strictly newer leader epoch
          if (requestLeaderEpoch > currentLeaderEpoch) {
            // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
            // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
            if (partitionState.replicas.contains(localBrokerId))
              partitionStates.put(partition, partitionState)
            else {
              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
                s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
                s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
              responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
            }
          } else if (requestLeaderEpoch < currentLeaderEpoch) {
            stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
              s"controller $controllerId with correlation id $correlationId " +
              s"epoch $controllerEpoch for partition $topicPartition since its associated " +
              s"leader epoch $requestLeaderEpoch is smaller than the current " +
              s"leader epoch $currentLeaderEpoch")
            responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
          } else {
            stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " +
              s"controller $controllerId with correlation id $correlationId " +
              s"epoch $controllerEpoch for partition $topicPartition since its associated " +
              s"leader epoch $requestLeaderEpoch matches the current leader epoch")
            responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
          }
        }
      }

      // Partitions for which this broker becomes the leader
      val partitionsTobeLeader = partitionStates.filter { case (_, partitionState) =>
        partitionState.leader == localBrokerId
      }
      // The remaining partitions become followers
      val partitionsToBeFollower = partitionStates -- partitionsTobeLeader.keys

      val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
      val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
        // makeLeaders() turns the corresponding partitions into leaders
        makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap,
          highWatermarkCheckpoints)
      else
        Set.empty[Partition]
      val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
        // makeFollowers() turns the corresponding partitions into followers
        makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
          highWatermarkCheckpoints)
      else
        Set.empty[Partition]

      /*
       * KAFKA-8392
       * For topic partitions of which the broker is no longer a leader, delete metrics related to
       * those topics. Note that this means the broker stops being either a replica or a leader of
       * partitions of said topics
       */
      // Remove leader/follower metrics for topics this broker no longer leads or follows
      val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
      val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
      followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)

      // remove metrics for brokers which are not followers of a topic
      leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)

      leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
        val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
        /*
         * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
         * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
         * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
         * we need to map this topic-partition to OfflinePartition instead.
         */
        if (localLog(topicPartition).isEmpty)
          markPartitionOffline(topicPartition)
      }

      // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
      // have been completely populated before starting the checkpointing there by avoiding weird race conditions
      startHighWatermarkCheckPointThread()

      val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
      for (partition <- updatedPartitions) {
        val topicPartition = partition.topicPartition
        // If a future log exists for this partition, prepare the future replica so it can be moved to the new log directory
        if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
          partition.log.foreach { log =>
            val leader = BrokerEndPoint(config.brokerId, "localhost", -1)

            // Add future replica to partition's map
            // Create the future replica's log if it does not exist yet
            partition.createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true,
              highWatermarkCheckpoints)

            // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
            // replica from source dir to destination dir
            logManager.abortAndPauseCleaning(topicPartition)

            futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader,
              partition.getLeaderEpoch, log.highWatermark))
          }
        }
      }

      // Add fetchers for the future replicas so they can catch up with the current replica
      replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)

      // Shut down idle fetcher threads
      replicaFetcherManager.shutdownIdleFetcherThreads()
      replicaAlterLogDirsManager.shutdownIdleFetcherThreads()

      // Invoke onLeadershipChange so the group coordinator can do its election and registration work
      onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)

      val responsePartitions = responseMap.iterator.map { case (tp, error) =>
        new LeaderAndIsrPartitionError()
          .setTopicName(tp.topic)
          .setPartitionIndex(tp.partition)
          .setErrorCode(error.code)
      }.toBuffer
      new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
        .setErrorCode(Errors.NONE.code)
        .setPartitionErrors(responsePartitions.asJava))
    }
  }
}
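The per-partition leader-epoch check in the loop above boils down to a small decision function. The following is a condensed restatement with illustrative names, not the source itself; note that an equal epoch is also rejected, so a re-sent LeaderAndIsr request with the same epoch becomes a no-op.

    // Condensed restatement of the leader-epoch check: only a strictly newer leader epoch,
    // for a partition whose replica list contains this broker, is accepted.
    sealed trait EpochDecision
    case object Accept                 extends EpochDecision
    case object RejectNotInReplicaList extends EpochDecision // answered with UNKNOWN_TOPIC_OR_PARTITION
    case object RejectStaleOrDuplicate extends EpochDecision // answered with STALE_CONTROLLER_EPOCH

    def decideLeaderEpoch(requestLeaderEpoch: Int,
                          currentLeaderEpoch: Int,
                          assignedToThisBroker: Boolean): EpochDecision =
      if (requestLeaderEpoch > currentLeaderEpoch) {
        if (assignedToThisBroker) Accept else RejectNotInReplicaList
      } else RejectStaleOrDuplicate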

3.2 makeLeaders() handling

private def makeLeaders(controllerId: Int,
                        controllerEpoch: Int,
                        partitionStates: Map[Partition, LeaderAndIsrPartitionState],
                        correlationId: Int,
                        responseMap: mutable.Map[TopicPartition, Errors],
                        highWatermarkCheckpoints: OffsetCheckpoints): Set[Partition] = {
  partitionStates.keys.foreach { partition =>
    stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from " +
      s"controller $controllerId epoch $controllerEpoch starting the become-leader transition for " +
      s"partition ${partition.topicPartition}")
  }

  for (partition <- partitionStates.keys)
    responseMap.put(partition.topicPartition, Errors.NONE)

  val partitionsToMakeLeaders = mutable.Set[Partition]()

  try {
    // First stop fetchers for all the partitions
    // Stop the replica fetchers for these partitions so they no longer fetch from the old leader
    replicaFetcherManager.removeFetcherForPartitions(partitionStates.keySet.map(_.topicPartition))
    // Update the partition information to be the leader
    partitionStates.foreach { case (partition, partitionState) =>
      try {
        // Initialize the partition as leader: log state, local offsets, follower replica state, etc.
        if (partition.makeLeader(controllerId, partitionState, correlationId, highWatermarkCheckpoints)) {
          partitionsToMakeLeaders += partition
          stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " +
            s"controller $controllerId epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} " +
            s"(last update controller epoch ${partitionState.controllerEpoch})")
        } else
          stateChangeLogger.info(s"Skipped the become-leader state change after marking its " +
            s"partition as leader with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
            s"partition ${partition.topicPartition} (last update controller epoch ${partitionState.controllerEpoch}) " +
            s"since it is already the leader for the partition.")
      } catch {
        case e: KafkaStorageException =>
          stateChangeLogger.error(s"Skipped the become-leader state change with " +
            s"correlation id $correlationId from controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
            s"(last update controller epoch ${partitionState.controllerEpoch}) since " +
            s"the replica for the partition is offline due to disk error $e")
          val dirOpt = getLogDir(partition.topicPartition)
          error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e)
          responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
      }
    }
  } catch {
    case e: Throwable =>
      partitionStates.keys.foreach { partition =>
        stateChangeLogger.error(s"Error while processing LeaderAndIsr request correlationId $correlationId received " +
          s"from controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition}", e)
      }
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }

  partitionStates.keys.foreach { partition =>
    stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
      s"epoch $controllerEpoch for the become-leader transition for partition ${partition.topicPartition}")
  }

  partitionsToMakeLeaders
}

3.3 appendRecords() handling

This method appends records to the leader replica's log and, when required, waits for the records to be replicated to the follower replicas, completing either when replication succeeds or when it times out.

def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  origin: AppendOrigin,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Lock] = None,
                  recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    // Append the records to the local leader logs
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      origin, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

    // Build the per-partition produce status from the local append results
    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition ->
        ProducePartitionStatus(
          result.info.lastOffset + 1, // required offset
          new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime,
            result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status
    }

    recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })

    // Does the response have to be delayed until the followers have replicated the records?
    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
      val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq

      // try to complete the request immediately, otherwise put it into the purgatory
      // this is because while the delayed produce operation is being created, new
      // requests may arrive and hence make this operation completable.
      // Register the operation with the produce purgatory's watch list
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    } else {
      // we can respond immediately
      val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
      responseCallback(produceResponseStatus)
    }
  } else {
    // If required.acks is outside accepted range, something is wrong with the client
    // Just return an error and don't handle the request at all
    val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
      topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UnknownLogAppendInfo.firstOffset.getOrElse(-1), RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
    }
    responseCallback(responseStatus)
  }
}
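The delayedProduceRequestRequired() call above decides whether the response has to wait in the purgatory. A hedged sketch of that predicate with simplified parameter types (the real method inspects the per-partition append results rather than plain counts):

    // Hedged sketch: a produce response is parked in the purgatory only when acks = -1
    // (wait for the full ISR), the request actually carried data, and at least one
    // partition was appended locally without error (otherwise the error can be returned at once).
    def delayedProduceRequired(requiredAcks: Short,
                               partitionCount: Int,
                               failedPartitionCount: Int): Boolean =
      requiredAcks == -1 &&
        partitionCount > 0 &&
        failedPartitionCount < partitionCount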

3.4 fetchMessages() handling

This method fetches records from a replica and returns only when enough data has been read or the wait times out. Consumers may fetch from any replica (when client metadata allows a preferred read replica), whereas followers can only fetch from the leader.

def fetchMessages(timeout: Long,
                  replicaId: Int,
                  fetchMinBytes: Int,
                  fetchMaxBytes: Int,
                  hardMaxBytesLimit: Boolean,
                  fetchInfos: Seq[(TopicPartition, PartitionData)],
                  quota: ReplicaQuota,
                  responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
                  isolationLevel: IsolationLevel,
                  clientMetadata: Option[ClientMetadata]): Unit = {
  // The fetch comes from a follower broker
  val isFromFollower = Request.isValidBrokerId(replicaId)
  // The fetch comes from a consumer
  val isFromConsumer = !(isFromFollower || replicaId == Request.FutureLocalReplicaId)

  // Followers (and the future-replica mover) may read up to the log end offset;
  // READ_COMMITTED consumers may read up to the last stable offset;
  // other consumers may read up to the high watermark
  val fetchIsolation = if (!isFromConsumer)
    FetchLogEnd
  else if (isolationLevel == IsolationLevel.READ_COMMITTED)
    FetchTxnCommitted
  else
    FetchHighWatermark

  // Restrict fetching to leader if request is from follower or from a client with older version (no ClientMetadata)
  // Decide whether the fetch must be served by the leader
  val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && clientMetadata.isEmpty)

  def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
    val result = readFromLocalLog(
      replicaId = replicaId,
      fetchOnlyFromLeader = fetchOnlyFromLeader,
      fetchIsolation = fetchIsolation,
      fetchMaxBytes = fetchMaxBytes,
      hardMaxBytesLimit = hardMaxBytesLimit,
      readPartitionInfo = fetchInfos,
      quota = quota,
      clientMetadata = clientMetadata)
    // For follower fetches, update the follower's fetch state
    if (isFromFollower) updateFollowerFetchState(replicaId, result)
    else result
  }

  // Read the records from the local log
  val logReadResults = readFromLog()

  // check if this fetch request can be satisfied right away
  var bytesReadable: Long = 0
  var errorReadingData = false
  val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
  var anyPartitionsNeedHwUpdate = false
  logReadResults.foreach { case (topicPartition, logReadResult) =>
    if (logReadResult.error != Errors.NONE)
      errorReadingData = true
    bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
    logReadResultMap.put(topicPartition, logReadResult)
    if (isFromFollower && logReadResult.followerNeedsHwUpdate) {
      anyPartitionsNeedHwUpdate = true
    }
  }

  // respond immediately if 1) fetch request does not want to wait
  //                        2) fetch request does not require any data
  //                        3) has enough data to respond
  //                        4) some error happens while reading data
  //                        5) any of the requested partitions need HW update
  if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || anyPartitionsNeedHwUpdate) {
    val fetchPartitionData = logReadResults.map { case (tp, result) =>
      tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
        result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica, isFromFollower && isAddingReplica(tp, replicaId))
    }
    responseCallback(fetchPartitionData)
  } else {
    // construct the fetch results from the read results
    val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
    fetchInfos.foreach { case (topicPartition, partitionData) =>
      logReadResultMap.get(topicPartition).foreach(logReadResult => {
        val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
        fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
      })
    }
    val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
      fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
    val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
      responseCallback)

    // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
    val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }

    // try to complete the request immediately, otherwise put it into the purgatory;
    // this is because while the delayed fetch operation is being created, new requests
    // may arrive and hence make this operation completable.
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  }
}
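Finally, the "respond immediately" branch above can be summarized as a single predicate; the parameter names are illustrative and mirror the local variables in the code.

    // Condensed restatement of the immediate-response condition in fetchMessages:
    // no waiting allowed, nothing requested, enough bytes already read, a read error,
    // or a pending follower HW update all cause an immediate response.
    def canRespondImmediately(timeoutMs: Long,
                              requestedPartitions: Int,
                              bytesReadable: Long,
                              fetchMinBytes: Int,
                              errorReadingData: Boolean,
                              anyPartitionNeedsHwUpdate: Boolean): Boolean =
      timeoutMs <= 0 ||
        requestedPartitions == 0 ||
        bytesReadable >= fetchMinBytes ||
        errorReadingData ||
        anyPartitionNeedsHwUpdate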