当前位置:   article > 正文

Kafka Producer 实现源码分析

producerequestresult.await()阻塞

前言

拥抱变化接手了 Kafka 平台,遂学习 0.10.0 线上版本的设计与实现。限于篇幅,本文不会逐行解析源码,而是从逻辑流程、设计模式、并发安全等方面学习各组件,笔记仅供个人 Review


一:准备

1.1 配置项

参考文档 #producerconfigs,部分配置间会相互影响,如下:

  1. batch.size // 单个 batch 的最大字节数
  2. linger.ms // 控制 batch 未满时最多再等多久才发出
  3. buffer.memory // producer 消息缓冲区内存上限
  4. max.block.ms // 决定缓冲区满后阻发送一条消息的流程较长,涉及到的组件很多,塞等待可用内存的最长时间
  5. retries // 发送消息出现临时性错误时的最大重试次数(默认无限重试以实现 AtLeastOnce)
  6. max.in.flight.requests.per.connection // 控制一条连接上能发送的在途请求数(默认 5),若不为 1 则在重试时可能导致消息乱序

乱序情况:

3b6abe77f5934d0bc9773e5d41bfc2e0.png

  1. metadata.max.age.ms // 集群元信息的有效时长,超过则强制刷新
  2. connections.max.idle.ms // 连接的最长闲置时间,超过则主动断开
  3. max.block.ms // 应用层 send(), partitionFor() 的超时时间
  4. request.timeout.ms // 网络层任何请求等待响应的超时时间
  5. retry.backoff.ms, reconnect.backoff.ms // 重发消息、重连 broker 的定时规避周期

实现

producer 有 20+ 配置项,配置模块需对用户给定的Map<String, Object>, Properties等键值对象,进行配置值的类型检查、有效性检查、默认值填充等处理,得到有效配置。配置模块:

87ea60ce9d7751f39debc3ca37a93c1b.png

  • ConfigDef:记录用户原始配置、解析后的配置,并提供 util 方法按类型读取配置值,特别地,define 系列方法都返回 this 以实现链式调用,类似于 Builder 模式

  • ProducerConfig.CONFIG单例:静态常量,并在 static 代码块中定义,故在类加载阶段就会被初始化,是典型的单例模式应用,类似单例还有枚举实现的 protocol.ApiKeys,protocol.Errors 等等

  • Configurable接口规范类的反射构造行为:用户类(Partitioner, Serializer, Interceptor…)配置值都是字符串,反射实例化时构造方法可能无参或有参但类型不定,故抽象出Configurable接口,无参构造完成后接收配置值,进一步实例化

  1. public <T> T getConfiguredInstance(String key, Class<T> t) {
  2. Class<?> c = getClass(key);
  3. Object o = Utils.newInstance(c); // 无参构造实例化
  4. if (o instanceof Configurable)
  5. ((Configurable) o).configure(this.originals); // 完善构造
  6. return t.cast(o);
  7. }

1.2 预处理

如下 demo 展示了两种发送消息的方式

  • send 异步发送:返回类型为 RecordMetadata 的 Future 对象,调用 get 等待消息的发送结果

  • callback 异步发送:实现 Callback 接口,异步处理发送结果

  1. public class Main {
  2. public static void main(String[] args) throws Exception {
  3. Properties props = new Properties();
  4. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  5. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  6. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  7. KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  8. ProducerRecord<String, String> record = new ProducerRecord<>("topic-01", "msg1");
  9. Future<RecordMetadata> future = producer.send(record);
  10. RecordMetadata meta = future.get(1, TimeUnit.SECONDS);
  11. log.info("[invoke] send succeed, offset: {}", meta.offset());
  12. producer.send(record, new UserCallback());
  13. producer.flush();
  14. }
  15. static class UserCallback implements org.apache.kafka.clients.producer.Callback {
  16. @Override
  17. public void onCompletion(RecordMetadata metadata, Exception exception) {
  18. if (exception == null) // 无异常,发送成功
  19. log.info("[callback] send succeed, offset: {}", metadata.offset());
  20. else // 发送失败
  21. log.info("[callback] send failed, exception: {}", exception.getMessage());
  22. }
  23. }
  24. }
  25. // INFO [invoke] send succeed, offset: 12 (org.apache.kafka.clients.producer.Main:24)
  26. // INFO [callback] send succeed, offset: 13 (org.apache.kafka.clients.producer.Main:34)

底层发送流程如下,之后章节将逐一解析各个组件:

256275b59e95917189bd63271508a01c.png

1. ProducerInterceptors

用户类可线程安全地实现ProducerInterceptor接口(限制不能抛异常),用在

  • 消息发送前修改内容:如统一添加 msg uuid

  • 在返回前读取元数据:如记录异常信息,tracing 日志等

0c8c6d6eab73d475ad3bae3b28f33011.png

