当前位置:   article > 正文

Kafka学习笔记完整_kafka学习笔记 最全

kafka学习笔记 最全

文章目录

kafka的基本认识

Kafka最初由Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper 协调的分布式日志系统,也可以作为MQ消息系统。Linkedin 公司在2010 年贡献给了Apache基金会并成为了顶级开源项目。

一、什么是Kafka

是一款分布式消息发布和订阅系统,它的特点是高性能、高吞吐量

1、设计目标
  • 以时间复杂度为O(1) 的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K 条消息的传输。
  • 支持Kafka Server 间的消息分区,及分布式消费,同时保证每个partition 内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展

二、Kafka的应用场景

场景可以用于 web/nginx 日志、访问日志,消息服务等等。

1、行为跟踪

kafka 可以用于跟踪用户浏览页面、搜索及其他行为。通过发布-订阅模式实时记录到对应的Topic 中,通过后端大数据平台接入处理分析,并做更进一步的实时处理和监控

2、日志收集系统

日志收集方面,有很多比较优秀的产品,比如 Apache Flume, 很多公司使用kafka 代理日志聚合。日志聚合表示从服务器上收集日志文件,然后放到一个集中的平台(文件服务器)进行处理。在实际应用开发中,我们应用程序的 log 都会输出到本地的磁盘上,排查问题的话通过Linux命令来搞定,如果应用程序组成了负载均衡集群,并且集群的机器有几十台以上,那么想通过日志快速定位到问题,就是很麻烦的的事了。所以一般都会做一个日志统一收集平台管理 log 日志用来快速定位到问题。所以很多公司的套路都是把应用日志集中到kafka上,然后分布导入到es 和 hdfs 上。用来做实时检索分析和离线统计数据备份等。而另一个方面,kafka 本身又提供了很好的api来集成日志并且做日志收集

推荐一个基于ElasticSearch+Logstash+Kibana搭建的日志管理中心:https://blog.csdn.net/qq_39938758/article/details/103974396

推荐一个ElasticSearch+Logstash+Filebeat+Kafka+Zookeeper+Kibana 的日志收集分析:

https://blog.csdn.net/weixin_41047933/article/details/82699823

3、消息系统

搞Java的。。有啥说的。

三、Kafka的架构

四、Kafka名词解释

1、Broker

Kafka集群中包含的服务器,有一个或多个服务器,这种服务器被称为 Broker。

Broker 端不维护数据的消费状态,提升了性能。直接使用磁盘进行存储,线性读写,速度快。避免了在JVM 内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。

2、Producer

负责发布消息到Kafka Broker

3、Consumer

负责从Broker 拉取(pull)数据并进行处理。

4、Topic

每条发布到kafka集群的消息都有一个类别,这个类别被称为Topic

物理上不同Topic的消息分开存储,逻辑上一个Topic 的消息虽然保存于一个或多个Broker上但是用户只需指定消费的Topic即课生产或消费数据而不必关心数据存于何处。

5、Partition

Partition 是物理上的概念,每个Topic 包含一个或多个Partition。kafka分配的单位是Partition

6、Consumer Group

每个Consumer 属于一个特定的Consumer Group

可为每个Consumer 指定Group name,若不指定group name 则属于默认的group

每条消息只可以被Consumer Goup 组中中的一个Consumer消费,但是可以指定多个Consumer Group

所以一个消息在Consumer Group 里面只可以被消费一次。已确定!

7、Topic & Partition

Topic 在螺髻山行可以被认为是一个 queue,每发送一条消息必须指定它的Topic,可以简单理解为必须指明把这条消息放入到哪个queue里。

为了使得kafka 的吞吐率可以线性提高,物理上把Topic 分成一个或多个Partition,每个Partition 在物理上对应一个文件夹,该文件夹下存储这个Partition 的所有消息和索引文件。

若创建 Topic1 和Topic2 两个Topic,且分别有13个和19个分区,则整个集群上相应会生成共32个文件夹

8、Replication-factor

表示该Topic 需要再不同高德broker 中保存几分

kafka 安装部署+基本操作

一、单机

1、下载kafka

因为kafka需要依赖于zk来做一些master选举和其它数据的维护,所以需要先安装zk并且启动

https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz

2、启动
  • 启停zookeeper (采用的kafka内的zk)

    zookeeper-server-start.sh -daemon ../config/zookeeper.properties
    
    zookeeper-server-stop.sh -daemon ../config/zookeeper.properties
    
    • 1
    • 2
    • 3
  • 启停kafka

    sh kafka-server-start.sh -daemon config/server.properties
    
    sh kafka-server-stop.sh -daemon config/server.properties
    
    • 1
    • 2
    • 3

二、集群

准备三个虚拟机,分别把kafka的安装包放入三台机器上

1、修改配置

修改server.properties配置

  • 配置broker.id,同一个集群中的每个机器的id必须唯一,修改三台

    broker.id=0
    
    • 1
  • 继续修改zookeeper连接配置,修改三台

    zookeeper.connect=192.168.165.3:2181
    
    • 1
  • 继续修改Listeners配置,修改三台

    listeners=PLAINTEXT://192.168.165.4:9092
    
    • 1
