当前位置:   article > 正文

Kafka源码分析(十九)——Broker:日志子系统——Log_kafka 日志系统

kafka 日志系统

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

学习必须往深处挖,挖的越深,基础越扎实!

阶段1、深入多线程

阶段2、深入多线程设计模式

阶段3、深入juc源码解析


阶段4、深入jdk其余源码解析


阶段5、深入jvm源码解析

码哥源码部分

码哥讲源码-原理源码篇【2024年最新大厂关于线程池使用的场景题】

码哥讲源码【炸雷啦!炸雷啦!黄光头他终于跑路啦!】

码哥讲源码-【jvm课程前置知识及c/c++调试环境搭建】

​​​​​​码哥讲源码-原理源码篇【揭秘join方法的唤醒本质上决定于jvm的底层析构函数】

码哥源码-原理源码篇【Doug Lea为什么要将成员变量赋值给局部变量后再操作?】

码哥讲源码【你水不是你的错,但是你胡说八道就是你不对了!】

码哥讲源码【谁再说Spring不支持多线程事务,你给我抽他!】

终结B站没人能讲清楚红黑树的历史,不服等你来踢馆!

打脸系列【020-3小时讲解MESI协议和volatile之间的关系,那些将x86下的验证结果当作最终结果的水货们请闭嘴】

Log是LogSegment日志段的容器,里面定义了很多管理日志段的操作。Log 对象是 Kafka Broker最核心的部分:

  1. // Log.scala
  2. class Log(@volatile var dir: File,
  3. @volatile var config: LogConfig,
  4. @volatile var recoveryPoint: Long = 0L,
  5. scheduler: Scheduler,
  6. time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
  7. }

Log中包含两个核心属性: dir 和 logStartOffset 。dir是主题分区日志所在的文件夹路径,比如"topic-0"、"topic-1";而 logStartOffset,表示日志的当前最早位移。

一、核心方法

Log 的常见操作分为 4 大部分:

  • 高水位管理操作: 高水位(HW)的概念在 Kafka 中举足轻重,对它的管理,是 Log 最重要的功能之一;
  • 日志段管理: Log 是日志段的容器,高效组织与管理其下辖的所有日志段对象,是源码要解决的核心问题;
  • 关键位移值管理: 日志定义了很多重要的位移值,比如 Log Start Offset 和 LEO 等,确保这些位移值的正确性,是构建消息引擎一致性的基础;
  • 读写操作: 所谓的操作日志,大体上就是指读写日志,读写操作的作用之大,不言而喻。

 

1.1 日志段管理

Log 是LogSegment的容器,它使用 J.U.C中的 ConcurrentSkipListMap 类来保存所有日志段对象。Kafka 将每个日志段的 起始位移值作为 Key ,这样一来,我们就能够很方便地根据所有日志段的起始位移值对它们进行排序和比较,同时还能快速地找到与给定位移值相近的前后两个日志段:

  1. // Log.scala
  2. private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

对于ConcurrentSkipListMap不了解的童鞋,可以去看我的专栏:《透彻理解Java并发编程》。

Kafka 提供了很多策略(包括基于时间维度、空间维度、Log Start Offset 维度),可以根据一定的规则决定哪些日志段可以删除:

1.2 写日志操作

