赞
踩
目录:
-------------------------------------------------------------------
通过上文分析,我们知道RecordAccumulator是缓存待发送消息的地方,KafkaProducer把消息放进来,当消息满了的时候,通知sender来把消息发出去,释放空间。RecordAccumulator就相当于货运站的仓储,货物不断的往里放,每装满一箱就会通知发货者来取货运走。如下图所示:
从上图可以看到,至少有一个业务主线程和一个sender线程同时操作RecordAccumulator,所以他必须是线程安全的。
下面我们来详细分析RecordAccumulator。
我们直接一头扎入程序的设计和代码,会有一定的理解难度。我还是先以真实世界的某个事物做类比来入手。
前文说RecordAccumulator是一个累积消息的仓库,那么我们就拿快递仓库来类比,看看RecordAccumulator是个怎样的仓库,看下图:
上图是一个快递站的仓库,堆满了货物。分拣员在这里工作。我们可以看到发往不同目的地的大货箱放置在各自对应的区域,分拣员把不同目的地的包裹放入对应目的地的大货箱,每装满一箱就放置在对应的堆放区域。
分拣员工作流程如下:
以上就是分拣员所做的工作,分拣员是谁呢?分拣员就是RecordAccumulator!而那些大货箱以及各自所属的堆放区域,就是RecordAccumulator中缓存消息的地方。所有封箱的大货箱都会等待sender来取货发送出去。
如果你看懂了上面这张图,那么你已经充分理解了RecordAccumulator的设计,后面已经不用继续看了!开个玩笑~不过上图确实完全反映了RecordAccumulator设计和运行的基本逻辑。
我们总结下仓库里有什么:
记住这些概念,这些仓库里的东西最终都会体现在代码里。
下面我们来真正讲解RecordAccumulator的设计。
RecordAccumulator实现了接收消息,然后以主题分区为单元,把消息以ProducerBatch为单位累积缓存。多个ProducerBatch保存在Deque队列中。当Deque中最新的batch已不能容纳消息时,就会创建新的batch来继续缓存,并将其加入Deque。
RecordAccumulator缓存消息的存储结构如下:
RecordAccumulator内部存储消息使用的容器是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,通过上图可以看到消息以主题分区划分存储单元。消息实际是放在ProducerBatch中。ProducerBatch相当于一个个箱子,箱子上写着收件地址:xx主题xx分区。当一个ProducerBatch箱子装满时,就会封箱贴上封条,然后在对应的队列里生成一个新的ProducerBatch,来放置本主题分区新的消息。
由此可见,RecordAccumulator累积消息的过程,就是把消息装进不同收件地址箱子(ProducerBatch),装满封箱,堆放起来(加入Deque<ProducerBatch>),然后继续产生新箱子装消息的过程。
每次封箱操作后都会返回可以发货的结果给调用者,调用者KafkaProducer再唤醒sender把已经封箱的ProducerBatch发送出去。
图中可以看到,消息真实存储的地方是DataOutputStream。ProducerBatch内部有一个MemoryRecordsBuilder对象,图中未画,而DataOutputStream在MemoryRecordsBuilder中。三者关系:ProducerBatch-->MemoryRecordsBuilder-->DataOutputStream。
接下来对RecordAccumulator的代码分析,主要围绕以下三个类:
1、RecordAccumulator
消息累积器的顶层逻辑,维护存放消息的容器
2、ProducerBatch
封装MemoryRecordsBuilder,并且有很多控制用的信息及统计信息
3、MemoryRecordsBuilder
消息真正累积存放的地方
append()方法是RecordAccumulator暴露的累积消息入口,KafkaProducer通过此接口累积消息。我们也先从此方法开始层层递进,分析累积消息的逻辑。
代码如下:
- public RecordAppendResult append(TopicPartition tp,
- long timestamp,
- byte[] key,
- byte[] value,
- Header[] headers,
- Callback callback,
- long maxTimeToBlock) throws InterruptedException {
- // We keep track of the number of appending thread to make sure we do not miss batches in
- // abortIncompleteBatches().
- appendsInProgress.incrementAndGet();
- ByteBuffer buffer = null;
- if (headers == null) headers = Record.EMPTY_HEADERS;
- try {
- // check if we have an in-progress batch
- Deque<ProducerBatch> dq = getOrCreateDeque(tp);
- synchronized (dq) {
- if (closed)
- throw new KafkaException("Producer closed while send in progress");
- RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
- if (appendResult != null)
- return appendResult;
- }
-
- // we don't have an in-progress record batch try to allocate a new batch
- byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
- int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
- log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
- buffer = free.allocate(size, maxTimeToBlock);
- synchronized (dq) {
- // Need to check if producer is closed again after grabbing the dequeue lock.
- if (closed)
- throw new KafkaException("Producer closed while send in progress");
-
- RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
- if (appendResult != null) {
- // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
- return appendResult;
- }
-
- MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
- ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
- FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
-
- dq.addLast(batch);
- incomplete.add(batch);
-
- // Don't deallocate this buffer in the finally block as it's being used in the record batch
- buffer = null;
- return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
- }
- } finally {
- if (buffer != null)
- free.deallocate(buffer);
- appendsInProgress.decrementAndGet();
- }
- }
代码的主要逻辑如下:
1、appendsInProgress.incrementAndGet();
首先通过原子操作,把追加消息的线程计数器+1
2、 Deque<ProducerBatch> dq = getOrCreateDeque(tp);
获取主题分区对应的ProducerBatch队列。getOrCreateDeque方法中判断如果没有队列,则新建队列。
3、RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
同步代码块中尝试去做一次追加操作tryAppend(),如果成功就直接返回追加的结果对象。tryAppend方法中逻辑是:
a)如果dq中有ProducerBatch则往新一个batch中追加
a.1)追加不成功,说明最新的batch空间不足,返回null。需要外层逻辑创建新的batch
a.2)追加成功,返回RecordAppendResult
b)dq中无producerBatch,返回null,代表没有能追加成功。
第一个到达此方法的线程肯定是返回了null,因为还没有消息累积进来,也不存在ProducerBatch对象。
如果tryAppend返回null,说明没能直接在现有的batch上追加成功(也可能还没有batch),此时需要初始化新的ProducerBatch
4、预估size大小,从BufferPool申请ByteBuffer。
如果BufferPool空间不足就轮询等待。大家可以自己看一下BufferPool的代码,这里就不展开讲了,他的目的是控制总的内存消耗以及实现ByteBuffer复用。
5、再次tryAppend
由于第4步代码不在同步代码块中,所以接下来的同步代码块中,首先需要再次调用tryAppend,因为可能别的线程已经创建了本主题分区新的ProducerBatch,那么消息直接追加成功。
6、创建ProducerBatch
如果上一步返回null,说明还是没有可用的ProducerBatch,我们需要创建新的ProducerBatch。首先构建MemoryRecordsBuilder对象,真正做消息累加的就是这个对象,每个ProducerBatch都有一个MemoryRecordsBuilder的引用。
7、真正去追加消息
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
dq.addLast(batch);
首先调用ProducerBatch的tryAppend()方法,这是真正做消息追加的地方,内部通过MemoryRecordsBuilder实现。后续再详细分析。
然后把新的ProducerBatch放入队列中
8、把batch放入incomplete集合
incomplete本质是个存放未完成发送batch的Set
9、释放BufferPool空间
10、累积消息完成后的处理
finally代码块中再最终确保释放BufferPool空间,然后通过原子操作把追加消息的线程计数器-1
通过以上分析,我们捋清楚了RecordAccumulator追加消息的主逻辑,其中有两个步骤还需要展开代码说一下,我们先说RecordAccumulator中的tryAppend方法,也就是上面步骤中的第3步。
代码如下:
- private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
- Callback callback, Deque<ProducerBatch> deque) {
- ProducerBatch last = deque.peekLast();
- if (last != null) {
- FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
- if (future == null)
- last.closeForRecordAppends();
- else
- return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
- }
- return null;
- }
逻辑上文已经讲过,这里再对照代码多说两句:
方法如果返回null,说明没有可以成功追加此消息的ProducerBatch,有两种情况:
如果能取得队列中最新的batch,并且能够成功追加消息,那么就会返回RecordAppendResult。
append方法返回对象RecordAppendResult,代码如下:
- public final static class RecordAppendResult {
- public final FutureRecordMetadata future;
- public final boolean batchIsFull;
- public final boolean newBatchCreated;
-
- public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated) {
- this.future = future;
- this.batchIsFull = batchIsFull;
- this.newBatchCreated = newBatchCreated;
- }
- }
我们可以看到里面有异步发送的Future对象,此外还有两个标识,batchIsFull代表batch是否满了,newBatchCreated代表是否本次append增加了新的batch。
大家是否还记得KafkaProducer调用accumulator的apend方法后的逻辑是什么吗?不记得也没有关系,代码如下:
- if (result.batchIsFull || result.newBatchCreated) {
- log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
- this.sender.wakeup();
- }
KafkaProducer通过这两个标识位来决定是否唤醒sender。翻译过来就是,“已经有封箱的消息了!sender快点把消息发走吧!不要再睡了”
讲到这里,其实整个消息追加的流程已经讲通了。不过消息追加的具体实现我们还没有讲解,那么接下来我们讲解发生消息追加的真正地方:ProducerBatch和MemoryRecordsBuilder。
前文说过ProducerBatch可以理解为存放消息的大货箱。此类中的主要方法是tryAppend,也就是把消息放入箱子的操作,它是消息追加的顶层逻辑,代码如下:
- public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
- if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
- return null;
- } else {
- Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
- this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
- recordsBuilder.compressionType(), key, value, headers));
- this.lastAppendTime = now;
- FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
- timestamp, checksum,
- key == null ? -1 : key.length,
- value == null ? -1 : value.length,
- Time.SYSTEM);
- // we have to keep every future returned to the users in case the batch needs to be
- // split to several new batches and resent.
- thunks.add(new Thunk(callback, future));
- this.recordCount++;
- return future;
- }
- }
主要做了三件事
另外还有一个重要的方法就是closeForRecordAppends(),当batch无空间容纳新的消息的时候会调用此方法封箱,这里不展开来讲。
讲到这里,终于讲到消息追加真正落地的地方了。每个ProducerBatch都维护了一个MemoryRecordsBuilder。ProducerBatch追加消息,实际是调用MemoryRecordsBuilder完成的。
消息最终通过MemoryRecordsBuilder的append方法,追加到MemoryRecordsBuilder的DataOutputStream中。
上一节我们可以看到它有两个主要的方法hasRoomFor()和append()。
1、hasRoomFor()
这个方法比较简单,通过计算消息的预估大小,以及剩余空间,返回true或者false。代码就不贴了,感兴趣的话自行查看。
2、append()
这个方法我们需要仔细分析一下,消息的追加最终发生在这里。我先简述下逻辑:
我们继续看一下appendDefaultRecord()方法,在此方法中最终调用了DefaultRecord.writeTo()来写入appendStream
最后做检查,更新一些统计状态,如消息的数量、lastOffset等。
在DefaultRecord.writeTo()方法中,通过调用Utils.writeTo(DataOutput out, ByteBuffer buffer, int length),往appendStream写入key,value,header。
Utils.writeTo()代码如下:
- if (buffer.hasArray()) {
- out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
- } else {
- int pos = buffer.position();
- for (int i = pos; i < length + pos; i++)
- out.writeByte(buffer.get(i));
- }
我们总结一下MemoryRecordsBuilder:它内部维护了一个buffer,并记录能写入的大小,以及写入的位置在哪里,而每条消息都被追加到DataOutput对象appendStream中。appendStream是消息在RecordAccumulator中最终的去处。
至此,对RecordAccumulator的讲解就结束了,其实只是结束了追加消息部分,本片博客的讲解范围限于此。在sender讲解中我们再继续讲解RecordAccumulator另外的内容。
最后我们总结一下:
1、RecordAccumulator使用ProducerBatch缓存消息。每个主题分区拥有一个ProducerBatch的队列。
2、当ProducerBatch队列的队尾batch不能再容纳新消息时,对其进行封箱操作,同时新建ProducerBatch放入队尾来存放新消息。
3、ProducerBatch对消息追加的操作都是通过MemoryRecordsBuilder进行的。消息最终被追加到MemoryRecordsBuilder中的DataOutputStream appendStream中
追加消息的部分已经全部完成,接下来我们来看看消息是怎么被发送出去的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。