当前位置:   article > 正文

kafka日志对象(三)—— Log的操作_kafka日志截断

kafka日志截断

Log 的常见操作分为 4 大部分:

  • 高水位管理操作:高水位的概念在 Kafka 中举足轻重,对它的管理,是 Log 最重要的功能之一。
  • 日志段管理:Log 是日志段的容器。高效组织与管理其下辖的所有日志段对象,是源码的核心。
  • 关键位移值管理:日志定义了很多重要的位移值,比如 Log Start Offset 和 LEO 等。确保这些位移值的正确性,是构建消息引擎一致性的基础。
  • 读写操作:所谓的操作日志,大体上就是指读写日志。读写操作是kafka高吞吐量的基础。

高水位管理操作

一. 高水位定义

代码只有一行:

@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

这行语句传达了两个重要的事实:

  • 高水位值是 volatile(易变型)的。因为多个线程可能同时读取它,因此需要设置成 volatile,保证内存可见性。另外,由于高水位值可能被多个线程同时修改,因此源码使用 Java Monitor 锁来确保并发修改的线程安全。
  • 高水位值的初始值是 Log Start Offset 值。上一篇有提到,每个 Log 对象都会维护一个 Log Start Offset 值。当首次构建高水位时,它会被赋值成 Log Start Offset 值。

看下 LogOffsetMetadata 的代码:

  1. case class LogOffsetMetadata(messageOffset: Long,
  2. segmentBaseOffset: Long = Log.UnknownOffset,
  3. relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition)

里面保存了三个重要的变量:

  • messageOffset:消息位移值,这是最重要的信息。我们总说高水位值(HW),其实指的就是这个变量的值。
  • segmentBaseOffset:保存该位移值所在日志段的起始位移。日志段起始位移值辅助计算两条消息在物理磁盘文件中位置的差值,即两条消息彼此隔了多少字节。这个计算有个前提条件,即两条消息必须处在同一个日志段对象上,不能跨日志段对象。否则它们就位于不同的物理文件上,计算这个值就没有意义了。这里的 segmentBaseOffset,就是用来判断两条消息是否处于同一个日志段的。
  • relativePositionSegment:保存该位移值所在日志段的物理磁盘位置。这个字段在计算两个位移值之间的物理磁盘位置差值时非常有用。Kafka 什么时候需要计算位置之间的字节数呢?就是在读取日志的时候。假设每次读取时只能读 1MB 的数据,那么,源码肯定需要关心两个位移之间所有消息的总字节数是否超过了 1MB。

二. 获取和设置高水位值

  1. // getter method:读取高水位的位移值
  2. def highWatermark: Long = highWatermarkMetadata.messageOffset
  3. // setter method:设置高水位值
  4. private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
  5. if (newHighWatermark.messageOffset < 0) // 高水位值不能是负数
  6. throw new IllegalArgumentException("High watermark offset should be non-negative")
  7. lock synchronized { // 保护Log对象修改的Monitor锁
  8. highWatermarkMetadata = newHighWatermark // 赋值新的高水位值
  9. producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset) // 处理事务状态管理器的高水位值更新逻辑,忽略它……
  10. maybeIncrementFirstUnstableOffset() // First Unstable Offset是Kafka事务机制的一部分,忽略它……
  11. }
  12. trace(s"Setting high watermark $newHighWatermark")
  13. }

三. 更新高水位值

源码还定义了两个更新高水位值的方法:updateHighWatermark 和 maybeIncrementHighWatermark。从名字上来看,前者是一定要更新高水位值的,而后者是可能会更新也可能不会。

  1. // updateHighWatermark method
  2. def updateHighWatermark(hw: Long): Long = {
  3. // 新高水位值一定介于[Log Start Offset,Log End Offset]之间
  4. val newHighWatermark = if (hw < logStartOffset)
  5. logStartOffset
  6. else if (hw > logEndOffset)
  7. logEndOffset
  8. else
  9. hw
  10. // 调用Setter方法来更新高水位值
  11. updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
  12. newHighWatermark // 最后返回新高水位值
  13. }
  14. // maybeIncrementHighWatermark method
  15. def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
  16. // 新高水位值不能越过Log End Offset
  17. if (newHighWatermark.messageOffset > logEndOffset)
  18. throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
  19. s"log end offset $logEndOffsetMetadata")
  20. lock.synchronized {
  21. val oldHighWatermark = fetchHighWatermarkMetadata // 获取老的高水位值
  22. // 新高水位值要比老高水位值大以维持单调增加特性,否则就不做更新!
  23. // 另外,如果新高水位值在新日志段上,也可执行更新高水位操作
  24. if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
  25. (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
  26. updateHighWatermarkMetadata(newHighWatermark)
  27. Some(oldHighWatermark) // 返回老的高水位值
  28. } else {
  29. None
  30. }
  31. }
  32. }
  • updateHighWatermark 方法,主要用在 Follower 副本从 Leader 副本获取到消息后更新高水位值。一旦拿到新的消息,就必须要更新高水位值;
  • maybeIncrementHighWatermark 方法,主要是用来更新 Leader 副本的高水位值。

