  17. package kafka.examples;
  18. import org.apache.kafka.clients.producer.Callback;
  19. import org.apache.kafka.clients.producer.KafkaProducer;
  20. import org.apache.kafka.clients.producer.ProducerRecord;
  21. import org.apache.kafka.clients.producer.RecordMetadata;
  22. import org.apache.kafka.clients.producer.ProducerConfig;
  23. import org.apache.kafka.common.serialization.IntegerSerializer;
  24. import org.apache.kafka.common.serialization.StringSerializer;
  25. import java.util.Properties;
  26. import java.util.concurrent.CountDownLatch;
  27. import java.util.concurrent.ExecutionException;
  28. public class Producer extends Thread {
  29. private final KafkaProducer<Integer, String> producer;
  30. private final String topic;
  31. private final Boolean isAsync;
  32. private int numRecords;
  33. private final CountDownLatch latch;
  34. public Producer(final String topic,
  35. final Boolean isAsync,
  36. final String transactionalId,
  37. final boolean enableIdempotency,
  38. final int numRecords,
  39. final int transactionTimeoutMs,
  40. final CountDownLatch latch) {
  41. Properties props = new Properties();
  42. //指定Kafka集群节点列表(全部 or 部分均可),用于KafkaProducer初始获取Server端元数据(如完整节点列表、Partition分布等等)
  43. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
  44. props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
  45. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
  46. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  47. if (transactionTimeoutMs > 0) {
  48. props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
  49. }
  50. if (transactionalId != null) {
  51. props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
  52. }
  53. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
  54. //指定服务端有多少个副本完成同步,才算该Producer发出的消息写入成功
  55. props.put(ProducerConfig.ACKS_CONFIG, "-1");
  56. //失败重试次数;
  57. props.put(ProducerConfig.RETRIES_CONFIG, "3");
  58. producer = new KafkaProducer<>(props);
  59. this.topic = topic;
  60. this.isAsync = isAsync;
  61. this.numRecords = numRecords;
  62. this.latch = latch;
  63. }
  64. KafkaProducer<Integer, String> get() {
  65. return producer;
  66. }
  67. @Override
  68. public void run() {
  69. // key用来决定目标Partition
  70. int messageKey = 0;
  71. int recordsSent = 0;
  72. while (recordsSent < numRecords) {
  73. //传递业务数据
  74. String messageStr = "Message_" + messageKey;
  75. long startTime = System.currentTimeMillis();
  76. if (isAsync) { // Send asynchronously
  77. producer.send(new ProducerRecord<>(topic,
  78. messageKey,
  79. messageStr), new DemoCallBack(startTime, messageKey, messageStr));
  80. } else { // Send synchronously
  81. try {
  82. // KafkaProducer中各类send方法均返回Future,并不会直接返回发送结果,其原因便是线程模型设计。
  83. producer.send(new ProducerRecord<>(topic,
  84. messageKey,
  85. messageStr)).get();
  86. System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")");
  87. } catch (InterruptedException | ExecutionException e) {
  88. e.printStackTrace();
  89. }
  90. }
  91. messageKey += 2;
  92. recordsSent += 1;
  93. }
  94. System.out.println("Producer sent " + numRecords + " records successfully");
  95. latch.countDown();
  96. }
  97. }
  98. class DemoCallBack implements Callback {
  99. private final long startTime;
  100. private final int key;
  101. private final String message;
  102. public DemoCallBack(long startTime, int key, String message) {
  103. this.startTime = startTime;
  104. this.key = key;
  105. this.message = message;
  106. }
  107. /**
  108. * A callback method the user can implement to provide asynchronous handling of request completion. This method will
  109. * be called when the record sent to the server has been acknowledged. When exception is not null in the callback,
  110. * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
  111. *
  112. * @param metadata The metadata for the record that was sent (i.e. the partition and offset). An empty metadata
  113. * with -1 value for all fields except for topicPartition will be returned if an error occurred.
  114. * @param exception The exception thrown during processing of this record. Null if no error occurred.
  115. */
  116. public void onCompletion(RecordMetadata metadata, Exception exception) {
  117. long elapsedTime = System.currentTimeMillis() - startTime;
  118. if (metadata != null) {
  119. System.out.println(
  120. "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
  121. "), " +
  122. "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
  123. } else {
  124. exception.printStackTrace();
  125. }
  126. }
1)baseOffset:当前RecordBatch的起始位移,Record中的offset delta与该baseOffset相加才能得到真正的offset值。当RecordBatch还在producer端的时候,offset是producer分配的一个值(不是partition中真实的offset);

2)batchLength:RecordBatch的总长度,从`partition leader epoch`到末尾的长度;

