
Kafka 3.0 Source Code Notes (4): How the Kafka Broker Handles Client Fetch Requests


Preface

In Kafka 3.0 Source Code Notes (3): Source Analysis of the Kafka Consumer's Core Flow, I described how the consumer, one of Kafka's clients, pulls messages. Kafka 3.0 Source Code Notes (2): Source Analysis of Kafka Broker Startup and Request Handling then examined, from the server side, how the broker receives requests. This post uses the handling of a client Fetch request as an example to walk through how the Kafka broker processes requests at the business layer.

The emphasis on Fetch requests coming from clients (both consumer fetches and the fetches issued by follower replicas, which act as clients when replicating messages) is deliberate: in KRaft mode the Kafka node acting as cluster Leader also receives Fetch requests from Followers that synchronize cluster metadata, and those are handled by an entirely different component. Interested readers can refer to Kafka 3.0 Source Code Notes (9): Metadata Replication on the Kafka Server Side.

1. Kafka's On-Disk Storage Structure

[Figure: Kafka's storage hierarchy, from Topic and Partition down to Log and LogSegment]

  • Topic
    A publish/subscribe message topic, the first logical layer used to organize stored messages. Each topic consists of multiple partitions, which are usually spread across different Broker nodes and together form the topic's physical basis
  • Partition
    A partition of a topic. Logically a partition consists of a set of replicas: the Leader replica stores the messages written by clients, and Follower replicas stay in sync with the Leader to provide high availability. A partition replica is represented by the Partition data structure, the second storage layer in Kafka; physically each replica is an ordered message log
  • Log
    The log structure inside a partition replica, which is in fact a collection of log segments
  • LogSegment
    The log segment data structure. It holds the file objects that actually store the data, mainly the message file FileRecords, the offset index file OffsetIndex, and the timestamp index file TimeIndex (a small sketch of the resulting on-disk layout follows this list)
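
  The sketch below gives a minimal picture of how this hierarchy appears on disk: each partition replica is a directory, and each LogSegment inside it is a group of files named after the segment's base offset (the offset of its first message), e.g. 00000000000000000000.log together with the matching .index and .timeindex files. The log directory and topic name used here are made-up examples.

     import java.io.File

     object SegmentLayout {
       def main(args: Array[String]): Unit = {
         // Hypothetical partition directory: <log.dirs>/<topic>-<partition>
         val partitionDir = new File("/tmp/kafka-logs/demo-topic-0")
         partitionDir.listFiles()
           .filter(_.getName.endsWith(".log"))
           .sortBy(_.getName)
           .foreach { f =>
             // Segment data files are named after their base offset, zero-padded to 20 digits
             val baseOffset = f.getName.stripSuffix(".log").toLong
             println(s"segment=${f.getName} baseOffset=$baseOffset sizeBytes=${f.length()}")
           }
       }
     }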

2. Handling Fetch Requests from Clients

[Figure: processing flow of a client Fetch request inside the broker]

  1. When the Kafka BrokerServer receives a Fetch request from a client, it puts the request into the request queue for the upper-layer business handler, which triggers KafkaApis.scala#handle(). This is an entry method whose core logic is:

    1. Dispatch the request to a dedicated handler method according to its API key; a client Fetch request is routed to KafkaApis.scala#handleFetchRequest()
    2. In the finally block, call ReplicaManager.scala#tryCompleteActions() to try to complete any delayed operations produced while the request was being handled
     override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     try {
       trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
         s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
    
       if (!apiVersionManager.isApiEnabled(request.header.apiKey)) {
         // The socket server will reject APIs which are not exposed in this scope and close the connection
         // before handing them to the request handler, so this path should not be exercised in practice
         throw new IllegalStateException(s"API ${request.header.apiKey} is not enabled")
       }
    
       request.header.apiKey match {
         case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
         case ApiKeys.FETCH => handleFetchRequest(request)
         case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
         case ApiKeys.METADATA => handleTopicMetadataRequest(request)
         case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
         case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
         case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)
         case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
         case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal)
         case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
         case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
         case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal)
         case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
         case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
         case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal)
         case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
         case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
         case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
         case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
         case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
         case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal)
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
         case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request, requestLocal)
         case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request, requestLocal)
         case ApiKeys.END_TXN => handleEndTxnRequest(request, requestLocal)
         case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal)
         case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal)
         case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
         case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
         case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
         case ApiKeys.ALTER_CONFIGS => maybeForwardToController(request, handleAlterConfigsRequest)
         case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
         case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
         case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
         case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
         case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
         case ApiKeys.CREATE_DELEGATION_TOKEN => maybeForwardToController(request, handleCreateTokenRequest)
         case ApiKeys.RENEW_DELEGATION_TOKEN => maybeForwardToController(request, handleRenewTokenRequest)
         case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest)
         case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
         case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal)
         case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
         case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
         case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
         case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
         case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal)
         case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
         case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)
         case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
         case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
         case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
         case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures)
         case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
         case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
         case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
         case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
         case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
         case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
         case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
         case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
       }
     } catch {
       case e: FatalExitError => throw e
       case e: Throwable =>
         error(s"Unexpected error handling request ${request.requestDesc(true)} " +
           s"with context ${request.context}", e)
         requestHelper.handleError(request, e)
     } finally {
       // try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
       // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
       // expiration thread for certain delayed operations (e.g. DelayedJoin)
       replicaManager.tryCompleteActions()
       // The local completion time may be set while processing the request. Only record it if it's unset.
       if (request.apiLocalCompleteTimeNanos < 0)
         request.apiLocalCompleteTimeNanos = time.nanoseconds
     }
    }
    
  2. KafkaApis.scala#handleFetchRequest() is a very long method, but its core logic is fairly simple:

    1. First determine from the request parameters which topic partitions the client wants to fetch, filtering out unauthorized and unknown ones
    2. Call ReplicaManager.scala#fetchMessages() to read the message data from the local log files, passing KafkaApis.scala#handleFetchRequest()#processResponseCallback() as the callback to invoke once processing completes (a hypothetical client-side configuration sketch follows the source below, showing which consumer settings feed the request fields used here)
     def handleFetchRequest(request: RequestChannel.Request): Unit = {
     val versionId = request.header.apiVersion
     val clientId = request.header.clientId
     val fetchRequest = request.body[FetchRequest]
     val fetchContext = fetchManager.newContext(
       fetchRequest.metadata,
       fetchRequest.fetchData,
       fetchRequest.toForget,
       fetchRequest.isFromFollower)
    
     val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) {
       // Fetch API version 11 added preferred replica logic
       Some(new DefaultClientMetadata(
         fetchRequest.rackId,
         clientId,
         request.context.clientAddress,
         request.context.principal,
         request.context.listenerName.value))
     } else {
       None
     }
    
     val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]()
     val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
     if (fetchRequest.isFromFollower) {
       // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
       if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
         fetchContext.foreachPartition { (topicPartition, data) =>
           if (!metadataCache.contains(topicPartition))
             erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
           else
             interesting += (topicPartition -> data)
         }
       } else {
         fetchContext.foreachPartition { (part, _) =>
           erroneous += part -> FetchResponse.partitionResponse(part.partition, Errors.TOPIC_AUTHORIZATION_FAILED)
         }
       }
     } else {
       // Regular Kafka consumers need READ permission on each partition they are fetching.
       val partitionDatas = new mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]
       fetchContext.foreachPartition { (topicPartition, partitionData) =>
         partitionDatas += topicPartition -> partitionData
       }
       val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topic)
       partitionDatas.foreach { case (topicPartition, data) =>
         if (!authorizedTopics.contains(topicPartition.topic))
           erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.TOPIC_AUTHORIZATION_FAILED)
         else if (!metadataCache.contains(topicPartition))
           erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
         else
           interesting += (topicPartition -> data)
       }
     }
    
     ......
    
     // for fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being throttled given
     // no bytes were recorded in the recent quota window
     // trying to fetch more bytes would result in a guaranteed throttling potentially blocking consumer progress
     val maxQuotaWindowBytes = if (fetchRequest.isFromFollower)
       Int.MaxValue
     else
       quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt
    
     val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
     val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)
     if (interesting.isEmpty)
       processResponseCallback(Seq.empty)
     else {
       // call the replica manager to fetch messages from the local replica
       replicaManager.fetchMessages(
         fetchRequest.maxWait.toLong,
         fetchRequest.replicaId,
         fetchMinBytes,
         fetchMaxBytes,
         versionId <= 2,
         interesting,
         replicationQuota(fetchRequest),
         processResponseCallback,
         fetchRequest.isolationLevel,
         clientMetadata)
     }
    }
    
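
    To connect this broker-side code with the client, the hypothetical consumer configuration below shows which consumer settings end up as the Fetch request fields read here (minBytes, maxWait and maxBytes); the broker address, group id and numeric values are placeholders chosen for illustration.

     import java.util.Properties
     import org.apache.kafka.clients.consumer.KafkaConsumer

     val props = new Properties()
     props.put("bootstrap.servers", "localhost:9092")   // placeholder broker address
     props.put("group.id", "demo-group")                // placeholder consumer group
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
     props.put("fetch.min.bytes", "1024")               // becomes FetchRequest.minBytes
     props.put("fetch.max.wait.ms", "500")              // becomes FetchRequest.maxWait
     props.put("fetch.max.bytes", "52428800")           // becomes FetchRequest.maxBytes
     props.put("max.partition.fetch.bytes", "1048576")  // per-partition maxBytes in PartitionData

     val consumer = new KafkaConsumer[String, String](props)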
  3. The core handling in ReplicaManager.scala#fetchMessages() is fairly clear; the points to note are:

    1. It first calls the local function ReplicaManager.scala#fetchMessages()#readFromLog() to perform the actual read, whose core is a call to ReplicaManager.scala#readFromLocalLog().

      Note that if the Fetch request comes from a follower replica, ReplicaManager.scala#updateFollowerFetchState() is also called here to update the locally tracked LEO of that remote replica. This is part of Kafka's leader/follower replication of message data; interested readers can refer to Kafka 3.0 Source Code Notes (10): Source Analysis of Message Data Replication on the Kafka Server Side

    2. Based on the read result and the request parameters, it decides whether to invoke the callback KafkaApis.scala#handleFetchRequest()#processResponseCallback() right away and return data to the caller. The response can be sent immediately in any of the following 5 cases:
      • the request's timeout parameter is not positive, i.e. the caller is unwilling to wait
      • the request contains no partitions to fetch
      • the broker has read enough data (at least fetchMinBytes) to respond
      • an error occurred while reading the data
      • a diverging leader epoch was detected during the read
    3. If the response cannot be sent immediately, a DelayedFetch operation is created and handed to the delayed-fetch purgatory, where it waits to be completed or to expire (a simplified sketch of this purgatory pattern follows the source below)
     def fetchMessages(timeout: Long,
                     replicaId: Int,
                     fetchMinBytes: Int,
                     fetchMaxBytes: Int,
                     hardMaxBytesLimit: Boolean,
                     fetchInfos: Seq[(TopicPartition, PartitionData)],
                     quota: ReplicaQuota,
                     responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
                     isolationLevel: IsolationLevel,
                     clientMetadata: Option[ClientMetadata]): Unit = {
     val isFromFollower = Request.isValidBrokerId(replicaId)
     val isFromConsumer = !(isFromFollower || replicaId == Request.FutureLocalReplicaId)
     val fetchIsolation = if (!isFromConsumer)
       FetchLogEnd
     else if (isolationLevel == IsolationLevel.READ_COMMITTED)
       FetchTxnCommitted
     else
       FetchHighWatermark
    
     // Restrict fetching to leader if request is from follower or from a client with older version (no ClientMetadata)
     val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && clientMetadata.isEmpty)
     def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
       val result = readFromLocalLog(
         replicaId = replicaId,
         fetchOnlyFromLeader = fetchOnlyFromLeader,
         fetchIsolation = fetchIsolation,
         fetchMaxBytes = fetchMaxBytes,
         hardMaxBytesLimit = hardMaxBytesLimit,
         readPartitionInfo = fetchInfos,
         quota = quota,
         clientMetadata = clientMetadata)
       if (isFromFollower) updateFollowerFetchState(replicaId, result)
       else result
     }
    
     val logReadResults = readFromLog()
    
     // check if this fetch request can be satisfied right away
     var bytesReadable: Long = 0
     var errorReadingData = false
     var hasDivergingEpoch = false
     val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
     logReadResults.foreach { case (topicPartition, logReadResult) =>
       brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
       brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
    
       if (logReadResult.error != Errors.NONE)
         errorReadingData = true
       if (logReadResult.divergingEpoch.nonEmpty)
         hasDivergingEpoch = true
       bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
       logReadResultMap.put(topicPartition, logReadResult)
     }
    
     // respond immediately if 1) fetch request does not want to wait
     //                        2) fetch request does not require any data
     //                        3) has enough data to respond
     //                        4) some error happens while reading data
     //                        5) we found a diverging epoch
     if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) {
       val fetchPartitionData = logReadResults.map { case (tp, result) =>
         val isReassignmentFetch = isFromFollower && isAddingReplica(tp, replicaId)
         tp -> result.toFetchPartitionData(isReassignmentFetch)
       }
       responseCallback(fetchPartitionData)
     } else {
       // construct the fetch results from the read results
       val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
       fetchInfos.foreach { case (topicPartition, partitionData) =>
         logReadResultMap.get(topicPartition).foreach(logReadResult => {
           val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
           fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
       val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
         fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
         responseCallback)
    
       // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
       val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
    
       // try to complete the request immediately, otherwise put it into the purgatory;
       // this is because while the delayed fetch operation is being created, new requests
       // may arrive and hence make this operation completable.
       delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
     }
    }
    
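
    The snippet below is a heavily simplified, hypothetical sketch of the purgatory idea behind delayedFetchPurgatory.tryCompleteElseWatch(); it is not Kafka's DelayedOperation implementation and omits timeouts and locking. The point is the pattern: try the operation once, and if it cannot complete, park it under per-partition keys and retry it whenever one of those keys is signalled (e.g. when new data is appended to the partition).

     import scala.collection.mutable

     trait DelayedOp {
       def tryComplete(): Boolean // returns true once the response has been sent
     }

     class TinyPurgatory[K] {
       private val watchers = mutable.Map.empty[K, mutable.ListBuffer[DelayedOp]]

       // Try to complete the operation right away; otherwise watch it under the given keys
       def tryCompleteElseWatch(op: DelayedOp, keys: Seq[K]): Boolean = {
         val done = op.tryComplete()
         if (!done)
           keys.foreach(k => watchers.getOrElseUpdate(k, mutable.ListBuffer.empty) += op)
         done
       }

       // Called when something changes for `key`, e.g. a producer appends to that partition
       def checkAndComplete(key: K): Unit =
         watchers.get(key).foreach(ops => ops.filterInPlace(op => !op.tryComplete()))
     }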
  4. ReplicaManager.scala#readFromLocalLog() is fairly straightforward: the key part is iterating over the requested partitions and calling its local function ReplicaManager.scala#readFromLocalLog()#read() to read the messages stored in each partition. The crucial step inside read() is the call to Partition.scala#readRecords(), which performs the actual read

     def readFromLocalLog(replicaId: Int,
                        fetchOnlyFromLeader: Boolean,
                        fetchIsolation: FetchIsolation,
                        fetchMaxBytes: Int,
                        hardMaxBytesLimit: Boolean,
                        readPartitionInfo: Seq[(TopicPartition, PartitionData)],
                        quota: ReplicaQuota,
                        clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)] = {
     val traceEnabled = isTraceEnabled
    
     def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
       val offset = fetchInfo.fetchOffset
       val partitionFetchSize = fetchInfo.maxBytes
       val followerLogStartOffset = fetchInfo.logStartOffset
    
       val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
       try {
         if (traceEnabled)
           trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
             s"remaining response limit $limitBytes" +
             (if (minOneMessage) s", ignoring response/partition size limits" else ""))
    
         val partition = getPartitionOrException(tp)
         val fetchTimeMs = time.milliseconds
    
         // If we are the leader, determine the preferred read-replica
         val preferredReadReplica = clientMetadata.flatMap(
           metadata => findPreferredReadReplica(partition, metadata, replicaId, fetchInfo.fetchOffset, fetchTimeMs))
    
         if (preferredReadReplica.isDefined) {
           replicaSelectorOpt.foreach { selector =>
             debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
               s"${preferredReadReplica.get} for $clientMetadata")
           }
           // If a preferred read-replica is set, skip the read
           val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
             divergingEpoch = None,
             highWatermark = offsetSnapshot.highWatermark.messageOffset,
             leaderLogStartOffset = offsetSnapshot.logStartOffset,
             leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
             followerLogStartOffset = followerLogStartOffset,
             fetchTimeMs = -1L,
             lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset),
             preferredReadReplica = preferredReadReplica,
             exception = None)
         } else {
           // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
           val readInfo: LogReadInfo = partition.readRecords(
             lastFetchedEpoch = fetchInfo.lastFetchedEpoch,
             fetchOffset = fetchInfo.fetchOffset,
             currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
             maxBytes = adjustedMaxBytes,
             fetchIsolation = fetchIsolation,
             fetchOnlyFromLeader = fetchOnlyFromLeader,
             minOneMessage = minOneMessage)
    
           val fetchDataInfo = if (shouldLeaderThrottle(quota, partition, replicaId)) {
             // If the partition is being throttled, simply return an empty set.
             FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
           } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
             // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
             // progress in such cases and don't need to report a `RecordTooLargeException`
             FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
           } else {
             readInfo.fetchedData
           }
    
           LogReadResult(info = fetchDataInfo,
             divergingEpoch = readInfo.divergingEpoch,
             highWatermark = readInfo.highWatermark,
             leaderLogStartOffset = readInfo.logStartOffset,
             leaderLogEndOffset = readInfo.logEndOffset,
             followerLogStartOffset = followerLogStartOffset,
             fetchTimeMs = fetchTimeMs,
             lastStableOffset = Some(readInfo.lastStableOffset),
             preferredReadReplica = preferredReadReplica,
             exception = None)
         }
       } catch {
         // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
         // is supposed to indicate un-expected failure of a broker in handling a fetch request
         case e@ (_: UnknownTopicOrPartitionException |
                  _: NotLeaderOrFollowerException |
                  _: UnknownLeaderEpochException |
                  _: FencedLeaderEpochException |
                  _: ReplicaNotAvailableException |
                  _: KafkaStorageException |
                  _: OffsetOutOfRangeException) =>
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
             divergingEpoch = None,
             highWatermark = Log.UnknownOffset,
             leaderLogStartOffset = Log.UnknownOffset,
             leaderLogEndOffset = Log.UnknownOffset,
             followerLogStartOffset = Log.UnknownOffset,
             fetchTimeMs = -1L,
             lastStableOffset = None,
             exception = Some(e))
         case e: Throwable =>
           brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
           brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()
    
           val fetchSource = Request.describeReplicaId(replicaId)
           error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +
             s"on partition $tp: $fetchInfo", e)
    
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
             divergingEpoch = None,
             highWatermark = Log.UnknownOffset,
             leaderLogStartOffset = Log.UnknownOffset,
             leaderLogEndOffset = Log.UnknownOffset,
             followerLogStartOffset = Log.UnknownOffset,
             fetchTimeMs = -1L,
             lastStableOffset = None,
             exception = Some(e))
       }
     }
    
     var limitBytes = fetchMaxBytes
     val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]
     var minOneMessage = !hardMaxBytesLimit
     readPartitionInfo.foreach { case (tp, fetchInfo) =>
       val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
       val recordBatchSize = readResult.info.records.sizeInBytes
       // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
       if (recordBatchSize > 0)
         minOneMessage = false
       limitBytes = math.max(0, limitBytes - recordBatchSize)
       result += (tp -> readResult)
     }
     result
    }
    
    
  5. Partition.scala#readRecords() first compares the leader epoch information carried by the request with what the broker knows locally, to determine whether the client's log has diverged from the leader's; if a divergence is detected it is reported in the response so the caller can react accordingly. If there is no divergence, Log.scala#read() is called to descend into the Log structure and read the data

      def readRecords(lastFetchedEpoch: Optional[Integer],
                   fetchOffset: Long,
                   currentLeaderEpoch: Optional[Integer],
                   maxBytes: Int,
                   fetchIsolation: FetchIsolation,
                   fetchOnlyFromLeader: Boolean,
                   minOneMessage: Boolean): LogReadInfo = inReadLock(leaderIsrUpdateLock) {
     // decide whether to only fetch from leader
     val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
    
     // Note we use the log end offset prior to the read. This ensures that any appends following
     // the fetch do not prevent a follower from coming into sync.
     val initialHighWatermark = localLog.highWatermark
     val initialLogStartOffset = localLog.logStartOffset
     val initialLogEndOffset = localLog.logEndOffset
     val initialLastStableOffset = localLog.lastStableOffset
    
     lastFetchedEpoch.ifPresent { fetchEpoch =>
       val epochEndOffset = lastOffsetForLeaderEpoch(currentLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false)
       val error = Errors.forCode(epochEndOffset.errorCode)
       if (error != Errors.NONE) {
         throw error.exception()
       }
    
       if (epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) {
         throw new OffsetOutOfRangeException("Could not determine the end offset of the last fetched epoch " +
           s"$lastFetchedEpoch from the request")
       }
    
       // If fetch offset is less than log start, fail with OffsetOutOfRangeException, regardless of whether epochs are diverging
       if (fetchOffset < initialLogStartOffset) {
         throw new OffsetOutOfRangeException(s"Received request for offset $fetchOffset for partition $topicPartition, " +
           s"but we only have log segments in the range $initialLogStartOffset to $initialLogEndOffset.")
       }
    
       if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchOffset) {
         val emptyFetchData = FetchDataInfo(
           fetchOffsetMetadata = LogOffsetMetadata(fetchOffset),
           records = MemoryRecords.EMPTY,
           firstEntryIncomplete = false,
           abortedTransactions = None
         )
    
         val divergingEpoch = new FetchResponseData.EpochEndOffset()
           .setEpoch(epochEndOffset.leaderEpoch)
           .setEndOffset(epochEndOffset.endOffset)
    
         return LogReadInfo(
           fetchedData = emptyFetchData,
           divergingEpoch = Some(divergingEpoch),
           highWatermark = initialHighWatermark,
           logStartOffset = initialLogStartOffset,
           logEndOffset = initialLogEndOffset,
           lastStableOffset = initialLastStableOffset)
       }
     }
    
     val fetchedData = localLog.read(fetchOffset, maxBytes, fetchIsolation, minOneMessage)
     LogReadInfo(
       fetchedData = fetchedData,
       divergingEpoch = None,
       highWatermark = initialHighWatermark,
       logStartOffset = initialLogStartOffset,
       logEndOffset = initialLogEndOffset,
       lastStableOffset = initialLastStableOffset)
    }
    
  6. The key steps of Log.scala#read() are:

    1. Because Kafka's message data is spread across multiple log segments, it first calls LogSegments.scala#floorSegment() with the startOffset parameter to locate the LogSegment that may contain that offset (a tiny illustration of this floor lookup follows the source below)
    2. It then calls LogSegment.scala#read() to read the messages at the requested position within that segment
      def read(startOffset: Long,
            maxLength: Int,
            isolation: FetchIsolation,
            minOneMessage: Boolean): FetchDataInfo = {
     maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
       trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
         s"total length $size bytes")
    
       val includeAbortedTxns = isolation == FetchTxnCommitted
    
       // Because we don't use the lock for reading, the synchronization is a little bit tricky.
       // We create the local variables to avoid race conditions with updates to the log.
       val endOffsetMetadata = nextOffsetMetadata
       val endOffset = endOffsetMetadata.messageOffset
       var segmentOpt = segments.floorSegment(startOffset)
    
       // return error on attempt to read beyond the log end offset or read below log start offset
       if (startOffset > endOffset || segmentOpt.isEmpty || startOffset < logStartOffset)
         throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
           s"but we only have log segments in the range $logStartOffset to $endOffset.")
    
       val maxOffsetMetadata = isolation match {
         case FetchLogEnd => endOffsetMetadata
         case FetchHighWatermark => fetchHighWatermarkMetadata
         case FetchTxnCommitted => fetchLastStableOffsetMetadata
       }
    
       if (startOffset == maxOffsetMetadata.messageOffset)
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
       else if (startOffset > maxOffsetMetadata.messageOffset)
         emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
       else {
         // Do the read on the segment with a base offset less than the target offset
         // but if that segment doesn't contain any messages with an offset greater than that
         // continue to read from successive segments until we get some messages or we reach the end of the log
         var fetchDataInfo: FetchDataInfo = null
         while (fetchDataInfo == null && segmentOpt.isDefined) {
           val segment = segmentOpt.get
           val baseOffset = segment.baseOffset
    
           val maxPosition =
             // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
             if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
             else segment.size
    
           fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
           if (fetchDataInfo != null) {
             if (includeAbortedTxns)
               fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
           } else segmentOpt = segments.higherSegment(baseOffset)
         }
    
         if (fetchDataInfo != null) fetchDataInfo
         else {
           // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
           // this can happen when all messages with offset larger than start offsets have been deleted.
           // In this case, we will return the empty set with log end offset metadata
           FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
         }
       }
     }
    }
    
    
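
    A tiny, hypothetical illustration of the floor lookup mentioned in step 6: segments are kept in a map keyed by their base offset, and the candidate segment for startOffset is the one with the greatest base offset that is less than or equal to it. Kafka's LogSegments is backed by a concurrent skip-list map; a plain java.util.TreeMap shows the same idea, and the base offsets below are invented.

     import java.util.TreeMap

     final case class Segment(baseOffset: Long)

     val segments = new TreeMap[java.lang.Long, Segment]()
     Seq(0L, 150L, 320L).foreach(base => segments.put(Long.box(base), Segment(base)))

     val startOffset = 200L
     // floorEntry returns the entry with the greatest key <= startOffset, i.e. the segment the read starts in
     val floor = Option(segments.floorEntry(Long.box(startOffset))).map(_.getValue)
     println(floor) // Some(Segment(150))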
  7. The core of LogSegment.scala#read() is:

    1. Call LogSegment.scala#translateOffset() to translate the logical message offset into the actual physical position in the segment file
    2. Once the physical position is known, call FileRecords.java#slice() to return a logically sliced FileRecords object representing the requested message data
    @threadsafe
    def read(startOffset: Long,
            maxSize: Int,
            maxPosition: Long = size,
            minOneMessage: Boolean = false): FetchDataInfo = {
     if (maxSize < 0)
       throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
    
     val startOffsetAndSize = translateOffset(startOffset)
    
     // if the start position is already off the end of the log, return null
     if (startOffsetAndSize == null)
       return null
    
     val startPosition = startOffsetAndSize.position
     val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
    
     val adjustedMaxSize =
       if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
       else maxSize
    
     // return a log segment but with zero size in the case below
     if (adjustedMaxSize == 0)
       return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
    
     // calculate the length of the message set to read based on whether or not they gave us a maxOffset
     val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)
    
     FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
       firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
    }
    
  8. LogSegment.scala#translateOffset() is where the logical offset is mapped to a physical file position. Its key steps are:

    1. Call OffsetIndex.scala#lookup() to find, in the offset index file, the position recorded for the closest indexed offset not beyond the target. Note that Kafka's indexes are sparse, so within the same space they can cover far more data than a dense index would
    2. Call FileRecords.java#searchForOffsetWithSize() to combine the requested offset with the indexed position and determine the exact starting position of the read in the message file

    • Dense index
      In a dense index there is one index record for every search key. This speeds up searches but requires much more space to store the index records themselves
    • Sparse index
      In a sparse index not every search key gets an index record, so two physically adjacent index records point to keys that are not adjacent in the data; you can think of it as a structure similar to a skip list (a minimal lookup sketch follows the source below)
    @threadsafe
    private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
     val mapping = offsetIndex.lookup(offset)
     log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
    }
    
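
    The snippet below is a minimal, hypothetical sketch of what the sparse lookup amounts to: only some offsets have index entries, the lookup returns the entry with the largest offset not greater than the target, and the log file is then scanned forward from that entry's position (which is what FileRecords.java#searchForOffsetWithSize() does next). The entries and byte positions are invented.

     final case class OffsetPosition(offset: Long, position: Int)

     // Largest-lower-bound binary search over sparse, sorted index entries
     def lookupSparse(index: IndexedSeq[OffsetPosition], target: Long): OffsetPosition = {
       var lo = 0
       var hi = index.size - 1
       var best = OffsetPosition(index.head.offset, 0) // fall back to the start of the segment
       while (lo <= hi) {
         val mid = (lo + hi) >>> 1
         if (index(mid).offset <= target) { best = index(mid); lo = mid + 1 }
         else hi = mid - 1
       }
       best
     }

     // One entry roughly every 4 KB of log data; a fetch at offset 57 starts scanning at byte position 8192
     val idx = Vector(OffsetPosition(0, 0), OffsetPosition(23, 4096), OffsetPosition(51, 8192))
     println(lookupSparse(idx, 57)) // OffsetPosition(51,8192)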
  9. The source of OffsetIndex.scala#lookup() is shown below; two points are key:

    1. The offset index is accessed through an mmap of the index file, so the index data does not need to be copied into user space, which improves performance (a small mmap sketch follows the source below)
    2. AbstractIndex.scala#largestLowerBoundSlotFor() is called to search the index data and determine where the message read should start
     def lookup(targetOffset: Long): OffsetPosition = {
     maybeLock(lock) {
       val idx = mmap.duplicate
       val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
       if(slot == -1)
         OffsetPosition(baseOffset, 0)
       else
         parseEntry(idx, slot)
     }
    }
    
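
    The snippet below is a small, hypothetical sketch of what accessing the index through mmap looks like: the index file is mapped into the process address space with FileChannel#map, and each fixed-size entry (in the offset index, 4 bytes of offset relative to the segment base offset plus 4 bytes of physical position) is read directly from the mapped buffer without an extra copy. The file path is a made-up example.

     import java.nio.channels.FileChannel
     import java.nio.file.{Paths, StandardOpenOption}

     val channel = FileChannel.open(
       Paths.get("/tmp/kafka-logs/demo-topic-0/00000000000000000000.index"), // assumed path
       StandardOpenOption.READ)
     val mmap = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())

     // Each offset-index entry is 8 bytes: relative offset (Int) followed by physical position (Int)
     def entryAt(slot: Int): (Int, Int) =
       (mmap.getInt(slot * 8), mmap.getInt(slot * 8 + 4))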
  10. The main job of AbstractIndex.scala#largestLowerBoundSlotFor() is to binary-search the index data for the physical starting point of the message data in the file. Note that the index is effectively split into a cold section and a warm section; the key points are:

    1. The first slot of the warm section is computed from the total number of index entries _entries minus the warm-section size _warmEntries, which guarantees that the binary search is first attempted only over the tail of the index
    2. The reason for this design is that Kafka appends index entries at the end, and newly written data is usually read very soon afterwards, so the access hot spot is concentrated at the tail. Index data normally sits in the page cache, but the operating system's memory is limited and pages are evicted with an LRU-like policy. If every binary search started from the beginning, the pages holding the middle of the index would most likely have been evicted already, causing page faults and forcing reads from disk, which hurts performance

     The page cache (also called the buffer cache) is the in-memory cache of file system data. Kafka's message storage makes heavy use of it: if messages are produced and consumed at roughly the same rate, consumption will mostly hit the cache and avoid disk I/O, giving a large performance boost. A consumer that falls far behind, however, can cause the broker's page cache to churn and drag down the performance of the whole cluster

    protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): Int =
     indexSlotRangeFor(idx, target, searchEntity)._1
    
    private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): (Int, Int) = {
    // check if the index is empty
    if(_entries == 0)
      return (-1, -1)
    
    def binarySearch(begin: Int, end: Int) : (Int, Int) = {
      // binary search for the entry
      var lo = begin
      var hi = end
      while(lo < hi) {
        val mid = (lo + hi + 1) >>> 1
        val found = parseEntry(idx, mid)
        val compareResult = compareIndexEntry(found, target, searchEntity)
        if(compareResult > 0)
          hi = mid - 1
        else if(compareResult < 0)
          lo = mid
        else
          return (mid, mid)
      }
      (lo, if (lo == _entries - 1) -1 else lo + 1)
    }
    
    val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
    // check if the target offset is in the warm section of the index
    if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
      return binarySearch(firstHotEntry, _entries - 1)
    }
    
    // check if the target offset is smaller than the least offset
    if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
      return (-1, 0)
    
    binarySearch(0, firstHotEntry)
    }
    
  11. The main logic of FileRecords.java#searchForOffsetWithSize() is to scan the file data from the given starting position and return the physical position of the first record batch whose last offset is greater than or equal to the target offset

    public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
        for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
            long offset = batch.lastOffset();
            if (offset >= targetOffset)
                return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
        }
        return null;
    }
    
    public Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
        return () -> batchIterator(start);
    }
    
    private AbstractIterator<FileChannelRecordBatch> batchIterator(int start) {
        final int end;
        if (isSlice)
            end = this.end;
        else
            end = this.sizeInBytes();
        FileLogInputStream inputStream = new FileLogInputStream(this, start, end);
        return new RecordBatchIterator<>(inputStream);
    }
    
  12. FileRecords.java#slice() returns a FileRecords object that is a logical slice of the file defined by a position and size. Note that it still references the FileChannel of the underlying file; later, when the response data is actually written to the network, FileRecords.java#writeTo() is called to send the data in that slice to the socket

    public FileRecords slice(int position, int size) throws IOException {
        int availableBytes = availableBytes(position, size);
        int startPosition = this.start + position;
        return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true);
    }
    
  13. FileRecords.java#writeTo() is shown below. When the data is sent it is not copied into user space; instead FileChannel#transferTo() performs a zero-copy transfer that moves the bytes from the file to the socket buffer entirely in kernel space, greatly improving performance (a small zero-copy sketch follows the source below)

    @Override
    public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException {
        long newSize = Math.min(channel.size(), end) - start;
        int oldSize = sizeInBytes();
        if (newSize < oldSize)
            throw new KafkaException(String.format(
                    "Size of FileRecords %s has been truncated during write: old size %d, new size %d",
                    file.getAbsolutePath(), oldSize, newSize));
    
        long position = start + offset;
        long count = Math.min(length, oldSize - offset);
        return destChannel.transferFrom(channel, position, count);
    }
    
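
    The snippet below is a standalone, hypothetical illustration of the zero-copy call used above: FileChannel#transferTo asks the kernel to move a file region straight into a socket channel, so the bytes never pass through a user-space buffer. In the broker this call is reached through TransferableChannel.transferFrom as shown in writeTo() above; the file path and destination address here are made up.

     import java.net.InetSocketAddress
     import java.nio.channels.{FileChannel, SocketChannel}
     import java.nio.file.{Paths, StandardOpenOption}

     val fileChannel = FileChannel.open(
       Paths.get("/tmp/kafka-logs/demo-topic-0/00000000000000000000.log"), // assumed path
       StandardOpenOption.READ)
     val socket = SocketChannel.open(new InetSocketAddress("localhost", 9999)) // assumed peer

     var position = 0L
     val total = fileChannel.size()
     while (position < total) {
       // transferTo may move fewer bytes than requested, so loop until everything has been sent
       position += fileChannel.transferTo(position, total - position, socket)
     }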