赞
踩
1,Kafka 中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向 topic的
2,topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。
3,Producer 生产的数据会被不断追加到该 log文件末端,且每条数据都有自己的 offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。
由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制将每个 partition 分为多个 segment。每个 segment对应两个文件——“.index”文件和“.log”文件。
该文件夹的命名规则为:topic 名称+分区序号
“.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。
文件.index 前面的对应在此次分区中在segment-0的顺序,后面则是所对应的物理地址,message对应是 log文件
在同一分区中的不同segment-1 所存储的物理地址 索引是地址是前一个segment文件 物理地址从0重新计数 但是.index索引与上一个message文件的数量开始计数,log文件也继续计数
这两个文件的命令规则为:Partition全局的第一个Segment从0开始,后续每个Segment文件名为上一个Segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充。
从partition中通过offset查找message
(1)以上图为例,读取offset=170418的消息,首先查找segment文件为00000000000000000000.index 为最开始的文件,第二个文件00000000000000 `170410.index(起始偏移为 170410 1=170411),而第三个文件为 00000000000000239430.index(起始偏移为 239430 1=239431),所以这个 offset=170418 就落到了第二个文件之中。
(2)其它后续文件可以依次类推,以其偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。
(3)其次根据 00000000000000170410.index 文件中的 [8,1325] 定位到 00000000000000170410.log 文件中的 1325 的位置进行读取。要是读取 offset=170418 的消息,从 00000000000000170410.log 文件中的 1325的位置进行读取,
(4)确定何时读完本条消息,是由消息的物理结构解决,消息都具有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小,即读取到哪里截止。
无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
1)基于时间:log.retention.hours=168
2)基于大小:log.retention.bytes=1073741824
需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。
(1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,因为可以以Partition为单位读写了。
(1)指定了patition,则直接使用;
(2)未指定patition但指定key,通过对key的value进行hash出一个patition
(3)patition和key都未指定,kafka 采用 Sticky Partition(黏性分区器)使用轮询选出一个patition。并尽可能一直使用该分区,待该分区的 batch 已满或者已完成,kafka 再随机一个分区进行使用.
(1)同一个partition可能会有多个replication(对应 server.properties 配置中的default.replication.factor=N)。没有replication的情况下,一旦broker 宕机,其上所有 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。
(2)引入replication之后,同一个partition可能会有多个replication,而这时需要在这replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication作为follower从leader 中复制数据。
为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
第一种Kafka 的每个分区都有大量的数据,会造成大量数据的冗余。
第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小:
(1)leader 收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,直到它完成同步。
(2)Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower集合。当ISR中的follower完成数据的同步之后,leader就会给producer发送ack。如果follower长时间未向 leader 同 步 数 据 , 则 该 follower 将 被 踢 出 ISR , 该 时 间 阈 值 由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
(1)0:这一操作提供了一个最低的延迟,partition 的 leader 接收到消息还没有写入磁盘就已经返回 ack,当 leader 故障时有可能丢失数据;
(2)1: partition 的 leader 落盘成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么将会丢失数据;
(3)-1(all):partition 的 leader 和 follower 全部落盘成功后才返回 ack。但是如果在 follower同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复。
LEO:指的是每个副本最大的 offset;
HW:指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO。
(1)follower 故障follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
(2)leader 故障 leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即At Least Once 语义。相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次,即 At Most Once 语义。
At Least Once + 幂等性 = Exactly Once
要启用幂等性只需要将 Producer 的参数中 enable.idempotence 设置为 true 即可,roker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker端只会持久化一条。PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。
1)producer先从zookeeper的 “/brokers/…/state”节点找到该partition的leader
2)producer将消息发送给该leader
3)leader将消息写入本地log
4)followers从leader pull消息,写入本地log后向leader发送ACK
5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK
消息由生产者发布到Kafka集群后,会被消费者消费。消息的消费模型有两种:推送模型(push)和拉取模型(pull)
基于推送模型(push)的消息系统,由消息代理记录消费者的消费状态。消息代理在将消息推送到消费者后,标记这条消息为已消费,但这种方式无法很好地保证消息被处理。
比如,消息代理把消息发送出去后,当消费进程挂掉或者由于网络原因没有收到这条消息时,就有可能造成消息丢失(因为消息代理已经把这条消息标记为已消费了,但实际上这条消息并没有被实际处理)。如果要保证消息被处理,消息代理发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要消息代理中记录所有的消费状态,这种做法显然是不可取的。
Kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息。如下图所示,有两个消费者(不同消费者组)拉取同一个主题的消息,
消费者A的消费进度是3,消费者B的消费进度是6。消费者拉取的最大上限通过最高水位(watermark)控制,生产者最新写入的消息如果还没有达到备份数量,对消费者是不可见的。这种由消费者控制偏移量的优点是:消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。
优缺点:
puth:它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息pull模式不足之处是,如果kafka 没有数据,消费者可能会陷入循环中,一直返回空数据,但是可以添加一个时常参数timeout
一个 consumer group 中有多个 consumer(消费者),一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费
Kafka 有三种分配策略,RoundRobin(轮询),Range(随机) , Sticky(粘性)。
消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。
在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。 在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
(1)思想: _consumer_offsets 为 kafka 中的 topic, 那就可以通过消费者进行消费.Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为**__consumer_offsets**。
对_consumer_offsets_offsets进行维护,实验会对维护后的数据再次进行维护,所以会产生套娃现象,一直运行.
(2)修改配置文件 consumer.properties
# 不排除内部的 topic
exclude.internal.topics=false
(3)创建topic
[itwise@node2 kafka_2.11-2.4.1]$ kafka-topics.sh --create --topic itwise --bootstrap-server node2:9092 --partitions 2 --replication-factor 2
(4)查看topic列表
[itwise@node2 kafka_2.11-2.4.1]$ kafka-topics.sh --list --bootstrap-server node2:9092
__consumer_offsets
first
itwise
myTest
myTest2
second
(5).启动个生产者:
kafka-console-producer.sh --topic itwise --broker-list node2:9092
(6).启动一组两个消费者:
[itwise@node2 ~]$ kafka-console-consumer.sh --consumer.config /opt/module/kafka_2.11-2.4.1/config/consumer.properties --bootstrap-server node2:9092 --topic itwise
[itwise@node3 logs]$ kafka-console-consumer.sh --consumer.config /opt/module/kafka_2.11-2.4.1/config/consumer.properties --bootstrap-server node2:9092 --topic itwise
(7)启动一个消费者,来消费消费者 offset中的数据 (套娃循环)
--formatter 重写输出样式 --consumer.config 加载配置文件
[itwise@node2 ~]$ kafka-console-consumer.sh --consumer.config /opt/module/kafka_2.11-2.4.1/config/server.properties --bootstrap-server node2:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
(8)观察生产者 生产数据, 然后被 一组消费者,进行消费的情况,接下来还要看 另外的一个消费者消费kafka的offset的维护情况:[0,1,2]
[test-consumer-group,itwise,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1710244797088, expireTimestamp=None)
[test-consumer-group,itwise,1]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1710244797103, expireTimestamp=None)
[test-consumer-group,itwise,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710244802091, expireTimestamp=None)
[test-consumer-group,itwise,1]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710244802105, expireTimestamp=None)
[test-consumer-group,itwise,0]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710244807091, expireTimestamp=None)
[test-consumer-group,itwise,1]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710244807106, expireTimestamp=None)
[test-consumer-group,itwise,0]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710244812095, expireTimestamp=None)
[test-consumer-group,itwise,1]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710244812108, expireTimestamp=None)
Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间.
Kafka 数据持久化是直接持久化到 Pagecache 中,这样会产生以下几个好处:
(1)I/O Scheduler 会将连续的小块写组装成大块的物理写从而提高性能
(2)I/O Scheduler 会尝试将一些写操作重新按顺序排好,从而减少磁盘头的移动时间
(3)充分利用所有空闲内存(非 JVM 内存)。如果使用应用层 Cache(即 JVM 堆内存),会增加 GC 负担
(4)读操作可直接在 Page Cache 内进行。如果消费和生产速度相当,甚至不需要通过物理磁盘(直接通过 Page Cache)交换数据
(5)如果进程重启,JVM 内的 Cache 会失效,但 Page Cache 仍然可用尽管持久化Pagecache上可能会造成宕机丢失数据的情况,但这可以被 Kafka的Replication机制解决。如果为了保证这种情况下数据不丢失而强制将 Page Cache 中的数据 Flush 到磁盘,反而会降低性能。
数据直接在内核完成输入和输出,不需要拷贝到用户空间再写出去。
kafka数据写入磁盘前,数据先写到进程的内存空间
除了减少数据拷贝外,因为整个读文件 - 网络发送由一个 sendfile 调用完成,整个过程只有两次上下文切换,因此大大提高了性能
1、第一次:将磁盘文件,读取到操作系统内核缓冲区;
2、第二次:将内核缓冲区的数据,copy到application应用程序的buffer;
3、第三步:将application应用程序buffer中的数据,copy到socket网络发送缓冲区(属于操作系统内核的缓冲区);
4、第四次:将socket buffer的数据,copy到网卡,由网卡进行网络传输。
Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等工作。Controller 的管理工作都是依赖于 Zookeeper 的
常见的有以下几种情况会触发Partition的Leader Replica选举:
事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么`全部成功,要么全部失败。
(1)为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer获得的 PID和 Transaction ID 绑定。这样当Producer 重启后就可以通过正在进行的 TransactionID 获得原来的 PID。为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。TransactionCoordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
( 2 ) 对于 Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
( 2 ) 对于 Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。