3)partitionLeaderEpoch:用于标记目标 partition中leader replica的纪元信息,可以看作是分区leader的版本号或者更新次数;


5)crc校验码(V2格式使用CRC-32C算法):参与校验的部分是从attributes到RecordBatch末尾的全部数据;partitionLeaderEpoch 不在 CRC 里面是因为每次 broker 收到 RecordBatch 的时候,都会赋值 partitionLeaderEpoch,如果包含在 CRC 里面会导致需要重新计算CRC;

6)attributes:从 V1 版本中的 8 位扩展到 16 位,第 0~2 位表示压缩类型,第 3 位表示时间戳类型,第 4 位表示是否是事务型记录。所谓“事务”是Kafka的新功能,开启事务之后,只有在事务提交之后,事务型 consumer(isolation.level=read_committed)才可以看到记录。第 5 位表示是否是 Control Record,这类记录总是单条出现,被包含在一个 control record batch 里面,它可以用于标记“事务是否已经提交”、“事务是否已经中止” 等。Control Record 由 broker 端写入,consumer 客户端拉取到之后会在内部处理并将其过滤掉,不会返回给用户应用程序,即对用户是透明的;



9)maxTimestamp:RecordBatch中最大的时间戳,一般是最后一条消息的时间戳,用于broker确认RecordBatch中 Records 的组装正确性。




13)recordCount:Record 的数量;

