支持组合多个拦截器,批量链式调用,但不提倡,因为链式调用时若某个拦截器抛出 unchecked 异常,捕捉后仅记录日志不抛回 send 调用方,用户无法感知

  1. public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
  2. ProducerRecord<K, V> interceptRecord = record;
  3. for (ProducerInterceptor<K, V> interceptor : this.interceptors) { // 有序遍历拦截器链,逐个拦截
  4. try {
  5. interceptRecord = interceptor.onSend(interceptRecord);
  6. } catch (Exception e) {
  7. // onSend 抛出运行时异常,只会打 warn 日志,本次拦截视为无效
  8. // 如 A->B->C->D,若 C 拦截时抛异常,则 D 拿到的消息是 B 拦截后的,这种行为非预期
  9. log.warn("Error executing interceptor onSend callback", e); // ...
  10. }
  11. }
  12. return interceptRecord;
  13. }
2. Serializer

发送消息的 Key 和 Value 统一用byte[]描述,实现二进制安全,故用户类需实现Serializer接口;serialization 模块已内置了基础数字类型、String 的序列化实现,consumer 反序列化 Deserializer 接口同理

  1. public interface Serializer<T> extends Closeable {
  2. public byte[] serialize(String topic, T data); /*...*/
  3. }
3. Partitioner

用户实现Partitioner来决定每一条消息要发往哪个分区

  1. public interface Partitioner extends Configurable {
  2. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
  3. }

默认分区器DefaultPartitioner选择分区的流程:

41199b39a373f115fb63f2dccdfa9e5f.png

至此,分析了发送前消息的拦截修改、键值序列化、确定分区的逻辑


二:内存层

职责:内存池资源管理,消息压缩与 batch 分批,发送结果的异步计算

2.1 BufferPool

buffer.memory 配置限制了 producer 缓冲消息所能使用的最大内存,默认启用 batch 机制后,消息常以batch.size大小分批发送,故设计内存池重用 batch 内存

内存池划分为 3 个区域:

  • free 双端队列:元素为batch.size大小的内存块,消息发送成功后 clear 入队重用;有 2 个特性

    • 惰性分配:队列初始状态为空,后续分配的 batch “不规整内存块” 使用完毕则入队

    • 动态回收:若 availableMemory 内存不足,但 free 队列有空闲内存块时,会逐个出队释放内存(GC)

  • 不规整内存块:若 batch 被禁用,或发送大小在(batch.size, max.request.size]范围的大消息,会直接分配该大小的一次性 ByteBuffer,使用完毕后由 GC 回收

  • 空闲内存:非内存块实体,只是统计值,在分配和释放不规整内存块时对应增减库存

4d7972241c973b9fcaea283622eab3f3.png

内存不足时,调用send()的多个用户线程都会 await 阻塞在各自的条件变量上,内存池采用先到先得的策略,当有内存可用时只会 signal 唤醒入队最早、等待时间最长的线程,避免了线程饥饿或多线程低效竞争。示意图:

431c1f3478155a40123e9884e6a7f108.png

如上 thread_0 被唤醒后,会收集该可用内存,若内存已足够则恢复运行并唤醒 thread_1,否则继续等待


2.2 RecordBatch

消息批次用 RecordBatch 描述,维护消息重发、future 结果等元信息,消息实际存储在底层的 MemoryRecords 缓冲区,并使用 Compressor 进行压缩

8679653160dac0f29528c2e9752a7697.png

1. Compressor

1)类加载

producer 支持三种压缩方式:gzip, snappy 和 lz4,但只有 gzip 由 java.util.zip JDK 标准库实现,其他 2 种压缩类需添加 jar 包,在运行时反射加载,不使用时能减少包体积;同时为保证 producer 全局只会反射构造出一个 Constructor,用到了懒加载的 DCL 单例模式

2)封装 batch

每种算法都有预期压缩比,如 gzip 是 50%;在 MemoryRecords 视角,将 16KB batch 传给 Compressor 后,实际至多写入 32KB 数据,由于压缩比不精确,Compressor 要有动态扩容的能力,以容纳更多压缩消息;实现:

  1. public class ByteBufferOutputStream extends OutputStream {
  2. /*...*/
  3. public void write(int b) {
  4. if (buffer.remaining() < 1)
  5. expandBuffer(buffer.capacity() + 1); // 内存不足,自动扩容
  6. buffer.put((byte) b);
  7. }
  8. private void expandBuffer(int size) { // 容量增长 10%,或增长到 size 字节
  9. int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
  10. ByteBuffer temp = ByteBuffer.allocate(expandSize);
  11. temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
  12. buffer = temp;
  13. }
  14. }

3)压缩实现

实现了两层装饰模式:为 ByteBuffer 装饰了自动扩容功能,为各种类型数据的 put 操作装饰了压缩写:

d81f1c84e9219f59a51388bce19dbba0.png

特别地,putRecord() 带压缩地写入一条消息,写入后的内存结构:

7e1da87e1e5e9a970b130de7ec864693.png

更正:magic number 为 1 时带 timestamp,为 0 则无 timestamp 字段

缓冲区写满后触发close(),Compressor 会倒回至 ByteBuffer 的初始位置,插入一条 Shadow Record 补全元数据,将其后所有压缩后的 Real Records 视为其 key 的值,内存结构:

8fdcc7dac3c04b26f93312eecaff953c.png

