赞
踩
/**
 * Sends one record to {@code topic} and blocks until the send has completed.
 *
 * <p>{@code producer.send(...)} returns a future; calling {@code get()} on that future
 * blocks until the result is available, which turns the asynchronous send into a
 * synchronous one.
 *
 * @param key   key of the record
 * @param value value of the record
 * @return {@code true} if the send completed and its offset was logged,
 *         {@code false} if any exception occurred (the exception is logged, not rethrown)
 */
public boolean producer(String key, String value) {
    // The ProducerRecord has to be built before send() is called.
    // NOTE(review): raw type kept on purpose — the generic parameters of the `producer`
    // field are not visible from this method; parameterize once they are confirmed.
    ProducerRecord kafkaMessage = new ProducerRecord(topic, key, value);
    try {
        Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
        // Blocks until the send result is available.
        RecordMetadata recordMetadata = metadataFuture.get();
        logger.info("Produce ok:" + recordMetadata.offset());
        return true;
    } catch (InterruptedException var6) {
        // Restore the interrupt flag so callers further up the stack can still observe
        // the interruption instead of having it silently swallowed here.
        Thread.currentThread().interrupt();
        logger.error("Error occurred from topic:{},error:{}", topic, var6);
        return false;
    } catch (Exception var6) {
        logger.error("Error occurred from topic:{},error:{}", topic, var6);
        return false;
    }
}
/**
 * Record handed to the producer's {@code send} method.
 *
 * <p>Only the fields are shown in this excerpt; constructors and accessors are omitted.
 *
 * @param <K> type of the record key
 * @param <V> type of the record value
 */
