当前位置:   article > 正文

【Kafka源码分析】一、生产者Producer_producer closed while send in progress

producer closed while send in progress

一、Kafka生产者线程模型、整体流程

Kafka生产者在发送消息时主要存在两个线程:主线程Sender线程

  • 主线程即调用KafkaProducer.send方法的线程。当send方法被调用时,消息并没有真正被发送,而是暂存到RecordAccumulator
  • Sender线程在满足一定条件后,会去RecordAccumulator中取消息并发送到Kafka Server端。

通过这种暂存机制,可以提升Kafka的吞吐量

  1. 可以将多条消息通过一个ProduceRequest批量发送出去;
  2. 提高数据压缩效率(一般压缩算法都是数据量越大越能接近预期的压缩效果);

Kafka生产者发送消息的整体流程如下:

在这里插入图片描述

  1. 当接受外部传过来的数据的时候,会先创建一个main线程,在main线程中创建producer对象,然后调用send方法,将数据进行发送
  2. 消息会经过拦截器,对发送的数据进行处理、加工,再经过序列化器,对传输的数据进行序列化
  3. 根据分区器的分区策略决定传输的数据发送至哪个分区
  4. 通过消息暂存器将数据写入暂存区
    1. 消息暂存器维护了一个内存池(默认大小32M),并且未每一个分区维护了一个双端队列,队列中是一个个批次
    2. 写入数据时,会从内存池中取出内存,创建批次(默认大小16k)
  5. 当一个批次的数据大小积累到 batch.size 或者到达了延迟时间 linger.ms唤醒sender线程
  6. Sender线程从分区中拉取数据。拉取数据的方式是以brokerId为key,所有分区的请求为value放到队列中
  7. Sender线程通过selector发送数据,数据发送成功之后,会有应答机制,返回acks,应答级别有3种
    • 如果反馈回来的请求是成功,则会删除发送数据成功的请求以及清理分区中请求中拉取的数据(释放批次的内存,放回到内存池中)
    • 如果失败会进行重试,重试的次数(默认是Int的最大值,可以进行修改,一般是3-5次)
  8. 如果发送数据的第一个请求到达集群中的某一个broker没有应答,允许继续发送请求,默认每个broker节点最多缓存5个请求

由此,我们可以引出KafkaProducer的几个核心组件:

  • partitionner:分区选择器,决定将消息发送至Topic的哪个分区
  • accumulator:消息暂存器,负责暂存要发送的消息
  • sender:用于处理实际发送消息的业务逻辑,继承自Runnable
  • ioThread:实际负责处理发送消息的io线程,即sender使用的线程对象

接下来,我们将从主线程和Sender线程两个方面来分析KafkaProducer的消息发送过程

二、主线程

我们从该发送消息的方法入口进入阅读源码

public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
  • 1
1.准备工作

在调用发送消息方法后,会先进行一些准备工作:

  1. 拦截器 ProducerInterceptors 在发送消息前进行拦截
  2. 获取集群元数据Cluster(包括目标Topic下有几个Partition,分别分布在哪些Broker上)
  3. 序列化(使用序列化器对消息的key和value做序列化处理)

这些准备工作不是我们这里关注的重点,感兴趣的可以自己去看一下源码

2.确认分区

做完准备工作后,生产者接下来要开始发送消息了,首先就需要确认消息发送给topic下的哪个partition这

这里就用到了上文提到过的的一个组件——partitioner

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    ......
    // 确定目标Partition
    int partition = partition(record, serializedKey, serializedValue, cluster);
    ......
}

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    // 若ProducerRecord中强制指定了partition, 则以该值为准
    Integer partition = record.partition();
    // 否则调用Partitioner动态计算对应的partition
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

在创建KafkaProducer时,可以通过"partitioner.class"配置来指定Partitioner的实现类。若未指定,则使用Kafka内置实现类——DefaultPartitioner。来看DefaultPartitioner的具体实现:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
        // 如果没指定key,会调用stickyPartitionCache缓存,发送到该topic某个特定的partition上
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        // 如果指定了key,则根据key的hash值确定一个分区
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

StickyPartitionCache实际就是一个缓存,其内部维护了一个 ConcurrentMap,用以保存某topic应该发送到哪个分区

public class StickyPartitionCache {
    private final ConcurrentMap<String, Integer> indexCache;
    public StickyPartitionCache() {
        this.indexCache = new ConcurrentHashMap<>();
    }