需要注意的是,Leader 副本高水位值的更新是有条件的——某些情况下会更新高水位值,某些情况下可能不会。就像我刚才说的,Follower 副本成功拉取 Leader 副本的消息后必须更新高水位值,但 Producer 端向 Leader 副本写入消息时,分区的高水位值就可能不需要更新——因为它可能需要等待其他 Follower 副本同步的进度。因此,源码中定义了两个更新的方法,它们分别应用于不同的场景。

四. 读取高水位值

关于高水位值管理的最后一个操作是 fetchHighWatermarkMetadata 方法。它不仅仅是获取高水位值,还要获取高水位的其他元数据信息,即日志段起始位移和物理位置信息。下面是它的实现逻辑:

  1. private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
  2. checkIfMemoryMappedBufferClosed() // 读取时确保日志不能被关闭
  3. val offsetMetadata = highWatermarkMetadata // 保存当前高水位值到本地变量,避免多线程访问干扰
  4. if (offsetMetadata.messageOffsetOnly) { //没有获得到完整的高水位元数据
  5. lock.synchronized {
  6. val fullOffset = convertToOffsetMetadataOrThrow(highWatermark) // 通过读日志文件的方式把完整的高水位元数据信息拉出来
  7. updateHighWatermarkMetadata(fullOffset) // 然后再更新一下高水位对象
  8. fullOffset
  9. }
  10. } else { // 否则,直接返回即可
  11. offsetMetadata
  12. }
  13. }

日志段管理

所谓的日志段管理,无非就是增删改查。从上一篇知道日志段的定义就是一个Map。

1. 增加

就是Map方法的append

def addSegment(segment: LogSegment): LogSegment = this.segments.put(segment.baseOffset, segment)

2. 删除

删除操作相对来说复杂一点。Kafka 有很多留存策略,包括基于时间维度的、基于空间维度的和基于 Log Start Offset 维度的。那啥是留存策略呢?其实,它本质上就是根据一定的规则决定哪些日志段可以删除。

从源码角度来看,Log 中控制删除操作的总入口是 deleteOldSegments 无参方法:

  1. def deleteOldSegments(): Int = {
  2. if (config.delete) {
  3. deleteRetentionMsBreachedSegments() +
  4. deleteRetentionSizeBreachedSegments() +
  5. deleteLogStartOffsetBreachedSegments()
  6. } else {
  7. deleteLogStartOffsetBreachedSegments()
  8. }
  9. }

代码中的 deleteRetentionMsBreachedSegments、deleteRetentionSizeBreachedSegments 和 deleteLogStartOffsetBreachedSegments 分别对应于上面时间、空间、Log Start Offset那 3 个策略。

这张图画出了删除底层调用的方法。上面 3 个留存策略方法底层都会调用带参数版本的 deleteOldSegments 方法,而这个方法又相继调用了 deletableSegments 和 deleteSegments 方法。

首先是带参数版的 deleteOldSegments 方法:

  1. private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
  2. lock synchronized {
  3. val deletable = deletableSegments(predicate)
  4. if (deletable.nonEmpty)
  5. info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
  6. deleteSegments(deletable)
  7. }
  8. }

该方法只有两个步骤:

  • 使用传入的函数计算哪些日志段对象能够被删除;
  • 调用 deleteSegments 方法删除这些日志段。