2、启动
sh kafka-server-start.sh -daemon ../config/server.properties
  • 1

三、基本操作

1、创建topic
sh kafka-topics.sh --create --zookeeper 192.168.131.3:2181 --replication-factor 3 --partitions 3 --topic test
  • 1
2、查看topic
sh kafka-topics.sh --list --zookeeper 192.168.131.3:2181 --topic test
  • 1
3、查看topic属性
kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
  • 1
4、消费消息
sh kafka-console-consumer.sh --bootstrap-server 192.168.165.3:9092 --topic test --from-beginning
  • 1
5、发送消息
sh kafka-console-producer.sh --broker-list 192.168.165.6:9092 --topic test
  • 1

kafka 参数配置

我就不整理了,有人整理的齐全:https://www.cnblogs.com/alan319/p/8651434.html

kafakaApi操作

1、基本生成消费操作

参考地址:https://gitee.com/kylin1991_admin/help-s/tree/master/kafka-h/src/main/java/org/example/test

2、自定义partition

参考地址:https://gitee.com/kylin1991_admin/help-s/blob/master/kafka-h/src/main/java/org/example/test/MyPartitioner.java

kafka整合SpringBoot

参考链接:https://gitee.com/kylin1991_admin/help-s/tree/master/kafka-h

kafka分布式消息原理

一、Topic 和 Partition

1、Topic

在Kafka中,Topic 是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到 Kafka 集群的消息都有一个类别。物理上来锁,不同的topic的消息是分开存储的。

每个Topic 可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息。

2、Partition

每个Topic 可以划分多个分区(每个Topic至少有一个分区),同一topic 不同分区包含不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个offset (偏移量),它是消息在此分区中的唯一编号,kafka 通过offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即kafka 只保证同一个分区内的消息是有序的。

二、生产者消息分发

1、kafka生产消息分发策略

消息是kafka 最基本的数据单元,在kafaka 中,一条消息由key、value 两部分组成。

在发送一条消息时,我们可以指定这个key,那么producer 会根据key 和partition 机制来判断当前这条消息应该发送并存储到哪个partition 中。我们可以根据需要进行扩展producer 和partition 机制。

自定义消息分区分发机制地址:https://gitee.com/kylin1991_admin/help-s/blob/master/kafka-h/src/main/java/org/example/test/MyPartitioner.java

2、消息默认的分发机制

默认情况下,kafka 采用的时hash 取模的分区算法(DefaultPartitioner)。

该默认的分区算法,如果key为null,则会随机分配一个分区,这个随机是在这个参数 “matadata.max.age.ms” 的时间范围内随机选择一个。对于这个时间段内,如果key为null,则只会发送到唯一的分区。这个值在默认情况下是10分钟更新一次。

3、关于Metadata类

简单理解就是Topc 、 Partition 和 Broker 的映射关系,每个Topic 的每一个 Partition,需要知道对应的broker 列表是什么、Leader是谁、Follower是谁。这些信息都是存储在Metadata这个类的。

4、消费者如何制定分区消费
// 消费制定分区的时候,不需要再订阅
// kafkaConsumer.subscribe(Collections.singletonList(topic));

// 消费制定分区
TopicPartition topicPartition = new TopicPartition(topic, 0);
kafaConsumer.assign(Arrays.asList(topicPartition));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

三、消费者的消费原理

在实际的生产过程中,每个topic 都会有多个partitions。

多个partitions的好处在于:

一方面能够对broker 上的数据进行分片有效减少了消息的容量从而提升io性能。

另一方面,为了提高消费端的消费能力,一般会通过多个consumer去消费桶一个topic,也就是消费端的负载均衡机制

kafka存在consumer group的概念,组内的所有消费者协调在一起来消费订阅主题的所有分区。当然每一个分区只能由同一个消费组内的consumer 来消费,那么同一个consumer group 里面的consumer 是怎么分配该消费哪个分区的数据呢?如下图:

1、kafaka消息消费原理演示+测试
a、环境演示
  • 通过命令的方式创建3 分区 3 副本的topic

    sh kafka-topics.sh --create --zookeeper 192.168.131.3:2181 --replication-factor 3 --partitions 3 --topic replis-test
    
    • 1
  • 登录三个消费者客户端,配置group 一样

    sh kafka-console-consumer.sh --bootstrap-server 192.168.131.4:9092,192.168.131.5:9092,192.168.131.6:9092 --topic replis-test --from-beginning --group test_group1
    
    • 1
  • 通过生产者发批量发送消息,记得用自己的随机算法来操作(利用key为null随机,不要用默认的消息分区算法,默认的不方便测试)然后批量发送消息,可以看到消息的确被分发到了不同的消费端

    使用代码来操作,参考:https://gitee.com/kylin1991_admin/help-s/blob/master/kafka-h/src/main/java/org/example/test/ProducerAsyncDemo.java