在RecordAccumulator内部,维护了一个Map集合batches,用于缓存待发送到Kafka服务端的批次消息。因为需要保证线程安全,所以其类型是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,并且初始化时使用的实现是Kafka自定义的CopyOnWriteMap。Deque的实例是ArrayDeque,它不是线程安全的,所以在操作Deque的时候需要加锁(即append方法中的synchronized (dq))。








  • 首先是进行一些消息格式的验证:
  • Record是否为Control Record必须与当前batch是否为ControlBatch一致(控制记录只能追加到ControlBatch,普通记录只能追加到普通batch),否则抛出IllegalArgumentException;
  • 要追加的Record的offset必须严格大于最近一次追加的lastOffset(即offset单调递增);
  • 消息Record的timestamp必须是非负数,或者等于RecordBatch.NO_TIMESTAMP(-1);另外V2之前的消息格式(magic < 2)不支持Record headers;
  • 然后调用数据写入方法写入数据流DataOutputStream,更新当前RecordBatch的相关元数据;






  1. public void close() {
  2. if (aborted)
  3. throw new IllegalStateException("Cannot close MemoryRecordsBuilder as it has already been aborted");
  4. if (builtRecords != null)
  5. return;
  6. // 简单的参数校验
  7. validateProducerState();
  8. // 流资源的释放和关闭
  9. closeForRecordAppends();
  10. // 初始化records位置
  11. if (numRecords == 0L) {
  12. buffer().position(initialPosition);
  13. builtRecords = MemoryRecords.EMPTY;
  14. } else {
  15. if (magic > RecordBatch.MAGIC_VALUE_V1)
  16. this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.uncompressedRecordsSizeInBytes;
  17. else if (compressionType != CompressionType.NONE)
  18. this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.uncompressedRecordsSizeInBytes;
  19. //复制一份ByteBuffer出来,然后切换到读模式(flip),通过slice()方法得到新一个独立的ByteBuffer,设置给builtRecords,builtRecords对象里面持有对象ByteBuffer
  20. ByteBuffer buffer = buffer().duplicate();
  21. buffer.flip();
  22. buffer.position(initialPosition);
  23. builtRecords = MemoryRecords.readableRecords(buffer.slice());
  24. }
  25. }







  /**
   * Splits this batch into several new batches. Each record is moved into a new batch together with its
   * original Thunk (callback + user future), so user futures are chained to futures in the new batches.
   * Per the note inside the loop, each new batch is sized by the larger of the record's size and
   * splitBatchSize. The original batch's future is then completed with RecordBatchTooLargeException.
   *
   * @param splitBatchSize target size for each new batch
   * @return the new batches, in record order
   * @throws IllegalStateException if the batch contains no record batch
   * @throws IllegalArgumentException if the batch is v0/v1 without compression, or holds more than one record batch
   */
  1. public Deque<ProducerBatch> split(int splitBatchSize) {
  2. // Resulting batches after the split.
  3. Deque<ProducerBatch> batches = new ArrayDeque<>();
  4. // Build this batch's MemoryRecords, i.e. the records currently stored in this batch's ByteBuffer.
  5. MemoryRecords memoryRecords = recordsBuilder.build();
  6. Iterator<MutableRecordBatch> recordBatchIter = memoryRecords.batches().iterator();
  7. if (!recordBatchIter.hasNext())
  8. throw new IllegalStateException("Cannot split an empty producer batch.");
  9. RecordBatch recordBatch = recordBatchIter.next();
  10. if (recordBatch.magic() < MAGIC_VALUE_V2 && !recordBatch.isCompressed())
  11. throw new IllegalArgumentException("Batch splitting cannot be used with non-compressed messages " +
  12. "with version v0 and v1");
  13. if (recordBatchIter.hasNext())
  14. throw new IllegalArgumentException("A producer batch should only have one record batch.");
  15. Iterator<Thunk> thunkIter = thunks.iterator();
  16. // We always allocate batch size because we are already splitting a big batch.
  17. // And we also Retain the create time of the original batch.
  18. ProducerBatch batch = null;
  19. // Walk the records of the batch; each record is paired with the next Thunk (assumes thunks were added in record order, as tryAppend does).
  20. for (Record record : recordBatch) {
  21. assert thunkIter.hasNext();
  22. Thunk thunk = thunkIter.next();
  23. if (batch == null)
  24. // First iteration: createBatchOffAccumulatorForRecord() allocates a ByteBuffer sized by the larger of the record size and splitBatchSize (batch.size), then creates the MemoryRecordsBuilder and ProducerBatch as usual; unlike a normal batch, a single record here may exceed batch.size.
  25. batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
  26. // Append the record with tryAppendForSplit, reusing the existing Thunk so the original user future is chained to the new record's future. If the current batch has no room, close it, add it to the result and start a new batch for this record; repeat until every record has been placed.
  27. // A newly created batch can always host the first message.
  28. if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
  29. batches.add(batch);
  30. batch.closeForRecordAppends();
  31. batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
  32. batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
  33. }
  34. }
  35. // The split batches are collected in `batches`. The original batch's produceFuture is then completed; calling ProduceRequestResult.done() wakes the threads awaiting that result.
  36. // Close the last batch and add it to the batch list after split.
  37. if (batch != null) {
  38. batches.add(batch);
  39. batch.closeForRecordAppends();
  40. }
  // Fail the original batch's result; every record index maps to a RecordBatchTooLargeException.
  41. produceFuture.set(ProduceResponse.INVALID_OFFSET, NO_TIMESTAMP, index -> new RecordBatchTooLargeException());
  42. // Wakes the threads awaiting this ProduceRequestResult.
  43. produceFuture.done();
  44. // If the batch carries a base sequence, give each new batch the same producer id/epoch and consecutive sequence numbers starting at the original base sequence (needed for idempotence/transactions and duplicate detection).
  45. if (hasSequence()) {
  46. int sequence = baseSequence();
  47. ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId(), producerEpoch());
  48. for (ProducerBatch newBatch : batches) {
  49. newBatch.setProducerState(producerIdAndEpoch, sequence, isTransactional());
  50. sequence += newBatch.recordCount;
  51. }
  52. }
  53. return batches;
  54. }
  55. /**
  56. * This method is only used by {@link #split(int)} when splitting a large batch to smaller ones.
  57. * @return true if the record has been successfully appended, false otherwise.
  58. */
  59. private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Thunk thunk) {
  60. if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
  61. return false;
  62. } else {
  63. // No need to get the CRC.
  64. this.recordsBuilder.append(timestamp, key, value, headers);
  65. this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
  66. recordsBuilder.compressionType(), key, value, headers));
  67. FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
  68. timestamp,
  69. key == null ? -1 : key.remaining(),
  70. value == null ? -1 : value.remaining(),
  71. Time.SYSTEM);
  72. // Chain the future to the original thunk.
  73. thunk.future.chain(future);
  74. this.thunks.add(thunk);
  75. this.recordCount++;
  76. return true;
  77. }
  78. }