TODO: 勘误画图,压缩后的 RealRecords 放的是 value 字段

注意,close() 方法中会动态调整压缩比,即压缩比是自适应的

4)写满判断

判断 MemoryRecords 是否已满,是通过估算 Compressor 压缩后字节数实现的,估算逻辑:

  1. public class Compressor {
  2. public long estimatedBytesWritten() {
  3. if (type == CompressionType.NONE)
  4. return bufferStream.buffer().position();
  5. else // 有压缩,压缩后字节数 = 未压缩字节数 * 压缩比 * 误差因子
  6. // 比如用 gzip 压缩 1MB 数据:1MB*0.5*1.05 = 537KB
  7. return (long) (writtenUncompressed * TYPE_TO_RATE[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
  8. }
  9. }
  10. public class MemoryRecords implements Records {
  11. public boolean isFull() { // 估算压缩后的大小是否已超 Compressor 容量
  12. return !this.writable || this.writeLimit <= this.compressor.estimatedBytesWritten();
  13. }
  14. }

2. MemoryRecords

负责委托 Compressor 追加写 Record,为其添加 LOG_OVERHEAD 头信息

  1. public long append(long offset, long timestamp, byte[] key, byte[] value) {
  2. if (!writable)
  3. throw new IllegalStateException("Memory records is not writable");
  4. int size = Record.recordSize(key, value);
  5. compressor.putLong(offset); // 此 record 在 batch 中的相对偏移量
  6. compressor.putInt(size); // 压缩前此 record 的大小
  7. long crc = compressor.putRecord(timestamp, key, value);
  8. compressor.recordWritten(size + Records.LOG_OVERHEAD);
  9. return crc;
  10. }

当缓冲区满后会切换为只读模式,等待 drain 选中发出

  1. public void close() {
  2. if (writable) {
  3. compressor.close(); // 回填压缩元数据
  4. this.buffer = compressor.buffer(); // compressor 持有的 ByteBuffer,可能已扩容,更新指向
  5. this.buffer.flip(); // 切为读模式
  6. writable = false;
  7. }
  8. }

基于以上 2 个组件,RecordBatch 实现了三个机制:

1)委托 MemoryRecords 追加写 Record,并将各 Record 的元数据 future 与 batch 写结果相关联

  1. public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
  2. /*...*/
  3. // 递增 offsetCounter,即递增 record 的 relative offset,追加写到 batch 中
  4. long checksum = this.records.append(offsetCounter++, timestamp, key, value);
  5. FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
  6. timestamp, checksum,
  7. key == null ? -1 : key.length,
  8. value == null ? -1 : value.length);
  9. if (callback != null)
  10. thunks.add(new Thunk(callback, future)); // 用户 callback 组合 record future 记为 thunk
  11. return future;
  12. }

2)batch 发送结束后,发送结果 baseOffset 和 exception 会被填充到 ProduceRequestResult,通知各 Record 的 FutureRecordMetadata,唤醒阻塞在 get() 调用上的用户线程:

cbec839cdea95597f108b58efd6ca036.png

此处多线程需等待单线程执行结果,用 CountDownLatch 模拟实现了 Future 接口:

  1. public final class ProduceRequestResult {
  2. private final CountDownLatch latch = new CountDownLatch(1); // 模拟 future 异步通知与等待
  3. public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
  4. this.topicPartition = topicPartition;
  5. this.baseOffset = baseOffset;
  6. this.error = error;
  7. this.latch.countDown(); // 发送完毕
  8. }
  9. public void await() throws InterruptedException {
  10. latch.await();
  11. }
  12. }
  13. public final class FutureRecordMetadata implements Future<RecordMetadata> {
  14. private final ProduceRequestResult result; // 二者是组合关系
  15. private final long relativeOffset;
  16. @Override
  17. public RecordMetadata get() throws InterruptedException, ExecutionException {
  18. this.result.await(); // 阻塞等待 batch 发送完毕,会在 Sender.handleProduceResponse 中调用 done
  19. // baseOffset + relativeOffset 即此消息在分区中的绝对 offset
  20. return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset, /*...*/);
  21. }
  22. }

3)维护 batch 的重发状态

  • attempts:已重试次数,在retries内都会重试

  • lastAttemptMs:上次重试发送的时间戳,配合retry.backoff.ms避免频繁重试


2.3 RecordAccumulator

accumulator 维护各 topic partition(tp)的 batch 队列,结构如下:

  1. public final class RecordAccumulator {
  2. private final BufferPool free; // 内存池
  3. private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches; // 每个 topic 分区持有一个 RecordBatch 双端队列
  4. }

producer 的 send 操作只将消息放入对应的 RecordBatch 中即返回:

9fdaa4e3ddb07b239e5efab68329fa16.png

accumulator 还负责对各 tp 的 batch 队列进行 rollover,在实现上有两个优化亮点:

(1)细粒度锁,提高可用性:创建新 batch 时需阻塞申请内存,会主动放弃 dq 互斥锁。示意图:

