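1. Producer initialization: load the default configuration plus the user-supplied parameters, and start the network I/O (Sender) thread.
2. Interceptors process the record (ProducerInterceptors.onSend()).
3. Fetch the topic's metadata from the Kafka broker cluster (waitOnMetadata(), which blocks for at most max.block.ms). In the code this happens before serialization, because the partitioner needs the cluster view.
4. The serializers serialize the message key and value.
5. Choose the partition.
6. The record is buffered in the RecordAccumulator and appended to the Deque of batches for that partition (ProducerBatch; the class was called RecordBatch in older versions).
7. When a batch fills up (batch.size) the Sender thread is woken up; batches that have waited linger.ms are also picked up on the Sender's next poll. The Sender then uses its NetworkClient (created when the producer was initialized) to turn batches into requests:
ProducerBatch ==> ClientRequest, whose body carries the messages.
8. Connect to the broker that leads the partition and send the request to it.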
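The producer steps listed above can be traced in a toy, single-threaded model. This is a minimal sketch under stated assumptions, not the kafka-clients implementation: ToyProducer, Batch, send() and drainReady() are hypothetical names, a fixed partition count stands in for the metadata lookup (step 3), values are serialized as UTF-8, the partitioner is a plain hashCode() modulo rather than murmur2, and batch.size is counted in records instead of bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class ToyProducer {
    // Hypothetical simplification of ProducerBatch: creation time plus serialized values.
    static final class Batch {
        final long createdMs;
        final List<byte[]> records = new ArrayList<>();
        Batch(long createdMs) { this.createdMs = createdMs; }
    }

    private final List<UnaryOperator<String>> interceptors = new ArrayList<>(); // stand-in for onSend()
    private final int numPartitions;      // stand-in for the metadata lookup
    private final int batchSizeRecords;   // batch.size, counted in records here (the real one is bytes)
    private final long lingerMs;          // linger.ms
    private final Map<Integer, Deque<Batch>> batches = new HashMap<>(); // one deque per partition

    public ToyProducer(int numPartitions, int batchSizeRecords, long lingerMs) {
        this.numPartitions = numPartitions;
        this.batchSizeRecords = batchSizeRecords;
        this.lingerMs = lingerMs;
    }

    public void addInterceptor(UnaryOperator<String> interceptor) {
        interceptors.add(interceptor);
    }

    /** Runs steps 2 to 6; returns true when the append filled a batch (the "wake up the sender" case). */
    public boolean send(String key, String value, long nowMs) {
        for (UnaryOperator<String> interceptor : interceptors) {      // step 2: interceptors in order
            value = interceptor.apply(value);
        }
        byte[] serialized = value.getBytes(StandardCharsets.UTF_8);   // step 4: serialize
        int partition = (key.hashCode() & 0x7fffffff) % numPartitions; // step 5: simplified hash
        Deque<Batch> deque = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = deque.peekLast();
        if (last == null || last.records.size() >= batchSizeRecords) { // step 6: open a new batch
            last = new Batch(nowMs);
            deque.addLast(last);
        }
        last.records.add(serialized);
        return last.records.size() >= batchSizeRecords;
    }

    /** Step 7: remove batches that are full or have waited at least linger.ms, grouped by partition. */
    public Map<Integer, List<Batch>> drainReady(long nowMs) {
        Map<Integer, List<Batch>> ready = new HashMap<>();
        for (Map.Entry<Integer, Deque<Batch>> entry : batches.entrySet()) {
            Deque<Batch> deque = entry.getValue();
            while (!deque.isEmpty()) {
                Batch head = deque.peekFirst();
                boolean full = head.records.size() >= batchSizeRecords;
                boolean lingered = nowMs - head.createdMs >= lingerMs;
                if (!full && !lingered) {
                    break;
                }
                ready.computeIfAbsent(entry.getKey(), p -> new ArrayList<>()).add(deque.pollFirst());
            }
        }
        return ready;
    }
}
```

Because drainReady() takes the clock as a parameter, linger.ms can be exercised deterministically: with linger.ms = 100, a lone record is still buffered at t = 50 and becomes sendable at t = 100.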
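1. How the ProducerRecord passed to send() is assigned a partition:
a. Partition specified: that partition is used directly.
b. No partition and no key: Kafka 2.4 and later use the sticky partitioner (KIP-480), which keeps sending to one partition until the batch for it is complete and a new batch has to be created, and then switches to another partition.
Before 2.4 the default was round-robin: on the first call a random integer is generated (and incremented on every later call), and that value modulo the number of available partitions of the topic gives the partition.
c. No partition but a key: the key bytes are hashed (murmur2 in the default partitioner), and the hash modulo the topic's partition count gives the partition.
For example, with key hash = 5 and a topic with 2 partitions, the partition is 1;
with key hash = 6 and 2 partitions, the partition is 0.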
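The three partitioning cases can be sketched in plain Java. murmur2() is transcribed from Kafka's org.apache.kafka.common.utils.Utils.murmur2 so the block runs without kafka-clients on the classpath (compare it against the version you use). PartitionSketch, choose(), roundRobin(), sticky() and onNewBatch() are hypothetical names, and the sticky logic is simplified: the real partitioner also takes partition availability into account.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionSketch {
    // Transcription of Kafka's Utils.murmur2, the hash the default partitioner applies to key bytes.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        switch (length % 4) { // remaining 1 to 3 bytes; the cases deliberately fall through
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit so the modulo is never negative (same idea as Utils.toPositive).
    static int toPositive(int n) { return n & 0x7fffffff; }

    static int partitionForHash(int hash, int numPartitions) {
        return toPositive(hash) % numPartitions;
    }

    // Case c: keyed record -> murmur2(key) mod partition count.
    static int partitionForKey(String key, int numPartitions) {
        return partitionForHash(murmur2(key.getBytes(StandardCharsets.UTF_8)), numPartitions);
    }

    // Case b before 2.4: round-robin from a randomly seeded counter.
    private final AtomicInteger counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());

    int roundRobin(int numPartitions) {
        return partitionForHash(counter.getAndIncrement(), numPartitions);
    }

    // Case b since 2.4 (simplified): keep one partition until a new batch is started.
    private int stickyPartition = -1;

    int sticky(int numPartitions) {
        if (stickyPartition < 0) {
            stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
        }
        return stickyPartition;
    }

    // Plays the role of Partitioner.onNewBatch(): move to a different partition.
    void onNewBatch(int numPartitions) {
        if (numPartitions < 2) return;
        int next;
        do {
            next = ThreadLocalRandom.current().nextInt(numPartitions);
        } while (next == stickyPartition);
        stickyPartition = next;
    }

    int choose(Integer explicitPartition, String key, int numPartitions) {
        if (explicitPartition != null) return explicitPartition; // case a
        if (key == null) return sticky(numPartitions);           // case b
        return partitionForKey(key, numPartitions);              // case c
    }
}
```

With this modulo step the worked example holds: partitionForHash(5, 2) is 1 and partitionForHash(6, 2) is 0.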
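2. KafkaProducer asynchronous and synchronous send APIs: send() returns a Future<RecordMetadata> right away. Passing a Callback handles the result asynchronously; calling get() on the Future blocks until the broker responds, which makes the send synchronous.
Core logic of Kafka's send() method: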
- public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
- return this.send(record, (Callback)null);
- }
- public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
- // Interceptor chain: onSend() calls each configured interceptor in turn
- ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
- return this.doSend(interceptedRecord, callback);
- }
- private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
- TopicPartition tp = null;
- // Fetch cluster metadata
- try {
- this.throwIfProducerClosed();
- long nowMs = this.time.milliseconds();
- ClusterAndWaitTime clusterAndWaitTime;
- try {
- clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
- } catch (KafkaException var22) {
- if (this.metadata.isClosed()) {
- throw new KafkaException("Producer closed while send in progress", var22);
- }
- throw var22;
- }
- nowMs += clusterAndWaitTime.waitedOnMetadataMs;
- long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
- Cluster cluster = clusterAndWaitTime.cluster;
- // Serialize the key (key.serializer)
- byte[] serializedKey;
- try {
- serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
- } catch (ClassCastException var21) {
- throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var21);
- }
- // Serialize the value (value.serializer)
- byte[] serializedValue;
- try {
- serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
- } catch (ClassCastException var20) {
- throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var20);
- }
- // Choose the partition
- int partition = this.partition(record, serializedKey, serializedValue, cluster);
- tp = new TopicPartition(record.topic(), partition);
- this.setReadOnly(record.headers());
- Header[] headers = record.headers().toArray();
- int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
- this.ensureValidRecordSize(serializedSize);
- long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
- if (this.log.isTraceEnabled()) {
- this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
- }
- Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
- // RecordAccumulator.append() adds the record to a ProducerBatch for this partition
- RecordAccumulator.RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
- if (result.abortForNewBatch) {
- int prevPartition = partition;
- this.partitioner.onNewBatch(record.topic(), cluster, partition);
- partition = this.partition(record, serializedKey, serializedValue, cluster);
- tp = new TopicPartition(record.topic(), partition);
- if (this.log.isTraceEnabled()) {
- this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
- }
- interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
- result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
- }
- if (this.transactionManager != null) {
- this.transactionManager.maybeAddPartition(tp);
- }
- // If the batch is full or a new batch was created, wake up the Sender (Sender implements Runnable)
- if (result.batchIsFull || result.newBatchCreated) {
- this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
- this.sender.wakeup();
- }
- return result.future;
- } catch (ApiException var23) {
- this.log.debug("Exception occurred during message send:", var23);
- if (tp == null) {
- tp = ProducerInterceptors.extractTopicPartition(record);
- }
- Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
- interceptCallback.onCompletion((RecordMetadata)null, var23);
- this.errors.record();
- this.interceptors.onSendError(record, tp, var23);
- return new FutureFailure(var23);
- } catch (InterruptedException var24) {
- this.errors.record();
- this.interceptors.onSendError(record, tp, var24);
- throw new InterruptException(var24);
- } catch (KafkaException var25) {
- this.errors.record();
- this.interceptors.onSendError(record, tp, var25);
- throw var25;
- } catch (Exception var26) {
- this.interceptors.onSendError(record, tp, var26);
- throw var26;
- }
- }
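doSend() returns result.future as soon as the record has been appended to the accumulator, which is what makes both the callback style and the blocking get() style possible. The sketch below imitates the two styles without a broker; fakeSend(), the single-thread executor standing in for the Sender thread, and the "acked:" string standing in for RecordMetadata are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

public class SendStyles {
    // Stand-in for the Sender I/O thread that completes the futures.
    private final ExecutorService senderThread = Executors.newSingleThreadExecutor();

    // Hypothetical fakeSend(): returns at once, like send(record, callback), and completes
    // the future (and runs the callback) when the simulated broker "acks" the record.
    public Future<String> fakeSend(String record, BiConsumer<String, Throwable> callback) {
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> "acked:" + record, senderThread);
        if (callback != null) {
            future.whenComplete(callback);
        }
        return future;
    }

    public void close() {
        senderThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        SendStyles producer = new SendStyles();
        // Asynchronous style: handle the outcome in the callback.
        producer.fakeSend("a", (meta, err) ->
                System.out.println(err == null ? meta : "failed: " + err));
        // Synchronous style: block on get() until the result arrives.
        System.out.println(producer.fakeSend("b", null).get());
        producer.close();
    }
}
```

The synchronous style trades throughput for simplicity: each get() waits a full round trip, so records are no longer batched with the ones that follow.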

Sender class run() method (Sender implements Runnable and runs on the producer's I/O thread):
- public void run() {
- this.log.debug("Starting Kafka producer I/O thread.");
- while(this.running) {
- try {
- this.runOnce();
- } catch (Exception var5) {
- this.log.error("Uncaught error in kafka producer I/O thread: ", var5);
- }
- }
- this.log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
- while(!this.forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0 || this.hasPendingTransactionalRequests())) {
- try {
- this.runOnce();
- } catch (Exception var4) {
- this.log.error("Uncaught error in kafka producer I/O thread: ", var4);
- }
- }
- while(!this.forceClose && this.transactionManager != null && this.transactionManager.hasOngoingTransaction()) {
- if (!this.transactionManager.isCompleting()) {
- this.log.info("Aborting incomplete transaction due to shutdown");
- this.transactionManager.beginAbort();
- }
- try {
- this.runOnce();
- } catch (Exception var3) {
- this.log.error("Uncaught error in kafka producer I/O thread: ", var3);
- }
- }
- if (this.forceClose) {
- if (this.transactionManager != null) {
- this.log.debug("Aborting incomplete transactional requests due to forced shutdown");
- this.transactionManager.close();
- }
- this.log.debug("Aborting incomplete batches due to forced shutdown");
- this.accumulator.abortIncompleteBatches();
- }
- try {
- this.client.close();
- } catch (Exception var2) {
- this.log.error("Failed to close network client", var2);
- }
- this.log.debug("Shutdown of Kafka producer I/O thread has completed.");
- }
- void runOnce() {
- if (this.transactionManager != null) {
- try {
- this.transactionManager.maybeResolveSequences();
- if (this.transactionManager.hasFatalError()) {
- RuntimeException lastError = this.transactionManager.lastError();
- if (lastError != null) {
- this.maybeAbortBatches(lastError);
- }
- this.client.poll(this.retryBackoffMs, this.time.milliseconds());
- return;
- }
- this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
- if (this.maybeSendAndPollTransactionalRequest()) {
- return;
- }
- } catch (AuthenticationException var5) {
- this.log.trace("Authentication exception while processing transactional request", var5);
- this.transactionManager.authenticationFailed(var5);
- }
- }
- long currentTimeMs = this.time.milliseconds();
- // Send the buffered data
- long pollTimeout = this.sendProducerData(currentTimeMs);
- this.client.poll(pollTimeout, currentTimeMs);
- }
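Stripped of transactions and metrics, run() is a three-phase loop: call runOnce() while running, then after a graceful close keep calling it until nothing is left to send, unless forceClose is set, in which case the leftovers are aborted. A minimal sketch of that control flow, assuming a hypothetical ToySender whose "accumulator" is a plain queue and whose runOnce() "sends" one record per call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ToySender implements Runnable {
    final ConcurrentLinkedQueue<String> accumulator = new ConcurrentLinkedQueue<>(); // stand-in for RecordAccumulator
    final List<String> sent = new ArrayList<>(); // written only by the thread executing run()
    private volatile boolean running = true;
    private volatile boolean forceClose = false;

    @Override
    public void run() {
        while (running) {                               // normal operation
            runOnce();
        }
        while (!forceClose && !accumulator.isEmpty()) { // graceful close: flush leftovers
            runOnce();
        }
        if (forceClose) {                               // forced close: abandon leftovers
            accumulator.clear();                        // (the real Sender aborts those batches)
        }
    }

    // One iteration: "send" at most one buffered record.
    void runOnce() {
        String record = accumulator.poll();
        if (record != null) {
            sent.add(record);
        } else {
            Thread.yield(); // the real Sender blocks in client.poll(timeout) instead of spinning
        }
    }

    void initiateClose() {
        running = false;
    }

    void forceClose() {
        forceClose = true;
        running = false;
    }
}
```

In a real producer run() executes on its own thread; initiateClose() and forceClose() are named after the Sender methods that KafkaProducer.close() uses, but here they only flip the two flags.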

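sendProducerData(): finds the nodes that have ready batches, drains those batches from the RecordAccumulator, expires batches that have timed out, and passes the rest to sendProduceRequests(). The two calls that finally hand a batch to the network layer are in sendProduceRequest():
ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);
this.client.send(clientRequest, now);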
- private long sendProducerData(long now) {
- Cluster cluster = this.metadata.fetch();
- RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
- Iterator iter;
- if (!result.unknownLeaderTopics.isEmpty()) {
- iter = result.unknownLeaderTopics.iterator();
- while(iter.hasNext()) {
- String topic = (String)iter.next();
- this.metadata.add(topic, now);
- }
- this.log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
- this.metadata.requestUpdate();
- }
- iter = result.readyNodes.iterator();
- long notReadyTimeout = Long.MAX_VALUE;
- while(iter.hasNext()) {
- Node node = (Node)iter.next();
- if (!this.client.ready(node, now)) {
- iter.remove();
- notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
- }
- }
- Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
- this.addToInflightBatches(batches);
- List expiredBatches;
- Iterator var11;
- ProducerBatch expiredBatch;
- if (this.guaranteeMessageOrder) {
- Iterator var9 = batches.values().iterator();
- while(var9.hasNext()) {
- expiredBatches = (List)var9.next();
- var11 = expiredBatches.iterator();
- while(var11.hasNext()) {
- expiredBatch = (ProducerBatch)var11.next();
- this.accumulator.mutePartition(expiredBatch.topicPartition);
- }
- }
- }
- this.accumulator.resetNextBatchExpiryTime();
- List<ProducerBatch> expiredInflightBatches = this.getExpiredInflightBatches(now);
- expiredBatches = this.accumulator.expiredBatches(now);
- expiredBatches.addAll(expiredInflightBatches);
- if (!expiredBatches.isEmpty()) {
- this.log.trace("Expired {} batches in accumulator", expiredBatches.size());
- }
- var11 = expiredBatches.iterator();
- while(var11.hasNext()) {
- expiredBatch = (ProducerBatch)var11.next();
- String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
- this.failBatch(expiredBatch, (RuntimeException)(new TimeoutException(errorMessage)), false);
- if (this.transactionManager != null && expiredBatch.inRetry()) {
- this.transactionManager.markSequenceUnresolved(expiredBatch);
- }
- }
- this.sensors.updateProduceRequestMetrics(batches);
- long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
- pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
- pollTimeout = Math.max(pollTimeout, 0L);
- if (!result.readyNodes.isEmpty()) {
- this.log.trace("Nodes with data ready to send: {}", result.readyNodes);
- pollTimeout = 0L;
- }
- this.sendProduceRequests(batches, now);
- return pollTimeout;
- }
- private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
- Iterator var4 = collated.entrySet().iterator();
- while(var4.hasNext()) {
- Map.Entry<Integer, List<ProducerBatch>> entry = (Map.Entry)var4.next();
- this.sendProduceRequest(now, (Integer)entry.getKey(), this.acks, this.requestTimeoutMs, (List)entry.getValue());
- }
- }
- private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
- if (!batches.isEmpty()) {
- Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap(batches.size());
- byte minUsedMagic = this.apiVersions.maxUsableProduceMagic();
- Iterator var9 = batches.iterator();
- while(var9.hasNext()) {
- ProducerBatch batch = (ProducerBatch)var9.next();
- if (batch.magic() < minUsedMagic) {
- minUsedMagic = batch.magic();
- }
- }
- ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
- Iterator var16 = batches.iterator();
- while(var16.hasNext()) {
- ProducerBatch batch = (ProducerBatch)var16.next();
- TopicPartition tp = batch.topicPartition;
- MemoryRecords records = batch.records();
- if (!records.hasMatchingMagic(minUsedMagic)) {
- records = (MemoryRecords)batch.records().downConvert(minUsedMagic, 0L, this.time).records();
- }
- ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
- if (tpData == null) {
- tpData = (new ProduceRequestData.TopicProduceData()).setName(tp.topic());
- tpd.add(tpData);
- }
- tpData.partitionData().add((new ProduceRequestData.PartitionProduceData()).setIndex(tp.partition()).setRecords(records));
- recordsByPartition.put(tp, batch);
- }
- String transactionalId = null;
- if (this.transactionManager != null && this.transactionManager.isTransactional()) {
- transactionalId = this.transactionManager.transactionalId();
- }
- ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic, (new ProduceRequestData()).setAcks(acks).setTimeoutMs(timeout).setTransactionalId(transactionalId).setTopicData(tpd));
- RequestCompletionHandler callback = (response) -> {
- this.handleProduceResponse(response, recordsByPartition, this.time.milliseconds());
- };
- String nodeId = Integer.toString(destination);
- ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);
- // this.client is declared as the KafkaClient interface; the implementation is NetworkClient
- this.client.send(clientRequest, now);
- this.log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
- }
- }
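accumulator.drain() returns a Map<Integer, List<ProducerBatch>> keyed by broker node id, and sendProduceRequest() then regroups each node's batches by topic, so a single ProduceRequest per broker carries all drained batches for partitions that broker leads. The sketch below shows only that two-level grouping; Batch, the "topic-partition" leader map and the nested maps are hypothetical simplifications of ProducerBatch, Cluster and ProduceRequestData, and it skips max.request.size and the other rules drain() applies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DrainSketch {
    // Hypothetical stand-in for ProducerBatch.
    static final class Batch {
        final String topic;
        final int partition;
        final int recordCount;
        Batch(String topic, int partition, int recordCount) {
            this.topic = topic;
            this.partition = partition;
            this.recordCount = recordCount;
        }
    }

    // Like drain(): bucket ready batches by the node id of their partition leader.
    static Map<Integer, List<Batch>> groupByLeader(List<Batch> ready, Map<String, Integer> leaderOf) {
        Map<Integer, List<Batch>> byNode = new LinkedHashMap<>();
        for (Batch b : ready) {
            Integer node = leaderOf.get(b.topic + "-" + b.partition);
            if (node == null) {
                continue; // unknown leader: not drainable; sendProducerData() requests a metadata update
            }
            byNode.computeIfAbsent(node, n -> new ArrayList<>()).add(b);
        }
        return byNode;
    }

    // Like sendProduceRequest(): one request per node, partition data nested under its topic.
    static Map<String, Map<Integer, Integer>> buildRequest(List<Batch> nodeBatches) {
        Map<String, Map<Integer, Integer>> topicData = new LinkedHashMap<>();
        for (Batch b : nodeBatches) {
            topicData.computeIfAbsent(b.topic, t -> new LinkedHashMap<>()).put(b.partition, b.recordCount);
        }
        return topicData;
    }
}
```

Grouping by node rather than by partition is what keeps the request count proportional to the number of brokers instead of the number of partitions.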

NetworkClient send() method:
- public void send(ClientRequest request, long now) {
- this.doSend(request, false, now);
- }
- private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
- this.ensureActive();
- String nodeId = clientRequest.destination();
- if (!isInternalRequest && !this.canSendRequest(nodeId, now)) {
- throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
- } else {
- AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
- try {
- NodeApiVersions versionInfo = this.apiVersions.get(nodeId);
- short version;
- if (versionInfo == null) {
- version = builder.latestAllowedVersion();
- if (this.discoverBrokerVersions && this.log.isTraceEnabled()) {
- this.log.trace("No version information found when sending {} with correlation id {} to node {}. Assuming version {}.", new Object[]{clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version});
- }
- } else {
- version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion());
- }
- this.doSend(clientRequest, isInternalRequest, now, builder.build(version));
- } catch (UnsupportedVersionException var9) {
- this.log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", new Object[]{builder, clientRequest.correlationId(), clientRequest.destination(), var9});
- ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()), clientRequest.callback(), clientRequest.destination(), now, now, false, var9, (AuthenticationException)null, (AbstractResponse)null);
- if (!isInternalRequest) {
- this.abortedSends.add(clientResponse);
- } else if (clientRequest.apiKey() == ApiKeys.METADATA) {
- this.metadataUpdater.handleFailedRequest(now, Optional.of(var9));
- }
- }
- }
- }
- private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
- String destination = clientRequest.destination();
- RequestHeader header = clientRequest.makeHeader(request.version());
- if (this.log.isDebugEnabled()) {
- this.log.debug("Sending {} request with header {} and timeout {} to node {}: {}", new Object[]{clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request});
- }
- Send send = request.toSend(header);
- // Wrap the ClientRequest in an InFlightRequest and record it as in flight
- InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);
- this.inFlightRequests.add(inFlightRequest);
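- // Hand off to the NIO layer: this.selector is the Selectable interface, implemented by org.apache.kafka.common.network.Selector,
- // which queues the send on the destination's KafkaChannel; the bytes are written during a later poll()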
- this.selector.send(new NetworkSend(clientRequest.destination(), send));
- }
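For a produce request, NetworkClient.doSend() throws IllegalStateException unless canSendRequest() passes, and one of the checks behind it is InFlightRequests.canSendMore(), which caps unacknowledged requests per broker connection at max.in.flight.requests.per.connection (default 5). A rough sketch of that per-node bookkeeping; InFlightSketch, add() and completeNext() are hypothetical names, and it ignores the additional real-world condition that the previous request must have been fully written to the socket.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class InFlightSketch {
    private final int maxInFlightPerConnection; // max.in.flight.requests.per.connection
    // Per node: newest correlation id at the head, oldest at the tail.
    private final Map<String, Deque<Integer>> requests = new HashMap<>();

    InFlightSketch(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // Simplified canSendMore(): only the count limit is checked.
    boolean canSendMore(String node) {
        Deque<Integer> queue = requests.get(node);
        return queue == null || queue.size() < maxInFlightPerConnection;
    }

    void add(String node, int correlationId) {
        if (!canSendMore(node)) {
            throw new IllegalStateException("Attempt to send a request to node " + node + " which is not ready.");
        }
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(correlationId);
    }

    // Responses on one connection arrive in request order, so the oldest entry completes first.
    int completeNext(String node) {
        Deque<Integer> queue = requests.get(node);
        if (queue == null || queue.isEmpty()) {
            throw new IllegalStateException("No in-flight request for node " + node);
        }
        return queue.pollLast();
    }
}
```

Keeping one deque per node means a slow broker only blocks its own connection; requests to other brokers are unaffected.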

Summary: reading the client source (kafka-clients.jar) directly makes the Kafka producer's send logic clear quickly. Core ==>