    public int partition(String topic, Cluster cluster) {
        Integer part = indexCache.get(topic);
        if (part == null) {
            return nextPartition(topic, cluster, -1);
        }
        return part;
    }

    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;

		// 这里的prevPartition用于第一次写失败了,第二次会尽量换一个分区
        if (oldPart == null || oldPart == prevPartition) {
            // 从集群的元数据中获取可以使用的分区
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
            	// 只有一个分区则使用当前分区
                newPart = availablePartitions.get(0).partition();
            } else {
            	// 保证最后一定换了一个分区
                while (newPart == null || newPart.equals(oldPart)) {
                    // 使用ThreadLocalRandom来随机获取一个可用的分区,
                    // 不同线程可能会产生不同的newPart,但是putIfAbsent保证设置时只会设置一次,从而保证线程安全
                    Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
            if (oldPart == null) {
            	// 多个线程也只会设置一次
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

通过以上代码,可以知道DefaultPartitioner的分区策略如下:

  1. 若指定了key,则通过key来hash到一个partition。(只要key相同就可以保证消息顺序发送!!!
  2. 若未指定key,则在Topic下多个Partition间随机选取一个,并尽可能一直使用该分区;当第一次写入分区失败了,Kafka会再随机一个分区进行使用(和上一次的分区不同)
3.将消息写入暂存区RecordAccumulator

RecordAccumulator作为消息暂存者,其思想是将目的地Partition相同的消息放到一起并按一定的"规格"(由"batch.size"配置指定)划分成多个"批次"(ProducerBatch)然后以批次为单位进行数据压缩&发送。示意图如下:
在这里插入图片描述
因此每一个TopicPartition都对应一个Batch队列,RecordAccumulator中使用一个ConcurrentHashMap进行存储:ConcurrentMap<TopicPartition, Deque> batches;

RecordAccumulator有两个核心方法,分别对应"存"和"取":

  • public RecordAppendResult append():主线程会调用此方法追加消息
  • public Map<Integer, List> drain():Sender线程会调用此方法提取消息

来看看append方法

public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers,
           Callback callback, long maxTimeToBlock, boolean abortOnNewBatch, long nowMs) throws InterruptedException {
        
        // appendsInProgress是一个AtomicInteger,用于记录正在进行存操作的线程数
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;

        try {
            // 从batches中获取该partition对应的队列,没有则创建,ConcurrentHashMap会保证多个线程获取的是同一个队列
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            // 以 partition 为粒度进行同步的存操作!!!!!
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                // 尝试将内容写入batch,如果写入成功则直接返回
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                if (appendResult != null)
                    return appendResult;
            }

            // 如果写失败了,如果不允许创建新的批次,则先返回,等待下一次append
            //(第一次写默认为true,不允许创建新批次;第二次写默认为false)
            if (abortOnNewBatch) {
                // 第二个参数表示批次是否已满,第三个参数表示是否创建了新的批次
                return new RecordAppendResult(null, false, false, true);
            }

            // 如果允许创建新的批次:
            // 开始在缓冲区内分配合适的空间
            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            buffer = free.allocate(size, maxTimeToBlock);

            // 更新当前时间
            nowMs = time.milliseconds();
            // 以partition为粒度开始第二次写入
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");

                // 开始第二次写入,看看这时候空间释放了没有
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                if (appendResult != null) {
                    return appendResult;
                }

                // 在缓冲区上创建batch,在该batch上append,返回一个future
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
                FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                        callback, nowMs));

                // 将新创建的批次放入队列中和未完成的批次集合中
                dq.addLast(batch);
                incomplete.add(batch);

                buffer = null;
                // 将该future存入RecordAppendResult中返回,第三个参数为true表示已经创建了新的批次
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
            }
        } finally {
        	// 写入完毕释放临时缓冲区
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

来看 tryAppend 方法,就是先从队列中获取最后一个批次ProducerBatch,只要批次存在,就在这个批次中写入数据,返回一个future,主题逻辑跟上面一样,都是通过ProducerBatch进行写数据

     private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque, long nowMs) {
        ProducerBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
            if (future == null)
                last.closeForRecordAppends();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
        }
        return null;
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

那么具体是如何在这个批次上进行数据写入的呢,我们来看 ProducerBatch 的 tryAppend 方法

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
        // 首先检查该recordsBuilder是否有充足的空间写入
        if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
            return null;
        } else {
            // 在recordsBuilder上写入数据(包含数据压缩),并返回写入数据的CRC
            Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);

            // 更新最大记录大小和更新时间
            this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                    recordsBuilder.compressionType(), key, value, headers));
            this.lastAppendTime = now;
            
            // 返回这条写入记录的Future,其中produceFuture为这个ProducerBatch公用的Future,
            // recordCount为这条记录的偏移量,checksum为这条记录的CRC
            // 此外,还包括了这条记录key和value的大小,通过这些数据可以唯一确认和校验一条记录!!!!
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length,
                                                                   Time.SYSTEM);
            
            thunks.add(new Thunk(callback, future));
            // 更新偏移量
            this.recordCount++;
            return future;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