b、测试
  • 3个partition 对应3consumer
    • 结果:每个consumer 会消费一个分区
  • 3个partition 对应2个consumer
    • 结果:consumer1会消费partition0/partition1分区,consumer2会消费partition2分区
  • 3个partition 对应4个或以上consumer
    • 结果:任然只有3个consumer对应3个partition,其他的consumer无法消费消息
c、结论
  • 如果consumer 比 partition 多
    • 是浪费,因为kafka的设计是在一个partition上是不允许并发的。所以consumer 数不要大于partition数
  • 如果consumer 比partition 少
    • 一个consumer会对应于多个partition,这里主要合理分配consumer数量的整数倍,否则会导致partition里面的数据被取的不均匀。最好partition数量是consumer数量的整数倍,所以partition 数量很重要,比如取18 就很容易设定consumer 数量。
  • 如果consumer从多个partition读取到数据,不保证数据间的顺序性
    • kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同。
  • 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
2、消费者分区分配策略

以下情况出发分配rebalance分配操作

  • 同一个consumer group 内新增了消费者
  • 消费者离开当前所属的consumer group,比如主机停机或宕机等
  • topic 新增了分区(也就是分区熟练发生了变化)

kafka提供了3分配策略(可自定义)

PARTITION_ASSIGNMENT_STRATEGY_CONFIG

  • Range(默认,范围)
  • RoundRobin(轮询)
  • StickyAssignor(沾性)
a、RangeAssignor

Range 策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字符顺序进行排序

假设  n = 分区数 / 消费数 ; m = 分区数 % 消费者数量
那么前 m 个消费者每个分配 n + 1 个分区
后面的(消费者数量 - m)个消费者每个分配n个分区

例如:11个分区,3个消费者 
A1-0 将消费 0,1,2,3 分区
A2-0 将消费 4,5,6,7 分区
A3-0 将消费 8,9,10 分区
结果:A1-0 和 A2-0 多消费一个分区,没什么

例如:2个主题(T1和T2),分别有10个分区
A1-0 将消费T1主题的 0,1,2,3 分区,以及T2主题的0,1,2,3分区
A2-0 将消费 4,5,6 分区,以及T2主题的4,5,6 分区
A3-0 将消费 7,8,9 分区,以及T2主题的7,8,9 分区
结果:A1-0 比其他分区多了2个分区,这就是RangeAssignor很明显的弊端。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
b、RoundRobinAssignor

轮询分区策略就是:

  • 把所有partition 和所有consumer线程都列出来,然后按照hashcode进行排序
  • 最后通过轮询算法分配partition给消费线程。

如果所有consumer实例的订阅是相同的,那么partition会均匀分布

使用轮询分区策略,必须满足两个条件

1、每个主题的消费者实例具有相同数量的流

2、每个消费者订阅的主题必须是相同的。

c、StrickyAssignor

kafka在0.11.x 版本支持了StrickyAssignor,翻译过来就是粘带策略,它有两个目的

1、分区的分配尽可能的均匀

2、分区的分配尽可能和上次分配保持相同

当两者冲突时,第一个目标优先于第二目标

假设消费组有3个消费者,C0、C1、C3,它们分别订阅了4个Topic(t0,t1,t2,t3),并且每个主题有两个分区(p0,p1),也就是说,整个消费组订阅了8个分区:t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1
那么最终的分配场景结果为:
C0: t0p0 t1p1 t3p0
C1: t0p1 t2p0 t3p1
C2: t1p0 t2p1
这样的分配策略类似轮询策略,但是其实不是,因为如果C1 这个消费者挂了,就必然会造成重新分区(reblance),如果是轮询,那么结果应该是
C0: t0p0 t1p1 t2p0 t3p0
C2: t0p1 t2p0 t2p1 t3p1
但是不是如此,因为他要满足粘带策略,所以他会满足 “分区的分配尽可能和上次分配保持相同”,所以分配结果是
C0: t0p0 t1p1 t3p0 + t2p0 
C2: t1p0 t2p1 + t0p1 t3p1

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

结论:也就是说保留了上一次的分配结果,并且把原挂掉的C1分区分配给了C0和C2,这中策略的好处,就是分区发生变化的时候,由于分区的沾性,减少了不要的分区移动。

3、Coordinator

用来执行Rebalance 以及 管理 consumer的Group

四、Coordinator

kafka提供了一个角色:coordinator 来执行对于Consumer group的管理,当consumer group的第一个consumer 启动的时候,它会去和kafka server确定谁是它们的coordinator。之后该group 内的所有成员都会和该coordinator进行协调通信。

每个KafkaServer都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理和Consumer Rebalance。

1、coordinator怎么确定的?

消费者向kafka 集群中的任意一个broker发送一个GroupCoordinatorRequest请求,服务端会返回一个负载最小的broker节点的id,并将该broker设置为coordinator

2、joinGroup的过程

在rebalance 之前,需要保证coordinator 是已经确定好了的,整个rebalance 分为两个步骤

1、Join Group

2、Synchronizing Group State

a、Join Group

表示加入到consumer group 中,在这一步中,所有的成员都会向coordinator 发送 joinGroup的请求。一旦所有成员都发送了joinGroup请求,那么coordinator 会选择一个consumer担任leader 角色,并把组成员信息和订阅信息发送消费者

