
Kafka Source Code Series No.4 - Producer Message Encapsulation


Chapter 1 Introduction

This article walks through, from the source code, how the Kafka producer encapsulates messages before sending them. See the inline comments in the code for details.
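For orientation, here is a minimal sketch of the user-side entry point (the broker address and topic name are placeholders); everything examined below happens underneath this send() call:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerEntryPoint {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // send() serializes the record and calls doSend(), which performs the size
                // check and the RecordAccumulator.append() discussed in this article
                producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            }
        }
    }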

Chapter 2 Detailed Steps

2.1 Validating the record size

Before a record is packed into a batch, its size is validated.

org.apache.kafka.clients.producer.KafkaProducer#doSend

    // TODO validate the record size
    int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
            compressionType, serializedKey, serializedValue, headers);
    ensureValidRecordSize(serializedSize);

org.apache.kafka.clients.producer.KafkaProducer#ensureValidRecordSize

    private void ensureValidRecordSize(int size) {
        // maximum size of a single record, 1 MB by default (max.request.size)
        if (size > maxRequestSize)
            throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the " +
                ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
        // total buffer memory of the producer, 32 MB by default (buffer.memory)
        if (size > totalMemorySize)
            throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                ProducerConfig.BUFFER_MEMORY_CONFIG +
                " configuration.");
    }
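Both limits are ordinary producer settings. A minimal sketch of raising them, building on the Properties object from the earlier example (the values are illustrative, not recommendations):

    // max size of a single record/request; default 1048576 bytes (1 MB)
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2 * 1024 * 1024);
    // total memory the producer may use for buffering; default 33554432 bytes (32 MB)
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024);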

2.2 Message encapsulation

2.2.1 Overall flow of message encapsulation

org.apache.kafka.clients.producer.KafkaProducer#doSend

    // TODO append the record to the RecordAccumulator; internally it holds one
    //      Deque<ProducerBatch> per partition, each deque containing batches,
    //      plus a BufferPool that manages memory
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
    if (result.abortForNewBatch) {
        int prevPartition = partition;
        partitioner.onNewBatch(record.topic(), cluster, prevPartition);
        partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        if (log.isTraceEnabled()) {
            log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
        }
        // producer callback will make sure to call both 'callback' and interceptor callback
        interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
        result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
    }

The accumulator here was initialized in the KafkaProducer constructor. Note the abortForNewBatch branch: the first append call passes abortOnNewBatch=true, so if appending would require creating a new batch, append returns early and the partitioner's onNewBatch hook (introduced for the sticky partitioner) gets a chance to choose a different partition before the record is appended for real.
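To make the onNewBatch hook concrete, here is a hypothetical, stripped-down sticky-style Partitioner (not Kafka's built-in implementation, and it skips per-topic bookkeeping): it keeps serving one partition until onNewBatch reports that a batch was sealed.

    import java.util.Map;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    public class MiniStickyPartitioner implements Partitioner {
        private final AtomicInteger sticky = new AtomicInteger(-1);

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int current = sticky.get();
            // stick to the previously chosen partition until a new batch is created
            return current >= 0 ? current : nextPartition(topic, cluster);
        }

        @Override
        public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
            // a batch for prevPartition was just sealed; switch to a new partition
            sticky.set(nextPartition(topic, cluster));
        }

        private int nextPartition(String topic, Cluster cluster) {
            int numPartitions = cluster.partitionCountForTopic(topic); // assumes a known topic
            int p = ThreadLocalRandom.current().nextInt(numPartitions);
            sticky.set(p);
            return p;
        }

        @Override public void close() {}
        @Override public void configure(Map<String, ?> configs) {}
    }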

Next, let's step into the RecordAccumulator class and see how the encapsulation actually works.

The packing happens in org.apache.kafka.clients.producer.internals.RecordAccumulator#append, which shows the overall flow:

    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock,
                                     boolean abortOnNewBatch,
                                     long nowMs) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            // check if we have an in-progress batch
            // TODO based on the partitioning result, get the deque this record belongs to
            //      (one Deque<ProducerBatch> per partition)
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                // TODO try to append to the last batch in the deque; if no memory has been
                //      allocated yet (no batch exists), appendResult == null
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                if (appendResult != null)
                    return appendResult;
            }

            // we don't have an in-progress record batch try to allocate a new batch
            if (abortOnNewBatch) {
                // Return a result that will cause another call to append.
                return new RecordAppendResult(null, false, false, true);
            }

            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            /**
             * TODO compute the buffer size: the max of the configured batch.size and the
             *      actual record size.
             * Note: watch record sizes in production. If records are so large that a batch
             * can only hold one record, every record is effectively sent on its own, which
             * defeats the producer's batching design and causes excessive network I/O.
             */
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
            // TODO allocate memory from the BufferPool based on the computed size
            buffer = free.allocate(size, maxTimeToBlock);

            // Update the current time in case the buffer allocation blocked above.
            nowMs = time.milliseconds();
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                // TODO try to append again; another thread may have created a batch while we
                //      were allocating; if no batch exists yet, appendResult == null
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }
                // TODO wrap the allocated buffer into a new batch
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
                // TODO append the record to the new batch
                FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                        callback, nowMs));
                // TODO add the batch to the tail of the deque
                dq.addLast(batch);
                incomplete.add(batch);
                // Don't deallocate this buffer in the finally block as it's being used in the record batch
                // TODO the batch owns the buffer now; null it out so the finally block does not free it
                buffer = null;
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
            }
        } finally {
            // if buffer != null here, no new batch was created, so the memory allocated above can be released
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }

Next, let's break down the key methods in this flow. The warning in the comment above is worth a concrete number: with the default batch.size of 16 KB, a record that serializes to 1 MB forces a 1 MB buffer holding just that single record, so every send effectively becomes its own network request and batching is lost.

2.2.2 getOrCreateDeque: fetching the partition's queue

org.apache.kafka.clients.producer.internals.RecordAccumulator#getOrCreateDeque

    private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
        // TODO batches is a ConcurrentMap<TopicPartition, Deque<ProducerBatch>>: one deque
        //      per partition, and the map is thread-safe
        // TODO note: this read is a high-frequency operation!
        Deque<ProducerBatch> d = this.batches.get(tp);
        if (d != null)
            return d;
        // TODO no deque for this partition yet, so create a new ArrayDeque
        d = new ArrayDeque<>();
        // TODO publish the new deque into batches; putIfAbsent guarantees that if two
        //      threads race here, both end up using the same deque
        // TODO note: this write is a low-frequency operation!
        Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
        if (previous == null)
            return d;
        else
            return previous;
    }

Here, batches is a Map implementing the ConcurrentMap interface:

    // TODO thread-safe map: partition -> deque
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

batches is initialized in the constructor as a CopyOnWriteMap:

    /**
     * TODO thread-safe map
     * TODO write: take a synchronized lock, copy the map into a new object, apply the
     *      write to the copy, then assign the copy back to the (volatile) field
     * TODO read: no lock, read straight from the current map
     * TODO well suited to read-heavy, write-light workloads
     */
    this.batches = new CopyOnWriteMap<>();

CopyOnWriteMap is a data structure defined by Kafka itself:

    public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
        private volatile Map<K, V> map;
        //...
    }
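The elided body follows the classic copy-on-write pattern. A minimal self-contained sketch of the interesting methods, based on the semantics described above rather than Kafka's verbatim code:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    // minimal sketch of the copy-on-write idea (not Kafka's verbatim class)
    public class MiniCopyOnWriteMap<K, V> {
        private volatile Map<K, V> map = Collections.emptyMap();

        // read path: lock-free, a volatile read of the current snapshot
        public V get(K k) {
            return map.get(k);
        }

        // write path: lock, copy, mutate the copy, publish it via the volatile field
        public synchronized V put(K k, V v) {
            Map<K, V> copy = new HashMap<>(this.map);
            V prev = copy.put(k, v);
            this.map = Collections.unmodifiableMap(copy);
            return prev;
        }

        // only pays the copy cost when the key is actually missing, which is why
        // getOrCreateDeque's putIfAbsent is cheap in the common (already-present) case
        public synchronized V putIfAbsent(K k, V v) {
            Map<K, V> current = this.map;
            return current.containsKey(k) ? current.get(k) : put(k, v);
        }
    }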

2.2.3 tryAppend: appending the record

org.apache.kafka.clients.producer.internals.RecordAccumulator#tryAppend

    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque, long nowMs) {
        // TODO peek at the ProducerBatch at the tail of the deque
        ProducerBatch last = deque.peekLast();
        // TODO if there is one, try to append the record to it
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
            if (future == null)
                // the batch has no room left; seal it against further appends
                last.closeForRecordAppends();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
        }
        return null;
    }
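The deque-plus-batch pattern is easier to see in isolation. A toy single-threaded sketch (hypothetical types, fixed-capacity batches) of the same "try the tail batch, otherwise create and append a new one" logic:

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    public class MiniAccumulator {
        static final int BATCH_CAPACITY = 3; // toy stand-in for batch.size

        static class Batch {
            final List<String> records = new ArrayList<>();
            boolean tryAppend(String r) {
                if (records.size() >= BATCH_CAPACITY) return false; // no room: caller starts a new batch
                records.add(r);
                return true;
            }
        }

        private final Deque<Batch> deque = new ArrayDeque<>();

        void append(String record) {
            Batch last = deque.peekLast();
            // mirror RecordAccumulator.tryAppend: only the tail batch ever accepts new records
            if (last == null || !last.tryAppend(record)) {
                Batch batch = new Batch();
                batch.tryAppend(record);
                deque.addLast(batch); // mirror dq.addLast(batch)
            }
        }

        public static void main(String[] args) {
            MiniAccumulator acc = new MiniAccumulator();
            for (int i = 0; i < 7; i++) acc.append("r" + i);
            // 7 records with capacity 3 -> 3 batches: [r0 r1 r2] [r3 r4 r5] [r6]
            System.out.println(acc.deque.size() + " batches");
        }
    }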

2.2.4 allocate: acquiring memory

org.apache.kafka.clients.producer.internals.BufferPool#allocate

    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        // TODO reject requests larger than the total memory (32 MB by default)
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");

        ByteBuffer buffer = null;
        this.lock.lock();

        if (this.closed) {
            this.lock.unlock();
            throw new KafkaException("Producer closed while allocating memory");
        }

        try {
            // check if we have a free buffer of the right size pooled
            // TODO if the requested size equals poolableSize (the batch size) and the
            //      Deque<ByteBuffer> free is non-empty, return a ByteBuffer from its head
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();

            // now check if the request is immediately satisfiable with the
            // memory on hand or if we need to block
            // TODO total pooled memory = number of buffers in free * poolableSize (16 KB by default)
            int freeListSize = freeSize() * this.poolableSize;
            // TODO enough memory on hand? total available = nonPooledAvailableMemory + freeListSize
            if (this.nonPooledAvailableMemory + freeListSize >= size) {
                // we have enough unallocated or pooled memory to immediately
                // satisfy the request, but need to allocate the buffer
                freeUp(size);
                // TODO deduct the memory
                this.nonPooledAvailableMemory -= size;
            } else {
                // TODO not enough memory available in total
                // we are out of memory and will have to block
                // memory accumulated for this request so far
                int accumulated = 0;
                Condition moreMemory = this.lock.newCondition();
                try {
                    long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                    // TODO join the queue of waiters
                    this.waiters.addLast(moreMemory);
                    // loop over and over until we have a buffer or have reserved
                    // enough memory to allocate one
                    // TODO loop, waiting for other threads to release memory
                    while (accumulated < size) {
                        long startWaitNs = time.nanoseconds();
                        long timeNs;
                        boolean waitingTimeElapsed;
                        try {
                            // TODO block here until signalled; a deallocate() call wakes us up
                            waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                        } finally {
                            long endWaitNs = time.nanoseconds();
                            timeNs = Math.max(0L, endWaitNs - startWaitNs);
                            recordWaitTime(timeNs);
                        }

                        if (this.closed)
                            throw new KafkaException("Producer closed while allocating memory");

                        if (waitingTimeElapsed) {
                            this.metrics.sensor("buffer-exhausted-records").record();
                            throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                        }

                        remainingTimeToBlockNs -= timeNs;

                        // check if we can satisfy this request from the free list,
                        // otherwise allocate memory
                        // TODO if the request is exactly poolableSize and free is non-empty,
                        //      grab a ByteBuffer from the head of the free deque
                        if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                            // just grab a buffer from the free list
                            buffer = this.free.pollFirst();
                            accumulated = size;
                        } else {
                            // TODO still not enough
                            // we'll need to allocate memory, but we may only get
                            // part of what we need on this iteration
                            freeUp(size - accumulated);
                            // TODO compute how much memory we got this round
                            int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                            // TODO deduct it, i.e. reserve that memory
                            this.nonPooledAvailableMemory -= got;
                            // TODO accumulate what we got and loop again if still short
                            accumulated += got;
                        }
                    }
                    // Don't reclaim memory on throwable since nothing was thrown
                    accumulated = 0;
                } finally {
                    // When this loop was not able to successfully terminate don't lose available memory
                    this.nonPooledAvailableMemory += accumulated;
                    // TODO leave the waiters queue
                    this.waiters.remove(moreMemory);
                }
            }
        } finally {
            // signal any additional waiters if there is more memory left
            // over for them
            try {
                if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            } finally {
                // Another finally... otherwise find bugs complains
                lock.unlock();
            }
        }

        if (buffer == null)
            // TODO no pooled buffer was used, so allocate a fresh one outside the lock
            return safeAllocateByteBuffer(size);
        else
            return buffer;
    }
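allocate relies on freeUp, which this excerpt omits. Judging from its call sites above, it drains pooled buffers out of the free deque and back into the nonPooledAvailableMemory counter until the request can be covered; a sketch consistent with that usage:

    // fold pooled buffers back into nonPooledAvailableMemory until size can be satisfied
    private void freeUp(int size) {
        while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
            this.nonPooledAvailableMemory += this.free.pollLast().capacity();
    }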

2.2.5 deallocate: releasing memory

org.apache.kafka.clients.producer.internals.BufferPool#deallocate(java.nio.ByteBuffer)

    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            // TODO if the returned size == poolableSize (16 KB) and matches the buffer's capacity...
            if (size == this.poolableSize && size == buffer.capacity()) {
                // TODO ...clear the buffer...
                buffer.clear();
                // TODO ...and put it back on the free deque for reuse
                this.free.add(buffer);
            } else {
                // TODO odd-sized buffers are not pooled; just add their size back to
                //      nonPooledAvailableMemory and let GC reclaim the buffer itself
                this.nonPooledAvailableMemory += size;
            }
            // TODO wake up the first waiter in the queue, if any
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                moreMem.signal();
        } finally {
            lock.unlock();
        }
    }

    public void deallocate(ByteBuffer buffer) {
        deallocate(buffer, buffer.capacity());
    }
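To see the pooling behavior end to end, here is a small demo (it assumes the BufferPool constructor signature of the Kafka 2.x clients jar; the metric group name string is arbitrary). A buffer of exactly poolableSize is recycled through the free deque, while an odd-sized buffer only returns its byte count:

    import java.nio.ByteBuffer;
    import org.apache.kafka.clients.producer.internals.BufferPool;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.utils.Time;

    public class BufferPoolDemo {
        public static void main(String[] args) throws InterruptedException {
            // 32 MB total, 16 KB poolable size: the producer's defaults
            BufferPool pool = new BufferPool(32 * 1024 * 1024, 16 * 1024,
                    new Metrics(), Time.SYSTEM, "producer-metrics");

            ByteBuffer pooled = pool.allocate(16 * 1024, 1000);   // == poolableSize
            ByteBuffer oversize = pool.allocate(64 * 1024, 1000); // != poolableSize

            pool.deallocate(pooled);   // goes onto the free deque, reusable as-is
            pool.deallocate(oversize); // only its size is added back to nonPooledAvailableMemory

            // this allocation is satisfied straight from the free deque
            ByteBuffer reused = pool.allocate(16 * 1024, 1000);
            System.out.println(reused == pooled); // true: the very same buffer was recycled
        }
    }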

To sum up: after the steps above, the record has been appended to a ProducerBatch inside the RecordAccumulator, and append returns a RecordAccumulator.RecordAppendResult that drives the subsequent send phase.

 