3.done() 方法


1)设置发送结果状态ProducerBatch.FinalState,一共有3种结果:ABORTED, FAILED, SUCCEEDED,如果服务端返回了异常,那么设置为FAILED,否则就是SUCCEEDED;






KafkaProducer().send(ProducerRecord<K, V> record, Callback callback)

  1. @Override
  2. public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
  3. // intercept the record, which can be potentially modified; this method does not throw exceptions
  4. ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
  5. return doSend(interceptedRecord, callback);
  6. }

KafkaProducer().send包含:KafkaProducer().doSend(ProducerRecord<K, V> record, Callback callback)

  1. /**
  2. * Asynchronously sends a record to a topic. It waits for topic metadata, serializes key and value,
     * picks a partition, checks the estimated record size and appends the record to the RecordAccumulator.
     * The Sender thread is woken when a batch is full or a new batch was created.
     * ApiExceptions are reported through the callback and a failed future. Other exceptions are rethrown
     * (InterruptedException wrapped in InterruptException).
  3. */
  4. private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
  5. TopicPartition tp = null;
  6. try {
  7. throwIfProducerClosed();
  8. // first make sure the metadata for the topic is available
  9. long nowMs = time.milliseconds();
  10. ClusterAndWaitTime clusterAndWaitTime;
  11. try {
  12. clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
  13. } catch (KafkaException e) {
  14. if (metadata.isClosed())
  15. throw new KafkaException("Producer closed while send in progress", e);
  16. throw e;
  17. }
  18. nowMs += clusterAndWaitTime.waitedOnMetadataMs;
  // Whatever part of maxBlockTimeMs was not spent waiting for metadata is the budget passed to append().
  19. long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
  20. Cluster cluster = clusterAndWaitTime.cluster;
  // Serialize key and value; a ClassCastException means the configured serializer does not match the record's type.
  21. byte[] serializedKey;
  22. try {
  23. serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
  24. } catch (ClassCastException cce) {
  25. throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
  26. " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
  27. " specified in key.serializer", cce);
  28. }
  29. byte[] serializedValue;
  30. try {
  31. serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
  32. } catch (ClassCastException cce) {
  33. throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
  34. " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
  35. " specified in value.serializer", cce);
  36. }
  // Pick the target partition via partition(...) (implementation not shown here).
  37. int partition = partition(record, serializedKey, serializedValue, cluster);
  38. tp = new TopicPartition(record.topic(), partition);
  39. setReadOnly(record.headers());
  40. Header[] headers = record.headers().toArray();
  // Upper-bound size estimate of the serialized record, checked by ensureValidRecordSize before appending.
  41. int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
  42. compressionType, serializedKey, serializedValue, headers);
  43. ensureValidRecordSize(serializedSize);
  // Use the record's own timestamp when set, otherwise the current time.
  44. long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
  45. if (log.isTraceEnabled()) {
  46. log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
  47. }
  48. // producer callback will make sure to call both 'callback' and interceptor callback
  49. Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
  50. if (transactionManager != null && transactionManager.isTransactional()) {
  51. transactionManager.failIfNotReadyForSend();
  52. }
  // First attempt with abortOnNewBatch=true: if appending would require a new batch, the accumulator returns instead of creating one.
  53. RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
  54. serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
  55. if (result.abortForNewBatch) {
  // A new batch is needed: notify the partitioner, re-pick the partition, then append again allowing a new batch (abortOnNewBatch=false).
  56. int prevPartition = partition;
  57. partitioner.onNewBatch(record.topic(), cluster, prevPartition);
  58. partition = partition(record, serializedKey, serializedValue, cluster);
  59. tp = new TopicPartition(record.topic(), partition);
  60. if (log.isTraceEnabled()) {
  61. log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
  62. }
  63. // producer callback will make sure to call both 'callback' and interceptor callback
  64. interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
  65. result = accumulator.append(tp, timestamp, serializedKey,
  66. serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
  67. }
  // For transactional producers, let the TransactionManager add this partition to the transaction if needed.
  68. if (transactionManager != null && transactionManager.isTransactional())
  69. transactionManager.maybeAddPartitionToTransaction(tp);
  // Wake the Sender thread once a batch is full or a new batch was created.
  70. if (result.batchIsFull || result.newBatchCreated) {
  71. log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
  72. this.sender.wakeup();
  73. }
  74. return result.future;
  75. // handling exceptions and record the errors;
  76. // for API exceptions return them in the future,
  77. // for other exceptions throw directly
  78. } catch (ApiException e) {
  79. log.debug("Exception occurred during message send:", e);
  80. if (callback != null)
  81. callback.onCompletion(null, e);
  82. this.errors.record();
  83. this.interceptors.onSendError(record, tp, e);
  84. return new FutureFailure(e);
  85. } catch (InterruptedException e) {
  86. this.errors.record();
  87. this.interceptors.onSendError(record, tp, e);
  88. throw new InterruptException(e);
  89. } catch (KafkaException e) {
  90. this.errors.record();
  91. this.interceptors.onSendError(record, tp, e);
  92. throw e;
  93. } catch (Exception e) {
  94. // we notify interceptor about all exceptions, since onSend is called before anything else in this method
  95. this.interceptors.onSendError(record, tp, e);
  96. throw e;
  97. }
  98. }