接下来是 deletableSegments 方法:

  1. private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
  2. if (segments.isEmpty) { // 如果当前压根就没有任何日志段对象,直接返回
  3. Seq.empty
  4. } else {
  5. val deletable = ArrayBuffer.empty[LogSegment]
  6. var segmentEntry = segments.firstEntry
  7. // 从具有最小起始位移值的日志段对象开始遍历,直到满足以下条件之一便停止遍历:
  8. // 1. 测定条件函数predicate = false
  9. // 2. 扫描到包含Log对象高水位值所在的日志段对象
  10. // 3. 最新的日志段对象不包含任何消息
  11. // 最新日志段对象是segments中Key值最大对应的那个日志段,也就是我们常说的Active Segment。完全为空的Active Segment如果被允许删除,后面还要重建它,故代码这里不允许删除大小为空的Active Segment。
  12. // 在遍历过程中,同时不满足以上3个条件的所有日志段都是可以被删除的!
  13. while (segmentEntry != null) {
  14. val segment = segmentEntry.getValue
  15. val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
  16. val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) =
  17. if (nextSegmentEntry != null)
  18. (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
  19. else
  20. (null, logEndOffset, segment.size == 0)
  21. if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
  22. deletable += segment
  23. segmentEntry = nextSegmentEntry
  24. } else {
  25. segmentEntry = null
  26. }
  27. }
  28. deletable
  29. }
  30. }

最后是 deleteSegments 方法,这个方法执行真正的日志段删除操作:

  1. private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
  2. maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
  3. val numToDelete = deletable.size
  4. if (numToDelete > 0) {
  5. // 不允许删除所有日志段对象。如果一定要做,先创建出一个新的来,然后再把前面N个删掉
  6. if (segments.size == numToDelete)
  7. roll()
  8. lock synchronized {
  9. checkIfMemoryMappedBufferClosed() // 确保Log对象没有被关闭
  10. // 删除给定的日志段对象以及底层的物理文件
  11. removeAndDeleteSegments(deletable, asyncDelete = true)
  12. // 尝试更新日志的Log Start Offset值
  13. maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
  14. }
  15. }
  16. numToDelete
  17. }
  18. }

为什么要在删除日志段对象之后,尝试更新 Log Start Offset 值?

Log Start Offset 值是整个 Log 对象对外可见消息的最小位移值。如果我们删除了日志段对象,很有可能对外可见消息的范围发生了变化,自然要看一下是否需要更新 Log Start Offset 值。这就是 deleteSegments 方法最后要更新 Log Start Offset 值的原因。也就是上一篇中我手画图说明位移3之前截断想表达的意思。

3. 修改

说完了日志段删除,接下来我们来看如何修改日志段对象。

其实,源码里面不涉及修改日志段对象,所谓的修改或更新也就是替换而已,用新的日志段对象替换老的日志段对象。举个简单的例子。segments.put(1L, newSegment) 语句在没有 Key=1 时是添加日志段,否则就是替换已有日志段。

4. 查询

最后再说下查询日志段对象。源码中需要查询日志段对象的地方太多了,但主要都是利用了 ConcurrentSkipListMap 的现成方法。

  • segments.firstEntry:获取第一个日志段对象;
  • segments.lastEntry:获取最后一个日志段对象,即 Active Segment;
  • segments.higherEntry:获取第一个起始位移值≥给定 Key 值的日志段对象;
  • segments.floorEntry:获取最后一个起始位移值≤给定 Key 值的日志段对象。

关键位移值管理

Log 对象维护了一些关键位移值数据,比如 Log Start Offset、LEO 等。因为这些数据经常用到。所以分析下。

代码中定义更新 LEO 的 updateLogEndOffset 方法:

  1. private def updateLogEndOffset(offset: Long): Unit = {
  2. nextOffsetMetadata = LogOffsetMetadata(offset, activeSegment.baseOffset, activeSegment.size)
  3. if (highWatermark >= offset) {
  4. updateHighWatermarkMetadata(nextOffsetMetadata)
  5. }
  6. if (this.recoveryPoint > offset) {
  7. this.recoveryPoint = offset
  8. }
  9. }

需要注意的是,如果在更新过程中发现新 LEO 值小于高水位值,那么 Kafka 还要更新高水位值,因为对于同一个 Log 对象而言,高水位值是不能越过 LEO 值的。