consumer的leader选举

选举算法比较简单,如果消费组内没有leader,那么第一个加入消费组的消费者就是消费者leader,如果这个时候leader 消费者退出消费组,那么会重新选举一个leader,这个选举很随意,类似随机算法

每个消费者都可以设置自己的分区分配策略,对于消费组而言,会从各个消费者上报过来的分区分配策略中选举一个彼此都赞同的策略来实现整体的分区分配,这个 “赞同” 的规则是,消费组内的各个消费者会通过投票来决定

  • 在joingroup阶段,每个consumer 都会把自己支持的分区分配策略发送到coordinator
  • coordinator收集到所有消费者的分配策略,组成一个候选集
  • 每个消费者需要从候选集中找出一个自己支持的策略,并且为这个策略投票
  • 最终计算候选集的各个策略的选票数,票数最多的就是当前消费组的分配策略
b、Synchronizing Group State

完成分区分配后,就进入了Synchronizing Group State 阶段,主要逻辑是向GroupCoordinator发送SyncGroupRequest请求,并且处理SyncGroupResponse响应,简单来说,就是leader将消费者对应的partition分配方案同步给consumer Group中的所有consumer

每个消费者都会向coordinator发送syncGroup请求,不过只有leader节点会发送分配方案,其他消费者只是打打酱油而已。当leader 把方案发给coordinator以后,coordinator会把结果设置到SyncGroupResponse中。这样所有成员都会知道自己应该消费哪个分区

  • consumer group 的分区分配方案是在客户端执行的,kafka将这个权利下方给客户端主要是因为这样做可以有更好的灵活性
c、总结consumer group rebalance过程
  • 对于每个consumer group 子集,都会在服务端对应一个GroupCoordinator进行管理,GroupCoordinator会在Zookeeper上添加watcher,当消费者加入或者退出consumer group时,会修改Zookeeper上保存的数据,从而触发 GroupCoordinator开始Rebalance操作
  • 当消费者准备加入某个Consumer Group 或者 GroupCoordinator发生故障转移时,消费者并不知道GroupCoordinator 在网络中的位置,这个时候就需要确定GroupCoordinator,消费者会向集群中的任意一个Broker节点发送ConsumerMetadataRequest请求,收到请求的broker会返回一个response作为响应,其中包含管理当前ConsumerGroup的GroupCoordinator。
  • 消费者会根据broker的返回信息,连接到groupCoordinator,并且发送HeartbeatRequest,发送心跳的目的是要告诉GroupCoordinator这个消费者是正常在线的。当消费者在指定时间内没有发送心跳请求,则Group Coordinator会触发Rebalance操作
  • 发起join group请求的两种情况
    • 如果GroupCoordinator返回的心跳包数据包含异常,说明GroupCoordinator因为前面说的几种情况导致了Rebalance操作,那这个时候,Consumer会发送join Group 请求
    • 新加入到consumer group的consumer 确定好了GroupCoordinator以后,消费者会向GroupCoordinator发起join group 请求,GroupCoordinator会收集全部消费者信息之后,来确定可用的消费者,并从中选取一个消费者成为consumer group leader。并把相应的信息(分区分配策略、leader_id、……)封装成response 返回给所有消费者,但是只有group leader 会受到当前consumer group 中的所有消费者信息。当消费者确定自己是group leader 以后,会根据消费者的信息以及选定分区分配策略进行分区分配
    • 接着进入Synchronizing Group State 阶段,每个消费者会发送SyncGroupState 请求到Group Coordinator,但是只有Group Leader 的请求会存在分区分配结果,GroupCoordinator会根据Group Leader 的分区分配结果形成SyncGroupResponse返回给所有的Consumer。
    • consumer 根据分配的结果,执行响应的操作。
3、offset

offset就是用来确定消费端的消费位置,每个topic可以划分多个分区(每个topic 至少有一个分区),同意topic 下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个offset(偏移量),它是消息在此分区中的唯一编号

kafka通过offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即kafka 只保证在同一个分区内的消息是有序的。

对于应用层的消费者来说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个offset。那么offset保存在哪里?

a、kafka怎么维护消费端的offset

在kafka中,提供了一个consumer_offset_*的一个topic,把offset信息写入到这个topic中。

consumer_offset——里保存了每个consumer group 某一时刻提交的offset信息。

__consumer_offsets 默认有50个分区。

b、consumer的offset分区算法

计算公式:Math.abs(“gourpid”.hashCode()) % groupMetadataTopicPartitionCount

  • 由于默认情况下groupMetadataTopicPartitionCount 有50个分区,如果计算的结果是35,那么意味着当前的consumer_group的位移信息保存在__consumer_offsets的第35个分区
c、consumer查看offset的命令

执行如下命令可以查看当前consumer_group中的offset位移提交信息

kafka-console-consumer.sh --topic __consumer_offsets --partiioin 35 --bootstrap-server 192.168.131.4:9092,192.168.131.5:9092,192.168.131.6:9092 --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'
  • 1

五、Partition 的副本机制