我们重点看下Log的写日志操作,也就是append方法。整个流程可以用下面这张图表述:

  1. // Log.scala
  2. def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
  3. // 1.分析和校验待写入消息集合,并返回校验结果
  4. val appendInfo = analyzeAndValidateRecords(records)
  5. // 如果不需要写入任何消息,直接返回
  6. if (appendInfo.shallowCount == 0)
  7. return appendInfo
  8. // 2.消息格式规整,即删除无效格式消息或无效字节
  9. var validRecords = trimInvalidBytes(records, appendInfo)
  10. try {
  11. lock synchronized {
  12. if (assignOffsets) {
  13. // 3.使用当前LEO值作为待写入消息集合中第一条消息的位移值
  14. val offset = new LongRef(nextOffsetMetadata.messageOffset)
  15. appendInfo.firstOffset = offset.value
  16. val now = time.milliseconds
  17. val validateAndOffsetAssignResult = try {
  18. LogValidator.validateMessagesAndAssignOffsets(validRecords,
  19. offset,
  20. now,
  21. appendInfo.sourceCodec,
  22. appendInfo.targetCodec,
  23. config.compact,
  24. config.messageFormatVersion.messageFormatVersion,
  25. config.messageTimestampType,
  26. config.messageTimestampDifferenceMaxMs)
  27. } catch {
  28. case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
  29. }
  30. // 更新校验结果对象类LogAppendInfo
  31. validRecords = validateAndOffsetAssignResult.validatedRecords
  32. appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
  33. appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
  34. appendInfo.lastOffset = offset.value - 1
  35. if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
  36. appendInfo.logAppendTime = now
  37. // 4.验证消息,确保消息大小不超限
  38. if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
  39. for (logEntry <- validRecords.shallowEntries.asScala) {
  40. if (logEntry.sizeInBytes > config.maxMessageSize) { BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
  41. BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
  42. throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
  43. .format(logEntry.sizeInBytes, config.maxMessageSize))
  44. }
  45. }
  46. }
  47. } else { // 直接使用给定的位移值,无需自己分配位移值;
  48. // 确保消息位移值的单调递增性
  49. if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
  50. throw new IllegalArgumentException("Out of order offsets found in " + records.deepEntries.asScala.map(_.offset))
  51. }
  52. // 5.确保消息大小不超限
  53. if (validRecords.sizeInBytes > config.segmentSize) {
  54. throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
  55. .format(validRecords.sizeInBytes, config.segmentSize))
  56. }
  57. // 6.执行日志切分,当前日志段剩余容量可能无法容纳新消息集合,因此有必要创建一个新的日志段来保存待写入的所有消息
  58. val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
  59. maxTimestampInMessages = appendInfo.maxTimestamp,
  60. maxOffsetInMessages = appendInfo.lastOffset)
  61. // 7.执行真正的消息写入操作,主要调用LogSegment.append方法实现
  62. segment.append(firstOffset = appendInfo.firstOffset,
  63. largestOffset = appendInfo.lastOffset,
  64. largestTimestamp = appendInfo.maxTimestamp,
  65. shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
  66. records = validRecords)
  67. // 8.更新LEO对象,其中,LEO值是消息集合中最后一条消息位移值+1
  68. updateLogEndOffset(appendInfo.lastOffset + 1)
  69. // 9.是否需要手动落盘(根据消息数判断)
  70. // 一般情况下我们不需要设置Broker端参数log.flush.interval.messages, 落盘操作由OS完成
  71. // 但某些情况下,可以设置该参数来确保高可靠性
  72. if (unflushedMessages >= config.flushInterval)
  73. flush()
  74. appendInfo
  75. }
  76. } catch {
  77. case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
  78. }
  79. }

日志的写入操作,是通过LogSegment.append()方法完成的,我下一章节会对LogSegment进行分析。

1.3 日志切分

Log在写日志的过程中,有一个很重要的maybeRoll方法,负责进行 日志切分 。所谓日志切分,就是说如果当前日志段的剩余容量无法容纳新消息时,就需要新创建一个日志段:

  1. // Log.scala
  2. private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long): LogSegment = {
  3. val segment = activeSegment
  4. val now = time.milliseconds
  5. val reachedRollMs = segment.timeWaitedForRoll(now, maxTimestampInMessages) > config.segmentMs - segment.rollJitterMs
  6. // 容量不足,执行切分
  7. if (segment.size > config.segmentSize - messagesSize ||
  8. (segment.size > 0 && reachedRollMs) ||
  9. segment.index.isFull || segment.timeIndex.isFull || !segment.canConvertToRelativeOffset(maxOffsetInMessages)) {
  10. roll(maxOffsetInMessages - Integer.MAX_VALUE)
  11. } else {
  12. segment
  13. }
  14. }
  15. def roll(expectedNextOffset: Long = 0): LogSegment = {
  16. val start = time.nanoseconds
  17. lock synchronized {
  18. val newOffset = Math.max(expectedNextOffset, logEndOffset)
  19. val logFile = Log.logFile(dir, newOffset)
  20. val indexFile = indexFilename(dir, newOffset)
  21. val timeIndexFile = timeIndexFilename(dir, newOffset)
  22. for(file <- List(logFile, indexFile, timeIndexFile); if file.exists) {
  23. warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
  24. file.delete()
  25. }
  26. segments.lastEntry() match {
  27. case null =>
  28. case entry => {
  29. val seg = entry.getValue
  30. seg.onBecomeInactiveSegment()
  31. seg.index.trimToValidSize()
  32. seg.timeIndex.trimToValidSize()
  33. seg.log.trim()
  34. }
  35. }
  36. // 新建一个日志段
  37. val segment = new LogSegment(dir,
  38. startOffset = newOffset,
  39. indexIntervalBytes = config.indexInterval,
  40. maxIndexSize = config.maxIndexSize,
  41. rollJitterMs = config.randomSegmentJitter,
  42. time = time,
  43. fileAlreadyExists = false,
  44. initFileSize = initFileSize,
  45. preallocate = config.preallocate)
  46. val prev = addSegment(segment)
  47. if(prev != null)
  48. throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
  49. updateLogEndOffset(nextOffsetMetadata.messageOffset)
  50. scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
  51. segment
  52. }
  53. }

可以通过Kafka Broker端的参数segment.bytes设置日志分段的大小,默认为1G,当前正在写入的LogSegment称为Active LogSegment

二、总结

本章,我对Log对象进行了简单的讲解。Log是LogSegment日志段的容器,里面定义了很多管理日志段的操作,我们目前只需要关注它的写日志方法即可。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/891560
推荐阅读
相关标签
  

闽ICP备14008679号