那么,Log 对象什么时候需要更新 LEO 呢?源码中以下几个地方调用了updateLogEndOffset方法:

  • Log 对象初始化时:当 Log 对象初始化时,我们必须要创建一个 LEO 对象,并对其进行初始化。
  • 写入新消息时:这个最容易理解。以上面的图为例,当不断向 Log 对象插入新消息时,LEO 值就像一个指针一样,需要不停地向右移动,也就是不断地增加。
  • Log 对象发生日志切分(Log Roll)时:日志切分是啥呢?其实就是创建一个全新的日志段对象,并且关闭当前写入的日志段对象。这通常发生在当前日志段对象已满的时候。一旦发生日志切分,说明 Log 对象切换了 Active Segment,那么,LEO 中的起始位移值和段大小数据都要被更新,因此,在进行这一步操作时,我们必须要更新 LEO 对象。
  • 日志截断(Log Truncation)时:这个也是显而易见的。日志中的部分消息被删除了,自然可能导致 LEO 值发生变化,从而要更新 LEO 对象。

那同样,Kafka 什么时候需要更新 Log Start Offset 呢?

  • Log 对象初始化时:和 LEO 类似,Log 对象初始化时要给 Log Start Offset 赋值,一般是将第一个日志段的起始位移值赋值给它。
  • 日志截断时:同理,一旦日志中的部分消息被删除,可能会导致 Log Start Offset 发生变化,因此有必要更新该值。
  • Follower 副本同步时:一旦 Leader 副本的 Log 对象的 Log Start Offset 值发生变化。为了维持和 Leader 副本的一致性,Follower 副本也需要尝试去更新该值。
  • 删除日志段时:这个和日志截断是类似的。凡是涉及消息删除的操作都有可能导致 Log Start Offset 值的变化。
  • 删除消息时:严格来说,这么描述有点本末倒置了。在 Kafka 中,删除消息就是通过抬高 Log Start Offset 值来实现的,因此,删除消息时必须要更新该值。

读写操作

1. 写操作

在 Log 中,涉及写操作的方法有 3 个:appendAsLeader、appendAsFollower 和 append。

appendAsLeader 是用于写 Leader 副本的,appendAsFollower 是用于 Follower 副本同步的。它们的底层都调用了 append 方法。

