Let's skip the preamble and start with the summary.
1: KafkaProducer class. Detailed source analysis: (4.1) Kafka producer source code — the KafkaProducer class
Role: the Kafka client provided for sending data. Before each send it handles cluster and network connection configuration, and it writes records into the RecordAccumulator.
2: RecordAccumulator, the message accumulator, responsible for buffering records and managing memory. Source details: (4.2) Kafka producer source code — the RecordAccumulator class
3: Sender, a thread independent of KafkaProducer. It prepares the RecordAccumulator's data for sending, builds the network I/O requests, and drives the network I/O layer, NetworkClient. Source details: (4.4) Kafka producer source code — the Sender thread class
4: NetworkClient, a network-client wrapper used by both producers and consumers to send requests and receive responses; it calls down into the network implementation layer, Selector. Source details: the Kafka network communication client — NetworkClient class
5: Selector, the most important and central class in Kafka's network layer, and a classic piece of industrial-grade communication framework design; it connects to the server side and processes requests. Source details: (3.4) broker source code — the Selector multiplexer in Kafka's network layer
(Figure: Kafka producer flow diagram)
A detailed analysis follows.
1: Start producing data; for a runnable example see the producer demo.
KafkaProducer.send(new ProducerRecord<>(topic, messageNo, messageStr))
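For context, a minimal sketch of such a demo (the broker address, topic name, and message values here are placeholders, not taken from the original demo):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // send is asynchronous: it appends to the RecordAccumulator and returns immediately;
            // the callback fires once the Sender thread gets the broker's response
            producer.send(new ProducerRecord<>("test-topic", "key-1", "hello kafka"),
                    (metadata, exception) -> {
                        if (exception != null)
                            exception.printStackTrace();
                        else
                            System.out.printf("sent to %s-%d@%d%n",
                                    metadata.topic(), metadata.partition(), metadata.offset());
                    });
        } finally {
            producer.close(); // flushes batches still sitting in the accumulator
        }
    }
}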
The KafkaProducer is initialized and its send method invoked; internally, send delegates to doSend:
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null; /* the target topic-partition */
    try {
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey; /* serialize the key to byte[] */
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        // serialize the value
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }
        // 1.1: compute the topic-partition this record will be stored in
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        /* estimated size of the serialized record */
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        /* record-size validation: by default a single record may not exceed 1 MB (max.request.size)
           nor the 32 MB buffer (buffer.memory) */
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        // 1.2: append the record to a batch in the RecordAccumulator's memory buffer
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        // 1.3: check whether the accumulator now holds a batch that meets the send conditions;
        // if so, wake the sender thread to send it.
        /* In other words, building records and sending them happen on two separate threads: the main
           thread builds the data and registers the FutureRecordMetadata callback, while the sender
           sends asynchronously and reports delivery status through that FutureRecordMetadata. */
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } // (catch clauses elided in this excerpt)
}
1.1: Call the partitioner (see the partitioner analysis doc) to compute the partition. When no key is given, records are distributed round-robin by default; when a key is given, every record with that key hashes to the same partition, so hot keys can cause data skew. A simplified sketch of the rule follows.
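This is an illustration of the default rule just described, not the exact DefaultPartitioner source (SimplePartitioner is a hypothetical name; the real class also restricts keyless records to currently available partitions):

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.utils.Utils;

class SimplePartitioner {
    private final AtomicInteger counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());

    int partition(byte[] serializedKey, int numPartitions) {
        if (serializedKey != null)
            // keyed record: the hash of the key decides the partition, so a hot key
            // always lands on the same partition and can skew the load
            return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
        // keyless record: round-robin across the partitions
        return Utils.toPositive(counter.getAndIncrement()) % numPartitions;
    }
}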
1.2: Enter the RecordAccumulator, which manages the buffer memory and the record batches.
Details: (4.2) Kafka producer source code — the RecordAccumulator class
append writes records into the per-partition queues:
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value,
                                 Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending threads to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    /* The producer is multi-threaded, so the deque is handled with synchronized blocks plus a
       double-checked tryAppend. */
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // Check if we have an in-progress batch: get the Deque for this topic-partition;
        // each deque may hold several batches that have not been sent yet.
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) { /* only one thread appends to the deque at a time, keeping it thread-safe */
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            /* Case 1: if the partition has no batch at all, one must be created first;
               the append fails and appendResult == null. */
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // Case 2: the partition has no in-progress batch; create one and append to it.
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        /* Take the max of the configured batch size (16 KB by default) and the record size
           (capped at 1 MB by default) -- the two usually need to be tuned together. */
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        /* Allocate memory: the max of 16 KB and the record size. */
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }
            /* The three layers of wrapping below each parse their own slice of properties,
               so nothing is defined twice -- nicely decoupled. */
            /* Wrap the allocated buffer in a MemoryRecordsBuilder, which writes the records. */
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            /* Create a ProducerBatch wrapping the MemoryRecordsBuilder. */
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            /* batch.tryAppend writes the record in the binary record format into the
               MemoryRecordsBuilder, and thereby into the ProducerBatch. */
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
            /* Add the newly built batch to the deque. */
            dq.addLast(batch);
            incomplete.add(batch);
            // Don't deallocate this buffer in the finally block, since it is in use by the record
            // batch; it returns to the BufferPool for reuse when the batch completes.
            buffer = null;
            // Report whether the batch is full or newly created so the caller can decide to send.
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        // Return the buffer to the BufferPool for reuse if it went unused.
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque) {
    /* Peek at the newest unsent batch in the deque. */
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    return null;
}
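The core concurrency trick in append is the double-checked pattern: try to append under the lock, allocate the buffer outside the lock (allocation can block waiting for pool memory), then re-check under the lock before creating a new batch. A minimal standalone sketch of just that pattern (MiniAccumulator and Batch are illustrative stand-ins, not the real classes):

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class MiniAccumulator {
    static class Batch {
        private final ByteBuffer buffer;
        Batch(ByteBuffer buffer) { this.buffer = buffer; }
        boolean tryAppend(byte[] record) {
            if (buffer.remaining() < record.length)
                return false;       // batch full -> caller must create a new one
            buffer.put(record);
            return true;
        }
    }

    private final Deque<Batch> dq = new ArrayDeque<>();
    private final int batchSize = 16 * 1024; // mirrors batch.size, 16 KB by default

    Batch append(byte[] record) {
        synchronized (dq) {                          // first attempt under the lock
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(record))
                return last;
        }
        // allocation may block, so it stays outside the lock
        ByteBuffer buffer = ByteBuffer.allocate(Math.max(batchSize, record.length));
        synchronized (dq) {                          // re-check: another thread may have raced us
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(record))
                return last;                         // lost the race (the real code returns the
                                                     // unused buffer to the pool in a finally block)
            Batch batch = new Batch(buffer);
            batch.tryAppend(record);
            dq.addLast(batch);
            return batch;
        }
    }
}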
1.3: Once records are queued, the producer can decide whether they are ready to send. The Sender thread first aggregates the ready batches per broker (broker → topic-partitions → batches), then builds a ClientRequest per broker:
void run(long now) {
    if (transactionManager != null) { /* transaction-manager checks; disabled (null) by default */
        try {
            if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                // Check if the previous run expired batches which requires a reset of the producer state.
                transactionManager.resetProducerId();
            if (!transactionManager.isTransactional()) {
                // this is an idempotent producer, so make sure we have a producer id
                maybeWaitForProducerId();
            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                        "some previously sent messages and can no longer retry them. It isn't safe to continue."));
            } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                // as long as there are outstanding transactional requests, we simply wait for them to return
                client.poll(retryBackoffMs, now);
                return;
            }

            // do not continue sending if the transaction manager is in a failed state or if there
            // is no producer id (for the idempotent case).
            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, now);
                return;
            } else if (transactionManager.hasAbortableError()) {
                accumulator.abortUndrainedBatches(transactionManager.lastError());
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request: {}", e);
            transactionManager.authenticationFailed(e);
        }
    }

    /* the interesting work is in sendProducerData */
    long pollTimeout = sendProducerData(now);
    client.poll(pollTimeout, now);
}

/* builds the requests and hands them off for sending */
private long sendProducerData(long now) {
    /* refresh the metadata view: topic -> partitions -> partition leader -> ISR */
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    /* walk all the batches, decide which are ready to send, and find their partition leaders */
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // remove any nodes we are not ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        /* check whether the client can actually connect to this broker node */
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // (the readiness algorithm above decided which batches need to be sent)
    // create produce requests: group all batches bound for the same broker together,
    // reducing the number of network I/Os and requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    /* ordering guarantee: guaranteeMessageOrder is true when max.in.flight.requests.per.connection == 1,
       in which case it applies to every partition */
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained: mark each as having an in-flight batch.
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    /* expired batches */
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
    // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
    // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
    // we need to reset the producer id here.
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.
            transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
        }
    }

    sensors.updateProduceRequestMetrics(batches);

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    }

    /* start sending: the ClientRequests are built inside */
    sendProduceRequests(batches, now);
    return pollTimeout;
}
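The pivot that drain performs is worth isolating: the accumulator keys batches by TopicPartition, but produce requests go out per broker, so drain re-keys from partition to leader-node id. A rough standalone sketch of that re-keying, using simplified stand-in types (the real drain also honors max.request.size and muted partitions):

import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DrainSketch {
    /* Pivot batches from per-partition deques (how the accumulator stores them)
       to per-broker lists (how produce requests are built). */
    static Map<Integer, List<String>> drain(Map<String, Deque<String>> batchesByPartition,
                                            Map<String, Integer> leaderFor) {
        Map<Integer, List<String>> batchesByNode = new HashMap<>();
        for (Map.Entry<String, Deque<String>> e : batchesByPartition.entrySet()) {
            Integer nodeId = leaderFor.get(e.getKey());  // the partition leader's broker id
            if (nodeId == null)
                continue;                                // unknown leader -> metadata refresh needed
            String batch = e.getValue().pollFirst();     // drain the oldest batch first
            if (batch != null)
                batchesByNode.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(batch);
        }
        return batchesByNode;
    }
}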
1.6: Once the Sender thread has finished building the ClientRequest, it calls NetworkClient to send it to the Selector multiplexer, where it waits to be processed. For details see (3.4) broker source code — the Selector multiplexer in Kafka's network layer.
public void send(Send send) {
    // 1. Get the connectionId from the request.
    String connectionId = send.destination();
    // 2. Look up the corresponding connection for this packet.
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    // 3. If the connection is in the closing-channels set...
    if (closingChannels.containsKey(connectionId)) {
        // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
        // ...record the connectionId in the failedSends set.
        this.failedSends.add(connectionId);
    } else {
        try {
            // 4. Stage the Send on the KafkaChannel. This is pre-send buffering, not the actual
            //    network send; only one in-flight Send per channel is allowed at a time.
            channel.setSend(send);
        } catch (Exception e) {
            // update the state for consistency, the channel will be discarded after `close`
            // 5. Mark the KafkaChannel state as FAILED_SEND.
            channel.state(ChannelState.FAILED_SEND);
            // ensure notification via `disconnected` when `failedSends` are processed in the next poll
            // 6. Record the connectionId in the failedSends set.
            this.failedSends.add(connectionId);
            // 7. Close the connection.
            close(channel, CloseMode.DISCARD_NO_NOTIFY);
            if (!(e instanceof CancelledKeyException)) {
                log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                        connectionId, e);
                throw e;
            }
        }
    }
}
The Send request is staged on the KafkaChannel's send field and interest in the OP_WRITE event is registered; at this point the request counts as handed off. Control returns to Sender.run(), which proceeds to process the request in client.poll:
/* 1: the heavy lifting is in sendProducerData: it groups the batches, finds the partition
      leaders to send to, and stages the ClientRequests on the Selector */
long pollTimeout = sendProducerData(now);
/* 2: with the requests staged, this calls java.nio.Selector.select() to process the
      ClientRequest produce requests we just built */
client.poll(pollTimeout, now);
2: From here it is simple: the OP_WRITE event fires, the staged Send is written out, and once the write completes both the request and the write-interest are removed.
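In raw java.nio terms, the stage-then-poll mechanism amounts to roughly the following (a sketch of the idea, not Kafka's actual Selector code; WriteEventSketch is a hypothetical name):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

class WriteEventSketch {
    // stage: attach the payload to the key and register interest in OP_WRITE
    static void setSend(SelectionKey key, ByteBuffer payload) {
        key.attach(payload);
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    }

    // poll-loop body: when the channel becomes writable, write; once the payload is
    // fully flushed, drop the staged request and the OP_WRITE interest
    static void onWritable(SelectionKey key) throws IOException {
        ByteBuffer payload = (ByteBuffer) key.attachment();
        SocketChannel channel = (SocketChannel) key.channel();
        channel.write(payload);
        if (!payload.hasRemaining()) {
            key.attach(null);                                             // remove the request
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);  // remove the write event
        }
    }
}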
From the source we can see the cause: the requested allocation exceeds the total buffer space.
The parameters involved are batch.size (batch size), buffer.memory (total buffer space), and the actual record size; the size passed to allocate is the max of batch.size and the actual record size.
Fix: do not set batch.size larger than buffer.memory, and check for individual records larger than buffer.memory.
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
//if the requested size exceeds the total buffer space, throw
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");