赞
踩
broker: 存储消息的机器
(1)使用zookeeper, 除了提供一般的broker功能之外,还负责选举分区首领。通过在zookeepr中创建一个名为 /controller的临时节点称为 controller。每个选出的controller都会有一个递增的epoch。
(2)使用 KRaft,通过Kraft来选举,所有的元数据都存储在 metadata.log.dir 目录下。通过pull从controller获取信息
消息通过主题分类。类似数据库的表或文件系统的文件夹
一个主题可以被分为若干个分区。消息无法在主题内保证有序,但可以在单个分区内有序。
为了保证可靠性,一个分区可以有多个副本。其中分为leader副本和follower副本。所有副本统称为AR(assigned replicas),所有与leader副本保持一定程度同步的副本(包括leader)组成ISR(in-sync replicas)。follower滞后leader指定的时间时,则会退出 ISR。(不可读,只当做备份作用);就是从leader副本拉取消息,如果持续拉取速度慢于leader副本写入速度,慢于时间超过replica.lag.time.max.ms后,它就变成“非同步”副本,
标识一个特定的消息偏移量,消费者只能拉取到这个offset之前的消息。比如HW=6, 只能拉取到 0~5的消息
标识当前日志文件中下一条待写入的消息的offset
很多时候也会被称之为消息堆积量。 当事务是read-committed, Lag=LSO – ConsumerOffset, 否则等于 Lag=HW-ConsumerOffset
LSO 的值等于事务中第一条消息的位置(firstUnstableOffset,如上图所示),对已完成的事务而言,它的值同 HW 相同
消费组
Broker的controller协调者选举
(1) zookeeper模式,通过创建一个 /controller的临时节点来选举。
(2) KRaft模式,通过raft算法来选举。follower通过pull的方式拉取leader的日志。
副本的leader选举
首先选出broker的controller节点,然后节点会从分区的 ISR中选举第一个follower副本为leader副本。
事务处理流程如下:
根据transactionId的哈希值计算主题 _transaction_state中的分区编号,再找到此分区leader副本所在的broker节点。
(1)read_uncommitted。默认的事务级别。
(2)read_committed。
“消--处理—生产”的模式。即从源主题读取消息,然后对消息进行一些处理,再将结果写入到另一个主题。
启动了幂等生产者,每条消息都将包含生产者ID(PID)和序列号。在leader副本的broker会保存map<pid,分区>维护一个序列号。
幂等生产者只能防止由生产者内部重试逻辑引起的消息重复。
每个生产者会被分配一个ProducerId(PID),SeqNumber
然后生产者端和Broker端都有<PID,PartitionID> SeqNumber 的映射关系
生产者每发送一条消息后就将对应的分区序列号加一
broker端会比较序列号,如果new Sq < old Sq+1,抛弃这条数据。说明它已经是过期的了如果new Sq > old sq +1,说明有消息丢失了。对生产者抛出异常
在分配PID时,会分配epoch,新的生产者就会加1,如果出现了两个同样的生产者PID一样,取epoch最大的那个。
轮询策略(默认分区策略)
随机策略
按键保存策略。
其中轮询策略是默认的分区策略,而随机策略则是较老版本的分区策略,不过由于其分配的均衡性不如轮询策略,故而后来改成了轮询策略为默认策略。
确定好消息的topic+partition后,直接发送给对应leader副本所在的broker。消息的可靠性主要通过acks参数来配置,主要有以下三种:
(1)acks=0, 生产者发送消息后不需要等待任务服务端的响应。吞吐性能最好,可靠性最差。
(2)acks=1, 只要分区的leader副本成功写入消息即可。消息的可靠性和吞吐量折中
(3)acks=-1或all,生产者在发送消息后,需要等待ISR中的所有副本都成功写入消息之后才能收到来自服务端的成功响应。但因为可能ISR中只有leader副本,也会导致消息不可靠。更高可靠性需要配置 min.insync.replicas,指定写入的最小ISR的数量,如果没有达到则会发送失败。
groupCoordinator和comsumerCoordinator之间的逻辑如下:
(1)每一个broker都有coordinator(辅助实现消费组的初始化和分区的分配),根据groupId进行哈希取模得到选举那个coordinator对消费组进行管理
(2)消费者向负载最小的节点请求获取到groupCoordinator, 然后连接到groupCoordinator节点并发送JoinGroup请求。
(3)groupCoordinator为消费组选举一个消费组的leader。如果没有消费组leader,则加入group的第一个消费者为leader。如果消费组leader挂了之后,会比较随意的选举一个。
(4)消费者leader从选举出的分区分配策略来实施具体的分区分配(逻辑下面会介绍)
(5)groupCoordinator发送SyncGroupRequest请求来同步分配方案给各个消费者。
(6)每个消费者都会和groupCoordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms=5分钟),也会触发平衡
总结了一下:
(1)find_coordinator
(2)join_group
(3)sync_group
(4)hearbeat
1.RangeAssignor分配策略
按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。
2.RoundRobinAssignor分配策略(默认的分区策略)
将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。
3.StickyAssignor分配策略
目的:(1)分区的分配尽可能均匀(2)分区的分配尽可能与上次分配的保持相同
4.自定义分区策略
(1)收集各个消费者支持的所有分配策略,组成候选集candidates
(2)每个消费者从candidates中找出第一个自身支持的策略,为这个策略投一票
(3)计算candidates中各个策略的选票数,选票数最多的策略即为当前消费者的分配策略
(1)有新的消费者加入消费组
(2)有消费者宕机下线。
(3)有消费者主动退出消费组
(4)消费组所对应的groupCoordinator节点发生了变更
(5)消费组内所订阅的任意主题或者主题的分区数量发生了变化
(1)自动提交
(2)提交当前偏移量
(3)异步提交。api只管提交偏移量,无须等待broker做出响应。
(4)提交特定偏移量
底层存储数据
流式处理
linux操作系统 “零拷贝” 机制使用了sendfile方法, 允许操作系统将数据从Page Cache 直接发送到网络,只需要最后一步的copy操作将数据复制到 NIC 缓冲区, 这样避免重新复制数据 。示意图如下:
为了优化读写性能,Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存而不是JVM空间内存
Kafka的message是按topic分类存储的,topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。每个partition对应了操作系统上的一个文件夹,partition实际上又是按照segment分段存储的。这也非常符合分布式系统分区分桶的设计思想。
Kafka数据读写也是批量的而不是单条的。
如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩
大概读写的QPS:
20W
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。