每个topic 都可以分为多个partition,并且多个partition 会均匀分布在集群的各个节点下。虽然这种方式能够有效的对数据进行分片,但是对于每个partition来说 ,都是单点的,当其中一个partition不可用的时候,那么这部分消息就没办法消费。所以kafka为了提高partition的可靠性而提供了副本的概念(Replica),通过副本机制来实现冗余备份。

每个分区可以有多个副本,并且在副本集合中会存在一个leader的副本,所有的读写请求都是由leader副本来进行处理。剩余的其他副本都作为Follower副本,Follower副本会从leader副本同步消息日志。这个有点类似Zookeeper中leader和Follower的概念,但是具体实现方式还是有很大的差异。所以副本集会存在一主多从的关系。

一般情况下,同一个分区的多个副本会被均匀分配到集群中的不同broker上,当leader副本所在broker出现故障后,可以重新选举新的leader副本继续对外提供服务。通过这样的副本机制提高了kafka集群的可用性。

注意:kafka集群中的一个broker中最多只能有一个副本,leader副本所在的broker节点的分区叫leader节点,foolower副本所在的broker节点的分区叫follower节点

1、创建一个带副本机制的topic
  • 通过如下命令

    sh kafka-topics.sh --create --zookeeper 192.168.131.3:2181 --replication-factor 3 --partitions 3 --topic testTopic
    
    • 1

2、查看各个分区中对应的leader是谁
  • Zookeeper查看

    结果中高德leader是那个broker_id

    get /brokers/topics/testTopic/partitions/1/state
    
    • 1
  • 命令查看

    sh kafka-topics.sh --zookeeper 192.168.131.3:2181 --describe --topic testTopic
    
    • 1
3、副本的相关名词介绍

kafka 提供了数据复制算法保证

如果leader 副本所在的broker节点宕机或者出现故障,或者分区的leader节点发生故障,这时候kafka会从follower副本中选择一个新的leader 副本。所以就会涉及到副本的leader选举。

  • leader副本:响应clients端读写请求的副本

  • follower副本:被动地备份leader副本中的数据,不能响应clients端读写请求

  • ISR副本列表:包含了leader副本和所有leader副本保持同步的follower副本

    • 如果判断是否于leader同步,和LEO和HW有关,所有的副本都有这个属性
  • LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果LEO=10.那么表示该副本保存了10条消息,位移值范围是[0,9]。另外Leader LEO 和follower LEO的更新是有区别的。

  • HW:即上面提到的水位值。对于同一个副本对象而言,其HW值不会大于LEO值。小于等于HW值的所有消息都被认为是 “已备份”的(replicated)。Leader副本和Follower副本更新HW的方式有区别的。

    从生产着发出的一条消息首先会被写入分区的leader副本,不过还需要等待 ISR 集合中的所有follower 副本都同步完之后才能被认为已经提交,之后才会更新分区的 HW,进而消费者可以消费到这条消息

4、副本的协同

写请求首先由leader副本处理,之后Follower副本会从leader 上拉取写入的消息。这个过程会有一定延迟,导致Follower副本中保存的消息少于leader副本,但是只要没有超出阔值都可以容忍。

但是如果一个Follower 副本出现异常,比如宕机、网络断开等原因长时间没有同步消息,那么这个时候,leader就会把它踢出去。

kafka通过 ISR 集合来维护一个分区副本信息

一个新leader被选举并被接受客户端的消息成功写入。kafka确保从同步副本列表中选举一个副本为leader;

leader 负责维护和跟踪ISR(in - sync - replicas,副本同步队列)中所有follower 滞后的状态。当producer 发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。

5、ISR

表示:可用且消息量与leader相差不多的

副本集合,这是整个副本集合的子集。

a、加入isr集合的条件
  • 副本所在节点必须维持有Zookeeper的连接
  • 副本最后一条消息的offset 与 leader 副本的最后一条消息的offset 之前的差值不能超过指定的阈值replicas.lag.time.max.ms,如果该follower在此时间间隔内一直没有追上过leader的所有消息,则该foolower 就会被剔除 ISR 列表

通过zk查看分区的leader的方式可以看到 ISR列表

b、lastCaughtUpTimeMs标志

follower副本把leader副本LEO之前的日志全部同步完成后,则认为follower副本已经追赶上了leader副本,这个时候会更新这个副本的lastCaughtUpTimeMs标志,

c、kafka副本管理器

kafka副本管理器会启动一个副本过期检查的定时任务,这个任务会定期检查当前时间与副本的lastCaughtUpTimeMs的差值是否大于参数replicas.lag.time.max.ms的值,如果大于,则会把这个副本剔除ISR集合

d、所有replica不工作的情况

当所有的replica都宕机了。那么会选择第一个 “活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它不保证已经包含了所有已commit的消息,它也会成为leader而作为consumer的数据源。

6、副本数据同步原理

副本除了协同还有一个重要的机制,就是数据的同步,同步需要解决如下问题:

  • 怎么传播消息
  • 在消息发送端返回ack之前要保证多少个replica已经接受到这个消息
a、数据处理过程

