当前位置:   article > 正文

Kafka学习笔记-常用命令_kafka describe topic

kafka describe topic

特性

  • 高吞吐量、低延迟,kakka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个主题可以分为多个分区,消费组可以对分区进行消费操作。
  • 可扩展性;kafka集群支持热扩展
  • 扩展性;可靠性,消费被持久化到本地磁盘,并支持数据备份防止数据丢失;
  • 容错性;允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
  • 高并发:支持数千个客户端同时读写

使用场景

  • 日志收集:一个公司可以用kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等;
  • 消息系统:解藕和生产社和消费者、缓存信息等;
  • 用户活动踪迹:kafka经常被用来标记web用户或者app用户的各种活动,比如浏览网页、搜索、点击灯活动、这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘;
  • 运营指标:kafka可以用来记录运营监控数据,包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
  • 流处理:比如spark streaming和storm;

技术优势

kafka的两个重要特性造就了他的可伸缩性

  • kafka集群在运行期间可以轻松的扩展或收缩(添加活删除代理),而不会宕机,
  • 可以扩展一个kafka主题来包含更多的分区,由于一个分区无法扩展到多个代理,所以它的容量受到代理磁盘的空间限制,能够增加分区和代理的数量意味着单个主题可以存储的数据量是没有限制的;
  • 容错性和可靠性:kafka的设计方式使某个代理的故障能够被集群中的其他代理检测到,由于每个主题都可以在多个代理上复制,所以集群可以在不中断服务的情况下从此类故障中恢复并继续运行;
  • 吞吐量:代理能够以超快的速度有效地存储和检索数据;

概念详解

截屏2022-03-24 下午11.23.35

  • Producer :生产者即数据的发布者,该角色将消息发布到kafka的topic中,broker接收到生产者发送的消息后,broker将该消息追加到当前用于住家数据的segment文件中,生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition;
  • Consumer:消费者可以从broker中读取数据,消费者可以消费多个topic中的数据;
  • Topic:在kafka中,使用一个类别属性来划分数据的所属类,划分数据的这个类称topic,如果把kafka看作一个数据库,topic可以理解为数据库中的一张表,topic的名字即为表名;
  • Partition:topic中的数据分割为一个或多个partition,每个topic至少有一个partition,每个partition中的数据使用多个segment文件存储,partition中的数据是有序的,partition间的数据丢失了数据的顺序,如果topic有多个partition,消费数据时就不能保证数据的顺序,在需要严格保证消息的消费顺序的场景下,需要将patition数目设为1;
  • Partition offset:每条消息都有一个当前partition下唯一的64字节的offset,它指明了这条消息的起始位置;
  • Replicas of partition:副本是一个分区的备份,副本不会被消费者消费,副本只能用于防治数据丢失,即消费者不从为foollower的partition中消费数据,而是从为leader的partition中读取数据,副本之间是一主多从的关系;
  • Zookeeper:zookeeper负责维护和协调broker,当kafka系统中新增borker或者某个broker发生故障失效时,由zookeeper通知生产者和消费者,生产者和消费者依据zookeeper的broker状态信息与broker协调数据的发布和订阅任务;
  • AR(Assigned Replicas): 分区中所有副本统称为AR;
  • ISR(In-Sync Replicas):所有与leader部分保持一定程度的副本组成ISR;
  • OSR(Out-of-Sync-Replicas) :与Leader副本同步滞后过多的副本;
  • HW(High Watermark):高水位,标识了一个特定的offset,消费者只能提取到这个offset之前的消息;
  • LED(Log End Offset):即日志末端位移,记录了该副本底层日志中的下一条消息的位移值,注意是下一条消息,也就是说 如果LED = 10,那么表示该副本保存了10条消息,位移值范围是[0,9];

测试消息生产与消费

创建主题
命令:bin/kafka-topics.sh --zookeeper 192.168.72.249:2181 --create --topic test --partitions 2 --replication-factor 1
  • 1
  • –Zookeeper :指定kafka所连接的zookeeper服务地址
  • –topic:指定了所要创建的主题名称
  • –partitions:指定了分区个数
  • –relication-factor:指定了副本因子
  • –create:创建主题的动作指令
//运行结果
bash-5.1# bin/kafka-topics.sh --zookeeper 192.168.72.249:2181 --create --topic test --partitions 2 --replication-factor 1
Created topic test.
  • 1
  • 2
  • 3
展示所有主题
命令:bin/kafka-topics.sh --zookeeper localhost:2181 --list
  • 1
  • –list 查看主题列表

运行结果

bash-5.1# bin/kafka-topics.sh --zookeeper 192.168.72.249:2181 --list
__consumer_offsets
czh
jm
mykafka
test
bash-5.1#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

