In the previous article we walked through the KafkaProducer send flow at a high level; this section digs into the sending process at the source-code level.
All of the preparation for sending is completed in the KafkaProducer constructor: loading the configuration, initializing the serializers, the record accumulator, the sender thread, and so on. The entry point for sending a message is the send() method:
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
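Before diving deeper, a minimal usage sketch of this entry point may help. The topic name, broker address, and String serializers below are illustrative assumptions, not taken from the original:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // The callback fires once the broker acknowledges the record (or the send fails).
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null)
                    exception.printStackTrace();
                else
                    System.out.printf("sent to %s-%d@%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
            });
        }
    }
}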
ProducerInterceptor is an interface that we can implement ourselves; the KafkaProducer constructor looks up the configured ProducerInterceptor implementations and loads them into a list:
// the KafkaProducer constructor initializes the interceptors
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
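For reference, interceptors are registered through the interceptor.classes producer property (the ProducerConfig.INTERCEPTOR_CLASSES_CONFIG constant used above). The class name below is a hypothetical placeholder:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
// comma-separated, fully qualified interceptor class names;
// com.example.AuditInterceptor is a hypothetical placeholder
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.AuditInterceptor");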
Before a record is sent, the interceptors get a chance to process it:
ProducerInterceptors.onSend():
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            // run each user-defined interceptor in turn
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception e) {
            // omitted...
        }
    }
    return interceptRecord;
}
If you have no interception logic, there is no need to implement this interface.
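As an illustration, a minimal custom interceptor might look like the following. The class name and the value-tagging behavior are invented for this example:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical interceptor that prefixes every value before it is serialized.
public class PrefixInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Return a new record; the original is not mutated.
        return new ProducerRecord<>(record.topic(), record.partition(),
                record.timestamp(), record.key(), "traced-" + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the broker acknowledges the record or the send fails.
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}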
Metadata wraps the Kafka cluster's Cluster object and also tracks fields such as the last update time of the Cluster data, a version number, and whether an update is needed.
Because the number of partitions and the leader replicas in the cluster can change at any time, before sending we must confirm that the partition metadata for the target topic is usable (it may be stale, missing, and so on) and obtain the latest cluster data.
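A stripped-down sketch of that bookkeeping, with field names following the client's Metadata class but simplified rather than copied from it:

import org.apache.kafka.common.Cluster;

// Simplified sketch of the Metadata bookkeeping described above.
public final class MetadataSketch {
    private Cluster cluster;      // current view of brokers, topics, and partitions
    private int version;          // bumped on every successful update
    private boolean needUpdate;   // set when the view may be stale
    private long lastRefreshMs;   // when the cluster data was last refreshed

    // Mark the metadata as stale and return the current version, so a caller
    // can later wait until the version has advanced past the one it saw.
    public synchronized int requestUpdate() {
        this.needUpdate = true;
        return this.version;
    }
}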
Two methods drive this: requestUpdateForNewTopics sets needUpdate to true, indicating that the latest data must be fetched; metadata.awaitUpdate blocks until the sender thread has finished updating the metadata.
KafkaProducer.waitOnMetadata():
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // add the current topic to the metadata; if absent it is a new topic and fresh data is needed
    metadata.add(topic);
    // fetch the topic's partition info from the metadata; Cluster maintains the
    // topic-to-partition mapping and the leader/follower relationships
    Cluster cluster = metadata.fetch();
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // if the cluster has a partition count and the requested partition falls within it
    // (i.e. the data is current enough), return a ClusterAndWaitTime wrapping the cluster immediately
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);
    // otherwise the cluster has no partition info or the data is stale,
    // so wake the sender thread to refresh the metadata
    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    long elapsed;
    do {
        log.trace("Requesting metadata update for topic {}.", topic);
        metadata.add(topic);
        int version = metadata.requestUpdate();
        // wake the sender thread so it updates the metadata
        sender.wakeup();
        try {
            // block until the metadata update completes
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        }
        cluster = metadata.fetch();
        elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        if (cluster.unauthorizedTopics().contains(topic))
            throw new TopicAuthorizationException(topic);
        remainingWaitMs = maxWaitMs - elapsed;
        partitionsCount = cluster.partitionCountForTopic(topic);
    } while (partitionsCount == null);

    if (partition != null && partition >= partitionsCount) {
        throw new KafkaException(
                String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
    }

    return new ClusterAndWaitTime(cluster, elapsed);
}
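The awaitUpdate call above is a version-guarded wait: the caller passes in the version returned by requestUpdate and blocks until the sender thread has bumped it. A simplified sketch of the idea, close to but not verbatim the client source:

// Block until the metadata version advances past lastVersion, or time out.
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    while (this.version <= lastVersion) {
        if (remainingWaitMs != 0)
            wait(remainingWaitMs); // released by notifyAll() when the sender applies an update
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
}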
Back in the Sender thread, let's see how it refreshes the cluster data held in Metadata. The main thread calls wakeup() to rouse the Sender thread, which in turn indirectly wakes NetworkClient's poll() to pull the Cluster information from the broker:
NetworkClient.poll(long timeout, long now):
public List<ClientResponse> poll(long timeout, long now) {
    // ...
    // check whether the Metadata needs updating and, if so, kick off the update
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    // ... process completed sends, receives, and disconnections, then collect the responses
}
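maybeUpdate is where the actual refresh decision happens. A rough sketch of its logic, simplified from NetworkClient's DefaultMetadataUpdater and not the verbatim source (sendMetadataRequest stands in for the real send step):

// Rough sketch: decide whether a metadata refresh is due and, if so, issue it.
public long maybeUpdate(long now) {
    // time until the current metadata is considered stale;
    // 0 if needUpdate was set via requestUpdate() or the refresh interval elapsed
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    // if a fetch is already in flight, wait for it rather than issuing another
    long waitForMetadataFetch = metadataFetchInProgress ? requestTimeoutMs : 0;
    long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
    if (metadataTimeout > 0)
        return metadataTimeout; // not due yet; tells poll() how long it may block

    // pick the node with the fewest in-flight requests and send it a MetadataRequest
    Node node = leastLoadedNode(now);
    if (node == null)
        return reconnectBackoffMs; // no connectable node; back off and retry
    sendMetadataRequest(node, now); // hypothetical helper for building and sending the request
    return 0;
}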