1490e8de9fed5c19acfd19481e42b6a4.png

  • 问题:A 持有 dq 锁,阻塞申请内存,会导致虽然内存够,但 B 也必须等待

  • 解决:当锁范围内有耗时操作时,考虑拆为细粒度锁,减少锁的持有时间

(2)解决并发 rollover 的内存碎片问题:细粒度锁的副作用是引入了新的并发竞争

adb758b71baa34a78cdc59bd00ed945a.png

  • 问题:A,B 同时发送大消息,都创建新 batch 后都入队,先入队的 batch 不再被使用,剩余内存将被浪费

  • 解决:创建完新 batch 后不着急使用,先尝试写入 last batch,若写入成功则释放新 batch

缓存消息的实现:

  1. public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) {
  2. // 1. 获取 tp 的可用 batch
  3. Deque<RecordBatch> dq = getOrCreateDeque(tp); // 获取或创建此 tp 的 batch 队列
  4. synchronized (dq) { // batch deque 并非线程安全,整个队列加锁
  5. RecordBatch last = dq.peekLast(); // 之前 batch 都已满,取最后一个 batch
  6. if (last != null) {
  7. FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
  8. if (future != null) { // 写成功直接返回
  9. return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
  10. }
  11. }
  12. } // 放弃 dq 锁
  13. // 2. tp 的 batch 队列为空,或现有 batch 都已满,阻塞申请内存创建新 batch
  14. int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
  15. ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
  16. synchronized (dq) { // 重新加锁
  17. RecordBatch last = dq.peekLast(); // 重新尝试入队 last
  18. if (last != null) {
  19. FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
  20. if (future != null) {
  21. free.deallocate(buffer); // 使用 batchA,释放 batchB
  22. return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
  23. }
  24. }
  25. // 3. 构建新 batch 重新入队,肯定能写成功
  26. MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
  27. RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
  28. FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
  29. dq.addLast(batch); // 新 batch 入队
  30. return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
  31. }
  32. }

至此,分析了消息的内存分配、压缩写入、batch 读写模式切换、batch rollover 等机制


三:网络层

职责:维护连接状态,执行四种网络 IO 并收集结果

producer 网络层使用了 NIO Selector 机制,内部各组件关系如下,逐个分析

12990259ecb0a5d93eb6188402d3868a.png

3.1 KafkaChannel

1. TransportLayer

封装 SocketChannel 的读写,提供注册事件的快捷方法

  1. // 实现分割、聚合读写 ByteBuffer 的 Channel
  2. interface TransportLayer extends ScatteringByteChannel, GatheringByteChannel{ /*...*/ }
  3. public class PlaintextTransportLayer implements TransportLayer { // 明文传输层
  4. private final SelectionKey key;
  5. private final SocketChannel socketChannel;
  6. @Override
  7. public boolean finishConnect() throws IOException {
  8. boolean connected = socketChannel.finishConnect(); // 等待连接建立,并自动注册 OP_READ 事件
  9. if (connected)
  10. key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
  11. return connected;
  12. }
  13. @Override
  14. public int read(ByteBuffer dst) throws IOException {
  15. return socketChannel.read(dst); // 读写操作都被直接委托给了底层的 SocketChannel
  16. }
  17. public int write(ByteBuffer src) throws IOException {
  18. return socketChannel.write(src);
  19. }
  20. @Override
  21. public void addInterestOps(int ops) {
  22. key.interestOps(key.interestOps() | ops); // 注册新事件
  23. }
  24. /*...*/
  25. } /* TODO: SslTransportLayer */
2. 读写缓冲区
  • 读缓冲 NetworkReceive 类:从 SocketChannel 中拆包,持续读取一个完整的响应

    121e976c54d4c5729e510b89d859dc19.png

  • 写缓冲 Send 接口:由 RequestSend->NetworkSend->ByteBufferSend 装饰链,将一个完整的请求封包,持续写入 SocketChannel

    39074f85edb72664ddb16b2c298e1896.png

KafkaChannel 只维护一个读缓冲、一个写缓冲,并提供对应的read, write方法读写 TransportLayer

2b8448f86c3fb83a29a8ed5c3dc55086.png


3.2 KafkaSelector

在 NIO Selector 上封装了网络 IO 的单次执行 4 种结果,对应了 4 种 IO 事件:

  1. List<String> connected; // OP_CONNECT // 新建连接的 broker id 列表
  2. List<String> disconnected; // closed, error // 断开连接的 broker id 列表
  3. List<NetworkReceive> completedReceives; // OP_READ // 读取到的响应数据
  4. List<Send> completedSends; // OP_WRITE // 已写出的请求数据

