赞
踩
位置:org.apache.kafka.clients.producer;
作用:kafka提供的进行数据发送的客户端,进行数据发送前的配置,网络连接配置等,是位于producer和网络通信之间的桥梁。
主要属性和方法如下
/* Main fields of the producer client, as listed by the article. */
private final Partitioner partitioner; /* Partitioner used to pick the target partition of a record. NOTE(review): the original note says the default partitioner assigns randomly - verify against the default partitioner implementation. */
private final int maxRequestSize;
private final long totalMemorySize; /* Maximum memory the producer may use for buffering records. */
private final Metadata metadata; /* Cluster metadata. */
private final RecordAccumulator accumulator; /* Record accumulator: collects records into batches; doSend() appends to it and wakes the sender when a batch is full or newly created. */
private final Sender sender; /* Sender task that performs the actual sending. */
private final Thread ioThread; /* I/O thread; per the original note it runs the Sender. */
private final CompressionType compressionType; /* Compression type applied to messages. */
private final Sensor errors;
private final Time time;
private final ExtendedSerializer<K> keySerializer; /* Serializer that turns the record key into a byte[]. */
private final ExtendedSerializer<V> valueSerializer;
private final ProducerConfig producerConfig; /* Producer-side configuration. */
private final long maxBlockTimeMs;
private final int requestTimeoutMs; /* Per the original note: how long the client waits for a response after issuing a request. */
private final ProducerInterceptors<K, V> interceptors; /* Producer interceptors (the consumer-side counterpart is ConsumerInterceptors). */
private final ApiVersions apiVersions; /* API version information. */
private final TransactionManager transactionManager; /* Transaction manager; may be null (doSend() null-checks it). The original note says it is disabled by default. */
在生产者的构造函数中,会初始化记录累加器、网络客户端和Sender,关键代码如下:
/* Constructor excerpt: builds the record accumulator, then the network client and the Sender (arguments elided with "..." in the article). */
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),/* batch size; the original note gives the default as 16 KB */
this.totalMemorySize,/* total memory available for buffering records. NOTE(review): the original note says the default is 16 MB, but the article's own doSend() note says the buffer is 32 MB - confirm against the buffer.memory default */
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time,
apiVersions,
transactionManager);
/* Use the supplied kafkaClient when one is given, otherwise create a NetworkClient. */
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(...)
this.sender = new Sender(...)
真正发送数据时调用的方法是send;0.9版本之后只提供异步发送:
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record) { return send(record, null); } /*异步发送,有回调函数*/ @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions /*拦截器对非法消息的一些判断处理*/ ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); } /** * Implementation of asynchronously send a record to a topic.异步发送记录到主题的实现。 */ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null;/*记录topic-分区数*/ try { // first make sure the metadata for the topic is available ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); Cluster cluster = clusterAndWaitTime.cluster; byte[] serializedKey;/*数据序列化,转为 byte[] */ try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce); } byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce); } /*对要发送的数据计算其要存储的topic-partition*/ int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); setReadOnly(record.headers()); Header[] headers = 
record.headers().toArray(); /*序列化后的消息大小 */ int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers); /*消息大小合格校验,单条默认最大不能超过1m,也不能超过缓冲区大小32M*/ ensureValidRecordSize(serializedSize); long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); // producer callback will make sure to call both 'callback' and interceptor callback生产者回调将确保同时调用“回调”和拦截器回调 Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); if (transactionManager != null && transactionManager.isTransactional()) transactionManager.maybeAddPartitionToTransaction(tp); RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs); //对累加器中的数据进行判断,是否达到batch发送的条件,符合发送要求,唤醒sender线程发送 /*也就是说处理消息和发送是两个分开的线程,主线程构造数据和更新回调函数FutureRecordMetadata。 *sender采用异步发送+ 获取FutureRecordMetadata回调函数确保数据发送的状态*/ if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); //触发发送消息的机制,唤醒sender线程进行运行,进入sender.run()中 this.sender.wakeup(); } return result.future; // handling exceptions and record the errors; // for API exceptions return them in the future, // for other exceptions throw directly } catch (ApiException e) { log.debug("Exception occurred during message send:", e); if (callback != null) callback.onCompletion(null, e); this.errors.record(); this.interceptors.onSendError(record, tp, e); return new FutureFailure(e); } catch (InterruptedException e) { this.errors.record(); this.interceptors.onSendError(record, tp, e); throw new InterruptException(e); } catch (BufferExhaustedException e) { this.errors.record(); 
this.metrics.sensor("buffer-exhausted-records").record(); this.interceptors.onSendError(record, tp, e); throw e; } catch (KafkaException e) { this.errors.record(); this.interceptors.onSendError(record, tp, e); throw e; } catch (Exception e) { // we notify interceptor about all exceptions, since onSend is called before anything else in this method this.interceptors.onSendError(record, tp, e); throw e; } }
sender.run()详见 kafka生产者源码——Sender线程类
/*消息发送失败时重试的一些配置*/ private static int configureRetries(ProducerConfig config, boolean idempotenceEnabled, Logger log) { boolean userConfiguredRetries = false;/*默认不进行重试*/ if (config.originals().containsKey(ProducerConfig.RETRIES_CONFIG)) { userConfiguredRetries = true; } if (idempotenceEnabled && !userConfiguredRetries) { // We recommend setting infinite retries when the idempotent producer is enabled, so it makes sense to make // this the default. log.info("Overriding the default retries config to the recommended value of {} since the idempotent " + "producer is enabled.", Integer.MAX_VALUE); return Integer.MAX_VALUE; } if (idempotenceEnabled && config.getInt(ProducerConfig.RETRIES_CONFIG) == 0) { throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer."); } return config.getInt(ProducerConfig.RETRIES_CONFIG); } /*单个分区允许等待请求响应的个数,可见最大不能超过5,否则抛出异常*/ private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) { if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) { throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" + " to use the idempotent producer."); } return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。