查看主题详情

命令:bin/kafka-topics.sh --zookeeper 192.168.72.249:2181 --describe --topic mykafka
  • 1
  • –describe 查看详情动作指令

运行结果

bash-5.1# bin/kafka-topics.sh --zookeeper 192.168.72.249:2181 --describe --topic mykafka
Topic: mykafka	TopicId: yyJojBfxT1azmNJcUAMoaQ	PartitionCount: 1	ReplicationFactor: 1	Configs:
	Topic: mykafka	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
bash-5.1#
  • 1
  • 2
  • 3
  • 4
启动消费端接收消息
命令:bin/kafka-console-consumer.sh --bootstrap-server 192.168.72.249:9092 --topic mykafka
  • 1
  • –bootstrap-server 指定了连接kafka集群的地址
  • -topic 指定了消费端订阅的主题

运行结果

bash-5.1# kafka-console-consumer.sh --bootstrap-server  192.168.72.249:9092 --topic mykafka
  • 1
生产端发送消息
 命令:bin/kafka-console-producer.sh --broker-list 192.168.72.249:9092 --topic mykafka
  • 1
  • –broker-list 指定了连接的kafka集群的地址
  • –topic 指定了发送消息时的主题
bash-5.1bin/kafka-console-producer.sh --broker-list 192.168.72.249:9092 --topic mykafka
  • 1

生产者详解

1.分区器

本身kafka有自己的分区策略,如果未指定,就会使用默认的分区策略。

kafka根据传递消息的key来进行分区的分配,即hash(key)%numPartitions(分区的数量),如果key相同的话,那么就会分配到统一的分区

2.拦截器

Producer拦截器是在0.10版本引入的,主要用于实现clinets端的定制化控制逻辑

使用场景:

  1. 按照某个规则过滤掉不符合要求的消息
  2. 修改消息的内容
  3. 统计类需求
3.发送原理剖析

截屏2022-03-25 上午11.54.30

消息发送的过程,涉及到两个线程的协同工作,主线程首先将业务数据封装成ProducerRecord对象,之后调用send()方法将消息放入RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程之间的缓冲器)中暂存,Sender线程负责将消息构成请求,并最终执行网络i/o的线程,它从RecordAccumulator中取出消息并批量发送出去,需要注意的是,kafkaProducer是线程安全的,多个线程之间可以共享使用同一个kafkaProducer对象。

4.ack副本

这个参数是用来指定分区必须有多少个副本收到这条消息,之后生产者才会认为这条消息是写入成功的,ack是生产者客户端中非常重要的一个参数,它涉及到消息的可靠性和吞吐量之间的权衡

  • ack=0,生产者在成功写入消息之前不会等待任何来自服务器的响应,如果出现问题生产者是感知不到的,消息就会丢失,不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
  • ack=1.默认值为1,只要集群的首领节点收到消息,生产者机会收到一个来自服务器的成功响应,如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误的响应,为了避免数据丢失,生产者会重发消息,但是,这样还是会有可能导致数据丢失,如果收到写成功通知,此时首领节点还没来得及同步数据到follower节点,首领节点崩溃,就会导致数据丢失。
  • ack=-1,只有当所有参与复制的节点都收到消息时,生产者会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息。
  • ack参数是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出异常。
5.retries

生产者从服务器收到错误有可能是临时性的错误(比如分区找不到首领),在这种情况下,如果达到了reties设置的次数,生产者机会放弃重试并返回错误,默认情况下,生产者在每次重试之间的等待100ms,可以通过retry.backoff.ms参数来修改这个事件间隔。

6.batch.size

当有多个消息要被发送到同一个分区时,生产者会把它们放在同一个批次中,该参数指定了一个批次可以使用的内存大小,按照字节数计算,而不是消息的格式。当批次被填满后,批次里的所有消息会被发送出去,不过生产者并不一定会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次可能被发送,所以就算把batch.size设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置的太小,生产者会因为频繁的发送消息而增加一些额外的开销。

7. Max.request.size

该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指定单个请求里所有消息的总大小,broker对可接收的消息最大值也有自己的限制(message.max.size),所以两边的配置最好匹配,避免生产者发送的消息被broker拒绝。

消费者详解

消费者和消费组:

消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息,假设有一个T1主题,该主题有4个分区,同时我们有一个消费组G1,这个消费组只有一个消费者C1,那么消费者会收到这4个分区的消息,如下所示:

截屏2022-03-25 下午12.37.31

kafka一个很重要的特性是,只需要写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。应用需要有不同的消费组,对于上面的例子,例如我们新增了一个全新的消费组G2,而这个消费组有两个消费者,那么会是这样的:(不同业务接收同一个消息例如下单后的短信和邮件通知功能)截屏2022-03-25 下午12.44.59