执行网络 IO:

  1. public void poll(long timeout) throws IOException {
  2. clear(); // 清理上次 poll 的结果
  3. int readyKeys = select(timeout); // 等待网络 IO
  4. if (readyKeys > 0)
  5. pollSelectionKeys(this.nioSelector.selectedKeys(), false);
  6. addToCompletedReceives();
  7. }
  8. private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
  9. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  10. while (iterator.hasNext()) {
  11. SelectionKey key = iterator.next(); // 逐个处理 selectionKey
  12. iterator.remove();
  13. KafkaChannel channel = channel(key); // 取出 attach 的 kafkaChannel
  14. try {
  15. // 1. 处理 OP_CONNECT
  16. if (isImmediatelyConnected || key.isConnectable()) {
  17. if (channel.finishConnect()) // 连接成功,注册 OP_READ
  18. this.connected.add(channel.id()); // 1. 收集新连接
  19. else
  20. continue; // 连接未完成,后续不必再处理
  21. }
  22. // 2. 处理 OP_READ
  23. if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
  24. NetworkReceive networkReceive;
  25. while ((networkReceive = channel.read()) != null)
  26. addToStagedReceives(channel, networkReceive); // 2. 收集一个完整的 Receive 响应
  27. }
  28. // 3. 处理 OP_WRITE
  29. if (channel.ready() && key.isWritable()) {
  30. Send send = channel.write(); // 写出 send 缓冲
  31. if (send != null)
  32. this.completedSends.add(send); // 3. 收集写成功的 Send 请求
  33. }
  34. // 4. 处理 closed
  35. if (!key.isValid()) {
  36. close(channel);
  37. this.disconnected.add(channel.id()); // 4. 收集不可用连接
  38. }
  39. } catch (Exception e) {
  40. close(channel);
  41. this.disconnected.add(channel.id()); // 有异常视为连接不可用
  42. }
  43. }
  44. }

结果收集完毕后,通过 List<Send> completedSends()等多个对应方法暴露给上层的 NetworkClient


3.3 NetworkClient

负责统一网络 IO 结果并解析响应,维护发往各个节点的有序请求队列,维护与各节点单一连接的状态;持有 2 个子组件:

(1)ClusterConnectionStates:连接状态管理

client 与每个 broker 都只会保持一条 TCP 连接,而非维护一个连接池,以简化消息有序性的实现。连接有三种状态,并由NodeConnectionState维护重连信息,由ClusterConnectionStates持有整个集群的连接状态

  1. final class ClusterConnectionStates {
  2. private final long reconnectBackoffMs;
  3. private final Map<String, NodeConnectionState> nodeState;
  4. /* ... */
  5. private static class NodeConnectionState {
  6. ConnectionState state;
  7. long lastConnectAttemptMs;
  8. }
  9. }
  10. public enum ConnectionState {
  11. DISCONNECTED, CONNECTING, CONNECTED
  12. }

(2)InFlightRequests:各节点的有序请求队列

client 发往各节点的请求都会对应入队 InFlightRequests 暂存,以实现三个功能:

  • 有序收发:收到响应的顺序,必须与发出请求的顺序保持一致

  • 请求超时检测:发出的请求在request.timeout.ms时限仍未收到响应,向用户返回连接异常

  • 请求数限制:限制单个节点(连接)并发请求数,不超过max.in.flight.requests.per.connection配置,配置设为 1 以实现消息的绝对有序性

  1. final class InFlightRequests {
  2. private final int maxInFlightRequestsPerConnection;
  3. private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
  4. // 是否限制 node 发送新请求
  5. public boolean canSendMore(String node) {
  6. Deque<ClientRequest> queue = requests.get(node);
  7. return queue == null || queue.isEmpty() || // 1. 未发出任何请求
  8. // 2. 上一个请求必须已完成 write,否则网络可能不可用,上一个请求还在该 channel 的 send 缓冲区中,不能强行覆盖
  9. (queue.peekFirst().request().completed()
  10. // 3. 同时在途请求数不能超过配置
  11. && queue.size() < this.maxInFlightRequestsPerConnection);
  12. }
  13. // 筛出出有超时请求的节点列表
  14. public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
  15. List<String> nodeIds = new LinkedList<String>();
  16. for (String nodeId : requests.keySet()) {
  17. if (inFlightRequestCount(nodeId) > 0) {
  18. ClientRequest request = requests.get(nodeId).peekLast(); // 最早入队的 request
  19. long timeSinceSend = now - request.sendTimeMs();
  20. if (timeSinceSend > requestTimeout) // 超过 request.timeout.ms 都还未处理完,视为超时
  21. nodeIds.add(nodeId);
  22. }
  23. }
  24. return nodeIds;
  25. }
  26. }

此外,还负责读取网络 IO 结果,执行协议解析并汇总成 Response pipeline,逐个调用 protocol handler:

  1. public List<ClientResponse> poll(long timeout, long now) {
  2. long metadataTimeout = metadataUpdater.maybeUpdate(now); // 若有必要,发起额外的 MetadataRequest 请求
  3. this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); // min timeout 保证各种超时事件能被及时执行
  4. List<ClientResponse> responses = new ArrayList<>();
  5. // 1. 处理 acks=0 的请求:对于发送成功的 send,若 acks=0 不需要等待响应,构造空 response 并收集
  6. handleCompletedSends(responses, updatedNow);
  7. // 2. 解析响应:用对应版本的协议,去解析 receive 响应数据,填充到 Struct 后入队 responses
  8. handleCompletedReceives(responses, updatedNow);
  9. // 3. 处理连接断开:已断开连接节点的 in-flights 请求,构造 disconnect response
  10. handleDisconnections(responses, updatedNow);
  11. // 4. 处理新建连接
  12. handleConnections();
  13. // 5. 处理请求超时:超时节点被视为不可用,同样为 in-flights 队列重的请求,构造 disconnect response
  14. // 连接断开是网络异常,请求超时是网络延迟,都视为节点不可用,连接都会被置为 DISCONNECTED 等待重连
  15. handleTimedOutRequests(responses, updatedNow);
  16. // 处理本轮 IO 结果,调用 request 的 callback handler
  17. for (ClientResponse response : responses) {
  18. if (response.request().hasCallback())
  19. // 如 Produce 请求会执行 RequestCompletionHandler.handleProduceResponse
  20. response.request().callback().onComplete(response);
  21. }
  22. return responses;
  23. }