Producer在发布消息到某个partition时:

  • 先通过zk找到该partition 的Leader get/brokers/topics/<topic>/partition/2/state,然后无论该topic的replication Factor为多少 ,Producer只将该消息发送到该Partition的Leader。
  • Leader会将消息会将该消息写入其本地Log。每个Follower都从Leader pull 数据。这种方式上,Follower存储的数据顺序与Leader保持一致。
  • Follower在收到该消息并写入其Log后,想Leader发送ACK。
  • 一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader会增加HW(Hight Water Mart) 并且向producer发送ACK。

b、初始状态
  • 初始状态下,Leader 和 Follower 的HW 和LEO都是0,leader副本会保存remote LEO,也会被初始化为0,这个时候,producer没有发送消息,Follower会不断地给leader发送FETCH请求,但是因为没有数据,这个请求会被leader寄存,当在指定的时间之后会强制完成请求,这个时间配置是(replica.fetch.wait.max.ms)。如果在指定时间内producer有消息发送过来,那么kafka会唤醒fetch请求,让leader继续处理

数据同步处理会分两种情况, 这两种情况的从处理方式是不同的

  • 第一种是leader处理完producer请求之后,follower 发送一个 fetch 请求过来
  • 第二种是follower 阻塞在leader 指定时间之内,leader 副本收到producer请求
b、第一种情况

生产者发送一条消息

leader处理完producer请求之后,follower发送一个fetch请求过来,状态图如上:

leader副本收到请求以后,会做几件事

  • 1、把消息追加到log文件,同时更新leader副本的leo
  • 2、尝试更新leader HW值,这个时候由于follower副本还没有发送fetch请求,那么leader 的remote LEO 仍然是0,leader 会比较自己的LEO以及remote LEO的值发现最小值是0,与HW值相同,所以不会更新HW

follower fetch

  • follower 副本发送fetch请求,leader副本的处理逻辑是
    • 1、读取log数据,更新remote LEO=0(follower还没有写入这条消息,这个值是根据follower的fetch请求中的offset来确定的)
    • 2、尝试更新更新HW,因为整个时候LEO和remote LEO 还是不一致,所以仍然是HW=0
    • 3、把消息内容和当前分区的HW值发送给follower副本
  • follower副本收到response以后
    • 1、将消息写入到本地log,同时更新follower的LEO
    • 2、更新follower HW,本地的LEO和leader 返回的HW进行比较取小值,所以仍然是0

第一次交互结束后,HW仍然是0,这个值会在下一次follower发起fetch请求时被更新

  • follower发起第二次fetch请求,leader收到请求以后
    • 1、读取log数据
    • 2、更新remote LEO=1,因为这次fetch携带的offset是1
    • 3、更新当前分区的HW,这个是偶leader LEO 和 remote LEO 都是1,所以HW的值也更新为1
    • 4、把数据和当前分区的HW值返回给follower副本,这个时候如果没有数据,则返回为空
  • follower副本收到response以后
    • 1、如果有数据则写本地日志,并且更新LEO
    • 2、更新follower的HW值

ps:由上总结

  • follower发起fetch,leader收到了做如下的事
    • 读取log数据,更新remote LEO(根据fetch请求的offset来确定)
    • 尝试更新HW,(也是根据fetch请求的offset来确定remoteLEO后,用remote LEO 和自己的LEO对比取最小)
    • 将消息内容(没有就为空)和当前分区的HW值发送给follower副本
  • follower收到响应后做如下
    • 将消息(如果不为空)写入到本地log,同时更新follower的LEO
    • 更新follower的HW值,(本地的LEO和返回的HW取最小值)
c、第二种情况

这种情况就是leader副本暂时还没有消息过, 所以follower的fetch会被阻塞,直到等待超时或者leader接受到新的数据。当leader收到请求以后会唤醒处于阻塞的fetch请求,处理过程基本和上面的说法一致。

  • leader将消息写入本地日志,更新Leader的LEO
  • 唤醒follower的fetch请求
  • 更新HW
d、数据丢失原理

Kafa使用HW和LEO的方式来实现副本数据的同步,本身设计挺好,但是这在这个地方会存在一个数据丢失的问题,当然这个丢失只出现在特定的背景下,

HW的值在新的一轮fetch中才会被更新,分析这个过程为什么会出现数据丢失

min.insync.replicas=1

  • 设定ISR中的最小副本数是多少,默认为1(在server.properties中配置),并且acks参数设置为 -1 (表示需要所有副本确认,Producer可配置)时,此参数才生效

  • 表达的含义是,至少需要多少个副本同步才能表示消息是提交的,所以当min.insync.replicas=1的时候,一旦消息被写入leader端log即被认为是 “已提交” ,而延迟一轮FETCH RPC更新HW值得设计使得follower HW 值是异步延迟更新的,倘若在这个过程中leader发生变更,那么成为新leader的follower的HW值就有可能是过期的,使得clients 端认为是成功提交的消息被删除。

producer的ACKS