所以看下append方法:

  1. private def append(records: MemoryRecords,
  2. origin: AppendOrigin,
  3. interBrokerProtocolVersion: ApiVersion,
  4. assignOffsets: Boolean,
  5. leaderEpoch: Int): LogAppendInfo = {
  6. maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
  7. // 第1步:分析和验证待写入消息集合,并返回校验结果
  8. val appendInfo = analyzeAndValidateRecords(records, origin)
  9. // 如果压根就不需要写入任何消息,直接返回即可
  10. if (appendInfo.shallowCount == 0)
  11. return appendInfo
  12. // 第2步:消息格式规整,即删除无效格式消息或无效字节
  13. var validRecords = trimInvalidBytes(records, appendInfo)
  14. lock synchronized {
  15. checkIfMemoryMappedBufferClosed() // 确保Log对象未关闭
  16. if (assignOffsets) { // 需要分配位移
  17. // 第3步:使用当前LEO值作为待写入消息集合中第一条消息的位移值
  18. val offset = new LongRef(nextOffsetMetadata.messageOffset)
  19. appendInfo.firstOffset = Some(offset.value)
  20. val now = time.milliseconds
  21. val validateAndOffsetAssignResult = try {
  22. LogValidator.validateMessagesAndAssignOffsets(validRecords,
  23. topicPartition,
  24. offset,
  25. time,
  26. now,
  27. appendInfo.sourceCodec,
  28. appendInfo.targetCodec,
  29. config.compact,
  30. config.messageFormatVersion.recordVersion.value,
  31. config.messageTimestampType,
  32. config.messageTimestampDifferenceMaxMs,
  33. leaderEpoch,
  34. origin,
  35. interBrokerProtocolVersion,
  36. brokerTopicStats)
  37. } catch {
  38. case e: IOException =>
  39. throw new KafkaException(s"Error validating messages while appending to log $name", e)
  40. }
  41. // 更新校验结果对象类LogAppendInfo
  42. validRecords = validateAndOffsetAssignResult.validatedRecords
  43. appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
  44. appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
  45. appendInfo.lastOffset = offset.value - 1
  46. appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
  47. if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
  48. appendInfo.logAppendTime = now
  49. // 第4步:验证消息,确保消息大小不超限
  50. if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
  51. for (batch <- validRecords.batches.asScala) {
  52. if (batch.sizeInBytes > config.maxMessageSize) {
  53. // we record the original message set size instead of the trimmed size
  54. // to be consistent with pre-compression bytesRejectedRate recording
  55. brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
  56. brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
  57. throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
  58. s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
  59. }
  60. }
  61. }
  62. } else { // 直接使用给定的位移值,无需自己分配位移值
  63. if (!appendInfo.offsetsMonotonic) // 确保消息位移值的单调递增性
  64. throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
  65. records.records.asScala.map(_.offset))
  66. if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
  67. val firstOffset = appendInfo.firstOffset match {
  68. case Some(offset) => offset
  69. case None => records.batches.asScala.head.baseOffset()
  70. }
  71. val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
  72. throw new UnexpectedAppendOffsetException(
  73. s"Unexpected offset in append to $topicPartition. $firstOrLast " +
  74. s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
  75. s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
  76. s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
  77. firstOffset, appendInfo.lastOffset)
  78. }
  79. }
  80. // 第5步:更新Leader Epoch缓存
  81. validRecords.batches.asScala.foreach { batch =>
  82. if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
  83. maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
  84. } else {
  85. leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
  86. warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
  87. cache.clearAndFlush()
  88. }
  89. }
  90. }
  91. // 第6步:确保消息大小不超限
  92. if (validRecords.sizeInBytes > config.segmentSize) {
  93. throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
  94. s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
  95. }
  96. // 第7步:执行日志切分。当前日志段剩余容量可能无法容纳新消息集合,因此有必要创建一个新的日志段来保存待写入的所有消息
  97. val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
  98. val logOffsetMetadata = LogOffsetMetadata(
  99. messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
  100. segmentBaseOffset = segment.baseOffset,
  101. relativePositionInSegment = segment.size)
  102. // 第8步:验证事务状态
  103. val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
  104. logOffsetMetadata, validRecords, origin)
  105. maybeDuplicate.foreach { duplicate =>
  106. appendInfo.firstOffset = Some(duplicate.firstOffset)
  107. appendInfo.lastOffset = duplicate.lastOffset
  108. appendInfo.logAppendTime = duplicate.timestamp
  109. appendInfo.logStartOffset = logStartOffset
  110. return appendInfo
  111. }
  112. // 第9步:执行真正的消息写入操作,主要调用日志段对象的append方法实现
  113. segment.append(largestOffset = appendInfo.lastOffset,
  114. largestTimestamp = appendInfo.maxTimestamp,
  115. shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
  116. records = validRecords)
  117. // 第10步:更新LEO对象,其中,LEO值是消息集合中最后一条消息位移值+1
  118. // 前面说过,LEO值永远指向下一条不存在的消息
  119. updateLogEndOffset(appendInfo.lastOffset + 1)
  120. // 第11步:更新事务状态
  121. for (producerAppendInfo <- updatedProducers.values) {
  122. producerStateManager.update(producerAppendInfo)
  123. }
  124. for (completedTxn <- completedTxns) {
  125. val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
  126. segment.updateTxnIndex(completedTxn, lastStableOffset)
  127. producerStateManager.completeTxn(completedTxn)
  128. }
  129. producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
  130. maybeIncrementFirstUnstableOffset()
  131. trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
  132. s"first offset: ${appendInfo.firstOffset}, " +
  133. s"next offset: ${nextOffsetMetadata.messageOffset}, " +
  134. s"and messages: $validRecords")
  135. // 是否需要手动落盘。一般情况下我们不需要设置Broker端参数log.flush.interval.messages
  136. // 落盘操作交由操作系统来完成。但某些情况下,可以设置该参数来确保高可靠性
  137. if (unflushedMessages >= config.flushInterval)
  138. flush()
  139. // 第12步:返回写入结果
  140. appendInfo
  141. }
  142. }
  143. }

额,过程有点多,用图来描述下:

2. 读取操作

read 方法的流程相对要简单一些,首先来看它的方法签名:

  1. def read(startOffset: Long,
  2. maxLength: Int,
  3. isolation: FetchIsolation,
  4. minOneMessage: Boolean): FetchDataInfo = {
  5. ......
  6. }

它接收 4 个参数,含义如下:

  • startOffset,即从 Log 对象的哪个位移值开始读消息。
  • maxLength,即最多能读取多少字节。
  • isolation,设置读取隔离级别,主要控制能够读取的最大位移值,多用于 Kafka 事务。
  • minOneMessage,即是否允许至少读一条消息。设想如果消息很大,超过了 maxLength,正常情况下 read 方法永远不会返回任何消息。但如果设置了该参数为 true,read 方法就保证至少能够返回一条消息。

