当前位置:   article > 正文

你绝对能看懂的Kafka源代码分析-KafkaProducer类代码分析_kafka producer源码

kafka producer源码

博主:爱码叔

个人博客站点: [icodebook](https://icodebook.com/)

公众号:爱码叔漫画软件设计(搜:爱码叔)

专注于软件设计与架构、技术管理。擅长用通俗易懂的语言讲解技术。对技术管理工作有自己的一定见解。文章会第一时间首发在个站上,欢迎大家关注访问!

目录:

《Kafka Producer设计分析》

《KafkaProducer类代码分析》

《RecordAccumulator类代码分析》

《Sender类代码分析》

《NetworkClient类代码分析》

-------------------------------------------------------------------

<上一节《你绝对能看懂的Kafka源代码分析-Kafka Producer设计分析》

上一小节,我们从设计角度对producer进行了讲解,我们也从宏观上了解了producer的设计原理,接下来我们将进入代码的分析

----------------------------------------------------------------------

客户端发送消息时调用KafkaProducer的send方法,所以我们先分析KafkaProducer,再层层深入。

KafkaProducer相当于整个快递公司的总控员,操作收件员收件,命令分拣员进行分拣,最终通知货车可以发车了。货车发货则在另外一个线程中进行。

消息发送的顶层逻辑都在KafkaProducer中。在看代码前,我先宏观介绍下KafkaProducer中send方法的主流程,帮助代码理解,括号中以发快递类比:

  1. 拦截器处理(对快递进行发送前的预处理
  2. 判断producer是否可用。依据是负责io的线程是否工作。(车队罢工了还如何发送快递?
  3. 判断metadata是否可用。metadata相当于调度员的指挥图,存储了kafka集群的各种信息,包括topic、分区等等。(如果没有快递网点的信息,如何进行调度派发?
  4. 对key和value进行序列化。(对快递进行包装
  5. 获取要发往的分区编号(快递目的地部分地址
  6. 计算序列化后大小(快件称重
  7. 通过RecordAccumulator把消息加入batch中(分拣员进行分拣
  8. 如果batch满了,或者创建了新batch来容纳消息,则唤醒sender线程执行发送。(已经有封箱的货物了,发货吧!

send方法的整体逻辑如上,每一步其实都和我们真实场景相对应,这就是设计的巧妙之处。

接下来我们进入代码部分,我们先看下KafkaProducer中的部分重要属性

  1. private final Partitioner partitioner;
  2. 说明:分区选择器,根据一定策略,选择出消息要发往的分区
  3. private final int maxRequestSize;
  4. 说明:消息最大程度,包括了消息头、序列化后的key和value
  5. private final long totalMemorySize;
  6. 说明:单个消息发送的缓冲区大小
  7. private final Metadata metadata;
  8. 说明:kafka集群元数据
  9. private final RecordAccumulator accumulator;
  10. 说明:前面所说的分拣员,维护不同分区的batch,负责分拣消息。等待sender来发送
  11. private final Sender sender;
  12. 说明:发送消息,实现Runable,ioThread中启动运行
  13. private final Thread ioThread;
  14. 说明:运行sender的线程,负责发送消息到kafka集群
  15. private final CompressionType compressionType;
  16. 说明:压缩算法类型,gzip等。消息数量越多,压缩效果越好
  17. private final Serializer<K> keySerializer;
  18. 说明:key的序列化器
  19. private final Serializer<V> valueSerializer;
  20. 说明:value的序列化器
  21. private final ProducerConfig producerConfig;
  22. 说明:生产者的配置
  23. private final long maxBlockTimeMs;
  24. 说明:等待更新kafka集群元数据的最大时长
  25. private final ProducerInterceptors<K, V> interceptors;
  26. 说明:发送前消息钱,要先通过一组拦截器的处理。也可以先于用户的callback预处理

KafkaProducer构造器会初始化上面的属性。另外在构造函数最后可以看到启动了ioThread。

  1. this.sender = newSender(logContext, kafkaClient, this.metadata);
  2. String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
  3. this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
  4. this.ioThread.start();

接下来我们看一下send方法代码:

  1. public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
  2. // intercept the record, which can be potentially modified; this method does not throw exceptions
  3. ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
  4. return doSend(interceptedRecord, callback);
  5. }

ProducerInterceptors对象中维护了List<ProducerInterceptor<K, V>> interceptors。在onSend方法中,会循环这个列表,调用每个ProducerInterceptor的onSend方法做预处理。

拦截器处理后,再调用doSend方法。发送的主要逻辑都在doSend方法中,我按照上面介绍的发送主流程结合代码来讲解。这里不粘贴doSend方法的整段代码,大家自行参考源代码。

1、判断producer是否可用。

可以看到一进来就调用了throwIfProducerClosed()方法,这个方法里逻辑如下:

  1. private void throwIfProducerClosed() {
  2. if (ioThread == null || !ioThread.isAlive())
  3. throw new IllegalStateException("Cannot perform operation after producer has been closed");
  4. }

很简单,就是在检查io线程状态。

2、判断这个topic的metadata是否可用。

代码如下:

  1. ClusterAndWaitTime clusterAndWaitTime;
  2. try {
  3. clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
  4. } catch (KafkaException e) {
  5. if (metadata.isClosed())
  6. throw new KafkaException("Producer closed while send in progress", e);
  7. throw e;
  8. }

主要逻辑在waitOnMetadata方法中。这里不展开讲,方法中做的事情是从缓存获取元数据中topic的信息,判断是否可用,如果缓存中存在分区并且请求分区在范围内,则直接返回cluster信息,否则发送更新请求。再判断分区是否正常并且请求更新的分区在此topic分区的范围内。这个方法的返回值是Cluster原数据以及所花费的时长。

3、对key和value进行序列化。

主要是如下两行代码

  1. serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
  2. serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());

3、获取消息要发往的分区编号

  1. int partition = partition(record, serializedKey, serializedValue, cluster);
  2. tp = new TopicPartition(record.topic(), partition);

TopicPartition对象实际上只是封装了topic和partition,那么消息的发送地址就齐全了。

4、计算序列化后大小

  1. int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
  2. compressionType, serializedKey, serializedValue, headers);
  3. ensureValidRecordSize(serializedSize);

ensureValidRecordSize方法中验证size是否未超过maxRequestSize及totalMemorySize。

5、通过RecordAccumulator把消息加入batch中

  1. RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
  2. serializedValue, headers, interceptCallback, remainingWaitMs);

accumulator.append方法中做分拣逻辑处理,后面会重点讲解RecordAccumulator。这里我们只需要知道通过这个方法处理,你的消息已经缓存到待发送Batch中。

6、如果batch正好满了,或者创建了新batch来容纳消息,则唤醒sender线程执行发送。

  1. if (result.batchIsFull || result.newBatchCreated) {
  2. log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
  3. this.sender.wakeup();
  4. }

至此Producer中send方法的主要代码逻辑已经分析完毕。下一小节我们看一下分拣员RecordAccumulator是如何实现的。

下一节《RecordAccumulator类代码分析》

-------------------------------------------------------------------------------------------------

附KafkaProducer类部分注释翻译:

producer 线程安全,跨线程共享一个producer对象,通常会比多个对象更快。

producer由一个buffer空间池组成,它容纳了没有被传输到server的数据。同时有一个后台 I/O线程负责把这些数据记录转化为reqeust,然后把他们发送给集群。如果producer用后未成功关闭,这些资源将被泄漏。

send()方法是异步的。当调用他添加记录到等待发送的数据缓冲区会立即得到返回。这允许producer把独立的消息打包起来,这样会更为高效。

ack配置项控制认为请求完成的条件。设置为“all”,数据全部提交前,是不会返回结果的,这是最慢,但是持久性最好的。

如果request失败,producer会自动重试,但是如果我们设置了retries为0,则不会重试。开启重试,会出现数据重复的可能性。

producer维护了每个partition对于未发送消息的缓冲区。缓冲区的大小通过batch.size配置项指定。设置的大一点,可以让batch更大,但是也需要更多的内存。

默认的,即使buffer还有未使用的空间,也是可以被立即发送出去的。然而,如果你想减少请求的次数,你可以设置linger.ms为大于0的值。这会让producer等待相应的时间,以让更多的数据到达batch后再发送。这个和Nagle在TCP里的算法是类似得。例如上面的代码片段,因为我们设置了linger为1ms,可能100条记录会在一次请求中发送出去。你也可以再加1ms,让请求等待更多的数据到达,如果我们的bufer还没填满的话。注意,相近时间到达的数据通常会在同一个batch中,即使linger.ms=0。所以,由于不计后果的linger设置,将会导致出现过重的数据负载batch。所以当你负载并不重,如果把linger设为大于0,会让请求更少,更高效,这只会造成很小的延迟。

buffer.memory控制producer可用的全部buffe的r内存大小。如果record发送的速度快于传输到server的速度,buffer将被耗尽。当buffer耗尽时,发送请求将被block。block的时长通过max.block.ms指定。在这个时间长度后,将会抛出TimeoutException

key.serialize和value.serializer用来指出如何转化key和value的值为byte。你可以使用内置的org.apache.kafka.common.serialization.ByteArraySerializer或者org.apache.kafka.common.serialization.StringSerializer处理简单的string或者byte类型

从kafka0.11开始,kafkaProducer支持两种新模式:幂等的producer和事务producer。幂等的producer让发送的语义从至少一次加强为仅一次。在特殊的producer重试时,不再会产生重复。事务性producer允许应用原子性发送消息给多个分区(及topic!)。

下一节《RecordAccumulator类代码分析》

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/618481
推荐阅读
相关标签
  

闽ICP备14008679号