赞
踩
Kafka中消息是存储在磁盘上的,基本存储单元是Partition,一个Partition对应一个日志(Log),为了防止Log过大,Kafka又引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment ,相当于一个巨型文件被平分为多个相对较小的文件,便于消息的维护和清理。
Log和LogSegment不是纯粹物理意义上的概念,Log在物理上以文件夹的形式存储(格式:主题名-分区,如:一个名为order的主题存在两个分区,存储格式为:order-0、order-1),而每个LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(如:以.txnindex为后缀的事务索引文件)。
如图所示:
虽然一个Log被切分为多个分段,但只有最后一个LogSegment(当前活跃的日志分段)才能执行写入操作,在此之前所有的LogSegment都不能写入数据。当满足以下任一条件时会创建新的LogSegment:
Log文件在切分时,Kafka会关闭当前正在写入的LogSegment文件并置为只读模式,同时以可读写的模式创建新的LogSegment文件,文件大小默认为1GB。当下次Log切分时才会设置为LogSegment文件的实际大小。即:旧LogSegment文件大小为文件的实际大小,活跃LogSegment大小为默认的1GB。
日志分段详细信息:
- kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log
-
- Dumping 00000000000000000000.log
- Starting offset: 0
- baseOffset: 0 lastOffset: 3 count: 4 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1660546405649 size: 117 magic: 2 compresscodec: NONE crc: 1789113225 isvalid: true
- | offset: 0 CreateTime: 1660546405647 keysize: 1 valuesize: 6 sequence: -1 headerKeys: [] key: 1 payload: kafka1
- | offset: 1 CreateTime: 1660546405648 keysize: 1 valuesize: 6 sequence: -1 headerKeys: [] key: 5 payload: kafka5
- | offset: 2 CreateTime: 1660546405648 keysize: 1 valuesize: 6 sequence: -1 headerKeys: [] key: 7 payload: kafka7
- | offset: 3 CreateTime: 1660546405649 keysize: 1 valuesize: 6 sequence: -1 headerKeys: [] key: 8 payload: kafka8
索引的主要目的是提高查找的效率。
Kafka采用稀疏索引(Sparse Index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。而是当写入一定量(由Broker端参数log.index. interval.bytes指定,默认为4KB)的消息后,索引文件才会增加一个索引项。
偏移量索引包含两部分数据,如图所示:
消息查找过程。如图所示:
假设要查找偏移量为23的消息,先通过二分法在偏移量索引文件中找到不大于23最大索引项,即:[22 656],再从日志分段文件中的物理地址656开始顺序查找偏移量为23的消息。
如果要查找要查找下图中偏移量为268的消息:
查找步骤:
如何查找baseOffset=251的日志分段的呢?
Kafka使用了跳跃表结构,每个日志对象中使用ConcurrentSkipListMap来保存各个日志分段,每个日志分段以baseOffset作为key ,这样就可以根据指定偏移量(251<偏移量268<378,因此要查找的日志分段为251)来快速定位消息所在的日志分段。
时间戳索引(时间戳索引名与偏移量索引名一样,只是文件后缀不一样(即:.timeindex和.index))包含两部分数据,如图所示:
时间戳索引文件中包含若干时间戳索引项,每个追加的时间戳索引项中的timestamp必须大于之前追加的索引项的timestamp ,否则不予追加。
消息查找过程如图所示:
假如要查找指定时间戳targetTimeStamp = 1526384718288开始的消息,首先找到不小于指定时间戳的日志分段。这里无法使用跳跃表来快速定位相应的日志分段, 需要分以下几个步骤来完成:
Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加,需要对消息做一定的清理操作。Kafka提供了两种日志清理策略:
kafka有专门的任务来周期性删除不符合条件的日志分段文件,删除策略主要以下有3种:
Broker端可通过参数配置来设置日志的最大保留时间,默认7天。定时任务会查看每个分段的最大时间戳,若最大时间戳距离当前时间超过7天,则需要删除。
删除日志分段时, 首先会从跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作,然后将待删除日志分段对应的所有文件添加上.delete的后缀。最后由专门的定时任务来删除以.delete为后缀的文件。
日志删除任务会检查当前日志的大小是否超过设定的阀值(参数:retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments)。
注意:这里的日志的大小是指所有Segment的总和,不是单个Segment。
如图所示:
首先计算日志文件的总大小和设定阈值的差值(即:计算需要删除的日志总大小),然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段,放入deletableSegments集合中 ,最后进行删除。删除过程与基于时间删除策略一致。
一般情况下,日志文件的起始偏移logStartOffset等于第1个日志分段的baseOffset ,但logStartOffset是可以被修改的。该策略会判断某个日志分段的下一个日志分段的起始偏移量baseOffset是否小于等于logStartOffset ,是则将其放入deletableSegments集合中,最后进行删除。删除过程与基于时间删除策略一致。
如图所示:
对于有相同key不同value值的消息,只保留最后一个版本。如果应用只关心key对应的最新value值,则可以开启日志压缩功能,Kafka会定期将相同key的消息进行合井,只保留最新的value值。
如图所示:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。