当前位置:   article > 正文

Kafka知识点总结_c#framework向kafka发送数据

c#framework向kafka发送数据

1 基本信息

1)组成:zookeeper中存放的信息brokerId,topic,消费者相关的信息(高版本中消费者信息存放在kafka集群内部),没有存放生产者相关信息。
2)kafka一般安装多少台:2*(生产者峰值生产速率副本/100)+1=3
3)压测:生产者峰值生产速率
4)副本:默认1 ,一般2-3个副本。副本越多可靠性越高,但会增加磁盘IO,效率低下
5)数据量问题:
100万日活 没人100条 100W
100=1亿
一条日志:0.5-2K~1K
1K1亿=100g
平均速度速度:1亿条/(24
3600S)=1150条/秒
1150条/秒1k=1m/s
峰值速度:?
6)kafka数据保存时间
默认保存7天,生产环境一般保存3天
7)磁盘预留多大空间
100g
2副本*3天/0.7
8)kafka监控
kafkamanager KafkaEagle
9)分区: 合适的分区提高kafka并发度
先设置一个分区;进行测试生产者峰值生产速率tp,消费者消费的峰值速率tc
分区数=t/min(tp,tc) t:kafka希望达到的吞吐量预期
一般3-10个
10)分区分配策略
1.Range(默认)
10个分区 3个消费者线程
0 1 2 3 容易发生数据倾斜
4 5 6
7 8 9
2.RoundRobin
先将分区hash打散,再轮询
11)ISR
主要用于选举leader挂了,选举新的leader;在ISR队列中的都有机会选上
老版本中:延迟时间,延迟条数;
新版本:延迟时间(及leader向follower发送数据同步时,follower向leader响应(ack)的时间,如果follower一段时间没有响应,就会被踢出ISR)

2 Kafka挂了怎么办

短时间:数据会存储在上游flume channel中(Producer)
长时间:日志服务器有备份,一般保存30天,可以采集重新跑一遍
  • 1
  • 2

3数据丢失

kafka中有ack应答机制保证
0 生成者只管发送数据,不等broker落盘,传输效率高,可靠性较低
1 等待leader应答;不用等follower应答,可能leader接收数据落盘之后,还没有吧数据发送给follower,leader就挂掉了,数据还是可能会丢失;可靠性一般,传输效率一般
-1 leader+follower共同应答;可靠性最高,效率最低。follower是ISR队列中的follower
生产环境下:
一般不选0,
1 一般传输普通日志,对可靠性要求不高
-1 一般对可靠性要求比较高的场合下,如涉及钱的金融场景

Broker丢数据
通过ack机制来保证

Product丢失数据
为了提高吞吐,kafka一般将数据缓存在product的buffer中,然后批量发送。因此,基于buffer可以实现异步发送,提高效率,product通过callback来处理异常和发送失败的情况。但是如果product非法停止,则buffer中的数据就会丢失。异或是,消息产生(异步产生)过快,导致挂起线程过多,内存不足,导致程序崩溃,消息丢失。
在这里插入图片描述

根据上图,可以想到几个解决的思路:

  • 异步发送消息改为同步发送消息。或者service产生消息时,使用阻塞的线程池,并且线程数有一定上限。整体思路是控制消息产生速度。
  • 扩大Buffer的容量配置。这种方式可以缓解该情况的出现,但不能杜绝。
  • service不直接将消息发送到buffer(内存),而是将消息写到本地的磁盘中(数据库或者文件),由另一个(或少量)生产线程进行消息发送。相当于是在buffer和service之间又加了一层空间更加富裕的缓冲层。

4数据重复

1 kafka自身特性解决
1)幂等性:单分区单会话内
kafka在一个分区中维护一个id列表,producer发送的数据会按kafka中的id去重,一旦夸会话就会失效(Producer挂掉再重启就无法保证)
2)事务
3)ack =-1
幂等性+事务+ack=-1 用的比较少,效率低,容易产生数据积压
2下一层处理
hive的dwd group by去重
sparkStreaming、redis去重