public class ProducerRecord<K, V> {
// Topic the record is sent to.
private final String topic;
// Target partition. NOTE(review): presumably null when the caller did not choose a partition — confirm.
private final Integer partition;
// Headers attached to the record.
private final Headers headers;
// Record key.
private final K key;
// Record value.
private final V value;
// Record timestamp; the send path substitutes the current time when this is null.
private final Long timestamp;
}
如果在发送时指定了partition,则消息将被保存到指定的tp分区队列,如果没有指定分区,将对key散列后来计算分区,相同key的消息将被写到同一个分区队列中,如果key是null,且使用默认的分区器,则分区器将用轮询的方法(Round Robin)将序列化后的消息均衡分布到不同的队列中,sender线程从Accumulator中取出批量数据组成一个batch发送
在producer端,存在2个线程,一个是producer主线程,用户端调用send消息时,是在主线程执行的,数据被缓存到RecordAccumulator中,send方法即刻返回,也就是说此时并不能确定消息是否真正的发送到broker。另外一个是sender IO线程,其不断轮询RecordAccumulator,满足一定条件后,就进行真正的网络IO发送,使用的是异步非阻塞的NIO。主线程的send方法提供了一个用于回调的参数,当sender线程发送完后,回调函数将被调用,可以用来处理成功,失败或异常的逻辑
sender线程会发送多种类型的请求,以下是kafka集群元数据的请求过程
/**
 * Sends a record asynchronously.
 *
 * <p>The record is first passed through the user-defined interceptors; whatever they
 * return is what gets handed to {@code doSend}.
 *
 * @param record   the record to send
 * @param callback callback passed on to {@code doSend} together with the record
 * @return the future returned by {@code doSend}
 */
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // Run the user-defined interceptor plugins, then send the record they produced.
    return doSend(this.interceptors.onSend(record), callback);
}
//计算推送到哪个partition int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); setReadOnly(record.headers()); Header[] headers = record.headers().toArray(); //校验消息内容不能超过限制值 int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers); ensureValidRecordSize(serializedSize); long timestamp = record.timestamp() == null ? nowMs : record.timestamp(); if (log.isTraceEnabled()) { log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); } // 生产者回调将确保调用'回调'和拦截回调 Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); if (transactionManager != null && transactionManager.isTransactional()) { transactionManager.failIfNotReadyForSend(); } //在record被加入到accumulator时,会根据record所在的tp找到RecordBatch队列,如果不存在,就新建一个队列,在队列中取出最后一个RecordBatch,如果这个batch还有空间,就把record新追加到缓存后面,这样1个batch可能会有多个record,如果batch空间不够,就新创建一个batch,重新分配一个Max(16k, recordsize)的buffer,如果这个record超过16k,则这个batch中只会保存这1个record RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs); if (result.abortForNewBatch) {//默认false int prevPartition = partition; partitioner.onNewBatch(record.topic(), cluster, prevPartition); partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); if (log.isTraceEnabled()) { log.trace("Retrying append due to new batch creation for topic {} partition {}. 
The old partition was {}", record.topic(), partition, prevPartition); } // producer callback will make sure to call both 'callback' and interceptor callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs); } //发送事务处理 if (transactionManager != null && transactionManager.isTransactional()) transactionManager.maybeAddPartitionToTransaction(tp); if (result.batchIsFull || result.newBatchCreated) { //RecordBatch满了或者创建新的RecordBatch,触发sender线程。sender线程在KafkaProducer实例化时就已启动并阻塞在poll方法,wakeup()它的作用就是将 Sender 线程从poll方法的阻塞中唤醒,之后从accumulator取出RecordBatch并发送到broker log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future;
分区选择器计算出分区之后得到TopicPartition,在消息record(key、value)被加入到accumulator时,会根据TopicPartition找到RecordBatch队列,如果不存在,就新建一个队列,在队列中取出最后一个RecordBatch,如果这个batch还有空间,就把record新追加到缓存后面,这样1个batch可能会有多个record,如果batch空间不够,就新创建一个batch,重新分配一个Max(16k, recordsize)的buffer,如果这个record超过16k,则这个batch中只会保存这1个record
先看下kafka的网络设计,kafka的网络实际上是封装了java.nio的Selector、SocketChannel。
public class Sender implements Runnable { private final Logger log; //kafka 网络通信客户端,主要封装与 broker 的网络通信。 private final KafkaClient client; //消息记录累积器,消息追加的入口(RecordAccumulator 的 append 方法)。 private final RecordAccumulator accumulator; //元数据管理器,即 topic 的路由分区信息。 private final ProducerMetadata metadata; //是否需要保证消息的顺序性。 private final boolean guaranteeMessageOrder; //调用 send 方法发送的最大请求大小,包括 key、消息体序列化后的消息总大小不能超过该值。通过参数 max.request.size 来设置。 private final int maxRequestSize; //用来定义消息“已提交”的条件(标准),就是 Broker 端向客户端承偌已提交的条件,可选值如下0、-1、1. private final short acks; //重试次数。 private final int retries; /* the clock instance used for getting the time */ private final Time time; //该线程状态,为 true 表示运行中。 private volatile boolean running; //是否强制关闭,此时会忽略正在发送中的消息。 private volatile boolean forceClose; //消息发送相关的统计指标收集器。 private final SenderMetrics sensors; //请求的超时时间。 private final int requestTimeoutMs; //请求失败之在重试之前等待的时间。 private final long retryBackoffMs; //API版本信息。 private final ApiVersions apiVersions; //事务处理器。 private final TransactionManager transactionManager; // 正在执行发送相关的消息批次。 private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches; }
Sender中的wakeup()方法最终调用WindowsSelectorImpl(基于windows操作系统实现的多路复用选择器)的wakeup(),它的作用就是将 Sender 线程从poll方法的阻塞中唤醒。poll方法的作用是轮询注册在多路复用器上的 Channel,它会一直阻塞在这个方法上,除非满足下面条件中的一个:
1. 至少有一个 Channel 被选中(即有已就绪的事件);
2. 该 Selector 的 wakeup() 方法被调用;
3. 当前线程被中断;
4. 指定的超时时间到期。
否则 poll 将会一直轮询,阻塞在这个地方,直到上述条件之一满足。
因此,sender.wakeup() 方法的作用就是:当有新的 RecordBatch 创建后,旧的 RecordBatch 就可以发送了(或者此时有 Metadata 请求需要发送),如果线程阻塞在 select() 方法中,就将其唤醒,Sender 重新开始运行 run() 方法,在这个方法中,旧的 RecordBatch (或相应的 Metadata 请求)将会被选中,进而可以及时将这些请求发送出去。
public void run() { log.debug("Starting Kafka producer I/O thread."); while (running) { try { runOnce(); // @1 } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: ", e); } } log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records."); while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) { // @2 try { runOnce(); } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: ", e); } } if (forceClose) { // @3 log.debug("Aborting incomplete batches due to forced shutdown"); this.accumulator.abortIncompleteBatches(); } try { this.client.close(); // @4 } catch (Exception e) { log.error("Failed to close network client", e); } log.debug("Shutdown of Kafka producer I/O thread has completed."); }
代码@1:Sender 线程在运行状态下主要的业务处理方法,将消息缓存区中的消息向 broker 发送。
代码@2:如果主动关闭 Sender 线程且不是强制关闭,则只要缓存区还有消息待发送(或者还有已发出但尚未收到响应的请求),就继续调用 runOnce 方法,将剩余的消息发送完毕后再退出。
代码@3:如果强制关闭 Sender 线程,则拒绝未完成提交的消息。
代码@4:关闭 Kafka Client 即网络通信对象。
void runOnce() { if (transactionManager != null) { try { transactionManager.maybeResolveSequences(); // do not continue sending if the transaction manager is in a failed state if (transactionManager.hasFatalError()) { RuntimeException lastError = transactionManager.lastError(); if (lastError != null) maybeAbortBatches(lastError); client.poll(retryBackoffMs, time.milliseconds()); return; } // Check whether we need a new producerId. If so, we will enqueue an InitProducerId // request which will be sent below transactionManager.bumpIdempotentEpochAndResetIdIfNeeded(); if (maybeSendAndPollTransactionalRequest()) { return; } } catch (AuthenticationException e) { // This is already logged as error, but propagated here to perform any clean ups. log.trace("Authentication exception while processing transactional request", e); transactionManager.authenticationFailed(e); } } long currentTimeMs = time.milliseconds(); // 调用 sendProducerData 方法,将消息封装成List<ProducerBatch>,再组装成ClientRequest。然后最终会通过 NetworkClient#send 方法,将该批数据(InFlightRequest)设置到 NetworkClient 的待发送数据中,此时并没有触发真正的网络调用。 long pollTimeout = sendProducerData(currentTimeMs); //触发真正的网络通讯,该方法中会通过收到调用 NIO 中的 Selector#select() 方法,对通道的读写就绪事件进行处理,当写事件就绪后,就会将通道中的消息发送到远端的 broker,并且接收响应进行处理 client.poll(pollTimeout, currentTimeMs); }
poll方法调用Selector的poll方法
/* check ready keys */ long startSelect = time.nanoseconds(); //调用nio.selector,获取准备好的socket数量,并将socket放入this.nioSelector.selectedKeys() int numReadyKeys = select(timeout); long endSelect = time.nanoseconds(); this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) { Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys(); // Poll from channels that have buffered data (but nothing more from the underlying socket) if (dataInBuffers) { keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice Set<SelectionKey> toPoll = keysWithBufferedRead; keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed pollSelectionKeys(toPoll, false, endSelect); } // 遍历readyKeys,处理注册的感兴趣事件 pollSelectionKeys(readyKeys, false, endSelect); // Clear all selected keys so that they are included in the ready count for the next select readyKeys.clear(); pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); immediatelyConnectedKeys.clear(); } else { madeReadProgressLastPoll = true; //no work is also "progress" }
消息最终的发送是在sender线程的run()方法开始的,如图是执行过程:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。