当前位置:   article > 正文

kafka源码阅读--producer核心流程(KafakProducer.java)send线程_waitonmetadata (kafkaproducer.java:882

waitonmetadata (kafkaproducer.java:882

Producer.java中的run()方法调用了producer.send方法,点进去send方法返回doSend。 doSend方法封装了producer的核心流程,对应前面画的那张流程图,之后还有一些细节的实现:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        /**
         * 步骤一:
         *      waitOnMetadata 同步等待拉取元数据
         *      maxBlockTimeMs  这个参数代表最多等待多久
         *      返回一个 ClusterAndWaitTime 的结果
         * */
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        // clusterAndWaitTime.waitedOnMetadataMs 代表拉取元数据用了多少时间
        // 最多等待多久的时间 - 拉取元数据用了多长时间 = 还剩余多少时间可以用  代表消息没有发送成功
        // 重新发送还可以有多少时间  超过这个时间会抛出异常
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        // 更新集群的元数据
        Cluster cluster = clusterAndWaitTime.cluster;
        /**
         *  步骤二:
         *      对消息的 key和value 进行序列化
         * */
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer");
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer");
        }

        /**
         *  步骤三:
         *      根据分区器选择消息应该发送的分区
         * */
        int partition = partition(record, serializedKey, serializedValue, cluster);
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        /**
         *  步骤四:
         *       确认消息的大小是否超过了最大值
         *       KafkaProducer初始化的时候, 指定producer最大能发送的一条消息有多大
         *       默认是1M, 根据公司实际业务进行修改
         * */
        ensureValidRecordSize(serializedSize);
        /**
         *  步骤五:
         *      根据元数据信息, 封装分区对象
         *
         * */
        tp = new TopicPartition(record.topic(), partition);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        /**
         *  步骤六:
         *      消息是异步发送, 给每一条消息绑定回调函数
         * */
        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
        /**
         *  步骤七:
         *      把消息放入RecordAccumulator, (32兆的一块内存)
         *      然后由accumulator把消息封装成一个一个批次去发送
         * */
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
        // 唤醒线程的条件:
        //          批次满了 或者 新创建一个批次
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            /**
             *  步骤八:
             *      唤醒sender线程, 这里是真正发送数据
             * */
            this.sender.wakeup();
        }
        return result.future;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

本节中值得学习的技术:
例如第四步中判断消息大小是否超过最大值的方法ensureValidRecordSize,点进去后发现, kafka自定义了自己的异常, 在底层代码中往上抛出, 在核心代码中捕获异常, 能让用户更好的知道在哪里出现了错误, 更利于错误排查。这种编程思想可用于很多项目。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/707569
推荐阅读
相关标签
  

闽ICP备14008679号