
Kafka Log Objects (Part 2): Log

"could not find offset index file corresponding to log file"

A log (Log) is the container for log segments (LogSegment), and it defines many of the operations used to manage those segments.

Log source code structure

The Log source code lives in the log package of Kafka's core module, in the file Log.scala.

Log Class & Object

Log Object:

    object Log {
      val LogFileSuffix = ".log"
      val IndexFileSuffix = ".index"
      val TimeIndexFileSuffix = ".timeindex"
      val ProducerSnapshotFileSuffix = ".snapshot"
      val TxnIndexFileSuffix = ".txnindex"
      val DeletedFileSuffix = ".deleted"
      val CleanedFileSuffix = ".cleaned"
      val SwapFileSuffix = ".swap"
      val CleanShutdownFile = ".kafka_cleanshutdown"
      val DeleteDirSuffix = "-delete"
      val FutureDirSuffix = "-future"
    }

These are all the constants defined in Log Object. The familiar .log, .index, .timeindex and .txnindex suffixes are all here. A few of the other file types deserve an introduction (see the small classification sketch after this list):

  • .snapshot is the snapshot file Kafka writes for idempotent or transactional Producers.
  • .deleted files are created when log segments are deleted. Segment deletion is currently an asynchronous operation: the Broker first renames the segment file from the .log suffix to the .deleted suffix. So if you see a pile of files ending in .deleted, don't panic; Kafka is simply in the middle of deleting log segment files.
  • .cleaned and .swap are both by-products of Compaction.
  • -delete applies to directories: when you delete a topic, its partition directories are renamed with this suffix.
  • -future is used when changing the location of a topic partition's directory, which is a fairly advanced use case.
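As a rough, hypothetical sketch (not part of Log.scala) of how these suffix constants come into play, here is a tiny classifier for the files found in a partition directory:

    import java.io.File

    // Hypothetical helper: classify a file in a partition directory by its suffix.
    def classify(file: File): String = {
      val name = file.getName
      if (name.endsWith(".deleted")) "segment scheduled for async deletion"
      else if (name.endsWith(".cleaned") || name.endsWith(".swap")) "intermediate compaction/split file"
      else if (name.endsWith(".snapshot")) "producer state snapshot"
      else if (name.endsWith(".txnindex")) "transaction index"
      else if (name.endsWith(".timeindex")) "time index"
      else if (name.endsWith(".index")) "offset index"
      else if (name.endsWith(".log")) "log segment"
      else "other"
    }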
Log Object also defines the helper filenamePrefixFromOffset:

    def filenamePrefixFromOffset(offset: Long): String = {
      val nf = NumberFormat.getInstance()
      nf.setMinimumIntegerDigits(20)
      nf.setMaximumFractionDigits(0)
      nf.setGroupingUsed(false)
      nf.format(offset)
    }

This method computes the log segment file name for a given offset. Kafka segment file names are fixed at 20 digits, so the method left-pads the offset with zeros to produce a fixed 20-character string.

For example, for an offset of 12345, the corresponding segment file on the Broker's disk is named 00000000000000012345.log.
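The same padding can be reproduced with a plain format string; a minimal sketch (an equivalent illustration, not the Kafka implementation):

    // Zero-pad an offset to the fixed 20-digit prefix and append the .log suffix.
    def segmentFileName(offset: Long): String = f"$offset%020d.log"

    // segmentFileName(12345L) == "00000000000000012345.log"
    println(segmentFileName(12345L))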

Log Class:

    class Log(@volatile var dir: File,
              @volatile var config: LogConfig,
              @volatile var logStartOffset: Long,
              @volatile var recoveryPoint: Long,
              scheduler: Scheduler,
              brokerTopicStats: BrokerTopicStats,
              val time: Time,
              val maxProducerIdExpirationMs: Int,
              val producerIdExpirationCheckIntervalMs: Int,
              val topicPartition: TopicPartition,
              val producerStateManager: ProducerStateManager,
              logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
      // ……
    }

dir and logStartOffset are the most important properties. dir is the directory this log lives in, i.e. the topic partition's directory, while logStartOffset is the log's current earliest offset. Both are declared as volatile var, meaning their values can change and may be updated by multiple threads.
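A tiny, self-contained illustration of what @volatile buys here (my own example, not Kafka code): a write made by one thread becomes visible to reads on other threads without extra locking:

    class OffsetHolder {
      @volatile var logStartOffset: Long = 0L
    }

    val holder = new OffsetHolder
    // A background thread (think of Kafka's retention scheduler) advances the offset...
    new Thread(() => { holder.logStartOffset = 100L }).start()
    // ...while other threads read it; @volatile guarantees the write, once made,
    // is visible to subsequent reads rather than being cached stale indefinitely.
    println(holder.logStartOffset) // prints 0 or 100 depending on timing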

Two concepts that come up constantly around the Log class are the LEO and the HW:

The log's current end offset, the Log End Offset (LEO), is the offset of the next message to be appended to the log. The Log Start Offset is its counterpart: the offset of the earliest message the log currently exposes. Offsets before the Log Start Offset may already have expired and been truncated.

If, say, offset 8 is the High Watermark (HW), then it is the dividing line between committed messages and uncommitted messages.
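A minimal sketch of the ordering these offsets always respect (my own illustration, not Kafka code):

    // For any partition log:
    //   logStartOffset <= highWatermark <= logEndOffset (LEO)
    // Messages in [logStartOffset, highWatermark) are committed and visible to consumers;
    // messages in [highWatermark, logEndOffset) have been appended but are not yet committed.
    def offsetsAreConsistent(logStartOffset: Long, highWatermark: Long, logEndOffset: Long): Boolean =
      logStartOffset <= highWatermark && highWatermark <= logEndOffset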

Other important properties of the Log class:

    @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
    @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
    private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
    @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None

nextOffsetMetadata can be treated as the LEO: the offset at which the next message will be appended.

highWatermarkMetadata is the partition log's high watermark.

segments is one of the most important properties of the Log class. It holds all of the partition log's segments, stored as a map: the key is a segment's base offset, and the value is the LogSegment object itself. The Kafka source uses a ConcurrentNavigableMap (concretely, a ConcurrentSkipListMap) to store the segment objects.
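Because segments is a sorted, navigable map keyed by base offset, finding the segment that holds a given offset is a floor lookup. A small standalone sketch of that idea (using plain strings in place of LogSegment):

    import java.util.concurrent.ConcurrentSkipListMap

    val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
    segments.put(0L, "segment starting at offset 0")
    segments.put(500L, "segment starting at offset 500")
    segments.put(1200L, "segment starting at offset 1200")

    // The segment whose base offset is the greatest key <= the target offset
    // is the one that may contain that offset.
    val entry = segments.floorEntry(700L)
    println(entry.getKey)   // 500
    println(entry.getValue) // segment starting at offset 500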

The Leader Epoch Cache object is mainly used to decide whether the log needs to be truncated (Truncation) after a failure. The older mechanism, which relied on the high watermark alone for this decision, could leave replicas with inconsistent data. The Leader Epoch Cache is an in-memory cache that stores the mapping between the partition leader's epoch values and the corresponding offsets.
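As a rough sketch of the idea (my own simplification, not the LeaderEpochFileCache API): the cache keeps (epoch, start offset) entries, and on failover a follower asks for the end offset of the epoch it was on to decide where to truncate:

    import scala.collection.immutable.TreeMap

    // Simplified stand-in for the cache: epoch -> start offset of that epoch.
    val epochCache: TreeMap[Int, Long] = TreeMap(0 -> 0L, 1 -> 120L, 2 -> 300L)

    // The end offset of a requested epoch is the start offset of the next higher epoch,
    // or the log end offset if the requested epoch is still the latest one.
    def endOffsetFor(requestedEpoch: Int, logEndOffset: Long): Long =
      epochCache.find { case (epoch, _) => epoch > requestedEpoch }
        .map(_._2)
        .getOrElse(logEndOffset)

    // A follower that was on epoch 1 truncates to endOffsetFor(1, leaderLogEndOffset).
    println(endOffsetFor(1, 450L)) // 300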

Initialization of the Log class:

    locally {
      val startMs = time.milliseconds

      // create the log directory if it doesn't exist
      Files.createDirectories(dir.toPath)

      initializeLeaderEpochCache()

      val nextOffset = loadSegments()

      /* Calculate the offset of the next message */
      nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)

      leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))

      logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)

      // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
      leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))

      // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
      // from scratch.
      if (!producerStateManager.isEmpty)
        throw new IllegalStateException("Producer state must be empty during log initialization")
      loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)

      info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " +
        s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
    }

The main flow, step by step: create the log directory if needed, initialize the Leader Epoch Cache, load all log segments, compute nextOffsetMetadata and adjust logStartOffset, truncate the Leader Epoch Cache to match, and finally load producer state.

Let's focus on the third step, loading the log segments. Here is the implementation of loadSegments:

    private def loadSegments(): Long = {
      // first do a pass through the files in the log directory and remove any temporary files
      // and find any interrupted swap operations
      val swapFiles = removeTempFilesAndCollectSwapFiles()

      // Now do a second pass and load all the log and index files.
      // We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
      // this happens, restart loading segment files from scratch.
      retryOnOffsetOverflow {
        // In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
        // loading of segments. In that case, we also need to close all segments that could have been left open in previous
        // call to loadSegmentFiles().
        logSegments.foreach(_.close())
        segments.clear()
        loadSegmentFiles()
      }

      // Finally, complete any interrupted swap operations. To be crash-safe,
      // log files that are replaced by the swap segment should be renamed to .deleted
      // before the swap file is restored as the new segment file.
      completeSwapOperations(swapFiles)

      if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
        val nextOffset = retryOnOffsetOverflow {
          recoverLog()
        }

        // reset the index size of the currently active log segment to allow more entries
        activeSegment.resizeIndexes(config.maxIndexSize)
        nextOffset
      } else {
        if (logSegments.isEmpty) {
          addSegment(LogSegment.open(dir = dir,
            baseOffset = 0,
            config,
            time = time,
            fileAlreadyExists = false,
            initFileSize = this.initFileSize,
            preallocate = false))
        }
        0
      }
    }

This code walks the partition's log directory twice.

First, it removes the temporary files left over from a previous failure (.cleaned, .swap, .deleted files and so on); removeTempFilesAndCollectSwapFiles implements this logic.

Next, it clears all existing log segment objects and walks the partition directory a second time, rebuilding the segments map and deleting orphaned index files that have no corresponding log segment file. Once both passes are done, it completes any interrupted swap operations by calling completeSwapOperations.

When all of that has finished, it calls recoverLog to recover the log segment objects and returns the recovered partition log's LEO.

Here is the implementation of removeTempFilesAndCollectSwapFiles:

    private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
      // Nested helper that deletes the index files belonging to a given log file
      def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = {
        info(s"Deleting index files with suffix $suffix for baseFile $baseFile")
        val offset = offsetFromFile(baseFile)
        Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath)
        Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath)
        Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath)
      }

      var swapFiles = Set[File]()
      var cleanFiles = Set[File]()
      var minCleanedFileOffset = Long.MaxValue

      // Iterate over every file under the partition's log directory
      for (file <- dir.listFiles if file.isFile) {
        if (!file.canRead) // unreadable files abort loading with an IOException
          throw new IOException(s"Could not read file $file")
        val filename = file.getName
        if (filename.endsWith(DeletedFileSuffix)) { // ends with .deleted
          debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
          Files.deleteIfExists(file.toPath) // left over from a previous failure, delete it right away
        } else if (filename.endsWith(CleanedFileSuffix)) { // ends with .cleaned
          minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset) // track the smallest offset among .cleaned files
          cleanFiles += file // and remember the file for deletion later
        } else if (filename.endsWith(SwapFileSuffix)) { // ends with .swap
          val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
          info(s"Found file ${file.getAbsolutePath} from interrupted swap operation.")
          if (isIndexFile(baseFile)) { // the .swap file was originally an index file
            deleteIndicesIfExist(baseFile) // delete the old index files
          } else if (isLogFile(baseFile)) { // the .swap file was originally a log file
            deleteIndicesIfExist(baseFile) // delete the old index files
            swapFiles += file // and add it to the set of .swap files to recover
          }
        }
      }

      // Split the .swap files into invalid ones (base offset >= minCleanedFileOffset) and valid ones,
      // and delete the invalid .swap files outright
      val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset)
      invalidSwapFiles.foreach { file =>
        debug(s"Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
        val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
        deleteIndicesIfExist(baseFile, SwapFileSuffix)
        Files.deleteIfExists(file.toPath)
      }

      // Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files
      cleanFiles.foreach { file =>
        debug(s"Deleting stray .clean file ${file.getAbsolutePath}")
        Files.deleteIfExists(file.toPath)
      }

      // Finally, return the set of valid .swap files
      validSwapFiles
    }

After removeTempFilesAndCollectSwapFiles has run, the code clears the existing segment collection and reloads the segment files. This is the second pass, and the main method called here is loadSegmentFiles.

    private def loadSegmentFiles(): Unit = {
      // Sort files by name, i.e. by base offset in ascending order, then iterate over them
      for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
        if (isIndexFile(file)) { // index file
          val offset = offsetFromFile(file)
          val logFile = Log.logFile(dir, offset)
          if (!logFile.exists) { // make sure the corresponding log file exists; otherwise warn and delete the orphaned index
            warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
            Files.deleteIfExists(file.toPath)
          }
        } else if (isLogFile(file)) { // log file
          val baseOffset = offsetFromFile(file)
          val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()
          // Create the corresponding LogSegment instance, sanity-check it, and add it to segments
          val segment = LogSegment.open(dir = dir,
            baseOffset = baseOffset,
            config,
            time = time,
            fileAlreadyExists = true)

          try segment.sanityCheck(timeIndexFileNewlyCreated)
          catch {
            case _: NoSuchFileException =>
              error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
                "recovering segment and rebuilding index files...")
              recoverSegment(segment)
            case e: CorruptIndexException =>
              warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
                s"to ${e.getMessage}}, recovering segment and rebuilding index files...")
              recoverSegment(segment)
          }
          addSegment(segment)
        }
      }
    }
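This catch block is also where the error in this post's title comes from: segment.sanityCheck throws NoSuchFileException when a .log file has no matching offset index, and Kafka reacts by rebuilding the indexes via recoverSegment instead of failing startup. A minimal sketch of the same existence check using only java.nio (the helper name and path below are my own, hypothetical ones, not Kafka's):

    import java.io.File
    import java.nio.file.{Files, Paths}

    // Hypothetical helper: does the offset index for a given segment log file exist?
    // Kafka's real check lives in LogSegment.sanityCheck; this only mirrors the idea.
    def hasOffsetIndex(logFile: File): Boolean = {
      val indexName = logFile.getName.replaceAll("\\.log$", ".index")
      Files.exists(Paths.get(logFile.getParent, indexName))
    }

    val segmentFile = new File("/tmp/kafka-logs/test-0/00000000000000000000.log")
    if (!hasOffsetIndex(segmentFile))
      println(s"Could not find offset index file corresponding to log file ${segmentFile.getAbsolutePath}, " +
        "recovering segment and rebuilding index files...")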

The third step handles the set of valid .swap files returned by the first pass. completeSwapOperations is the method that does this:

    private def completeSwapOperations(swapFiles: Set[File]): Unit = {
      // Iterate over all valid .swap files
      for (swapFile <- swapFiles) {
        val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, "")) // the log file the .swap file corresponds to
        val baseOffset = offsetFromFile(logFile) // base offset taken from the log file name
        // Create the LogSegment instance for the .swap file
        val swapSegment = LogSegment.open(swapFile.getParentFile,
          baseOffset = baseOffset,
          config,
          time = time,
          fileSuffix = SwapFileSuffix)
        info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
        // Recover the swap segment
        recoverSegment(swapSegment)

        // We create swap files for two cases:
        // (1) Log cleaning where multiple segments are merged into one, and
        // (2) Log splitting where one segment is split into multiple.
        // Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment
        // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
        // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
        // do a replace with an existing segment.
        // Check whether the old segments this swap segment replaces still exist
        val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =>
          segment.readNextOffset > swapSegment.baseOffset
        }
        // Replace the old segments (if any) with the recovered swap segment; the .swap file is renamed to .log
        replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true)
      }
    }

The final step is recoverLog:

    private def recoverLog(): Long = {
      // if we have the clean shutdown marker, skip recovery
      // i.e. if there is no .kafka_cleanshutdown file (usually there is not), do the recovery below
      if (!hasCleanShutdownFile) {
        // Get all the unflushed segments beyond the last recovery point
        val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).toIterator
        var truncated = false

        // Iterate over the unflushed segments
        while (unflushed.hasNext && !truncated) {
          val segment = unflushed.next
          info(s"Recovering unflushed segment ${segment.baseOffset}")
          val truncatedBytes =
            try {
              // Recover this segment
              recoverSegment(segment, leaderEpochCache)
            } catch {
              case _: InvalidOffsetException =>
                val startOffset = segment.baseOffset
                warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
                  s"creating an empty one with starting offset $startOffset")
                segment.truncateTo(startOffset)
            }
          if (truncatedBytes > 0) { // invalid messages caused bytes to be truncated, so drop all remaining segments
            warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
            removeAndDeleteSegments(unflushed.toList, asyncDelete = true)
            truncated = true
          }
        }
      }

      // After recovery, if the segment set is not empty
      if (logSegments.nonEmpty) {
        val logEndOffset = activeSegment.readNextOffset
        if (logEndOffset < logStartOffset) { // the LEO must not be smaller than Log Start Offset; otherwise delete all segments
          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
            "This could happen if segment files were deleted from the file system.")
          removeAndDeleteSegments(logSegments, asyncDelete = true)
        }
      }

      // If the segment set ended up empty
      if (logSegments.isEmpty) {
        // create at least one new segment, starting at logStartOffset, and add it to the segment set
        addSegment(LogSegment.open(dir = dir,
          baseOffset = logStartOffset,
          config,
          time = time,
          fileAlreadyExists = false,
          initFileSize = this.initFileSize,
          preallocate = config.preallocate))
      }

      // Update the recovery point and return it
      recoveryPoint = activeSegment.readNextOffset
      recoveryPoint
    }

To wrap up: this post covered how a Log loads its log segments. What happens to the Log after it has been loaded? Don't go away; give it a like and read the next post.