acks 配置表示producer 发送消息到broker上以后的确认值。有三个可选项

  • 0:表示producer 不需要等待broker的消息确认。这个选项时延最小但是同时风险最大(因为当server宕机时,数据将会丢失)。
  • 1:表示producer 只需要获得kafka集群中的leader 节点确认即可,这个选择时延较小同时确保了leader节点确认接收成功。
  • all(-1):需要ISR中所有的Replica给予接收确认,速度最慢,安全性最高,但是由于ISR 可能会缩小到仅包含一个Replica,所以设置参数为all并不能一定避免数据丢失。
e、数据丢失解决方案

在kafka0.11.0.0版本之后,引入了一个leader epoch 来解决这个问题,所谓的leader epoch实际是一对值(epoch,offset),epoch代表leader的版本号,从0开始递增,当leader发生过变更,epoch就 +1 ,而offset则是对应这个epoch版本的leader写入第一条消息的offset,

比如:(0,0),(1,50),表示第一个leader从offset=0开始写消息,一共写了50条,第二个leader版本号是1,从offset=50开始写,这个信息会持久化到对应的分区的本地磁盘上,文件名是/tmp/kafka-log/topic/leader-epoch-checkpoint

  • leader broker 中会保存这样一个缓存,并且定期写入到checkpoint文件中
  • 当leader写log时它会尝试更新整个缓存;
    • 如果这个leader首次写入消息则会在缓存中增加一个条目;否则就不做更新。
    • 而每次副本重新成为leader 时会查询这部分缓存,获取出对应leader版本的offset

这样的方案解决原理是:

  • Follower宕机并且恢复之后,有两种情况,
    • 如果这个时候leader 副本没有挂,也就是意味着没有发生leader选举,那么Follower恢复之后并不会去截断自己的日志,而是先发送一个OffsetForLeaderEpochRequest请求给到leader副本,leader副本收到请求之后返回当前的LEO。
      • 如果Follower副本的leaderEpoch和Leader副本的epoch相同,leader的LEO只可能大于或者等于Follower副本的LEO值,所以这个时候不会发生截断
      • 如果Follower副本和leader副本的epoch值不同,那么leader副本会查找Follower副本传过来的epoch + 1在本地文件中存储的StartOffset返回给Follower副本,这样就避免了数据丢失的问题。(和上一条原理类似)
    • 如果leader副本宕机了,重新选举新的leader,那么原本的Follower副本就会变成leader,意味着epoch从0变成了1,使得原本Follower副本中LEO的值得到了保留
f、Leader副本的选举过程

kafkaController 会监听Zookeeper的/brokers/ids节点路径,一旦发现有broker挂了,执行下面的逻辑。

这里不考虑kafkaController所在broker挂了的情况,kafkaController挂了,各个broker会重新选举出新的kafkaController

重新选举leader策略如下:

  • 优先从ISR列表中选出第一个作为leader副本,这个叫优先副本,理想情况下有限副本就是该分区的leader副本
  • 如果ISR列表为空,则查看该Topic的unclean.leader.election.enable配置。
    • unclean.leader.election.enable:
      • 为true则表示允许非ISR列表的副本作为leader,那么此时意味着数据可能丢失
      • 为false则表示不允许,直接抛出NoReplicaOnlineException异常,造成Leader副本选举失败。
  • 如果上述配置为true,则从其他副本中选出一个作为leader副本,并且ISR列表只包含该Leader副本。一旦选举成功,则将选举后的leader和ISR和其他副本信息写入到该分区的对应的zk路径上。

六、消息的存储

消息发送端发送消息到broker中,消息会进行持久化。

kafka是使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个offset值来表示它在分区中的偏移量。kafka中存储的一般都是海量的消息数据,为了避免日志文件过大,log并不是直接对应在一个磁盘的日志文件,而是对应磁盘的一个目录,这个目录的命令规则是 <Topic_name>_<partition_id>

1、消息的文件存储机制

一个topic的一个或多个partition在物理磁盘上的保存在路径:/temp/kafka-logs/topic_partition下,包含日志文件、索引文件和时间索引文件

kafka是通过分段的方式将Log分为多个LogSegment,LogSegment是一个逻辑上的概念,一个LogSegment对应磁盘上的一个日志文件和一个索引文件,其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。

2、LogSegment逻辑概念

kafka是以partition为最小存储单位,那么当kafka producer 不断发送消息,必然会引起partition文件的无线扩张,这样对于消息文件的的维护以及被消费的消息的清理都会很困难。

所以kafka以segment为单位又把partition进行细分。每个partition相当于一个巨型文件被平均分配到多个大小相等的segment数据文件中(每个segment文件中的消息不一定相等),这种特性方便已经被消费的消息的清理,提高磁盘的利用率。

  • log.segment.bytes=1073741824(设置分段大小),默认是1GB,可以调小点来看分段的效果

a、segment文件命令规则

通过下面这条命令可以查看kafka消息日志的内容

sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test4-0/00000000000000000000.index --print-data-log

sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test4-0/00000000000000000000.log --print-data-log

# 加入第一个log文件的最后一个offset为:25722,所以下一个segment的文件命令为:00000000000000025723.log。对应的index为00000000000000025723.inexe
  • 1
  • 2
  • 3
  • 4
  • 5
