1. Introduction to the ReplicaManager module
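ReplicaManager manages the replicas of topic partitions that are hosted on the local broker. All log read and write requests are also handled through ReplicaManager.
Each ReplicaManager instance holds a variable named allPartitions of type Pool[TopicPartition, Partition]. Pool is essentially a wrapper around a Map. The key (TopicPartition) identifies the topic and the partition number, and the value is the corresponding Partition information.
The Partition in turn holds the information of every Replica of that partition. By looking up the Replica that lives on the local broker, we can indirectly obtain the Log instance that belongs to it, and then use that Log instance to serve log reads and writes.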
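The lookup chain described above (TopicPartition, then Partition, then local Replica, then Log) can be sketched as follows. This is a minimal illustration, not Kafka's code: every class here is a hypothetical, heavily simplified stand-in, and a TrieMap plays the role of Pool.

```scala
import scala.collection.concurrent.TrieMap

object PoolLookupSketch {
  // Hypothetical, simplified stand-ins for Kafka's classes (assumptions, not the real definitions).
  final case class TopicPartition(topic: String, partition: Int)

  final class Log {
    private var records = Vector.empty[String]

    // Appends a record and returns the offset it was stored at.
    def append(record: String): Long = synchronized {
      records = records :+ record
      records.size - 1L
    }

    // Returns the record at the given offset, if any.
    def read(offset: Long): Option[String] = synchronized {
      if (offset >= 0 && offset < records.size) Some(records(offset.toInt)) else None
    }
  }

  final case class Replica(brokerId: Int, log: Option[Log])

  final case class Partition(topicPartition: TopicPartition, replicas: Seq[Replica]) {
    def replicaOn(brokerId: Int): Option[Replica] = replicas.find(_.brokerId == brokerId)
  }

  val localBrokerId = 1

  // Plays the role of Pool[TopicPartition, Partition]: a thread-safe map.
  val allPartitions: TrieMap[TopicPartition, Partition] = TrieMap.empty

  // TopicPartition -> Partition -> Replica on this broker -> Log
  def localLog(tp: TopicPartition): Option[Log] =
    for {
      partition <- allPartitions.get(tp)
      replica   <- partition.replicaOn(localBrokerId)
      log       <- replica.log
    } yield log
}
```

Each step of the chain can fail (unknown partition, no replica on this broker, no local log), which is why the sketch returns an Option instead of a Log.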
2. ReplicaManager initialization
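ReplicaManager is initialized mainly through startup(), which registers three periodic tasks:
- isr-expiration: periodically checks whether any replica in the ISR of a topic-partition has fallen out of sync with the leader, for example because of lag or a crash, and therefore needs to be removed from the ISR list;
- isr-change-propagation: when the ISR of a topic-partition has changed, writes the change to ZooKeeper, which in turn notifies the controller so that it can perform the corresponding ISR handling;
- shutdown-idle-replica-alter-log-dirs-thread: periodically shuts down idle replica alter-log-dirs threads, that is, the threads that move replica logs between log directories;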
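As a rough illustration of how named periodic tasks like these can be registered, here is a minimal sketch built on the JDK's ScheduledExecutorService. The Task type and this startup() helper are hypothetical; Kafka uses its own scheduler abstraction, and the periods are whatever the caller passes in.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

object PeriodicTasksSketch {
  // Hypothetical description of one periodic task: a name, a period and the work to run.
  final case class Task(name: String, periodMs: Long, body: () => Unit)

  // Registers every task on the scheduler so that it runs repeatedly at its own period.
  def startup(scheduler: ScheduledExecutorService, tasks: Seq[Task]): Unit =
    tasks.foreach { task =>
      val runnable = new Runnable { def run(): Unit = task.body() }
      scheduler.scheduleAtFixedRate(runnable, 0L, task.periodMs, TimeUnit.MILLISECONDS)
    }
}
```

A caller would pass three Task values named isr-expiration, isr-change-propagation and shutdown-idle-replica-alter-log-dirs-thread, each wrapping the corresponding method.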
2.1 maybeShrinkIsr
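This task does two things: it checks whether the current topic-partition has any out-of-sync replicas and, if it does, updates the partition's ISR and related state such as the HW (high watermark). The main flow is as follows: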
- def maybeShrinkIsr(): Unit = {
- val needsIsrUpdate = inReadLock(leaderIsrUpdateLock) {
- // Check whether the ISR needs to be updated
- // The out-of-sync check only runs when the local replica is the leader
- // Out-of-sync condition: followerReplica.logEndOffset != leaderEndOffset && (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs
- needsShrinkIsr()
- }
- val leaderHWIncremented = needsIsrUpdate && inWriteLock(leaderIsrUpdateLock) {
- leaderLogIfLocal match {
- case Some(leaderLog) =>
- // Get the IDs of the out-of-sync replicas
- val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
- // A non-empty out-of-sync list means the ISR needs to change
- if (outOfSyncReplicaIds.nonEmpty) {
- // Compute the new ISR
- val newInSyncReplicaIds = inSyncReplicaIds -- outOfSyncReplicaIds
- assert(newInSyncReplicaIds.nonEmpty)
- info("Shrinking ISR from %s to %s. Leader: (highWatermark: %d, endOffset: %d). Out of sync replicas: %s."
- .format(inSyncReplicaIds.mkString(","),
- newInSyncReplicaIds.mkString(","),
- leaderLog.highWatermark,
- leaderLog.logEndOffset,
- outOfSyncReplicaIds.map { replicaId =>
- s"(brokerId: $replicaId, endOffset: ${getReplicaOrException(replicaId).logEndOffset})"
- }.mkString(" ")
- )
- )
-
- // update the ISR list in ZooKeeper and in the local cache
- shrinkIsr(newInSyncReplicaIds)
-
- // we may need to increment high watermark since ISR could be down to 1
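- // Check whether the HW should advance: compare the partition's replicas and take the smallest log end offset among the replicas that still count as in sync as the new HW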
- maybeIncrementLeaderHW(leaderLog)
- } else {
- false
- }
-
- case None => false // do nothing if no longer leader
- }
- }
-
- // some delayed operations may be unblocked after HW changed
- // If the HW advanced, immediately try to complete delayed requests, mainly DelayedProduce, DelayedFetch and DelayedDeleteRecords
- if (leaderHWIncremented)
- tryCompleteDelayedRequests()
- }
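The out-of-sync rule and the ISR shrink step can be isolated in a small sketch. It assumes a hypothetical FollowerState record and ignores locking, ZooKeeper and the leader check. The high watermark helper is a simplification that only considers the leader and the followers remaining in the ISR.

```scala
object IsrShrinkSketch {
  // Hypothetical snapshot of what the leader knows about one follower.
  final case class FollowerState(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

  // A follower is out of sync when it is behind the leader's log end offset
  // AND has not been caught up for longer than maxLagMs.
  def outOfSyncReplicaIds(followers: Seq[FollowerState],
                          leaderEndOffset: Long,
                          nowMs: Long,
                          maxLagMs: Long): Set[Int] =
    followers.filter { f =>
      f.logEndOffset != leaderEndOffset && (nowMs - f.lastCaughtUpTimeMs) > maxLagMs
    }.map(_.brokerId).toSet

  // Removes the out-of-sync replicas; the leader itself must always remain.
  def shrinkIsr(isr: Set[Int], outOfSync: Set[Int]): Set[Int] = {
    val newIsr = isr -- outOfSync
    require(newIsr.nonEmpty, "the ISR must still contain the leader")
    newIsr
  }

  // Simplified HW candidate: smallest log end offset over the leader and the ISR followers.
  def highWatermarkCandidate(leaderEndOffset: Long,
                             followers: Seq[FollowerState],
                             isr: Set[Int]): Long =
    (leaderEndOffset +: followers.filter(f => isr(f.brokerId)).map(_.logEndOffset)).min
}
```

Removing a lagging follower can raise the candidate high watermark, because the slowest replica no longer holds it back. This is why the shrink is followed by a high watermark check.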
2.2 maybePropagateIsrChanges
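This method publishes the set of topic-partitions whose ISR has changed (isrChangeSet) to ZooKeeper through propagateIsrChanges(), which the code below calls on zkClient (older versions went through ReplicationUtils). Only then does the Controller learn which topic-partitions have had an ISR change.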
- def maybePropagateIsrChanges(): Unit = {
- val now = System.currentTimeMillis()
- isrChangeSet synchronized {
- // The set of pending ISR changes is non-empty
- if (isrChangeSet.nonEmpty &&
- // No ISR change has happened within the last IsrChangePropagationBlackOut ms
- (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
- // or the last ISR propagation was more than IsrChangePropagationInterval ms ago
- lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
- // Propagate the ISR changes through ZooKeeper
- zkClient.propagateIsrChanges(isrChangeSet)
- // Clear the ISR change set
- isrChangeSet.clear()
- // Record the time of this propagation
- lastIsrPropagationMs.set(now)
- }
- }
- }
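The timing condition above is easier to reason about as a pure function. The sketch below restates it with the timestamps passed in explicitly. The two constants are assumptions chosen for illustration (5 s and 60 s) and should be checked against ReplicaManager in the Kafka version at hand.

```scala
object IsrPropagationSketch {
  // Assumed values for illustration; verify against the actual ReplicaManager constants.
  val BlackOutMs = 5000L
  val IntervalMs = 60000L

  // Propagate when there is something to send and either
  //  - the ISR has been quiet for longer than the blackout window, or
  //  - the previous propagation is older than the maximum interval.
  def shouldPropagate(hasChanges: Boolean,
                      lastIsrChangeMs: Long,
                      lastIsrPropagationMs: Long,
                      nowMs: Long): Boolean =
    hasChanges &&
      (lastIsrChangeMs + BlackOutMs < nowMs || lastIsrPropagationMs + IntervalMs < nowMs)
}
```

The blackout window batches bursts of ISR changes into one ZooKeeper write, while the interval bounds how long a continuously changing ISR can delay propagation.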
3. Main handler methods
ReplicaManager serves several kinds of requests dispatched from KafkaApis, mainly LeaderAndIsr, StopReplica, UpdateMetadata, Produce, Fetch and ListOffset.
3.1 becomeLeaderOrFollower() handling
- def becomeLeaderOrFollower(correlationId: Int,
- leaderAndIsrRequest: LeaderAndIsrRequest,
- onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
- if (stateChangeLogger.isTraceEnabled) {
- leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
- stateChangeLogger.trace(s"Received LeaderAndIsr request $partitionState " +
- s"correlation id $correlationId from controller ${leaderAndIsrRequest.controllerId} " +
- s"epoch ${leaderAndIsrRequest.controllerEpoch}")
- }
- }
- replicaStateChangeLock synchronized {
- // Compare the controllerEpoch in the LeaderAndIsr request with the local one; a smaller value means the request is stale and is rejected
- if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
- stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller ${leaderAndIsrRequest.controllerId} with " +
- s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
- s"Latest known controller epoch is $controllerEpoch")
- leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
- } else {
- val responseMap = new mutable.HashMap[TopicPartition, Errors]
- val controllerId = leaderAndIsrRequest.controllerId
- controllerEpoch = leaderAndIsrRequest.controllerEpoch
-
- // First check partition's leader epoch
- val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
- val updatedPartitions = new mutable.HashSet[Partition]
-
- // Walk through the partition states in the LeaderAndIsr request and drop partitions whose local replica is offline
- leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
- val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
- // Look up the locally hosted partition for this topicPartition
- val partitionOpt = getPartition(topicPartition) match {
- // The partition is offline
- case HostedPartition.Offline =>
- stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
- s"controller $controllerId with correlation id $correlationId " +
- s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
- "partition is in an offline log directory")
-
- responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
- None
-
- // The partition is online
- case HostedPartition.Online(partition) =>
- updatedPartitions.add(partition)
- Some(partition)
-
- // The partition does not exist yet: add it to allPartitions and mark it online
- case HostedPartition.None =>
- val partition = Partition(topicPartition, time, this)
- allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
- updatedPartitions.add(partition)
- Some(partition)
- }
-
- partitionOpt.foreach { partition =>
- val currentLeaderEpoch = partition.getLeaderEpoch
- val requestLeaderEpoch = partitionState.leaderEpoch
- // Accept the new state only if the leader epoch in the request is greater than the local one
- if (requestLeaderEpoch > currentLeaderEpoch) {
- // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
- // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
- if (partitionState.replicas.contains(localBrokerId))
- partitionStates.put(partition, partitionState)
- else {
- stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
- s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
- s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
- responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
- }
- } else if (requestLeaderEpoch < currentLeaderEpoch) {
- stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
- s"controller $controllerId with correlation id $correlationId " +
- s"epoch $controllerEpoch for partition $topicPartition since its associated " +
- s"leader epoch $requestLeaderEpoch is smaller than the current " +
- s"leader epoch $currentLeaderEpoch")
- responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
- } else {
- stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " +
- s"controller $controllerId with correlation id $correlationId " +
- s"epoch $controllerEpoch for partition $topicPartition since its associated " +
- s"leader epoch $requestLeaderEpoch matches the current leader epoch")
- responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
- }
- }
- }
-
- // Select the partitions for which this broker becomes the leader
- val partitionsTobeLeader = partitionStates.filter { case (_, partitionState) =>
- partitionState.leader == localBrokerId
- }
- // The remaining partitions are the ones for which this broker becomes a follower
- val partitionsToBeFollower = partitionStates -- partitionsTobeLeader.keys
-
- val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
- val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
- // Call makeLeaders to make this broker the leader of those partitions
- makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap,
- highWatermarkCheckpoints)
- else
- Set.empty[Partition]
- val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
- // Call makeFollowers to make this broker a follower of those partitions
- makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
- highWatermarkCheckpoints)
- else
- Set.empty[Partition]
-
- /*
- * KAFKA-8392
- * For topic partitions of which the broker is no longer a leader, delete metrics related to
- * those topics. Note that this means the broker stops being either a replica or a leader of
- * partitions of said topics
- */
- // Remove stale leader and follower metrics for the affected topics
- val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
- val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
- followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
-
- // remove metrics for brokers which are not followers of a topic
- leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)
-
- leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
- val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
- /*
- * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
- * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
- * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
- * we need to map this topic-partition to OfflinePartition instead.
- */
- if (localLog(topicPartition).isEmpty)
- markPartitionOffline(topicPartition)
- }
-
- // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
- // have been completely populated before starting the checkpointing there by avoiding weird race conditions
- startHighWatermarkCheckPointThread()
-
- val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
- for (partition <- updatedPartitions) {
- val topicPartition = partition.topicPartition
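- // If a future log exists for this partition, i.e. the replica is being moved to another log directory, register the future replica again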
- if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
- partition.log.foreach { log =>
- val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
-
- // Add future replica to partition's map
- // Create the future replica's log if it does not exist yet
- partition.createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true,
- highWatermarkCheckpoints)
-
- // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
- // replica from source dir to destination dir
- logManager.abortAndPauseCleaning(topicPartition)
-
- futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader,
- partition.getLeaderEpoch, log.highWatermark))
- }
- }
- }
- // Add alter-log-dirs fetchers for those partitions; they copy data from the current local replica into the future replica
- replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)
-
- // Shut down idle fetcher threads
- replicaFetcherManager.shutdownIdleFetcherThreads()
- replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
- // Invoke the onLeadershipChange callback so that components such as the group coordinator can react to the leadership changes
- onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
- val responsePartitions = responseMap.iterator.map { case (tp, error) =>
- new LeaderAndIsrPartitionError()
- .setTopicName(tp.topic)
- .setPartitionIndex(tp.partition)
- .setErrorCode(error.code)
- }.toBuffer
- new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
- .setErrorCode(Errors.NONE.code)
- .setPartitionErrors(responsePartitions.asJava))
- }
- }
- }
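The per-partition decision made above boils down to a few ordered checks. The sketch below models them as a pure function. PartitionState and the Outcome names are hypothetical labels of mine. In the listing, both stale-leader-epoch branches are reported with Errors.STALE_CONTROLLER_EPOCH and the unassigned case with Errors.UNKNOWN_TOPIC_OR_PARTITION.

```scala
object LeaderAndIsrCheckSketch {
  sealed trait Outcome
  case object BecomeLeader extends Outcome
  case object BecomeFollower extends Outcome
  case object StaleControllerEpoch extends Outcome  // whole request rejected
  case object StaleLeaderEpoch extends Outcome      // partition state ignored
  case object NotAnAssignedReplica extends Outcome  // local broker not in the replica list

  // Hypothetical, reduced view of one partition state carried by the request.
  final case class PartitionState(leader: Int, leaderEpoch: Int, replicas: Seq[Int])

  def classify(requestControllerEpoch: Int,
               localControllerEpoch: Int,
               state: PartitionState,
               currentLeaderEpoch: Int,
               localBrokerId: Int): Outcome =
    if (requestControllerEpoch < localControllerEpoch) StaleControllerEpoch
    else if (state.leaderEpoch <= currentLeaderEpoch) StaleLeaderEpoch
    else if (!state.replicas.contains(localBrokerId)) NotAnAssignedReplica
    else if (state.leader == localBrokerId) BecomeLeader
    else BecomeFollower
}
```

Only the partitions classified as BecomeLeader or BecomeFollower would be handed on to makeLeaders and makeFollowers respectively.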
3.2 makeLeaders() handling
- private def makeLeaders(controllerId: Int,
- controllerEpoch: Int,
- partitionStates: Map[Partition, LeaderAndIsrPartitionState],
- correlationId: Int,
- responseMap: mutable.Map[TopicPartition, Errors],
- highWatermarkCheckpoints: OffsetCheckpoints): Set[Partition] = {
- partitionStates.keys.foreach { partition =>
- stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from " +
- s"controller $controllerId epoch $controllerEpoch starting the become-leader transition for " +
- s"partition ${partition.topicPartition}")
- }
-
- for (partition <- partitionStates.keys)
- responseMap.put(partition.topicPartition, Errors.NONE)
-
- val partitionsToMakeLeaders = mutable.Set[Partition]()
-
- try {
- // First stop fetchers for all the partitions
- // Stop the fetchers for these partitions so they no longer replicate from the old leader
- replicaFetcherManager.removeFetcherForPartitions(partitionStates.keySet.map(_.topicPartition))
- // Update the partition information to be the leader
- // Update the partition state and initialize it as leader
- partitionStates.foreach { case (partition, partitionState) =>
- try {
- // Initialize the partition state: mainly the log, the local offsets and the initial state of the follower replicas
- if (partition.makeLeader(controllerId, partitionState, correlationId, highWatermarkCheckpoints)) {
- partitionsToMakeLeaders += partition
- stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " +
- s"controller $controllerId epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} " +
- s"(last update controller epoch ${partitionState.controllerEpoch})")
- } else
- stateChangeLogger.info(s"Skipped the become-leader state change after marking its " +
- s"partition as leader with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
- s"partition ${partition.topicPartition} (last update controller epoch ${partitionState.controllerEpoch}) " +
- s"since it is already the leader for the partition.")
- } catch {
- case e: KafkaStorageException =>
- stateChangeLogger.error(s"Skipped the become-leader state change with " +
- s"correlation id $correlationId from controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
- s"(last update controller epoch ${partitionState.controllerEpoch}) since " +
- s"the replica for the partition is offline due to disk error $e")
- val dirOpt = getLogDir(partition.topicPartition)
- error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e)
- responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
- }
- }
-
- } catch {
- case e: Throwable =>
- partitionStates.keys.foreach { partition =>
- stateChangeLogger.error(s"Error while processing LeaderAndIsr request correlationId $correlationId received " +
- s"from controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition}", e)
- }
- // Re-throw the exception for it to be caught in KafkaApis
- throw e
- }
-
- partitionStates.keys.foreach { partition =>
- stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
- s"epoch $controllerEpoch for the become-leader transition for partition ${partition.topicPartition}")
- }
-
- partitionsToMakeLeaders
- }
3.3 appendRecords() handling
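This method appends records to the leader replicas of the given partitions. When the request requires acknowledgement from the followers (see delayedProduceRequestRequired below), it then waits until the records have been replicated to the follower replicas or the timeout expires.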
- def appendRecords(timeout: Long,
- requiredAcks: Short,
- internalTopicsAllowed: Boolean,
- origin: AppendOrigin,
- entriesPerPartition: Map[TopicPartition, MemoryRecords],
- responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
- delayedProduceLock: Option[Lock] = None,
- recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
- if (isValidRequiredAcks(requiredAcks)) {
- val sTime = time.milliseconds
- // Append the records to the local leader log
- val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
- origin, entriesPerPartition, requiredAcks)
- debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
-
- // Build the per-partition produce status (the required offset plus the response to return)
- val produceStatus = localProduceResults.map { case (topicPartition, result) =>
- topicPartition ->
- ProducePartitionStatus(
- result.info.lastOffset + 1, // required offset
- new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime,
- result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status
- }
-
- recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })
-
- // Does the acknowledgement need to be delayed?
- if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
- // create delayed produce operation
- val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
- val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
-
- // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
- val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-
- // try to complete the request immediately, otherwise put it into the purgatory
- // this is because while the delayed produce operation is being created, new
- // requests may arrive and hence make this operation completable.
- // Add the operation to the purgatory's watch list
- delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
-
- } else {
- // we can respond immediately
- val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
- responseCallback(produceResponseStatus)
- }
- } else {
- // If required.acks is outside accepted range, something is wrong with the client
- // Just return an error and don't handle the request at all
- val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
- topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
- LogAppendInfo.UnknownLogAppendInfo.firstOffset.getOrElse(-1), RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
- }
- responseCallback(responseStatus)
- }
- }
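The two checks that steer appendRecords(), isValidRequiredAcks and delayedProduceRequestRequired, are not shown in the listing. The sketch below is my understanding of their logic reduced to plain numbers. The parameter names are hypothetical and the real methods take the request maps instead of counts.

```scala
object ProduceAckSketch {
  // acks must be one of: -1 (wait for the full ISR), 0 (no acknowledgement), 1 (leader only).
  def isValidRequiredAcks(requiredAcks: Short): Boolean =
    requiredAcks == -1 || requiredAcks == 0 || requiredAcks == 1

  // A delayed produce is only needed when
  //  - the producer asked for acknowledgement from the full ISR (acks = -1),
  //  - the request actually contains data, and
  //  - at least one partition was appended to the local log successfully.
  def delayedProduceRequired(requiredAcks: Short,
                             partitionsInRequest: Int,
                             failedLocalAppends: Int): Boolean =
    requiredAcks == -1 &&
      partitionsInRequest > 0 &&
      failedLocalAppends < partitionsInRequest
}
```

With acks set to 0 or 1, or when every local append failed, there is nothing to wait for and the response callback can run right away.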
3.4 fetchMessages() handling
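This method fetches messages from a replica and returns only when the timeout expires or enough data has been read. Consumers may fetch from any replica (provided they send ClientMetadata, as the fetchOnlyFromLeader check below shows), whereas followers may only fetch from the leader.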
- def fetchMessages(timeout: Long,
- replicaId: Int,
- fetchMinBytes: Int,
- fetchMaxBytes: Int,
- hardMaxBytesLimit: Boolean,
- fetchInfos: Seq[(TopicPartition, PartitionData)],
- quota: ReplicaQuota,
- responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
- isolationLevel: IsolationLevel,
- clientMetadata: Option[ClientMetadata]): Unit = {
- // The request comes from a follower
- val isFromFollower = Request.isValidBrokerId(replicaId)
- // The request comes from a consumer
- val isFromConsumer = !(isFromFollower || replicaId == Request.FutureLocalReplicaId)
-
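- // A follower (or the future local replica) may read up to the log end offset
- // A consumer with the READ_COMMITTED isolation level may read only up to the last stable offset, i.e. committed transactional data
- // Any other consumer may read up to the HW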
- val fetchIsolation = if (!isFromConsumer)
- FetchLogEnd
- else if (isolationLevel == IsolationLevel.READ_COMMITTED)
- FetchTxnCommitted
- else
- FetchHighWatermark
-
- // Restrict fetching to leader if request is from follower or from a client with older version (no ClientMetadata)
- // Decide whether the fetch must be served by the leader only
- val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && clientMetadata.isEmpty)
- def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
- val result = readFromLocalLog(
- replicaId = replicaId,
- fetchOnlyFromLeader = fetchOnlyFromLeader,
- fetchIsolation = fetchIsolation,
- fetchMaxBytes = fetchMaxBytes,
- hardMaxBytesLimit = hardMaxBytesLimit,
- readPartitionInfo = fetchInfos,
- quota = quota,
- clientMetadata = clientMetadata)
- // Update the follower's fetch state
- if (isFromFollower) updateFollowerFetchState(replicaId, result)
- else result
- }
-
- // Read the messages
- val logReadResults = readFromLog()
-
- // check if this fetch request can be satisfied right away
- var bytesReadable: Long = 0
- var errorReadingData = false
- val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
- var anyPartitionsNeedHwUpdate = false
- logReadResults.foreach { case (topicPartition, logReadResult) =>
- if (logReadResult.error != Errors.NONE)
- errorReadingData = true
- bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
- logReadResultMap.put(topicPartition, logReadResult)
- if (isFromFollower && logReadResult.followerNeedsHwUpdate) {
- anyPartitionsNeedHwUpdate = true
- }
- }
-
- // respond immediately if 1) fetch request does not want to wait
- // 2) fetch request does not require any data
- // 3) has enough data to respond
- // 4) some error happens while reading data
- // 5) any of the requested partitions need HW update
- // Check whether we can respond immediately
- if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || anyPartitionsNeedHwUpdate) {
- val fetchPartitionData = logReadResults.map { case (tp, result) =>
- tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
- result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica, isFromFollower && isAddingReplica(tp, replicaId))
- }
- responseCallback(fetchPartitionData)
- } else {
- // construct the fetch results from the read results
- val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
- fetchInfos.foreach { case (topicPartition, partitionData) =>
- logReadResultMap.get(topicPartition).foreach(logReadResult => {
- val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
- fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
- })
- }
- val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
- fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
- val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
- responseCallback)
-
- // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
- val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
-
- // try to complete the request immediately, otherwise put it into the purgatory;
- // this is because while the delayed fetch operation is being created, new requests
- // may arrive and hence make this operation completable.
- delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
- }
- }
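The two decisions at the heart of fetchMessages(), namely how far a requester may read and whether the response can be sent at once, can be sketched as pure functions. The replica ID convention used here (broker IDs are non-negative, -3 marks the future local replica) is an assumption about Kafka's Request object, and the function names are my own.

```scala
object FetchDecisionSketch {
  sealed trait FetchIsolation
  case object FetchLogEnd extends FetchIsolation
  case object FetchHighWatermark extends FetchIsolation
  case object FetchTxnCommitted extends FetchIsolation

  // Assumed value of Request.FutureLocalReplicaId.
  val FutureLocalReplicaId = -3

  def isolationFor(replicaId: Int, readCommitted: Boolean): FetchIsolation = {
    val isFromFollower = replicaId >= 0
    val isFromConsumer = !(isFromFollower || replicaId == FutureLocalReplicaId)
    if (!isFromConsumer) FetchLogEnd
    else if (readCommitted) FetchTxnCommitted
    else FetchHighWatermark
  }

  // Upper bound of what the requester may read under each isolation mode.
  def maxReadableOffset(isolation: FetchIsolation,
                        logEndOffset: Long,
                        highWatermark: Long,
                        lastStableOffset: Long): Long = isolation match {
    case FetchLogEnd        => logEndOffset
    case FetchHighWatermark => highWatermark
    case FetchTxnCommitted  => lastStableOffset
  }

  // Mirrors the five "respond immediately" conditions listed in the code comments.
  def canRespondImmediately(timeoutMs: Long,
                            partitionsRequested: Int,
                            bytesReadable: Long,
                            fetchMinBytes: Int,
                            errorReadingData: Boolean,
                            anyPartitionNeedsHwUpdate: Boolean): Boolean =
    timeoutMs <= 0 || partitionsRequested == 0 || bytesReadable >= fetchMinBytes ||
      errorReadingData || anyPartitionNeedsHwUpdate
}
```

When none of the five conditions holds, the request is parked as a DelayedFetch until more data arrives or the timeout expires.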