当前位置:   article > 正文

kafka producer 源码解析_kafka mavencentral

kafka mavencentral

一、源码搭建

kafka源码编译,版本信息如下

环境版本
kafka0.10.1.0
scala2.11
gradle3.1

修改源码 build.gradle 文件添加:

ScalaCompileOptions.metaClass.daemonServer = true
ScalaCompileOptions.metaClass.fork = true
ScalaCompileOptions.metaClass.useAnt = false
ScalaCompileOptions.metaClass.useCompileDaemon = false
  • 1
  • 2
  • 3
  • 4

添加 maven 仓库地址

repositories {
    maven {
      url 'https://maven.aliyun.com/nexus/content/groups/public/'
    }
    maven {
      url 'https://maven.aliyun.com/nexus/content/repositories/jcenter'
    }
    mavenCentral()
    jcenter()
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

开始编译为 idea 项目

gradle idea
  • 1

二、Producer 创建流程

本节从一个生产者 demo 开始入手,从上帝视角大体浏览一下生产者都有哪些核心组件以及消息是如何被发送出去的。并在之后的章节对每一个核心组件和流程进行详细剖析。下面是一个生产者 demo

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Test {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("test", "111"), (metadata, exception) -> System.out.println("回调"));
        }

        producer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

从创建一个 KafkaProducer 开始,看一下生产者在初始化的时候都做了什么。如果构建了源码 kafka 生产者源码在 clients 模块下(kafka客户端源码为java,服务端源码为scala)

在阅读 KafkaProducer 构造方法之前,先看一段类注释

A Kafka client that publishes records to the Kafka cluster.
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
  • 1
  • 2

注释告诉我们 KafkaProducer 是线程安全的,线程共享的效率要高于多实例。因此可以预测 KafkaProducer 源码一定有对线程安全做处理的代码。

找到最一般性的构造方法

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer){}
  • 1

这部分代码可以分为两个部分:配置解析、组件初始化

2.1 核心配置解析

