赞
踩
Log 的常见操作分为 4 大部分:
一. 高水位定义
代码只有一行:
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
这行语句传达了两个重要的事实:
看下 LogOffsetMetadata 的代码:
- case class LogOffsetMetadata(messageOffset: Long,
- segmentBaseOffset: Long = Log.UnknownOffset,
- relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition)
里面保存了三个重要的变量:
二. 获取和设置高水位值
- // getter method:读取高水位的位移值
- def highWatermark: Long = highWatermarkMetadata.messageOffset
-
- // setter method:设置高水位值
- private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
- if (newHighWatermark.messageOffset < 0) // 高水位值不能是负数
- throw new IllegalArgumentException("High watermark offset should be non-negative")
-
- lock synchronized { // 保护Log对象修改的Monitor锁
- highWatermarkMetadata = newHighWatermark // 赋值新的高水位值
- producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset) // 处理事务状态管理器的高水位值更新逻辑,忽略它……
- maybeIncrementFirstUnstableOffset() // First Unstable Offset是Kafka事务机制的一部分,忽略它……
- }
- trace(s"Setting high watermark $newHighWatermark")
- }
三. 更新高水位值
源码还定义了两个更新高水位值的方法:updateHighWatermark 和 maybeIncrementHighWatermark。从名字上来看,前者是一定要更新高水位值的,而后者是可能会更新也可能不会。
- // updateHighWatermark method
- def updateHighWatermark(hw: Long): Long = {
- // 新高水位值一定介于[Log Start Offset,Log End Offset]之间
- val newHighWatermark = if (hw < logStartOffset)
- logStartOffset
- else if (hw > logEndOffset)
- logEndOffset
- else
- hw
- // 调用Setter方法来更新高水位值
- updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
- newHighWatermark // 最后返回新高水位值
- }
- // maybeIncrementHighWatermark method
- def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
- // 新高水位值不能越过Log End Offset
- if (newHighWatermark.messageOffset > logEndOffset)
- throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
- s"log end offset $logEndOffsetMetadata")
-
- lock.synchronized {
- val oldHighWatermark = fetchHighWatermarkMetadata // 获取老的高水位值
-
- // 新高水位值要比老高水位值大以维持单调增加特性,否则就不做更新!
- // 另外,如果新高水位值在新日志段上,也可执行更新高水位操作
- if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
- (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
- updateHighWatermarkMetadata(newHighWatermark)
- Some(oldHighWatermark) // 返回老的高水位值
- } else {
- None
- }
- }
- }
需要注意的是,Leader 副本高水位值的更新是有条件的——某些情况下会更新高水位值,某些情况下可能不会。就像我刚才说的,Follower 副本成功拉取 Leader 副本的消息后必须更新高水位值,但 Producer 端向 Leader 副本写入消息时,分区的高水位值就可能不需要更新——因为它可能需要等待其他 Follower 副本同步的进度。因此,源码中定义了两个更新的方法,它们分别应用于不同的场景。
四. 读取高水位值
关于高水位值管理的最后一个操作是 fetchHighWatermarkMetadata 方法。它不仅仅是获取高水位值,还要获取高水位的其他元数据信息,即日志段起始位移和物理位置信息。下面是它的实现逻辑:
- private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
- checkIfMemoryMappedBufferClosed() // 读取时确保日志不能被关闭
-
- val offsetMetadata = highWatermarkMetadata // 保存当前高水位值到本地变量,避免多线程访问干扰
- if (offsetMetadata.messageOffsetOnly) { //没有获得到完整的高水位元数据
- lock.synchronized {
- val fullOffset = convertToOffsetMetadataOrThrow(highWatermark) // 通过读日志文件的方式把完整的高水位元数据信息拉出来
- updateHighWatermarkMetadata(fullOffset) // 然后再更新一下高水位对象
- fullOffset
- }
- } else { // 否则,直接返回即可
- offsetMetadata
- }
- }
所谓的日志段管理,无非就是增删改查。从上一篇知道日志段的定义就是一个Map。
1. 增加
就是Map方法的append
def addSegment(segment: LogSegment): LogSegment = this.segments.put(segment.baseOffset, segment)
2. 删除
删除操作相对来说复杂一点。Kafka 有很多留存策略,包括基于时间维度的、基于空间维度的和基于 Log Start Offset 维度的。那啥是留存策略呢?其实,它本质上就是根据一定的规则决定哪些日志段可以删除。
从源码角度来看,Log 中控制删除操作的总入口是 deleteOldSegments 无参方法:
- def deleteOldSegments(): Int = {
- if (config.delete) {
- deleteRetentionMsBreachedSegments() +
- deleteRetentionSizeBreachedSegments() +
- deleteLogStartOffsetBreachedSegments()
- } else {
- deleteLogStartOffsetBreachedSegments()
- }
- }
代码中的 deleteRetentionMsBreachedSegments、deleteRetentionSizeBreachedSegments 和 deleteLogStartOffsetBreachedSegments 分别对应于上面时间、空间、Log Start Offset那 3 个策略。
这张图画出了删除底层调用的方法。上面 3 个留存策略方法底层都会调用带参数版本的 deleteOldSegments 方法,而这个方法又相继调用了 deletableSegments 和 deleteSegments 方法。
首先是带参数版的 deleteOldSegments 方法:
- private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
- lock synchronized {
- val deletable = deletableSegments(predicate)
- if (deletable.nonEmpty)
- info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
- deleteSegments(deletable)
- }
- }
该方法只有两个步骤:
接下来是 deletableSegments 方法:
- private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
- if (segments.isEmpty) { // 如果当前压根就没有任何日志段对象,直接返回
- Seq.empty
- } else {
- val deletable = ArrayBuffer.empty[LogSegment]
- var segmentEntry = segments.firstEntry
-
- // 从具有最小起始位移值的日志段对象开始遍历,直到满足以下条件之一便停止遍历:
- // 1. 测定条件函数predicate = false
- // 2. 扫描到包含Log对象高水位值所在的日志段对象
- // 3. 最新的日志段对象不包含任何消息
- // 最新日志段对象是segments中Key值最大对应的那个日志段,也就是我们常说的Active Segment。完全为空的Active Segment如果被允许删除,后面还要重建它,故代码这里不允许删除大小为空的Active Segment。
- // 在遍历过程中,同时不满足以上3个条件的所有日志段都是可以被删除的!
-
- while (segmentEntry != null) {
- val segment = segmentEntry.getValue
- val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
- val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) =
- if (nextSegmentEntry != null)
- (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
- else
- (null, logEndOffset, segment.size == 0)
-
- if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
- deletable += segment
- segmentEntry = nextSegmentEntry
- } else {
- segmentEntry = null
- }
- }
- deletable
- }
- }
最后是 deleteSegments 方法,这个方法执行真正的日志段删除操作:
- private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
- maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
- val numToDelete = deletable.size
- if (numToDelete > 0) {
- // 不允许删除所有日志段对象。如果一定要做,先创建出一个新的来,然后再把前面N个删掉
- if (segments.size == numToDelete)
- roll()
- lock synchronized {
- checkIfMemoryMappedBufferClosed() // 确保Log对象没有被关闭
- // 删除给定的日志段对象以及底层的物理文件
- removeAndDeleteSegments(deletable, asyncDelete = true)
- // 尝试更新日志的Log Start Offset值
- maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
- }
- }
- numToDelete
- }
- }
为什么要在删除日志段对象之后,尝试更新 Log Start Offset 值?
Log Start Offset 值是整个 Log 对象对外可见消息的最小位移值。如果我们删除了日志段对象,很有可能对外可见消息的范围发生了变化,自然要看一下是否需要更新 Log Start Offset 值。这就是 deleteSegments 方法最后要更新 Log Start Offset 值的原因。也就是上一篇中我手画图说明位移3之前截断想表达的意思。
3. 修改
说完了日志段删除,接下来我们来看如何修改日志段对象。
其实,源码里面不涉及修改日志段对象,所谓的修改或更新也就是替换而已,用新的日志段对象替换老的日志段对象。举个简单的例子。segments.put(1L, newSegment) 语句在没有 Key=1 时是添加日志段,否则就是替换已有日志段。
4. 查询
最后再说下查询日志段对象。源码中需要查询日志段对象的地方太多了,但主要都是利用了 ConcurrentSkipListMap 的现成方法。
Log 对象维护了一些关键位移值数据,比如 Log Start Offset、LEO 等。因为这些数据经常用到。所以分析下。
代码中定义更新 LEO 的 updateLogEndOffset 方法:
- private def updateLogEndOffset(offset: Long): Unit = {
- nextOffsetMetadata = LogOffsetMetadata(offset, activeSegment.baseOffset, activeSegment.size)
- if (highWatermark >= offset) {
- updateHighWatermarkMetadata(nextOffsetMetadata)
- }
- if (this.recoveryPoint > offset) {
- this.recoveryPoint = offset
- }
- }
需要注意的是,如果在更新过程中发现新 LEO 值小于高水位值,那么 Kafka 还要更新高水位值,因为对于同一个 Log 对象而言,高水位值是不能越过 LEO 值的。
那么,Log 对象什么时候需要更新 LEO 呢?源码中以下几个地方调用了updateLogEndOffset方法:
那同样,Kafka 什么时候需要更新 Log Start Offset 呢?
1. 写操作
在 Log 中,涉及写操作的方法有 3 个:appendAsLeader、appendAsFollower 和 append。
appendAsLeader 是用于写 Leader 副本的,appendAsFollower 是用于 Follower 副本同步的。它们的底层都调用了 append 方法。
所以看下append方法:
- private def append(records: MemoryRecords,
- origin: AppendOrigin,
- interBrokerProtocolVersion: ApiVersion,
- assignOffsets: Boolean,
- leaderEpoch: Int): LogAppendInfo = {
- maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
- // 第1步:分析和验证待写入消息集合,并返回校验结果
- val appendInfo = analyzeAndValidateRecords(records, origin)
-
- // 如果压根就不需要写入任何消息,直接返回即可
- if (appendInfo.shallowCount == 0)
- return appendInfo
-
- // 第2步:消息格式规整,即删除无效格式消息或无效字节
- var validRecords = trimInvalidBytes(records, appendInfo)
-
- lock synchronized {
- checkIfMemoryMappedBufferClosed() // 确保Log对象未关闭
- if (assignOffsets) { // 需要分配位移
- // 第3步:使用当前LEO值作为待写入消息集合中第一条消息的位移值
- val offset = new LongRef(nextOffsetMetadata.messageOffset)
- appendInfo.firstOffset = Some(offset.value)
- val now = time.milliseconds
- val validateAndOffsetAssignResult = try {
- LogValidator.validateMessagesAndAssignOffsets(validRecords,
- topicPartition,
- offset,
- time,
- now,
- appendInfo.sourceCodec,
- appendInfo.targetCodec,
- config.compact,
- config.messageFormatVersion.recordVersion.value,
- config.messageTimestampType,
- config.messageTimestampDifferenceMaxMs,
- leaderEpoch,
- origin,
- interBrokerProtocolVersion,
- brokerTopicStats)
- } catch {
- case e: IOException =>
- throw new KafkaException(s"Error validating messages while appending to log $name", e)
- }
- // 更新校验结果对象类LogAppendInfo
- validRecords = validateAndOffsetAssignResult.validatedRecords
- appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
- appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
- appendInfo.lastOffset = offset.value - 1
- appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
- if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
- appendInfo.logAppendTime = now
-
- // 第4步:验证消息,确保消息大小不超限
- if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
- for (batch <- validRecords.batches.asScala) {
- if (batch.sizeInBytes > config.maxMessageSize) {
- // we record the original message set size instead of the trimmed size
- // to be consistent with pre-compression bytesRejectedRate recording
- brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
- brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
- throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
- s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
- }
- }
- }
- } else { // 直接使用给定的位移值,无需自己分配位移值
- if (!appendInfo.offsetsMonotonic) // 确保消息位移值的单调递增性
- throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
- records.records.asScala.map(_.offset))
-
- if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
- val firstOffset = appendInfo.firstOffset match {
- case Some(offset) => offset
- case None => records.batches.asScala.head.baseOffset()
- }
-
- val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
- throw new UnexpectedAppendOffsetException(
- s"Unexpected offset in append to $topicPartition. $firstOrLast " +
- s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
- s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
- s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
- firstOffset, appendInfo.lastOffset)
- }
- }
-
- // 第5步:更新Leader Epoch缓存
- validRecords.batches.asScala.foreach { batch =>
- if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
- maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
- } else {
- leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
- warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
- cache.clearAndFlush()
- }
- }
- }
-
- // 第6步:确保消息大小不超限
- if (validRecords.sizeInBytes > config.segmentSize) {
- throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
- s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
- }
-
- // 第7步:执行日志切分。当前日志段剩余容量可能无法容纳新消息集合,因此有必要创建一个新的日志段来保存待写入的所有消息
- val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
-
- val logOffsetMetadata = LogOffsetMetadata(
- messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
- segmentBaseOffset = segment.baseOffset,
- relativePositionInSegment = segment.size)
-
- // 第8步:验证事务状态
- val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
- logOffsetMetadata, validRecords, origin)
-
- maybeDuplicate.foreach { duplicate =>
- appendInfo.firstOffset = Some(duplicate.firstOffset)
- appendInfo.lastOffset = duplicate.lastOffset
- appendInfo.logAppendTime = duplicate.timestamp
- appendInfo.logStartOffset = logStartOffset
- return appendInfo
- }
-
- // 第9步:执行真正的消息写入操作,主要调用日志段对象的append方法实现
- segment.append(largestOffset = appendInfo.lastOffset,
- largestTimestamp = appendInfo.maxTimestamp,
- shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
- records = validRecords)
-
- // 第10步:更新LEO对象,其中,LEO值是消息集合中最后一条消息位移值+1
- // 前面说过,LEO值永远指向下一条不存在的消息
- updateLogEndOffset(appendInfo.lastOffset + 1)
-
- // 第11步:更新事务状态
- for (producerAppendInfo <- updatedProducers.values) {
- producerStateManager.update(producerAppendInfo)
- }
-
- for (completedTxn <- completedTxns) {
- val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
- segment.updateTxnIndex(completedTxn, lastStableOffset)
- producerStateManager.completeTxn(completedTxn)
- }
-
- producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
- maybeIncrementFirstUnstableOffset()
-
- trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
- s"first offset: ${appendInfo.firstOffset}, " +
- s"next offset: ${nextOffsetMetadata.messageOffset}, " +
- s"and messages: $validRecords")
-
- // 是否需要手动落盘。一般情况下我们不需要设置Broker端参数log.flush.interval.messages
- // 落盘操作交由操作系统来完成。但某些情况下,可以设置该参数来确保高可靠性
- if (unflushedMessages >= config.flushInterval)
- flush()
-
- // 第12步:返回写入结果
- appendInfo
- }
- }
- }
额,过程有点多,用图来描述下:
2. 读取操作
read 方法的流程相对要简单一些,首先来看它的方法签名:
- def read(startOffset: Long,
- maxLength: Int,
- isolation: FetchIsolation,
- minOneMessage: Boolean): FetchDataInfo = {
- ......
- }
它接收 4 个参数,含义如下:
看下read方法的流程:
- def read(startOffset: Long,
- maxLength: Int,
- isolation: FetchIsolation,
- minOneMessage: Boolean): FetchDataInfo = {
- maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
- trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")
-
- val includeAbortedTxns = isolation == FetchTxnCommitted
-
- // 读取消息时没有使用Monitor锁同步机制,因此这里取巧了,用本地变量的方式把LEO对象保存起来,避免争用(race condition)
- val endOffsetMetadata = nextOffsetMetadata
- val endOffset = nextOffsetMetadata.messageOffset
- if (startOffset == endOffset) // 如果从LEO处开始读取,那么自然不会返回任何数据,直接返回空消息集合即可
- return emptyFetchDataInfo(endOffsetMetadata, includeAbortedTxns)
-
- // 找到startOffset值所在的日志段对象。注意要使用floorEntry方法
- var segmentEntry = segments.floorEntry(startOffset)
-
- // return error on attempt to read beyond the log end offset or read below log start offset
- // 满足以下条件之一将被视为消息越界,即你要读取的消息不在该Log对象中:
- // 1. 要读取的消息位移超过了LEO值
- // 2. 没找到对应的日志段对象
- // 3. 要读取的消息在Log Start Offset之下,同样是对外不可见的消息
- if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
- throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
- s"but we only have log segments in the range $logStartOffset to $endOffset.")
-
- // 查看一下读取隔离级别设置。
- // 普通消费者能够看到[Log Start Offset, LEO)之间的消息
- // 事务型消费者只能看到[Log Start Offset, Log Stable Offset]之间的消息。Log Stable Offset(LSO)是比LEO值小的位移值,为Kafka事务使用
- // Follower副本消费者能够看到[Log Start Offset,高水位值]之间的消息
- val maxOffsetMetadata = isolation match {
- case FetchLogEnd => nextOffsetMetadata
- case FetchHighWatermark => fetchHighWatermarkMetadata
- case FetchTxnCommitted => fetchLastStableOffsetMetadata
- }
-
- // 如果要读取的起始位置超过了能读取的最大位置,返回空的消息集合,因为没法读取任何消息
- if (startOffset > maxOffsetMetadata.messageOffset) {
- val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
- return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
- }
-
- // 开始遍历日志段对象,直到读出东西来或者读到日志末尾
- while (segmentEntry != null) {
- val segment = segmentEntry.getValue
-
- val maxPosition = {
- if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
- maxOffsetMetadata.relativePositionInSegment
- } else {
- segment.size
- }
- }
-
- // 调用日志段对象的read方法执行真正的读取消息操作
- val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
- if (fetchInfo == null) { // 如果没有返回任何消息,去下一个日志段对象试试
- segmentEntry = segments.higherEntry(segmentEntry.getKey)
- } else { // 否则返回
- return if (includeAbortedTxns)
- addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
- else
- fetchInfo
- }
- }
-
- // 已经读到日志末尾还是没有数据返回,只能返回空消息集合
- FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
- }
- }
总结一下日志对日志段的管理操作:
基本到此,kafka的Log对象总结告一段落。最大的感觉就是对Broker内部的操作心里有个数,shell脚本里的一些参数什么时候能用到也了解一些,但感觉还是少点东西,想起来再加吧。看到这里了,莫忘点赞支持奥!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。