When a Kafka producer sends messages, two threads are involved: the main thread and the Sender thread.
The main thread does not push each record onto the network itself; it stages records in a buffer (the RecordAccumulator) and lets the Sender thread ship them in batches. This staging mechanism is what gives the Kafka producer its high throughput.
The overall flow is: the main thread passes the record through the interceptors, the key/value serializers and the partitioner, then appends it to the RecordAccumulator; the Sender thread later drains the accumulated batches and sends them to the brokers over the network.
From this flow we can identify KafkaProducer's core components: the Partitioner, the RecordAccumulator, the Sender thread, and the NetworkClient it uses for network I/O.
In what follows we analyze KafkaProducer's send path from two angles: the main thread and the Sender thread.
We start reading the source from the entry point, the send method:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
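Before diving into the internals, here is a minimal, hedged sketch of how application code typically calls this method; the broker address localhost:9092 and the topic demo-topic are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("demo-topic", "key-1", "hello"); // placeholder topic
            // Asynchronous send: the callback runs when the broker acknowledges the batch.
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("partition=%d offset=%d%n", metadata.partition(), metadata.offset());
                }
            });
            producer.flush();
        }
    }
}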
After send() is called, some preparation work happens first, such as waiting for cluster metadata and serializing the record's key and value.
That preparation is not the focus here; interested readers can walk through it in the source themselves.
With the preparation done, the producer must decide which partition of the topic the record will be sent to.
This is where one of the components mentioned above comes in: the partitioner.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    ......
    // Determine the target partition
    int partition = partition(record, serializedKey, serializedValue, cluster);
    ......
}

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    // If the ProducerRecord explicitly specifies a partition, that value wins
    Integer partition = record.partition();
    // Otherwise ask the Partitioner to compute the partition dynamically
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
When creating a KafkaProducer, the "partitioner.class" configuration specifies the Partitioner implementation class. If it is not set, Kafka's built-in DefaultPartitioner is used. Here is DefaultPartitioner's implementation:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                     int numPartitions) {
    // If no key was given, consult the stickyPartitionCache, which sticks the topic to one particular partition
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }
    // If a key was given, choose the partition from the hash of the key
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
StickyPartitionCache is essentially a cache: it maintains a ConcurrentMap recording which partition each topic should currently stick to.
public class StickyPartitionCache {
    private final ConcurrentMap<String, Integer> indexCache;

    public StickyPartitionCache() {
        this.indexCache = new ConcurrentHashMap<>();
    }

    public int partition(String topic, Cluster cluster) {
        Integer part = indexCache.get(topic);
        if (part == null) {
            return nextPartition(topic, cluster, -1);
        }
        return part;
    }

    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        // prevPartition is used when the first append failed: the second attempt tries to switch partitions
        if (oldPart == null || oldPart == prevPartition) {
            // Get the currently available partitions from the cluster metadata
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                // Only one partition is available, so use it
                newPart = availablePartitions.get(0).partition();
            } else {
                // Make sure we really end up on a different partition
                while (newPart == null || newPart.equals(oldPart)) {
                    // Pick an available partition at random with ThreadLocalRandom;
                    // different threads may compute different newPart values, but putIfAbsent below
                    // guarantees the cache is only set once, which keeps this thread-safe
                    Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
            if (oldPart == null) {
                // Even with multiple threads, the value is only set once
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }
}
From the code above, DefaultPartitioner's partitioning strategy is:
1. If the ProducerRecord explicitly specifies a partition, that partition is used (this check happens in KafkaProducer.partition() before the Partitioner is even consulted).
2. If a key is present, the partition is Utils.murmur2(keyBytes) modulo the number of partitions, so records with the same key always land in the same partition.
3. If there is no key, sticky partitioning is used: all keyless records for a topic stick to one cached partition until a new batch is created, at which point a different partition may be chosen.
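If the built-in strategy does not fit, "partitioner.class" can point at a custom implementation. Below is a minimal, hypothetical sketch (not from the Kafka code base, and the class name is made up) that keeps the keyed-hash behavior but pins keyless records to partition 0 instead of using the sticky cache:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Hypothetical example: keyed records are hashed exactly like DefaultPartitioner,
// keyless records always go to partition 0.
public class PinKeylessToZeroPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) { /* no configuration needed */ }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0; // pin all keyless records to partition 0
        }
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() { /* nothing to release */ }
}

It would be enabled with props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PinKeylessToZeroPartitioner.class.getName()).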
The RecordAccumulator is the component that stages records. Its idea is to group records destined for the same partition together, split them into batches (ProducerBatch) of a configured size ("batch.size"), and then compress and send data one batch at a time.
Each TopicPartition therefore gets its own queue of batches, and RecordAccumulator keeps them in a ConcurrentHashMap: ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
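This batching behavior is driven by a handful of producer configs. The following small sketch (illustrative values only, not tuning advice) shows the knobs that matter for the RecordAccumulator:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

class BatchingConfig {
    static Properties batchingProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // "batch.size": target bytes per ProducerBatch
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);            // "linger.ms": how long a non-full batch may wait before it becomes sendable
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);  // "buffer.memory": total size of the producer's buffer pool
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // compression is applied per batch
        return props;
    }
}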
RecordAccumulator has two core methods, one for each direction: append() stores records into batches, and drain() takes batches out for sending.
Let's start with the append method:
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers,
                                 Callback callback, long maxTimeToBlock, boolean abortOnNewBatch,
                                 long nowMs) throws InterruptedException {
    // appendsInProgress is an AtomicInteger counting the threads currently appending
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // Get (or create) the queue for this partition; ConcurrentHashMap guarantees all threads see the same queue
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        // The append is synchronized at partition granularity!
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // Try to append to the last batch in the queue; if that succeeds, return immediately
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null)
                return appendResult;
        }

        // The append failed. If creating a new batch is not allowed, return and wait for the next append
        // (the first attempt passes abortOnNewBatch = true, the second attempt passes false)
        if (abortOnNewBatch) {
            // The second flag means "batch is full", the third means "a new batch was created"
            return new RecordAppendResult(null, false, false, true);
        }

        // Creating a new batch is allowed:
        // allocate an appropriately sized buffer from the pool
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);

        // Refresh the current time
        nowMs = time.milliseconds();
        // Second attempt, again synchronized per partition
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // Try again; space may have been freed in the meantime
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null) {
                return appendResult;
            }

            // Build a new batch on the allocated buffer, append to it, and get back a future
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers, callback, nowMs));

            // Add the new batch to the queue and to the set of incomplete batches
            dq.addLast(batch);
            incomplete.add(batch);

            buffer = null;
            // Return the future inside a RecordAppendResult; the third flag (true) means a new batch was created
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        // Release the temporary buffer if it was not handed to a batch, and decrement the in-progress counter
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
Now the tryAppend method: it takes the last ProducerBatch in the partition's queue and, if one exists, writes the record into that batch and returns a future. The main logic is the same as above: the actual writing goes through the ProducerBatch.
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque, long nowMs) {
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
if (future == null)
last.closeForRecordAppends();
else
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
}
return null;
}
So how is a record actually written into a batch? Let's look at ProducerBatch's tryAppend method:
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    // First check whether the recordsBuilder still has room for this record
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        // Write the record through the recordsBuilder (which also handles compression) and get back its CRC
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        // Update the maximum record size and the last append time
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                recordsBuilder.compressionType(), key, value, headers));
        this.lastAppendTime = now;
        // Return a future for this record: produceFuture is shared by the whole ProducerBatch,
        // recordCount is this record's relative offset within the batch, checksum is its CRC,
        // and the key/value lengths are included as well; together they uniquely identify and verify one record!
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                timestamp, checksum,
                key == null ? -1 : key.length,
                value == null ? -1 : value.length,
                Time.SYSTEM);
        thunks.add(new Thunk(callback, future));
        // Bump the per-batch record count / relative offset
        this.recordCount++;
        return future;
    }
}
As you can see, ProducerBatch delegates the actual write to its recordsBuilder; we will not go further down that path here. Internally it compresses the record data and computes a CRC checksum over it.
The write returns a FutureRecordMetadata for this record, which carries the record's relative offset within the batch, its checksum, and the lengths of its key and value; together these uniquely identify and verify the record. It also holds the produceFuture shared by the whole batch, which we will come back to shortly.
With the append done, let's return to KafkaProducer's doSend() method:
// If a new batch was created above, result.abortForNewBatch is false.
// If creating a batch was not allowed (the default on the first attempt), we end up here and append again.
if (result.abortForNewBatch) {
    int prevPartition = partition;
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

    result = accumulator.append(tp, timestamp, serializedKey,
        // On the second attempt, abortOnNewBatch is now false
        serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}

// Handle transactions
if (transactionManager != null && transactionManager.isTransactional())
    transactionManager.maybeAddPartitionToTransaction(tp);

// If the batch is full, or a new batch was created, wake up the Sender thread!
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
// Return the batch's future
return result.future;
So if the current batch is full, or a new batch has just been created (which really means the previous one was full), the producer wakes up the Sender thread to tell it that enough data has been staged and it is time to ship it to the server.
Finally, what send() returns is the FutureRecordMetadata created in the method above.
What exactly is this returned future? It is the Future<RecordMetadata> we get back from the producer's send call; the RecordMetadata inside it contains the partition the record was sent to, the offset it was assigned, and its timestamp.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
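As a quick, hypothetical illustration (the helper class below is made up), this is how calling get() on that future surfaces the partition, offset and timestamp once the broker has responded:

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.RecordMetadata;

class MetadataPrinter {
    // Block on the Future returned by send() and read back what the broker assigned.
    static void print(Future<RecordMetadata> future) throws Exception {
        RecordMetadata md = future.get(); // completes once produceFuture.set(...) fires, as shown below
        System.out.printf("partition=%d offset=%d timestamp=%d%n",
                md.partition(), md.offset(), md.timestamp());
    }
}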
Let's see how the information inside it gets filled in.
First, before the ProducerBatch writes any records, its constructor stores the batch's TopicPartition in the shared future:
public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
...
this.produceFuture = new ProduceRequestResult(topicPartition);
...
}
Later, once the batch has been sent and acknowledged, the base offset, the logAppendTime and any exception are written into the same future:
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
// Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
produceFuture.set(baseOffset, logAppendTime, exception);
}
This is how the future ends up carrying all the information we need.
At this point the main thread is done: records have been written into batch after batch, one queue per TopicPartition. The Sender thread now takes over and sends the data.
The Sender thread relies on a basic networking class, NetworkClient. Its core field is a Selectable selector; both the send and poll operations go through this selector. org.apache.kafka.common.network.Selector is in turn implemented on top of java.nio.channels.Selector, so Kafka's networking is ultimately plain Java NIO.
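For readers less familiar with NIO, the following tiny standalone sketch shows that java.nio pattern; it is not Kafka's Selector code, and the broker address is a placeholder. It only illustrates the register-then-poll loop that NetworkClient's selector is built around:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class NioPollSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);                           // non-blocking, as Kafka's channels are
        channel.connect(new InetSocketAddress("localhost", 9092));  // placeholder broker address
        channel.register(selector, SelectionKey.OP_CONNECT);

        // One poll pass: wait up to 100 ms for any registered channel to become ready,
        // the same shape as Sender.runOnce() -> client.poll(pollTimeout, now).
        if (selector.select(100) > 0) {
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isConnectable() && ((SocketChannel) key.channel()).finishConnect()) {
                    key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                }
            }
            selector.selectedKeys().clear();
        }
        channel.close();
        selector.close();
    }
}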
Inside KafkaProducer, two fields relate to the Sender thread: the Sender itself (a Runnable) and the ioThread (a KafkaThread) that runs it.
Both are created in the KafkaProducer constructor:
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
...
return new Sender(logContext,
        client,
        metadata,
        this.accumulator,
        maxInflightRequests == 1,
        producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
        acks,
        producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
        metricsRegistry.senderMetrics,
        time,
        requestTimeoutMs,
        producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
        this.transactionManager,
        apiVersions);
Looking at Sender's run method, each loop iteration ends up sending data through sendProducerData:
public void run() {
    log.debug("Starting Kafka producer I/O thread.");
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    .....
}

void runOnce() {
    ... ...
    // 1. Send requests and compute how long the next poll may block
    long pollTimeout = sendProducerData(now);
    // 2. Handle socket events; the poll timeout is the value computed above
    client.poll(pollTimeout, now);
}
Now the sendProducerData method:
Cluster cluster = metadata.fetch();
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
Step into the accumulator's ready method:
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        Deque<ProducerBatch> deque = entry.getValue();
        // Per partition, under the deque lock, work out which server nodes are ready to receive data
        synchronized (deque) {
            // Skip the partition if its queue is empty
            ProducerBatch batch = deque.peekFirst();
            if (batch != null) {
                TopicPartition part = entry.getKey();
                // Find the leader node for this partition in the cluster metadata
                Node leader = cluster.leaderFor(part);
                if (leader == null) {
                    unknownLeaderTopics.add(part.topic());
                } else if (!readyNodes.contains(leader) && !isMuted(part)) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    boolean full = deque.size() > 1 || batch.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        // The leader is good to go: add it to the set of ready nodes
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
    // Check whether the client can connect to the remote server node; if not, drop it for now
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
Step into the drain method:
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
        // Collect the batches that should be sent to this ready node
List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
batches.put(node.id(), ready);
}
return batches;
}
drainBatchesForOneNode() walks the partitions whose leader is the given ready node, finds each partition's queue, polls the batch at the head of that queue, adds it to the result, and closes the batch so no more records can be appended to it.
The result has the form Map<Integer, List<ProducerBatch>>, keyed by the id of the broker node the batches will be sent to.
Deque<ProducerBatch> deque = getDeque(tp);
if (deque == null)
    continue;

synchronized (deque) {
    ProducerBatch first = deque.peekFirst();
    if (first == null)
        continue;

    boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
    if (backoff)
        continue;

    // Cap the total amount of batch data drained into a single request
    if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
        // there is a rare case that a single batch size is larger than the request size due to
        // compression; in this case we will still eventually send this batch in a single request
        break;
    } else {
        if (shouldStopDrainBatchesForPartition(first, tp))
            break;

        boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
        ProducerIdAndEpoch producerIdAndEpoch =
            transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
        ProducerBatch batch = deque.pollFirst();
        if (producerIdAndEpoch != null && !batch.hasSequence()) {
            batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
            transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
            log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
                    "{} being sent to partition {}", producerIdAndEpoch.producerId,
                producerIdAndEpoch.epoch, batch.baseSequence(), tp);

            transactionManager.addInFlightBatch(batch);
        }
        batch.close();
        size += batch.records().sizeInBytes();
        ready.add(batch);

        batch.drained(now);
    }
One question I had while reading: are the drained batches necessarily full? Looking back at ready(), they are not: a batch also becomes sendable once it has waited longer than linger.ms, when buffer memory is exhausted, or when the producer is flushing or closing, so a partially filled batch can be drained as well.
........
sendProduceRequests(batches, now);
return pollTimeout;
Once the batches are drained, sendProduceRequests builds them into ClientRequests and sends them to the target brokers through KafkaClient's send method; a callback registered with each request handles the response asynchronously.
With that, the Kafka producer's entire message-sending path is complete.
References:
https://zhuanlan.zhihu.com/p/371361083
https://blog.csdn.net/wanger61?spm=1000.2115.3001.5343