赞
踩
本篇文章从源码的角度,介绍Kafka生产者如何封装消息,细节详见代码中注释。
在封装前会先进行数据大小的校验
org.apache.kafka.clients.producer.KafkaProducer#doSend
- //TODO 校验消息大小
- int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
- compressionType, serializedKey, serializedValue, headers);
- ensureValidRecordSize(serializedSize);
org.apache.kafka.clients.producer.KafkaProducer#ensureValidRecordSize
- private void ensureValidRecordSize(int size) { // Throws RecordTooLargeException when the serialized size exceeds either limit checked below.
- // Limit 1: maxRequestSize, which the error message ties to ProducerConfig.MAX_REQUEST_SIZE_CONFIG (1 MB by default according to the article; confirm against the producer config docs).
- if (size > maxRequestSize)
- throw new RecordTooLargeException("The message is " + size +
- " bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the " +
- ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
- // Limit 2: totalMemorySize, the whole producer buffer configured via ProducerConfig.BUFFER_MEMORY_CONFIG (32 MB by default according to the article; confirm). It is the total buffer, not a per-record quota.
- if (size > totalMemorySize)
- throw new RecordTooLargeException("The message is " + size +
- " bytes when serialized which is larger than the total memory buffer you have configured with the " +
- ProducerConfig.BUFFER_MEMORY_CONFIG +
- " configuration.");
- }
org.apache.kafka.clients.producer.KafkaProducer#doSend
- //TODO 把消息封装入RecordAccumulator,内部是多个Deque<ProducerBatch>队列,队列中再分批次;内部定义Bufferpool管理内存
- RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
- serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
-
- if (result.abortForNewBatch) {
- int prevPartition = partition;
- partitioner.onNewBatch(record.topic(), cluster, prevPartition);
- partition = partition(record, serializedKey, serializedValue, cluster);
- tp = new TopicPartition(record.topic(), partition);
- if (log.isTraceEnabled()) {
- log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
- }
- // producer callback will make sure to call both 'callback' and interceptor callback
- interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
-
- result = accumulator.append(tp, timestamp, serializedKey,
- serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
- }
其中accumulator是在构造函数中已经初始化的对象。
接下来我们进入RecordAccumulator对象,具体看看是如何封装的。
在org.apache.kafka.clients.producer.internals.RecordAccumulator#append方法中进行封装,这里可以看出整体的流程:
- public RecordAppendResult append(TopicPartition tp,
- long timestamp,
- byte[] key,
- byte[] value,
- Header[] headers,
- Callback callback,
- long maxTimeToBlock,
- boolean abortOnNewBatch,
- long nowMs) throws InterruptedException { // Appends one record to the deque of tp, creating a new batch when no existing batch accepts it.
- // We keep track of the number of appending thread to make sure we do not miss batches in
- // abortIncompleteBatches().
- appendsInProgress.incrementAndGet();
- ByteBuffer buffer = null;
- if (headers == null) headers = Record.EMPTY_HEADERS;
- try {
- // check if we have an in-progress batch
- // Fetch the deque that belongs to this partition (getOrCreateDeque creates it on first use); each partition has its own deque.
- Deque<ProducerBatch> dq = getOrCreateDeque(tp);
- synchronized (dq) {
- if (closed)
- throw new KafkaException("Producer closed while send in progress");
- // First attempt: append to the batch at the tail of the deque. tryAppend returns null when the deque is empty or the tail batch did not accept the record.
- RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
- if (appendResult != null)
- return appendResult;
- }
-
- // we don't have an in-progress record batch try to allocate a new batch
- if (abortOnNewBatch) {
- // Return a result that will cause another call to append.
- return new RecordAppendResult(null, false, false, true);
- }
-
- byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
- /**
- * Buffer size to request: the larger of the configured batchSize and the estimated
- * upper bound of this record's serialized size.
- * Note from the article: in production, watch the record size. If records are so large
- * that a buffer of the configured size holds only one of them, every record ends up in
- * its own batch, which defeats the producer's batching design and causes extra network I/O.
- */
- int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
- log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
- // Request a buffer of the computed size from the pool, passing maxTimeToBlock as the wait limit.
- buffer = free.allocate(size, maxTimeToBlock);
-
- // Update the current time in case the buffer allocation blocked above.
- nowMs = time.milliseconds();
- synchronized (dq) {
- // Need to check if producer is closed again after grabbing the dequeue lock.
- if (closed)
- throw new KafkaException("Producer closed while send in progress");
-
- // Second attempt under the lock; null here means there is still no batch that takes the record, so a new batch is built below.
- RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
- if (appendResult != null) {
- // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
- return appendResult;
- }
-
- // Wrap the allocated buffer in a records builder and create a new batch on top of it.
- MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
- ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
- // Append the record to the new batch; a null future is rejected by requireNonNull.
- FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
- callback, nowMs));
-
- // Put the new batch at the tail of the deque.
- dq.addLast(batch);
- incomplete.add(batch);
-
- // Don't deallocate this buffer in the finally block as it's being used in the record batch
- // The batch now owns the buffer, so clear the local reference to keep the finally block from releasing it.
- buffer = null;
- return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
- }
- } finally {
- // buffer != null means no new batch took ownership of it, so the memory requested in this call is returned to the pool.
- if (buffer != null)
- free.deallocate(buffer);
- appendsInProgress.decrementAndGet();
- }
- }
接下来从这个整体流程中,找重点的方法进行分解。
org.apache.kafka.clients.producer.internals.RecordAccumulator#getOrCreateDeque
- private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) { // Returns the deque registered for tp, registering a new ArrayDeque if none exists.
- // batches maps each TopicPartition to one Deque<ProducerBatch>, i.e. one deque per partition.
- // Read path: per the article this lookup is the high-frequency operation.
- Deque<ProducerBatch> d = this.batches.get(tp);
- if (d != null)
- return d;
- // No deque for this partition yet: create a new ArrayDeque.
- d = new ArrayDeque<>();
- // Register it with putIfAbsent; if another deque is already mapped to tp, that one is returned instead.
- // Write path: per the article this insert is the low-frequency operation.
- Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
- if (previous == null)
- return d;
- else
- return previous;
- }
其中,batches的数据结构是一个实现了ConcurrentMap接口的Map
- // Concurrent map from partition (TopicPartition) to that partition's deque of batches.
- private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
batches在构造函数中初始化是一个CopyOnWriteMap:
- /**
- * TODO 线程安全的Map
- * TODO write:synchronized加锁并copy一个新的对象,在新copy的map中进行写操作,写操作完毕后赋值给原有的Map(volatile修饰)
- * TODO read: 不加锁,直接从map中获取值
- * TODO 适合读多写少的场景
- */
- this.batches = new CopyOnWriteMap<>();
CopyOnWriteMap是kafka自定义的一个数据结构:
- public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
-
- private volatile Map<K, V> map; // backing map reference; volatile, so a reassigned reference is visible to other threads
-
- // ... (rest of the class omitted in this excerpt)
-
- }
org.apache.kafka.clients.producer.internals.RecordAccumulator#tryAppend
- private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
- Callback callback, Deque<ProducerBatch> deque, long nowMs) { // Tries to append to the tail batch of deque; returns null if there is no tail batch or it did not accept the record.
- // Look at the batch at the tail of the deque without removing it.
- ProducerBatch last = deque.peekLast();
- // If a tail batch exists, try to append the record to it; a null future means the batch did not take it, and the batch is then closed for further appends.
- if (last != null) {
- FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
- if (future == null)
- last.closeForRecordAppends();
- else
- return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
- }
- return null;
- }
org.apache.kafka.clients.producer.internals.BufferPool#allocate
- public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { // Returns a buffer of the requested size, waiting up to maxTimeToBlockMs for memory to become available.
- // A request larger than the pool's total memory can never be satisfied (32 MB by default according to the article; confirm against the producer config docs).
- if (size > this.totalMemory)
- throw new IllegalArgumentException("Attempt to allocate " + size
- + " bytes, but there is a hard limit of "
- + this.totalMemory
- + " on memory allocations.");
-
- ByteBuffer buffer = null;
- this.lock.lock();
-
- if (this.closed) {
- this.lock.unlock();
- throw new KafkaException("Producer closed while allocating memory");
- }
-
- try {
- // check if we have a free buffer of the right size pooled
- // Fast path: the request is exactly poolableSize and the free deque is not empty, so return the buffer at its head.
- if (size == poolableSize && !this.free.isEmpty())
- return this.free.pollFirst();
-
- // now check if the request is immediately satisfiable with the
- // memory on hand or if we need to block
- // Memory held by pooled buffers = freeSize() * poolableSize (freeSize() is presumably the number of buffers in the free deque; poolableSize is 16 KB by default according to the article; confirm).
- int freeListSize = freeSize() * this.poolableSize;
- // Total available memory = nonPooledAvailableMemory + freeListSize; this branch runs when it covers the request.
- if (this.nonPooledAvailableMemory + freeListSize >= size) {
- // we have enough unallocated or pooled memory to immediately
- // satisfy the request, but need to allocate the buffer
- freeUp(size);
- // Deduct the requested size from the non-pooled memory counter.
- this.nonPooledAvailableMemory -= size;
- } else {
- // Total available memory is not enough for this request.
- // we are out of memory and will have to block
- // Memory reserved for this request so far.
- int accumulated = 0;
- Condition moreMemory = this.lock.newCondition();
- try {
- long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
- // Add this request's condition to the tail of the waiters queue.
- this.waiters.addLast(moreMemory);
- // loop over and over until we have a buffer or have reserved
- // enough memory to allocate one
- // Keep waiting until the reserved amount reaches the requested size.
- while (accumulated < size) {
- long startWaitNs = time.nanoseconds();
- long timeNs;
- boolean waitingTimeElapsed;
- try {
- // Block until signalled or until the remaining time runs out; await returning false is recorded as a timeout.
- waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
- } finally {
- long endWaitNs = time.nanoseconds();
- timeNs = Math.max(0L, endWaitNs - startWaitNs);
- recordWaitTime(timeNs);
- }
-
- if (this.closed)
- throw new KafkaException("Producer closed while allocating memory");
-
- if (waitingTimeElapsed) {
- this.metrics.sensor("buffer-exhausted-records").record();
- throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
- }
-
- remainingTimeToBlockNs -= timeNs;
-
- // check if we can satisfy this request from the free list,
- // otherwise allocate memory
- // Nothing reserved yet, the request is exactly poolableSize and the free deque is not empty: take the buffer at the head of the free deque.
- if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
- // just grab a buffer from the free list
- buffer = this.free.pollFirst();
- accumulated = size;
- } else {
- // Otherwise reserve whatever non-pooled memory is available right now.
- // we'll need to allocate memory, but we may only get
- // part of what we need on this iteration
- freeUp(size - accumulated);
- // got = the smaller of what is still missing and nonPooledAvailableMemory.
- int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
- // Deduct the reserved amount from the non-pooled memory counter.
- this.nonPooledAvailableMemory -= got;
- // Add it to the running total; the loop waits again if the total is still below size.
- accumulated += got;
- }
- }
- // Don't reclaim memory on throwable since nothing was thrown
- accumulated = 0;
- } finally {
- // When this loop was not able to successfully terminate don't loose available memory
- this.nonPooledAvailableMemory += accumulated;
- // Always remove this request's condition from the waiters queue.
- this.waiters.remove(moreMemory);
- }
- }
- } finally {
- // signal any additional waiters if there is more memory left
- // over for them
- try {
- if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
- this.waiters.peekFirst().signal();
- } finally {
- // Another finally... otherwise find bugs complains
- lock.unlock();
- }
- }
-
- if (buffer == null)
- // No pooled buffer was taken, so obtain the buffer from safeAllocateByteBuffer for the reserved size.
- return safeAllocateByteBuffer(size);
- else
- return buffer;
- }
org.apache.kafka.clients.producer.internals.BufferPool#deallocate(java.nio.ByteBuffer, int),以及委托给它的单参数重载 deallocate(java.nio.ByteBuffer)
- public void deallocate(ByteBuffer buffer, int size) { // Returns memory to the pool and signals the first waiter, if any.
- lock.lock();
- try {
- // The returned size equals poolableSize and the buffer's capacity (16 KB by default according to the article; confirm): the buffer can be pooled for reuse.
- if (size == this.poolableSize && size == buffer.capacity()) {
- // Reset the buffer with clear() before pooling it.
- buffer.clear();
- // Add the cleared buffer to the free deque.
- this.free.add(buffer);
- } else {
- // Any other size is not pooled; only the non-pooled memory counter is increased by size.
- this.nonPooledAvailableMemory += size;
- }
- // Signal the condition at the head of the waiters queue, if there is one.
- Condition moreMem = this.waiters.peekFirst();
- if (moreMem != null)
- moreMem.signal();
- } finally {
- lock.unlock();
- }
- }
-
- public void deallocate(ByteBuffer buffer) { // Convenience overload: returns the buffer using its own capacity as the size.
- deallocate(buffer, buffer.capacity());
- }
总结:经过上面这些步骤,消息被追加到RecordAccumulator中对应分区队列的ProducerBatch里,append方法返回RecordAccumulator.RecordAppendResult对象,供后续的发送流程使用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。