赞
踩
我们知道Kafka中的消息是存储在磁盘上的,那么为什么要使用磁盘作为存储介质?具体消息的存储格式又是什么呢?怎么样能够快速检索到指定的消息?消息不可能无限制存储,那么清理规则又是什么呢?
log.dirs=/usr/local/var/lib/kafka-logs
kafka 使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,一个分片 并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是<topic_name>_<partition_id>。
比如创建一个名为firstTopic的topic,其中有3个partition,那么在 kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic-0~3
多个分区在集群中多个broker上的分配方法
1.将所有 N Broker 和待分配的 i 个 Partition 排序
2.将第 i 个 Partition 分配到第(i mod n)个 Broker 上
log分段:
每个分片目录中,kafka 通过分段的方式将 数据 分为多个 LogSegment,一个 LogSegment 对应磁盘上的一个日志文件(00000000000000000000.log)和一个索引文件(如上:00000000000000000000.index),其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。每个LogSegment 的大小可以在server.properties 中log.segment.bytes=1073741824 (设置分段大小,默认是1gb)选项进行设置。
segment 的 index file 和 data file 2 个文件一一对应,成对出现,后缀".index"和“.log”分别表示为 segment 索引文件、数据文件.命名规则:partion 全局的第一个 segment从 0 开始,后续每个 segment 文件名为上一个 segment文件最后一条消息的 offset 值进行递增。数值最大为 64 位long 大小,20 位数字字符长度,没有数字用 0 填充
第一个 log 文件的最后一个 offset 为:5376,所以下一个segment 的文件命名为: 0000000000000005376.log。对应的 index 为 00000000000000005376.index
kafka 这种分片和分段策略,避免了数据量过大时,数据文件文件无限扩张带来的隐患,更有助于消息文件的维护以及被消费的消息的清理。
日志文件的树型结构如下图所示
通过下面这条命令可以看到 kafka 消息日志的内容
- sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log
-
输出结果为:
- offset: 5376 position: 102124 CreateTime: 1531477349287isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5376
-
可以看到一条消息,会包含很多的字段,如下:
offset: 5371 position: 102124 CreateTime: 1531477349286isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5371
kafka中message的物理结构如下图所示
参数说明:
关键字 | 解释说明 |
---|---|
8 byte offset | 在partition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在partition(分区)内的位置.即offset 表示partition的第多少条message |
4 byte message size | message的大小 |
4 byte message CRC32 | 用CRC32校验message |
1 byte magic | 表示本次发布kafka服务程序协议版本号 |
1 byte attributes | 表示为独立版本、或表识压缩类型、或编码类型 |
4 byte key length | 表示key的长度 |
k byte Key | Key |
value byte payload | 表示实际消息数据 |
为了提高查找消息的性能,kafka为每一个日志文件添加 了2 个索引文件:OffsetIndex 和 TimeIndex,分别对应*.index以及*.timeindex, *.TimeIndex 是映射时间戳和相对 offset的文件
查 看 索 引 内 容 命令:
sh kafka-run-class.shkafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log
索引文件和日志文件内容关系如下
如上图所示,index 文件中存储了索引以及物理偏移量。 log 文件存储了消息的内容。索引文件中保存了部分offset和偏移量position的对应关系。比如 index文件中 [4053,80899],表示在 log 文件中,对应的是第 4053 条记录,物理偏移量(position)为 80899.
比如说,我们要查找 offset=2490 这条消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]这个索引,再到 log 文件中,根据 49111 这个 position 开始查找,比较每条消息的 offset 是否大于等于 2490。最后查找到对应的消息以后返回
如果是用kafka默认的api【org.apache.kafka.clients.consumer.KafkaConsumer】来消费,其消费者的offset会更新到一个kafka自带的topic下:__consumer_offsets,__consumer_offsets默认有50个分区
计算指定group在__consumer_offsets的哪个分区
Math.abs(groupID.hashCode()) % numPartitions
无论是kafka默认api,还是java的api,offset的更新方式都有两种:自动提交和手动提交
1.1 自动提交(默认方式)
Kafka中偏移量的自动提交是由参数enable_auto_commit和auto_commit_interval_ms控制的,当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation:Math.abs(groupID.hashCode()) % numPartitions。
这种方式也被称为at most once,fetch到消息后就可以更新offset,无论是否消费成功。
1.2 手动提交
鉴于Kafka自动提交offset的不灵活性和不精确性(只能是按指定频率的提交),Kafka提供了手动提交offset策略。手动提交能对偏移量更加灵活精准地控制,以保证消息不被重复消费以及消息不被丢失。
对于手动提交offset主要有3种方式:1.同步提交 2.异步提交 3.异步+同步 组合的方式提交
1.2.1 同步提交
1.2.2 异步提交
对于异步提交,由于不会进行失败重试,当消费者异常关闭或者触发了再均衡前,如果偏移量还未提交就会造成偏移量丢失。
1.2.3 异步+同步
日志删除
在Kafka的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300000,即5分钟。当前日志分段的保留策略有3种:基于时间的保留策略、基于日志大小的保留策略。
基于时间
日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值(retentionMs)来寻找可删除的日志分段文件集合(deletableSegments),retentionMs可以通过broker端参数log.retention.hours、log.retention.minutes和log.retention.ms来配置,其中 log.retention.ms 的优先级最高,log.retention.minutes 次之,log.retention.hours最低。默认情况下只配置了log.retention.hours参数,其值为168,故默认情况下日志分段文件的保留时间为7天。
查找过期的日志分段文件,并不是简单地根据日志分段的最近修改时间lastModifiedTime来计算的,而是根据日志分段中最大的时间戳largestTimeStamp 来计算的。因为日志分段的lastModifiedTime可以被有意或无意地修改,比如执行了touch操作,或者分区副本进行了重新分配,lastModifiedTime并不能真实地反映出日志分段在磁盘的保留时间。要获取日志分段中的最大时间戳 largestTimeStamp 的值,首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于 0,则取其值,否则才设置为最近修改时间lastModifiedTime。
基于日志大小
日志删除任务会检查当前日志的大小是否超过设定的阈值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments),如图5-14所示。retentionSize可以通过broker端参数log.retention.bytes来配置,默认值为-1,表示无穷大。注意log.retention.bytes配置的是Log中所有日志文件的总大小,而不是单个日志分段(确切地说应该为.log日志文件)的大小。单个日志分段的大小由broker 端参数 log.segment.bytes 来限制,默认值为1073741824,即1GB。
基于日志大小的保留策略与基于时间的保留策略类似,首先计算日志文件的总大小size和retentionSize的差值diff,即计算需要删除的日志总大小,然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段的文件集合deletableSegments。
日志压缩策略
Kafka 还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的 key 和 value 的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心 key 对应的最新的 value。因此,我们可以开启 kafka 的日志压缩功能,服务端会在后台启动Cleaner线程池,定期将相同的key进行合并,只保留最新的 value 值。日志的压缩原理如下图:
当启用压缩时,对批处理的影响特别明显,因为随着数据大小的增加,压缩通常会变得更有效。特别是在使用基于文本的格式时,比如JSON,压缩的效果会非常明显,压缩比通常在5x到7x之间。此外,记录的批处理主要作为一个客户端操作,负载在传递的过程中,不仅对网络带宽有积极影响,而且对服务端的磁盘I/O利用率也有积极影响。
Kafka利用分段、追加日志的方式,在很大程度上将读写限制为顺序I/O(sequential I/O),这在大多数的存储介质上都很快。人们普遍错误地认为硬盘很慢。然而,存储介质的性能,很大程度上依赖于数据被访问的模式。同样在一块普通的7200 RPM SATA硬盘上,随机I/O(random I/O)与顺序I/O相比,随机I/O的性能要比顺序I/O慢3到4个数量级。此外,现代的操作系统提供了预先读和延迟写的技术,这些技术可以以块为单位,预先读取大量数据,并将较小的逻辑写操作合并成较大的物理写操作。因此,顺序I/O和随机I/O之间的性能差异在闪存和其他固态非易失性介质中仍然很明显,不过它们在旋转存储,比如固态硬盘中的性能差异就没有那么明显。
有关测试结果表明,一个由6块7200r/min的RAID-5阵列组成的磁盘簇的线性(顺序)写入速度可以达到600MB/s,而随机写入速度只有100KB/s,两者性能相差6000倍。
首先了解一下页缓存,页缓存是操作系统用来作为磁盘的一种缓存,减少磁盘的I/O操作。
在写入磁盘的时候其实是写入页缓存中,使得对磁盘的写入变成对内存的写入。写入的页变成脏页,然后操作系统会在合适的时候将脏页写入磁盘中。
在读取的时候如果页缓存命中则直接返回,如果页缓存 miss 则产生缺页中断,从磁盘加载数据至页缓存中,然后返回数据。
并且在读的时候会预读,根据局部性原理当读取的时候会把相邻的磁盘块读入页缓存中。在写入的时候会后写,写入的也是页缓存,这样存着可以将一些小的写入操作合并成大的写入,然后再刷盘。
而且根据磁盘的构造,顺序 I/O 的时候,磁头几乎不用换道,或者换道的时间很短。
根据网上的一些测试结果,顺序写盘的速度比随机写内存还要快。
当然这样的写入存在数据丢失的风险,例如机器突然断电,那些还未刷盘的脏页就丢失了。不过可以调用 fsync
强制刷盘,但是这样对于性能的损耗较大。
因此一般建议通过多副本机制来保证消息的可靠,而不是同步刷盘。
即使采用顺序写,但是频繁的 I/O 操作仍然会造成磁盘的性能瓶颈,所以 kafka还有一个性能策略:零拷贝
消息从发送到落地保存,broker 维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过 socket 发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤。如下:
操作系统将数据从磁盘读入到内核空间的页缓存
▪ 应用程序将数据从内核空间读入到用户空间缓存中
▪ 应用程序将数据写回到内核空间到 socket 缓存中
▪ 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出
这个过程涉及到 4 次上下文切换以及 4 次数据复制,并且有两次复制操作是由 CPU 完成。但是这个过程中,数据完全没有
进行变化,仅仅是从磁盘复制到网卡缓冲区。通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。现代的 unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到 socket;
在 Linux 中,是通过 sendfile 系统调用来完成的。Java 提供了访问这个系统调用的方法:FileChannel.transferTo API。使用 sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的
Kafka性能的另一个基本原因是,一个值得进一步研究的原因:Kafka在确认写操作之前并没有调用fsync。ACK的唯一要求是记录已经写入I/O缓冲区。这是一个鲜为人知的事实,但却是一个至关重要的事实。实际上,这就是Kafka的执行方式,就好像它是一个内存队列一样——Kafka实际上是一个由磁盘支持的内存队列(受缓冲区/页面缓存大小的限制)。
但是,这种形式的写入是不安全的,因为副本的出错可能导致数据丢失,即使记录似乎已经被ACK。换句话说,与关系型数据库不同,仅写入缓冲区并不意味着持久性。保证Kafka持久性的是运行几个同步的副本。即使其中一个出错了,其他的(假设不止一个)将继续运行——假设出错的原因不会导致其他的副本也出错。因此,无fsync的非阻塞I/O方法和冗余的同步副本组合为Kafka提供了高吞吐、持久性和可用性。
Redis中日志刷盘机制
配置 | 含义 | 效率 | 安全 |
---|---|---|---|
appendfsync=always | 同步持久化每一次修改操作 | 效率最低 | 安全最高,最多丢失一个事件循化的数据 |
appendfsync=everysec | 每秒向aof文件同步一次 | 效率折中 | 安全折中,最多丢失1S的数据 |
appendfsync=no | 关闭向aof文件写入修改 | 效率最高 | 安全最低,最多丢失上一次保存aof文件到当前时刻的全部数据 |
always:写入+同步,每个事件循环,都会将aof_buf的内容写到aof文件,并且同步到aof文件,也就是会完成写入和同步两个过程,此种情况效率最低,安全性最高
everysec:写入+每秒同步一次,每个事件循环,都会将aof_buf中的内容写入到aof文件,但是每间隔1s才会完成一个同步动作,此种情况效率和安全性折中
no:写入+不同步,每个事件循环,都会将aof_buf中的内容写入到aof文件,同步动作交给操作系统完成,自己不管,此种情况效率最高,安全性最低
MySQL中日志刷盘机制
关键字 | 描述 |
---|---|
innodb_flush_log_at_trx_commit=0 | 每隔一秒刷新到文件系统,并调用文件系统的“flush”刷新到磁盘上 |
innodb_flush_log_at_trx_commit=1 | 每次事务提交就刷盘(包括到文件系统以及flush)一次,保证了数据的一致性,需要数据库IO的性能很高 |
innodb_flush_log_at_trx_commit=2 | 每次事务提交将log buffer刷新到文件系统,文件系统每隔一秒刷新到磁盘 |
一个分区中包含一个或多个副本,其中一个为leader副本,其余为follower副本,各个副本位于不同的broker节点中。只有leader副本对外提供服务,follower副本只负责数据同步。· 分区中的所有副本统称为 AR,而ISR 是指与leader 副本保持同步状态的副本集合,当然leader副本本身也是这个集合中的一员。· LEO标识每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的LEO,ISR中最小的LEO即为HW,俗称高水位,消费者只能拉取到HW之前的消息。从生产者发出的一条消息首先会被写入分区的leader副本,不过还需要等待ISR集合中的所有 follower 副本都同步完之后才能被认为已经提交,之后才会更新分区的HW,进而消费者可以消费到这条消息。
Kafka中采用了多副本的机制,这是大多数分布式系统中惯用的手法,以此来实现水平扩展、提供容灾能力、提升可用性和可靠性等.Kafka是如何通过日志同步机制实现数据的可靠性呢?
在分布式系统中,日志同步机制既要保证数据的一致性,也要保证数据的顺序性。虽然有许多方式可以实现这些功能,但最简单高效的方式还是从集群中选出一个leader来负责处理数据写入的顺序性。只要leader还处于存活状态,那么follower只需按照leader中的写入顺序来进行同步即可。通常情况下,只要leader不宕机我们就不需要关心follower的同步问题。不过当leader宕机时,我们就要从follower中选举出一个新的leader。follower的同步状态可能落后leader很多,甚至还可能处于宕机状态,所以必须确保选择具有最新日志消息的follower作为新的leader。日志同步机制的一个基本原则就是:如果告知客户端已经成功提交了某条消息,那么即使 leader宕机,也要保证新选举出来的leader中能够包含这条消息。这里就有一个需要权衡(tradeoff)的地方,如果leader在消息被提交前需要等待更多的follower确认,那么在它宕机之后就可以有更多的follower替代它,不过这也会造成性能的下降。
对于这种tradeoff,一种常见的做法是“少数服从多数”,它可以用来负责提交决策和选举决策。虽然Kafka不采用这种方式,但可以拿来探讨和理解tradeoff的艺术。在这种方式下,如果我们有2f+1个副本,那么在提交之前必须保证有f+1个副本同步完消息。同时为了保证能正确选举出新的leader,至少要保证有f+1个副本节点完成日志同步并从同步完成的副本中选举出新的leader节点。并且在不超过f个副本节点失败的情况下,新的leader需要保证不会丢失已经提交过的全部消息。这样在任意组合的 f+1 个副本中,理论上可以确保至少有一个副本能够包含已提交的全部消息,这个副本的日志拥有最全的消息,因此会有资格被选举为新的 leader来对外提供服务。
“少数服从多数”的方式有一个很大的优势,系统的延迟取决于最快的几个节点,比如副本数为3,那么延迟就取决于最快的那个follower而不是最慢的那个(除了leader,只需要另一个follower确认即可)。不过它也有一些劣势,为了保证leader选举的正常进行,它所能容忍的失败follower数比较少,如果要容忍1个follower失败,那么至少要有3个副本,如果要容忍2个follower失败,必须要有5个副本。也就是说,在生产环境下为了保证较高的容错率,必须要有大量的副本,而大量的副本又会在大数据量下导致性能的急剧下降。这也就是“少数服从多数”的这种Quorum模型常被用作共享集群配置(比如ZooKeeper),而很少用于主流的数据存储中的原因。
在Kafka中动态维护着一个ISR集合,处于ISR集合内的节点保持与leader相同的高水位(HW),只有位列其中的副本(unclean.leader.election.enable配置为false)才有资格被选为新的 leader。写入消息时只有等到所有 ISR 集合中的副本都确认收到之后才能被认为已经提交。位于 ISR 中的任何副本节点都有资格成为leader,选举过程简单(详细内容可以参考6.4.3节)、开销低,这也是Kafka选用此模型的重要因素。Kafka中包含大量的分区,leader副本的均衡保障了整体负载的均衡,所以这一因素也极大地影响Kafka的性能指标。
在采用ISR模型和(f+1)个副本数的配置下,一个Kafka分区能够容忍最大f个节点失败,相比于“少数服从多数”的方式所需的节点数大幅减少。实际上,为了能够容忍f个节点失败,“少数服从多数”的方式和ISR的方式都需要相同数量副本的确认信息才能提交消息。比如,为了容忍1个节点失败,“少数服从多数”需要3个副本和1个follower的确认信息,采用ISR的方式需要2个副本和1个follower的确认信息。在需要相同确认信息数的情况下,采用ISR的方式所需要的副本总数变少,复制带来的集群开销也就更低,“少数服从多数”的优势在于它可以绕开最慢副本的确认信息,降低提交的延迟,而对Kafka而言,这种能力可以交由客户端自己去选择。
另外,一般的同步策略依赖于稳定的存储系统来做数据恢复,也就是说,在数据恢复时日志文件不可丢失且不能有数据上的冲突。不过它们忽视了两个问题:首先,磁盘故障是会经常发生的,在持久化数据的过程中并不能完全保证数据的完整性;其次,即使不存在硬件级别的故障,我们也不希望在每次写入数据时执行同步刷盘(fsync)的动作来保证数据的完整性,这样会极大地影响性能。而 Kafka 不需要宕机节点必须从本地数据日志中进行恢复,Kafka 的同步方式允许宕机副本重新加入ISR集合,但在进入ISR之前必须保证自己能够重新同步完leader中的所有数据。
参考文章
https://hiddenpps.blog.csdn.net/article/details/54633905
https://blog.csdn.net/L13763338360/article/details/105529388
https://www.cnblogs.com/FG123/p/10091599.html
https://blog.csdn.net/muyimo/article/details/91439222
https://blog.csdn.net/weixin_39098944/article/details/108923726
https://developers.google.com/protocol-buffers/docs/encoding
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。