订阅主题和分区

创建完消费者后我们可以订阅主题,只需要通过调用subscribe()方法即可,这个方法接收一个主题列表

KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic))
  • 1
  • 2

另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接kafka与其他系统时非常有用,比如订阅所有的测试主题:

consumer.subscribe(Pattern.compile("jm"))
  • 1

指定订阅的分区

//指定订阅的分区
consumer.assign(Arrays.asList(new TopicPartition("topic0701",0)))
  • 1
  • 2
反序列化
//与kafkaProducer中设置保持一致
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDesrializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
  • 1
  • 2
  • 3
位移提交

对于kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中的位置。

当我们调用poll()时,该方法会返回我们没有消费的消息,当消息从broler返回消费者时,broker并不跟踪这些消息是否被消费者收到;kafka让消费者自身来管理消费的位移,并小消费者提供更新位移的接口,这种更新位移的方式称为提交。

截屏2022-03-25 下午3.06.09

截屏2022-03-25 下午3.06.44

  • 自动提交

    这种方式让消费者来管理位移,应用本身不需要显式操作,当我们将enable.auto.commint设置为true,那么消费者会在poll方法调用后每隔5秒,(由auto.commit.interval.ms指定)提交一次位移,和其他很多操作一样,自动提交也是由poll()方法来驱动的,在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移值。

    需要注意的是,这种方式可能会导致消息重复消费,假如,某个消费者poll消息后,应用正在处理消息,在3秒后,kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。

  • 同步提交

    截屏2022-03-25 下午3.31.17

  • 异步提交

    手动提交有一个缺点,就是当发起提交时应用会阻塞,当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和手动提交一样),另外一个解决方案是,使用异步提交的api

    但异步提交也有个缺点,就是如果服务器返回提交失败,异步提交不会进行重试,相比较,同步提交会进行重试知道成功或者最后抛出异常给应用,异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖,举个例子,假如我们发起了一个异步提交commitA,此时的提交位移是2000,随后又发起了一个异步提交commitB的位移为3000,commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上已经提交的位移从3000回滚到2000,导致消息重复消费。

    • 使用场景:大量消息,但运行重复消费
  • 指定位移消费

    到目前为止,我们知道消息的拉取是根据poll()方法中的逻辑来进行处理的,但是这个方法对于普通开发人员来说就是黑盒处理,无法精确掌握其消费的起始位置

    seek()方法正好提供这个功能,让我们得以追踪之前的消费或者回溯消费。截屏2022-03-25 下午3.44.41

  • 指定从分区末尾开始消费,

    Map<TopicPartition,Long> offsets = consumer.endoffsets(assignment);
    for(TopicPartition tp :assignment){
      consumer.seek(tp,offsets.get(tp))
    }
    
    • 1
    • 2
    • 3
    • 4
再均衡监视器

再均衡是指分区的所属从一个消费者转移到另外一个消费者的行为,它为消费组具备了高可用性和伸缩性提供了保障,使得我们既方便又安全的删除消费组内的消费者或者往消费组内添加消费者。不过再均衡发生期间,消费者是无法拉取消息的。截屏2022-03-25 下午3.54.55

消费者拦截器
  • 使用场景:对消费消息设置一个有效期属性,如果某条消息在既定的时间窗口内无法达到,那就视为无效,不需要再被处理。
消费者参数补充
  • Fetch.min.bytes

    这个参数允许消费者指定从broker读取消息时最小的数据量,当消费者从broker读取消息时,如果数据量小于这个阈值,broker会等待直到有足够的数据,然后才返回给消费者,对于写入量不高的主题来说,这个参数可以减少broker和消费者的压力,因为减少了往返的时间,而对于有大量消费者的主题来说,则可以明显减轻broker压力

  • Fetch.max.wait.ms

    上面的fetch.min.bytes参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间的阻塞,这个参数默认时500ms

  • Max.partition.fetch.bytes

    这个参数指定了每个分区返回的最多字节数,默认为1M,也就是说,kafkaCOnsumer.poll()返回记录列表时,每个分区的记录字节数最多为1M,如果一个主题有20个分区,同时有5个消费者,那么每个消费者需要4M的空间来处理消息,实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区。

  • Max.poll.records

    这个参数控制一个poll调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量

分区

kafka可以将主题划分为多个分区(partition),会根据分区规则选择吧消息存储到哪个分区中,只要如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样实现了负载均衡和水平扩展,另外,多个订阅者可以从一个或者多个分区中同时消费数据,以支撑海量数据处理能力。

由于消息时以追加到分区中的,多个分区顺序写磁盘的总效率要比随机写内存还高,是kafka高吞吐率的重要保证之一。

1.副本机制

