赞
踩
- broker:位于Kafka架构中间位置的Kafka Cluster,用于落地存储消息,由多个server组成
- topic:可以理解为一个队列,生产者和消费者面向的都是topic
- producer:消息生产者,向kafka broker发消息的客户端
- consumer:消息消费者,向kafka broker取消息的客户端
- Consumer Group:消费者组内每个consumer消费固定分区,且一个分区只能有一个consumer消费(乱序)。消费者组之间互不影响,组间消费数据可重复。
- Partition:一个topic可以有多个分区,分布在不同的broker上。主分区用于接收和发送数据,副分区用于同步数据,当主分区宕机出现异常时,通过选主,从其他代理选择主分区。为了吞吐量,一个broker只放一个主分区,这样可以并法读写。
- Replica:一个topic的每个分区有若干个副本,即一个broker leader和多个follower。
- message:每条消息按顺序写到partition,并标识了一个递增序号代表其进来的先后顺序。
kafka将所有消息组织成多个topic的形式存储,而每个topic又可以拆分成多个partition,每个partition又由message组成。每个message都被标识了一个递增序列号代表其进来的先后顺序,并按顺序存储在partition中。
这样message就通过<partition_id + id>的方式存储起来。
上面的Id称为kafka的offset,这样的存储方式有如下好处:
- 消费者可根据offset灵活消费;
- 线程安全:每个consumer都保留自己的offset,消费时互不干扰,不存在线程安全问题;
- 消费访问的高效性:每个topic中的消息被组织成多个分区,消息在生产和消费时,一个线程消费指定partition,减少竞争增加了程序的并行能力。
- 集群的可拓展性:通过分区策略将message分配到不同的分区(一个分区对应一个broker),当broker负载满时,通过扩容将消息重新(?)均匀分配。
- 持久化策略:可通过指定时间段来指定消息的过期时间
- 高可用:多个broker组成cluster集群实现了partiton的高可用。
一般的消息系统存在两种消费模型:
- push:优势在于消息实时性高。劣势在于没有考虑consumer消费能力和饱和情况,容易导致producer压垮consumer。
- pull:优势在可以控制消费速度和消费数量,保证consumer不会出现饱和。劣势在于当没有数据,会出现空轮询,消耗cpu。
pull的模式
kafka 采用pull的模式,并通过可配置化参数,保证当存在数据且到达一定量时,consumer才能进行pull操作,否则一直处于block的状态。
默认采取的是Range策略
消费条件
- 同一个consumer Group里消费者的num.streams(线程数)必须相等(消费能力保持一致,或者都设为1)
- 每个消费者订阅的主题必须相同(到达均匀分配的效果)
工作原理:
- 将某个topic所有的分区组成TopicAndPartition列表,列表进行hash排序,轮询分配到不同的消费者。
特点:
- 优点:不同消费者最多差一个分区的消费,当每一个consumer订阅的Topic一样时,推荐使用
- 缺点:消费者消费到不属于自己计算的分区数据
分区划分逻辑:
通过计算:
分区总数/消费者总数
来分配每个消费者消费多少个分区,当有余数时,前面的几个消费者线程将会多消费一个分区。
例子:
有十个分区,三个消费者线程,10/3=3—1,那么消费者线程C1-0 将会多消费一个分区
弊端:
个别消费者线程多消费分区,随着Topic的增多,每个consumer消费的分区数差距越来越大。
kafka通过consumer position记录每个分区的消费状态(
消费者组 + 主题 + 分区 + offset
确定消费位置),当消费完成时,broker收到确认,consumer position指向下次消费的offset。因为消费不会删除,consumer position更新之后,consumer仍然可以重置offset重新消费历史消息。
历史问题
kafka0.9版本之前,consumer默认将offset保存在Zookeeper。但会有以下问题:
- 每次消费时都要将数据的offset写入一次,效率比较低
- Zookeeper与kafka的offset变化确认也需要走IO,给offset的维护带来不稳定性和低效,已经成为kafka性能瓶颈。
现在的版本
kafka0.9版本之后,consumer 默认将 offset 保存在 Kafka 一个内置的 topic :__consumer_offsets
,通过使用内部的broker来管理,将topic提交给__consumer_offsets函数来执行。
更新offset的具体设置
- 自动提交,设置enable.auto.commit=true,更新的频率根据参数【auto.commit.interval.ms】来定。这种方式也被称为【at most once】,消费者fetch到消息后就可以更新offset,无论是否消费成功。
- 手动提交,设置enable.auto.commit=false,这种方式称为【at least once】。fetch到消息后,等消费完成再调用方法【consumer.commitSync()】,手动更新offset;如果消费失败,则offset也不会更新,此条消息会被重复消费一次。
一个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender线程。
缓存的大小可以通过生产者客户端参数
buffer.memory
配置,默认值为32M。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer.send()方法调用要么被阻塞,要么抛出异常,这个取决于参数
max.block.ms
的配置,此参数的默认值为 60000ms。
batch.size
配置。
大致逻辑
当一条消息( ProducerRecord ) 流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord, 如果可以写入, 不可以则新建一个 Producer Batch 。
主要作用是从 RecordAccumulator 获取消息并将其发送到 Kafka 中。
对于 KafkaProducer 的应用逻辑而言,主线程只关注向哪个分区中发送哪些消息;
而对于网络连接来说,生产者客户端是与具体 broker 节点建立的连接,而并不关心消息属于哪一个分区,所以在这里需要做一个应用逻辑层面到网络 I/O 层面的转换。
Sender 从 RecordAccumulator 获取缓存的消息之后,会进一步将<分区,Deque>的形式转变成<Node,List< ProducerBatch>的形式,其中 Node 表示 Kafka 集群 broker 节点。
转换之后,Sender 会进一步封装成<Node,Request> 的形式, 这样就可以将 Request 请求发往各个 Node 了,这里的 Request 是 Kafka 各种协议请求。
请求在从 sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map<Nodeld, Deque>,它的主要作用是缓存 已经发出去但还没有收到服务端响应的请求。
最多缓存请求数:max.in.flight.request.per. connection。默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应( Response )。
通过比较 Deque 的 size 与这个参数的大小来判断对应的 Node 中是否己经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继其发送请求会增大请求超时的可能。
直接影响到Kafka集群的吞吐量和消息可靠性。而吞吐量和可靠性两者不可兼得,只能平衡。
ack有3个可选值,分别是1,0,-1。
ack=1
- producer只要收到一个分区副本(leader)成功写入的通知就认为推送消息成功了。
- ack的默认值就是1。这个默认值其实就是吞吐量与可靠性的一个折中方案。
ack=0,producer发送一次就不再发送了,不管是否发送成功。
ack=-1,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。
数据的精准写入的逻辑:idempotent(幂等性) + at least once = exactly once。
幂等性: 指 Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。
设置Producerenable.idompotence=true
启用幂等性。
基本逻辑:
producer启动时会分配一个PID,数据发往partition的消息会附带一个Sequence Number,这时broker会对消息做一个主键缓存<PID, Partition, SegNumber>,当具有相同主键的消息再次提交时,broker只会持久化一条。
幂等性的要求:
需要保证在同一个会话中,当producer重启时,PID就会变化。且因为不同分区的PartitionID,也会导致相同数据可能存到不同分区,所以幂等性是无法保证垮分区的精准一次。
LEO代表每个副本的最后一个offset
HW代表所有副本中最小的LEO
数据可见:HW之前的数据才对Consumer可见,这是可靠性的体现。
多副本数据同步逻辑:
- 当副本数为N时,代表1个leader,N-1个followers,数据写入leader后,follower开始从leader拉取数据进行同步;
- leader会维护一个ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护;
- 如果一个flower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除;
- 当ISR中所有Replica都向Leader发送ACK时,leader才commit。
容错逻辑:
当follower状态变为非同步中时,系统从ISR列表中选举新leader。
当某个follower状态变为非同步中时,leader会将此follower剔除ISR,当此follower恢复并完成数据同步之后再次进入 ISR。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。