可以看到 ProducerBatch又通过recordsBuilder进行数据写入,这里就不再继续往下看了,内部实际就是对要发送的消息进行了数据压缩,并通过计算得到这批数据的CRC校验值。

写完后返回了这条数据的RecordMetadata,其中包含了这条数据的偏移量,校验值,内容长度,通过这些数据可以唯一确认和校验一条记录,此外,还包含了各个批次一个公有的produceFuture,这个Future是什么稍后再看

append完成后,我们再回到 KafkaProducer的 doSend()方法

// 如果在上面创建了新的批次,result里的abortForNewBatch为false
// 如果在上面不允许创建批次(默认情况),接着往下走
if (result.abortForNewBatch) {
                int prevPartition = partition;
                partitioner.onNewBatch(record.topic(), cluster, prevPartition);
                partition = partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (log.isTraceEnabled()) {
                    log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
                }
                interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

                result = accumulator.append(tp, timestamp, serializedKey,
                	// 第二次写abortOnNewBatch参数修改为false
                    serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
            }

            // 处理事务
            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);

            // 如果发现批次以及满了或者创建了新的批次,则唤醒sender线程!!!!
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }

            // 返回该batch的future
            return result.future;

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

可以看到,如果我们这一批写满了或者新的批次被创建出来了(实际就是上一批写满了),就会去唤醒Sender线程,告诉它我们暂存的数据够多了,该把这些数据发送给Server了!

最后,发送完毕后,返回的是我们上面方法创建的FutureRecordMetadata

那么这个返回的future到底是什么,它实际就是我们使用消费者线程发送消息后返回的Future<RecordMetadata>,RecordMetadata包含了消息发送的分区,分配的offset和消息的时间戳

public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
  • 1

来看看他内部包含的信息是如何写入的:
首先,在使用 ProducerBatch 写信息前,会在构建函数中将该Batch对应的topicPartition放入Future中

public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
    ...
    this.produceFuture = new ProduceRequestResult(topicPartition);
    ...
}
  • 1
  • 2
  • 3
  • 4
  • 5

之后,在消息发送完毕后,也会将该消息对应的偏移量、logAppendTime, exception写入到该Future中。

private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
        // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
        produceFuture.set(baseOffset, logAppendTime, exception);
}
  • 1
  • 2
  • 3
  • 4

由此,就可以通过该Future获取我们所需要的信息

三、Sender线程

以上主线程执行完毕,消息被写入了topicpartition的一个又一个batch中。Sender线程就开始负责发送数据了

Sender线程需要用到一个基础通信类NetworkClient,NetworkClient中有一个核心属性 Selectable selector,进行网络通讯时的 send和 poll 操作都是通过 selector实现的,org.apache.kafka.common.network.Selector 内部则通过 java.nio.channels.Selector 来实现,因此 Kafka内部本质是通过NIO进行网络通讯的

在KafkaProducer中,和Sender线程相关的有两个属性:在这里插入图片描述
他们在 KafkaProducer 构造函数中被创建:

this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
...
        return new Sender(logContext,
                client,
                metadata,
                this.accumulator,
                maxInflightRequests == 1,
                producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                acks,
                producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
                metricsRegistry.senderMetrics,
                time,
                requestTimeoutMs,
                producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                this.transactionManager,
                apiVersions);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

来看Sender的run方法,实际就是通过sendProducerData方法发送消息

public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    while (running) {
        try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
    .....
}