由于producer和Consumer都只会与leader角色的分区副本相连,所以kafka需要以集群的组织形式提供主题下的消息高可用,kafka支持主被复制,所以消息具备高可用和持久性。

一个分区可以有多个副本,这些副本保存在不同的broker上,每个分区的副本中都会有一个作为leader,当一个broker失败时,lerder在这台borker上的分区都会变的不可用,kafka会自动移除leader,再其他副本中选择一个作为新的leader。

在通常情况下,增加分区可以提供kafka集群的吞吐量,然而,也应该意识到集群的总分区数,或者单台服务器上的分区数过多,会增加不可用或者延迟的风险。截屏2022-03-25 下午4.38.59

2.分区Leader选举

如果在某个分区上leader挂了,那么其他副本会进行选举产生一个新的leader,之后所有的读写就会转移到这个新的leader上,在kafka中,其不是采用常见的多数选举方式进行副本的leader选举,而是会在zookeeper上针对每个topic维护一个成为ISR的集合,显然还有一些副本没来得及同步,只有这个ISR列表里面的才有资格成为leader。

通过ISR,kafka需要的冗余度比较低,可以容忍的失败数比较高,假设某个topic上有f+1个副本,kafkake可以容忍f个不可用,当然,如果全部ISR里面的副本不可用,也可以选择其他可用的副本,只是存在的数据不一致。

3.分区重新分配

在部署好的kafka集群中添加机器是正常需求,但是新添加的kafka节点并不会自动的分配数据,所以无法分担集群的负载,除非我们新建一个topic,但是我们想手动将部分分区移到新添加的kafka节点上,kafka内部提供了相关的工具来重新分配某个topic的分区

具体步骤:

  1. 我们创建一个有三个节点的集群,详情可查看集群搭建章节
bin/kafka-topic.sh --create --zookeeper localhost:2181 --topic jm --partitions 3 --replication-factor 3
  • 1

详情查看

bin/kafka-topic.sh --describe --zookeeper localhost:2181 --topic jm
  • 1
  1. 添加一个分区
bin/kafka-topic.sh --alter --zookeeper localhost:2181 --topic jm
  • 1
  1. 添加一个broker节点

    查看主题信息

    bin/kafka-topic.sh --describe --zookeeper localhost:2181 --topic jm
    
    • 1
  2. 重新分配

    我们需要将原来分布在broker 1-3节点上的分区重新分布到1-4节点上,借助kafka-reassign-partitions.sh工具生成reassign plan,不过我们得按照要求定义一个文件,里面说明哪些topic需要重新分区,文件内容如下:

    cat reassign.json
    {"topics":[{"topic":"jm"}],
     "version":1
    }
    
    • 1
    • 2
    • 3
    • 4

    然后使用Kafka-reassign-partitions.sh工具生成 reassign plan

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topic-to-move-json-file reassign.json --broker-list"0,1,2,3" --generate
    
    • 1

    截屏2022-03-25 下午8.27.14

    • –generate 表示指定类型参数

    • –topics-to-move-json-file 指定分区重新分配对应的主题清单路径

    • 命令输入两个json字符串,第一个json内容为当前的分区副本分配情情况,第二个为重新分类的候选方案,注意这里只是生成了一份可行性的方案,并没真正的执行重新分配的动作

    • 我们将第二个json内容保存到名为result.json文件中,然后执行这些rreassign plan;命令如下:

      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignm
      
      • 1

      查看重新分区的进度

      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file result.json --verify
      
      • 1

      截屏2022-03-25 下午8.31.34

