当前位置:   article > 正文

【Kafka精进系列006】KafkaProducer消息发送源码解析

【Kafka精进系列006】KafkaProducer消息发送源码解析

上一讲中,我们大概的分析了下KafkaProducer消息发送流程,本节将从源码的角度深入分析消息发送过程。

消息发送之前的准备工作都已经在客户端KafkaProducer的构造器中完成,包括:配置项加载、序列器初始化、消息收集器初始化、消息发送线程初始化等等。而消息的发送入口在send()方法中:

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
   
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

1、拦截器

拦截器ProducerInterceptor是一个接口,我们可以自定义实现,在客户端KafkaProducer的构造器中会去查找ProducerInterceptor的实现并加载到集合中:

// 客户端KafkaProducer构造器初始化拦截器
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
  • 1
  • 2
  • 3

在消息发送之前,我们可以使用拦截器对消息进行处理:

ProducerInterceptors.onSend()

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
   
    ProducerRecord<K, V> interceptRecord = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
   
        try {
   
            //  自定义的拦截器执行逻辑
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception e) {
   
            // 省略...
        }
    }
    return interceptRecord;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

一般没有拦截处理的逻辑就不需要实现该接口。

2、元数据更新,获取最新Cluster集群数据

Metadata封装了Kafka集群Cluster对象,并且保存Cluster数据的最后更新时间、版本号、是否需要更新数据等字段。

由于集群中分区数量、Leader副本是可能随时变化的,所以在发送消息之前,需要确认发送到topic对应的metadata的分区是可用的(有可能过期或者不存在等等),返回最新的集群数据。

  • metadata添加当前topic,不存在说明是新topic,调用requestUpdateForNewTopics,将needUpdate设置为true表示需要更新最新数据;
  • 尝试获取topic对应分区的详细信息,存在即返回集群信息cluster;
  • 不存在最新topic的分区信息,唤醒sender线程更新Metadata中保存的Kafka集群元数据Cluster.
  • 调用metadata.awaitUpdate等待sender线程更新完元数据;
  • 若未获取到topic对应的分区信息,继续循环上面2个步骤,直至超时抛出异常;
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
   
  //  metadata添加当前topic,不存在说明是新topic,需要更新最新数据
  metadata.add(topic);
    // 从元数据中获取topic对应的分区信息, cluster维护了topic与分区的关系、leader/follower等关系
    Cluster cluster = metadata.fetch();
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // 如果集群中分区存在并且, 大于之前的分区(说明是最新的数据),直接返回一个ClusterAndWaitTime,包含了cluster信息
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);

    // 如果cluster不存在分区信息或者是过期数据,就唤醒sender线程去更新metadata数据
    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    long elapsed;
    do {
   
        log.trace("Requesting metadata update for topic {}.", topic);
        metadata.add(topic);
        int version = metadata.requestUpdate();
        // 唤醒sender线程, 更新metadata
        sender.wakeup();
        try {
   
           // 阻塞等待元数据更新完
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
   
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        }
        cluster = metadata.fetch();
        elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        if (cluster.unauthorizedTopics().contains(topic))
            throw new TopicAuthorizationException(topic);
        remainingWaitMs = maxWaitMs - elapsed;
        partitionsCount = cluster.partitionCountForTopic(topic);
    } while (partitionsCount == null);

    if (partition != null && partition >= partitionsCount) {
   
        throw new KafkaException(
                String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
    }

    return new ClusterAndWaitTime(cluster, elapsed);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

再回到Sender线程中,看看Sender线程是如何更新Metadata中的cluster数据的:

主线程通过wakeup()唤醒Sender线程,间接调用间接唤醒 NetworkClient 的poll()去服务端拉取Cluster信息:

NetworkClient.poll(long timeout, long now):

public List<ClientResponse> poll(long timeout, long now) {
   
    // ...
    
    // 判断是否需要更新Metadata
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
   
        this.selector.poll(Utils
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/839431
推荐阅读
相关标签
  

闽ICP备14008679号