至此,分析了底层 SocketChannel 的读缓冲拆包、写缓冲封包,中间层 NIO Selector 四种网络 IO 事件的处理及单次结果收集,上层 NetworkClient 维护连接状态及 IO 结果处理等机制


四:数据处理层

职责:筛选出待发送的 batch,构造请求,处理发送结果,重发消息,维护集群元数据

4.1 通信协议

(1)协议描述

参考文档#The_Messages_Produce,以 Produce v0 请求为例,协议的字段分布如下:

  1. acks => INT16
  2. timeout_ms => INT32
  3. [ topic_name => STRING # [] 表示数组
  4. topic_data => [
  5. partition_index => INT32
  6. record_set => BYTES # 压缩后的 batch 队列数据
  7. ]
  8. ]

可见协议是由类型不一的字段组合嵌套而成,protocol 模块用Type类描述字段类型,Field类描述字段本身,Schema类描述协议字段集,Struct描述协议及对应数据:

  1. public class Field {
  2. public final String name; // 字段名
  3. public final Type type; // 可序列化的字段类型
  4. }
  5. public abstract class Type {
  6. public abstract void write(ByteBuffer buffer, Object o); // 每种数据类型都需实现的读写方法
  7. public abstract Object read(ByteBuffer buffer); // 即完成数据的序列化、反序列化
  8. public static final Type INT32 = new Type() {/*...*/} // 基本类型、String、ByteBuffer 对应的协议类型
  9. public static final Type BYTES = new Type() {/*...*/}
  10. }
  11. public class Schema extends Type {
  12. private final Field[] fields; // 协议的字段集
  13. private final Map<String, Field> fieldsByName;
  14. }
  15. public class Struct {
  16. private final Schema schema;
  17. private final Object[] values; // 协议对应的数据
  18. }

由上可知 Produce v0 请求可被描述为:

  1. public static final Schema TOPIC_PRODUCE_DATA_V0
  2. = new Schema(new Field("topic", STRING),
  3. new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
  4. new Field("record_set", BYTES)))));
  5. public static final Schema TOPIC_PRODUCE_DATA_V0
  6. = new Schema(new Field("acks", INT16),
  7. new Field("timeout", INT32),
  8. new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));

(2)统一协议头

client 发出的每个请求都会带上 header,标识此请求的版本、类型、自增 id,描述如下:

  1. public static final Schema REQUEST_HEADER
  2. = new Schema(new Field("api_key", INT16), // 请求类型,比如 Metadata 类型为 3
  3. new Field("api_version", INT16), // 协议版本号,如 v2 版本值为 2
  4. new Field("correlation_id", INT32), // 请求的自增唯一 id,broker 会原样返回,用于校验响应
  5. new Field("client_id", NULLABLE_STRING));
  6. public enum ApiKeys {
  7. PRODUCE(0, "Produce"),
  8. METADATA(3, "Metadata"), /* ... */
  9. }

4.2 Metadata

(1)元信息

producer 需知道各 topic 的分区分布、各分区的 leader broker 地址,此类元信息由 Cluster 类描述:

  1. public class Node { // broker 由 Node 描述
  2. private final int id;
  3. private final String idString;
  4. private final String host;
  5. private final int port;
  6. }
  7. public class PartitionInfo {
  8. private final String topic; // TopicPartition: Topic, Partition
  9. private final int partition;
  10. private final Node leader;
  11. }
  12. public final class Cluster {
  13. private final List<Node> nodes;
  14. private final Map<Integer, Node> nodesById; // nodeId -> node
  15. private final Map<Integer, List<PartitionInfo>> partitionsByNode; // leaderNode -> [partInfo, ...]
  16. private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; // tp -> partInfo
  17. private final Map<String, List<PartitionInfo>> partitionsByTopic; // topic -> [all tp]
  18. private final Map<String, List<PartitionInfo>> availablePartitionsByTopic; // topic -> [有 leader 的 tp]
  19. }