分区分配策略
  • RangeAssignor分配策略

    RangeAssignor策略是的原理是按照消费者总数进行整除运算获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能的均匀地分配给所有的消费者,对于每一个topic,RangeAssignor策略会将消费组内所有订阅这个topic的消费者按照名称的字典排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区

    假设n=分区数量/消费者数量,m=分区水量%消费者数量,那么前m个消费者每个分配n+1个分区,后面的消费者每个分配n个分区

    假设消费组内有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有4个分区,那么所订阅的所有分区可以标识为t0p0、t0p1、t0p2、t0p3、t1p1、t1p2、t1p3.最终所有分配结果为:

    消费者C0:t0p0、t0p1、t1p0、t1p1
    消费者C1:t0p2、t0p3、t1p2、t1p3
    
    • 1
    • 2

    假设上面的例子中2个主题只有三个分区,那么所订阅的所有分区可以表示为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2.最终分配结果为:

    消费者C0:t0p0、t0p1、t1p0、t1p1
    消费者C1:t0p2、t1p2
    
    • 1
    • 2

    可以明显的看到这样的分配是不均匀的,如果将此类情形扩大,有可能出现部分消费者过载的情况。

  • RoundRobinAssignor分配策略

    RoundRobinAssignor策略的原理是将消费组内的所有消费者以及订阅者所订阅的所有topic的partititon按照字典序排序,然后通过轮询的方式逐个将分区以此分配给每个消费者,RoundRobinAssignor策略对应的partition.assignment.strategy参数值为org.apche.kafka.clients.consumer.RoundRobinAssignor

    假设消费组中有两个消费者C0和C1,都订阅了主题t0和t1,并且每个主题有三个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2.最终分配结果为:

    消费者C0:t0p0、t0p2、t1p1
    消费者C1:t0p1、t1p0、t1p2
    
    • 1
    • 2

    如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能导致分区分配的不均匀,如果某个消费者没有订阅消费组内某个topic,那么在分配分区的时候此消费者将分配不到的这个topic的任何分区,

    假如消费组内有3个消费者C0、C1和C2,它们共订阅了3个主题,t0、t1、t2,这三个主题分别1、2、3个分区,即整个消费组订阅了t0p1、t1p0、t1p1、t2p0、t2p1、t2p2这六个分区,具体而言,消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者c2订阅的是t0、t1和t2,那么最终分配结果:

    消费者C0:t0p0
    消费者C1:t1p0
    消费者C2:t1p1、t2p0、t2p1、t2p2
    
    • 1
    • 2
    • 3

    可以看到RoundRobinAssignor策略也不是十分完美的,这样分配其实不是最优结果,因为完全可以将分区t1p1分配给消费者1.

  • StickyAssignor分配策略

    kafka从0.11.X版本开始引入这种分配策略,它主要有两个目的

    • 分区的分配尽可能的均匀
    • 分区的分配尽可能的于上次的分配保持相同

    当两者发生冲突时,第一个目标优先于第二个目标,鉴于这两个目标, StickyAssignor的策略的具体实现要比RoundRobinAssignor和RangeAssignor的这两种分配策略复杂的多。

    假设消费组内有3个消费者,C0、C1、C2,它们都订阅了4个主题,t0、t1、t2、t3,并且每个主题有两个分区,也就是说整个消费组订阅了t0p0、t0p1、t1p0、t1p1、t2p0、t3p0、t301这八个分区,最终的分配结果如下:

    消费者C0:t0p0、t1p1、t3p0
    消费者C1:t0p1、t2p0、t3p1
    消费者C2:t1p0、t2p1
    
    • 1
    • 2
    • 3

    假设此时消费者C1脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配,如果采用了ROundRobinAssignor策略,那么此时的分配结果如下

    消费者C0:t0p0、t1p0、t2p0、t3p0
    消费者C2:t0p1、t1p1、t2p1、t3p1
    
    • 1
    • 2

    如分配结果所示,RoundRobingAssignor策略会按照消费者C0和C2进行重新轮询分配,而如果此时使用的是StickyAssignor策略,分配结果如下

    消费者C0:t0p0、t1p1、t3p0、t2p0
    消费者C2:t0p1、t1p0、t3p1、t2p1
    
    • 1
    • 2

    可以看到分配结果还保留着上一次分配中对于消费者C0和C2的所有分配结果,并将原来消费者C1的负担分配给剩余的两个消费者C0和C2,最终C0和C2的分配还保持了平衡

  • 自定义分配策略

    需要实现:org.apche.kafka.clients.consumer.internals.PartitionAssignor

    继承:org.apche.kafka.clients.consumer.internals.AbstractPartitionAssignor

kafka存储

存储结构

截屏2022-03-27 下午7.33.46

  • 每一个partition文件相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中,但每一个段segment file消息数量不一定相等,这样的特性方便old segment file高速被删除,(默认情况下每个文件大小为1G)
  • 每一个parition仅仅需要支持顺序读写即可,segment文件生命周期由服务端配置参数决定
partition和segment文件存储结构

Segment file组成:由两部分组成 分别为index file和data file,此2个文件一一对应,成对出现后,后缀.index和.login分别表示segment索引文件、数据文件

segment文件命名规则:partition全局的第一个segment从0开始,兴许每一个segment文件名称为上一个segment文件最后一条消息的offset值。

数组最大为64位long大小,19位数字字符长度,没有数字用0填充

日志索引
数据文件的分段

kafka解决查询效率的方法之一是将数据文件分段,比如有100条message,它们的offset是从0到99,假设将数据文件分成5段,第一段位0-19,第二段位20-39,以此类推,每段放在一个单独的数据文件中,数据文件以该段中的最小offset命名,这样在查找指定offset的message的时候,用二分法查找就可以定位到该message在哪个段中。

偏移量索引

