赞
踩
在Kafka中,Producer实例是线程安全的,通常一个Producer的进程只需要生成一个Producer实例.
这样比一个进程中生成多个Producer实例的效率反而会更高.
在Producer的配置中,可以配置Producer的每个batch的内存缓冲区的大小默认16kb,或者多少ms提交一次,
这种设计参考了Tcp的Nagle算法,让网络传输尽可能的发送大的数据块.
Kafka 3.0开始,是否启用冥等性的enable.idempotence
配置默认为true.
此配置只能保证单分区上的幂等性,即一个幂等性Producer能够保证某个主题的一个分区上不出现重复消息,它无法保证多个分区的幂等性.
//构建生成`KafkaProducer`的配置项.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("linger.ms", 200);props.put("batch.size", 16384);
//serializer建议使用byteArray/byteBuffer.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//生成实例并向kafka发送消息.
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i),
Integer.toString(i)));
//所有操作结束,关闭producer.
producer.close();
enable.idempotence
= true.transactional.id
. 此配置设置一个transactionId,当然最好能代表业务场景.min.insync.replicas
配置的值必须大于1.//构建生成事务型`KafkaProducer`的配置项. Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "my-transactional-id"); //serializer建议使用byteArray/byteBuffer. Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); //此时KafkaProducer的api并没有变化,只是通过producer直接开始事务即可. producer.initTransactions(); try { producer.beginTransaction(); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))); producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { producer.close(); } catch (KafkaException e) { // For all other exceptions, just abort the transaction and try again. producer.abortTransaction(); } producer.close();
"bootstrap.servers" : brokerServer链接信息的配置,host:port,多个用","号分开. "buffer.memory" : 默认值32mb,Producer端的内存缓冲区的大小. "max.block.ms" : 默认值1分钟,当内存缓冲区被填满(生产速度大于了网络传输速度),producer的backOff时间. "batch.size" : 默认值(16kb),内存缓冲区内每一个batch的大小,当producer写入达到一个batch后,此batch将会被提交. "linger.ms" : 默认值(0),与"batch.size"配合使用,当batch未达到大小,batch的最大内存缓冲时间. 这个配置在根据node节点范围内有效,即对应node的partition中只要有一个超时,就会处理所有partition. "request.timeout.ms" 默认(30秒),producer等待请求响应的超时时间,应该大于broker中的`replica.lag.time.max.ms`配置时间. "delivery.timeout.ms" 默认(2分钟),send数据后(添加到内存缓冲区的时间),等待ack的超时时间, 这个值应该大于requestTimeout+lingerMs的和. "retry.backoff.ms" 默认值(100ms),请求失败后的重试间隔时间. "max.request.size" 默认值(1mb),单次网络请求的send数据的上限(建议是batchSize的倍数). "enable.idempotence" 默认值(true),是否启用冥等性. "transactional.id" 没有默认值,配置一个字符串值,用于记录此producer对应的事务ID. 跨多个producer的冥等性保证,但是broker节点最少需要三个. "transaction.timeout.ms" 默认值(1分钟),用于配置transaction的超时时间. "acks" 默认值(all/-1),可配置(all,0,1),producer响应ack的状态 0=>表示不管broker是否写入成功. 1=>表示只需要leader写入成功(这可能在副本切换时导致数据丢失) all/-1 => 需要所有副本都写入成功,冥等性必须设置为此值. "max.in.flight.requests.per.connection" 默认值(5),单个node可同时进行的请求的数量, 如果启用"enable.idempotence"时,这个值必须小于或等于5. "metadata.max.age.ms" 默认值(5分钟),定时刷新metadata的时间周期. "metadata.max.idle.ms" 默认值(5分钟),metadata的空闲时间,当超过这个时间metadata会被丢弃下次请求时重新获取. "partitioner.class" 用于对record进行partition的区分, Partitioner接口的实现. "partitioner.ignore.keys" 默认值(false),当设置为false,同时key不为null的情况下,使用hash分区, 如果指定了partitioner.class,这个配置无效. "partitioner.adaptive.partitioning.enable" 默认值(true),是否让处理更快的partition分区更多的处理消息. "partitioner.availability.timeout.ms" 默认值0,与上面的配置配合使用 如果Partition无法在指定的超时时间处理producer的消息,则认为parition不可用. "compression.type" record压缩算法,可配置zstd,lz4,snappy, gzip
Step=>1
根据是否配置enable.idempotence
,默认值true,如果配置为true时,初始化TransactionManager
实例.
this.transactionManager = configureTransactionState(config, logContext); //初始化TransactionManager实例. private TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext) { TransactionManager transactionManager = null; //只有`enable.idempotence`配置为`true`时,TransactionManager实例才会被初始化. if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) { final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG); final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); transactionManager = new TransactionManager( logContext,transactionalId, transactionTimeoutMs,retryBackoffMs,apiVersions ); //根据是否配置`transactional.id`来判断是否开启事务. if (transactionManager.isTransactional()) log.info("Instantiated a transactional producer."); else log.info("Instantiated an idempotent producer."); } else { // ignore unretrieved configurations related to producer transaction config.ignore(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); } return transactionManager; }
Step=>2
生成用于producer使用的内存缓冲区RecordAccumulator
,
所有对Producer的send操作都将向此accumulator的内存缓冲区内添加,由专门的Sender
线程来负责发送并释放内存.
其内部的BufferPool
即是accumulator使用的内存池,每一个batch都需要向此内存池申请内存.
在kafka中所有的消息写入都是以batch
为基础,标准batch
的大小由batch.size
配置,默认为16kb.
boolean enableAdaptivePartitioning = partitioner == null && config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG); RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig( enableAdaptivePartitioning, config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG) ); this.accumulator = new RecordAccumulator(logContext, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.compressionType, lingerMs(config), retryBackoffMs, deliveryTimeoutMs,partitionerConfig, metrics,PRODUCER_METRIC_GROUP_NAME,time, apiVersions,transactionManager, //环形内存缓冲区,其内部分为池化内存与非池化内存. new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME ) );
Step=>3
根据BOOTSTRAP_SERVERS_CONFIG
配置,初始化ProducerMetadata
实例,此实例用于维护metadata在producer端的cache信息.
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses( config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)); if (metadata != null) { this.metadata = metadata; } else { this.metadata = new ProducerMetadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG), logContext, clusterResourceListeners, Time.SYSTEM); this.metadata.bootstrap(addresses); }
Step=>4
生成producer向broker端发起请求的NetworkClient
实例,并根据实例初始化并启动Sender
线程.
此线程用于将RecordAccumulator
中已经完成的batch发送到对应partition的leaderBroker端.
注意:此线程是一个守护线程(daemon
).
this.sender = newSender(logContext, kafkaClient, this.metadata); String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId; this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start(); //newSender生成网络处理线程的实现. Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) { int maxInflightRequests = producerConfig.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext); ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics); Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics); //生成用于向`broker`发起网络请求的NetworkClient实例. KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient( new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder, logContext), metadata, clientId, maxInflightRequests, producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG), producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG), producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), requestTimeoutMs, producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG), producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG), time, true, apiVersions, throttleTimeSensor, logContext); //生成用于发送网络请求的线程. short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG)); return new Sender(logContext, client, metadata, this.accumulator, maxInflightRequests == 1, producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),//单次请求的最大bytes. acks, producerConfig.getInt(ProducerConfig.RETRIES_CONFIG), //此配置通常保持默认值. metricsRegistry.senderMetrics, time, requestTimeoutMs, producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), this.transactionManager, apiVersions); }
当Sender
线程在不断轮询过程中,在调用执行到NetworkClient.poll
函数或sendProducerData
时,
会执行metadataUpdater
(实现类DefaultMetadataUpdater
)中的maybeUpdate
函数,
此函数判断当前Producer中cache的metadata
是否过期,过期时间由metadata.max.age.ms
配置(默认5分钟).
(注意:如果是Producer初始化后的第一次轮询时,也表示超时.)
如果metadataCache过期后会发起MetadataRequest
请求,来获取producer需要的metadata信息(topicInfo,brokers).
//这里会向随机的一个由"bootstrap.servers"配置的broker节点(或metadataCache中的节点)发起请求(如果超时).
//==>`metadataupdater`的实现在Producer端默认为`NetworkClient.DefaultMetadataUpdater`.
long metadataTimeout = metadataUpdater.maybeUpdate(now);
..........
void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
doSend(clientRequest, true, now);
}
//生成向broker请求的metadataRequest信息
protected MetadataRequest.Builder newMetadataRequestBuilder() {
return MetadataRequest.Builder.allTopics();
}
在Producer端,发起MetadataRequest
请求时,会设置topics参数的值为null,表示获取集群中所有的topic信息.
如果ProducerMetadata
实例中的newTopics
容器不为空时,会只请求此部分的topics的metadata信息.
此请求在Broker端接收到后,会直接由KafkaApis中的handleTopicMetadataRequest
进行处理.
而此请求的返回信息包含当前cluster中所有的topics信息与当前处于active状态的所有borker节点.
Producer发起的Metadata
请求,在broker端成功响应后,
会交由NetworkClient中DefaultMetadataUpdater实例的handleSuccessfulResponse
处理程序处理.
而在handleSuccessfulResponse
处理程序中,其核心处理代码如下所示:
this.metadata.update(inProgress.requestVersion, response, inProgress.isPartialUpdate, now);
可以看到,当DefaultMetadataUpdater接收到broker的响应后,直接交给了ProducerMetadata
实例进行处理.
而在Metadata.update
的处理程序中,主要根据请求的响应重新生成MetadataCache
实例,如下所示:
*=>1*, 更新metadata的刷新时间,此时间用于判断metadata是否过期.
//更新metadata的刷新时间.
this.lastRefreshMs = nowMs;
this.updateVersion += 1;
//判断是否是部分更新(newTopics容器不为空时,表示部分更新)
if (!isPartialUpdate) {
this.needFullUpdate = false;
this.lastSuccessfulRefreshMs = nowMs;
}
*=>2*, 根据Metadata
请求的响应结果(clusterId,activeBrokers,topics,partitions)生成MetadataCache
.
//根据请求的响应,生成MetadataCache.
this.cache = handleMetadataResponse(response, isPartialUpdate, nowMs);
//handleMetadataResponse中生成MetadataCache实例.
Map<Integer, Node> nodes = metadataResponse.brokersById();
//`isPartialUpdate == true`表示是增量更新(即partition的Leader发生切换后的增量metadata更新)
//==>或者producer要写入的record对应的topic在当前cache中不存在(新创建)
if (isPartialUpdate)
return this.cache.mergeWith(metadataResponse.clusterId(), nodes, partitions,
unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller(), topicIds,
(topic, isInternal) -> !topics.contains(topic) && retainTopic(topic, isInternal, nowMs));
//全是更新,直接根据response生成metadataCache.
else
return new MetadataCache(metadataResponse.clusterId(), nodes, partitions,
unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller(), topicIds);
所谓的非事务场景即是Producer端配置有冥等性enable.idempotence == true
,但事务idtransactional.id
未配置的情况下,
此时Producer端会通过TransactionManager
组件来初始化获取当前Producer的ProducerId.
即:当Sender
线程启动后,runOne函数轮询过程时,
会在执行如下代码片段时判断ProducerId是否初始化,如果未初始化,发起InitProducerId
请求:
if (transactionManager != null) { try { ..................... //非事务场景下,获取冥等性支持的ProducerId的值(如果还未获取). //==>或broker响应`UNKNOWN_PRODUCER_ID`或`OUT_OF_ORDER_SEQUENCE_NUMBER`错误代码时. //==>此时会把`InitProducerId`请求生成 transactionManager.bumpIdempotentEpochAndResetIdIfNeeded(); //处理TransactionManager组件相关的请求,如`InitProducerId`等,向broker端发起请求. if (maybeSendAndPollTransactionalRequest()) { return; } } catch (AuthenticationException e) { log.trace("Authentication exception while processing transactional request", e); transactionManager.authenticationFailed(e); } }
当ProducerId未初始化时,在执行bumpIdempotentEpochAndResetIdIfNeeded
函数时会生成InitProducerId
请求.
如下部分是函数实现的部分代码片段:
if (currentState != State.INITIALIZING && !hasProducerId()) {
transitionTo(State.INITIALIZING);
InitProducerIdRequestData requestData = new InitProducerIdRequestData()
.setTransactionalId(null)
.setTransactionTimeoutMs(Integer.MAX_VALUE);
InitProducerIdHandler handler = new InitProducerIdHandler(
new InitProducerIdRequest.Builder(requestData), false)
;
enqueueRequest(handler);
}
可以看到,针对InitProducerIdRequest
请求的处理程序实现为InitProducerIdHandler
.
在非事务的场景下,InitProducerIdHandler
的coordinatorType
为null
.因此:
在maybeSendAndPollTransactionalRequest
函数发送请求时,会从metadataCache
中随机获取一个broker节点.
并通过此broker节点发起InitProducerId
请求,如下代码片段:
//非事务场景下,直接调用NetworkClient中的leastLoadedNode来随机获取一个broker发起请求.
targetNode = coordinatorType != null ?
transactionManager.coordinator(coordinatorType) :
client.leastLoadedNode(time.milliseconds());
............
long currentTimeMs = time.milliseconds();
ClientRequest clientRequest = client.newClientRequest(
targetNode.idString(), requestBuilder, currentTimeMs,
true, requestTimeoutMs, nextRequestHandler);
log.debug("Sending transactional request {} to node {} with correlation ID {}"..);
client.send(clientRequest, currentTimeMs);
transactionManager.setInFlightCorrelationId(clientRequest.correlationId());
此请求在broker端将由TransactionCoordinator
中的handleInitProducerId
处理程序进行处理.
当broker端接收到InitProducerId
请求后,会交由TransactionCoordinator
组件来进行处理.
此组件在每个broker进程中都包含一个,在没有事务的场景下,此组件用于管理producer对应的produdcerId信息(随机到任意的broker节点),
而在有事务的场景下,每个broker实例中的transactionCoordinator负责一组分配到此broker的事务id.
TransactionCoordinator.handleInitProducerId(非事务场景)
//非事务场景下,Producer获取ProducerId的处理.
if (transactionalId == null) {
val producerId = producerIdManager.generateProducerId()
responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))
}
从上面的代码片段中可以看到,在TransactionCoordinator
组件中,在处理非事务场景下的producerId的申请时,
只是简单的直接通过调用ProducerIdManager
中的generateProducerId
函数来生成producerId
,并响应给Producer端.
而在ProducerIdManager
组件中,会首先向activeController
发起一个AllocateProducerIds
请求.
在controller端会由ProducerIdControlManager
组件进行处理.
此AllocateProducerIds
请求会给broker生成一个连续的producerId的数据块.
当ProducerIdManager
中generateProducerId
分配producerId
达到连续数据块的90%时,会重新向controller请求一个新的数据块.
ProducerIdManager.generateProducerId函数
verride def generateProducerId(): Long = {
this synchronized {
//broker启动后首次生成p
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。