在前面已经完成win环境下zk(3.4.12版本)的运行,并对kafka源码编译, 参考:本地kafka源码的编译和调试,在idea的run-->debug-->中新增configuration来创建topic:yzg(3分区1备份),本地启动运行效果:
KafkaProducer在 org.apache.kafka.clients.producer的包下(所有关于生产者源码都在这包),在使用生产者类时要实例化KafkaProducer,其中定义了发送机制,KafkaProducer是Producer的子类,生产者实例(producer)通过实例化KafkaProducer类,并调用它的send()方法完成数据发送,梳理如下:
① 首先过一个拦截器;
② 调用KafkaProducer.send().doSend()方法,doSend首先把key和value按照指定的序列化器进行序列化;
③ partition()函数得到数据和序列化后的数据后,对数据进行分区;
④ 调用RecordAccumulator.append()方法,将处理后的数据扔进RecordAccumulator(缓存对象)的RecordAppendResult类属性中;
⑤ RecordAccumulator.append()方法首先将数据进行队列化放在Deque对象中,Deque包含多个ProducerBatch;
⑥ 上面流程完成后,调用this.sender.wakeup()唤醒sender线程,该线程就干一件事就是发数据,
- private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
- try {
- Map<String, Object> userProvidedConfigs = config.originals();
- this.producerConfig = config;
- this.time = Time.SYSTEM;
- String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
- if (clientId.length() <= 0)
- clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
- this.clientId = clientId;
- String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
- (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
- LogContext logContext;
- if (transactionalId == null)
- logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
- else
- logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
- log = logContext.logger(KafkaProducer.class);
- log.trace("Starting the Kafka producer");
- Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
- MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
- .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
- .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
- .tags(metricTags);
- List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class);
- reporters.add(new JmxReporter(JMX_PREFIX));
- this.metrics = new Metrics(metricConfig, reporters, time);
- ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
- this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
- long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
- if (keySerializer == null) {
- this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- Serializer.class));
- this.keySerializer.configure(config.originals(), true);
- } else {
- config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
- this.keySerializer = ensureExtended(keySerializer);
- }
- if (valueSerializer == null) {
- this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- Serializer.class));
- this.valueSerializer.configure(config.originals(), false);
- } else {
- config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
- this.valueSerializer = ensureExtended(valueSerializer);
- }
- // load interceptors and make sure they get clientId
- userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
- List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
- ProducerInterceptor.class);
- this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
- ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
- this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
- true, true, clusterResourceListeners);
- this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
- this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
- this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
- this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
- this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
- this.transactionManager = configureTransactionState(config, logContext, log);
- int retries = configureRetries(config, transactionManager != null, log);
- int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
- short acks = configureAcks(config, transactionManager != null, log);
- this.apiVersions = new ApiVersions();
- this.accumulator = new RecordAccumulator(logContext,
- config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
- this.totalMemorySize,
- this.compressionType,
- config.getLong(ProducerConfig.LINGER_MS_CONFIG),
- retryBackoffMs,
- metrics,
- time,
- apiVersions,
- transactionManager);
- List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
- this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
- ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
- Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
- NetworkClient client = new NetworkClient(
- new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
- this.metrics, time, "producer", channelBuilder, logContext),
- this.metadata,
- clientId,
- maxInflightRequests,
- config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
- config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
- config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
- config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
- this.requestTimeoutMs,
- time,
- true,
- apiVersions,
- throttleTimeSensor,
- logContext);
- this.sender = new Sender(logContext,
- client,
- this.metadata,
- this.accumulator,
- maxInflightRequests == 1,
- config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
- acks,
- retries,
- metricsRegistry.senderMetrics,
- Time.SYSTEM,
- this.requestTimeoutMs,
- config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
- this.transactionManager,
- apiVersions);
- String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
- this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
- this.ioThread.start();
- this.errors = this.metrics.sensor("errors");
- config.logUnused();
- AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
- log.debug("Kafka producer started");
- } catch (Throwable t) {
- // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
- close(0, TimeUnit.MILLISECONDS, true);
- // now propagate the exception
- throw new KafkaException("Failed to construct kafka producer", t);
- }
- }
① 生产者producer在拿到props后实例化KafkaProducer,然后多线程调用send(),KafkaProducer如果没有定义拦截器interceptors(ProducerInterceptors类的实例)数据record保持不变,若定义了interceptors就调用拦截器的ProducerInterceptors.onSend()方法过滤数据record,这个拦截器就是用来自定义的,源码里面没有过滤方法;
- public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
- // intercept the record, which can be potentially modified; this method does not throw exceptions
- ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
- return doSend(interceptedRecord, callback);
- }
② 接下来调用doSend方法,方法体内首先调用partition()方法,入参是原始的record数据,以及key和value序列化结果;
- // 确认topic和集群信息正确
- ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
- Cluster cluster = clusterAndWaitTime.cluster;
- // 分区器
- int partition = partition(record, serializedKey, serializedValue, cluster);
- tp = new TopicPartition(record.topic(), partition);
- //如果没有指定分区,就使用内置的分区器partitioner.partition()
- private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
- Integer partition = record.partition();
- return partition != null ?
- partition :
- partitioner.partition(
- record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
- }
③ 接下来调用KafkaProducer类持有的RecordAccumulator对象的RecordAccumulator.append()方法,返回RecordAppendResult对象;
- RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
- serializedValue, headers, interceptCallback, remainingWaitMs);
- // append方法的实现,返回RecordAppendResult 对象
- public RecordAppendResult append(TopicPartition tp,
- long timestamp,
- byte[] key,
- byte[] value,
- Header[] headers,
- Callback callback,
- long maxTimeToBlock) throws InterruptedException {
- appendsInProgress.incrementAndGet();
- ByteBuffer buffer = null;
- if (headers == null) headers = Record.EMPTY_HEADERS;
- try {
- Deque<ProducerBatch> dq = getOrCreateDeque(tp);
- synchronized (dq) {
- if (closed)
- throw new IllegalStateException("Cannot send after the producer is closed.");
- RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
- if (appendResult != null)
- return appendResult;
- }
- byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
- int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
- log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
- buffer = free.allocate(size, maxTimeToBlock);
- synchronized (dq) {
- if (closed)
- throw new IllegalStateException("Cannot send after the producer is closed.");
- RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
- if (appendResult != null) {
- return appendResult;
- }
- MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
- ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
- FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
- dq.addLast(batch);
- incomplete.add(batch);
- buffer = null;
- return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
- }
- } finally {
- if (buffer != null)
- free.deallocate(buffer);
- appendsInProgress.decrementAndGet();
- }
- }
④ 继续上面的代码,生产者本地维护一个未发送数据的缓存池,也是一个后台IO线程用来将records转换为网络请求,这就是RecordAccumulator,RecordAccumulator持有RecordAppendResult对象,其future作为整个producer.send()方法的返回值;
- public final static class RecordAppendResult {
- public final FutureRecordMetadata future;
- public final boolean batchIsFull;
- public final boolean newBatchCreated;
- public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated) {
- this.future = future;
- this.batchIsFull = batchIsFull;
- this.newBatchCreated = newBatchCreated;
- }
- }
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
至此 KafkaProducer.send()方法的逻辑结束,也就是原始数据经过逻辑转换后放在本地的Deque队列中;
- // 网络请求的构造器
- NetworkClient client = new NetworkClient(
- new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
- this.metrics, time, "producer", channelBuilder, logContext),
- this.metadata,
- clientId,
- maxInflightRequests,
- config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
- config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
- config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
- config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
- this.requestTimeoutMs,
- time,
- true,
- apiVersions,
- throttleTimeSensor,
- logContext);
- // run方法中对于网络请求的逻辑
- void run(long now) {
- if (transactionManager != null) {
- try {
- if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
- // Check if the previous run expired batches which requires a reset of the producer state.
- transactionManager.resetProducerId();
- if (!transactionManager.isTransactional()) {
- // this is an idempotent producer, so make sure we have a producer id
- maybeWaitForProducerId();
- } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
- transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
- "some previously sent messages and can no longer retry them. It isn't safe to continue."));
- } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
- // as long as there are outstanding transactional requests, we simply wait for them to return
- client.poll(retryBackoffMs, now);
- return;
- }
- // do not continue sending if the transaction manager is in a failed state or if there
- // is no producer id (for the idempotent case).
- if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
- RuntimeException lastError = transactionManager.lastError();
- if (lastError != null)
- maybeAbortBatches(lastError);
- client.poll(retryBackoffMs, now);
- return;
- } else if (transactionManager.hasAbortableError()) {
- accumulator.abortUndrainedBatches(transactionManager.lastError());
- }
- } catch (AuthenticationException e) {
- // This is already logged as error, but propagated here to perform any clean ups.
- log.trace("Authentication exception while processing transactional
源码编译运行后相当于本地搭建了kafka集群,在源码examples包下 producer类来了解数据发送流程,首先定义kafka提供的KafkaProducer类,再调用它的send()方法发送数据;很多工作是在KafkaProducer类实例化的时候已经做了;
- package demo;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import java.util.Properties;
- import java.util.concurrent.ExecutionException;
- public class DemoForProducer extends Thread{
- public static void main(String[] args) {
- //System.out.println("hello");
- DemoForProducer dfp=new DemoForProducer("yezonggang",true);
- dfp.run();
- }
- private final KafkaProducer<Integer, String> producer;
- private final String topic;
- private final Boolean isAsync;
- public DemoForProducer(String topic, Boolean isAsync) {
- Properties props = new Properties();
- props.put("bootstrap.servers", "");
- props.put("client.id", "DemoForProducer");
- props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- producer = new KafkaProducer<>(props);
- this.topic = topic;
- this.isAsync = isAsync;
- }
- public DemoForProducer(KafkaProducer<Integer, String> producer, String topic, Boolean isAsync) {
- this.producer = producer;
- this.topic = topic;
- this.isAsync = isAsync;
- }
- // 线程类的执行方法(while死循环),判断是异步还是同步发送配置(高版本默认都异步),调用send方法发送数据,send方法的第1个参数是ProducerRecord,第2个是messageNo记录发送批次,DemoCallBack是回执函数
- public void run() {
- int messageNo = 1;
- while (true) {
- String messageStr = "Message_" + messageNo;
- long startTime = System.currentTimeMillis();
- if (isAsync) { // Send asynchronously
- producer.send(new ProducerRecord<>(topic,
- messageNo,
- messageStr), new DemoForCallBack(startTime, messageNo, messageStr));
- } else { // Send synchronously
- try {
- producer.send(new ProducerRecord<>(topic,
- messageNo,
- messageStr)).get();
- System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
- }
- ++messageNo;
- }
- }
- }
在 KafkaProducer类实例化后,idea中运行的本地kafka集群就已经拿到了producerconfig设置,如下,client.id=DemoForProducer已经说明该生产者已经被kafka集群捕获,即使当前send()方法还未启动;
ProducerRecord(topic=yezonggang, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=1, value=Message_1, timestamp=null)
- INFO ProducerConfig values:
- acks = 1
- batch.size = 16384
- bootstrap.servers = []
- buffer.memory = 33554432
- client.id = DemoForProducer
- compression.type = none
- connections.max.idle.ms = 540000
- enable.idempotence = false
- interceptor.classes = null
- key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
- linger.ms = 0
- max.block.ms = 60000
- max.in.flight.requests.per.connection = 5
- max.request.size = 1048576
- metadata.max.age.ms = 300000
- metric.reporters = []
- metrics.num.samples = 2
- metrics.recording.level = INFO
- metrics.sample.window.ms = 30000
- partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
- receive.buffer.bytes = 32768
- reconnect.backoff.max.ms = 1000
- reconnect.backoff.ms = 50
- request.timeout.ms = 30000
- retries = 0
- retry.backoff.ms = 100
- sasl.jaas.config = null
- sasl.kerberos.kinit.cmd = /usr/bin/kinit
- sasl.kerberos.min.time.before.relogin = 60000
- sasl.kerberos.service.name = null
- sasl.kerberos.ticket.renew.jitter = 0.05
- sasl.kerberos.ticket.renew.window.factor = 0.8
- sasl.mechanism = GSSAPI
- security.protocol = PLAINTEXT
- send.buffer.bytes = 131072
- ssl.cipher.suites = null
- ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
- ssl.endpoint.identification.algorithm = null
- ssl.key.password = null
- ssl.keymanager.algorithm = SunX509
- ssl.keystore.location = null
- ssl.keystore.password = null
- ssl.keystore.type = JKS
- ssl.protocol = TLS
- ssl.provider = null
- ssl.secure.random.implementation = null
- ssl.trustmanager.algorithm = PKIX
- ssl.truststore.location = null
- ssl.truststore.password = null
- ssl.truststore.type = JKS
- transaction.timeout.ms = 60000
- transactional.id = null
- value.serializer = class org.apache.kafka.common.serialization.StringSerializer(org.apache.kafka.clients.producer.ProducerConfig)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。