这里将涉及配置解析的代码单独提取出来如下:

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer){
  try {
    log.trace("Starting the Kafka producer");
    Map<String, Object> userProvidedConfigs = config.originals();
    this.producerConfig = config;
    this.time = new SystemTime();
    // TODO  在一个 jvm,如果有多个生产者,client-id
    clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);// client.id,客户端id
    if (clientId.length() <= 0)
      clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
    // TODO  ============== 监控指标,可以不用管 ================>>>
    Map<String, String> metricTags = new LinkedHashMap<String, String>();
    metricTags.put("client-id", clientId);
    MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
      .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
      .tags(metricTags);
    List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                                                    MetricsReporter.class);
    reporters.add(new JmxReporter(JMX_PREFIX));
    // 监控的指标
    this.metrics = new Metrics(metricConfig, reporters, time);
    // TODO  <<<==============================
    // TODO 核心组件1:分区器,用于决定消息被路由到topic的哪个分区
    this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
    // 重试间隔 100ms
    long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
    // 生产者最大发送的字节数,注意是一次的request的batch的size,不是一个消息size,默认 1m
    this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
    // buffer memory 大小,默认 32m
    // 缓冲区满会阻塞 max.block.ms 时间,默认 60 s
    this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
    // 压缩
    this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
    /* check for user defined settings.
     * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
     * This should be removed with release 0.9 when the deprecated configs are removed.
     */
    if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
      log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
               "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
      boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
      if (blockOnBufferFull) {
        this.maxBlockTimeMs = Long.MAX_VALUE;
      } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
        log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                 "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
        this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
      } else {
        this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
      }
    } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
      log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
               "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
      this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
    } else {
      this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
    }

    /* check for user defined settings.
     * If the TIME_OUT config is set use that for request timeout.
     * This should be removed with release 0.9
     */
    if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
      log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
               ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
      this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
    } else {
      // 请求超时时间,默认30s
      this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
    }
    // bootstrap.servers
    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

    // 序列化器
    if (keySerializer == null) {
      this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                        Serializer.class);
      this.keySerializer.configure(config.originals(), true);
    } else {
      config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
      this.keySerializer = keySerializer;
    }
    if (valueSerializer == null) {
      this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                          Serializer.class);
      this.valueSerializer.configure(config.originals(), false);
    } else {
      config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
      this.valueSerializer = valueSerializer;
    }

    // load interceptors and make sure they get clientId
    userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
    List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                                                                                                                              ProducerInterceptor.class);
    // 拦截器
    this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

    config.logUnused();
    AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
    log.debug("Kafka producer started");
  } catch (Throwable t) {
    // call close methods if internal objects are already constructed
    // this is to prevent resource leak. see KAFKA-2121
    close(0, TimeUnit.MILLISECONDS, true);
    // now propagate the exception
    throw new KafkaException("Failed to construct kafka producer", t);
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109

第一个有意思的点体现在线程安全上,kafka 为每个生产者都分配一个 client-id(如果配置文件没有指定的话),通过 AtomicInteger 的 getAndIncrement 方式获取到自增的 id 并拼接上 producer-,该参数主要用于监控场景作为指标的标识,之后会尝试获取一个核心参数整理如下:

retry.backoff.ms:消息发送失败的重试间隔,默认值 100ms

metadata.max.age.ms:元数据更新时间,默认是 5min,注意这个配置只是客户端定时去同步元数据(topic、partition、replica、leader、follow、isr等),但并不是说一定是定时同步,大胆猜测一下当生产者发送消息时发现待发送的topic元数据没有缓存此时一定会去拉取一次且一定是阻塞模式;当 broker 发生变化触发元数据改变也会去拉取一次

max.request.size:一次请求最大的数据量,默认值 1M;提前剧透一下,这里的数据量限制不是说单挑消息的大小,而是一次请求,kafka在发送消息时会将多条消息打包成一个 RecordBatch,且一个分区生成一个 RecordBatch,因分区大概率是在不同的 broker 中,因此 kafka 会将若干个 RecordBatch 按照 broker 打包成一个 request,这里的数据量是对 request 的限制

buffer.memory:缓冲区大小,默认值 32M;提前剧透一下,生产者对消息有打包的过程,在没有达到打包条件时生产者会将消息缓存在缓冲区中,当缓存的数据量超过该值生产者会阻塞,直到达到阻塞时间的最大值

compression.type:压缩,默认值 none

max.block.ms:最大阻塞时间,默认值 60s。这里的时间从调用 send 方法开始一直到消息被发送的时间,包括上述说的缓冲区满产生的阻塞,以及元数据拉取时的阻塞都是包含在内,可以理解为一次完整的消息发送包含的时间

batch.size:批次大小,默认值 16K;就是上面说的 RecordBatch 的大小,这个参数非常重要在源码有多处体现,对效率有极大的影响

linger.ms:两次发送的时间间隔,默认值 0;两次发送没有间隔即来一条发送一条,这个参数主要防止当消息过小迟迟达不到 batch.size 的打包条件,导致数据延迟;因此这个配置在生产上建议配置,默认值导致生产者没有打包的行动极大地影响吞吐量,但又不能过大,过大会影响数据的时效性。通常的参考标准时根据 batch.size 估算数据量达到一个 batch 的时间

connections.max.idle.ms:连接最大空闲时间,默认值 9min;为了减轻客户端服务端的压力,对于长时间不活跃的连接会根据 lru 算法进行关闭

max.in.flight.requests.per.connection:每个连接允许没有响应的最大请求数,默认值 5;消息发送成功后得到响应前的请求会被放置在内部的 in-flight 数组中,当得到响应后(无论是成功还是失败)会被从这里移除,特别是当消息发送失败后进行重试,因为不知道服务端什么时候接收成功,当该值大于 1 时会存在消息乱序的情况(即使topic只有一个分区)。

reconnect.backoff.ms:连接重试间隔,默认值 5ms

认为比较重要的参数都做了必要的说明,到这里生产者核心的参数就解析完了,后续的各个组件初始化都是借助这些参数进行。

2.2 核心组件初始化

2.2.1 MetaData

元数据,在构造方法中有两行代码的体现

this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
// TODO 核心行为:初始化的时候拉取元数据,第一次只是初始化,并不会拉取数据
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
  • 1
  • 2
  • 3

在创建元数据组件时传入重试次数和拉取的时间间隔,它的逻辑在 metadata.max.age.ms 已经提到过了,后续会有一个更新的动作,从代码逻辑来看只是做了一个简单的初始化并没有在创建的时候就拉取集群元数据,这里可以思考一下如果是你你会在什么时间什么条件下开始拉取、怎么拉取(同步还是异步)、拉取多少(全量拉取还是按需拉取)

public synchronized void update(Cluster cluster, long now) {
  // TODO  Kafka生产者初始化的时候会不会去拉取元数据?
  // 初始化只是初始化了集群元数据,第一次 cluster 主题分区都是空的集合
  this.needUpdate = false;
  this.lastRefreshMs = now;
  this.lastSuccessfulRefreshMs = now;
  this.version += 1;

  for (Listener listener: listeners)
    listener.onMetadataUpdate(cluster);

  // Do this after notifying listeners as subscribed topics' list can be changed by listeners
  this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

  notifyAll();
  log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

上面解释可以出一个面试题:Kafka生产者初始化的时候会不会去拉取元数据?

2.2.2 RecordAccumulator

内存缓冲区,这里包含对大多数生产者的核心代码如:消息如何被添加到缓冲区,数据如何被打包,内存如何复用等。但初始化并不是很难

// 核心参数 RecordAccumulator 负责消息复杂的缓冲机制
// batch.size 同一个分区的数据会被打包成一个batch,一个broker上多个分区对应的多个batch会被打包成一个request
// batch.size 太小会降低吞吐量,设置为0则一条消息发送一次
// batch.size 太大会在本地缓存大量数据
// 默认 16 k
// TODO  核心组件3:缓冲区
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                                         // 缓冲区大小,默认 32m
                                         this.totalMemorySize,
                                         // 压缩
                                         this.compressionType,
                                         // 两次 request 间隔,如果消息非常小,很久都没有积累到 batch.size,如果到了 linger 时间就直接发送出去
                                         // 默认 0 ms
                                         config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                                         retryBackoffMs,
                                         metrics,
                                         time);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2.2.3 NetworkClient

网络客户端,生产者和服务端交互的组件负责所有的网络请求包括:数据的发送、消息的响应等。提前剧透一下这个组件使用存 Java NIO 编写,可以称为工业级 NIO 模板对于网络编程有很好的参考价值。因此看这部分源码之前请务必理解 Java NIO 的知识,需要达到可以实现简单 C/S 通信模型

// 网络通信组件,它是构建 SocketChannel 的组件同时封装了加密,如:SSL
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
// TODO 核心组件4:网络通信客户端
NetworkClient client = new NetworkClient(
  // 每个broker连接最大空闲时间 默认 9 分钟,超时连接会被回收
  new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
  this.metadata,
  clientId,
  // 每个broker最多接收五个没有响应的request,会存在消息乱序的问题
  config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
  // 重试连接时间间隔5毫秒
  config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
  // socket 发送缓冲区大小,默认 128k
  config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
  // socket 接收缓冲区大小,默认 32k
  config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
  this.requestTimeoutMs, time);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2.2.4 Sender

生产者唯一的线程(主线程除外)

// TODO 核心组件5:sender 线程,从缓冲区获取数据发送到broker
this.sender = new Sender(client,
                         this.metadata,
                         this.accumulator,
                         config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                         // 一次请求消息最大值,默认 1m
                         config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                         // ack 默认 1,只要 leader 写入成功
                         (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                         // request 重试次数,默认0
                         config.getInt(ProducerConfig.RETRIES_CONFIG),
                         this.metrics,
                         new SystemTime(),
                         clientId,
                         // 请求超时时间 30s
                         this.requestTimeoutMs);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

可以看出 Sender 封装了上述提到的所有组件同时实现了 Runnable 接口。因此它的 run 方法将是我们研究生产者与服务端交互的入口。

同时关于如何封装 Runnable 和启动线程,kafka 的做法值得我们去借鉴(copy)

String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
// 启动 sender 线程
this.ioThread.start();


public class KafkaThread extends Thread {

    private final Logger log = LoggerFactory.getLogger(getClass());

    public KafkaThread(final String name, Runnable runnable, boolean daemon) {
        super(runnable, name);
        setDaemon(daemon);
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t, Throwable e) {
                log.error("Uncaught exception in " + name + ": ", e);
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

可以看出 kafka 封装线程类的思路是:通过继承 Thread 类,将必要的参数封装在自定义的类中并将核心参数通过 super 传给父类,最后调用 start 方法启动 Sender。可以预测 Sender 的 run 方法一定是一个不断轮训的逻辑

还有一个无关紧要的组件如:分区器、拦截器;可以自己看一下并不是很难,因为这两个都是可以自定义相对而言没有那么神秘

下面通过一个图画一下上述的组件,后续的核心逻辑都会基于此图进行(先大致这么布局,后续会根据执行逻辑进行调整,这里主要去体现组件的封装关系)

image-20220711204129991

三、Metadata

了解了 KafkaProducer 的初始化过程后接下来就是从 send 方法开始,阅读一下消息的发送过程;本节重点在过程中体会元数据的变化,涉及消息的发送、网络活动将在下面章节涉及

3.1 从 send 开始

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
  // intercept the record, which can be potentially modified; this method does not throw exceptions
  // 回调自定义的拦截器
  ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
  return doSend(interceptedRecord, callback);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

首先消息会走自定义的拦截器,对原始消息做进一步的包装,核心逻辑在 doSend() 中

// TODO  maxBlockTimeMs 这个参数决定一次 send() 在这个时间长度内必须返回,比如遇到网络波动、缓冲区满等
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
  • 1
  • 2

这一行代码是元数据的重点,首先获取到当前消息的 topic,并且传入 maxBlockTimeMs,这个时间就是上面提到的最大的阻塞时间;仅从这两个参数可以生产者拉取元数据是按需拉取的(不然就不需要传这个topic信息),接下来重点分析 waitOnMetadata。

private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
  // add topic to metadata topic list if it is not there already.
  if (!this.metadata.containsTopic(topic))
    // TODO  本地还没有该 topic 元数据缓存加入集合中
    this.metadata.add(topic);

  if (metadata.fetch().partitionsForTopic(topic) != null)
    return 0;

  long begin = time.milliseconds();
  long remainingWaitMs = maxWaitMs;
  while (metadata.fetch().partitionsForTopic(topic) == null) {
    log.trace("Requesting metadata update for topic {}.", topic);
    int version = metadata.requestUpdate();
    // TODO  唤醒 sender 线程,异步拉取元数据
    sender.wakeup();
    // TODO  同步等待,当元数据更新后 version + 1,判断 version 是否改变即可,同时最多等待 remainingWaitMs
    metadata.awaitUpdate(version, remainingWaitMs);
    long elapsed = time.milliseconds() - begin;
    if (elapsed >= maxWaitMs)
      throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
    if (metadata.fetch().unauthorizedTopics().contains(topic))
      throw new TopicAuthorizationException(topic);
    remainingWaitMs = maxWaitMs - elapsed;
  }
  // TODO  返回剩余可以阻塞的时间
  return time.milliseconds() - begin;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

第一步:判断 topic 有没有被缓存,如果没有被缓存则加入到 metadata 这个属性集合中,表示我需要去拉取这个 topic 的元数据

第二步:fetch() 其实就是返回 Cluster 对象,可以得知第一次发送消息 Cluster 一定是空的(生产者初始化的时候只是对其里面的各种集合赋初值),这里有个细节:如果在这里下一次发现这个 topic 已经在 Cluster 缓存了直接返回 0,表示此时获取元数据没有耗时,不占用最大的阻塞时间

第三步:获取当前时间作为开始时间,之后开启一个 while 循环直到当前的 topic 信息被获取到 partitionsForTopic 返回当前 topic 的分区信息退出循环。这里我们先只关注这个时间参数的变化。

long elapsed = time.milliseconds() - begin;
if (elapsed >= maxWaitMs)
  throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
if (metadata.fetch().unauthorizedTopics().contains(topic))
  throw new TopicAuthorizationException(topic);
remainingWaitMs = maxWaitMs - elapsed;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

elapsed 表明上面三行代码的耗时,如果大于 maxWaitMs 表明上面的操作已经超时了此时直接报错(很合理);如果发现拉取到的 topic 元数据没有权限也会抛出异常,最后 maxWaitMs - elapsed 表明当前的一次循环后还剩多久,直到成功获取到元数据后退出循环再次 time.milliseconds() - begin 返回,即:本次元数据获取总共耗时多久,返回还可以阻塞的最大时间。因此真正元数据的获取是在我们忽略的三行代码中。

第四步:调用 requestUpdate,本质是将 needUpdate 置为 true(似乎在告诉某个人说我需要更新元数据了)同时返回当前的版本信息;之后调用 sender.wakeup() 唤醒 sender 线程。因此生产者的元数据本质上是 Sender 来进行拉取的(因为只有 Sender 内部封装了用于网络请求的 NetWork 组件),当唤醒 sender 线程后调用 awaitUpdate 方法等待元数据的更新。awaitUpdate 源码如下:

public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
  if (maxWaitMs < 0) {
    throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
  }
  long begin = System.currentTimeMillis();
  long remainingWaitMs = maxWaitMs;
  while (this.version <= lastVersion) {
    if (remainingWaitMs != 0)
      // TODO  等待 sender 拉取元数据,sender 完成拉取一定会去唤醒
      wait(remainingWaitMs);
    long elapsed = System.currentTimeMillis() - begin;
    if (elapsed >= maxWaitMs)
      throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
    remainingWaitMs = maxWaitMs - elapsed;
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

第五步:可以猜测当 sender 拉取完元数据后一定将 version 加一;awaitUpdate 逻辑和上面几乎一样,通过 while 循环不断的判断 version 是否增加,并等待最大的可用阻塞时间即 wait(remainingWaitMs),之后的超时判断是一样的逻辑。同时可以断定当 Sender 更新完元数据后一定会去唤醒它。

因此,生产者元数据更新的逻辑是:同步阻塞,异步更新,按需更新元数据。

注:这里两个部分都是用到 while,皆在解决多线程的问题,因为更新元数据阻塞被唤醒时不一定是因为拉取到当前topic想要的(可能是其他topic的元数据),因此需要不断的 while 判断是不是自己的元数据,有点 CAS 的味道

到这里似乎元数据就已经结束了,因为如果要继续追元数据的更新则就需要从 Sender 的 run 开始了(只有它一个线程),同时这部分涉及到客户端与服务端的交互(客户端发送拉取元数据请求,服务端返回元数据信息)本质上和消息的发送在网络交互上没有什么区别,因此 sender 是如何拉取元数据放在后面专门的网络交互章节(涉及到大量工业级 NIO 处理)。

假设现在元数据已经更新好了,即阻塞的 waitOnMetadata 方法执行完了返回 waitedOnMetadataMs,这个值就是留给我们后续操作的时间(最大阻塞时间-拉取元数据等待的时间)

之后的操作有

序列化 key 和 value 这部分不是重点

byte[] serializedKey;
try {
  serializedKey = keySerializer.serialize(record.topic(), record.key());
} catch (ClassCastException cce) {
  throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                                   " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                                   " specified in key.serializer");
}
// 序列化 value
byte[] serializedValue;
try {
  serializedValue = valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException cce) {
  throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                                   " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                                   " specified in value.serializer");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

计算分区,后面可以独立出来一小节讲一下默认分区器在没有指定分区是如何实现 RoundRobin 算法

int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
  • 1

最后就是将消息放入缓冲区,当缓冲区 batch 满或者创建了新的 batch 再次唤醒 sender。这里剧透一下唤醒 sender 的目的是让 sender 将满足条件的 batch 发送出去,本质上和元数据的逻辑没有区别。

// TODO 将消息添加到内存缓冲区 buffer memory 中
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
// batch 满了或者 新的batch在创建 唤醒 sender
if (result.batchIsFull || result.newBatchCreated) {
  log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
  this.sender.wakeup();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

因此该部分的流程图如下:

image-20220712214058081

3.2 元数据的数据结构

上面一直说元数据感觉这个东西很高大上、很抽象,但所谓的元数据就是一组 java 集合,即源码为 Cluster 类属性如下

private final boolean isBootstrapConfigured;
private final List<Node> nodes; // 一个broker节点
private final Set<String> unauthorizedTopics;// 没有授权的topic
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;// topic 分区信息
private final Map<String, List<PartitionInfo>> partitionsByTopic;// 每个 topic 有哪些分区
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic; // 每个 topic 有哪些可用的分区
private final Map<Integer, List<PartitionInfo>> partitionsByNode; // 每个 broker 放了哪些分区
private final Map<Integer, Node> nodesById; // broker-id
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

同时 PartitionInfo 封装了 topic、leader、partition号、replicas、isr

public class PartitionInfo {

  private final String topic;
  private final int partition;
  private final Node leader;
  private final Node[] replicas;
  private final Node[] inSyncReplicas;

  public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
    this.topic = topic;
    this.partition = partition;
    this.leader = leader;
    this.replicas = replicas;
    this.inSyncReplicas = inSyncReplicas;
  }
  // 省略 getter/setter
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

该部分值得参考的点在于缓存的存储设计。

3.3 默认分区器如何实现 round-robin

当我们在构建 ProducerRecord 没有指定分区也没有指定 key 时生产者为了避免数据倾斜采用 round-robin 算法将消息竟可能分散到多个分区中,下面我们看一下默认分区器 DefaultPartitioner 的逻辑

首先看到的是内部的一个属性和一个方法

private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
private static int toPositive(int number) {
  return number & 0x7fffffff;
}
  • 1
  • 2
  • 3
  • 4

一个线程安全的 Integer 和一个最高效的取绝对值方法(这个位运算背下来可以装X)

而核心的分区逻辑就是实现的 partition 方法

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  // 获取所有分区数量
  int numPartitions = partitions.size();
  if (keyBytes == null) {
    // 原子类递增
    int nextValue = counter.getAndIncrement();
    // 获取可用分区
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (availablePartitions.size() > 0) {
      // 有可用分区时,对获取到的递增int算一个正整数(如果本身就是正整数就返回本身),与可用分区数取模
      int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
      // 返回
      return availablePartitions.get(part).partition();
    } else {
      // no partitions are available, give a non-available partition
      return DefaultPartitioner.toPositive(nextValue) % numPartitions;
    }
  } else {
    // hash the keyBytes to choose a partition
    // 计算指定key的hash值与分区数取模
    return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

首先获取当前 topic 缓存的元数据(能走到这里元数据一定缓存了)即:当前 topic 的分区信息;随后获取到 topic 的分区个数之后就会分 key 存不存的情况。

情况一:key 不存在,走 round-robin 算法

注意到原子类整型的初始值是一个随机值,因为如果是一个固定值假如每次只发一条数据后就重启,那么算法就失去了轮训的意义。之后自增一判断有没有可用的分区,对获取到的原子类整型取绝对值(随机值可能是负数)与可用分区取模来获取分区数。随后当有更多的消息走到分区器都会自增一从而达到轮训的效果;当没有可用的分区这里就与所有的分区数取模,猜测这里是期望在真正发送的时候该分区可能使用。

情况二:key 存在,key 的 hash 与分区取模

这类情况就是对 key 去 hash 后取正数与分区数取模。

四、RecordAccumulate

4.1 消息放入 accumulate 前做了什么

上节说到消息已经知道发送到 topic 的哪个分区,在发送到 accumulate 前还有一处代码需要我们关注一下

int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
// 检查本次消息是否超出请求大小,以及内存缓冲
ensureValidRecordSize(serializedSize);
  • 1
  • 2
  • 3

即:kafka 的消息格式,当前版本的源码是 0.10.1.0 处于消息格式 V1 阶段

public static final int CRC_OFFSET = 0;
public static final int CRC_LENGTH = 4;
public static final int MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH;
public static final int MAGIC_LENGTH = 1;
public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
public static final int ATTRIBUTE_LENGTH = 1;
public static final int TIMESTAMP_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
public static final int TIMESTAMP_LENGTH = 8;
public static final int KEY_SIZE_OFFSET_V0 = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
public static final int KEY_SIZE_OFFSET_V1 = TIMESTAMP_OFFSET + TIMESTAMP_LENGTH;
public static final int KEY_SIZE_LENGTH = 4;
public static final int KEY_OFFSET_V0 = KEY_SIZE_OFFSET_V0 + KEY_SIZE_LENGTH;
public static final int KEY_OFFSET_V1 = KEY_SIZE_OFFSET_V1 + KEY_SIZE_LENGTH;
public static final int VALUE_SIZE_LENGTH = 4;

public static int recordSize(byte[] key, byte[] value) {
  return recordSize(key == null ? 0 : key.length, value == null ? 0 : value.length);
}

public static int recordSize(int keySize, int valueSize) {
  return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

因此当前版本的消息格式如下:

  • crc:校验码,确保消息在传输过程中不会被篡改;4byte
  • magic:消息的版本,V0 存 0,V1 存 1;1byte
  • attribute:属性字段,当前版本只存储压缩类型;1byte
  • timestamp:消息时间戳,V1 版本才有;8byte
  • key length:存储 key 的字节数;4byte
  • key:具体消息具体计算,如果是 null 长度为 0,否则为字节数组长度
  • value length:存储 value 的字节数;4byte
  • value:具体消息具体计算,如果是 null 长度为 0,否则为字节数组长度

注:V0 版本的消息格式没有 timestamp 字段,因此按时间删除消息存在bug(按照segment的修改时间,但是linux部分操作会改变这个时间),因此 V1 版本加入时间字段修复了这个bug;但 V1 版本的消息仍存在缺陷,例如无论 key 或者 value 是否有值都会用 4byte 存储它们的长度,在 V2 版本的消息格式通过可变长度来解决。感兴趣的可以研究一下 0.11.0.0 之后的源码

因此 V1 版本的消息格式如下:

image-20220713105623083

同时消息的具体写入在 Record 的 write 中

public static void write(Compressor compressor, long crc, byte attributes, long timestamp, byte[] key, byte[] value, int valueOffset, int valueSize) {
  // write crc
  compressor.putInt((int) (crc & 0xffffffffL));
  // write magic value
  compressor.putByte(CURRENT_MAGIC_VALUE);
  // write attributes
  compressor.putByte(attributes);
  // write timestamp
  compressor.putLong(timestamp);
  // write the key
  if (key == null) {
    compressor.putInt(-1);
  } else {
    compressor.putInt(key.length);
    compressor.put(key, 0, key.length);
  }
  // write the value
  if (value == null) {
    compressor.putInt(-1);
  } else {
    int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
    compressor.putInt(size);
    compressor.put(value, valueOffset, size);
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

4.2 accumulate 数据结构

在消息放入 accumulate 之前我们再来看一下 RecordAccumulate 的数据结构,这些前置知识将有助于理解后面的操作。从组件的初始化开始

// TODO  核心组件3:缓冲区
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                                         // 缓冲区大小,默认 32m
                                         this.totalMemorySize,
                                         // 压缩
                                         this.compressionType,
                                         // 两次 request 间隔,如果消息非常小,很久都没有积累到 batch.size,如果到了 linger 时间就直接发送出去
                                         // 默认 0 ms
                                         config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                                         retryBackoffMs,
                                         metrics,
                                         time);
public RecordAccumulator(int batchSize,
                         long totalSize,
                         CompressionType compression,
                         long lingerMs,
                         long retryBackoffMs,
                         Metrics metrics,
                         Time time) {
  this.drainIndex = 0;
  this.closed = false;
  this.flushesInProgress = new AtomicInteger(0);
  this.appendsInProgress = new AtomicInteger(0);
  this.batchSize = batchSize;
  this.compression = compression;
  this.lingerMs = lingerMs;
  this.retryBackoffMs = retryBackoffMs;
  this.batches = new CopyOnWriteMap<>();
  String metricGrpName = "producer-metrics";
  this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
  this.incomplete = new IncompleteRecordBatches();
  this.muted = new HashSet<>();
  this.time = time;
  registerMetrics(metrics, metricGrpName);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

对于传入的 batchSize、totalSize、lingerMs、retryBackoffMs 需要对变量名混个眼熟分别对应:批次大小、内存缓冲区容量、批次发送间隔时间、重试间隔时间;本节不需要关注后面的两个时间(时间与网络请求有关),最最最重要的是 batches 和 free,其中 batches 初始化了一个 CopyOnWriteMap 其属性定义如下

private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
  • 1

可以看出 batches 是一个线程安全的 Map,观察它的两个泛型可以推断:生产者缓存消息是按照 topic 的分区来缓存的,一个分区一个 Deque,这个 Deque 的主要优势在于操作头尾元素比较友好,因为生产者在构建新的批次数据一定是放在所有数据的尾部遵循 FIFO,而消息的发送则是去头部元素,失败重试也是要将消息放回头部优先下一次发送。同时对于线程安全的 Map 生产者实现了自己的 Map 即 CopyOnWriteMap;我们知道写时复制在解决读多写少的并发场景有很好的体验(写数据的时候会复制一个副本,基于副本进行修改然后通过加锁进行原件的替换),而这种场景恰好符合生产者的场景。只有在新的 topic 分区加入才会触发写操作。

下一个核心属性 free 属性定义如下

private final BufferPool free;

public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
  // poolableSize 就是 batchSize
  this.poolableSize = poolableSize;
  this.lock = new ReentrantLock();
  // 进行内存空间复用
  this.free = new ArrayDeque<ByteBuffer>();
  this.waiters = new ArrayDeque<Condition>();
  this.totalMemory = memory;
  this.availableMemory = memory;
  this.metrics = metrics;
  this.time = time;
  this.waitTime = this.metrics.sensor("bufferpool-wait-time");
  MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                             metricGrpName,
                                             "The fraction of time an appender waits for space allocation.");
  this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

这是生产者内存复用的核心组件,而 accumulate 的内存上限 32MB 就是 BufferPool 的容量,更底层就是 this.free = new ArrayDeque<ByteBuffer>() 中所有 ByteBuffer 的容量和(这里的 ByteBuffer 就是 NIO 里的)

4.3 第一次发送消息

4.3.1 分区队列的安全创建

从 append() 开始探究消息如何被放入 batch 中

// TODO 将消息添加到内存缓冲区 buffer memory 中
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
  • 1
  • 2

这个方法几乎每一行都是重点,当然方法第一行除外

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/952429
推荐阅读
相关标签