赞
踩
1)组成:zookeeper中存放的信息brokerId,topic,消费者相关的信息(高版本中消费者信息存放在kafka集群内部),没有存放生产者相关信息。
2)kafka一般安装多少台:2*(生产者峰值生产速率副本/100)+1=3
3)压测:生产者峰值生产速率
4)副本:默认1 ,一般2-3个副本。副本越多可靠性越高,但会增加磁盘IO,效率低下
5)数据量问题:
100万日活 没人100条 100W100=1亿
一条日志:0.5-2K~1K
1K1亿=100g
平均速度速度:1亿条/(243600S)=1150条/秒
1150条/秒1k=1m/s
峰值速度:?
6)kafka数据保存时间
默认保存7天,生产环境一般保存3天
7)磁盘预留多大空间
100g2副本*3天/0.7
8)kafka监控
kafkamanager KafkaEagle
9)分区: 合适的分区提高kafka并发度
先设置一个分区;进行测试生产者峰值生产速率tp,消费者消费的峰值速率tc
分区数=t/min(tp,tc) t:kafka希望达到的吞吐量预期
一般3-10个
10)分区分配策略
1.Range(默认)
10个分区 3个消费者线程
0 1 2 3 容易发生数据倾斜
4 5 6
7 8 9
2.RoundRobin
先将分区hash打散,再轮询
11)ISR
主要用于选举leader挂了,选举新的leader;在ISR队列中的都有机会选上
老版本中:延迟时间,延迟条数;
新版本:延迟时间(及leader向follower发送数据同步时,follower向leader响应(ack)的时间,如果follower一段时间没有响应,就会被踢出ISR)
短时间:数据会存储在上游flume channel中(Producer)
长时间:日志服务器有备份,一般保存30天,可以采集重新跑一遍
kafka中有ack应答机制保证
0 生成者只管发送数据,不等broker落盘,传输效率高,可靠性较低
1 等待leader应答;不用等follower应答,可能leader接收数据落盘之后,还没有吧数据发送给follower,leader就挂掉了,数据还是可能会丢失;可靠性一般,传输效率一般
-1 leader+follower共同应答;可靠性最高,效率最低。follower是ISR队列中的follower
生产环境下:
一般不选0,
1 一般传输普通日志,对可靠性要求不高
-1 一般对可靠性要求比较高的场合下,如涉及钱的金融场景
Broker丢数据
通过ack机制来保证
Product丢失数据
为了提高吞吐,kafka一般将数据缓存在product的buffer中,然后批量发送。因此,基于buffer可以实现异步发送,提高效率,product通过callback来处理异常和发送失败的情况。但是如果product非法停止,则buffer中的数据就会丢失。异或是,消息产生(异步产生)过快,导致挂起线程过多,内存不足,导致程序崩溃,消息丢失。
根据上图,可以想到几个解决的思路:
1 kafka自身特性解决
1)幂等性:单分区单会话内
kafka在一个分区中维护一个id列表,producer发送的数据会按kafka中的id去重,一旦夸会话就会失效(Producer挂掉再重启就无法保证)
2)事务
3)ack =-1
幂等性+事务+ack=-1 用的比较少,效率低,容易产生数据积压
2下一层处理
hive的dwd group by去重
sparkStreaming、redis去重
1 增加分区 同时增加下一级消费者的CPU核数(sparkstreaming)
2 增加下一级消费者消费速度
flume sparkStreaming,增加 batchsize 1000event/s=>2000event/s
1kafka高效读写数据
1)kafka是集群模式,多点同时工作
2)可以设置分区,并发读写
2底层采用顺序读写,读写速度可以达到600m/s,比随机读写(100m/s)快
3采用了零拷贝技术
https://editor.csdn.net/md/?articleId=108515004
4 kafka单条日志传输大小
kafka消息体大小默认单条最大值是1M,如果消息大于1M则会出现生产者无法推送数据或消费者无法消费数据(kafka卡死)
需要进行配置
5 故障恢复细节
1)leader故障
follower挂掉后,会从ISR中选举一个新的leader,为了保证多个副本之间数据的一致性,其他的follower会将各自log文件中高于HW的部分截取掉,然后从新的leader中同步数据
2)follower故障
follower挂掉后会被踢出ISR,等该follower恢复后,会读取本地磁盘记录的上次的HW,将log文件中高于HW的部分截取掉,再向leader同步数据(同步完成之前处于失效状态),等该follower的LEO大于等于Partition的HW(及follower追上leader之后),就可以重新加入ISR
6 zookeeper在kafka中的作用
_consumer_offsets
主题中。对于像zk、redis集群都支持读写分离(主写从读),这样可以帮助leader分担读取数据的压力。那为什么kafka不支持读写分离呢?
1)延时问题:leader写入的数据需要向follower同步,向redis的数据同步需要经过主节点网路->主节点内存->从节点网络->从节点内存的环节,而kafka同步时间更长,需要落盘,主节点网路->主节点内存->主节点磁盘->从节点网络->从节点内存->从节点磁盘。
2)kafka的主写主读模式实现了负载均衡:kafka对每一个topic进行了分区,在各分区leader均匀分布在不同broker上时,可以达到负载均衡。
选举策略:kafka集群启动的时候各个broker会向zk注册一个临时节点/controller,注册成功的broker成为集群的leader,其他broker发现节点已经存在,就会在zookeeper中创建watch对象,便于它们收到控制器变更的通知。
controller的作用:
1)如果有一个broker退出了,controller会去检查这个broker是否有分区leader,如果有,那么这个分区需要选取一个新的leader。controller遍历所有副本选出新leader,同时更新分区ISR队列。
2)如果有一个broker加入了,controller会根据其BrokerID去判断其是否有现有分区的副本,如果有,就要去分区leader同步数据。
replica.lag.time.max.ms
配置。auto.leader.rebalance.enable
为true,集群会检查首选首领是否是当前分区的leader,如果不是,会触发选举,将首选首领选举为leader。每个消费者组在服务端都有一个组协调器GroupCoordinator
对其进行管理,客户端有一个ConsumerCoordinator
消费者协调器与组协调器进行通信。在GroupCoordinator
中会为消费者组选举出一个leader
。这个leader
负责该消费者组的分区分配的实施。
同一个消费者组内的消费者不能消费同一个主题的分区,但一个消费者可以消费多个分区。不同消费者组的消费者可以消费同一个主题的分区。
消费者组的优势:降低的消费者的压力,如果没有消费者组的话,一个topic的所有分区都要被一个消费者消费,消费压力增大。使用消费者组,组内每个消费者分配到的分区比较均匀,达到了平衡消费压力的效果。
消费者重平衡的条件主要有三个:订阅的topic数量发生、topic的分区数量发生变换、消费者组的消费者数量发生变化。其中消费者组的成员数量发生变化主要有三种形式:新成员加入、组成员主动离开、组成员崩溃。
正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送。
消费者崩溃一般是与broker的心跳超时或者消息处理时间超时造成的。
kafaka 消费者配置的四个参数
当session.timeout.ms或者max.poll.interval.ms超时的时候都会触发consumer group的重平衡。
如果数据处理逻辑比较重,从而造成消费处理超时的话,就会发生rebalance。rebalance时会预留一端时间给consumer上报自己的offset信息。但是如果当前数据还没处理完,就发生rebalance的话,就会造成offset commit失败,下次poll会拉取到旧的数据(重复消费),因此要保证好消息处理的幂等性。
对于 rebalance 类问题,简单总结就是:处理好心跳超时问题和消费处理超时问题。
topic.metadata.refresh.ms
制定个时间后,随机选择一个分区,然后这段时间消息都发往这个分区,默认时间是10min重新随机选择一个分区。使leader副本均匀的分布到broker上,使分区均衡。但是分区均衡并不意味着负载均衡(分区均衡,数据在分区上还要均匀分区等)。
auto.leader.rebalance.enable
,参数默认为true
。kafka集群会定时轮询所有broker节点,计算分区不均衡率,使分区均衡。生产环境不要开启此功能,如果在高峰时期发送分区再均衡,会引起性能影响。kafka-preferred-replica-election.sh
脚本,在特定时间分批手动进行分区平衡。(kafka-preferred-replica-election.sh --zookeeper hadoop102:2181 --path-to-json-file election.json
)当消费者组发生变化(消费者增加、退出、宕机)或者消费者组对应的topic发生变化(分区变化)时,会触发kafka分区的再均衡,重新分配分区(再均衡期间,消费者组内的消费者无法读取数据)。可以使用再均衡监听器,避免均衡后发生的消息重复消费问题。
ConsumerCoordinate
(消费者协调器)先向集群发送一条FindCoordinatorRequest
请求,找到其要加入的消费者组对应的GroupCoordinator
(组协调器)所在的brokerGroupCoordinator
发送JoinGroupRequest
请求加入消费者组,并为其生成一个唯一标志member_id
。如果此时消费者组没有leader或leader退出,会为其选一个leader。服务端根据消费者组中各个消费者的分区分配策略选出消费者组的分区分配策略,并返回给消费者。leader
根据分区分配策略选出分区分配方案,将方案提交给GroupCoordinator
。GroupCoordinator
将返回的消费组的元数据存入_consumer_offsets
主题中,并向消费者返回其各自消费的分区。ConsumerCoordinate
(消费者协调器)向GroupCoordinator
发送心跳来维持他们与消费组的从属关系,如果GroupCoordinator
长时间没有接收到心跳,就会认为该消费者退出,触发重分区。kafka-reassign-partitions.sh
进行分区重分配当kafka集群扩容,增加新的broker的时候,当前主题分区并不会自动分配到加入的节点中;当有broker挂掉的时候,kafka集群也不会把失效几点上的分区副本自动迁移到集群剩余的可用broker上。这样就会影响集群的负载均衡。需要使用脚本kafka-reassign-partitions.sh
执行分区重分配工作。分区重分配对集群性能影响很大,需要占用额外的资源,一般分成多个小批次执行。
kafka-reassign-partitions.sh --zookeeper hadoop102:2181 --generate --topic-to-move-json-file reassign.json
kafka-reassign-partitions.sh --zookeeper hadoop102:2181 --execute --reassignment-json-file project.json
kafka-reassign-partitions.sh --zookeeper hadoop102:2181 --execute --reassignment-json-file project.json --throttle 10
复制流量上限为10b/s。重分配完成后,为了不影响kafka本身性能,需要对临时的限流配置删除,使用verify
代替execute
可以执行完后就删除临时限流配置,并且显示执行进度。kafkaProducer是线程安全的,kafkaConsumer是线程不安全的,所以,一个主题分区的数据写入是可以并发进行的,分区中的数据读取只能被一个消费者线程消费。经过测试,分区数上升,吞吐量也上升,但大过一个阀值的时候,整体吞吐量反而下降。
kafka的分区数不是没有上线的,分区数会占用文件描述符,每个进程能够支配的文件描述符是有限的。
开启幂等需要客户端参数enable.idempotence
设为true,并且kafka服务端ack设为-1。(所以开启幂等之后会降低kafka效率)
kafka为每一个生产者分配一个PID
,每个分区为这个PID
维护一个序列号,每发一条消息对应的<PID,分区>
对应的序列号+1。所以幂等性只对单分区单会话有效。
存在问题:kafka自带幂等职能只能解决product重复发送一条msg出现的消息重复问题(添加链接描述),但是不能真正解决消息上的幂等。
可以借助于redis实现消息的幂等,每次product发送msg先查询redis,判断是否发送过。
public void invoke(String value, Context context) throws Exception { // 1. 没有启用幂等输出 if(!enableIdempotent) { producer.send(new ProducerRecord<>(topic, value)); return; } // 2. 启用了幂等 String idempotentKey = String.format("%s%s", idempotentKeyPrefix, MD5Util.toMD5(value)); // 检查fusion中是否存在 String fusionValue = FusionUtil.procRetry(jedis -> jedis.get(idempotentKey)); // 已经输出过这条数据, 忽略 if(fusionValue != null){ log.info("已经输出过这条数据... idempotentKey: {}", idempotentKey); return; } // 没有输出过 producer.send(new ProducerRecord<>(topic, value)); FusionUtil.procRetry(jedis -> jedis.set(idempotentKey,"")); }
properties("transactional.id","transactionId")
,transactionId与PID一一对应,不同的是transactionId用户指定,PID由kafka内部分配。enable.idempotence
设置为true,并且将acks
设置为-1,所以开启事务之后也会影响kafka的效率。isolation.level
默认为read_uncommitted
(可以消费未提交的事务),read_committed
(只能消费提交了的事务)Properties props = new Properties(); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("client.id", "ProducerTranscationnalExample"); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "test-transactional");//transactional.id由客户端提供 props.put("acks", "all");//ack设为-1,要全部落盘 KafkaProducer producer = new KafkaProducer(props); producer.initTransactions();//初始化事务 try { String msg = "matt test"; producer.beginTransaction();//开启事务 producer.send(new ProducerRecord(topic, "0", msg.toString())); producer.send(new ProducerRecord(topic, "1", msg.toString())); producer.send(new ProducerRecord(topic, "2", msg.toString())); producer.commitTransaction();//提交事务 } catch (ProducerFencedException e1) { e1.printStackTrace(); producer.close(); } catch (KafkaException e2) { e2.printStackTrace(); producer.abortTransaction(); } producer.close();
kafka有两个索引文件,偏移量索引文件(xxx.index)和时间戳索引文件(xxx.timeindex)。偏移量索引文件建立了消息偏移量到物理地址之间的映射关系,通过索引文件可以快速定位消息位置。时间戳索引文件建立了时间戳与偏移量之间的映射关系。
Controller主要负责管理kafka集群所有分区和副本的状态。
Controller的选举:broker启动回去读取zk的/controller节点的brokerId的值,如果这个值是-1或者还不存在这个节点,就回去注册一个/controller临时节点,并在节点/controller_epoch中保存一个版本号。每个broker中保存一个当前控制器的brokerId,表示为activeController。
controller的作用:①监听分区相关变化(处理分区重分配、ISR结合变更)②监听broker相关变化。③处理主题相关变化。
controller通过在zk的/brokers/ids和/brokers/topics等节点创建监听来管理kafka的分区、topic、broker的变化。
controller的优势:早期每一个broker为了管理分区和副本都要在zk上创建大量的监听,一个分区或者副本的变化就会唤醒很多不必要的监听器。这样的设计会有造成脑裂、羊群效应以及zk过载的风险。而现在只有controller节点在zk上创建监听,所有对分区和副本的管理交由controller完成,controller发现zk上的数据变化,再告知对应的broker进行更新操作。broker只在zk上建立一个对/controller节点的监听。
当创建分区(创建topic、增加分区数)或者分区上线(分区leader下线)的时候会执行leader选举
选取规则:根据AR(所有分区)的循序选取AR中第一个在ISR队列中的分区。(按照AR中的顺序不是ISR中的顺序,如果有优先副本就设置优先副本为leader)
Kafka生产客户端主要由主线程和sender线程来协调工作。主线程中创建消息然后经过拦截器、序列化器、分区器之后缓存到消息累加器,然后由sender线程获取消息发送给kafka集群。
RecordAccumulator
中,消息缓存器的大小通过buffer.memory
参数配置,默认32M。如果生产者生产速度大于发送速度,可能会导致生产者空间不足。RecordAccumulator
内部是按分区设置的多个双端队列,每个队列节点是一个producerBatch
缓存了一批消息,默认大小是16k,batch.size
参数指定,sender按批次拉取消息。这样将消息封装成批次发送可以减少网络请求次数,提高吞吐RecordAccumulator
内部有一个BufferPool
缓存池管理producerBatch
空间的重用,避免重复申请、释放空间。但是只管理batch.size
指定大小的producerBatch
producerBatch
后,将其封装成一个请求进行发送,并将请求缓存到InFlightRequests
中,等待服务端响应,如果失败可以重试。InFlightRequests
中的Deque<Request>
大小默认是5,可以通过Deque<Request>
的size
判断对应node
节点是否积压很多未响应的消息。KafkaProducer是线程安全的,但是KafkaConsumer是线程不安全的。如果生产者生产消息的速度远大于消费者消费消息的速度,就会造成消息的积压,积压消息可能达到时限就会被清除,造成消息的丢失。所以可以使用多线程提高消息消费能力。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。