The Log source file is located at core\src\main\scala\kafka\log\Log.scala.
This file defines 10 classes and objects, among them the Log object:
object Log {
  val LogFileSuffix = ".log"
  val IndexFileSuffix = ".index"
  val TimeIndexFileSuffix = ".timeindex"
  val ProducerSnapshotFileSuffix = ".snapshot"
  val TxnIndexFileSuffix = ".txnindex"
  val DeletedFileSuffix = ".deleted"
  val CleanedFileSuffix = ".cleaned"
  val SwapFileSuffix = ".swap"
  val CleanShutdownFile = ".kafka_cleanshutdown"
  val DeleteDirSuffix = "-delete"
  val FutureDirSuffix = "-future"
}
The first few of these suffixes were explained in the previous post.
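To make the role of these suffixes concrete, here is a minimal sketch of how a segment's file names could be derived from its base offset. The object `SegmentFileNames` and its helpers are hypothetical names written for this post, not Kafka's API, and the 20-digit zero padding is an assumption about the naming scheme; only the suffix values are copied from the Log object above.

```scala
import java.io.File

// Hypothetical helper for illustration; only the suffix values come from the Log object above.
object SegmentFileNames {
  val LogFileSuffix = ".log"
  val IndexFileSuffix = ".index"
  val TimeIndexFileSuffix = ".timeindex"

  // Assumption: the base offset is zero-padded to 20 digits to form the file name prefix.
  def filenamePrefixFromOffset(offset: Long): String = f"$offset%020d"

  // Data file of the segment whose first message has the given base offset.
  def logFile(dir: File, baseOffset: Long): File =
    new File(dir, filenamePrefixFromOffset(baseOffset) + LogFileSuffix)

  // Offset index file that belongs to the same segment.
  def offsetIndexFile(dir: File, baseOffset: Long): File =
    new File(dir, filenamePrefixFromOffset(baseOffset) + IndexFileSuffix)

  // Time index file that belongs to the same segment.
  def timeIndexFile(dir: File, baseOffset: Long): File =
    new File(dir, filenamePrefixFromOffset(baseOffset) + TimeIndexFileSuffix)
}
```

Under these assumptions, all files of one segment share the same prefix and differ only in suffix, so listing a partition directory in name order also lists the segments in offset order.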
class Log(@volatile var dir: File,
          @volatile var config: LogConfig,
          @volatile var logStartOffset: Long,
          @volatile var recoveryPoint: Long,
          scheduler: Scheduler,
          brokerTopicStats: BrokerTopicStats,
          val time: Time,
          val maxProducerIdExpirationMs: Int,
          val producerIdExpirationCheckIntervalMs: Int,
          val topicPartition: TopicPartition,
          val producerStateManager: ProducerStateManager,
          logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
}
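dir is the path of the directory that holds the log, and logStartOffset is the offset of the earliest message in the log that is currently visible to clients. Both are declared as @volatile var, so they can be reassigned and the new value is visible to other threads. There is one more offset, the Log End Offset (LEO), which is the offset of the next message to be appended to the log, as shown in the figure.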
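The relationship between the two offsets can be sketched with a toy model. `ToyLog` below is a hypothetical class written for this post, not Kafka's Log; it stores no messages and only tracks how the start offset and the LEO move.

```scala
// Toy model for illustration only: it tracks offsets and stores no messages.
final class ToyLog(initialStartOffset: Long) {
  // @volatile mirrors the declaration style of the real class: updates are visible across threads.
  @volatile private var startOffset: Long = initialStartOffset
  @volatile private var nextOffset: Long = initialStartOffset

  // Offset of the earliest message that is still visible.
  def logStartOffset: Long = startOffset

  // LEO: the offset that the next appended message will receive.
  def logEndOffset: Long = nextOffset

  // Appending assigns the current LEO to the message, then advances the LEO by one.
  def append(): Long = synchronized {
    val assigned = nextOffset
    nextOffset += 1
    assigned
  }

  // The start offset only moves forward and, in this sketch, never passes the LEO.
  def advanceStartOffset(newStart: Long): Unit = synchronized {
    startOffset = math.min(math.max(startOffset, newStart), nextOffset)
  }
}
```

In this model an empty log has logStartOffset equal to logEndOffset, and the number of visible messages is always logEndOffset minus logStartOffset.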
locally {
  val startMs = time.milliseconds

  // create the log directory if it doesn't exist
  Files.createDirectories(dir.toPath)

  initializeLeaderEpochCache()

  val nextOffset = loadSegments()

  /* Calculate the offset of the next message */
  nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)

  leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))

  logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)

  // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
  leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))

  // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
  // from scratch.
  if (!producerStateManager.isEmpty)
    throw new IllegalStateException("Producer state must be empty during log initialization")
  loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)

  info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " +
    s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
}
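1. Create the log directory for the partition.
2. Initialize the Leader Epoch Cache.
3. Load the log segment objects: (1) run the removeTempFilesAndCollectSwapFiles logic; (2) clear the existing set of log segments and reload the segment files; (3) process the valid .swap files returned by sub-step (1); (4) run the recoverLog operation.
4. Update nextOffsetMetadata and logStartOffset.
5. Update the Leader Epoch Cache and remove invalid data.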
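Two of the steps above lend themselves to a small sketch: sub-step (1) of step 3 and the logStartOffset update in step 4. The object `LoadSketch` and its functions are hypothetical and operate on plain file names. The sketch assumes that temporary files are those ending in .deleted or .cleaned and that .swap files are collected for later processing; the real removeTempFilesAndCollectSwapFiles method does more than this.

```scala
// Hypothetical, simplified sketch; the suffix values come from the Log object shown earlier.
object LoadSketch {
  val DeletedFileSuffix = ".deleted"
  val CleanedFileSuffix = ".cleaned"
  val SwapFileSuffix = ".swap"

  // Assumption: .deleted and .cleaned files are leftovers that can be removed,
  // while .swap files are kept aside so a later sub-step can process them.
  // Returns (names to delete, names of .swap files).
  def scanTempFiles(fileNames: Seq[String]): (Seq[String], Seq[String]) = {
    val toDelete = fileNames.filter { name =>
      name.endsWith(DeletedFileSuffix) || name.endsWith(CleanedFileSuffix)
    }
    val swapFiles = fileNames.filter(_.endsWith(SwapFileSuffix))
    (toDelete, swapFiles)
  }

  // Step 4: the start offset can never be lower than the base offset of the first segment,
  // which is the math.max(...) expression in the initialization block.
  def effectiveLogStartOffset(logStartOffset: Long, firstSegmentBaseOffset: Long): Long =
    math.max(logStartOffset, firstSegmentBaseOffset)
}
```

Working on names instead of real files keeps the sketch free of side effects, which makes the classification rule easy to check in isolation.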