赞
踩
拥抱变化接手了 Kafka 平台,遂学习 0.10.0 线上版本的设计与实现。限于篇幅,本文不会逐行解析源码,而是从逻辑流程、设计模式、并发安全等方面学习各组件,笔记仅供个人 Review
参考文档 #producerconfigs,部分配置间会相互影响,如下:
|
乱序情况:
|
实现
producer 有 20+ 配置项,配置模块需对用户给定的Map<String, Object>, Properties
等键值对象,进行配置值的类型检查、有效性检查、默认值填充等处理,得到有效配置。配置模块:
ConfigDef
:记录用户原始配置、解析后的配置,并提供 util 方法按类型读取配置值,特别地,define 系列方法都返回 this 以实现链式调用,类似于 Builder 模式
ProducerConfig.CONFIG
单例:静态常量,并在 static 代码块中定义,故在类加载阶段就会被初始化,是典型的单例模式应用,类似单例还有枚举实现的 protocol.ApiKeys,protocol.Errors
等等
Configurable
接口规范类的反射构造行为:用户类(Partitioner, Serializer, Interceptor…)配置值都是字符串,反射实例化时构造方法可能无参或有参但类型不定,故抽象出Configurable
接口,无参构造完成后接收配置值,进一步实例化
|
如下 demo 展示了两种发送消息的方式
send 异步发送:返回类型为 RecordMetadata 的 Future 对象,调用 get 等待消息的发送结果
callback 异步发送:实现 Callback 接口,异步处理发送结果
|
底层发送流程如下,之后章节将逐一解析各个组件:
用户类可线程安全地实现ProducerInterceptor
接口(限制不能抛异常),用在
消息发送前修改内容:如统一添加 msg uuid
在返回前读取元数据:如记录异常信息,tracing 日志等
支持组合多个拦截器,批量链式调用,但不提倡,因为链式调用时若某个拦截器抛出 unchecked 异常,捕捉后仅记录日志不抛回 send 调用方,用户无法感知
|
发送消息的 Key 和 Value 统一用byte[]
描述,实现二进制安全,故用户类需实现Serializer
接口;serialization 模块已内置了基础数字类型、String 的序列化实现,consumer 反序列化 Deserializer 接口同理
|
用户实现Partitioner
来决定每一条消息要发往哪个分区
|
默认分区器DefaultPartitioner
选择分区的流程:
至此,分析了发送前消息的拦截修改、键值序列化、确定分区的逻辑
职责:内存池资源管理,消息压缩与 batch 分批,发送结果的异步计算
buffer.memory
配置限制了 producer 缓冲消息所能使用的最大内存,默认启用 batch 机制后,消息常以batch.size
大小分批发送,故设计内存池重用 batch 内存
内存池划分为 3 个区域:
free 双端队列:元素为batch.size
大小的内存块,消息发送成功后 clear 入队重用;有 2 个特性
惰性分配:队列初始状态为空,后续分配的 batch “不规整内存块” 使用完毕则入队
动态回收:若 availableMemory 内存不足,但 free 队列有空闲内存块时,会逐个出队释放内存(GC)
不规整内存块:若 batch 被禁用,或发送大小在(batch.size, max.request.size]
范围的大消息,会直接分配该大小的一次性 ByteBuffer,使用完毕后由 GC 回收
空闲内存:非内存块实体,只是统计值,在分配和释放不规整内存块时对应增减库存
内存不足时,调用send()
的多个用户线程都会 await 阻塞在各自的条件变量上,内存池采用先到先得的策略,当有内存可用时只会 signal 唤醒入队最早、等待时间最长的线程,避免了线程饥饿或多线程低效竞争。示意图:
如上 thread_0 被唤醒后,会收集该可用内存,若内存已足够则恢复运行并唤醒 thread_1,否则继续等待
消息批次用 RecordBatch 描述,维护消息重发、future 结果等元信息,消息实际存储在底层的 MemoryRecords 缓冲区,并使用 Compressor 进行压缩
1)类加载
producer 支持三种压缩方式:gzip, snappy 和 lz4,但只有 gzip 由 java.util.zip
JDK 标准库实现,其他 2 种压缩类需添加 jar 包,在运行时反射加载,不使用时能减少包体积;同时为保证 producer 全局只会反射构造出一个 Constructor,用到了懒加载的 DCL 单例模式
2)封装 batch
每种算法都有预期压缩比,如 gzip 是 50%;在 MemoryRecords 视角,将 16KB batch 传给 Compressor 后,实际至多写入 32KB 数据,由于压缩比不精确,Compressor 要有动态扩容的能力,以容纳更多压缩消息;实现:
|
3)压缩实现
实现了两层装饰模式:为 ByteBuffer 装饰了自动扩容功能,为各种类型数据的 put 操作装饰了压缩写:
特别地,putRecord()
带压缩地写入一条消息,写入后的内存结构:
更正:magic number 为 1 时带 timestamp,为 0 则无 timestamp 字段
缓冲区写满后触发close()
,Compressor 会倒回至 ByteBuffer 的初始位置,插入一条 Shadow Record 补全元数据,将其后所有压缩后的 Real Records 视为其 key 的值,内存结构:
TODO: 勘误画图,压缩后的 RealRecords 放的是 value 字段
注意,close()
方法中会动态调整压缩比,即压缩比是自适应的
4)写满判断
判断 MemoryRecords 是否已满,是通过估算 Compressor 压缩后字节数实现的,估算逻辑:
|
负责委托 Compressor 追加写 Record,为其添加 LOG_OVERHEAD 头信息
|
当缓冲区满后会切换为只读模式,等待 drain 选中发出
|
基于以上 2 个组件,RecordBatch 实现了三个机制:
1)委托 MemoryRecords 追加写 Record,并将各 Record 的元数据 future 与 batch 写结果相关联
|
2)batch 发送结束后,发送结果 baseOffset 和 exception 会被填充到 ProduceRequestResult,通知各 Record 的 FutureRecordMetadata,唤醒阻塞在 get() 调用上的用户线程:
此处多线程需等待单线程执行结果,用 CountDownLatch 模拟实现了 Future 接口:
|
3)维护 batch 的重发状态
attempts:已重试次数,在retries
内都会重试
lastAttemptMs:上次重试发送的时间戳,配合retry.backoff.ms
避免频繁重试
accumulator 维护各 topic partition(tp)的 batch 队列,结构如下:
|
producer 的 send 操作只将消息放入对应的 RecordBatch 中即返回:
accumulator 还负责对各 tp 的 batch 队列进行 rollover,在实现上有两个优化亮点:
(1)细粒度锁,提高可用性:创建新 batch 时需阻塞申请内存,会主动放弃 dq 互斥锁。示意图:
问题:A 持有 dq 锁,阻塞申请内存,会导致虽然内存够,但 B 也必须等待
解决:当锁范围内有耗时操作时,考虑拆为细粒度锁,减少锁的持有时间
(2)解决并发 rollover 的内存碎片问题:细粒度锁的副作用是引入了新的并发竞争
问题:A,B 同时发送大消息,都创建新 batch 后都入队,先入队的 batch 不再被使用,剩余内存将被浪费
解决:创建完新 batch 后不着急使用,先尝试写入 last batch,若写入成功则释放新 batch
缓存消息的实现:
|
至此,分析了消息的内存分配、压缩写入、batch 读写模式切换、batch rollover 等机制
职责:维护连接状态,执行四种网络 IO 并收集结果
producer 网络层使用了 NIO Selector 机制,内部各组件关系如下,逐个分析
封装 SocketChannel 的读写,提供注册事件的快捷方法
|
读缓冲 NetworkReceive 类:从 SocketChannel 中拆包,持续读取一个完整的响应
写缓冲 Send 接口:由 RequestSend->NetworkSend->ByteBufferSend
装饰链,将一个完整的请求封包,持续写入 SocketChannel
KafkaChannel 只维护一个读缓冲、一个写缓冲,并提供对应的read, write
方法读写 TransportLayer
在 NIO Selector 上封装了网络 IO 的单次执行 4 种结果,对应了 4 种 IO 事件:
|
执行网络 IO:
|
结果收集完毕后,通过 List<Send> completedSends()
等多个对应方法暴露给上层的 NetworkClient
负责统一网络 IO 结果并解析响应,维护发往各个节点的有序请求队列,维护与各节点单一连接的状态;持有 2 个子组件:
(1)ClusterConnectionStates:连接状态管理
client 与每个 broker 都只会保持一条 TCP 连接,而非维护一个连接池,以简化消息有序性的实现。连接有三种状态,并由NodeConnectionState
维护重连信息,由ClusterConnectionStates
持有整个集群的连接状态
|
(2)InFlightRequests:各节点的有序请求队列
client 发往各节点的请求都会对应入队 InFlightRequests 暂存,以实现三个功能:
有序收发:收到响应的顺序,必须与发出请求的顺序保持一致
请求超时检测:发出的请求在request.timeout.ms
时限仍未收到响应,向用户返回连接异常
请求数限制:限制单个节点(连接)并发请求数,不超过max.in.flight.requests.per.connection
配置,配置设为 1 以实现消息的绝对有序性
|
此外,还负责读取网络 IO 结果,执行协议解析并汇总成 Response pipeline,逐个调用 protocol handler:
|
至此,分析了底层 SocketChannel 的读缓冲拆包、写缓冲封包,中间层 NIO Selector 四种网络 IO 事件的处理及单次结果收集,上层 NetworkClient 维护连接状态及 IO 结果处理等机制
职责:筛选出待发送的 batch,构造请求,处理发送结果,重发消息,维护集群元数据
(1)协议描述
参考文档#The_Messages_Produce,以 Produce v0 请求为例,协议的字段分布如下:
|
可见协议是由类型不一的字段组合嵌套而成,protocol 模块用Type
类描述字段类型,Field
类描述字段本身,Schema
类描述协议字段集,Struct
描述协议及对应数据:
|
由上可知 Produce v0 请求可被描述为:
|
(2)统一协议头
client 发出的每个请求都会带上 header,标识此请求的版本、类型、自增 id,描述如下:
|
(1)元信息
producer 需知道各 topic 的分区分布、各分区的 leader broker 地址,此类元信息由 Cluster 类描述:
|
类比 String,Cluster 类及其字段都是 final 只读,被修改时返回新实例,以保证 Sender 线程写、Producer 主线程读不存在并发问题。加上版本号分辨元信息的新旧,同时维护重试状态,由 Metadata 类描述:
|
(2)更新时机
NetworkClient 内部有一个持有 Metadata 的 DefaultMetadataUpdater 类,负责发起 MetadataRequest 更新请求并解析响应。client 每次执行 poll 前,都会先检查是否需要更新 Metadata,由于更新操作会阻塞主线程,故触发条件较为苛刻,有两层筛选:
Metadata 层:需等待自动过期、等待 backoff、被请求强制更新(网络层新建连接、断开连接、请求超时)
DefaultMetadataUpdater 层:若网络层无节点可用,不具备更新条件,也要等待 backoff
(3)更新实现
Metadata 由 Sender 线程写,而多个 Producer 线程读,是典型的线程间通信场景,故使用同一个 Sender.metadata
对象的wait & notifyAll
实现
实现:
|
(1)筛选各 node 需要发出的 batch 队列
Sender 线程在读写网络层之前,会根据 accumlator 排队时间、网络状态等条件,筛选出最紧急、最可能发送成功的 batch 集合,核心实现:
|
如上有两个核心筛选条件
ready()
:根据各 tp 的 batch 队列缓存情况,筛选出有 batch 要发送的 node 集合
drain()
:每个 node 需发送多个 tp batch 队列中的最老 batch,但由于单个请求大小有max.request.size
上限,为避免分区饥饿(有的分区迟迟不被选中导致 batch 超时),会从随机的 tp 开始收集 batch;若 max.request.size 较大,还会继续收集各个 tp 等待时间第二长的 batch,设计十分巧妙!
(2)协议封装
第一步为各 node 都筛选出了要发送的 batch 队列,还需进一步封装为 ClientRequest:
|
(3)处理发送结果
在 NetworkClient 收到响应后,会执行先解析出响应的 Struct,协议如下:
|
返回结果中指明了各 tp 的 base_offset 与 error_code,若有错误则检查是否可重试,无错误则 batch 发送成功
|
至此,分析了协议描述,Metadata 更新机制,accumulator 筛选 batch 的两层过滤机制,以及 Sender 包装请求和处理发送结果的过程
本文将 Kafka Producer 分为了三层
内存层:Compressor 实现消息压缩;MemoryRecords 实现 batch 写入并分批;RecordBatch 实现 batch 中各条消息元数据的异步计算,维护消息重发元数据;RecordAccumulator 则负责内存分配与协调
网络层:KafkaChannel 在 SocketChannel 上封装了拆包的读缓冲、封包的写缓冲;KafkaSelector 负责执行网络 IO 并收集结果;NetworkClient 负责维护连接状态,解析 IO 结果
数据处理层:Sender 线程从内存层筛选 batch,构造 Produce 请求下发给网络层,并处理发送结果
Producer 的亮点很多,个人认为有三点
朴素的并发设计:用 Condition 队列实现公平的内存分配、用 CountdownLatch 简化实现 Future 异步通知机制、用 metadata 对象的 wait & notifyAll 实现多线程同步等待更新,用 DCL 思想反射实例化 Compressor Constructor 单例…
严谨的并发逻辑:RecordAccumulator 在 rollover batch 时解决了细粒度锁引入的内存碎片问题…
简洁的模块解耦:RecordAccumulator 负责消息的缓冲分批,Sender 负责筛选 batch 构造发送请求并处理发送结果,NetworkClient 负责执行网络 IO
本文分析了 send 流程涉及到的核心模块及部分代码,更细致的逻辑还需参考源码
原文地址:https://yinzige.com/2020/02/15/kafka-producer/
end
- Flink 从入门到精通 系列文章
-
- 基于 Apache Flink 的实时监控告警系统
- 关于数据中台的深度思考与总结(干干货)
- 日志收集Agent,阴暗潮湿的地底世界
公众号(zhisheng)里回复 面经、ClickHouse、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章。
点个赞+在看,少个 bug 声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/618452
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。