5数据积压

1 增加分区 同时增加下一级消费者的CPU核数(sparkstreaming)
2 增加下一级消费者消费速度
flume sparkStreaming,增加 batchsize 1000event/s=>2000event/s

6优化

7其他

1kafka高效读写数据
   1)kafka是集群模式,多点同时工作
   2)可以设置分区,并发读写
2底层采用顺序读写,读写速度可以达到600m/s,比随机读写(100m/s)快
3采用了零拷贝技术
https://editor.csdn.net/md/?articleId=108515004
4 kafka单条日志传输大小
   kafka消息体大小默认单条最大值是1M,如果消息大于1M则会出现生产者无法推送数据或消费者无法消费数据(kafka卡死)
需要进行配置
在这里插入图片描述
5 故障恢复细节
在这里插入图片描述
1)leader故障
   follower挂掉后,会从ISR中选举一个新的leader,为了保证多个副本之间数据的一致性,其他的follower会将各自log文件中高于HW的部分截取掉,然后从新的leader中同步数据
2)follower故障
   follower挂掉后会被踢出ISR,等该follower恢复后,会读取本地磁盘记录的上次的HW,将log文件中高于HW的部分截取掉,再向leader同步数据(同步完成之前处于失效状态),等该follower的LEO大于等于Partition的HW(及follower追上leader之后),就可以重新加入ISR

6 zookeeper在kafka中的作用

  • 保存broker、topic的信息,消费者组的信息保存在_consumer_offsets主题中。
  • zookeeper会保存kafka中brokerId,topic等信息,同时kafka集群中的controller也是通过zookeeper对集群进行管理。
  • kafka集群中有一个broker会被选举为controller,主要负责集群broker的上下线,所有topic的分区副本分配和leader选举等工作。
       controller的管理工作都依赖于zookeeper
    partition的leader选举过程
                                           partition的leader选举过程
follower向leader的同步过程
  1. leader接收producer发送的消息,追加到本地日志中,更日志文件新偏移量,以及leader的LEO;
  2. follower会定时的向leader发送同步请求(默认3s),请求中会携带follower的LEO信息,leader根据ISR中follower的LEO的最小值更新HW。leader将HW以及消息一起返回给follower。
  3. follower将收到的消息追加到本地日志中,并更新偏移量。
Kafka不支持读写分离的原因

对于像zk、redis集群都支持读写分离(主写从读),这样可以帮助leader分担读取数据的压力。那为什么kafka不支持读写分离呢?
1)延时问题:leader写入的数据需要向follower同步,向redis的数据同步需要经过主节点网路->主节点内存->从节点网络->从节点内存的环节,而kafka同步时间更长,需要落盘,主节点网路->主节点内存->主节点磁盘->从节点网络->从节点内存->从节点磁盘。
2)kafka的主写主读模式实现了负载均衡:kafka对每一个topic进行了分区,在各分区leader均匀分布在不同broker上时,可以达到负载均衡。

Brokers的controller选取以及作用

选举策略:kafka集群启动的时候各个broker会向zk注册一个临时节点/controller,注册成功的broker成为集群的leader,其他broker发现节点已经存在,就会在zookeeper中创建watch对象,便于它们收到控制器变更的通知。
controller的作用
1)如果有一个broker退出了,controller会去检查这个broker是否有分区leader,如果有,那么这个分区需要选取一个新的leader。controller遍历所有副本选出新leader,同时更新分区ISR队列
2)如果有一个broker加入了,controller会根据其BrokerID去判断其是否有现有分区的副本,如果有,就要去分区leader同步数据

分区leader和follower以及ISR同步队列
  • 每个topic有多个partition,为了容灾每个partition有多个分区。分区leader负责读写数据(读写不分离),follower向leader发送同步数据请求(包含了需要同步数据的offset),leader通过follower的offset判断复制进度。如果follower超过10s没有向leader发送请求就会被踢出ISR队列。其中请求超时时间通过replica.lag.time.max.ms配置。
  • 为了Kafka的负载均衡,希望同一个topic的不同partition的leader分布在不同的broker中。因此,每一个分区都有一个首选首领,我们通过设置auto.leader.rebalance.enable为true,集群会检查首选首领是否是当前分区的leader,如果不是,会触发选举,将首选首领选举为leader。
