In Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析 I covered how the Consumer, one of Kafka's clients, fetches messages, and Kafka 3.0 源码笔记(2)-Kafka 服务端的启动与请求处理源码分析 analyzed, from the server side, the key path by which the Kafka broker receives requests. This article takes the handling of Fetch requests coming from clients as an example and walks through how the Kafka server processes a request at the business level.
The reason for stressing Fetch requests from clients (both consumer Fetch requests and the Fetch requests that partition Follower replicas issue, acting as clients, to replicate messages) is that in KRaft mode the Kafka node acting as cluster Leader also receives Fetch requests from Followers that replicate cluster metadata, and those are handled by an entirely different set of components. Interested readers can refer to Kafka 3.0 源码笔记(9)-Kafka 服务端元数据的主从同步.
- Topic: the publish/subscribe message topic, the first logical layer for storing messages. Each topic contains multiple partitions, which are usually spread across different Broker nodes and together form the physical foundation of the topic
- Partition: a partition under a topic. Logically a partition consists of a group of replicas: the replica in the Leader role actually stores the messages written by clients, while the replicas in the Follower role keep the partition highly available by replicating the Leader's data. A partition and its replicas are represented by the Partition data structure, the second layer Kafka uses to store messages; physically each replica is an ordered message log
- Log: the log structure inside a partition replica, which is in fact a collection of log segments
- LogSegment: the log segment data structure, which holds the file objects that actually store the message data, chiefly the message data file FileRecords, the offset index file OffsetIndex, the timestamp index file TimeIndex, and so on
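To keep these four layers straight, the following is a minimal sketch, not Kafka's actual classes, of how they nest; every name, path, and offset in it is invented for illustration:

// Illustrative model of the storage hierarchy described above (not Kafka's classes).
final case class SegmentSketch(baseOffset: Long, dataFile: String, offsetIndexFile: String, timeIndexFile: String)
final case class LogSketch(dir: String, segments: List[SegmentSketch])          // one local replica of a partition
final case class PartitionSketch(topic: String, id: Int, leaderLog: LogSketch)  // simplified: only the leader replica
final case class TopicSketch(name: String, partitions: List[PartitionSketch])

object HierarchySketch extends App {
  val quickstart = TopicSketch("quickstart-events", List(
    PartitionSketch("quickstart-events", 0,
      LogSketch("/tmp/kafka-logs/quickstart-events-0", List(
        SegmentSketch(0L, "00000000000000000000.log", "00000000000000000000.index", "00000000000000000000.timeindex"),
        SegmentSketch(350L, "00000000000000000350.log", "00000000000000000350.index", "00000000000000000350.timeindex"))))))

  println(quickstart.partitions.head.leaderLog.segments.map(_.baseOffset)) // List(0, 350)
}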
After Kafka's BrokerServer receives a Fetch request from a client, it places the request into the request queue for the upper-layer request handler to process, which eventually triggers the KafkaApis.scala#handle() method. This is the entry point, and its core logic is as follows:
- Dispatch the request to a dedicated handler method according to its API key; a Fetch request coming from a client is handled by KafkaApis.scala#handleFetchRequest()
- In the finally block, call ReplicaManager.scala#tryCompleteActions() to try to complete the delayed operations produced while processing requests
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  try {
    trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
      s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")

    if (!apiVersionManager.isApiEnabled(request.header.apiKey)) {
      // The socket server will reject APIs which are not exposed in this scope and close the connection
      // before handing them to the request handler, so this path should not be exercised in practice
      throw new IllegalStateException(s"API ${request.header.apiKey} is not enabled")
    }

    request.header.apiKey match {
      case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)
      case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
      case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
      case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
      case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal)
      case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
      case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request, requestLocal)
      case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request, requestLocal)
      case ApiKeys.END_TXN => handleEndTxnRequest(request, requestLocal)
      case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal)
      case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal)
      case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
      case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
      case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
      case ApiKeys.ALTER_CONFIGS => maybeForwardToController(request, handleAlterConfigsRequest)
      case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
      case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
      case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
      case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
      case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
      case ApiKeys.CREATE_DELEGATION_TOKEN => maybeForwardToController(request, handleCreateTokenRequest)
      case ApiKeys.RENEW_DELEGATION_TOKEN => maybeForwardToController(request, handleRenewTokenRequest)
      case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest)
      case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
      case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal)
      case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
      case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
      case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
      case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
      case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal)
      case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
      case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)
      case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
      case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
      case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
      case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures)
      case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
      case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
      case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
      case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
      case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
      case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
      case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
      case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
    }
  } catch {
    case e: FatalExitError => throw e
    case e: Throwable =>
      error(s"Unexpected error handling request ${request.requestDesc(true)} " +
        s"with context ${request.context}", e)
      requestHelper.handleError(request, e)
  } finally {
    // try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
    // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
    // expiration thread for certain delayed operations (e.g. DelayedJoin)
    replicaManager.tryCompleteActions()
    // The local completion time may be set while processing the request. Only record it if it's unset.
    if (request.apiLocalCompleteTimeNanos < 0)
      request.apiLocalCompleteTimeNanos = time.nanoseconds
  }
}
The KafkaApis.scala#handleFetchRequest() method is very long, but its core logic is fairly simple:
- First, determine from the request parameters which topic partitions the client wants to fetch, filtering out unauthorized and unknown ones
- Then call ReplicaManager.scala#fetchMessages() to read the message data from the local log files, passing KafkaApis.scala#handleFetchRequest()#processResponseCallback() as the callback that sends the response once processing is complete
def handleFetchRequest(request: RequestChannel.Request): Unit = {
  val versionId = request.header.apiVersion
  val clientId = request.header.clientId
  val fetchRequest = request.body[FetchRequest]
  val fetchContext = fetchManager.newContext(
    fetchRequest.metadata,
    fetchRequest.fetchData,
    fetchRequest.toForget,
    fetchRequest.isFromFollower)

  val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) {
    // Fetch API version 11 added preferred replica logic
    Some(new DefaultClientMetadata(
      fetchRequest.rackId,
      clientId,
      request.context.clientAddress,
      request.context.principal,
      request.context.listenerName.value))
  } else {
    None
  }

  val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]()
  val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
  if (fetchRequest.isFromFollower) {
    // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
    if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
      fetchContext.foreachPartition { (topicPartition, data) =>
        if (!metadataCache.contains(topicPartition))
          erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
        else
          interesting += (topicPartition -> data)
      }
    } else {
      fetchContext.foreachPartition { (part, _) =>
        erroneous += part -> FetchResponse.partitionResponse(part.partition, Errors.TOPIC_AUTHORIZATION_FAILED)
      }
    }
  } else {
    // Regular Kafka consumers need READ permission on each partition they are fetching.
    val partitionDatas = new mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]
    fetchContext.foreachPartition { (topicPartition, partitionData) =>
      partitionDatas += topicPartition -> partitionData
    }
    val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topic)
    partitionDatas.foreach { case (topicPartition, data) =>
      if (!authorizedTopics.contains(topicPartition.topic))
        erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.TOPIC_AUTHORIZATION_FAILED)
      else if (!metadataCache.contains(topicPartition))
        erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
      else
        interesting += (topicPartition -> data)
    }
  }

  ......

  // for fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being throttled given
  // no bytes were recorded in the recent quota window
  // trying to fetch more bytes would result in a guaranteed throttling potentially blocking consumer progress
  val maxQuotaWindowBytes = if (fetchRequest.isFromFollower)
    Int.MaxValue
  else
    quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt

  val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
  val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)
  if (interesting.isEmpty)
    processResponseCallback(Seq.empty)
  else {
    // call the replica manager to fetch messages from the local replica
    replicaManager.fetchMessages(
      fetchRequest.maxWait.toLong,
      fetchRequest.replicaId,
      fetchMinBytes,
      fetchMaxBytes,
      versionId <= 2,
      interesting,
      replicationQuota(fetchRequest),
      processResponseCallback,
      fetchRequest.isolationLevel,
      clientMetadata)
  }
}
The core processing of ReplicaManager.scala#fetchMessages() is relatively clear; the points worth noting are:
- First it calls the nested ReplicaManager.scala#fetchMessages()#readFromLog() function to perform the actual read; the core of that function is the call to ReplicaManager.scala#readFromLocalLog(). Note that if the Fetch request comes from a partition Follower replica, ReplicaManager.scala#updateFollowerFetchState() is also invoked here to update the locally stored LEO of the remote replica. That logic belongs to Kafka's leader/follower data replication mechanism; interested readers can refer to Kafka 3.0 源码笔记(10)-Kafka 服务端消息数据的主从同步源码分析
- It then decides whether to invoke the callback KafkaApis.scala#handleFetchRequest()#processResponseCallback() right away to return data to the requester. The response can be returned immediately when any of the following 5 conditions holds:
  - the timeout carried by the request is not positive (timeout <= 0), i.e. the requester is not willing to wait
  - the request does not ask for data from any partition
  - the broker has already read enough data (at least fetchMinBytes) to respond
  - an error occurred while reading the data
  - a diverging leader epoch was detected while reading, meaning the requester and the broker disagree on the partition's leadership history
- If the response cannot be returned immediately, a delayed operation (DelayedFetch) is created and put into the delayed-operation purgatory to wait until it can be completed or times out (a simplified model of this mechanism is sketched after the source code below)
def fetchMessages(timeout: Long,
                  replicaId: Int,
                  fetchMinBytes: Int,
                  fetchMaxBytes: Int,
                  hardMaxBytesLimit: Boolean,
                  fetchInfos: Seq[(TopicPartition, PartitionData)],
                  quota: ReplicaQuota,
                  responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
                  isolationLevel: IsolationLevel,
                  clientMetadata: Option[ClientMetadata]): Unit = {
  val isFromFollower = Request.isValidBrokerId(replicaId)
  val isFromConsumer = !(isFromFollower || replicaId == Request.FutureLocalReplicaId)
  val fetchIsolation = if (!isFromConsumer)
    FetchLogEnd
  else if (isolationLevel == IsolationLevel.READ_COMMITTED)
    FetchTxnCommitted
  else
    FetchHighWatermark

  // Restrict fetching to leader if request is from follower or from a client with older version (no ClientMetadata)
  val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && clientMetadata.isEmpty)

  def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
    val result = readFromLocalLog(
      replicaId = replicaId,
      fetchOnlyFromLeader = fetchOnlyFromLeader,
      fetchIsolation = fetchIsolation,
      fetchMaxBytes = fetchMaxBytes,
      hardMaxBytesLimit = hardMaxBytesLimit,
      readPartitionInfo = fetchInfos,
      quota = quota,
      clientMetadata = clientMetadata)
    if (isFromFollower) updateFollowerFetchState(replicaId, result)
    else result
  }

  val logReadResults = readFromLog()

  // check if this fetch request can be satisfied right away
  var bytesReadable: Long = 0
  var errorReadingData = false
  var hasDivergingEpoch = false
  val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
  logReadResults.foreach { case (topicPartition, logReadResult) =>
    brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
    brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
    if (logReadResult.error != Errors.NONE)
      errorReadingData = true
    if (logReadResult.divergingEpoch.nonEmpty)
      hasDivergingEpoch = true
    bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
    logReadResultMap.put(topicPartition, logReadResult)
  }

  // respond immediately if 1) fetch request does not want to wait
  //                        2) fetch request does not require any data
  //                        3) has enough data to respond
  //                        4) some error happens while reading data
  //                        5) we found a diverging epoch
  if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) {
    val fetchPartitionData = logReadResults.map { case (tp, result) =>
      val isReassignmentFetch = isFromFollower && isAddingReplica(tp, replicaId)
      tp -> result.toFetchPartitionData(isReassignmentFetch)
    }
    responseCallback(fetchPartitionData)
  } else {
    // construct the fetch results from the read results
    val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
    fetchInfos.foreach { case (topicPartition, partitionData) =>
      logReadResultMap.get(topicPartition).foreach(logReadResult => {
        val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
        fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
      })
    }
    val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
      fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
    val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
      responseCallback)

    // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
    val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }

    // try to complete the request immediately, otherwise put it into the purgatory;
    // this is because while the delayed fetch operation is being created, new requests
    // may arrive and hence make this operation completable.
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  }
}
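To make the purgatory step above more concrete, here is a minimal standalone model, not Kafka's DelayedOperation/DelayedFetch implementation, of the idea that a delayed fetch completes as soon as enough bytes become readable or is force-completed when its timeout fires. All names in it (SimpleDelayedFetch, onBytesAppended, and so on) are invented:

import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

// Illustrative model only: a "delayed fetch" that completes once minBytes are
// readable, or is forced to complete when the timeout fires, whichever comes first.
final class SimpleDelayedFetch(minBytes: Long, onComplete: Long => Unit) {
  private val completed = new AtomicBoolean(false)
  private val readable = new AtomicLong(0L)

  // Called whenever new data is appended to a watched partition.
  def onBytesAppended(bytes: Long): Unit = {
    readable.addAndGet(bytes)
    tryComplete()
  }

  // Complete at most once, as soon as the condition is satisfied.
  def tryComplete(): Boolean =
    if (readable.get() >= minBytes && completed.compareAndSet(false, true)) {
      onComplete(readable.get()); true
    } else false

  // Forced completion on timeout, mirroring expiration of a delayed operation.
  def forceComplete(): Unit =
    if (completed.compareAndSet(false, true)) onComplete(readable.get())
}

object PurgatorySketch extends App {
  val timer = Executors.newSingleThreadScheduledExecutor()
  val fetch = new SimpleDelayedFetch(minBytes = 100, bytes => println(s"respond with $bytes bytes"))

  // Watch: if not completable now, schedule expiration (the "purgatory" part).
  if (!fetch.tryComplete())
    timer.schedule(new Runnable { def run(): Unit = fetch.forceComplete() }, 500, TimeUnit.MILLISECONDS)

  fetch.onBytesAppended(60) // not enough yet, stays delayed
  fetch.onBytesAppended(60) // now >= 100, completes immediately
  timer.shutdown()
}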
The ReplicaManager.scala#readFromLocalLog() method is fairly straightforward: it iterates over the requested topic partitions and calls the nested ReplicaManager.scala#readFromLocalLog()#read() function to read the messages stored in each partition. The key step inside read() is the call to Partition.scala#readRecords() that performs the actual read (a small sketch of the per-request byte budget handled at the tail of this method follows the source code below).
def readFromLocalLog(replicaId: Int,
                     fetchOnlyFromLeader: Boolean,
                     fetchIsolation: FetchIsolation,
                     fetchMaxBytes: Int,
                     hardMaxBytesLimit: Boolean,
                     readPartitionInfo: Seq[(TopicPartition, PartitionData)],
                     quota: ReplicaQuota,
                     clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)] = {
  val traceEnabled = isTraceEnabled

  def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
    val offset = fetchInfo.fetchOffset
    val partitionFetchSize = fetchInfo.maxBytes
    val followerLogStartOffset = fetchInfo.logStartOffset

    val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
    try {
      if (traceEnabled)
        trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
          s"remaining response limit $limitBytes" +
          (if (minOneMessage) s", ignoring response/partition size limits" else ""))

      val partition = getPartitionOrException(tp)
      val fetchTimeMs = time.milliseconds

      // If we are the leader, determine the preferred read-replica
      val preferredReadReplica = clientMetadata.flatMap(
        metadata => findPreferredReadReplica(partition, metadata, replicaId, fetchInfo.fetchOffset, fetchTimeMs))

      if (preferredReadReplica.isDefined) {
        replicaSelectorOpt.foreach { selector =>
          debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
            s"${preferredReadReplica.get} for $clientMetadata")
        }
        // If a preferred read-replica is set, skip the read
        val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
          divergingEpoch = None,
          highWatermark = offsetSnapshot.highWatermark.messageOffset,
          leaderLogStartOffset = offsetSnapshot.logStartOffset,
          leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
          followerLogStartOffset = followerLogStartOffset,
          fetchTimeMs = -1L,
          lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset),
          preferredReadReplica = preferredReadReplica,
          exception = None)
      } else {
        // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
        val readInfo: LogReadInfo = partition.readRecords(
          lastFetchedEpoch = fetchInfo.lastFetchedEpoch,
          fetchOffset = fetchInfo.fetchOffset,
          currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
          maxBytes = adjustedMaxBytes,
          fetchIsolation = fetchIsolation,
          fetchOnlyFromLeader = fetchOnlyFromLeader,
          minOneMessage = minOneMessage)

        val fetchDataInfo = if (shouldLeaderThrottle(quota, partition, replicaId)) {
          // If the partition is being throttled, simply return an empty set.
          FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
        } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
          // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
          // progress in such cases and don't need to report a `RecordTooLargeException`
          FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
        } else {
          readInfo.fetchedData
        }

        LogReadResult(info = fetchDataInfo,
          divergingEpoch = readInfo.divergingEpoch,
          highWatermark = readInfo.highWatermark,
          leaderLogStartOffset = readInfo.logStartOffset,
          leaderLogEndOffset = readInfo.logEndOffset,
          followerLogStartOffset = followerLogStartOffset,
          fetchTimeMs = fetchTimeMs,
          lastStableOffset = Some(readInfo.lastStableOffset),
          preferredReadReplica = preferredReadReplica,
          exception = None)
      }
    } catch {
      // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
      // is supposed to indicate un-expected failure of a broker in handling a fetch request
      case e@ (_: UnknownTopicOrPartitionException |
               _: NotLeaderOrFollowerException |
               _: UnknownLeaderEpochException |
               _: FencedLeaderEpochException |
               _: ReplicaNotAvailableException |
               _: KafkaStorageException |
               _: OffsetOutOfRangeException) =>
        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
          divergingEpoch = None,
          highWatermark = Log.UnknownOffset,
          leaderLogStartOffset = Log.UnknownOffset,
          leaderLogEndOffset = Log.UnknownOffset,
          followerLogStartOffset = Log.UnknownOffset,
          fetchTimeMs = -1L,
          lastStableOffset = None,
          exception = Some(e))
      case e: Throwable =>
        brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
        brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()

        val fetchSource = Request.describeReplicaId(replicaId)
        error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +
          s"on partition $tp: $fetchInfo", e)

        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
          divergingEpoch = None,
          highWatermark = Log.UnknownOffset,
          leaderLogStartOffset = Log.UnknownOffset,
          leaderLogEndOffset = Log.UnknownOffset,
          followerLogStartOffset = Log.UnknownOffset,
          fetchTimeMs = -1L,
          lastStableOffset = None,
          exception = Some(e))
    }
  }

  var limitBytes = fetchMaxBytes
  val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]
  var minOneMessage = !hardMaxBytesLimit
  readPartitionInfo.foreach { case (tp, fetchInfo) =>
    val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
    val recordBatchSize = readResult.info.records.sizeInBytes
    // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
    if (recordBatchSize > 0)
      minOneMessage = false
    limitBytes = math.max(0, limitBytes - recordBatchSize)
    result += (tp -> readResult)
  }
  result
}
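As a standalone illustration of the byte-budget accounting at the tail of readFromLocalLog() above, the following sketch uses made-up partition sizes and a simplified cap; it is not Kafka code:

// Illustrative only: how the request-level byte budget shrinks as the partition list
// is walked, and why minOneMessage flips after the first non-empty read.
object FetchBudgetSketch extends App {
  val fetchMaxBytes = 100000
  // bytes returned by each per-partition read, in request order (hypothetical numbers)
  val bytesReturned = Seq("p0" -> 40000, "p1" -> 0, "p2" -> 80000, "p3" -> 30000)

  var limitBytes = fetchMaxBytes
  var minOneMessage = true // !hardMaxBytesLimit: the first read may exceed limits to return one message

  bytesReturned.foreach { case (tp, size) =>
    val taken = math.min(size, if (minOneMessage) size else limitBytes) // simplified cap
    if (taken > 0) minOneMessage = false // stop ignoring size limits once something was read
    limitBytes = math.max(0, limitBytes - taken)
    println(s"$tp: took $taken bytes, remaining request budget $limitBytes, minOneMessage=$minOneMessage")
  }
}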
Before reading, Partition.scala#readRecords() compares the leader epoch information carried in the request (currentLeaderEpoch and lastFetchedEpoch) with the leader epoch state kept on the broker, in order to determine whether the requester's view of the partition's leadership history diverges from the server's. If a divergence is found, it must be reported back in the response so the requester can react (a follower would typically truncate its log). If there is no divergence, it calls Log.scala#read() to descend into the next level, the Log data structure, and read the data. A simplified model of the divergence check is sketched after the source code below.
def readRecords(lastFetchedEpoch: Optional[Integer],
                fetchOffset: Long,
                currentLeaderEpoch: Optional[Integer],
                maxBytes: Int,
                fetchIsolation: FetchIsolation,
                fetchOnlyFromLeader: Boolean,
                minOneMessage: Boolean): LogReadInfo = inReadLock(leaderIsrUpdateLock) {
  // decide whether to only fetch from leader
  val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)

  // Note we use the log end offset prior to the read. This ensures that any appends following
  // the fetch do not prevent a follower from coming into sync.
  val initialHighWatermark = localLog.highWatermark
  val initialLogStartOffset = localLog.logStartOffset
  val initialLogEndOffset = localLog.logEndOffset
  val initialLastStableOffset = localLog.lastStableOffset

  lastFetchedEpoch.ifPresent { fetchEpoch =>
    val epochEndOffset = lastOffsetForLeaderEpoch(currentLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false)
    val error = Errors.forCode(epochEndOffset.errorCode)
    if (error != Errors.NONE) {
      throw error.exception()
    }

    if (epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) {
      throw new OffsetOutOfRangeException("Could not determine the end offset of the last fetched epoch " +
        s"$lastFetchedEpoch from the request")
    }

    // If fetch offset is less than log start, fail with OffsetOutOfRangeException, regardless of whether epochs are diverging
    if (fetchOffset < initialLogStartOffset) {
      throw new OffsetOutOfRangeException(s"Received request for offset $fetchOffset for partition $topicPartition, " +
        s"but we only have log segments in the range $initialLogStartOffset to $initialLogEndOffset.")
    }

    if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchOffset) {
      val emptyFetchData = FetchDataInfo(
        fetchOffsetMetadata = LogOffsetMetadata(fetchOffset),
        records = MemoryRecords.EMPTY,
        firstEntryIncomplete = false,
        abortedTransactions = None
      )

      val divergingEpoch = new FetchResponseData.EpochEndOffset()
        .setEpoch(epochEndOffset.leaderEpoch)
        .setEndOffset(epochEndOffset.endOffset)

      return LogReadInfo(
        fetchedData = emptyFetchData,
        divergingEpoch = Some(divergingEpoch),
        highWatermark = initialHighWatermark,
        logStartOffset = initialLogStartOffset,
        logEndOffset = initialLogEndOffset,
        lastStableOffset = initialLastStableOffset)
    }
  }

  val fetchedData = localLog.read(fetchOffset, maxBytes, fetchIsolation, minOneMessage)

  LogReadInfo(
    fetchedData = fetchedData,
    divergingEpoch = None,
    highWatermark = initialHighWatermark,
    logStartOffset = initialLogStartOffset,
    logEndOffset = initialLogEndOffset,
    lastStableOffset = initialLastStableOffset)
}
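As promised above, here is a simplified standalone model of the divergence check, not Kafka code: the leader knows, per leader epoch, the offset at which that epoch ended, and a fetch is diverging if the requester's last fetched epoch ended on the leader before the offset the requester wants to read. The epoch cache contents and function below are invented:

// Illustrative model of the leader-epoch divergence check in readRecords().
object EpochDivergenceSketch extends App {
  // Hypothetical leader epoch cache: epoch -> end offset (exclusive) of that epoch on the leader.
  val epochEndOffsets: Map[Int, Long] = Map(3 -> 100L, 4 -> 180L, 5 -> 260L)

  // Returns Some((epoch, endOffset)) the requester must truncate to, or None if not diverging.
  def divergingEpoch(lastFetchedEpoch: Int, fetchOffset: Long): Option[(Int, Long)] = {
    // End offset of the requester's last fetched epoch, as known to the leader
    // (simplified: unknown epochs are treated as ending at -1).
    val endOffset = epochEndOffsets.getOrElse(lastFetchedEpoch, -1L)
    if (endOffset < fetchOffset) Some(lastFetchedEpoch -> endOffset) else None
  }

  println(divergingEpoch(lastFetchedEpoch = 4, fetchOffset = 150)) // None: on the leader, epoch 4 reaches offset 180
  println(divergingEpoch(lastFetchedEpoch = 4, fetchOffset = 200)) // Some((4,180)): the follower must truncate to 180
}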
The key steps of Log.scala#read() are as follows:
- Because Kafka's message data is spread over multiple log segments, it first calls LogSegments.scala#floorSegment() to use the startOffset parameter to locate a log segment object LogSegment (a small sketch of this floor lookup follows the source code below)
- It then calls LogSegment.scala#read() to read the message data at the requested position inside that segment
def read(startOffset: Long,
         maxLength: Int,
         isolation: FetchIsolation,
         minOneMessage: Boolean): FetchDataInfo = {
  maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
    trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
      s"total length $size bytes")

    val includeAbortedTxns = isolation == FetchTxnCommitted

    // Because we don't use the lock for reading, the synchronization is a little bit tricky.
    // We create the local variables to avoid race conditions with updates to the log.
    val endOffsetMetadata = nextOffsetMetadata
    val endOffset = endOffsetMetadata.messageOffset
    var segmentOpt = segments.floorSegment(startOffset)

    // return error on attempt to read beyond the log end offset or read below log start offset
    if (startOffset > endOffset || segmentOpt.isEmpty || startOffset < logStartOffset)
      throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
        s"but we only have log segments in the range $logStartOffset to $endOffset.")

    val maxOffsetMetadata = isolation match {
      case FetchLogEnd => endOffsetMetadata
      case FetchHighWatermark => fetchHighWatermarkMetadata
      case FetchTxnCommitted => fetchLastStableOffsetMetadata
    }

    if (startOffset == maxOffsetMetadata.messageOffset)
      emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
    else if (startOffset > maxOffsetMetadata.messageOffset)
      emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
    else {
      // Do the read on the segment with a base offset less than the target offset
      // but if that segment doesn't contain any messages with an offset greater than that
      // continue to read from successive segments until we get some messages or we reach the end of the log
      var fetchDataInfo: FetchDataInfo = null
      while (fetchDataInfo == null && segmentOpt.isDefined) {
        val segment = segmentOpt.get
        val baseOffset = segment.baseOffset

        val maxPosition =
          // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
          else segment.size

        fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
        if (fetchDataInfo != null) {
          if (includeAbortedTxns)
            fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
        } else segmentOpt = segments.higherSegment(baseOffset)
      }

      if (fetchDataInfo != null) fetchDataInfo
      else {
        // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
        // this can happen when all messages with offset larger than start offsets have been deleted.
        // In this case, we will return the empty set with log end offset metadata
        FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
      }
    }
  }
}
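The floorSegment() lookup referenced above boils down to a floor query over segments keyed by their base offsets. A minimal sketch of that idea using java.util.concurrent.ConcurrentSkipListMap (the segment file names are made up, and this is not Kafka's LogSegments class):

import java.util.concurrent.ConcurrentSkipListMap

// Illustrative only: locate the segment that may contain a given offset by taking
// the entry with the largest base offset <= startOffset.
object FloorSegmentSketch extends App {
  // base offset -> segment file name (hypothetical)
  val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
  segments.put(0L, "00000000000000000000.log")
  segments.put(350L, "00000000000000000350.log")
  segments.put(900L, "00000000000000000900.log")

  def floorSegment(startOffset: Long): Option[String] =
    Option(segments.floorEntry(startOffset)).map(_.getValue)

  println(floorSegment(500L))       // Some(00000000000000000350.log): offsets 350..899 live here
  println(floorSegment(900L))       // Some(00000000000000000900.log)
  println(segments.higherKey(350L)) // 900: the next segment to try if the read comes back empty
}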
The core steps of LogSegment.scala#read() are:
- Call LogSegment.scala#translateOffset() to translate the message offset into the actual physical position of the message in the file
- Once the physical position is known, call FileRecords.java#slice() to return a logically sliced FileRecords object that represents the message data
@threadsafe
def read(startOffset: Long,
         maxSize: Int,
         maxPosition: Long = size,
         minOneMessage: Boolean = false): FetchDataInfo = {
  if (maxSize < 0)
    throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")

  val startOffsetAndSize = translateOffset(startOffset)

  // if the start position is already off the end of the log, return null
  if (startOffsetAndSize == null)
    return null

  val startPosition = startOffsetAndSize.position
  val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

  val adjustedMaxSize =
    if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
    else maxSize

  // return a log segment but with zero size in the case below
  if (adjustedMaxSize == 0)
    return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

  // calculate the length of the message set to read based on whether or not they gave us a maxOffset
  val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)

  FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
    firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
LogSegment.scala#translateOffset() is the key to mapping a logical offset to a physical file position. Its main steps are:
- Call OffsetIndex.scala#lookup() to find a file position from the offset index file. Note that Kafka's index is a sparse index, so in the same amount of space it can cover far more log data than a dense index could
- Call FileRecords.java#searchForOffsetWithSize(), combining the requested offset with the position recorded in the index, to determine the exact starting position for reading the message file
Dense index: in a dense index there is an index entry for every search key. This speeds up lookups but needs more space to store the index entries themselves.
Sparse index: in a sparse index, an index entry is not created for every search key, so two physically adjacent index entries do not hold adjacent search keys; you can picture it as something like a skip list. (A toy sparse-index lookup is sketched after the source code below.)
@threadsafe
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
val mapping = offsetIndex.lookup(offset)
log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
}
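To make the sparse-index idea concrete, here is a toy sketch that does not follow Kafka's OffsetIndex format: entries exist only every few messages, so a lookup first finds the greatest index entry at or below the target offset and then scans the log file forward from that byte position. The entry spacing and values are invented:

// Illustrative only: lookup in a sparse offset index.
object SparseIndexSketch extends App {
  // Sparse index: (message offset, byte position in the log file), one entry every ~3 messages.
  val index = Vector((0L, 0), (3L, 310), (6L, 650), (9L, 1010))

  // Largest index entry whose offset <= target (binary search over the sorted entries).
  def floorEntry(target: Long): (Long, Int) = {
    var lo = 0
    var hi = index.length - 1
    while (lo < hi) {
      val mid = (lo + hi + 1) >>> 1
      if (index(mid)._1 <= target) lo = mid else hi = mid - 1
    }
    index(lo)
  }

  // The real code then scans the log file forward from that byte position
  // (FileRecords#searchForOffsetWithSize) until it reaches the batch covering the target offset.
  println(floorEntry(7L)) // (6,650): start scanning the file at byte 650
  println(floorEntry(2L)) // (0,0)
}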
The source of OffsetIndex.scala#lookup() is shown below; two points are key:
- The offset index accesses its data through an mmap mapping of the index file, so the index data does not need to be copied into user-space buffers, which improves performance (a minimal mmap sketch follows the source code below)
- It calls AbstractIndex.scala#largestLowerBoundSlotFor() to search the index data for the slot that determines where to start reading the message data
def lookup(targetOffset: Long): OffsetPosition = {
maybeLock(lock) {
val idx = mmap.duplicate
val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
if(slot == -1)
OffsetPosition(baseOffset, 0)
else
parseEntry(idx, slot)
}
}
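A minimal sketch of memory-mapping an index-like file with Java NIO from Scala; the 8-byte entry layout (4-byte relative offset plus 4-byte file position) mirrors the offset index described above, but the file name and contents are invented:

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, Paths, StandardOpenOption}

// Illustrative only: read fixed-size (relativeOffset, position) entries out of a
// memory-mapped file, the way OffsetIndex reads entries from its mmap.
object MmapIndexSketch extends App {
  val path = Paths.get("demo.index") // hypothetical file
  // Write two 8-byte entries: (relativeOffset=3, position=310), (relativeOffset=6, position=650)
  val bytes = ByteBuffer.allocate(16).putInt(3).putInt(310).putInt(6).putInt(650).array()
  Files.write(path, bytes)

  val channel = FileChannel.open(path, StandardOpenOption.READ)
  val mmap = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())

  def parseEntry(slot: Int): (Int, Int) = {
    val idx = mmap.duplicate()  // duplicate so concurrent readers don't disturb each other's position
    (idx.getInt(slot * 8), idx.getInt(slot * 8 + 4)) // absolute reads straight from the mapped region
  }

  println(parseEntry(0)) // (3,310)
  println(parseEntry(1)) // (6,650)
  channel.close()
  Files.deleteIfExists(path)
}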
The main job of AbstractIndex.scala#largestLowerBoundSlotFor() is to binary-search the index data for the physical start position of the message data in the file. Note that the index is effectively split into a cold section and a warm section:
- The start of the warm section is computed as the total number of index entries _entries minus the warm section size _warmEntries, which guarantees that the binary search is first attempted only over the tail of the index
- The reason is that Kafka appends to the index at its end, and newly written data is usually read soon afterwards, so the hot spot is concentrated at the tail. Index data normally sits in the page cache, but OS memory is limited and pages are evicted by LRU-like policies. If every binary search started from the beginning of the index, the pages holding the middle of the index would most likely have been evicted already, causing page faults that force reads from disk and hurt performance. (A worked sketch of the warm-section split follows the source code below.)
The page cache, also called the buffer cache, is the in-memory cache of file-system data. Kafka's message storage makes heavy use of it: when production and consumption proceed at roughly the same pace, consumers will very likely hit the cache directly without any disk I/O, which greatly improves performance. But when a consumer falls far behind, it can cause the page cache on the Kafka node to churn, dragging down the performance of the whole cluster.
protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): Int =
  indexSlotRangeFor(idx, target, searchEntity)._1

private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): (Int, Int) = {
  // check if the index is empty
  if(_entries == 0)
    return (-1, -1)

  def binarySearch(begin: Int, end: Int) : (Int, Int) = {
    // binary search for the entry
    var lo = begin
    var hi = end
    while(lo < hi) {
      val mid = (lo + hi + 1) >>> 1
      val found = parseEntry(idx, mid)
      val compareResult = compareIndexEntry(found, target, searchEntity)
      if(compareResult > 0)
        hi = mid - 1
      else if(compareResult < 0)
        lo = mid
      else
        return (mid, mid)
    }
    (lo, if (lo == _entries - 1) -1 else lo + 1)
  }

  val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
  // check if the target offset is in the warm section of the index
  if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
    return binarySearch(firstHotEntry, _entries - 1)
  }

  // check if the target offset is smaller than the least offset
  if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
    return (-1, 0)

  binarySearch(0, firstHotEntry)
}
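As a worked illustration of the warm-section split, with made-up entry counts (Kafka sizes the warm section at roughly one 8 KB page worth of entries):

// Worked example (hypothetical sizes): which slots does the warm-section optimisation touch?
object WarmSectionSketch extends App {
  val entrySize = 8                  // offset index entry: 4-byte relative offset + 4-byte position
  val warmEntries = 8192 / entrySize // ~one page worth of entries kept "warm"
  val entries = 100000               // hypothetical total number of index entries

  val firstHotEntry = math.max(0, entries - 1 - warmEntries)
  println(s"warmEntries=$warmEntries, firstHotEntry=$firstHotEntry")
  // => warmEntries=1024, firstHotEntry=98975

  // A lookup for a recent offset binary-searches only slots [98975, 99999]: about 10 probes,
  // all touching the same few index pages that are almost certainly still in the page cache.
  // Only lookups for old offsets fall back to searching [0, 98975].
  val probesWarm = math.ceil(math.log(warmEntries.toDouble) / math.log(2)).toInt
  println(s"probes in warm section ~ $probesWarm")
}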
The main logic of FileRecords.java#searchForOffsetWithSize() is to scan the file from the given starting position and return the physical position of the first record batch whose last offset is greater than or equal to the target offset.
public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
    for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
        long offset = batch.lastOffset();
        if (offset >= targetOffset)
            return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
    }
    return null;
}

public Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
    return () -> batchIterator(start);
}

private AbstractIterator<FileChannelRecordBatch> batchIterator(int start) {
    final int end;
    if (isSlice)
        end = this.end;
    else
        end = this.sizeInBytes();
    FileLogInputStream inputStream = new FileLogInputStream(this, start, end);
    return new RecordBatchIterator<>(inputStream);
}
FileRecords.java#slice() returns a FileRecords object that is a logical slice of the file starting at the given position; note that it still wraps the FileChannel of the underlying file. Later, when the data is actually sent over the network, FileRecords.java#writeTo() is called to write the data at the given offset in that channel out to the socket.
public FileRecords slice(int position, int size) throws IOException {
int availableBytes = availableBytes(position, size);
int startPosition = this.start + position;
return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true);
}
FileRecords.java#writeTo() is shown below. When the data is actually sent, the file contents do not need to be copied into user space; instead FileChannel#transferTo() is used to zero-copy the data in kernel space straight into the socket buffer, which greatly improves performance.
@Override
public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException {
long newSize = Math.min(channel.size(), end) - start;
int oldSize = sizeInBytes();
if (newSize < oldSize)
throw new KafkaException(String.format(
"Size of FileRecords %s has been truncated during write: old size %d, new size %d",
file.getAbsolutePath(), oldSize, newSize));
long position = start + offset;
long count = Math.min(length, oldSize - offset);
return destChannel.transferFrom(channel, position, count);
}
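A minimal sketch of that zero-copy send path using plain Java NIO from Scala; the file name, address, and byte range are invented, and this is not Kafka's network layer:

import java.net.InetSocketAddress
import java.nio.channels.{FileChannel, SocketChannel}
import java.nio.file.{Paths, StandardOpenOption}

// Illustrative only: send a byte range of a file to a socket without copying it
// through user-space buffers, the same primitive FileRecords#writeTo relies on.
object ZeroCopySendSketch {
  def sendSlice(path: String, host: String, port: Int, position: Long, count: Long): Long = {
    val fileChannel = FileChannel.open(Paths.get(path), StandardOpenOption.READ)
    val socketChannel = SocketChannel.open(new InetSocketAddress(host, port))
    try {
      var sent = 0L
      // transferTo may send fewer bytes than requested, so loop until the slice is done.
      while (sent < count) {
        val n = fileChannel.transferTo(position + sent, count - sent, socketChannel)
        if (n <= 0) return sent // nothing more could be written
        sent += n
      }
      sent
    } finally {
      socketChannel.close()
      fileChannel.close()
    }
  }

  def main(args: Array[String]): Unit =
    // Hypothetical usage: ship bytes [1024, 1024 + 65536) of a segment file to a peer.
    println(sendSlice("00000000000000000000.log", "127.0.0.1", 9999, position = 1024L, count = 65536L))
}

This is also why a fetch that hits the page cache can be served almost entirely in kernel space: the mmap-based index lookup locates the physical position, and transferTo() ships the bytes to the socket without ever materializing them in the broker's heap.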