数据文件分段使得可以在一个较小的数据文件中查找对应的offset的message了,但是这依然需要顺序扫描才能找到对应的offset的message,为了进一步提高查询效率,kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名称一样,只是文件扩展名为.index

截屏2022-03-27 下午7.55.13

比如,要查找绝对索引offset为7的message

首先是用二分查找法确定它在哪个logSegment中,自然是在第一个segment中,打开这个segment的index文件,也是用二分查找法找到小于或者等于offset的索引条目中最大的那个offset,自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的message在数据文件中的位置为9807.

打开数据文件,从位置9807的那个地方开始顺序扫描到o ffset为7的那条message.

这套机制是建立在offset是有序的,索引文件被映射到内存中,所以查找的速度还是很快的。

总结:kafka的message存储采用了分区(partition)、分段(LogSegment)和稀疏索引这几个手段来达到高效性。

日志清理
日志删除

kafka日志管理器允许定制删除策略,目前的策略是删除修改时间在N天以上的日志(按时间删除),也可以使用另外一个策略,保留最后N GB的数据(按大小删除),为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照的副本上进行的,这类似于java的CopyOnWriteArrayList

kafka消费日志删除思想,kafka把topic中一个partition大文件分成了多个小文件段,通过小文件段,就容易定期清理或删除已经消费完的文件,减少磁盘占用

log.cleanip.policy=delete 启用删除策略
直接删除,删除后的消息不可恢复,可配置一下两个策略
清理超过指定时间的:
log.retention.hours=16
超过指定大小后,删除旧的消息
log.retention.bytes = 1073734556
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
日志压缩

将数据压缩,只保留每个key最后一个版本的数据,首先在broker的配置中设置log.cleaner.enable=true启用cleaner,这个默认是关闭的,在topic的配置中设置log.cleanup.policy=compact启用压缩策略

截屏2022-03-27 下午8.16.35

压缩后的offset可能是不连续的。比如上图中没有5和7,因为这些offset的消息被merge了,当从这些offset消费消息时,将会拿到比这个offset打的offset对应的消息,比如试图拿到offset为5的消息,实际上会拿到offset为6的消息,并从这个位置开始消费。

这种策略只适合特种场景,比如消息的key是用户id,消息体是用户的资料,通过这种压缩策略,整个消息集里保存了所有用户的最新消息。

压缩策略支持删除,当某个key的最新版本的消息没有内容时,这个key被删除,这也符合以上逻辑。

磁盘存储优势

kafka在设计时,采用了文件追加的方式来写入消息。即只能在日志文件的尾部追加新的消息。并不会修改已经写入的消息。这种方式属于典型的顺序写入磁盘的操作。所以就算时kafka的使用磁盘作为介质,所能实现的吞吐量也是非常可观的。

kafka中大量使用页缓存,这页是kafka实现高吞吐量的重要因素之一。

除了消息顺序追加,页缓存等技术,kafka还是用了零拷贝的技术来进一步提升性能,“零拷贝技术”只用将磁盘文件的数据复制到页面的缓存中一次,然后u讲数据从页面缓存直接发送到网络中,(发送给不同的订阅着时,可以使用同一个页面缓存)避免大量重复操作,如果有10个消费者,传统方式下,复制数据的次数为4*10=40次,而使用“零拷贝技术”,只需要1—+10=11次,一次从磁盘复制到页面缓存,10次代表10个消费者各读取一次页面缓存。

稳定性

幂等性

所谓幂等性就是对接口的多次调用所产生的结果和调用一次时一致的,生产者在进行重试的时候有可能会重复写入消息,使用kafka的幂等性功能就可以避免这种情况。

幂等性是有条件的:

  • 只能保证Producer在单个绘画内不丢不重,如果Producer出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重)。
  • 幂等性不能够跨多个topic-Partition,只能保证单个partition内的幂等性,当设计到多个topic-Partition时,这中间的状态并没有同步。

producer使用幂等性的示例非常简单,与正常的情况下Producer使用相比较变化不大,只是需要把Producerd的配置enable.idempotence设置为true即可,默认为true 如下所示:

Properties props = new Propertues();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE__CONFG,"true");
props.put("acks","all") //当enable.idempotence 为true,这里默认为all
props.put("bootstrap.servers","localhost:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
kafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord(topic,"test"));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
事务
场景

幂等性并不能跨多个分区运作,而事务可以弥补这个缺憾,事务可以保证对多个分区写入操作的原子性,操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功部分失败的可能性。

为了实现事务,应用程序必须提供唯一的transactionalid,这个参数通过客户端程序来进行设定。

properties.put(ProducerCOnfig.TRANSACTIONAL_ID_CONFIG,transactionid);
  • 1
前期准备

