当前位置:   article > 正文

kafka Log 源码结构和初始化逻辑学习笔记

kafka log

Log 源码结构

Log 文件位于core\src\main\scala\kafka\log\Log.scala 目录下。
改文件定义了10个类和对象:

  • LogAppendInfo
    LogAppendInfo类保存了一组写入消息的各种元数据信息;LogAppendInfo伴生对象定义一些工厂方法,用于创建特定的LogAppendInfo实例。
  • Log
    Log伴生对象定义一些常量辅助方法。
    Log 类是Log文件最核心部分。
  • RollParams
    定义控制日志段是否切分的数据结构,对应的伴生对象也是对应的工厂方法。
  • LogMetricNames
    对应Log对象的监测指标。
  • LogOffsetSnapshot
    封装分区所有位移元数据的容器类。
  • LogReadInfo
    封装读取日志数据和返回的元数据。
  • CompletedTxn
    记录已完成的事务的元数据,主要用于构建事务索引。

log 伴生对象定义的常量


object Log {
  val LogFileSuffix = ".log"
  val IndexFileSuffix = ".index"
  val TimeIndexFileSuffix = ".timeindex"
  val ProducerSnapshotFileSuffix = ".snapshot"
  val TxnIndexFileSuffix = ".txnindex"
  val DeletedFileSuffix = ".deleted"
  val CleanedFileSuffix = ".cleaned"
  val SwapFileSuffix = ".swap"
  val CleanShutdownFile = ".kafka_cleanshutdown"
  val DeleteDirSuffix = "-delete"
  val FutureDirSuffix = "-future"
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

前面几种在上一篇博客中有说明。

  • .snapshot 是kafka 为幂等型事务或事务型producer所做的快照文件。
  • .deleted 是删除日志段操作创建的文件。xxx.log ------>xxx.deleted
  • .cleand 和swap都是Compaction 操作后的文件。
  • delete 是删除主题时,分区文件的后缀。
  • future 是用于变更主题分区文件夹地址的。

Log类定义


class Log(@volatile var dir: File,
          @volatile var config: LogConfig,
          @volatile var logStartOffset: Long,
          @volatile var recoveryPoint: Long,
          scheduler: Scheduler,
          brokerTopicStats: BrokerTopicStats,
          val time: Time,
          val maxProducerIdExpirationMs: Int,
          val producerIdExpirationCheckIntervalMs: Int,
          val topicPartition: TopicPartition,
          val producerStateManager: ProducerStateManager,
          logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

dir 是日志所在文件夹路径,logStartOffset是日志当前对外可见的最早一条消息的位移值,这两个变量都是var 的,可以多线程更新。还有一种位移Log End Offset(LEO) ,它是表示日志下一条待插入消息的位移值,如下图所示:
offset的区别

Log类定义的重要属性

  • nextOffsetMetadata
    封装了下一条待插入消息的位移值等同于LogEndOffset。
  • highWatermarkMetadata
    分区日志高水位值。
  • segments
    Log 类的属性,保存了分区日志下所有的日志段信息,底层数据结构是ConcurrentNavigableMap ,支持线程安全的方法,还有排序方法来管理日志对象。
    Leader Epoch Cache 对象
    主要是用来判断出现Failure 时是否执行日志截断操作,0.11.0.0版本加入,里面保存了Leader的Epoch和对应位移的映射关系。

Log类的初始化逻辑


 locally {
        val startMs = time.milliseconds
    
    
        // create the log directory if it doesn't exist
        Files.createDirectories(dir.toPath)
    
    
        initializeLeaderEpochCache()
    
    
        val nextOffset = loadSegments()
    
    
        /* Calculate the offset of the next message */
        nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
    
    
        leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))
    
    
        logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
    
    
        // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
        leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
    
    
        // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
        // from scratch.
        if (!producerStateManager.isEmpty)
          throw new IllegalStateException("Producer state must be empty during log initialization")
        loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)
    
    
        info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " +
          s"log end offset $logEndOffset in ${time.milliseconds() - startMs} 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

1、创建分区日志路径。
2、初始化Leader Epoch Cache.
3、加载日志段对象:(1)执行removeTempFilesAndCollectSwapFiles逻辑。(2)源码开始清空已有日志段集合,并重新加载日志段文件。(3)处理第一步返回的有效.swap 文件集合。(4)recoverLog 操作。
4、更新nextOffsetMetadata 和logStartOffset.
5、更新Leader Epoch Cache,清除无效数据。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/362019
推荐阅读
相关标签
  

闽ICP备14008679号