消费组选leader(分区分配的具体实施者)

每个消费者组在服务端都有一个组协调器GroupCoordinator对其进行管理,客户端有一个ConsumerCoordinator消费者协调器与组协调器进行通信。在GroupCoordinator中会为消费者组选举出一个leader。这个leader负责该消费者组的分区分配的实施。

Kafka的分区分配策略

同一个消费者组内的消费者不能消费同一个主题的分区,但一个消费者可以消费多个分区。不同消费者组的消费者可以消费同一个主题的分区。
消费者组的优势:降低的消费者的压力,如果没有消费者组的话,一个topic的所有分区都要被一个消费者消费,消费压力增大。使用消费者组,组内每个消费者分配到的分区比较均匀,达到了平衡消费压力的效果。

  • Range策略(均分):分区数/消费者组内的消费者=n,分区数%消费者组内的消费者=m,消费者组内每个消费者分配n个分区,多出来的m个给前面的消费者。
  • RoundRobin轮询):将分区对消费者组的消费者依次轮询分配,具体做法是先将消费者组所有消费者订阅的主题分区按字典序排序,然后再进行轮询分发(只有当消费者组中所有消费者订阅topic相同的时候分配才均匀)。
  • StickyAssignor:①分区分配尽可能均匀,和RoundRobin类似。②分区分配尽可能与上一次分配保持相同。
    StickyAssignor的②原则保证了当消费者组的消费者发生变化的时候,分区分配不会发生太大的变化。
Consumer Group Rebalance

消费者重平衡的条件主要有三个:订阅的topic数量发生、topic的分区数量发生变换、消费者组的消费者数量发生变化。其中消费者组的成员数量发生变化主要有三种形式:新成员加入、组成员主动离开、组成员崩溃。
正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送。
消费者崩溃一般是与broker的心跳超时或者消息处理时间超时造成的。
kafaka 消费者配置的四个参数

  • session.timeout.ms 设置了超时时间,心跳超时时间,一般为心跳间隔的3倍,超时会认为consumer死亡,开启rebalance
  • heartbeat.interval.ms 心跳时间间隔,与broker保持联系
  • max.poll.interval.ms 每两次 poll 消息的时间间隔,既每次消费的最长处理时间,每次消息拉取之间的最大间隔时间,超过间隔时长没有拉取数据broker也会认为consumer死亡,开启rebalance
  • max.poll.records 每次拉取的最大消息数

当session.timeout.ms或者max.poll.interval.ms超时的时候都会触发consumer group的重平衡。

如果数据处理逻辑比较重,从而造成消费处理超时的话,就会发生rebalance。rebalance时会预留一端时间给consumer上报自己的offset信息。但是如果当前数据还没处理完,就发生rebalance的话,就会造成offset commit失败,下次poll会拉取到旧的数据(重复消费),因此要保证好消息处理的幂等性。

对于 rebalance 类问题,简单总结就是:处理好心跳超时问题和消费处理超时问题

消息发往那个分区
  • 当指定分区的时候发往指定分区;
  • 没有指定分区,当消息有key的时候,通过hash(key)%分区数确定消息发往的分区
  • 当没有指定分区,也没有key的时候,会随机选择一个分区。随机是指在参数topic.metadata.refresh.ms制定个时间后,随机选择一个分区,然后这段时间消息都发往这个分区,默认时间是10min重新随机选择一个分区。
Kafka优先副本选举

