赞
踩
Kafka
实际上就是日志消息存储系统, 根据offset
获取对应的消息,消费者获取到消息之后该消息不会立即从mq
中移除,而是继续存储在磁盘中。
Kafka
会将topic
分成多个不同的分区、每个分区中拆分成多个不同的segment
文件存储日志。每个segment
文件都会有 .index
文件 (消息偏移量索引文件),.log
文件(消息物理存放文件),timeindex
文件(时间索引文件),在默认的情况下,每个segment
文件容量最大是为500mb
,如果超过了500mb
则会生成一个新的 segment
文件,且文件命名后几位为上个segment
文件最后offset
值,如:segment01 、segment500 、segment1000
。
存储的消息日志文件在 server.properties
配置文件的 log.dirs
指定的目录下:
index 文件的结构:
.index
文件存储着消息偏移量,其格式为 offset: 194 position: 5124
,position表示物理存放位置,可以通过以下命令查看 .index
索引文件:
kafka-run-class.bat kafka.tools.DumpLogSegments --files 00000000000000000000.index
.log 文件的结构:
log 文件实际存储的是消息数据,主要包含消息体、消息大小、offset、压缩类型…等!主要是下面三个:
offset
:offset是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置。
消息大小
:消息大小占用4byte,用于描述消息的大小。
消息体
:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
这里需要注意的是 .log 文件是采用顺序写的方式记录的,避免磁头的随机移动,从而提升写入速率,提高了写的效率,查看 .log
文件的内容:
kafka-run-class.bat kafka.tools.DumpLogSegments --files 00000000000000000000.log
下面的策略配置,均在 server.properties
配置文件中。
segment 分段策略
分段文件配置默认是500mb ,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件),文件较多时性能会稍微降低。下面是相关配置参数:
##日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment
log.roll.hours=72
##segment的索引文件最大尺寸限制,即时log.segment.bytes没达到,也会生成一个新的segment
log.index.size.max.bytes=10*1024*1024
##控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
log.segment.bytes=1024*1024*1024
数据文件刷盘策略
当我们把数据写入到文件系统之后,数据其实在操作系统的page cache里面,并没有刷到磁盘上去。如果此时操作系统挂了,其实数据就丢了。这里可以根据消息的数量log.flush.interval.messages
和时间log.flush.interval.ms
进行配置,如果时间设置的过大,有没达到指定的数量的情况下,如果系统挂了,数据就会丢失。
Kafka官方并不建议通过Broker
端的log.flush.interval.messages
和log.flush.interval.ms
来强制写盘,认为数据的可靠性应该通过Replica
来保证,而强制Flush
数据到磁盘会对整体性能产生影响。
##每当producer写入10000条消息时,刷数据到磁盘 配置
log.flush.interval.messages=10000
##每间隔5秒钟时间,刷数据到磁盘
log.flush.interval.ms=5000
日志清理策略:
## 是否开启日志清理
log.cleaner.enable=true
## 日志清理运行的线程数
log.cleaner.threads = 2
## 日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖,默认 delete
log.cleanup.policy = delete
## 数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略
## log.retention.bytes和 log.retention.minutes或 log.retention.hours任意一个达到要求,都会执行删除
log.retention.minutes=300
log.retention.hours=24
## topic每个分区的最大文件大小,-1没有大小限
log.retention.bytes=-1
## 文件大小检查的周期时间,是否触发 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=5minutes
在kafka
中每个segment file
也有自己的命名规则,每个名字有20个字符,不够用0填充,每个名字从0开始命名,下一个segment file
文件的名字就是,上一个segment file
中最后一条消息的索引值。在.index
文件中,存储的是key-value
格式的,key
代表在.log
中按顺序开始顺序消费的offset
值,value
代表该消息的物理消息存放位置。但是在.index
中不是对每条消息都做记录,它是每隔一些消息记录一次,避免占用太多内存。即使消息不在index
记录中,在已有的记录中查找,范围也大大缩小了。
每个 log
文件的大小是一样的,但是存储的 message 数量是不一定相等的(每条的 message
大小不一致)。文件的命名是以该 segment
最小 offset
来命名的,如 000.index
存储offset
为0~368795
的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。
下面演示下kakfa 中是如何根据 offset 定位到具体的消息,假如有两个segment
索引文件为:
00000000000000000000.index
00000000000001000000.index
下面查找 offset
值为 50000
的消息:
00000000000000000000 segment
中index
文件,根据offset值查询到物理存放位置,如下图,根据 二分算法 得到34597< 50000 < 69807
即可以得到消息在 34597
的向下附近的位置,根据 index
中 position
到 .log
文件中找到 message3
,从这个位置向下依次 +1
遍历,得到offset
值50000
的消息。Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。