void runOnce() {
    ... ...
    // 1. 发送请求,并确定下一步的阻塞超时时间
    long pollTimeout = sendProducerData(now);
    // 2. 处理端口事件,poll的timeout为上一步计算结果
    client.poll(pollTimeout, now);
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

来看sendProducerData方法:

  1. 先去找集群中准备好的Server节点
Cluster cluster = metadata.fetch();

RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
  • 1
  • 2
  • 3

进入accumulator的ready方法

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        Set<String> unknownLeaderTopics = new HashSet<>();

        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
            Deque<ProducerBatch> deque = entry.getValue();
            // 以分区为粒度,同步地获取各个分区准备好的Server节点
            synchronized (deque) {
                // 先检查队列里是否有数据,没有数据直接跳过
                ProducerBatch batch = deque.peekFirst();
                if (batch != null) {
                    TopicPartition part = entry.getKey();
                    // 寻找集群中该分区对应的Leader节点
                    Node leader = cluster.leaderFor(part);
                    if (leader == null) {
                        unknownLeaderTopics.add(part.topic());
                    } else if (!readyNodes.contains(leader) && !isMuted(part)) {
                        long waitedTimeMs = batch.waitedTimeMs(nowMs);
                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        boolean full = deque.size() > 1 || batch.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            // 没问题则将Leader节点加入准备好的Server节点的集合中
                            readyNodes.add(leader);
                        } else {
                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  1. 判断与这些准备好的Server节点的连接是否可用
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            // 检查客户端Client是否能连接到远程Server节点,连接不上就先移除
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }        

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  1. 从分区队列中获取批次batch及其内部存储的数据
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
  • 1
  • 2

进入drain方法

public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
        if (nodes.isEmpty())
            return Collections.emptyMap();

        Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
        for (Node node : nodes) {
            // 把准备好的节点的batch返回
            List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
            batches.put(node.id(), ready);
        }
        return batches;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

drainBatchesForOneNode()方法会去找准备好的节点存储的分区,然后找到分区对应的队列,从队列的头部取出批次,然后将该批次放入返回结果,并关闭该batch,释放批次占用的空间

返回的结果为<Node,List<ProducerBatch>>形式,其中Node表示Kafka集群的broker节点

Deque<ProducerBatch> deque = getDeque(tp);
            if (deque == null)
                continue;

            synchronized (deque) {
                ProducerBatch first = deque.peekFirst();
                if (first == null)
                    continue;

                boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
                
                if (backoff)
                    continue;

                // 控制发送批次数据的大小
                if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                    // there is a rare case that a single batch size is larger than the request size due to
                    // compression; in this case we will still eventually send this batch in a single request
                    break;
                } else {
                    if (shouldStopDrainBatchesForPartition(first, tp))
                        break;

                    boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
                    ProducerIdAndEpoch producerIdAndEpoch =
                        transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
                    ProducerBatch batch = deque.pollFirst();
                    if (producerIdAndEpoch != null && !batch.hasSequence()) {
                        batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
                        transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                        log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
                                "{} being sent to partition {}", producerIdAndEpoch.producerId,
                            producerIdAndEpoch.epoch, batch.baseSequence(), tp);

                        transactionManager.addInFlightBatch(batch);
                    }

                    batch.close();
                    size += batch.records().sizeInBytes();
                    ready.add(batch);

                    batch.drained(now);
                }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

有个疑问,drain获取的批次一定是放满数据的批次吗?谁明白的可以给我解答一下

  1. Sender会进一步将结果封装成<Node,Request>形式,这里的Request是指Kafka的各种协议请求
  2. 请求在从Sender发往Kafka之前还会保存到InFlightRequest中,InFlightRequest保存的形式为 Map<NodeId,Deque<Request>>,主要作用是缓存已经发出去但还没有收到响应的请求InFlightRequest提供了许多管理类的方法,如:
  • 通过 max.in.flight.request.per.connect参数限制每个连接最多缓存的请求数。
  • 通过比较 Deque<Request>的size来判断对应的Node是否已经堆积了很多未响应的消息,表明该Node节点负载较大或网络连接有问题。因此进而可以选择负载最小的节点leastLoadedNode发送请求使它能够尽快发出。
  1. 发送数据
........
sendProduceRequests(batches, now);
return pollTimeout;
  • 1
  • 2
  • 3

取出批次后 sendProduceRequests 会将batch中的信息构建成 ClientRequest 通过 KafkaClient 的 send方法将消息发送至指定的partition,在发送完毕后会有一个 callback 来进行异步的处理

由此一来,Kafka整个消息的发送逻辑就完成了

参考文献:https://zhuanlan.zhihu.com/p/371361083
https://blog.csdn.net/wanger61?spm=1000.2115.3001.5343

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/618474
推荐阅读
相关标签
  

闽ICP备14008679号