Log is the container for LogSegments and defines many of the operations that manage them. The Log object is the very core of a Kafka Broker:
```scala
// Log.scala
class Log(@volatile var dir: File,
          @volatile var config: LogConfig,
          @volatile var recoveryPoint: Long = 0L,
          scheduler: Scheduler,
          time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
}
```
Log has two core attributes: dir and logStartOffset. dir is the directory that holds the topic-partition's log files, named after the partition, e.g. "topic-0" or "topic-1"; logStartOffset is the current earliest offset of the log. (The constructor snippet above comes from an older Kafka release, which is why logStartOffset does not appear in its parameter list.)
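The directory name encodes both the topic and the partition ("topic-partition"), which is how Kafka recovers the TopicPartition when it scans log directories on startup. Below is a simplified sketch of that parsing; Kafka's real implementation (Log.parseTopicPartitionName) also handles suffixes such as "-delete" for directories scheduled for deletion, which this illustration ignores:

```scala
// Simplified sketch: recover (topic, partition) from a log directory name
// such as "my-topic-12". Topic names may themselves contain dashes, so we
// split on the last one.
def parseTopicPartition(dirName: String): (String, Int) = {
  val i = dirName.lastIndexOf('-')
  require(i > 0, s"not a valid log directory name: $dirName")
  (dirName.substring(0, i), dirName.substring(i + 1).toInt)
}

// parseTopicPartition("topic-0")     == ("topic", 0)
// parseTopicPartition("my-topic-12") == ("my-topic", 12)
```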
Log's common operations fall into four broad areas; here we will look at three of them: how segments are managed, which segments get deleted, and how messages are written.
Log is the container for LogSegments: it uses ConcurrentSkipListMap from J.U.C to hold all segment objects, with each segment's start offset as its key. This keeps the segments sorted and comparable by start offset, and makes it fast to locate the two segments adjacent to any given offset:
```scala
// Log.scala
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] =
  new ConcurrentSkipListMap[java.lang.Long, LogSegment]
```
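To see why a skip-list map keyed by start offset is so convenient, here is a minimal, self-contained sketch (not Kafka code; LogSegment is reduced to a placeholder case class) showing how floorEntry finds the segment containing a given offset, and higherEntry the one after it:

```scala
import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookupDemo extends App {
  // Placeholder for LogSegment: we only keep the base (start) offset
  case class Segment(baseOffset: Long)

  val segments = new ConcurrentSkipListMap[java.lang.Long, Segment]
  for (base <- List(0L, 100L, 200L))   // three segments starting at 0, 100, 200
    segments.put(base, Segment(base))

  val target = 150L
  // floorEntry: greatest key <= target, i.e. the segment containing offset 150
  val containing = segments.floorEntry(target)  // Segment(100)
  // higherEntry: least key > target, i.e. the next segment
  val next = segments.higherEntry(target)       // Segment(200)
  println(s"offset $target is in ${containing.getValue}, next is ${next.getValue}")
}
```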
If ConcurrentSkipListMap is new to you, see my column 《透彻理解Java并发编程》.
Kafka provides several retention strategies (based on time, size, and Log Start Offset) that decide, by rule, which log segments are eligible for deletion:
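A hedged sketch of the three kinds of checks follows; the names SegmentInfo, retentionMs and retentionBytes are illustrative, not the actual fields in Log.scala:

```scala
// Illustrative model of the three retention checks, not Kafka's actual code.
case class SegmentInfo(baseOffset: Long, nextOffset: Long,
                       sizeInBytes: Long, largestTimestamp: Long)

class RetentionPolicy(retentionMs: Long, retentionBytes: Long) {
  // Time-based: the segment's newest record is older than retention.ms
  def breachedByTime(s: SegmentInfo, now: Long): Boolean =
    now - s.largestTimestamp > retentionMs

  // Size-based: the log exceeds retention.bytes, so the oldest segments
  // are deleted until the remainder fits
  def breachedBySize(segments: Seq[SegmentInfo]): Seq[SegmentInfo] = {
    var excess = segments.map(_.sizeInBytes).sum - retentionBytes
    segments.takeWhile { s =>
      val deletable = excess > 0
      excess -= s.sizeInBytes
      deletable
    }
  }

  // Log-Start-Offset-based: every offset in the segment is already below
  // the current logStartOffset
  def breachedByStartOffset(s: SegmentInfo, logStartOffset: Long): Boolean =
    s.nextOffset <= logStartOffset
}
```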
Now let's focus on Log's write path, the append method. The whole flow is annotated step by step in the code below:
```scala
// Log.scala
def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
  // 1. Analyze and validate the incoming record set, returning the result
  val appendInfo = analyzeAndValidateRecords(records)

  // Nothing to append: return immediately
  if (appendInfo.shallowCount == 0)
    return appendInfo

  // 2. Trim the record set, i.e. drop invalid records / trailing bytes
  var validRecords = trimInvalidBytes(records, appendInfo)

  try {
    lock synchronized {
      if (assignOffsets) {
        // 3. Use the current LEO as the offset of the first record in the set
        val offset = new LongRef(nextOffsetMetadata.messageOffset)
        appendInfo.firstOffset = offset.value
        val now = time.milliseconds
        val validateAndOffsetAssignResult = try {
          LogValidator.validateMessagesAndAssignOffsets(validRecords,
            offset,
            now,
            appendInfo.sourceCodec,
            appendInfo.targetCodec,
            config.compact,
            config.messageFormatVersion.messageFormatVersion,
            config.messageTimestampType,
            config.messageTimestampDifferenceMaxMs)
        } catch {
          case e: IOException =>
            throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
        }
        // Update the validation result holder, LogAppendInfo
        validRecords = validateAndOffsetAssignResult.validatedRecords
        appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
        appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
        appendInfo.lastOffset = offset.value - 1
        if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
          appendInfo.logAppendTime = now

        // 4. Re-check record sizes if validation may have changed them
        if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
          for (logEntry <- validRecords.shallowEntries.asScala) {
            if (logEntry.sizeInBytes > config.maxMessageSize) {
              BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
              BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
              throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
                .format(logEntry.sizeInBytes, config.maxMessageSize))
            }
          }
        }

      } else {
        // Use the offsets already present instead of assigning new ones,
        // but make sure they are monotonically increasing
        if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
          throw new IllegalArgumentException("Out of order offsets found in " + records.deepEntries.asScala.map(_.offset))
      }

      // 5. Make sure the record set does not exceed the segment size
      if (validRecords.sizeInBytes > config.segmentSize) {
        throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
          .format(validRecords.sizeInBytes, config.segmentSize))
      }

      // 6. Roll the log if needed: the active segment may not have enough room
      //    left for this record set, in which case a new segment is created
      val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
        maxTimestampInMessages = appendInfo.maxTimestamp,
        maxOffsetInMessages = appendInfo.lastOffset)

      // 7. Do the actual write, delegating to LogSegment.append
      segment.append(firstOffset = appendInfo.firstOffset,
        largestOffset = appendInfo.lastOffset,
        largestTimestamp = appendInfo.maxTimestamp,
        shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
        records = validRecords)

      // 8. Advance the LEO: the new LEO is the last offset in the set + 1
      updateLogEndOffset(appendInfo.lastOffset + 1)

      // 9. Flush to disk manually if needed (based on unflushed message count).
      //    Normally log.flush.interval.messages is left unset and the OS handles
      //    flushing, but it can be set when stronger durability is required
      if (unflushedMessages >= config.flushInterval)
        flush()

      appendInfo
    }
  } catch {
    case e: IOException =>
      throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
  }
}
```
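To make steps 3 and 8 concrete with numbers: if the current LEO is 100 and the batch contains 3 records, they are assigned offsets 100, 101 and 102, and the LEO then becomes 103. A tiny illustrative sketch of that arithmetic (not Kafka code):

```scala
// Illustrative: how consecutive offsets are assigned to an appended batch
final case class AppendResult(firstOffset: Long, lastOffset: Long, newLeo: Long)

def assignOffsets(currentLeo: Long, recordCount: Int): AppendResult = {
  val first = currentLeo              // step 3: first record takes the current LEO
  val last  = first + recordCount - 1 // offsets within the batch are consecutive
  AppendResult(first, last, last + 1) // step 8: new LEO = last offset + 1
}

// assignOffsets(100L, 3) == AppendResult(100, 102, 103)
```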
The actual writing is done by LogSegment.append(), which I will analyze in the next chapter.
One important method in Log's write path is maybeRoll, which is responsible for log rolling: when the active segment no longer has enough room for the new record set (or another roll condition is met), a new segment must be created:
```scala
// Log.scala
private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long): LogSegment = {
  val segment = activeSegment
  val now = time.milliseconds
  val reachedRollMs = segment.timeWaitedForRoll(now, maxTimestampInMessages) > config.segmentMs - segment.rollJitterMs
  // Roll if the active segment is full: by size, by time, because an index is
  // full, or because the new offsets no longer fit as 4-byte relative offsets
  if (segment.size > config.segmentSize - messagesSize ||
      (segment.size > 0 && reachedRollMs) ||
      segment.index.isFull || segment.timeIndex.isFull || !segment.canConvertToRelativeOffset(maxOffsetInMessages)) {
    // maxOffsetInMessages - Int.MaxValue is a conservative lower bound on the
    // first offset in the set, keeping every offset within Int range of the
    // new segment's base offset
    roll(maxOffsetInMessages - Integer.MAX_VALUE)
  } else {
    segment
  }
}

def roll(expectedNextOffset: Long = 0): LogSegment = {
  val start = time.nanoseconds
  lock synchronized {
    val newOffset = Math.max(expectedNextOffset, logEndOffset)
    val logFile = Log.logFile(dir, newOffset)
    val indexFile = indexFilename(dir, newOffset)
    val timeIndexFile = timeIndexFilename(dir, newOffset)
    for (file <- List(logFile, indexFile, timeIndexFile) if file.exists) {
      warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
      file.delete()
    }

    // Finalize the previous active segment: trim its indexes and log to size
    segments.lastEntry() match {
      case null =>
      case entry =>
        val seg = entry.getValue
        seg.onBecomeInactiveSegment()
        seg.index.trimToValidSize()
        seg.timeIndex.trimToValidSize()
        seg.log.trim()
    }
    // Create the new segment
    val segment = new LogSegment(dir,
      startOffset = newOffset,
      indexIntervalBytes = config.indexInterval,
      maxIndexSize = config.maxIndexSize,
      rollJitterMs = config.randomSegmentJitter,
      time = time,
      fileAlreadyExists = false,
      initFileSize = initFileSize,
      preallocate = config.preallocate)
    val prev = addSegment(segment)
    if (prev != null)
      throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
    updateLogEndOffset(nextOffsetMetadata.messageOffset)
    // Asynchronously flush everything up to the new segment's start offset
    scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
    segment
  }
}
```
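One non-obvious roll trigger above is canConvertToRelativeOffset. The offset index stores each offset relative to the segment's base offset in only 4 bytes, so the segment must roll before offset - baseOffset overflows an Int. The check boils down to something like this (a sketch of the idea, not the exact LogSegment code):

```scala
// Index entries store (offset - baseOffset) in 4 bytes, so the delta must
// fit into a non-negative Int
def canConvertToRelativeOffset(offset: Long, baseOffset: Long): Boolean = {
  val delta = offset - baseOffset
  delta >= 0 && delta <= Int.MaxValue
}

// With baseOffset = 0, the segment must roll before offsets pass Int.MaxValue:
// canConvertToRelativeOffset(Int.MaxValue.toLong + 1, 0L) == false
```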
Segment size is controlled by the broker-side parameter log.segment.bytes (overridable per topic via segment.bytes), which defaults to 1GB. The segment currently being written to is called the Active LogSegment.
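For reference, here is a hedged example of setting these parameters; the file path and topic name are placeholders:

```
# server.properties (broker-wide default)
log.segment.bytes=1073741824        # 1 GiB, the default

# Per-topic override (Kafka 0.10/0.11-era tooling):
# bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
#   --entity-type topics --entity-name my-topic \
#   --add-config segment.bytes=536870912
```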
In this chapter I gave a brief tour of the Log object. Log is the container for LogSegments and defines many operations for managing them; for now, its write path is all we need to focus on.