事务要求生产者开启幂等性特性,因此通过transactionalid参数设置为非空从而开启事务特性的同时需要将ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG设置为true(默认为true),如果显示设置为false,则会抛出异常。

kafkaProducer提供了5个与事务相关的方法,如下:

//初始化事务,前提是配置了transactionalid
public void initTransactions(){
  //开启事务
  public void beginTransaction();
  //为消费者提供事务内的位移提交操作
  public void sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata> offsets,String consumerGroupId)
  //提交事务
   public void commitTransaction()
	//终止事务 ,蕾丝回滚
    public void aboraTransaction()
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

截屏2022-03-27 22.06.50

控制器

在kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(kafka COntroller),它负责管理整个集群中所有分区和副本的状态,当某个分区的leader副本出现故障时,由控制器负责为该分区选择新的leader副本,当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其原数据信息,当使用kafka——topics.sh脚本为某个topic增加分区时,同样由控制器负责分区的重新分配。

kafkad的控制器选举工作依赖于zookeeper,成功竞选为控制器的broker会在zookeeper中创建controller这个临时节点,

在任意时刻,集群中有且仅有一个控制器,每个broker启动的时候会尝试去读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其他broker节点成功竞选为控制器,所以当前broker节点放弃竞选,如果zookeeper中不存在/controller这个节点,或者这个节点的数据异常,那么就会尝试创建/controller这个节点,当前broker去创建节点时,也有可能其他broker同时去尝试创建broker节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker表示竞选失败,每个broker会在内存中保存当前控制器的broker的值,这个值可以标识为activecontrollerid,

zookeeper中还有一个与控制器有关的/controller_epoch节点,这个节点是持久节点,节点中存放的是一个整型的controller_epoch值,controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也称为控制器纪元。

controller_epoch的初始值为1,即集群中的第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1,每个和控制器交互的请求都会携带上controller_epoch字段,如果请求的controller_epoch小于内存中的controller_epoch值,则认为这个请求是已经过期的控制器发送的请求,那么这个请求会认为是无效请求,如果请求的controller_epoch值大于内存中的controller_epoch值,那么说明已经有新的控制器当选了,由此可见,kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性。

具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下:

  1. 监听partition相关变化
  2. 监听topic相关变化
  3. 监听broker相关变化
  4. 从zookeeper中读取获取当前所有与topic、partition以及broke r有关信息进行响应管理。
可靠性保证
  1. 可靠性保证,确保系统在各种不同环境中能够发生一致的行为,

  2. kafka的保证

    • 保证分区消息的顺序

      • 如果使用同一个生产者忘同一个分区写入消息,而且消息b在消息a之后写入
      • 那么kafka可以保证消息b的偏移量比消息a的偏移量大,而且消费者会先读取消息a,再读取消息b,
    • 只有当消息被写入分区的所有同步副本时,它才会认为是已提交状态。

      • 生产者可以选择接收不同类型的确认,控制参数acks
    • 只要还有一个副本时活跃的,那么已提交的消息就不会丢失

    • 消费者只能读取已经提交的消息

  • 失效副本

    怎么判断一个分区是否有副本是处于同步失效状态呢,从kafka0.9.X版本开始通过唯一的一个参数replica.lag.time.max.ms(默认大小为10000)来控制,当ISR中的一个follower副本滞后leader副本的时间超过参数replica.lag.time.max.ms指定的时间即判定为副本失效,需要将次follower副本剔出除ISR之外,具体实现原理很简单,当follower副本将leader副本的LED(Log End Offset,每个分区的最后一条消息的位置)之前的日志钱不同步,则认为该follower副本已经追赶leader副本,此时更为该副本的lastCaughUpTimeMs标识。kafka的副本管理器启动时会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的lastCaughUpTimeMs差值是否大于参数replica.lag.time.max.ms指定的值,千万不要错误的认为follower副本只要拉取leader副本的诗句就会更新lastCaughtUpTimeMs,试想当leader副本的消息流入速度大于follower副本的拉取速度时,follower副本一致不断的拉取leader副本的消息也不能与leader副本同步,如果此follower副本处于ISR中,那么当leader副本失效时,而选取此follower副本为新的leader副本,那么就会有严重的消息丢失。

  • 副本复制

  • kafka中的每个主题分区都被复制n次,其中的n是主题的复制因子,这允许kafka在集群服务器发生故障时自动切换这些副本,以便在出现故障时消息仍可用,kafka的复制时以分区为粒度的,分区的预写日志被复制到n个服务器,在n个副本中,一个副本作为leader,其他副本成为followers,顾名思义,producer只能往leader分区上写数据(读也只能从leader分区上进行),followers只按照顺序从leader上复制日志

    一个副本可以不同步leader有如下几个原因

    • 慢副本:在一定周期时间内follower不能追赶上leader,最常见的原因之一是I/o瓶颈导致follower追加复制消息慢于从leader拉取速度
    • 卡住副本:在一定周期时间内follower停止从leader拉取请求,follower replica卡住了是由于GC暂停或follower失效或死亡

    新启动副本,当用户给主题增加副本因子时,新的follower不再同步副本的列表中,知道他们完全赶上了leader日志

    如何确定副本时滞后的截屏2022-03-27 23.58.28

    在服务器端现在只有一个参数需配置replica.lag.time.max.mx,这个参数解释replicas响应partition leader的最长等待时间,检测卡住或者失效副本的探测,如果一个replica失败导致发送来取请求时间查过replica.lag.time.max.mx,kafka会认为此replica已经死亡会从同步副本列表中剔除,检测慢副本机制会发生变化,如果一个replica开始落后于leader超过replica.lag.time.max.mx,,kafka会认为太缓慢并且从同步副本中移除,除非replica请求leader时间间隔大于replica.lag.time.max.mx,因此即使leader使流量激增和大批量写消息,kafka也不会从同步副本中移除该副本。

