当前位置:   article > 正文

Kafka核心原理之消息存储机制_kafka broker端的消息存储格式和内容

kafka broker端的消息存储格式和内容

消息存储机制

Kafka中消息是存储在磁盘上的,基本存储单元是Partition,一个Partition对应一个日志(Log),为了防止Log过大,Kafka又引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment ,相当于一个巨型文件被平分为多个相对较小的文件,便于消息的维护和清理。

Log和LogSegment不是纯粹物理意义上的概念,Log在物理上以文件夹的形式存储(格式:主题名-分区,如:一个名为order的主题存在两个分区,存储格式为:order-0、order-1),而每个LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(如:以.txnindex为后缀的事务索引文件)。

如图所示:

日志分段

虽然一个Log被切分为多个分段,但只有最后一个LogSegment(当前活跃的日志分段)才能执行写入操作,在此之前所有的LogSegment都不能写入数据。当满足以下任一条件时会创建新的LogSegment:

  • 当前日志分段文件的大小超过了Broker端参数log.segment.bytes设置的值,默认1GB。
  • 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于参数log.roll.m或log.roll.hours设置的值,默认为7天。
  • 偏移量索引文件或时间戳索引文件的大小达到Broker端参数log.index.size .max.bytes设置的值,默认为10M。
  • 追加消息的偏移量与当前日志分段的偏移量之间的差值大于Integr.MAX_VALUE。

Log文件在切分时,Kafka会关闭当前正在写入的LogSegment文件并置为只读模式,同时以可读写的模式创建新的LogSegment文件,文件大小默认为1GB。当下次Log切分时才会设置为LogSegment文件的实际大小。即:旧LogSegment文件大小为文件的实际大小,活跃LogSegment大小为默认的1GB。

日志分段详细信息:

  1. kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log
  2. Dumping 00000000000000000000.log
  3. Starting offset: 0
  4. baseOffset: 0 lastOffset: 3 count: 4 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1660546405649 size: 117 magic: 2 compresscodec: NONE crc: 1789113225 isvalid: true
  5. | offset: 0 CreateTime: 1660546405647 keysize: 1 valuesize: 6 sequence: -1 headerKeys: [] key: 1 payload: kafka1
  6. | offset: 1 CreateTime: 1660546405648 keysize: 1 valuesize: 6 sequence: -1 headerKeys: [] key: 5 payload: kafka5
  7. | offset: 2 CreateTime: 1660546405648 keysize: 1 valuesize: 6 sequence: -1 headerKeys: [] key: 7 payload: kafka7
  8. | offset: 3 CreateTime: 1660546405649 keysize: 1 valuesize: 6 sequence: -1 headerKeys: [] key: 8 payload: kafka8

日志索引

索引的主要目的是提高查找的效率。

Kafka采用稀疏索引(Sparse Index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。而是当写入一定量(由Broker端参数log.index. interval.bytes指定,默认为4KB)的消息后,索引文件才会增加一个索引项。

偏移量索引

偏移量索引包含两部分数据,如图所示:

  • relativeOffset:相对偏移量,表示消息相对于baseOffset的偏移量,当前索引文件的文件名即为baseOffset。
  • position:物理地址,指的是消息在日志分段文件中对应的物理位置。

消息查找过程。如图所示:

假设要查找偏移量为23的消息,先通过二分法在偏移量索引文件中找到不大于23最大索引项,即:[22 656],再从日志分段文件中的物理地址656开始顺序查找偏移量为23的消息。

如果要查找要查找下图中偏移量为268的消息:

查找步骤:

  • 1)定位到baseOffset=251的日志分段。
  • 2)计算相对偏移量relativeOffset:268 - 251=17。
  • 3)在baseOffset=251的索引文件中找到不大于17的索引项。
  • 4)根据索引项中的物理地址(position)从baseOffset=251的日志分段中顺序查找目标消息。

如何查找baseOffset=251的日志分段的呢?

Kafka使用了跳跃表结构,每个日志对象中使用ConcurrentSkipListMap来保存各个日志分段,每个日志分段以baseOffset作为key ,这样就可以根据指定偏移量(251<偏移量268<378,因此要查找的日志分段为251)来快速定位消息所在的日志分段。

时间戳索引

时间戳索引(时间戳索引名与偏移量索引名一样,只是文件后缀不一样(即:.timeindex和.index))包含两部分数据,如图所示:

  • timestamp:当前日志分段最大的时间戳。
  • relativeOffset:时间戳所对应的消息的相对偏移量,即:偏移量索引中偏移量。

时间戳索引文件中包含若干时间戳索引项,每个追加的时间戳索引项中的timestamp必须大于之前追加的索引项的timestamp ,否则不予追加。

消息查找过程如图所示:

假如要查找指定时间戳targetTimeStamp = 1526384718288开始的消息,首先找到不小于指定时间戳的日志分段。这里无法使用跳跃表来快速定位相应的日志分段, 需要分以下几个步骤来完成:

  • 1)targetTimeStamp和每个日志分段中的最大时间戳对比,直到找到不小于targetTimeStam所对应的日志分段。注:日志分段中的最大时间戳的计算是先查询该日志分段所对应的时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0 ,则取其值,否则取该日志分段的最近修改时间。
  • 2)找到对应的日志分段后,在时间戳索引文件中使用二分查找算法查找到不大于 targetTimeStamp最大索引项(即:[1526384718283, 28]),从而找到相对偏移量28。
  • 3)再从偏移量索引文件中使用二分法查找到不大于28的最大索引项(即:[26,838])。
  • 4)最后根据物理地址838从步骤1中找到的日志分段中顺序查找目标消息。

日志清理

Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加,需要对消息做一定的清理操作。Kafka提供了两种日志清理策略:

  • 日志删除:按照一定的保留策略直接删除不符合条件的日志分段,默认策略。
  • 日志压缩:针对每个消息的Key进行整合,对于有相同key不同value的消息,只保留最后一个版本。

日志删除

kafka有专门的任务来周期性删除不符合条件的日志分段文件,删除策略主要以下有3种:

1)基于时间

Broker端可通过参数配置来设置日志的最大保留时间,默认7天。定时任务会查看每个分段的最大时间戳,若最大时间戳距离当前时间超过7天,则需要删除。

删除日志分段时, 首先会从跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作,然后将待删除日志分段对应的所有文件添加上.delete的后缀。最后由专门的定时任务来删除以.delete为后缀的文件。

2)基于日志大小

日志删除任务会检查当前日志的大小是否超过设定的阀值(参数:retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments)。

注意:这里的日志的大小是指所有Segment的总和,不是单个Segment。

如图所示:

首先计算日志文件的总大小和设定阈值的差值(即:计算需要删除的日志总大小),然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段,放入deletableSegments集合中 ,最后进行删除。删除过程与基于时间删除策略一致。

3)基于日志起始偏移量

一般情况下,日志文件的起始偏移logStartOffset等于第1个日志分段的baseOffset ,但logStartOffset是可以被修改的。该策略会判断某个日志分段的下一个日志分段的起始偏移量baseOffset是否小于等于logStartOffset ,是则将其放入deletableSegments集合中,最后进行删除。删除过程与基于时间删除策略一致。

如图所示:

日志压缩

对于有相同key不同value值的消息,只保留最后一个版本。如果应用只关心key对应的最新value值,则可以开启日志压缩功能,Kafka会定期将相同key的消息进行合井,只保留最新的value值。

如图所示:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/289139
推荐阅读
相关标签
  

闽ICP备14008679号