类比 String,Cluster 类及其字段都是 final 只读,被修改时返回新实例,以保证 Sender 线程写、Producer 主线程读不存在并发问题。加上版本号分辨元信息的新旧,同时维护重试状态,由 Metadata 类描述:

  1. public final class Metadata {
  2. private int version; // 版本号标识 Cluster 的新旧
  3. private Cluster cluster;
  4. private long lastRefreshMs; // 上次更新时间戳,用于重试 backoff 等待
  5. private long lastSuccessfulRefreshMs; // 上次成功更新的时间戳
  6. private boolean needUpdate; // 是否需要强制更新
  7. }
  8. /* 各种 synchronized 读写方法*/

(2)更新时机

NetworkClient 内部有一个持有 Metadata 的 DefaultMetadataUpdater 类,负责发起 MetadataRequest 更新请求并解析响应。client 每次执行 poll 前,都会先检查是否需要更新 Metadata,由于更新操作会阻塞主线程,故触发条件较为苛刻,有两层筛选:

  • Metadata 层:需等待自动过期、等待 backoff、被请求强制更新(网络层新建连接、断开连接、请求超时)

  • DefaultMetadataUpdater 层:若网络层无节点可用,不具备更新条件,也要等待 backoff

429615cc5d5292fdfe5f4452dbebf218.png

(3)更新实现

Metadata 由 Sender 线程写,而多个 Producer 线程读,是典型的线程间通信场景,故使用同一个 Sender.metadata对象的wait & notifyAll实现

ebea3fdaf03bcc50608b07136468d7f7.png

实现:

  1. public class KafkaProducer<K, V> implements Producer<K, V> {
  2. private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
  3. /* ... */
  4. while (metadata.fetch().partitionsForTopic(topic) == null) { // 带超时地等待该 topic 元数据可用
  5. int version = metadata.requestUpdate(); // 请求更新
  6. sender.wakeup(); // 唤醒 selector,触发 NetworkClient 执行 poll
  7. metadata.awaitUpdate(version, remainingWaitMs); // 等待 Sender 线程更新完毕调用 notifyAll
  8. }
  9. }
  10. }
  11. public final class Metadata {
  12. public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
  13. while (this.version <= lastVersion) {
  14. if (remainingWaitMs != 0)
  15. wait(remainingWaitMs);
  16. /*...*/
  17. }
  18. }
  19. public synchronized void update(Cluster cluster, long now) {
  20. this.version += 1;
  21. this.cluster = cluster;
  22. notifyAll();
  23. }
  24. }
  25. public class NetworkClient implements KafkaClient {
  26. class DefaultMetadataUpdater implements MetadataUpdater {
  27. private final Metadata metadata;
  28. /*...*/ // 找出 in-flights 请求数最少的节点,发起 Metadata 请求
  29. private void handleResponse(RequestHeader header, Struct body, long now) {
  30. MetadataResponse response = new MetadataResponse(body);
  31. Cluster cluster = response.cluster(); // 解析响应读取元信息
  32. this.metadata.update(cluster, now); // 通知 Producer 线程
  33. }
  34. }
  35. }

4.3 Sender

(1)筛选各 node 需要发出的 batch 队列

Sender 线程在读写网络层之前,会根据 accumlator 排队时间、网络状态等条件,筛选出最紧急、最可能发送成功的 batch 集合,核心实现:

  1. public class Sender implements Runnable {
  2. void run(long now) {
  3. // 1. 读取缓存的集群元信息
  4. Cluster cluster = metadata.fetch();
  5. // 2. accumulator: 筛选出有 topic 分区 batch 要发的节点集合
  6. RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
  7. // 3. kafkaClient: 筛掉出符合网络 IO 条件的节点集合
  8. Iterator<Node> iter = result.readyNodes.iterator();
  9. while (iter.hasNext())
  10. /*...*/ // 过滤非 CONNECTED 连接
  11. // 4. accumulator:受限于请求大小上限,为各 node 收集其负责读写的各 tp 上的等待时间最长 batch
  12. Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
  13. // 5. 检测已超时的 batch
  14. this.accumulator.abortExpiredBatches(this.requestTimeout, now);
  15. // 6. 将 batch 队列对于封装为 ClientRequest 队列
  16. List<ClientRequest> requests = createProduceRequests(batches, now);
  17. for (ClientRequest request : requests)
  18. client.send(request, now); // 发送
  19. // 7. 发送请求,读取响应
  20. this.client.poll(pollTimeout, now); // pollTimeout 计算取了节点重连,lingerMs 到期等时间最小值
  21. }
  22. }

如上有两个核心筛选条件

  • ready():根据各 tp 的 batch 队列缓存情况,筛选出有 batch 要发送的 node 集合

    1c1f4eb552dfd073a225c7dc9cef3c0f.png

  • drain():每个 node 需发送多个 tp batch 队列中的最老 batch,但由于单个请求大小有max.request.size上限,为避免分区饥饿(有的分区迟迟不被选中导致 batch 超时),会从随机的 tp 开始收集 batch;若 max.request.size 较大,还会继续收集各个 tp 等待时间第二长的 batch,设计十分巧妙!

    623e17728ff44c4441c368fe4e2b6bf5.png

(2)协议封装