KafkaProducer().doSend包含:RecordAccumulator().append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock, boolean abortOnNewBatch, long nowMs)


  1. /**
  2. * Add a record to the accumulator, return the append result
  3. * <p>
  4. * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
  5. * <p>
  6. *
  7. * @param tp The topic/partition to which this record is being sent
  8. * @param timestamp The timestamp of the record
  9. * @param key The key for the record
  10. * @param value The value for the record
  11. * @param headers the Headers for the record
  12. * @param callback The user-supplied callback to execute when the request is complete
  13. * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
  14. * @param abortOnNewBatch A boolean that indicates returning before a new batch is created and
  15. * running the partitioner's onNewBatch method before trying to append again
  16. * @param nowMs The current time, in milliseconds
  17. */
  18. public RecordAppendResult append(TopicPartition tp,
  19. long timestamp,
  20. byte[] key,
  21. byte[] value,
  22. Header[] headers,
  23. Callback callback,
  24. long maxTimeToBlock,
  25. boolean abortOnNewBatch,
  26. long nowMs) throws InterruptedException {
  27. // We keep track of the number of appending thread to make sure we do not miss batches in
  28. // abortIncompleteBatches().
  29. // 统计当前Accumulator中正在追加消息的执行次数,在方法的执行最后的finally块再进行减1操作;该值越大,说明并发越高、或者block在缓冲区的线程越多,比如可能由于内存没有空间分配,缓冲区的消息没能及时发送到kafka等多种原因;
  30. appendsInProgress.incrementAndGet();
  31. ByteBuffer buffer = null;
  32. if (headers == null) headers = Record.EMPTY_HEADERS;
  33. try {
  34. // check if we have an in-progress batch
  35. // 调用方法getOrCreateDeque获取或者创建topicPartition对应的ProducerBatch队列;
  36. Deque<ProducerBatch> dq = getOrCreateDeque(tp);
  37. // 锁住dq,保证只有一个线程执行tryAppend方法来追加当前消息到数据缓冲区,如果tryAppend成功返回,就直接返回append的结果的封装对象RecordAppendResult;
  38. synchronized (dq) {
  39. if (closed)
  40. throw new KafkaException("Producer closed while send in progress");
  41. // 如果tryAppend失败,比如在当前ProducerBatch空间不足或者队列中还没有可用的ProducerBatch时,如果标示abortOnNewBatch=true,标示放弃创建新的ProducerBatch,直接返回一个“空”的RecordAppendResult;
  42. RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
  43. if (appendResult != null)
  44. return appendResult;
  45. }
  46. // we don't have an in-progress record batch try to allocate a new batch
  47. if (abortOnNewBatch) {
  48. // Return a result that will cause another call to append.
  49. return new RecordAppendResult(null, false, false, true);
  50. }
  51. byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
  52. // 调用estimateSizeInBytesUpperBound方法是对追加的Record大小进行一个预估,根据上面Record格式的不同,采取的预估值不同,最终取的还是batch.size和预估值的最大值进行内存分配;
  53. int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
  54. log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
  55. buffer = free.allocate(size, maxTimeToBlock);
  56. // Update the current time in case the buffer allocation blocked above.
  57. nowMs = time.milliseconds();
  58. synchronized (dq) {
  59. // Need to check if producer is closed again after grabbing the dequeue lock.
  60. if (closed)
  61. throw new KafkaException("Producer closed while send in progress");
  62. RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
  63. if (appendResult != null) {
  64. // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
  65. return appendResult;
  66. }
  67. // 如果tryAppend成功返回,就直接返回append的结果的封装对象RecordAppendResult,如果tryAppend失败,那么就新创建ProducerBatch,这里就用到了上面的MemoryRecordsBuilder机制来实现RecordBatch缓冲区数据的追加及消息相关元数据的管理;
  68. 将新的ProducerBatch添加的batches队列,这样后续的消息就可以使用该ProducerBatch来追加消息了;
  69. MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
  70. ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
  71. FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
  72. callback, nowMs));
  73. dq.addLast(batch);
  74. incomplete.add(batch);
  75. // Don't deallocate this buffer in the finally block as it's being used in the record batch
  76. // 没在finally执行了buffer = null,是因为在finally块如果在buffer不为空的时候会进行释放ByteBuffer内存,正常情况下因为消息成功的加入了缓冲区,不能进行释放,但是如果在执行的过程中发生了异常,没能成功加入缓冲区的情况下,要进行已分配ByteBuffer内存的释放;
  77. buffer = null;
  78. return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
  79. }
  80. } finally {
  81. if (buffer != null)
  82. free.deallocate(buffer);
  83. appendsInProgress.decrementAndGet();
  84. }
  85. }