一致性保证
  • 在leader宕机后,只能从ISR列表中选取新的leader,无论ISR中哪个副本被选为新的leader,它都知道HW(最全的数据),可以保证在切换leader之后,消费者可以继续看到HW之前已经提交的数据。
  • HW的截断机制,选出了新的leader,而新的leader并不能保证已经完全同步了之前的leader的所有数据,只能保证HW之前的数据是同步过的,此时所有的follower都要将数据截取到HW的位置,再和新的leader同步数据,来保证数据的一致,当宕机的leader恢复过来后,会发现新的leader中的数据和自己持有的书就不一致,此时宕机的leader会将自己的数据截断到宕机之前的HW位置,然后同步新的leader的数据,宕机的leader活过来也像follower一样同步数据,来保证数据的一致性。

截屏2022-03-28 00.10.30

截屏2022-03-28 00.11.49

截屏2022-03-28 00.12.27

截屏2022-03-28 00.13.04

Kafka 0.11.0.0版本解决方案

早上上述两个问题的根本原因是在HW值被用于衡量副本的成功与否以及出现在failture时作为日志截断的依据,但HW的值更新是异步延迟的,特别是需要额外的FETCH请求处理流程才能完成更新,故这中间发生的任何崩溃都会导致HW的值过期,鉴于这些原因,Kafka0.11版本引入的leader epoch来取代HW的值,leader端多开辟了一段内存区域专门保存leader的epoch消息,这样即使出现上看的场景也能很好的规避这些问题

所谓leader epoch实际上是一对值,(epoch,offset)epoch表示leader的版本号,从0开始,当leader变更过1次,epoch就会+1,而offset则对应于该epoch版本的leader写入第一条消息的唯一,因此假设两个值

(0,0)

(1,120) 表示第一个leader从位移0开始写入消息,共写了120条(0,119),而第二个leader版本号为1,从位移120处开始写入消息

Leader broker中会保存这样一个缓存,并定期写入一个checkpoint文件中。

避免数据丢失:

截屏2022-03-28 00.23.12

避免数据不一致:

截屏2022-03-28 00.25.09

消息重复的场景及解决方案
生产端重复

生产发送的消息没有收到正确的broker响应,导致producer重试

producer发出一条消息,broker落盘后因为网络原因发送端得到一个发送至呗的响应或者网络中断,然后producer收到一个可恢复的Exception重试消息导致消息发送失败

解决方案:

  • 启动kafka的幂等性

    • 启动kafka的幂等性,无需修改代码,默认关闭,需要修改配置文件enable.idempotence=true 同时要求ack=all且retries >1
  • Ack = 0 不重试

    • 可能会丢失消息,适用于吞吐量指标重要性高于数据丢失,例如:数据收集
消费端重复

根本原因:

数据消费完没有及时的提交offset到broker

解决方案

  • 取消自动提交

    • 每次消费完或者程序退出时手动提交,这也可能没办法保证一条重复
  • 下游做幂等

    • 一般的解决方案是让下游做幂等性或者尽量每消费一条消息就记录offset,对于少数严格的场景可能需要把offset或者唯一id,例如订单的id和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表同时记录消费的offset,然后更新下油的数据时用消费位点做了乐观锁拒绝旧位的数据更新

高级应用

命令行工具
消费组管理
  • 查看消费组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  • 1
  • 查看消费组详情
 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo(消费组名)
  • 1
  • 查看消费组当前状态
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo --state
  • 1

消费组内成员信息

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo --members
  • 1
  • 删除消费组,如果有消费者在使用则会失败
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group group.demo
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/468202
推荐阅读
相关标签
  

闽ICP备14008679号