Kafka Producer RecordAccumulator Source Code Analysis

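The previous article walked through the source of the Kafka producer instance and its send() method, and showed that each message ends up in the record accumulator, RecordAccumulator. This article looks at how Kafka accumulates and buffers those messages. The class declaration and its main member variables are as follows: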

public final class RecordAccumulator {
	
    private final Logger log;
    // set to false when the accumulator is created
    private volatile boolean closed;
    // counters for the flush and append operations currently in progress
    private final AtomicInteger flushesInProgress;
    private final AtomicInteger appendsInProgress;
    // size of each batch, in bytes
    private final int batchSize;
    // compression type
    private final CompressionType compression;
    private final int lingerMs;
    private final long retryBackoffMs;
    private final int deliveryTimeoutMs;
    // pool used to reuse buffers
    private final BufferPool free;
    private final Time time;
    private final ApiVersions apiVersions;
    // one deque per topic partition; each deque holds that partition's batches of messages
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    private final IncompleteBatches incomplete;
    // The following variables are only accessed by the sender thread, so we don't need to protect them.
    private final Map<TopicPartition, Long> muted;
    private int drainIndex;
    private final TransactionManager transactionManager;
    private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.
    ....
}
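
Several of these fields are populated from producer configuration. As a rough illustration, the snippet below sets the standard producer settings that correspond to batchSize, lingerMs, compression, retryBackoffMs, deliveryTimeoutMs, and the total memory behind the BufferPool. The class name and the values are illustrative assumptions; only the configuration key names come from Kafka, and the plain java.util.Properties object stands in for whatever is passed to the KafkaProducer constructor.

```java
import java.util.Properties;

// Hypothetical helper: builds producer settings that feed the accumulator's fields.
// The values are examples only, not recommendations.
class AccumulatorConfigSketch {
    static Properties accumulatorRelatedProps() {
        Properties props = new Properties();
        props.setProperty("batch.size", "16384");           // -> batchSize (bytes per batch)
        props.setProperty("linger.ms", "5");                // -> lingerMs
        props.setProperty("compression.type", "lz4");       // -> compression
        props.setProperty("retry.backoff.ms", "100");       // -> retryBackoffMs
        props.setProperty("delivery.timeout.ms", "120000"); // -> deliveryTimeoutMs
        props.setProperty("buffer.memory", "33554432");     // -> total bytes available to the BufferPool
        return props;
    }

    public static void main(String[] args) {
        // Print each setting so the mapping can be checked by eye.
        accumulatorRelatedProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```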

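From the class definition above we can see that the record accumulator (RecordAccumulator) caches messages in a ConcurrentMap whose key is the TopicPartition a message belongs to and whose value is a double-ended queue (Deque) of ProducerBatch objects. When the main thread sends a message, it appends at the tail of that deque (into the last batch, or into a new batch added at the tail). The Sender thread takes batches from the head of the deque and sends them to the Kafka broker.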
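
The tail-append, head-drain pattern can be sketched with the standard library alone. This is a simplified illustration, not Kafka's code: PartitionQueues and its methods are hypothetical names, and plain strings stand in for TopicPartition and ProducerBatch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the accumulator's map of per-partition deques.
class PartitionQueues {
    private final ConcurrentMap<String, Deque<String>> batches = new ConcurrentHashMap<>();

    // Producer side: get or create the partition's deque, then add at the tail.
    void addLast(String partition, String batch) {
        Deque<String> dq = batches.computeIfAbsent(partition, k -> new ArrayDeque<>());
        synchronized (dq) { // ArrayDeque is not thread-safe, so lock per deque
            dq.addLast(batch);
        }
    }

    // Sender side: take the oldest batch from the head, or null if nothing is queued.
    String pollFirst(String partition) {
        Deque<String> dq = batches.get(partition);
        if (dq == null)
            return null;
        synchronized (dq) {
            return dq.pollFirst();
        }
    }

    public static void main(String[] args) {
        PartitionQueues queues = new PartitionQueues();
        queues.addLast("orders-0", "batch-1");
        queues.addLast("orders-0", "batch-2");
        System.out.println(queues.pollFirst("orders-0")); // the oldest batch comes out first
        System.out.println(queues.pollFirst("orders-1")); // no deque exists for this partition
    }
}
```

Locking each deque separately means producers writing to different partitions do not contend with one another.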

1. Core methods of the record accumulator

We start with the append method. Its source is as follows:

//org.apache.kafka.clients.producer.internals.RecordAccumulator#append

public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
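        // .... some non-core code omitted (including the declaration of buffer)
        try {
            // check if we have an in-progress batch
            // 1. Get the deque for this topic partition, creating it if it does not exist
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            // 2. Under the deque's lock, try to append the message; return immediately on success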
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }

            // we don't have an in-progress record batch try to allocate a new batch
            // 3. Try to allocate a new buffer to hold the message to be sent
            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            buffer = free.allocate(size, maxTimeToBlock);
            // 4. Under the lock, try the append again; if it succeeds, return. The buffer allocated above is then released in the finally block
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }
				
                // 5. Still no room: create a new ProducerBatch, store the message in it, and add it to the tail of the deque
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);

                // Don't deallocate this buffer in the finally block as it's being used in the record batch
                buffer = null;
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }

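The logic of the code above is mainly:

  • 1. Get the deque for the topic partition, creating it if it does not exist.
  • 2. Under the deque's lock, try to append the message; if that succeeds, return immediately.
  • 3. Otherwise, outside the lock, try to allocate a new buffer to hold the message.
  • 4. Take the lock again and retry the append, since another thread may have added a batch in the meantime. If it succeeds, return; the buffer allocated in step 3 is then unused and is released in the finally block.
  • 5. If it still fails, create a new ProducerBatch backed by that buffer, store the message in it, and add it to the tail of the deque.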
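
The five steps can be sketched as follows. This is a minimal model under stated assumptions, not the Kafka implementation: MiniAccumulator is a hypothetical class, a ByteBuffer plays the role of a ProducerBatch, and ByteBuffer.allocate replaces the BufferPool (so there is nothing to give back when the second attempt succeeds).

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of the append flow.
class MiniAccumulator {
    private final int batchSize;
    private final ConcurrentMap<String, Deque<ByteBuffer>> batches = new ConcurrentHashMap<>();

    MiniAccumulator(int batchSize) {
        this.batchSize = batchSize;
    }

    // Returns true if the record went into a newly created batch.
    boolean append(String partition, byte[] value) {
        // Step 1: get or create the deque for this partition.
        Deque<ByteBuffer> dq = batches.computeIfAbsent(partition, k -> new ArrayDeque<>());
        // Step 2: first attempt, under the deque's lock.
        synchronized (dq) {
            if (tryAppend(dq, value))
                return false;
        }
        // Step 3: allocate outside the lock, large enough for an oversized record.
        ByteBuffer buffer = ByteBuffer.allocate(Math.max(batchSize, value.length));
        synchronized (dq) {
            // Step 4: retry, since another thread may have added a batch meanwhile.
            if (tryAppend(dq, value))
                return false; // with a real pool, the unused buffer would be returned here
            // Step 5: start a new batch with the record and add it at the tail.
            buffer.put(value);
            dq.addLast(buffer);
            return true;
        }
    }

    // Appends to the last batch if it exists and has room for the record.
    private static boolean tryAppend(Deque<ByteBuffer> dq, byte[] value) {
        ByteBuffer last = dq.peekLast();
        if (last == null || last.remaining() < value.length)
            return false;
        last.put(value);
        return true;
    }

    int batchCount(String partition) {
        Deque<ByteBuffer> dq = batches.get(partition);
        if (dq == null)
            return 0;
        synchronized (dq) {
            return dq.size();
        }
    }

    public static void main(String[] args) {
        MiniAccumulator acc = new MiniAccumulator(8);
        System.out.println(acc.append("orders-0", new byte[5])); // first record opens a batch
        System.out.println(acc.append("orders-0", new byte[3])); // fits into the open batch
        System.out.println(acc.append("orders-0", new byte[1])); // batch is full, a new one opens
        System.out.println(acc.batchCount("orders-0"));
    }
}
```

Allocating between the two synchronized blocks keeps a potentially blocking allocation from holding the deque's lock, which is why the second attempt in step 4 is needed.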

We can also see that the core operation is appending the message to a ProducerBatch object. So how is ProducerBatch implemented?

2. A brief look at the ProducerBatch source

public final class ProducerBatch {

    private static final Logger log = LoggerFactory.getLogger(ProducerBatch.class);

    private enum FinalState { ABORTED, FAILED, SUCCEEDED }

    final long createdMs;
    final TopicPartition topicPartition;
    final ProduceRequestResult produceFuture;

    private final List<Thunk> thunks = new ArrayList<>();
    private final MemoryRecordsBuilder recordsBuilder;
    private final AtomicInteger attempts = new AtomicInteger(0);
    private final boolean isSplitBatch;
    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);

    int recordCount;
    int maxRecordSize;
    private long lastAttemptMs;
    private long lastAppendTime;
    private long drainedMs;
    private boolean retry;
    private boolean reopened;
    ...
}

Its core method is tryAppend(). The source is as follows:

    // org.apache.kafka.clients.producer.internals.ProducerBatch#tryAppend
    /**
     * Append the record to the current record set and return the relative offset within that record set
     *
     * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
     */
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
        // 1. Check whether recordsBuilder still has enough room; if not, return null
        if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
            return null;
        } else {
            // 2. Append the message to recordsBuilder
            Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
            this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                    recordsBuilder.compressionType(), key, value, headers));
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length,
                                                                   Time.SYSTEM);
            // we have to keep every future returned to the users in case the batch needs to be
            // split to several new batches and resent.
            thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }

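The logic of the code above is fairly simple:

  • 1. First check whether there is enough room left to store the current message; if not, return null.
  • 2. If there is, append the message, register its callback and future in thunks, and return the future that represents the asynchronous result.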
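
A small model of this check-then-append contract is sketched below. It is an assumption-laden illustration rather than the real class: MiniBatch, its byte-count capacity check, and the complete() method are hypothetical, and a CompletableFuture stands in for FutureRecordMetadata.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of a batch that returns null when it is full.
class MiniBatch {
    private final int capacity;
    private int usedBytes;
    private int recordCount;
    // One pending future per appended record, kept in append order.
    private final List<CompletableFuture<Long>> thunks = new ArrayList<>();

    MiniBatch(int capacity) {
        this.capacity = capacity;
    }

    // Returns a future for the record, or null if there is not enough room.
    CompletableFuture<Long> tryAppend(byte[] value) {
        if (usedBytes + value.length > capacity)
            return null; // the caller is expected to open a new batch
        usedBytes += value.length;
        CompletableFuture<Long> future = new CompletableFuture<>();
        thunks.add(future);
        recordCount++;
        return future;
    }

    // Assumed completion step: each record's offset is the base offset plus its position.
    void complete(long baseOffset) {
        for (int i = 0; i < thunks.size(); i++)
            thunks.get(i).complete(baseOffset + i);
    }

    int recordCount() {
        return recordCount;
    }

    public static void main(String[] args) {
        MiniBatch batch = new MiniBatch(4);
        CompletableFuture<Long> first = batch.tryAppend(new byte[3]);
        System.out.println(batch.tryAppend(new byte[2]) == null); // not enough room left
        batch.complete(100L);
        System.out.println(first.join());
    }
}
```

Returning null instead of throwing lets the caller treat a full batch as a normal outcome and fall through to creating a new batch, as the append method does.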