第一步为各 node 都筛选出了要发送的 batch 队列,还需进一步封装为 ClientRequest:

  1. // 为各 node 构建 ClientRequest 网络请求
  2. private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
  3. Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
  4. final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
  5. for (RecordBatch batch : batches) {
  6. TopicPartition tp = batch.topicPartition;
  7. produceRecordsByPartition.put(tp, batch.records.buffer()); // tp -> 压缩后的 byteBuffer
  8. recordsByPartition.put(tp, batch); // tp -> 维护重试信息的 RecordBatch
  9. }
  10. // ProduceRequest 协议及数据 --> RequestSend 写缓冲
  11. ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
  12. RequestSend send = new RequestSend(Integer.toString(destination),
  13. this.client.nextRequestHeader(ApiKeys.PRODUCE), // 单调递增 correlation id
  14. request.toStruct());
  15. // 当 NetworkClient 收到此请求的响应时,将调用此 callback 处理响应结果
  16. RequestCompletionHandler callback = new RequestCompletionHandler() {
  17. public void onComplete(ClientResponse response) {
  18. handleProduceResponse(response, recordsByPartition, time.milliseconds()); // 捕捉了各 tp 的 batch 信息
  19. }
  20. };
  21. // RequestSend --callback handler--> ClientRequest
  22. return new ClientRequest(now, acks != 0, send, callback);
  23. }

(3)处理发送结果

在 NetworkClient 收到响应后,会执行先解析出响应的 Struct,协议如下:

  1. public static final Schema PRODUCE_RESPONSE_V2
  2. = new Schema(new Field("responses",
  3. new ArrayOf(new Schema(new Field("topic", STRING),
  4. new Field("partition_responses",
  5. new ArrayOf(new Schema(new Field("partition", INT32),
  6. new Field("error_code", INT16),
  7. new Field("base_offset", INT64),
  8. new Field("timestamp", INT64))))))),
  9. new Field("throttle_time_ms", INT32));

返回结果中指明了各 tp 的 base_offset 与 error_code,若有错误则检查是否可重试,无错误则 batch 发送成功

  1. private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long timestamp, long correlationId, long now) {
  2. if (error != Errors.NONE && canRetry(batch, error)) {
  3. this.accumulator.reenqueue(batch, now); // 1. 发生可重试错误,重新入队到队头,尽快重发
  4. } else {
  5. // 2. 致命错误 or 重试次数已耗尽 or 无错误
  6. RuntimeException exception;
  7. if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
  8. exception = new TopicAuthorizationException(batch.topicPartition.topic());
  9. else
  10. exception = error.exception(); // 各种异常单例
  11. batch.done(baseOffset, timestamp, exception); // 回调用户 callback,构造 future 结果结束等待
  12. this.accumulator.deallocate(batch); // 向 accumulator 归还 batch 内存块
  13. }
  14. if (error.exception() instanceof InvalidMetadataException)
  15. metadata.requestUpdate(); // 元数据过期,请求强制更新
  16. if (guaranteeMessageOrder)
  17. this.accumulator.unmutePartition(batch.topicPartition); // 解除 tp 屏蔽
  18. }

至此,分析了协议描述,Metadata 更新机制,accumulator 筛选 batch 的两层过滤机制,以及 Sender 包装请求和处理发送结果的过程


总结

本文将 Kafka Producer 分为了三层

  • 内存层:Compressor 实现消息压缩;MemoryRecords 实现 batch 写入并分批;RecordBatch 实现 batch 中各条消息元数据的异步计算,维护消息重发元数据;RecordAccumulator 则负责内存分配与协调

  • 网络层:KafkaChannel 在 SocketChannel 上封装了拆包的读缓冲、封包的写缓冲;KafkaSelector 负责执行网络 IO 并收集结果;NetworkClient 负责维护连接状态,解析 IO 结果

  • 数据处理层:Sender 线程从内存层筛选 batch,构造 Produce 请求下发给网络层,并处理发送结果

Producer 的亮点很多,个人认为有三点

  • 朴素的并发设计:用 Condition 队列实现公平的内存分配、用 CountdownLatch 简化实现 Future 异步通知机制、用 metadata 对象的 wait & notifyAll 实现多线程同步等待更新,用 DCL 思想反射实例化 Compressor Constructor 单例…

  • 严谨的并发逻辑:RecordAccumulator 在 rollover batch 时解决了细粒度锁引入的内存碎片问题…

  • 简洁的模块解耦:RecordAccumulator 负责消息的缓冲分批,Sender 负责筛选 batch 构造发送请求并处理发送结果,NetworkClient 负责执行网络 IO

本文分析了 send 流程涉及到的核心模块及部分代码,更细致的逻辑还需参考源码

原文地址:https://yinzige.com/2020/02/15/kafka-producer/

end

  1. Flink 从入门到精通 系列文章
  2. 基于 Apache Flink 的实时监控告警系统
  3. 关于数据中台的深度思考与总结(干干货)
  4. 日志收集Agent,阴暗潮湿的地底世界

8aec91265eac9e5f37be0abbb12e5590.png

1e9be416652a8147b8495a210ae43a45.png

公众号(zhisheng)里回复 面经、ClickHouse、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章。
点个赞+在看,少个 bug 
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/618452
推荐阅读
相关标签