b、index和log的对应关系

为了提高查找效率。kafka对每个日志文件添加了2个索引文件:OffsetIndex和TimeIndex,分别对应.index 以及 .timeindex。timeindex索引文件格式:它是映射时间戳和相对的offset

log.index.interval.bytes来控制,默认是是每过4096字节记录一条index,太小意味着读取效率更高但是index文件会变大。基于这个特性,可以根据时间找到粗粒度的offset。(0.10.0.1版本之后增加记录了时间戳,粒度更细)

c、partition通过offset查找message

查找的算法是

1、根据offset的值,查找segment 段中的index 索引文件。由于索引文件命名是以上一个文件的最后一个offset进行命令的。所以使用二分查找算法能够根据offset快速定位到指定的索引文件。

2、找到索引文件后,根据offset进行定位,找到索引文件中符合范围的索引。(kafka采用稀疏索引的方式来提高查找性能)

3、得到position以后,再到对应的log文件中,从position处开始查找offset对应的消息,将每条消息的offset与目标offset进行比较,直到找到消息

例如:

我们要查找offset=2490这条消息,那么先找到00000000000000000000.index,然后找到[2487,49111]这个索引,再到log文件中,根据49111这个position开始查找,比较每条消息的offset是否大于等于2490,。最后查找到对应的消息以后返回。

3、Log文件
a、Log文件消息分析

通过sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test4-0/00000000000000000000.log --print-data-log命令可以查看二进制的日志文件消息,一条消息会包含很多的字段

offset: 25722 position: 899905 CreateTime: 1588311755754 isvalid: true keysize: -1 valuesize: 31 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: producer send message : + 77169
  • 1

offset:消息的offset

position:消息的position位置

CreateTime:创建时间戳

isvalid:这个值是 message 被读取出来之后,进行crc校验之后的结果。

keysize:key的大小

valuesize:value的大小

magic:消息的版本号

compresscodes:表示压缩编码

producerId:用来支持幂等性 -待确定

producerEpoch:和produceId一样,用来支持幂等性 -待确定

sequence:序列号字段,表示某个producer发送高德-待确定

isTransactional:

headerKeys:

payload:表示消息的内容

b、日志清除策略

日志的分段,也是为了方便进行kafka的日志清理,目前清理策略有2种

  • 根据消息的保留时间,当消息在kafka中保存的时间超过了指定的时间,就会触发清理过程。
  • 根据topic存储的数据大小,当topic所占的日志文件大小小于一定的阔值,则可以开始删除最旧的消息。kafka会启动一个后台线程,定期检查是否存在可以删除的消息

通过如下参数来触发清理

  • log.retention.bytes:剩余空间数据大小,默认是1G
  • log.retention.check.interval.ms :后台线程检查周期默认是5分钟
  • log.retention.hours:保留时间,默认是7天
c、日志压缩策略

kafka提供了 “日志压缩(Log Compaction)” 功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的key和value的值之前的对应关系是不断变化的。就像数据库中的数据会不断被修改一样,消费者只关心key对应的最新value。因为我们可以开启kafka的日志压缩功能,服务端会在后台启动一个Cleaner线程池,定期将相同的key进行合并,只保留最新的value值

注意:需要开启压缩,的前提,是-- 消费者只关心key对应的最新value

七、磁盘存储的性能问题

kafka 才用磁盘存储怎么保证性能,采用如下

  • 顺序写

  • 零拷贝

  • 页缓存:https://www.jianshu.com/p/92f33aa0ff52

八、kafka消息的可靠性

没有任何一个中间件可以做到百分之百的完全可靠,可靠性更多的还是基于几个9的衡量标准,比如4个9,5个9。软件系统的可靠性只能够无限去接近100%,但不可能达到100%。所以kafka如何实现最大可能的可靠性?

  • 分区副本

    使用分区副本保证可靠性,但是也会带来性能上的开销

  • acks

    生产者发送消息的可靠性,也就是我要保证这个消息一定到了broker并且完成了多副本的持久化,但是这个要求也会带来性能上的开销,它有几个可选项

    • 0:表示producer 不需要等待broker的消息确认。这个选项时延最小但是同时风险最大(因为当server宕机时,数据将会丢失)。
    • 1:表示producer 只需要获得kafka集群中的leader 节点确认即可,这个选择时延较小同时确保了leader节点确认接收成功。
    • all(-1):需要ISR中所有的Replica给予接收确认,速度最慢,安全性最高,但是由于ISR 可能会缩小到仅包含一个Replica,所以设置参数为all并不能一定避免数据丢失。
  • 保障消息到了broker之后,消费者也需要有一定的保证,因为消费者也可能出现某些问题导致消息没有消费到

  • enable.auto.commit 默认为true,也就是自动提交offset,自动提交是批量执行的,有一个时间窗口,这种方式会带来重复提交或者消息丢失的问题,所以对于高可靠要求的程序,要使用手动提交。对于高可靠要求的应用来说,定远重复消费,也不应该因为消费异常而导致消息丢失。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/813184
推荐阅读
相关标签
  

闽ICP备14008679号