RecordAccumulator().append包含:RecordAccumulator().tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque, long nowMs)



  1. /**
  2. * Try to append to a ProducerBatch.
  3. *
  4. * If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
  5. * resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written
  6. * and memory records built) in one of the following cases (whichever comes first): right before send,
  7. * if it is expired, or when the producer is closed.
  8. */
  9. private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
  10. Callback callback, Deque<ProducerBatch> deque, long nowMs) {
  11. ProducerBatch last = deque.peekLast();
  12. if (last != null) {
  13. FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
  14. if (future == null)
  15. last.closeForRecordAppends();
  16. else
  17. return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
  18. }
  19. return null;
  20. }

RecordAccumulator().tryAppend包含:ProducerBatch().tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now)





  1. /**
  2. * Append the record to the current record set and return the relative offset within that record set
  3. *
  4. * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
  5. */
  6. public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
  7. if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
  8. return null;
  9. } else {
  10. this.recordsBuilder.append(timestamp, key, value, headers);
  11. this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
  12. recordsBuilder.compressionType(), key, value, headers));
  13. this.lastAppendTime = now;
  14. FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
  15. timestamp,
  16. key == null ? -1 : key.length,
  17. value == null ? -1 : value.length,
  18. Time.SYSTEM);
  19. // we have to keep every future returned to the users in case the batch needs to be
  20. // split to several new batches and resent.
  21. thunks.add(new Thunk(callback, future));
  22. this.recordCount++;
  23. return future;
  24. }

