
Kafka Producer Internals: How the Java Producer Sends Messages to the Cluster

Reference video (尚硅谷 Kafka course): 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili

1. Producer initialization: load the default configuration plus any user-supplied overrides, and start the network I/O (Sender) thread.

2. Interceptors process the record.

3. The configured serializers serialize the record's key and value.

4. A partition is chosen for the record.

5. Cluster metadata is fetched from the Kafka brokers.

6. The record is buffered in the RecordAccumulator, appended to the Deque of RecordBatch objects for its target partition.

7. When a batch reaches batch.size, or linger.ms expires, the Sender thread is woken up and a NetworkClient is used: each RecordBatch is converted into a ClientRequest carrying the message payload.

8. A network connection is established to the broker that leads the partition, and the request is sent to it.

A minimal configuration sketch for the parameters named in steps 6-7 follows this list.
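For reference, here is a minimal, hedged configuration sketch touching the knobs named above (batch.size, linger.ms, and the accumulator's buffer). The broker address and class name are placeholders, not from the original article:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerConfigSketch {
        public static KafkaProducer<String, String> newProducer() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);       // batch.size in bytes (default 16 KB)
            props.put(ProducerConfig.LINGER_MS_CONFIG, 5);            // linger.ms: wait up to 5 ms to fill a batch
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // RecordAccumulator pool (default 32 MB)
            return new KafkaProducer<>(props);
        }
    }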

1. The send() method takes a ProducerRecord object.

   Partition selection rules:

   a. If the record specifies a partition, it is sent to that partition.

   b. If no partition is specified and no key is provided, the sticky partitioner is used: the producer picks one partition and keeps writing to it until the current batch fills up or linger.ms expires, then switches to another. (Older client versions instead used round-robin: a random integer is generated on the first call and incremented on each subsequent call, and that value modulo the topic's available partition count gives the partition.)

   c. If no partition is specified but a key is provided, the key is hashed, and the hash is taken modulo the topic's partition count to choose the partition.

   For example, with a topic that currently has 2 partitions:

       hash(k) = 5  →  5 % 2 = 1, so partition 1
       hash(k) = 6  →  6 % 2 = 0, so partition 0

   A small sketch of this calculation appears below.
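To make rule (c) concrete, here is a minimal sketch of the keyed-partition math. The real client hashes the serialized key with murmur2 and masks off the sign bit (Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions); the simplified version below uses String.hashCode() purely to illustrate the modulo step:

    public class KeyedPartitionSketch {
        // Simplified stand-in for the client's murmur2-based hashing:
        // any deterministic non-negative hash illustrates the modulo step.
        static int partitionFor(String key, int numPartitions) {
            int hash = key.hashCode() & 0x7fffffff; // force non-negative, like Utils.toPositive
            return hash % numPartitions;
        }

        public static void main(String[] args) {
            System.out.println(5 % 2); // hash 5, 2 partitions -> partition 1
            System.out.println(6 % 2); // hash 6, 2 partitions -> partition 0
            System.out.println(partitionFor("order-42", 2)); // same key always lands on the same partition
        }
    }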

2. KafkaProducer asynchronous and synchronous send APIs:

   Asynchronous send: producer.send(record) returns a Future immediately.

   Synchronous send: append .get() to the send() call, which blocks until the broker acknowledges. A short example of both styles follows.
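A minimal sketch of both call styles. The topic name is a placeholder, and the producer comes from the hypothetical ProducerConfigSketch above:

    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class SendStylesSketch {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            KafkaProducer<String, String> producer = ProducerConfigSketch.newProducer();
            ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");

            // Asynchronous: returns immediately; the callback fires on the Sender I/O thread
            producer.send(record, (RecordMetadata md, Exception e) -> {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.printf("sent to %s-%d @ offset %d%n", md.topic(), md.partition(), md.offset());
                }
            });

            // Synchronous: block on the returned Future until the broker acknowledges
            RecordMetadata md = producer.send(record).get();
            System.out.println("sync send acked at offset " + md.offset());

            producer.close();
        }
    }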



Core logic of the KafkaProducer send() method:

  public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
      return this.send(record, (Callback)null);
  }

  public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
      // Interceptor chain: each configured interceptor is applied in turn
      ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
      return this.doSend(interceptedRecord, callback);
  }

  private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
      TopicPartition tp = null;
      try {
          this.throwIfProducerClosed();
          // Fetch cluster metadata (blocks up to max.block.ms)
          long nowMs = this.time.milliseconds();
          ClusterAndWaitTime clusterAndWaitTime;
          try {
              clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
          } catch (KafkaException var22) {
              if (this.metadata.isClosed()) {
                  throw new KafkaException("Producer closed while send in progress", var22);
              }
              throw var22;
          }
          nowMs += clusterAndWaitTime.waitedOnMetadataMs;
          long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
          Cluster cluster = clusterAndWaitTime.cluster;
          // Serialize the key
          byte[] serializedKey;
          try {
              serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
          } catch (ClassCastException var21) {
              throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var21);
          }
          // Serialize the value
          byte[] serializedValue;
          try {
              serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
          } catch (ClassCastException var20) {
              throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var20);
          }
          // Choose the partition
          int partition = this.partition(record, serializedKey, serializedValue, cluster);
          tp = new TopicPartition(record.topic(), partition);
          this.setReadOnly(record.headers());
          Header[] headers = record.headers().toArray();
          int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
          this.ensureValidRecordSize(serializedSize);
          long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
          if (this.log.isTraceEnabled()) {
              this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
          }
          Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
          // RecordAccumulator.append(): buffer the record into a ProducerBatch
          RecordAccumulator.RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
          if (result.abortForNewBatch) {
              int prevPartition = partition;
              this.partitioner.onNewBatch(record.topic(), cluster, partition);
              partition = this.partition(record, serializedKey, serializedValue, cluster);
              tp = new TopicPartition(record.topic(), partition);
              if (this.log.isTraceEnabled()) {
                  this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
              }
              interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
              result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
          }
          if (this.transactionManager != null) {
              this.transactionManager.maybeAddPartition(tp);
          }
          // If the batch is full or a new batch was created, wake the Sender (Sender implements Runnable)
          if (result.batchIsFull || result.newBatchCreated) {
              this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
              this.sender.wakeup();
          }
          return result.future;
      } catch (ApiException var23) {
          this.log.debug("Exception occurred during message send:", var23);
          if (tp == null) {
              tp = ProducerInterceptors.extractTopicPartition(record);
          }
          Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
          interceptCallback.onCompletion((RecordMetadata)null, var23);
          this.errors.record();
          this.interceptors.onSendError(record, tp, var23);
          return new FutureFailure(var23);
      } catch (InterruptedException var24) {
          this.errors.record();
          this.interceptors.onSendError(record, tp, var24);
          throw new InterruptException(var24);
      } catch (KafkaException var25) {
          this.errors.record();
          this.interceptors.onSendError(record, tp, var25);
          throw var25;
      } catch (Exception var26) {
          this.interceptors.onSendError(record, tp, var26);
          throw var26;
      }
  }
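As the code shows, doSend() begins with this.interceptors.onSend(record). For illustration, here is a minimal (hypothetical) interceptor that stamps each outgoing record with a header; it would be registered via the interceptor.classes config (ProducerConfig.INTERCEPTOR_CLASSES_CONFIG):

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class TimestampHeaderInterceptor implements ProducerInterceptor<String, String> {
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            // Invoked from KafkaProducer.send() before serialization and partitioning
            record.headers().add("sent-at",
                    Long.toString(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
            return record;
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            // Invoked when the broker acks the record, or when the send fails
        }

        @Override
        public void close() {}

        @Override
        public void configure(Map<String, ?> configs) {}
    }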

The Sender class run() method (the producer I/O loop):

  public void run() {
      this.log.debug("Starting Kafka producer I/O thread.");
      while(this.running) {
          try {
              this.runOnce();
          } catch (Exception var5) {
              this.log.error("Uncaught error in kafka producer I/O thread: ", var5);
          }
      }
      this.log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
      while(!this.forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0 || this.hasPendingTransactionalRequests())) {
          try {
              this.runOnce();
          } catch (Exception var4) {
              this.log.error("Uncaught error in kafka producer I/O thread: ", var4);
          }
      }
      while(!this.forceClose && this.transactionManager != null && this.transactionManager.hasOngoingTransaction()) {
          if (!this.transactionManager.isCompleting()) {
              this.log.info("Aborting incomplete transaction due to shutdown");
              this.transactionManager.beginAbort();
          }
          try {
              this.runOnce();
          } catch (Exception var3) {
              this.log.error("Uncaught error in kafka producer I/O thread: ", var3);
          }
      }
      if (this.forceClose) {
          if (this.transactionManager != null) {
              this.log.debug("Aborting incomplete transactional requests due to forced shutdown");
              this.transactionManager.close();
          }
          this.log.debug("Aborting incomplete batches due to forced shutdown");
          this.accumulator.abortIncompleteBatches();
      }
      try {
          this.client.close();
      } catch (Exception var2) {
          this.log.error("Failed to close network client", var2);
      }
      this.log.debug("Shutdown of Kafka producer I/O thread has completed.");
  }

  void runOnce() {
      if (this.transactionManager != null) {
          try {
              this.transactionManager.maybeResolveSequences();
              if (this.transactionManager.hasFatalError()) {
                  RuntimeException lastError = this.transactionManager.lastError();
                  if (lastError != null) {
                      this.maybeAbortBatches(lastError);
                  }
                  this.client.poll(this.retryBackoffMs, this.time.milliseconds());
                  return;
              }
              this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
              if (this.maybeSendAndPollTransactionalRequest()) {
                  return;
              }
          } catch (AuthenticationException var5) {
              this.log.trace("Authentication exception while processing transactional request", var5);
              this.transactionManager.authenticationFailed(var5);
          }
      }
      long currentTimeMs = this.time.milliseconds();
      // Drain ready batches and send the produce requests
      long pollTimeout = this.sendProducerData(currentTimeMs);
      this.client.poll(pollTimeout, currentTimeMs);
  }
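For context on who runs this loop: the KafkaProducer constructor wraps the Sender in a daemon KafkaThread and starts it. Paraphrased from the constructor (not an exact quote of the source above):

    // Inside the KafkaProducer constructor (paraphrased):
    String ioThreadName = "kafka-producer-network-thread | " + clientId;
    this.sender = this.newSender(logContext, kafkaClient, this.metadata);
    this.ioThread = new KafkaThread(ioThreadName, this.sender, true); // daemon I/O thread
    this.ioThread.start();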

  sendProducerData(): ready batches are ultimately converted into a ClientRequest and handed to the NetworkClient:

     ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);
     this.client.send(clientRequest, now);
  private long sendProducerData(long now) {
      Cluster cluster = this.metadata.fetch();
      // Which partitions have batches ready, and which nodes lead them?
      RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
      Iterator iter;
      if (!result.unknownLeaderTopics.isEmpty()) {
          iter = result.unknownLeaderTopics.iterator();
          while(iter.hasNext()) {
              String topic = (String)iter.next();
              this.metadata.add(topic, now);
          }
          this.log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
          this.metadata.requestUpdate();
      }
      iter = result.readyNodes.iterator();
      long notReadyTimeout = Long.MAX_VALUE;
      while(iter.hasNext()) {
          Node node = (Node)iter.next();
          if (!this.client.ready(node, now)) {
              iter.remove();
              notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
          }
      }
      Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
      this.addToInflightBatches(batches);
      List expiredBatches;
      Iterator var11;
      ProducerBatch expiredBatch;
      if (this.guaranteeMessageOrder) {
          Iterator var9 = batches.values().iterator();
          while(var9.hasNext()) {
              expiredBatches = (List)var9.next();
              var11 = expiredBatches.iterator();
              while(var11.hasNext()) {
                  expiredBatch = (ProducerBatch)var11.next();
                  this.accumulator.mutePartition(expiredBatch.topicPartition);
              }
          }
      }
      this.accumulator.resetNextBatchExpiryTime();
      List<ProducerBatch> expiredInflightBatches = this.getExpiredInflightBatches(now);
      expiredBatches = this.accumulator.expiredBatches(now);
      expiredBatches.addAll(expiredInflightBatches);
      if (!expiredBatches.isEmpty()) {
          this.log.trace("Expired {} batches in accumulator", expiredBatches.size());
      }
      var11 = expiredBatches.iterator();
      while(var11.hasNext()) {
          expiredBatch = (ProducerBatch)var11.next();
          String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
          this.failBatch(expiredBatch, (RuntimeException)(new TimeoutException(errorMessage)), false);
          if (this.transactionManager != null && expiredBatch.inRetry()) {
              this.transactionManager.markSequenceUnresolved(expiredBatch);
          }
      }
      this.sensors.updateProduceRequestMetrics(batches);
      long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
      pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
      pollTimeout = Math.max(pollTimeout, 0L);
      if (!result.readyNodes.isEmpty()) {
          this.log.trace("Nodes with data ready to send: {}", result.readyNodes);
          pollTimeout = 0L;
      }
      this.sendProduceRequests(batches, now);
      return pollTimeout;
  }

  private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
      Iterator var4 = collated.entrySet().iterator();
      while(var4.hasNext()) {
          Map.Entry<Integer, List<ProducerBatch>> entry = (Map.Entry)var4.next();
          this.sendProduceRequest(now, (Integer)entry.getKey(), this.acks, this.requestTimeoutMs, (List)entry.getValue());
      }
  }

  private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
      if (!batches.isEmpty()) {
          Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap(batches.size());
          byte minUsedMagic = this.apiVersions.maxUsableProduceMagic();
          Iterator var9 = batches.iterator();
          while(var9.hasNext()) {
              ProducerBatch batch = (ProducerBatch)var9.next();
              if (batch.magic() < minUsedMagic) {
                  minUsedMagic = batch.magic();
              }
          }
          ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
          Iterator var16 = batches.iterator();
          while(var16.hasNext()) {
              ProducerBatch batch = (ProducerBatch)var16.next();
              TopicPartition tp = batch.topicPartition;
              MemoryRecords records = batch.records();
              if (!records.hasMatchingMagic(minUsedMagic)) {
                  records = (MemoryRecords)batch.records().downConvert(minUsedMagic, 0L, this.time).records();
              }
              ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
              if (tpData == null) {
                  tpData = (new ProduceRequestData.TopicProduceData()).setName(tp.topic());
                  tpd.add(tpData);
              }
              tpData.partitionData().add((new ProduceRequestData.PartitionProduceData()).setIndex(tp.partition()).setRecords(records));
              recordsByPartition.put(tp, batch);
          }
          String transactionalId = null;
          if (this.transactionManager != null && this.transactionManager.isTransactional()) {
              transactionalId = this.transactionManager.transactionalId();
          }
          ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic, (new ProduceRequestData()).setAcks(acks).setTimeoutMs(timeout).setTransactionalId(transactionalId).setTopicData(tpd));
          RequestCompletionHandler callback = (response) -> {
              this.handleProduceResponse(response, recordsByPartition, this.time.milliseconds());
          };
          String nodeId = Integer.toString(destination);
          ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);
          // this.client is the KafkaClient interface; the implementation here is a NetworkClient
          this.client.send(clientRequest, now);
          this.log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
      }
  }

 The NetworkClient send() method:

  public void send(ClientRequest request, long now) {
      this.doSend(request, false, now);
  }

  private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
      this.ensureActive();
      String nodeId = clientRequest.destination();
      if (!isInternalRequest && !this.canSendRequest(nodeId, now)) {
          throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
      } else {
          AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
          try {
              NodeApiVersions versionInfo = this.apiVersions.get(nodeId);
              short version;
              if (versionInfo == null) {
                  version = builder.latestAllowedVersion();
                  if (this.discoverBrokerVersions && this.log.isTraceEnabled()) {
                      this.log.trace("No version information found when sending {} with correlation id {} to node {}. Assuming version {}.", new Object[]{clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version});
                  }
              } else {
                  version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion());
              }
              this.doSend(clientRequest, isInternalRequest, now, builder.build(version));
          } catch (UnsupportedVersionException var9) {
              this.log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", new Object[]{builder, clientRequest.correlationId(), clientRequest.destination(), var9});
              ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()), clientRequest.callback(), clientRequest.destination(), now, now, false, var9, (AuthenticationException)null, (AbstractResponse)null);
              if (!isInternalRequest) {
                  this.abortedSends.add(clientResponse);
              } else if (clientRequest.apiKey() == ApiKeys.METADATA) {
                  this.metadataUpdater.handleFailedRequest(now, Optional.of(var9));
              }
          }
      }
  }

  private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
      String destination = clientRequest.destination();
      RequestHeader header = clientRequest.makeHeader(request.version());
      if (this.log.isDebugEnabled()) {
          this.log.debug("Sending {} request with header {} and timeout {} to node {}: {}", new Object[]{clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request});
      }
      Send send = request.toSend(header);
      // Wrap the ClientRequest as an InFlightRequest and track it until the response arrives
      InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);
      this.inFlightRequests.add(inFlightRequest);
      // Queue the bytes on the NIO selector; this.selector is the Selectable interface,
      // and the actual write happens over a KafkaChannel during the next poll()
      this.selector.send(new NetworkSend(clientRequest.destination(), send));
  }

Summary: reading the source in kafka-clients.jar directly makes the producer's send logic clear very quickly. The core flow is summarized in the diagram at the top of this article.
