赞
踩
博主:爱码叔
个人博客站点: [icodebook](https://icodebook.com/)
公众号:爱码叔漫画软件设计(搜:爱码叔)
专注于软件设计与架构、技术管理。擅长用通俗易懂的语言讲解技术。对技术管理工作有自己的一定见解。文章会第一时间首发在个站上,欢迎大家关注访问!
目录:
-------------------------------------------------------------------
<上一节《你绝对能看懂的Kafka源代码分析-Kafka Producer设计分析》
上一小节,我们从设计角度对producer进行了讲解,我们也从宏观上了解了producer的设计原理,接下来我们将进入代码的分析
----------------------------------------------------------------------
客户端发送消息时调用KafkaProducer的send方法,所以我们先分析KafkaProducer,再层层深入。
KafkaProducer相当于整个快递公司的总控员,操作收件员收件,命令分拣员进行分拣,最终通知货车可以发车了。货车发货则在另外一个线程中进行。
消息发送的顶层逻辑都在KafkaProducer中。在看代码前,我先宏观介绍下KafkaProducer中send方法的主流程,帮助代码理解,括号中以发快递类比:
send方法的整体逻辑如上,每一步其实都和我们真实场景相对应,这就是设计的巧妙之处。
接下来我们进入代码部分,我们先看下KafkaProducer中的部分重要属性
- private final Partitioner partitioner;
- 说明:分区选择器,根据一定策略,选择出消息要发往的分区
- private final int maxRequestSize;
- 说明:消息最大程度,包括了消息头、序列化后的key和value
- private final long totalMemorySize;
- 说明:单个消息发送的缓冲区大小
- private final Metadata metadata;
- 说明:kafka集群元数据
- private final RecordAccumulator accumulator;
- 说明:前面所说的分拣员,维护不同分区的batch,负责分拣消息。等待sender来发送
- private final Sender sender;
- 说明:发送消息,实现Runable,ioThread中启动运行
- private final Thread ioThread;
- 说明:运行sender的线程,负责发送消息到kafka集群
- private final CompressionType compressionType;
- 说明:压缩算法类型,gzip等。消息数量越多,压缩效果越好
- private final Serializer<K> keySerializer;
- 说明:key的序列化器
- private final Serializer<V> valueSerializer;
- 说明:value的序列化器
- private final ProducerConfig producerConfig;
- 说明:生产者的配置
- private final long maxBlockTimeMs;
- 说明:等待更新kafka集群元数据的最大时长
- private final ProducerInterceptors<K, V> interceptors;
- 说明:发送前消息钱,要先通过一组拦截器的处理。也可以先于用户的callback预处理
KafkaProducer构造器会初始化上面的属性。另外在构造函数最后可以看到启动了ioThread。
- this.sender = newSender(logContext, kafkaClient, this.metadata);
- String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
- this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
- this.ioThread.start();
接下来我们看一下send方法代码:
- public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
- // intercept the record, which can be potentially modified; this method does not throw exceptions
- ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
- return doSend(interceptedRecord, callback);
- }
ProducerInterceptors对象中维护了List<ProducerInterceptor<K, V>> interceptors。在onSend方法中,会循环这个列表,调用每个ProducerInterceptor的onSend方法做预处理。
拦截器处理后,再调用doSend方法。发送的主要逻辑都在doSend方法中,我按照上面介绍的发送主流程结合代码来讲解。这里不粘贴doSend方法的整段代码,大家自行参考源代码。
1、判断producer是否可用。
可以看到一进来就调用了throwIfProducerClosed()方法,这个方法里逻辑如下:
- private void throwIfProducerClosed() {
- if (ioThread == null || !ioThread.isAlive())
- throw new IllegalStateException("Cannot perform operation after producer has been closed");
- }
很简单,就是在检查io线程状态。
2、判断这个topic的metadata是否可用。
代码如下:
- ClusterAndWaitTime clusterAndWaitTime;
- try {
- clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
- } catch (KafkaException e) {
- if (metadata.isClosed())
- throw new KafkaException("Producer closed while send in progress", e);
- throw e;
- }
主要逻辑在waitOnMetadata方法中。这里不展开讲,方法中做的事情是从缓存获取元数据中topic的信息,判断是否可用,如果缓存中存在分区并且请求分区在范围内,则直接返回cluster信息,否则发送更新请求。再判断分区是否正常并且请求更新的分区在此topic分区的范围内。这个方法的返回值是Cluster原数据以及所花费的时长。
3、对key和value进行序列化。
主要是如下两行代码
- serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
- serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
3、获取消息要发往的分区编号
- int partition = partition(record, serializedKey, serializedValue, cluster);
- tp = new TopicPartition(record.topic(), partition);
TopicPartition对象实际上只是封装了topic和partition,那么消息的发送地址就齐全了。
4、计算序列化后大小
- int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
- compressionType, serializedKey, serializedValue, headers);
- ensureValidRecordSize(serializedSize);
ensureValidRecordSize方法中验证size是否未超过maxRequestSize及totalMemorySize。
5、通过RecordAccumulator把消息加入batch中
- RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
- serializedValue, headers, interceptCallback, remainingWaitMs);
accumulator.append方法中做分拣逻辑处理,后面会重点讲解RecordAccumulator。这里我们只需要知道通过这个方法处理,你的消息已经缓存到待发送Batch中。
6、如果batch正好满了,或者创建了新batch来容纳消息,则唤醒sender线程执行发送。
- if (result.batchIsFull || result.newBatchCreated) {
- log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
- this.sender.wakeup();
- }
至此Producer中send方法的主要代码逻辑已经分析完毕。下一小节我们看一下分拣员RecordAccumulator是如何实现的。
-------------------------------------------------------------------------------------------------
附KafkaProducer类部分注释翻译:
producer 线程安全,跨线程共享一个producer对象,通常会比多个对象更快。
producer由一个buffer空间池组成,它容纳了没有被传输到server的数据。同时有一个后台 I/O线程负责把这些数据记录转化为reqeust,然后把他们发送给集群。如果producer用后未成功关闭,这些资源将被泄漏。
send()方法是异步的。当调用他添加记录到等待发送的数据缓冲区会立即得到返回。这允许producer把独立的消息打包起来,这样会更为高效。
ack配置项控制认为请求完成的条件。设置为“all”,数据全部提交前,是不会返回结果的,这是最慢,但是持久性最好的。
如果request失败,producer会自动重试,但是如果我们设置了retries为0,则不会重试。开启重试,会出现数据重复的可能性。
producer维护了每个partition对于未发送消息的缓冲区。缓冲区的大小通过batch.size配置项指定。设置的大一点,可以让batch更大,但是也需要更多的内存。
默认的,即使buffer还有未使用的空间,也是可以被立即发送出去的。然而,如果你想减少请求的次数,你可以设置linger.ms为大于0的值。这会让producer等待相应的时间,以让更多的数据到达batch后再发送。这个和Nagle在TCP里的算法是类似得。例如上面的代码片段,因为我们设置了linger为1ms,可能100条记录会在一次请求中发送出去。你也可以再加1ms,让请求等待更多的数据到达,如果我们的bufer还没填满的话。注意,相近时间到达的数据通常会在同一个batch中,即使linger.ms=0。所以,由于不计后果的linger设置,将会导致出现过重的数据负载batch。所以当你负载并不重,如果把linger设为大于0,会让请求更少,更高效,这只会造成很小的延迟。
buffer.memory控制producer可用的全部buffe的r内存大小。如果record发送的速度快于传输到server的速度,buffer将被耗尽。当buffer耗尽时,发送请求将被block。block的时长通过max.block.ms指定。在这个时间长度后,将会抛出TimeoutException
key.serialize和value.serializer用来指出如何转化key和value的值为byte。你可以使用内置的org.apache.kafka.common.serialization.ByteArraySerializer或者org.apache.kafka.common.serialization.StringSerializer处理简单的string或者byte类型
从kafka0.11开始,kafkaProducer支持两种新模式:幂等的producer和事务producer。幂等的producer让发送的语义从至少一次加强为仅一次。在特殊的producer重试时,不再会产生重复。事务性producer允许应用原子性发送消息给多个分区(及topic!)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。