ProducerBatch().tryAppend调用:MemoryRecordsBuilder().hasRoomFor(long timestamp, byte[] key, byte[] value, Header[] headers)(该重载会先把byte[]包装成ByteBuffer,再调用下面贴出的ByteBuffer参数版本)





  1. /**
  2. * Check if we have room for a new record containing the given key/value pair. If no records have been
  3. * appended, then this returns true.
  4. *
  5. * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
  6. * accurate if compression is used. When this happens, the following append may cause dynamic buffer
  7. * re-allocation in the underlying byte buffer stream.
  8. */
  9. public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
  10. if (isFull())
  11. return false;
  12. // We always allow at least one record to be appended (the ByteBufferOutputStream will grow as needed)
  13. if (numRecords == 0)
  14. return true;
  15. final int recordSize;
  16. if (magic < RecordBatch.MAGIC_VALUE_V2) {
  17. recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
  18. } else {
  19. int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
  20. long timestampDelta = baseTimestamp == null ? 0 : timestamp - baseTimestamp;
  21. recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers);
  22. }
  23. // Be conservative and not take compression of the new record into consideration.
  24. return this.writeLimit >= estimatedBytesWritten() + recordSize;
  25. }






  1. public boolean isFull() {
  2. // note that the write limit is respected only after the first record is added which ensures we can always
  3. // create non-empty batches (this is used to disable batching when the producer's batch size is set to 0).
  4. return appendStream == CLOSED_STREAM || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
  5. }






  1. /**
  2. * Get an estimate of the number of bytes written (based on the estimation factor hard-coded in {@link CompressionType}.
  3. * @return The estimated number of bytes written
  4. */
  5. private int estimatedBytesWritten() {
  6. if (compressionType == CompressionType.NONE) {
  7. return batchHeaderSizeInBytes + uncompressedRecordsSizeInBytes;
  8. } else {
  9. // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
  10. return batchHeaderSizeInBytes + (int) (uncompressedRecordsSizeInBytes * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR);
  11. }
  12. }

ProducerBatch().tryAppend包含:MemoryRecordsBuilder().append(long timestamp, byte[] key, byte[] value, Header[] headers)


MemoryRecordsBuilder().append包含:MemoryRecordsBuilder().appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers)

  1. /**
  2. * Append a new record at the given offset.
  3. */
  4. private void appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
  5. ByteBuffer value, Header[] headers) {
  6. try {
  7. if (isControlRecord != isControlBatch)
  8. throw new IllegalArgumentException("Control records can only be appended to control batches");
  9. if (lastOffset != null && offset <= lastOffset)
  10. throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s " +
  11. "(Offsets must increase monotonically).", offset, lastOffset));
  12. if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
  13. throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);
  14. if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
  15. throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
  16. if (baseTimestamp == null)
  17. baseTimestamp = timestamp;
  18. if (magic > RecordBatch.MAGIC_VALUE_V1) {
  19. appendDefaultRecord(offset, timestamp, key, value, headers);
  20. } else {
  21. appendLegacyRecord(offset, timestamp, key, value, magic);
  22. }
  23. } catch (IOException e) {
  24. throw new KafkaException("I/O exception when writing to the append stream, closing", e);
  25. }
  26. }

MemoryRecordsBuilder().appendWithOffset包含(V2版本):MemoryRecordsBuilder().appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers)

  1. private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
  2. Header[] headers) throws IOException {
  3. ensureOpenForRecordAppend();
  4. int offsetDelta = (int) (offset - baseOffset);
  5. long timestampDelta = timestamp - baseTimestamp;
  6. int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
  7. recordWritten(offset, timestamp, sizeInBytes);
  8. }


1 读写分离的设计CopyOnWriteMap


  • 线程要修改map内容时,先复制一份map,在副本上修改之后,再把新map的引用赋给map字段;且put方法是加了synchronized修饰的,因此同一时间只能有一个线程修改内容。
  • 在修改的时候别的线程依然可以读取老的Map。




2 消息追加方法RecordAccumulator.append的线程安全和高并发效率的保证









  • RecordBatch 满了
  • 消息在RecordBatch中停留的时间超过了linger.ms;
  • 消息缓冲区内存不足存在线程等待分配空间;
  • RecordAccumulator已关闭(即Producer正在关闭);
  • 手动执行了KafkaProducer的flush方法:这会触发所有分区的ready来发送消息;

  • 当满足上面说的5个条件之一时,设置sendable=true
  • 在sendable=true且当前batch不处于重试退避(backoff)期间时,就认为该分区的Leader节点已经准备就绪,将其加入readyNodes集合;
  • 否则就是sendable条件不满足,或者batch仍处于重试退避期间,此时计算下次准备检查的时间nextReadyCheckDelayMs;
  • 封装ready计算结果对象ReadyCheckResult






1 在Sender线程发送ProducerBatch到kafka后,发生了异常,在可以重试的时候,就将ProducerBatch重新加入队列,等待下次重试的时候再从队列drain;


2 Kafka返回“MESSAGE_TOO_LARGE”时,进行batch的拆分,并将拆分后的batch重新放入队列。




