赞
踩
private KafkaTemplate<String, Object> kafkaTemplate;
……
……
//发送消息,回调结果
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj);
追踪send方法后,我们将目标锁定在一个doSend方法身上。
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
if (this.transactional) {
Assert.state(inTransaction(),
"No transaction is in process; "
"possible solutions: run the template operation within the scope of a "
"template.executeInTransaction() operation, start a transaction with @Transactional "
"before invoking the template method, "
“run in a transaction started by a listener container when consuming a record”);
}
final Producer<K, V> producer = getTheProducer();
this.logger.trace(() -> "Sending: " + producerRecord);
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
producer.send(producerRecord, buildCallback(producerRecord, producer, future));
if (this.autoFlush) {
flush();
}
this.logger.trace(() -> "Sent: " + producerRecord);
return future;
}
这段代码的核心是下面这行代码:
producer.send(producerRecord, buildCallback(producerRecord, producer, future));
它首先构造了一个Producer对象,这个Producer是一个接口类,KafkaProducer实现了这个接口,所以此处的Producer对象即为本文的重点内容——KafkaProducer。接着这个producer调用了一个极为重要的send方法,它的第二个参数buildCallback方法返回的是一个Callback接口,用于异步回调结果。本文将围绕KafkaProducer的构造及这个send方法进行分析。
下面我们将从这行代码正式进入生产者的源码探析!!
一、KafkaProducer构造函数
===================
下面是KafkaProducer的构造方法的部分核心源码,先来简单的看看KafkaProducer构造生产者对象时都干了些什么:首先利用参数中的配置信息创建了配置对象,接着利用这个配置对象获取到了partitioner(分区器)、keySerializer(key序列化器)、valueSerializer(value序列化器)、interceptors(拦截器)……,这些都是KafkaProducer类的私有常量,后面都会用到它们;接着创建RecordAccumulator(消息收集器),后面KafkaProducer调用send方法后会用将消息数据存入其中;再然后创建更新Kafka集群的元数据;最后通过newSender()方法创建Sender线程并启动。
KafkaProducer(Map<String, Object> configs,
Serializer keySerializer,
Serializer valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors interceptors,
Time time) {
//创建生产者配置对象
ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
valueSerializer));
try {
…………………………
//通过反射机制获取到partitioner(分区器)、keySerializer(key序列化器)、valueSerializer(value序列化器)
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}
//获取interceptors拦截器,之后KafkaProducer调用send方法后会用到该拦截器
List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
if (interceptors != null)
this.interceptors = interceptors;
else
this.interceptors = new ProducerInterceptors<>(interceptorList);
//创建RecordAccumulator(消息收集器),之后KafkaProducer调用send方法后会用将消息数据存入其中
this.accumulator = new RecordAccumulator(……);
//创建更新Kafka集群的元数据
List addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new ProducerMetadata(retryBackoffMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
this.metadata.bootstrap(addresses, time.milliseconds());
}
//创建Sender线程
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
//启动Sender对应的线程
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
……………………
} catch (Throwable t) {
……………………
}
}
再来看一下是如何创建Sender线程的。
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
………………
//创建NetworkClient,这是Kafka网络I/O的核心
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(new Selector(………………), ………………);
………………
//返回Sender线程对象
return new Sender(………………);
}
去掉一些非核心代码,发现newSender方法要做的事情其实很简单:创建NetworkClient,这是Kafka网络I/O的核心,在后面发送消息请求时会用到;最后创建Sender对象,Sender实现了Runnable接口,是个线程类。至于Sender线程都做了什么我们现在并不需要太关心,毕竟本文的主角并不是它,我们把它留着在后面的文章中单独分析。
二、send方法探析
==========
构造完KafkaProducer对象之后,接着就会调用它的send方法,所以下面我开始关注send方法。
@Override
public Future send(ProducerRecord<K, V> record, Callback callback) {
// 拦截器,可在发送消息之前对消息进行拦截修改
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
可以看到在发送消息之前,我们可以利用之前获取的拦截器对消息进行拦截修改,然后调用了一个doSend方法,该方法将会完成更新kafka集群元数据信息、对Key和Value进行序列化、分区选择、追加消息到RecordAccumulator消息累加器中、唤醒Sender线程的操作。下面将围绕这些内容进行分析。
ProducerInterceptors其实是一个ProducerInterceptor拦截器的集合,它的onSend方法只不过是在循环遍历这些拦截器,并调用每个拦截器的onSend方法,源码如下:
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
ProducerRecord<K, V> interceptRecord = record;
//循环遍历拦截器
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
//调用每个拦截器的onSend方法
interceptRecord = interceptor.onSend(interceptRecord);
} catch (Exception e) {
………………
}
return interceptRecord;
}
ProducerInterceptor是一个接口,所以如果我们需要写自己的拦截逻辑时,只需要去实现这个接口,将自己的拦截逻辑放在onSend方法中即可。
消息经过拦截修改后进入到doSend方法,若没有指定分区,后面将会使用Cluster信息计算分区号,因此在此之前需要获取最新的Cluster集群信息。下面是doSend方法中涉及到元数据信息更新的代码部分,其余部分省略。
private Future doSend(ProducerRecord<K, V> record, Callback callback) {
………………
ClusterAndWaitTime clusterAndWaitTime;
try {
//等待元数据更新
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
………………
}
//获取到Cluster集群最新信息
Cluster cluster = clusterAndWaitTime.cluster;
………………
//计算分区号
int partition = partition(record, serializedKey, serializedValue, cluster);
………………
}
进入waitOnMetadata方法源码,可以看到这里的逻辑主要是判断metadata中的元数据信息是否需要更新,当需要更新时,则通过do-while循环进行更新,其中核心部分是通过**metadata.awaitUpdate()**方法阻塞当前线程,等待Sender线程向远程服务器发起元数据更新请求,直到远程服务器返回了新的元数据信息才唤醒当前线程,最终返回最新的cluster元数据信息。
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
//通过metadata获取cluster信息, metadata之前已经在KafkaProducer构造方法中获取到
Cluster cluster = metadata.fetch();
……………………
//将topic加入到metadata中进行维护
metadata.add(topic);
//从cluster信息中获取topic的分区数
Integer partitionsCount = cluster.partitionCountForTopic(topic);
//如果partitionsCount不为空则说明metadata中已经维护了该topic的元数据,并且需要更新的分区号未定义或者在已知的分区范围内
//则直接返回metadata中的cluster信息
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
………………
//如果metadata中没有维护该topic的元数据,或者需要更新的分区号是新的时,则进行metadata的更新。
//do-while循环更新
do {
…………
//将topic加入到metadata中进行维护
metadata.add(topic);
//获取当前元数据版本号
int version = metadata.requestUpdate();
//唤醒sender线程
sender.wakeup();
try {
//阻塞等待元数据更新结束
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
……………………
}
//拿到更新后的集群信息
cluster = metadata.fetch();
elapsed = time.milliseconds() - begin;
//检测超时时间
if (elapsed >= maxWaitMs) {
……………………
}
……………………
} while (partitionsCount == null || (partition != null && partition >= partitionsCount));
//返回更新后的cluster信息
return new ClusterAndWaitTime(cluster, elapsed);
}
Kafka发送的消息是在网络上进行传输,所以,doSend方法还会通过keySerializer和valueSerializer将我们的消息进行序列化。producer端需要序列化,consumer端需要反序列化。下面是doSend方法中涉及到消息序列化的代码部分,其余部分省略。
private Future doSend(ProducerRecord<K, V> record, Callback callback) {
………………
byte[] serializedKey;
try {
//使用keySerializer将key进行序列化
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
………………
}
byte[] serializedValue;
try {
//使用valueSerializer将value进行序列化
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
………………
}
………………
}
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!
如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)
最后,强调几点:
我个人觉得面试也像是一场全新的征程,失败和胜利都是平常之事。所以,劝各位不要因为面试失败而灰心、丧失斗志。也不要因为面试通过而沾沾自喜,等待你的将是更美好的未来,继续加油!
以上面试专题的答小编案整理成面试文档了,文档里有答案详解,以及其他一些大厂面试题目。
《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!
面的项目也非常重要,这很可能是面试官会大量发问的地方,所以在面试之前好好回顾一下自己所做的项目;
我个人觉得面试也像是一场全新的征程,失败和胜利都是平常之事。所以,劝各位不要因为面试失败而灰心、丧失斗志。也不要因为面试通过而沾沾自喜,等待你的将是更美好的未来,继续加油!
以上面试专题的答小编案整理成面试文档了,文档里有答案详解,以及其他一些大厂面试题目。
[外链图片转存中…(img-l36wuCVt-1713458991327)]
[外链图片转存中…(img-yo5dgpoa-1713458991328)]
[外链图片转存中…(img-psfHEtHV-1713458991330)]
《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。