A Log is the container for log segments (LogSegment); it defines many of the operations that manage log segments.
The Log source code lives in the log package of Kafka's core module, in the file Log.scala.
The Log object:
```scala
object Log {
  val LogFileSuffix = ".log"
  val IndexFileSuffix = ".index"
  val TimeIndexFileSuffix = ".timeindex"
  val ProducerSnapshotFileSuffix = ".snapshot"
  val TxnIndexFileSuffix = ".txnindex"
  val DeletedFileSuffix = ".deleted"
  val CleanedFileSuffix = ".cleaned"
  val SwapFileSuffix = ".swap"
  val CleanShutdownFile = ".kafka_cleanshutdown"
  val DeleteDirSuffix = "-delete"
  val FutureDirSuffix = "-future"
}
```
These are all the constants defined on the Log object. The familiar .log, .index, .timeindex, and .txnindex suffixes are all here. A few of the less familiar ones: .deleted marks segment files that have been scheduled for deletion, .cleaned and .swap are intermediate files produced while the log cleaner rewrites segments, .kafka_cleanshutdown is the marker for a clean broker shutdown, and -delete / -future are suffixes applied to partition directories that are being deleted or moved to another log directory.
```scala
def filenamePrefixFromOffset(offset: Long): String = {
  val nf = NumberFormat.getInstance()
  nf.setMinimumIntegerDigits(20)
  nf.setMaximumFractionDigits(0)
  nf.setGroupingUsed(false)
  nf.format(offset)
}
```
This method computes the log segment file name for a given offset. Kafka segment file names are a fixed 20 digits long, so the method left-pads the offset with zeros to produce a fixed 20-character string.
For example, given the offset 12345, the corresponding segment file on the broker's disk is named 00000000000000012345.log.
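As a quick sanity check of that padding behaviour, here is a minimal sketch (my own, not part of the Kafka source) that produces the same 20-digit file name with a plain format string:

```scala
// Sketch: the %020d format specifier left-pads with zeros to 20 digits,
// matching the effect of the NumberFormat configuration shown above.
object FilenamePrefixDemo extends App {
  val offset = 12345L
  val segmentFileName = f"$offset%020d" + ".log"
  println(segmentFileName) // prints: 00000000000000012345.log
}
```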
The Log class:
```scala
class Log(@volatile var dir: File,
          @volatile var config: LogConfig,
          @volatile var logStartOffset: Long,
          @volatile var recoveryPoint: Long,
          scheduler: Scheduler,
          brokerTopicStats: BrokerTopicStats,
          val time: Time,
          val maxProducerIdExpirationMs: Int,
          val producerIdExpirationCheckIntervalMs: Int,
          val topicPartition: TopicPartition,
          val producerStateManager: ProducerStateManager,
          logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
  // ......
}
```
dir and logStartOffset are the most important fields. dir is the directory this log lives in, i.e. the topic-partition's directory, while logStartOffset is the log's current earliest offset. Both are declared as @volatile var, meaning their values can change and may be updated by multiple threads.
The Log class is usually discussed in terms of its LEO and HW, illustrated here with a diagram:
The log's current end offset, the Log End Offset (LEO), is the offset that the next message appended to the log will receive. The Log Start Offset is its counterpart: the offset of the earliest message currently visible in the log. Offsets before the Log Start Offset may already have expired and been truncated.
Offset 8 in the diagram is the high watermark (High Watermark), the dividing line between committed and uncommitted messages.
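To make the three offsets concrete, here is a small illustrative sketch (my own modelling, not Kafka code), following the usual convention that offsets strictly below the high watermark are committed and visible, while offsets from the high watermark up to (but excluding) the LEO are written but not yet committed:

```scala
// Sketch: how Log Start Offset, High Watermark and LEO partition the offset space.
final case class LogOffsets(logStartOffset: Long, highWatermark: Long, logEndOffset: Long) {
  // committed and visible to consumers
  def isCommitted(offset: Long): Boolean = offset >= logStartOffset && offset < highWatermark
  // written to the log but not yet committed
  def isUncommitted(offset: Long): Boolean = offset >= highWatermark && offset < logEndOffset
  // the next message appended to the log gets this offset
  def nextOffset: Long = logEndOffset
}

object LogOffsetsDemo extends App {
  val offsets = LogOffsets(logStartOffset = 0L, highWatermark = 8L, logEndOffset = 15L)
  println(offsets.isCommitted(7L))   // true  – below the high watermark
  println(offsets.isUncommitted(8L)) // true  – at or above the high watermark, below the LEO
  println(offsets.nextOffset)        // 15    – where the next message will be written
}
```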
Other important fields of the Log class:
```scala
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
@volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
```
nextOffsetMetadata is effectively the LEO: the offset that the next message to be appended will receive.
highWatermarkMetadata is the partition log's high watermark.
segments is one of the most important fields of the Log class. It holds all of the log segments belonging to this partition's log, stored as a map: the key is a segment's base (starting) offset and the value is the LogSegment object itself. The source uses a ConcurrentNavigableMap (concretely, a ConcurrentSkipListMap) to hold the segment objects.
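One reason a sorted, concurrent map keyed by base offset is convenient is that locating the segment that should contain a given offset becomes a single floorEntry lookup. A minimal sketch (stand-in types, not the Log class itself):

```scala
import java.util.concurrent.ConcurrentSkipListMap

// Sketch: a ConcurrentSkipListMap keyed by base offset lets us find the segment
// containing an offset via floorEntry (largest base offset <= the target offset).
object SegmentsMapDemo extends App {
  final case class FakeSegment(baseOffset: Long) // stand-in for LogSegment

  val segments = new ConcurrentSkipListMap[java.lang.Long, FakeSegment]()
  List(0L, 12345L, 67890L).foreach(base => segments.put(base, FakeSegment(base)))

  def segmentContaining(offset: Long): Option[FakeSegment] =
    Option(segments.floorEntry(offset)).map(_.getValue)

  println(segmentContaining(20000L)) // Some(FakeSegment(12345))
  println(segmentContaining(70000L)) // Some(FakeSegment(67890)) – the last (active) segment
}
```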
The Leader Epoch Cache object is mainly used to decide whether and where to truncate the log (Truncation) after a failure. The older mechanism, which relied solely on the high watermark for this decision, could leave replicas with inconsistent data. The Leader Epoch Cache is an in-memory cache that stores the mapping from a partition leader's epoch to the corresponding start offset.
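The idea behind the cache can be sketched as a sorted map from epoch to the first offset written under that epoch (a simplified illustration, not the actual LeaderEpochFileCache): the end offset of an epoch is the start offset of the next one, which is what a restarting replica compares against to decide where to truncate, instead of blindly falling back to its high watermark.

```scala
import scala.collection.immutable.TreeMap

// Sketch: epoch -> first offset written under that epoch.
object LeaderEpochCacheDemo extends App {
  val epochStartOffsets = TreeMap(0 -> 0L, 1 -> 120L, 2 -> 350L)

  // End offset for an epoch = start offset of the next known epoch, else the log end offset.
  def endOffsetFor(epoch: Int, logEndOffset: Long): Long =
    epochStartOffsets.find { case (e, _) => e > epoch }.map(_._2).getOrElse(logEndOffset)

  // A replica that was still on epoch 1 truncates anything at or beyond offset 350
  // before re-fetching from the current leader.
  println(endOffsetFor(1, logEndOffset = 500L)) // 350
}
```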
Initialization of the Log class:
```scala
locally {
  val startMs = time.milliseconds

  // create the log directory if it doesn't exist
  Files.createDirectories(dir.toPath)

  initializeLeaderEpochCache()

  val nextOffset = loadSegments()

  /* Calculate the offset of the next message */
  nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)

  leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))

  logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)

  // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
  leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))

  // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
  // from scratch.
  if (!producerStateManager.isEmpty)
    throw new IllegalStateException("Producer state must be empty during log initialization")
  loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)

  info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " +
    s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
}
```
The main logic (originally shown as a diagram) boils down to these steps: (1) create the log directory if it does not exist; (2) initialize the Leader Epoch Cache; (3) load all log segments; (4) compute nextOffsetMetadata, i.e. the LEO; (5) adjust logStartOffset and trim the Leader Epoch Cache accordingly; (6) load producer state.
Let's focus on the third step, loading the log segments. Here is the implementation of loadSegments:
```scala
private def loadSegments(): Long = {
  // first do a pass through the files in the log directory and remove any temporary files
  // and find any interrupted swap operations
  val swapFiles = removeTempFilesAndCollectSwapFiles()

  // Now do a second pass and load all the log and index files.
  // We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
  // this happens, restart loading segment files from scratch.
  retryOnOffsetOverflow {
    // In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
    // loading of segments. In that case, we also need to close all segments that could have been left open in previous
    // call to loadSegmentFiles().
    logSegments.foreach(_.close())
    segments.clear()
    loadSegmentFiles()
  }

  // Finally, complete any interrupted swap operations. To be crash-safe,
  // log files that are replaced by the swap segment should be renamed to .deleted
  // before the swap file is restored as the new segment file.
  completeSwapOperations(swapFiles)

  if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
    val nextOffset = retryOnOffsetOverflow {
      recoverLog()
    }

    // reset the index size of the currently active log segment to allow more entries
    activeSegment.resizeIndexes(config.maxIndexSize)
    nextOffset
  } else {
    if (logSegments.isEmpty) {
      addSegment(LogSegment.open(dir = dir,
        baseOffset = 0,
        config,
        time = time,
        fileAlreadyExists = false,
        initFileSize = this.initFileSize,
        preallocate = false))
    }
    0
  }
}
```
This code walks the partition's log directory twice.
First, it removes any temporary files left over from a previous failure (.cleaned, .swap, and .deleted files, among others); removeTempFilesAndCollectSwapFiles implements this step.
Next, it clears all in-memory log segment objects and walks the partition directory a second time, rebuilding the segments map and deleting any orphaned index files that have no corresponding log segment file. Once both passes are done, it completes any interrupted swap operations by calling completeSwapOperations.
After all of that, it calls recoverLog to recover the log segment objects, and finally returns the partition log's LEO as it stands after recovery.
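All of these passes rely on being able to recover a file's base offset from its name (via offsetFromFile / offsetFromFileName in the source). A minimal sketch of that idea (the helper name below is mine):

```scala
// Sketch: strip everything from the first '.' onwards and parse the 20-digit prefix.
object BaseOffsetDemo extends App {
  def baseOffsetFromFileName(filename: String): Long =
    filename.substring(0, filename.indexOf('.')).toLong

  println(baseOffsetFromFileName("00000000000000012345.log"))      // 12345
  println(baseOffsetFromFileName("00000000000000012345.index"))    // 12345
  println(baseOffsetFromFileName("00000000000000012345.log.swap")) // 12345
}
```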
Now let's look at the implementation of removeTempFilesAndCollectSwapFiles:
```scala
private def removeTempFilesAndCollectSwapFiles(): Set[File] = {

  // Local helper that deletes the index files corresponding to a given log file
  def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = {
    info(s"Deleting index files with suffix $suffix for baseFile $baseFile")
    val offset = offsetFromFile(baseFile)
    Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath)
    Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath)
    Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath)
  }

  var swapFiles = Set[File]()
  var cleanFiles = Set[File]()
  var minCleanedFileOffset = Long.MaxValue

  // Walk every file under the partition's log directory
  for (file <- dir.listFiles if file.isFile) {
    if (!file.canRead) // if the file is unreadable, fail fast with an IOException
      throw new IOException(s"Could not read file $file")
    val filename = file.getName

    if (filename.endsWith(DeletedFileSuffix)) { // ends with .deleted
      debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
      Files.deleteIfExists(file.toPath) // leftover from a previous failure, delete it directly

    } else if (filename.endsWith(CleanedFileSuffix)) { // ends with .cleaned
      // track the smallest base offset seen among .cleaned files, and queue the file for deletion
      minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset)
      cleanFiles += file
    } else if (filename.endsWith(SwapFileSuffix)) { // ends with .swap
      val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
      info(s"Found file ${file.getAbsolutePath} from interrupted swap operation.")
      if (isIndexFile(baseFile)) { // the .swap file was originally an index file
        deleteIndicesIfExist(baseFile) // delete the old index files
      } else if (isLogFile(baseFile)) { // the .swap file was originally a log file
        deleteIndicesIfExist(baseFile) // delete the old index files
        swapFiles += file // add it to the set of .swap files to recover
      }
    }
  }

  // Among the collected .swap files, those whose base offset is >= minCleanedFileOffset belong to
  // an incomplete clean/split operation; delete these invalid .swap files directly
  val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset)
  invalidSwapFiles.foreach { file =>
    debug(s"Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
    val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
    deleteIndicesIfExist(baseFile, SwapFileSuffix)
    Files.deleteIfExists(file.toPath)
  }

  // Now that we have deleted all .swap files that constitute an incomplete split operation,
  // delete all the queued .cleaned files as well
  cleanFiles.foreach { file =>
    debug(s"Deleting stray .clean file ${file.getAbsolutePath}")
    Files.deleteIfExists(file.toPath)
  }

  // Finally, return the set of valid .swap files
  validSwapFiles
}
```
Once removeTempFilesAndCollectSwapFiles has finished, the code clears the existing segment collection and reloads the segment files. That is the second step, and the main method called here is loadSegmentFiles.
```scala
private def loadSegmentFiles(): Unit = {
  // Iterate over the files sorted ascending by name, i.e. by the base offset encoded in the file name
  for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
    if (isIndexFile(file)) { // it is an index file
      val offset = offsetFromFile(file)
      val logFile = Log.logFile(dir, offset)
      if (!logFile.exists) { // make sure a corresponding log file exists; otherwise warn and delete the orphaned index file
        warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
        Files.deleteIfExists(file.toPath)
      }
    } else if (isLogFile(file)) { // it is a log file
      val baseOffset = offsetFromFile(file)
      val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()

      // Create the corresponding LogSegment instance and add it to segments
      val segment = LogSegment.open(dir = dir,
        baseOffset = baseOffset,
        config,
        time = time,
        fileAlreadyExists = true)

      try segment.sanityCheck(timeIndexFileNewlyCreated)
      catch {
        case _: NoSuchFileException =>
          error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
            "recovering segment and rebuilding index files...")
          recoverSegment(segment)
        case e: CorruptIndexException =>
          warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
            s"to ${e.getMessage}, recovering segment and rebuilding index files...")
          recoverSegment(segment)
      }
      addSegment(segment)
    }
  }
}
```
The third step is to process the set of valid .swap files returned by the first step; completeSwapOperations does exactly that:
```scala
private def completeSwapOperations(swapFiles: Set[File]): Unit = {
  // Iterate over all valid .swap files
  for (swapFile <- swapFiles) {
    val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, "")) // the corresponding log file
    val baseOffset = offsetFromFile(logFile) // base offset of that log file
    // Create the corresponding LogSegment instance
    val swapSegment = LogSegment.open(swapFile.getParentFile,
      baseOffset = baseOffset,
      config,
      time = time,
      fileSuffix = SwapFileSuffix)
    info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
    // Recover the swap segment
    recoverSegment(swapSegment)

    // We create swap files for two cases:
    // (1) Log cleaning where multiple segments are merged into one, and
    // (2) Log splitting where one segment is split into multiple.
    // Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment
    // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
    // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
    // do a replace with an existing segment.
    // Check whether the old segments covered by the swap segment still exist, i.e. whether their deletion completed
    val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =>
      segment.readNextOffset > swapSegment.baseOffset
    }
    // Replace any remaining old segments with the swap segment; the .swap file is renamed to .log as part of this
    replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true)
  }
}
```
The last step is recoverLog:
```scala
private def recoverLog(): Long = {
  // if we have the clean shutdown marker, skip recovery
  // i.e. only recover when there is no file ending with .kafka_cleanshutdown (usually there isn't one)
  if (!hasCleanShutdownFile) {
    // All segments beyond the last recovery point are considered unflushed
    val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).toIterator
    var truncated = false

    // Iterate over the unflushed segments
    while (unflushed.hasNext && !truncated) {
      val segment = unflushed.next
      info(s"Recovering unflushed segment ${segment.baseOffset}")
      val truncatedBytes =
        try {
          // Recover this segment
          recoverSegment(segment, leaderEpochCache)
        } catch {
          case _: InvalidOffsetException =>
            val startOffset = segment.baseOffset
            warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
              s"creating an empty one with starting offset $startOffset")
            segment.truncateTo(startOffset)
        }
      if (truncatedBytes > 0) { // invalid messages caused a truncation, so drop all remaining segments
        warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
        removeAndDeleteSegments(unflushed.toList, asyncDelete = true)
        truncated = true
      }
    }
  }

  // After all of the above, if the segment collection is not empty
  if (logSegments.nonEmpty) {
    val logEndOffset = activeSegment.readNextOffset
    if (logEndOffset < logStartOffset) { // the LEO must not be smaller than the Log Start Offset; otherwise delete all segments
      warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
        "This could happen if segment files were deleted from the file system.")
      removeAndDeleteSegments(logSegments, asyncDelete = true)
    }
  }

  // If the segment collection ended up empty
  if (logSegments.isEmpty) {
    // create at least one new segment, starting at logStartOffset, and add it to the collection
    addSegment(LogSegment.open(dir = dir,
      baseOffset = logStartOffset,
      config,
      time = time,
      fileAlreadyExists = false,
      initFileSize = this.initFileSize,
      preallocate = config.preallocate))
  }

  // Update the recovery point and return it
  recoveryPoint = activeSegment.readNextOffset
  recoveryPoint
}
```
Finally, these steps can be appended to the earlier mind map as a summary.
This post looked specifically at how a Log loads its log segments. What happens once they are loaded? Stay tuned for the next post.