Creating the KafkaProducer object: step into the constructor.
KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors<K, V> interceptors, Time time) {
    try {
        this.producerConfig = config; // user-supplied configuration
        this.time = time; // clock
        String transactionalId = config.getString("transactional.id"); // user-defined transactional id
        this.clientId = config.getString("client.id"); // user-defined client id
        LogContext logContext; // logging context
        if (transactionalId == null) {
            logContext = new LogContext(String.format("[Producer clientId=%s] ", this.clientId));
        } else {
            logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", this.clientId, transactionalId));
        }
        this.log = logContext.logger(KafkaProducer.class);
        this.log.trace("Starting the Kafka producer");
        // metric tags keyed by client id
        Map<String, String> metricTags = Collections.singletonMap("client-id", this.clientId);
        // metric configuration: sample count, sample window, recording level
        MetricConfig metricConfig = (new MetricConfig()).samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS).recordLevel(RecordingLevel.forName(config.getString("metrics.recording.level"))).tags(metricTags);
        // metric reporters
        List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("client.id", this.clientId));
        JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.configure(config.originals(Collections.singletonMap("client.id", this.clientId)));
        reporters.add(jmxReporter);
        MetricsContext metricsContext = new KafkaMetricsContext("kafka.producer", config.originalsWithPrefix("metrics.context."));
        this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
        // partitioner
        this.partitioner = (Partitioner)config.getConfiguredInstance("partitioner.class", Partitioner.class, Collections.singletonMap("client.id", this.clientId));
        long retryBackoffMs = config.getLong("retry.backoff.ms");
        // key serializer
        if (keySerializer == null) {
            this.keySerializer = (Serializer)config.getConfiguredInstance("key.serializer", Serializer.class);
            this.keySerializer.configure(config.originals(Collections.singletonMap("client.id", this.clientId)), true);
        } else {
            config.ignore("key.serializer");
            this.keySerializer = keySerializer;
        }
        // value serializer
        if (valueSerializer == null) {
            this.valueSerializer = (Serializer)config.getConfiguredInstance("value.serializer", Serializer.class);
            this.valueSerializer.configure(config.originals(Collections.singletonMap("client.id", this.clientId)), false);
        } else {
            config.ignore("value.serializer");
            this.valueSerializer = valueSerializer;
        }
        // interceptors
        List<ProducerInterceptor<K, V>> interceptorList = config.getConfiguredInstances("interceptor.classes", ProducerInterceptor.class, Collections.singletonMap("client.id", this.clientId));
        if (interceptors != null) {
            this.interceptors = interceptors;
        } else {
            this.interceptors = new ProducerInterceptors(interceptorList); // wrap the configured (possibly empty) list
        }
        ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
        // maximum size of a single request, in bytes
        this.maxRequestSize = config.getInt("max.request.size");
        // total buffer memory
        this.totalMemorySize = config.getLong("buffer.memory");
        // compression type
        this.compressionType = CompressionType.forName(config.getString("compression.type"));
        this.maxBlockTimeMs = config.getLong("max.block.ms");
        int deliveryTimeoutMs = configureDeliveryTimeout(config, this.log);
        this.apiVersions = new ApiVersions();
        // transaction manager
        this.transactionManager = this.configureTransactionState(config, logContext);
        // accumulator that collects the records users send
        this.accumulator = new RecordAccumulator(logContext, config.getInt("batch.size"), this.compressionType, lingerMs(config), retryBackoffMs, deliveryTimeoutMs, this.metrics, "producer-metrics", time, this.apiVersions, this.transactionManager, new BufferPool(this.totalMemorySize, config.getInt("batch.size"), this.metrics, time, "producer-metrics"));
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"), config.getString("client.dns.lookup"));
        if (metadata != null) {
            // metadata was injected by the caller
            this.metadata = metadata;
        } else {
            // cluster metadata is refreshed periodically, every 5 minutes by default
            this.metadata = new ProducerMetadata(retryBackoffMs, config.getLong("metadata.max.age.ms"), config.getLong("metadata.max.idle.ms"), logContext, clusterResourceListeners, Time.SYSTEM);
            this.metadata.bootstrap(addresses);
        }
        this.errors = this.metrics.sensor("errors");
        // the sender delegates to NetworkClient, which talks to the brokers over TCP
        this.sender = this.newSender(logContext, kafkaClient, this.metadata);
        String ioThreadName = "kafka-producer-network-thread | " + this.clientId;
        // I/O thread: the first argument is the thread name, the second the sender, the third marks it as a daemon thread
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        // start the send thread
        this.ioThread.start();
        config.logUnused();
        AppInfoParser.registerAppInfo("kafka.producer", this.clientId, this.metrics, time.milliseconds());
        this.log.debug("Kafka producer started");
    } catch (Throwable var22) {
        this.close(Duration.ofMillis(0L), true);
        throw new KafkaException("Failed to construct kafka producer", var22);
    }
}
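For orientation, a minimal sketch of driving this constructor through the public API ("localhost:9092", "demo-producer", and the ProducerBootstrap class name are placeholders; the ProducerConfig constants map onto the string keys the constructor reads):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerBootstrap {
    public static void main(String[] args) {
        Properties props = new Properties();
        // "bootstrap.servers" feeds parseAndValidateAddresses above (placeholder address)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // "key.serializer" / "value.serializer" are instantiated via getConfiguredInstance
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // "client.id" shows up in the LogContext and the metric tags built above
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "demo-producer");
        // this call runs the constructor traced above, including starting ioThread
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // producer is ready; close() stops the daemon sender thread
        }
    }
}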
Sending messages
Whether the send is synchronous or asynchronous, it enters this method:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // run the record through the interceptor chain
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    // hand the (possibly modified) record to doSend()
    return this.doSend(interceptedRecord, callback);
}
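A short sketch of the two calling styles, assuming the producer from the construction example and a hypothetical "demo-topic":

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

static void sendBothWays(KafkaProducer<String, String> producer) throws Exception {
    // asynchronous: send() returns at once; the callback fires when the broker responds
    producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.printf("partition=%d offset=%d%n", metadata.partition(), metadata.offset());
        }
    });
    // synchronous: block on the returned Future until the broker acknowledges
    Future<RecordMetadata> future = producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
    RecordMetadata acked = future.get();
    System.out.println("acked offset: " + acked.offset());
}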
Step into the interceptor chain's onSend():
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    // the record being sent
    ProducerRecord<K, V> interceptRecord = record;
    // iterate over the configured interceptors
    Iterator var3 = this.interceptors.iterator();
    while(var3.hasNext()) {
        ProducerInterceptor interceptor = (ProducerInterceptor)var3.next();
        try {
            // apply this interceptor to the record
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception var6) {
            if (record != null) {
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", new Object[]{record.topic(), record.partition(), var6});
            } else {
                log.warn("Error executing interceptor onSend callback", var6);
            }
        }
    }
    return interceptRecord;
}
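To make the loop concrete, here is a hypothetical interceptor of the kind this chain iterates over (PrefixingInterceptor is not part of Kafka; it merely tags every value):

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PrefixingInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // invoked from ProducerInterceptors.onSend() above, before partitioning and serialization
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), "intercepted-" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // invoked once the broker acknowledges the record, or when the send fails
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

It would be registered through the interceptor.classes key the constructor read earlier, e.g. props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, PrefixingInterceptor.class.getName());.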
Step into return this.doSend(interceptedRecord, callback):
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null; // the topic partition the record will go to
    try {
        this.throwIfProducerClosed();
        long nowMs = this.time.milliseconds();
        KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
        try {
            // fetch the topic's metadata and make sure it is available
            clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
        } catch (KafkaException var22) {
            if (this.metadata.isClosed()) {
                throw new KafkaException("Producer closed while send in progress", var22);
            }
            throw var22;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try {
            // serialize the key
            serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException var21) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var21);
        }
        byte[] serializedValue;
        try {
            // serialize the value
            serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException var20) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var20);
        }
        // invoke the partitioner to assign a partition
        int partition = this.partition(record, serializedKey, serializedValue, cluster);
        // the topic and partition this record will be sent to
        tp = new TopicPartition(record.topic(), partition);
        // make the headers read-only
        this.setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();
        // estimate the serialized size
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
        this.ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (this.log.isTraceEnabled()) {
            this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
        }
        // callback (wrapping the interceptors) to run once the broker acknowledges the record
        Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
        if (this.transactionManager != null && this.transactionManager.isTransactional()) {
            this.transactionManager.failIfNotReadyForSend();
        }
        // append the record to the accumulator
        RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
        if (result.abortForNewBatch) {
            int prevPartition = partition;
            this.partitioner.onNewBatch(record.topic(), cluster, partition);
            partition = this.partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (this.log.isTraceEnabled()) {
                this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
            }
            interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
            result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }
        // for transactional messages, register the partition with the transaction manager
        if (this.transactionManager != null && this.transactionManager.isTransactional()) {
            this.transactionManager.maybeAddPartitionToTransaction(tp);
        }
        // wake the sender thread if the partition's batch in the accumulator is full,
        // or if a new batch had to be created (e.g. because linger.ms expired)
        if (result.batchIsFull || result.newBatchCreated) {
            this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        // return the future
        return result.future;
    } catch (ApiException var23) {
        this.log.debug("Exception occurred during message send:", var23);
        if (callback != null) {
            callback.onCompletion((RecordMetadata)null, var23);
        }
        this.errors.record();
        this.interceptors.onSendError(record, tp, var23);
        return new KafkaProducer.FutureFailure(var23);
    } catch (InterruptedException var24) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, var24);
        throw new InterruptException(var24);
    } catch (KafkaException var25) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, var25);
        throw var25;
    } catch (Exception var26) {
        this.interceptors.onSendError(record, tp, var26);
        throw var26;
    }
}
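The append-and-wakeup behaviour above is driven by a handful of configs; a sketch of setting them, continuing the props object from the construction example (the values shown are illustrative, mostly the client defaults):

// batch.size: target bytes per partition batch; a full batch sets result.batchIsFull
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms: how long the accumulator waits for more records before a batch counts as ready
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
// buffer.memory: total BufferPool size; append() blocks when it is exhausted
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);
// max.block.ms: upper bound on blocking in waitOnMetadata() and buffer allocation
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000L);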
If a partition is specified explicitly, that value is used directly as the partition.
If no partition is specified but a key is present, the partition is the key's hash value modulo the topic's partition count.
If neither a partition nor a key is given, a random integer is generated on the first call and incremented on each subsequent call; that value modulo the number of available partitions gives the partition. This is the well-known round-robin algorithm.
The producer's default partitioner is org.apache.kafka.clients.producer.internals.DefaultPartitioner; a simplified sketch of these rules follows below.
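A simplified sketch of those three rules (not the exact DefaultPartitioner source; Utils.murmur2 and Utils.toPositive are the real hashing helpers from org.apache.kafka.common.utils):

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.utils.Utils;

public class PartitionRules {
    // rule 3 starts from a random integer and increments per call
    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    public int partition(Integer specifiedPartition, byte[] keyBytes, int numPartitions) {
        if (specifiedPartition != null) {
            return specifiedPartition; // rule 1: an explicit partition wins
        }
        if (keyBytes != null) {
            // rule 2: murmur2 hash of the key, modulo the partition count
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
        // rule 3: round-robin over the available partitions
        return Utils.toPositive(counter.getAndIncrement()) % numPartitions;
    }
}

Note that since Kafka 2.4 the no-key case actually uses a sticky partitioner (that is what the partitioner.onNewBatch() call in doSend() above exists for), so the round-robin description matches older clients.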
Metadata update mechanism