看下read方法的流程:

  1. def read(startOffset: Long,
  2. maxLength: Int,
  3. isolation: FetchIsolation,
  4. minOneMessage: Boolean): FetchDataInfo = {
  5. maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
  6. trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")
  7. val includeAbortedTxns = isolation == FetchTxnCommitted
  8. // 读取消息时没有使用Monitor锁同步机制,因此这里取巧了,用本地变量的方式把LEO对象保存起来,避免争用(race condition)
  9. val endOffsetMetadata = nextOffsetMetadata
  10. val endOffset = nextOffsetMetadata.messageOffset
  11. if (startOffset == endOffset) // 如果从LEO处开始读取,那么自然不会返回任何数据,直接返回空消息集合即可
  12. return emptyFetchDataInfo(endOffsetMetadata, includeAbortedTxns)
  13. // 找到startOffset值所在的日志段对象。注意要使用floorEntry方法
  14. var segmentEntry = segments.floorEntry(startOffset)
  15. // return error on attempt to read beyond the log end offset or read below log start offset
  16. // 满足以下条件之一将被视为消息越界,即你要读取的消息不在该Log对象中:
  17. // 1. 要读取的消息位移超过了LEO值
  18. // 2. 没找到对应的日志段对象
  19. // 3. 要读取的消息在Log Start Offset之下,同样是对外不可见的消息
  20. if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
  21. throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
  22. s"but we only have log segments in the range $logStartOffset to $endOffset.")
  23. // 查看一下读取隔离级别设置。
  24. // 普通消费者能够看到[Log Start Offset, LEO)之间的消息
  25. // 事务型消费者只能看到[Log Start Offset, Log Stable Offset]之间的消息。Log Stable Offset(LSO)是比LEO值小的位移值,为Kafka事务使用
  26. // Follower副本消费者能够看到[Log Start Offset,高水位值]之间的消息
  27. val maxOffsetMetadata = isolation match {
  28. case FetchLogEnd => nextOffsetMetadata
  29. case FetchHighWatermark => fetchHighWatermarkMetadata
  30. case FetchTxnCommitted => fetchLastStableOffsetMetadata
  31. }
  32. // 如果要读取的起始位置超过了能读取的最大位置,返回空的消息集合,因为没法读取任何消息
  33. if (startOffset > maxOffsetMetadata.messageOffset) {
  34. val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
  35. return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
  36. }
  37. // 开始遍历日志段对象,直到读出东西来或者读到日志末尾
  38. while (segmentEntry != null) {
  39. val segment = segmentEntry.getValue
  40. val maxPosition = {
  41. if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
  42. maxOffsetMetadata.relativePositionInSegment
  43. } else {
  44. segment.size
  45. }
  46. }
  47. // 调用日志段对象的read方法执行真正的读取消息操作
  48. val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
  49. if (fetchInfo == null) { // 如果没有返回任何消息,去下一个日志段对象试试
  50. segmentEntry = segments.higherEntry(segmentEntry.getKey)
  51. } else { // 否则返回
  52. return if (includeAbortedTxns)
  53. addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
  54. else
  55. fetchInfo
  56. }
  57. }
  58. // 已经读到日志末尾还是没有数据返回,只能返回空消息集合
  59. FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
  60. }
  61. }

总结一下日志对日志段的管理操作:

  1. 高水位管理:Log 对象定义了高水位对象以及管理它的各种操作,主要包括更新和读取。
  2. 日志段管理:作为日志段的容器,Log 对象保存了很多日志段对象。日志段对象被组织在一起的方式以及 Kafka Log 对象是如何对它们进行管理的。
  3. 关键位移值管理:主要涉及对 Log Start Offset 和 LEO 的管理。这两个位移值是 Log 对象非常关键的字段。比如,副本管理、状态机管理等高阶功能都要依赖于它们。
  4. 读写操作:日志读写是实现 Kafka 消息引擎基本功能的基石。

基本到此,kafka的Log对象总结告一段落。最大的感觉就是对Broker内部的操作心里有个数,shell脚本里的一些参数什么时候能用到也了解一些,但感觉还是少点东西,想起来再加吧。看到这里了,莫忘点赞支持奥!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/810503
推荐阅读
相关标签
  

闽ICP备14008679号