消息拦截器在消息发送开始阶段进行拦截,this method does not throw exceptions注释加上代码可以看出即使拦截器抛出异常也不会中止我们的消息发送。
- @Override
- public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
- // intercept the record, which can be potentially modified; this method does not throw exceptions
- //1.拦截器对发送的消息拦截处理;
- ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
- return doSend(interceptedRecord, callback);
- }
- public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
- ProducerRecord<K, V> interceptRecord = record;
- for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
- try {
- interceptRecord = interceptor.onSend(interceptRecord);
- } catch (Exception e) {
- // do not propagate interceptor exception, log and continue calling other interceptors
- // be careful not to throw exception from here
- if (record != null)
- log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
- else
- log.warn("Error executing interceptor onSend callback", e);
- }
- }
- return interceptRecord;
- }
- try {
- //2.获取元数据信息;
- clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
- } catch (KafkaException e) {
- if (metadata.isClosed())
- throw new KafkaException("Producer closed while send in progress", e);
- throw e;
- }
- Type.LONG,
- 60 * 1000,
- atLeast(0),
- Importance.MEDIUM,
- metadata.add(topic, nowMs + elapsed);
- int version = metadata.requestUpdateForTopic(topic);
- //唤醒线程更新元数据
- sender.wakeup();
- try {
- //阻塞等待
- metadata.awaitUpdate(version, remainingWaitMs);
- } catch (TimeoutException ex) {
- // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
- throw new TimeoutException(
- String.format("Topic %s not present in metadata after %d ms.",
- topic, maxWaitMs));
- }
这里可以看一下 sender线程的初始化参数:
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) 初始化内存池的大小为32M;
.define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1024 * 1024, atLeast(0), Importance.MEDIUM, MAX_REQUEST_SIZE_DOC) 默认单条消息最大为1M;
- this.sender = newSender(logContext, kafkaClient, this.metadata);
- String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
- this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
- this.ioThread.start();
- KafkaProducer(ProducerConfig config,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer,
- ProducerMetadata metadata,
- KafkaClient kafkaClient,
- ProducerInterceptors<K, V> interceptors,
- Time time) {
- try {
- ...
- this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
- this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
- this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
- this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
- int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
- this.apiVersions = new ApiVersions();
- this.transactionManager = configureTransactionState(config, logContext);
- this.accumulator = new RecordAccumulator(logContext,
- config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
- this.compressionType,
- lingerMs(config),
- retryBackoffMs,
- deliveryTimeoutMs,
- metrics,
- time,
- apiVersions,
- transactionManager,
- new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
- ...
- this.errors = this.metrics.sensor("errors");
- this.sender = newSender(logContext, kafkaClient, this.metadata);
- String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
- this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
- this.ioThread.start();
- config.logUnused();
- AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
- log.debug("Kafka producer started");
- } catch (Throwable t) {
- // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
- close(Duration.ofMillis(0), true);
- // now propagate the exception
- throw new KafkaException("Failed to construct kafka producer", t);
- }
- }
.define(ACKS_CONFIG, Type.STRING, "all", in("all", "-1", "0", "1"), Importance.LOW, ACKS_DOC) 默认所有broker同步消息才算发送成功;
.define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, Type.INT, 5, atLeast(1), Importance.LOW, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) 默认允许最多5个连接来发送消息;如果需要保证顺序消息需要将其设置为1.
- Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
- int maxInflightRequests = configureInflightRequests(producerConfig);
- int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
- ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
- ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
- Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
- KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
- new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
- this.metrics, time, "producer", channelBuilder, logContext),
- metadata,
- clientId,
- maxInflightRequests,
- producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
- producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
- producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
- producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
- requestTimeoutMs,
- producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
- producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
- time,
- true,
- apiVersions,
- throttleTimeSensor,
- logContext);
- short acks = configureAcks(producerConfig, log);
- return new Sender(logContext,
- client,
- metadata,
- this.accumulator,
- maxInflightRequests == 1,
- producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
- acks,
- producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
- metricsRegistry.senderMetrics,
- time,
- requestTimeoutMs,
- producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
- this.transactionManager,
- apiVersions);
- }
- public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs) throws InterruptedException {
- long currentTimeMs = time.milliseconds();
- long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs + timeoutMs;
- time.waitObject(this, () -> {
- // Throw fatal exceptions, if there are any. Recoverable topic errors will be handled by the caller.
- maybeThrowFatalException();
- return updateVersion() > lastVersion || isClosed();
- }, deadlineMs);
- if (isClosed())
- throw new KafkaException("Requested metadata update after close");
- }
- public void waitObject(Object obj, Supplier<Boolean> condition, long deadlineMs) throws InterruptedException {
- synchronized (obj) {
- while (true) {
- if (condition.get())
- return;
- long currentTimeMs = milliseconds();
- if (currentTimeMs >= deadlineMs)
- throw new TimeoutException("Condition not satisfied before deadline");
- obj.wait(deadlineMs - currentTimeMs);
- }
- }
- }
- Sender
- void runOnce() {
- client.poll(pollTimeout, currentTimeMs);
- }
- NetWorkClient:
- public List<ClientResponse> poll(long timeout, long now) {
- ensureActive();
- if (!abortedSends.isEmpty()) {
- // If there are aborted sends because of unsupported version exceptions or disconnects,
- // handle them immediately without waiting for Selector#poll.
- List<ClientResponse> responses = new ArrayList<>();
- handleAbortedSends(responses);
- completeResponses(responses);
- return responses;
- }
- long metadataTimeout = metadataUpdater.maybeUpdate(now);
- try {
- this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
- } catch (IOException e) {
- log.error("Unexpected error during I/O", e);
- }
- // process completed actions
- long updatedNow = this.time.milliseconds();
- List<ClientResponse> responses = new ArrayList<>();
- handleCompletedSends(responses, updatedNow);
- handleCompletedReceives(responses, updatedNow);
- handleDisconnections(responses, updatedNow);
- handleConnections();
- handleInitiateApiVersionRequests(updatedNow);
- handleTimedOutConnections(responses, updatedNow);
- handleTimedOutRequests(responses, updatedNow);
- completeResponses(responses);
- return responses;
- }
如下代码 handleCompletedReceives处理返回元数据的响应,然后调用handleSuccessfulResponse处理成功的响应,最后调用ProducerMetadata更新本地元数据信息并唤醒了主线程。主线程获取到元数据后进行下面流程。
- //NetWorkClient
- private void handleCompletedReceives(List<ClientResponse> responses, long now) {
- ...
- if (req.isInternalRequest && response instanceof MetadataResponse)
- metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) response);
- }
- public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
- ...
- this.metadata.update(inProgress.requestVersion, response, inProgress.isPartialUpdate, now);
- }
- //ProducerMetadata
- public synchronized void update(int requestVersion, MetadataResponse response, boolean isPartialUpdate, long nowMs) {
- super.update(requestVersion, response, isPartialUpdate, nowMs);
- // Remove all topics in the response that are in the new topic set. Note that if an error was encountered for a
- // new topic's metadata, then any work to resolve the error will include the topic in a full metadata update.
- if (!newTopics.isEmpty()) {
- for (MetadataResponse.TopicMetadata metadata : response.topicMetadata()) {
- newTopics.remove(metadata.topic());
- }
- }
- notifyAll();
- }
- byte[] serializedKey;
- try {
- serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
- System.out.println("serializedKey:" + Arrays.toString(serializedKey));
- } catch (ClassCastException cce) {
- throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
- " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
- " specified in key.serializer", cce);
- }
- byte[] serializedValue;
- try {
- serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
- } catch (ClassCastException cce) {
- throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
- " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
- " specified in value.serializer", cce);
- }
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
- int numPartitions) {
- if (keyBytes == null) {
- return stickyPartitionCache.partition(topic, cluster);
- }
- // hash the keyBytes to choose a partition
- return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
- }
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs)创建一个新的批次放到队列中dq.addLast(batch);
- public RecordAppendResult append(TopicPartition tp,
- long timestamp,
- byte[] key,
- byte[] value,
- Header[] headers,
- Callback callback,
- long maxTimeToBlock,
- boolean abortOnNewBatch,
- long nowMs) throws InterruptedException {
- // We keep track of the number of appending thread to make sure we do not miss batches in
- // abortIncompleteBatches().
- appendsInProgress.incrementAndGet();
- ByteBuffer buffer = null;
- if (headers == null) headers = Record.EMPTY_HEADERS;
- try {
- // check if we have an in-progress batch
- Deque<ProducerBatch> dq = getOrCreateDeque(tp);
- synchronized (dq) {
- if (closed)
- throw new KafkaException("Producer closed while send in progress");
- RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
- if (appendResult != null)
- return appendResult;
- }
- // we don't have an in-progress record batch try to allocate a new batch
- if (abortOnNewBatch) {
- // Return a result that will cause another call to append.
- return new RecordAppendResult(null, false, false, true);
- }
- byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
- int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
- log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
- // 内存池中分配内存
- buffer = free.allocate(size, maxTimeToBlock);
- // Update the current time in case the buffer allocation blocked above.
- nowMs = time.milliseconds();
- synchronized (dq) {
- // Need to check if producer is closed again after grabbing the dequeue lock.
- if (closed)
- throw new KafkaException("Producer closed while send in progress");
- RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
- if (appendResult != null) {
- // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
- return appendResult;
- }
- MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
- ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
- FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
- callback, nowMs));
- dq.addLast(batch);
- incomplete.add(batch);
- // Don't deallocate this buffer in the finally block as it's being used in the record batch
- buffer = null;
- return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
- }
- } finally {
- if (buffer != null)
- free.deallocate(buffer);
- appendsInProgress.decrementAndGet();
- }
- }
- result = accumulator.append(tp, timestamp, serializedKey,
- serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
- if (result.batchIsFull || result.newBatchCreated) {
- log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
- this.sender.wakeup();
- }