使leader副本均匀的分布到broker上,使分区均衡。但是分区均衡并不意味着负载均衡(分区均衡,数据在分区上还要均匀分区等)。

  • auto.leader.rebalance.enable,参数默认为true。kafka集群会定时轮询所有broker节点,计算分区不均衡率,使分区均衡。生产环境不要开启此功能,如果在高峰时期发送分区再均衡,会引起性能影响。
  • 使用kafka-preferred-replica-election.sh脚本,在特定时间分批手动进行分区平衡。(kafka-preferred-replica-election.sh --zookeeper hadoop102:2181 --path-to-json-file election.json
Kafka分区再均衡原理

当消费者组发生变化(消费者增加、退出、宕机)或者消费者组对应的topic发生变化(分区变化)时,会触发kafka分区的再均衡,重新分配分区(再均衡期间,消费者组内的消费者无法读取数据)。可以使用再均衡监听器,避免均衡后发生的消息重复消费问题。

  • 当一个客户端加入消费者组的时候,会先通过消费者客户端的ConsumerCoordinate(消费者协调器)先向集群发送一条FindCoordinatorRequest请求,找到其要加入的消费者组对应的GroupCoordinator(组协调器)所在的broker
  • 这个客户端向GroupCoordinator发送JoinGroupRequest请求加入消费者组,并为其生成一个唯一标志member_id。如果此时消费者组没有leader或leader退出,会为其选一个leader。服务端根据消费者组中各个消费者的分区分配策略选出消费者组的分区分配策略,并返回给消费者。
  • leader根据分区分配策略选出分区分配方案,将方案提交给GroupCoordinatorGroupCoordinator将返回的消费组的元数据存入_consumer_offsets主题中,并向消费者返回其各自消费的分区。
  • 消费者通过ConsumerCoordinate(消费者协调器)向GroupCoordinator发送心跳来维持他们与消费组的从属关系,如果GroupCoordinator长时间没有接收到心跳,就会认为该消费者退出,触发重分区。
Kafka分区数量变化
  1. 增加分区:kafka增加分区数会导致按key分区的行为受到影响,原本既定的消息顺序也会受到影响。改变分区之后kafka的原始数据不会主动迁移,使用脚本kafka-reassign-partitions.sh进行分区重分配
  2. 目前不支持分区减少:因为分区减少,如果不保留数据的话会造成数据的丢失,如果保留数据的话,直接追加到现有分区的尾部会破坏现有分区的有序性。所有如果想要减少分区可以创建一个分区小的主题,然后将主题中的消息按既定的逻辑赋值过去。`
分区重分配(在broker失效或集群扩容时对分区迁移)

当kafka集群扩容,增加新的broker的时候,当前主题分区并不会自动分配到加入的节点中;当有broker挂掉的时候,kafka集群也不会把失效几点上的分区副本自动迁移到集群剩余的可用broker上。这样就会影响集群的负载均衡。需要使用脚本kafka-reassign-partitions.sh执行分区重分配工作。分区重分配对集群性能影响很大,需要占用额外的资源,一般分成多个小批次执行

  1. 创建主题清单JSON文件
  2. 生成重分配方案。kafka-reassign-partitions.sh --zookeeper hadoop102:2181 --generate --topic-to-move-json-file reassign.json
  3. 执行分配方案。kafka-reassign-partitions.sh --zookeeper hadoop102:2181 --execute --reassignment-json-file project.json
  4. 分区重分配比较影响集群性能,尤其是在高峰时期,及时采用分小批进行也会不足以应对。可以使用限流,对复制流量加以限制。kafka-reassign-partitions.sh --zookeeper hadoop102:2181 --execute --reassignment-json-file project.json --throttle 10复制流量上限为10b/s。重分配完成后,为了不影响kafka本身性能,需要对临时的限流配置删除,使用verify代替execute可以执行完后就删除临时限流配置,并且显示执行进度。
Kafka分区数与性能的关系

kafkaProducer是线程安全的,kafkaConsumer是线程不安全的,所以,一个主题分区的数据写入是可以并发进行的,分区中的数据读取只能被一个消费者线程消费。经过测试,分区数上升,吞吐量也上升,但大过一个阀值的时候,整体吞吐量反而下降。
kafka的分区数不是没有上线的,分区数会占用文件描述符,每个进程能够支配的文件描述符是有限的。

Kafka幂等

开启幂等需要客户端参数enable.idempotence设为true,并且kafka服务端ack设为-1。(所以开启幂等之后会降低kafka效率)
kafka为每一个生产者分配一个PID,每个分区为这个PID维护一个序列号,每发一条消息对应的<PID,分区>对应的序列号+1。所以幂等性只对单分区单会话有效
存在问题:kafka自带幂等职能只能解决product重复发送一条msg出现的消息重复问题(添加链接描述),但是不能真正解决消息上的幂等。
可以借助于redis实现消息的幂等,每次product发送msg先查询redis,判断是否发送过。

public void invoke(String value, Context context) throws Exception {

        // 1. 没有启用幂等输出
        if(!enableIdempotent) {
            producer.send(new ProducerRecord<>(topic, value));
            return;
        }

        // 2. 启用了幂等
        String idempotentKey = String.format("%s%s", idempotentKeyPrefix, MD5Util.toMD5(value));
        // 检查fusion中是否存在
        String fusionValue = FusionUtil.procRetry(jedis -> jedis.get(idempotentKey));
        // 已经输出过这条数据, 忽略
        if(fusionValue != null){
            log.info("已经输出过这条数据... idempotentKey: {}", idempotentKey);
            return;
        }

        // 没有输出过
        producer.send(new ProducerRecord<>(topic, value));
        FusionUtil.procRetry(jedis -> jedis.set(idempotentKey,""));
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
Kafka事务
  • 开启事务需要应用程序指定一个事务号properties("transactional.id","transactionId"),transactionId与PID一一对应,不同的是transactionId用户指定,PID由kafka内部分配。
  • 同一个transactionId只能一个Producer使用;
  • 事务还要求开启幂等,KafkaProducer默认会将enable.idempotence设置为true,并且将acks设置为-1,所以开启事务之后也会影响kafka的效率。
  • 消费端可以设置隔离级别,isolation.level默认为read_uncommitted(可以消费未提交的事务),read_committed(只能消费提交了的事务)
Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");//transactional.id由客户端提供
props.put("acks", "all");//ack设为-1,要全部落盘
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();//初始化事务

try {
    String msg = "matt test";
    producer.beginTransaction();//开启事务
    producer.send(new ProducerRecord(topic, "0", msg.toString()));
    producer.send(new ProducerRecord(topic, "1", msg.toString()));
    producer.send(new ProducerRecord(topic, "2", msg.toString()));
    producer.commitTransaction();//提交事务
} catch (ProducerFencedException e1) {
    e1.printStackTrace();
    producer.close();
} catch (KafkaException e2) {
    e2.printStackTrace();
    producer.abortTransaction();
}
producer.close();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
日志索引

kafka有两个索引文件,偏移量索引文件(xxx.index)和时间戳索引文件(xxx.timeindex)。偏移量索引文件建立了消息偏移量到物理地址之间的映射关系,通过索引文件可以快速定位消息位置。时间戳索引文件建立了时间戳与偏移量之间的映射关系。

  • 日志分段:默认log文件大于1G,index文件大于10M,最长时间7天滚动一次分段文件。文件名是上一个分段中offset的结束值。
    00000000000000000000.index 起始偏移量为0
    00000000000000000000.log
    00000000000000368769.index 起始偏移量为368769+1 = 368770
    00000000000000368769.log
  • 日志查找:主要有两种方式offset查找和时间戳查找。通过二分查找,先根据offset定位log文件,再根据log文件对应的index文件定位消息位置
    在这里插入图片描述
Kafka控制器

Controller主要负责管理kafka集群所有分区和副本的状态。

  1. Controller的选举:broker启动回去读取zk的/controller节点的brokerId的值,如果这个值是-1或者还不存在这个节点,就回去注册一个/controller临时节点,并在节点/controller_epoch中保存一个版本号。每个broker中保存一个当前控制器的brokerId,表示为activeController。在这里插入图片描述

  2. controller的作用:①监听分区相关变化(处理分区重分配、ISR结合变更)②监听broker相关变化。③处理主题相关变化。
    controller通过在zk的/brokers/ids和/brokers/topics等节点创建监听来管理kafka的分区、topic、broker的变化。

  3. controller的优势:早期每一个broker为了管理分区和副本都要在zk上创建大量的监听,一个分区或者副本的变化就会唤醒很多不必要的监听器。这样的设计会有造成脑裂、羊群效应以及zk过载的风险。而现在只有controller节点在zk上创建监听,所有对分区和副本的管理交由controller完成,controller发现zk上的数据变化,再告知对应的broker进行更新操作。broker只在zk上建立一个对/controller节点的监听。

分区leader选取

当创建分区(创建topic、增加分区数)或者分区上线(分区leader下线)的时候会执行leader选举
选取规则:根据AR(所有分区)的循序选取AR中第一个在ISR队列中的分区。(按照AR中的顺序不是ISR中的顺序,如果有优先副本就设置优先副本为leader)

Kafka高吞吐的原因
  1. kafka每个topic分为多个partition,每个partition尽可能的均匀分布在不同的broker(rebalance机制),所以消费者可以并发的读取多个分区的数据,提高了吞吐量
  2. 磁盘存储顺序读写:kafka采用文件追加的方式来写消息,只允许在日志文件的尾部追加新的消息,也不能修改已写入的消息(顺序写盘)。
  3. 页缓存:kafka大量使用页缓存,提高了kafka的吞吐量。(向磁盘读写数据先访问页缓存)
  4. 零拷贝:读取kafka的消息的时候,直接使用sendFile()方法,将磁盘数据拷贝到内核再通过网络发送出去,不拷贝到用户态(读取数据的时候kafka不需要对消息再进行处理,所以还将消息读取到程序中很消耗资源)。
Kafka生产端架构

Kafka生产客户端主要由主线程和sender线程来协调工作。主线程中创建消息然后经过拦截器、序列化器、分区器之后缓存到消息累加器,然后由sender线程获取消息发送给kafka集群。

  1. kafkaProducer生产的消息缓存到消息缓存器RecordAccumulator中,消息缓存器的大小通过buffer.memory参数配置,默认32M。如果生产者生产速度大于发送速度,可能会导致生产者空间不足。
  2. RecordAccumulator内部是按分区设置的多个双端队列,每个队列节点是一个producerBatch缓存了一批消息,默认大小是16k,batch.size参数指定,sender按批次拉取消息。这样将消息封装成批次发送可以减少网络请求次数,提高吞吐
  3. RecordAccumulator内部有一个BufferPool缓存池管理producerBatch空间的重用,避免重复申请、释放空间。但是只管理batch.size指定大小的producerBatch
  4. Sender线程从消息缓存区获取到producerBatch后,将其封装成一个请求进行发送,并将请求缓存到InFlightRequests中,等待服务端响应,如果失败可以重试。
  5. InFlightRequests中的Deque<Request>大小默认是5,可以通过Deque<Request>size判断对应node节点是否积压很多未响应的消息。
    在这里插入图片描述
Kafka多线程消费实现

KafkaProducer是线程安全的,但是KafkaConsumer是线程不安全的。如果生产者生产消息的速度远大于消费者消费消息的速度,就会造成消息的积压,积压消息可能达到时限就会被清除,造成消息的丢失。所以可以使用多线程提高消息消费能力。

  1. 线程封闭:一个线程实例化一个KafkaConsumer,一个线程对应一个或多个分区,所有消费线程隶属于一个消费组。并发度限制于分区数,但是offset的维护比较方便。
    在这里插入图片描述
  2. 拉取消息单线程,处理消息模块多线程:这种模式消息的消费不受分区数的限制,但是对offset的处理比较困难。需要引入一个共享变量offset 参与提交,每一次KafkaConsumerThread使用poll()拉取数据之后都要对offset的内容进行提交,并且在写入offset的时候要注意位移覆盖的问题。
    在这里插入图片描述
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/643236
推荐阅读
相关标签
  

闽ICP备14008679号