In the previous post we walked through the source of the Kafka producer instance and its send() method in detail, and saw that a message is ultimately added to the message accumulator, RecordAccumulator. Next, let's look at how Kafka accumulates and caches messages. The class declaration and its main member fields are as follows:
```java
public final class RecordAccumulator {

    private final Logger log;
    // set to false when the accumulator is created
    private volatile boolean closed;
    private final AtomicInteger flushesInProgress;
    private final AtomicInteger appendsInProgress;
    // the target size of each batch
    private final int batchSize;
    // the compression type used for records
    private final CompressionType compression;
    private final int lingerMs;
    private final long retryBackoffMs;
    private final int deliveryTimeoutMs;
    // pool that reuses byte buffers
    private final BufferPool free;
    private final Time time;
    private final ApiVersions apiVersions;
    // one deque per topic partition; each deque holds the batches of records for that partition
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    private final IncompleteBatches incomplete;
    // The following variables are only accessed by the sender thread, so we don't need to protect them.
    private final Map<TopicPartition, Long> muted;
    private int drainIndex;
    private final TransactionManager transactionManager;
    private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.
    ....
}
```
From the source above we can see that the message accumulator (RecordAccumulator) uses a ConcurrentMap as the container that caches messages: the key is the TopicPartition the message belongs to, and the value is a double-ended queue of ProducerBatch. When the main thread sends a message it appends to the tail of that queue, while the Sender thread takes batches from the head of the queue and sends them to the Kafka brokers.
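Before diving into append(), it may help to see this map-of-deques shape in isolation. Below is a minimal sketch of the same pattern (illustrative Java, not Kafka's code; the class name DequePerPartition, the String partition key, and the String payload are made-up stand-ins): one lazily created deque per partition, appended at the tail by producer threads and drained from the head by a sender thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Toy accumulator: one deque per partition key, mirroring the shape of
// RecordAccumulator's batches map (the partition key and String payload
// are simplifications for illustration).
public class DequePerPartition {
    private final ConcurrentMap<String, Deque<String>> batches = new ConcurrentHashMap<>();

    // Create the deque lazily and let putIfAbsent resolve races between
    // threads that append to the same partition for the first time.
    private Deque<String> getOrCreateDeque(String partition) {
        Deque<String> d = batches.get(partition);
        if (d != null) return d;
        d = new ArrayDeque<>();
        Deque<String> previous = batches.putIfAbsent(partition, d);
        return previous == null ? d : previous;
    }

    // Main thread: append at the tail. ArrayDeque itself is not thread-safe,
    // so access is synchronized on the deque, as in the real append().
    public void append(String partition, String record) {
        Deque<String> dq = getOrCreateDeque(partition);
        synchronized (dq) {
            dq.addLast(record);
        }
    }

    // Sender thread: drain from the head.
    public String drainOne(String partition) {
        Deque<String> dq = getOrCreateDeque(partition);
        synchronized (dq) {
            return dq.pollFirst();
        }
    }

    public static void main(String[] args) {
        DequePerPartition acc = new DequePerPartition();
        acc.append("topic-0", "m1");
        acc.append("topic-0", "m2");
        System.out.println(acc.drainOne("topic-0")); // prints m1 (head first)
    }
}
```

Kafka's own getOrCreateDeque uses this same putIfAbsent idiom, so concurrent first appends to a partition converge on a single deque without a global lock.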
Let's start with the append() method; its source is as follows:
```java
// org.apache.kafka.clients.producer.internals.RecordAccumulator#append
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // ... some non-core code omitted (the omitted part declares `ByteBuffer buffer = null;`)
    try {
        // check if we have an in-progress batch
        // 1. Get the deque for this topic partition, creating it if it does not exist yet
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        // 2. Try to append to an existing batch under the deque lock; return immediately on success
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // we don't have an in-progress record batch try to allocate a new batch
        // 3. Allocate a new buffer to hold the records to be sent
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);
        // 4. Try the append again under the deque lock; if it succeeds this time,
        //    the buffer we just allocated must be released in the finally block
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new KafkaException("Producer closed while send in progress");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }

            // 5. Still no room: create a new ProducerBatch backed by the buffer,
            //    store the record in it, and add the batch to the tail of the deque
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
```
The main logic of the source above is:
1. Look up (or lazily create) the deque for the record's topic partition.
2. Lock the deque and try to append the record to the last batch in it; on success, return right away.
3. If no batch has room, allocate a new buffer from the BufferPool, sized as the larger of batchSize and the estimated size of this record.
4. Re-acquire the deque lock and retry the append, since another thread may have created a batch while the lock was released; if that succeeds, the freshly allocated buffer is returned to the pool in the finally block.
5. Otherwise, build a new ProducerBatch on top of the buffer, append the record to it, and add the batch to the tail of the deque; the buffer is now owned by the batch, so it must not be deallocated.
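The interesting design point is that the buffer allocation in step 3 happens outside the deque lock: free.allocate() can block waiting for memory when the pool is exhausted, so holding the lock there would stall every other thread appending to this partition. The price is the second tryAppend in step 4. Here is a minimal sketch of this "allocate outside the lock, then double-check" shape (generic Java, not Kafka's code; Batch and slowAllocate are illustrative names):

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the try / allocate-outside-the-lock / retry / create shape
// of RecordAccumulator#append.
public class DoubleCheckedAppend {

    static class Batch {
        final ByteBuffer buf;
        Batch(ByteBuffer buf) { this.buf = buf; }
        boolean tryAppend(byte[] record) {
            if (buf.remaining() < record.length) return false; // no room left
            buf.put(record);
            return true;
        }
    }

    private final Deque<Batch> dq = new ArrayDeque<>();

    public void append(byte[] record) {
        // First attempt: append to the last open batch under the lock.
        synchronized (dq) {
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(record)) return;
        }
        // Allocate outside the lock: in Kafka this is free.allocate(),
        // which may block until memory becomes available.
        ByteBuffer buffer = slowAllocate(Math.max(1024, record.length));
        synchronized (dq) {
            // Double-check: another thread may have created a batch meanwhile.
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(record)) {
                return; // the real code returns `buffer` to the BufferPool here
            }
            Batch batch = new Batch(buffer);
            batch.tryAppend(record);
            dq.addLast(batch);
        }
    }

    private ByteBuffer slowAllocate(int size) {
        return ByteBuffer.allocate(size); // stand-in for BufferPool#allocate
    }
}
```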
From this we can also see that the most central step is adding the record to a ProducerBatch object. So how is ProducerBatch implemented?
2. ProducerBatch
A brief look at the source
```java
public final class ProducerBatch {

    private static final Logger log = LoggerFactory.getLogger(ProducerBatch.class);

    private enum FinalState { ABORTED, FAILED, SUCCEEDED }

    final long createdMs;
    final TopicPartition topicPartition;
    final ProduceRequestResult produceFuture;

    private final List<Thunk> thunks = new ArrayList<>();
    private final MemoryRecordsBuilder recordsBuilder;
    private final AtomicInteger attempts = new AtomicInteger(0);
    private final boolean isSplitBatch;
    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);

    int recordCount;
    int maxRecordSize;
    private long lastAttemptMs;
    private long lastAppendTime;
    private long drainedMs;
    private boolean retry;
    private boolean reopened;
    ...
}
```
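Note the thunks list: every record appended to the batch contributes one Thunk, pairing the user's Callback with the FutureRecordMetadata that send() eventually returns, and when the whole batch completes the producer walks this list and fires each callback. Below is a toy model of that pattern (illustrative only; BatchCallbacks and its Thunk are simplified stand-ins, not Kafka's exact types):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Toy model of the Thunk pattern: one (callback, relative offset) pair per
// appended record; completing the batch fires every callback in append order.
public class BatchCallbacks {

    // Stand-in for Kafka's Callback: receives (absolute offset, exception).
    static class Thunk {
        final BiConsumer<Long, Exception> callback;
        final int relativeOffset; // the record's position within the batch

        Thunk(BiConsumer<Long, Exception> callback, int relativeOffset) {
            this.callback = callback;
            this.relativeOffset = relativeOffset;
        }
    }

    private final List<Thunk> thunks = new ArrayList<>();
    private int recordCount = 0;

    // Mirrors tryAppend: remember the callback together with the record's
    // position in the batch, then bump recordCount.
    public void append(BiConsumer<Long, Exception> callback) {
        thunks.add(new Thunk(callback, recordCount));
        recordCount++;
    }

    // Mirrors batch completion: the broker assigns the batch a base offset,
    // and each record's absolute offset is base + relative.
    public void done(long baseOffset, Exception error) {
        for (Thunk t : thunks)
            t.callback.accept(error == null ? baseOffset + t.relativeOffset : -1L, error);
    }

    public static void main(String[] args) {
        BatchCallbacks batch = new BatchCallbacks();
        batch.append((offset, e) -> System.out.println("record 0 at offset " + offset));
        batch.append((offset, e) -> System.out.println("record 1 at offset " + offset));
        batch.done(100L, null); // prints offsets 100 and 101
    }
}
```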
Its core method is tryAppend(); the source is as follows:
```java
/**
 * org.apache.kafka.clients.producer.internals.ProducerBatch#tryAppend
 * Append the record to the current record set and return the relative offset within that record set
 *
 * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
 */
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    // 1. If recordsBuilder does not have enough room left, return null directly
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        // 2. Append the record to recordsBuilder
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                recordsBuilder.compressionType(), key, value, headers));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length,
                                                               Time.SYSTEM);
        // we have to keep every future returned to the users in case the batch needs to be
        // split to several new batches and resent.
        thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
```
The logic of this code is fairly simple:
1. Ask recordsBuilder whether there is room for the record; if not, return null, which causes the caller (RecordAccumulator#append) to allocate a fresh batch.
2. Otherwise append the record to recordsBuilder, update maxRecordSize and lastAppendTime, wrap the record's position in a FutureRecordMetadata, remember the (callback, future) pair as a Thunk, and increment recordCount.
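For context, the FutureRecordMetadata built here is what eventually surfaces to application code as the Future<RecordMetadata> returned by KafkaProducer#send(). A minimal producer snippet showing both ways of consuming it (standard kafka-clients API; the broker address and topic name are placeholders):

```java
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "k", "v");

            // 1. Blocking on the Future: this is the FutureRecordMetadata
            //    created in ProducerBatch#tryAppend.
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata meta = future.get();
            System.out.printf("partition=%d offset=%d%n", meta.partition(), meta.offset());

            // 2. Passing a Callback: it is stored in the batch's thunks list
            //    and invoked when the whole batch completes.
            producer.send(record, (metadata, exception) -> {
                if (exception != null)
                    exception.printStackTrace();
                else
                    System.out.println("acked at offset " + metadata.offset());
            });
        }
    }
}
```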