当前位置:   article > 正文

kafka_3架构深入详情与api使用_kafka3

kafka3

Kafka 架构深入

1,Kafka 工作流程及文件存储机制

注意:producer不在zk中注册,消费者在zk中注册。

1.1工作流程

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

1,Kafka 中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向 topic的
2,topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。
3,Producer 生产的数据会被不断追加到该 log文件末端,且每条数据都有自己的 offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。
  • 1
  • 2
  • 3

1.2Broker文件存储机制

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片索引机制将每个 partition 分为多个 segment。每个 segment对应两个文件——“.index”文件和“.log”文件。

该文件夹的命名规则为:topic 名称+分区序号

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

1.3 index文件和log文件详解

“.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

文件.index 前面的对应在此次分区中在segment-0的顺序,后面则是所对应的物理地址,message对应是 log文件

在同一分区中的不同segment-1 所存储的物理地址 索引是地址是前一个segment文件 物理地址从0重新计数 但是.index索引与上一个message文件的数量开始计数,log文件也继续计数

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

这两个文件的命令规则为:Partition全局的第一个Segment从0开始,后续每个Segment文件名为上一个Segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

从partition中通过offset查找message

	(1)以上图为例,读取offset=170418的消息,首先查找segment文件为00000000000000000000.index 为最开始的文件,第二个文件00000000000000	`170410.index(起始偏移为 170410 1=170411),而第三个文件为 00000000000000239430.index(起始偏移为 239430 1=239431),所以这个 offset=170418 就落到了第二个文件之中。
	(2)其它后续文件可以依次类推,以其偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。
	(3)其次根据 00000000000000170410.index 文件中的 [8,1325] 定位到 00000000000000170410.log 文件中的 1325 的位置进行读取。要是读取 offset=170418 的消息,从 00000000000000170410.log 文件中的 1325的位置进行读取,
	(4)确定何时读完本条消息,是由消息的物理结构解决,消息都具有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小,即读取到哪里截止。
  • 1
  • 2
  • 3
  • 4

无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
1)基于时间:log.retention.hours=168
2)基于大小:log.retention.bytes=1073741824
需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。

2,Kafka 生产者

2.1分区策略

(1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,因为可以以Partition为单位读写了。

2.2分区的原则

(1)指定了patition,则直接使用;
(2)未指定patition但指定key,通过对key的value进行hash出一个patition
(3)patition和key都未指定,kafka 采用 Sticky Partition(黏性分区器)使用轮询选出一个patition。并尽可能一直使用该分区,待该分区的 batch 已满或者已完成,kafka 再随机一个分区进行使用.

2.3 副本(Replication)

(1)同一个partition可能会有多个replication(对应 server.properties 配置中的default.replication.factor=N)。没有replication的情况下,一旦broker 宕机,其上所有 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。
  • 1

​ (2)引入replication之后,同一个partition可能会有多个replication,而这时需要在这replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication作为follower从leader 中复制数据。

2.4数据可靠性保证

2.4.1生产者发送数据到 topic partition 的可靠性保证

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

2)副本数据同步策略优缺点

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

第一种Kafka 的每个分区都有大量的数据,会造成大量数据的冗余。

第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小:

2.4.2 ISR
(1)leader 收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,直到它完成同步。
(2)Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower集合。当ISR中的follower完成数据的同步之后,leader就会给producer发送ack。如果follower长时间未向 leader 同 步 数 据 , 则 该 follower 将 被 踢 出 ISR , 该 时 间 阈 值 由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。
  • 1
  • 2
2.4.3 应答级别

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。

​ (1)0:这一操作提供了一个最低的延迟,partition 的 leader 接收到消息还没有写入磁盘就已经返回 ack,当 leader 故障时有可能丢失数据

​ (2)1: partition 的 leader 落盘成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么将会丢失数据

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

​ (3)-1(all):partition 的 leader 和 follower 全部落盘成功后才返回 ack。但是如果在 follower同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

2.4.4leader 和 follower 故障处理细节

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

LEO:指的是每个副本最大的 offset;

HW:指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO。

(1)follower 故障follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。

(2)leader 故障 leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复

2.4.5Exactly Once 语义——幂等性

将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即At Least Once 语义。相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次,即 At Most Once 语义。

At Least Once + 幂等性 = Exactly Once

要启用幂等性只需要将 Producer 的参数中 enable.idempotence 设置为 true 即可,roker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker端只会持久化一条。PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。

2.4.6 Product 写入消息流程

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

1)producer先从zookeeper的 “/brokers/…/state”节点找到该partition的leader
2)producer将消息发送给该leader
3)leader将消息写入本地log
4)followers从leader pull消息,写入本地log后向leader发送ACK
5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK

3,消费者

3,1 消费模型

​ 消息由生产者发布到Kafka集群后,会被消费者消费。消息的消费模型有两种:推送模型(push)和拉取模型(pull)

	基于推送模型(push)的消息系统,由消息代理记录消费者的消费状态。消息代理在将消息推送到消费者后,标记这条消息为已消费,但这种方式无法很好地保证消息被处理。
	比如,消息代理把消息发送出去后,当消费进程挂掉或者由于网络原因没有收到这条消息时,就有可能造成消息丢失(因为消息代理已经把这条消息标记为已消费了,但实际上这条消息并没有被实际处理)。如果要保证消息被处理,消息代理发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要消息代理中记录所有的消费状态,这种做法显然是不可取的。
  • 1
  • 2
	Kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息。如下图所示,有两个消费者(不同消费者组)拉取同一个主题的消息,
 	消费者A的消费进度是3,消费者B的消费进度是6。消费者拉取的最大上限通过最高水位(watermark)控制,生产者最新写入的消息如果还没有达到备份数量,对消费者是不可见的。这种由消费者控制偏移量的优点是:消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。
  • 1
  • 2

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

优缺点:

puth:它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息pull模式不足之处是,如果kafka 没有数据,消费者可能会陷入循环中,一直返回空数据,但是可以添加一个时常参数timeout

  • 1
  • 2
  • 3

3.2分区策略

一个 consumer group 中有多个 consumer(消费者),一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费

Kafka 有三种分配策略,RoundRobin(轮询),Range(随机) , Sticky(粘性)。

3.3消费者组

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

​ 消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。

​ 在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。 在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。

3.3offset的维护

​ 由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。

​ (1)思想: _consumer_offsets 为 kafka 中的 topic, 那就可以通过消费者进行消费.Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为**__consumer_offsets**。

对_consumer_offsets_offsets进行维护,实验会对维护后的数据再次进行维护,所以会产生套娃现象,一直运行.

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

​ (2)修改配置文件 consumer.properties

# 不排除内部的 topic
exclude.internal.topics=false
  • 1
  • 2

​ (3)创建topic

[itwise@node2 kafka_2.11-2.4.1]$ kafka-topics.sh --create --topic itwise --bootstrap-server node2:9092 --partitions 2 --replication-factor 2
  • 1

​ (4)查看topic列表

[itwise@node2 kafka_2.11-2.4.1]$ kafka-topics.sh --list  --bootstrap-server node2:9092
__consumer_offsets
first
itwise
myTest
myTest2
second
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

​ (5).启动个生产者:

kafka-console-producer.sh --topic itwise --broker-list node2:9092
  • 1

​ (6).启动一组两个消费者:

[itwise@node2 ~]$ kafka-console-consumer.sh --consumer.config /opt/module/kafka_2.11-2.4.1/config/consumer.properties --bootstrap-server node2:9092 --topic itwise

[itwise@node3 logs]$ kafka-console-consumer.sh --consumer.config /opt/module/kafka_2.11-2.4.1/config/consumer.properties --bootstrap-server node2:9092 --topic itwise
  • 1
  • 2
  • 3

​ (7)启动一个消费者,来消费消费者 offset中的数据 (套娃循环)

​ --formatter 重写输出样式 --consumer.config 加载配置文件

[itwise@node2 ~]$ kafka-console-consumer.sh --consumer.config /opt/module/kafka_2.11-2.4.1/config/server.properties --bootstrap-server node2:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
  • 1

​ (8)观察生产者 生产数据, 然后被 一组消费者,进行消费的情况,接下来还要看 另外的一个消费者消费kafka的offset的维护情况:[0,1,2]

[test-consumer-group,itwise,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1710244797088, expireTimestamp=None)
[test-consumer-group,itwise,1]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1710244797103, expireTimestamp=None)
[test-consumer-group,itwise,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710244802091, expireTimestamp=None)
[test-consumer-group,itwise,1]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710244802105, expireTimestamp=None)
[test-consumer-group,itwise,0]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710244807091, expireTimestamp=None)
[test-consumer-group,itwise,1]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710244807106, expireTimestamp=None)
[test-consumer-group,itwise,0]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710244812095, expireTimestamp=None)
[test-consumer-group,itwise,1]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710244812108, expireTimestamp=None)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

4Kafka 高效读写数据

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间.

4.1应用

Kafka 数据持久化是直接持久化到 Pagecache 中,这样会产生以下几个好处:

	(1)I/O Scheduler 会将连续的小块写组装成大块的物理写从而提高性能
​	(2)I/O Scheduler 会尝试将一些写操作重新按顺序排好,从而减少磁盘头的移动时间
​	(3)充分利用所有空闲内存(非 JVM 内存)。如果使用应用层 Cache(即 JVM 堆内存),会增加 GC 负担
​	(4)读操作可直接在 Page Cache 内进行。如果消费和生产速度相当,甚至不需要通过物理磁盘(直接通过 Page Cache)交换数据

​	(5)如果进程重启,JVM 内的 Cache 会失效,但 Page Cache 仍然可用尽管持久化Pagecache上可能会造成宕机丢失数据的情况,但这可以被 Kafka的Replication机制解决。如果为了保证这种情况下数据不丢失而强制将 Page Cache 中的数据 Flush 到磁盘,反而会降低性能。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

4.2零复制技术:

数据直接在内核完成输入和输出,不需要拷贝到用户空间再写出去。
kafka数据写入磁盘前,数据先写到进程的内存空间

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

除了减少数据拷贝外,因为整个读文件 - 网络发送由一个 sendfile 调用完成,整个过程只有两次上下文切换,因此大大提高了性能

1、第一次:将磁盘文件,读取到操作系统内核缓冲区;
2、第二次:将内核缓冲区的数据,copy到application应用程序的buffer;
3、第三步:将application应用程序buffer中的数据,copy到socket网络发送缓冲区(属于操作系统内核的缓冲区);
4、第四次:将socket buffer的数据,copy到网卡,由网卡进行网络传输。
  • 1
  • 2
  • 3
  • 4

4.3Zookeeper 在 Kafka 中的作用

Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等工作。Controller 的管理工作都是依赖于 Zookeeper 的

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

Leader Replica选举触发时机

常见的有以下几种情况会触发Partition的Leader Replica选举:

  1. **Leader Replica 失效:**当 Leader Replica 出现故障或者失去连接时,Kafka 会触发 Leader Replica 选举。
  2. **Broker 宕机:**当 Leader Replica 所在的 Broker 节点发生故障或者宕机时,Kafka 也会触发 Leader Replica 选举。
  3. **新增 Broker:**当集群中新增 Broker 节点时,Kafka 还会触发 Leader Replica 选举,以重新分配 Partition 的 Leader。
  4. **新建分区:**当一个新的分区被创建时,需要选举一个 Leader Replica。
  5. **ISR 列表数量减少:**当 Partition 的 ISR 列表数量减少时,可能会触发 Leader Replica 选举。当 ISR 列表中副本数量小于 **Replication Factor(副本因子)**时,为了保证数据的安全性,就会触发 Leader Replica 选举。
  6. **手动触发:**通过 Kafka 管理工具(kafka-preferred-replica-election.sh),可以手动触发选举,以平衡负载或实现集群维护。

4.4Kafka 事务

事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么`全部成功,要么全部失败。

​ (1)为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer获得的 PID和 Transaction ID 绑定。这样当Producer 重启后就可以通过正在进行的 TransactionID 获得原来的 PID。为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。TransactionCoordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

​ ( 2 ) 对于 Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

​ ( 2 ) 对于 Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号