In Kafka, the party that produces messages is called the producer (Producer). Although producing a message is simple, the process of sending it is fairly involved.
Sending a message starts with creating a ProducerRecord object. This core Kafka class represents the key-value pair to be sent and carries the target topic, partition, key, value, timestamp, and so on:
- public class ProducerRecord<K, V> {
- private final String topic;
- private final Integer partition;
- private final Headers headers;
- private final K key;
- private final V value;
- private final Long timestamp;
- }
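For reference, ProducerRecord offers constructors covering the common combinations of these fields; a quick sketch (the topic name cc_test simply matches the examples later in this post):
- // value only: partition and timestamp are chosen for you
- ProducerRecord<String, String> r1 = new ProducerRecord<>("cc_test", "chuanchuan");
- // key + value: the key drives partition selection
- ProducerRecord<String, String> r2 = new ProducerRecord<>("cc_test", "cc", "chuanchuan");
- // explicit partition: the partitioner is bypassed
- ProducerRecord<String, String> r3 = new ProducerRecord<>("cc_test", 0, "cc", "chuanchuan");
- // explicit partition and timestamp
- ProducerRecord<String, String> r4 = new ProducerRecord<>("cc_test", 0, System.currentTimeMillis(), "cc", "chuanchuan");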
When a ProducerRecord is sent, it is first serialized into byte arrays so it can travel over the network; the message then reaches the partitioner. If a partition number (partition) was specified when sending, that partition is used as-is; if none was specified, the partitioner selects one based on the key and the number of partitions for the topic in the cluster (hashing the key if present, otherwise round-robin). The partition selector is supplied by an implementation of the org.apache.kafka.clients.producer.Partitioner interface.
org.apache.kafka.clients.producer.KafkaProducer
- private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
- Integer partition = record.partition();
- return partition != null ?
- partition :
- partitioner.partition(
- record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
- }
org.apache.kafka.clients.producer.internals.DefaultPartitioner
- // select the partition for this record
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- int numPartitions = partitions.size();
- if (keyBytes == null) {
- // round-robin counter for this topic
- int nextValue = nextValue(topic);
- List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
- if (availablePartitions.size() > 0) {
- // modulo over the number of available partitions
- int part = Utils.toPositive(nextValue) % availablePartitions.size();
- return availablePartitions.get(part).partition();
- } else {
- // no partitions are available, give a non-available partition
- return Utils.toPositive(nextValue) % numPartitions;
- }
- } else {
- // hash the keyBytes to choose a partition
- return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
- }
- }
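Utils.toPositive, used in both branches above, masks off the sign bit rather than calling Math.abs; a one-line sketch of what it is equivalent to:
- // clears the sign bit, so even Integer.MIN_VALUE (where Math.abs
- // would stay negative) maps to a non-negative value
- static int toPositive(int number) {
-     return number & 0x7fffffff;
- }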
A ProducerRecord also carries a timestamp. If the user does not set one, the KafkaProducer uses its internal clock (time.milliseconds()) as the timestamp. The timestamp Kafka ultimately persists, however, depends on the timestamp type configured for the topic (message.timestamp.type): CreateTime keeps the producer-supplied timestamp, while LogAppendTime replaces it with the broker's append time.
Messages are collected into a record batch, a ProducerBatch; all messages in one batch are sent to the same topic and partition, and each appended record is tracked by a FutureRecordMetadata representing its pending result.
After the broker receives the messages it returns a response. On success the future completes with a RecordMetadata object containing the topic, partition, offset, timestamp, and so on; on failure the failed messages are recorded and retried later (where retrying makes sense).
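Batching behaviour is tunable through two producer settings, batch.size and linger.ms; a brief sketch (the values are illustrative, and props stands for the Properties passed to KafkaProducer):
- // maximum bytes of records per partition batch (default 16384)
- props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
- // how long the sender waits for a batch to fill before sending it anyway (default 0)
- props.put(ProducerConfig.LINGER_MS_CONFIG, 5);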
org.apache.kafka.clients.producer.KafkaProducer
- private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
- TopicPartition tp = null;
- try {
- throwIfProducerClosed();
- // first make sure the metadata for the topic is available
- ClusterAndWaitTime clusterAndWaitTime;
- try {
- clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
- } catch (KafkaException e) {
- if (metadata.isClosed())
- throw new KafkaException("Producer closed while send in progress", e);
- throw e;
- }
- long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
- Cluster cluster = clusterAndWaitTime.cluster;
- // serialize the key
- byte[] serializedKey;
- try {
- serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
- } catch (ClassCastException cce) {
- throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
- " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
- " specified in key.serializer", cce);
- }
- // serialize the value
- byte[] serializedValue;
- try {
- serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
- } catch (ClassCastException cce) {
- throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
- " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
- " specified in value.serializer", cce);
- }
- // determine the target partition
- int partition = partition(record, serializedKey, serializedValue, cluster);
- tp = new TopicPartition(record.topic(), partition);
-
- // freeze the headers and extract them as an array
- setReadOnly(record.headers());
- Header[] headers = record.headers().toArray();
-
- int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
- compressionType, serializedKey, serializedValue, headers);
- ensureValidRecordSize(serializedSize);
- // set the record timestamp, falling back to the current time
- long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
- log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
- // producer callback will make sure to call both 'callback' and interceptor callback
- Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
-
- // transactional send: register this partition with the transaction
- if (transactionManager != null && transactionManager.isTransactional())
- transactionManager.maybeAddPartitionToTransaction(tp);
-
- // hand the record to the accumulator (see the code below)
- RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
- serializedValue, headers, interceptCallback, remainingWaitMs);
- if (result.batchIsFull || result.newBatchCreated) {
- log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
- this.sender.wakeup();
- }
- return result.future;
- } catch (ApiException e) {
- log.debug("Exception occurred during message send:", e);
- if (callback != null)
- callback.onCompletion(null, e);
- // record the error
- this.errors.record();
- this.interceptors.onSendError(record, tp, e);
- return new FutureFailure(e);
- } catch (InterruptedException e) {
- this.errors.record();
- this.interceptors.onSendError(record, tp, e);
- throw new InterruptException(e);
- } catch (BufferExhaustedException e) {
- this.errors.record();
- this.metrics.sensor("buffer-exhausted-records").record();
- this.interceptors.onSendError(record, tp, e);
- throw e;
- } catch (KafkaException e) {
- this.errors.record();
- this.interceptors.onSendError(record, tp, e);
- throw e;
- } catch (Exception e) {
- this.interceptors.onSendError(record, tp, e);
- throw e;
- }
- }
org.apache.kafka.clients.producer.internals.RecordAccumulator
- public RecordAppendResult append(TopicPartition tp,
- long timestamp,
- byte[] key,
- byte[] value,
- Header[] headers,
- Callback callback,
- long maxTimeToBlock) throws InterruptedException {
- appendsInProgress.incrementAndGet();
- ByteBuffer buffer = null;
- if (headers == null) headers = Record.EMPTY_HEADERS;
- try {
- Deque<ProducerBatch> dq = getOrCreateDeque(tp);
- synchronized (dq) {
- if (closed)
- throw new KafkaException("Producer closed while send in progress");
- RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
- if (appendResult != null)
- return appendResult;
- }
-
- byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
- int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
- log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
- // allocate a buffer and write the record data into it
- buffer = free.allocate(size, maxTimeToBlock);
- synchronized (dq) {
- if (closed)
- throw new KafkaException("Producer closed while send in progress");
-
- RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
- if (appendResult != null) {
- return appendResult;
- }
-
- MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
- // start a new batch for this partition
- ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
- FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
-
- dq.addLast(batch);
- incomplete.add(batch);
-
- // the batch now owns the buffer; null it so the finally block does not free it
- buffer = null;
- return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
- }
- } finally {
- if (buffer != null)
- free.deallocate(buffer);
- appendsInProgress.decrementAndGet();
- }
- }
Simple send (fire-and-forget)
The simplest way to send a message in Kafka is to specify only the topic, key, and value, leaving the partition and timestamp at their defaults. The send() method returns a Future<RecordMetadata>; if you do not care about the outcome you can ignore it, otherwise you should inspect it. The send may fail with InterruptedException (the sending thread was interrupted), BufferExhaustedException (the record buffer is full), or SerializationException (serialization failed).
- ProducerRecord<String, String> record =
- new ProducerRecord<>("cc_test", "cc", "chuanchuan");
- producer.send(record);
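The snippet above assumes an already-constructed producer; a self-contained version might look like the following (the broker address localhost:9092 and the class name FireAndForgetDemo are assumptions for illustration):
- import java.util.Properties;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- public class FireAndForgetDemo {
-     public static void main(String[] args) {
-         Properties props = new Properties();
-         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-         // try-with-resources flushes and closes the producer on exit
-         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
-             producer.send(new ProducerRecord<>("cc_test", "cc", "chuanchuan"));
-         }
-     }
- }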
Synchronous send
The fire-and-forget approach above assumes we do not care about the result, but normally we do want the broker's acknowledgement. As the source above shows, send() returns a Future<RecordMetadata>, so we can call the Future's get() method to block the calling thread until the broker responds. If the send failed, get() throws an exception; if no exception occurs, we obtain the RecordMetadata object and can use it to inspect the message's details: topic, the serialized sizes of the key and value, offset, and partition.
Two classes of errors generally occur while producing: those that can be resolved by retrying and those that cannot. Connection errors, "no leader" errors, and the like are resolved by retrying; for errors such as "record too large", KafkaProducer throws immediately without retrying, because no number of retries will shrink the message.
- ProducerRecord<String, String> record = new ProducerRecord<>("cc_test", "cc", "chuanchuan");
- try {
-     RecordMetadata rm = producer.send(record).get();
-     System.out.println(rm.offset());
- } catch (Exception e) {
-     log.error("occur error", e);
- }
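To act on the two error classes just described, the cause of the ExecutionException can be inspected; a sketch (the exception choices in the comments are illustrative):
- try {
-     RecordMetadata rm = producer.send(record).get();
-     System.out.println("offset = " + rm.offset());
- } catch (ExecutionException e) {
-     if (e.getCause() instanceof org.apache.kafka.common.errors.RetriableException) {
-         // transient problem (network, leader election): the producer has already
-         // retried up to the configured `retries`, so decide whether to re-send yourself
-     } else {
-         // e.g. RecordTooLargeException: retrying can never succeed
-     }
- } catch (InterruptedException e) {
-     Thread.currentThread().interrupt();
- }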
Asynchronous send
Synchronous sending means only one message can be in flight at a time; until it returns, every other message must wait, which stalls the pipeline and prevents Kafka from reaching its potential. If one send takes 20 ms, fifty messages take a full second; sent asynchronously, those fifty might take 30 ms or less. Asynchronous sending works by passing an implementation of the org.apache.kafka.clients.producer.Callback interface to send(); the callback is eventually invoked by ProducerBatch's private method completeFutureAndFireCallbacks.
- ProducerRecord<String, String> record = new ProducerRecord<>("cc_test", "cc", "chuanchuan");
- producer.send(record, new CcProducerCallback());
-
- class CcProducerCallback implements Callback {
-     public void onCompletion(RecordMetadata metadata, Exception exception) {
-         if (exception != null) {
-             exception.printStackTrace();
-         }
-     }
- }
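Since Callback has a single method, a lambda is an equivalent and more compact form:
- producer.send(record, (metadata, exception) -> {
-     if (exception != null) {
-         exception.printStackTrace();
-     }
- });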
org.apache.kafka.clients.producer.internals.ProducerBatch
- private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
- produceFuture.set(baseOffset, logAppendTime, exception);
-
- // execute callbacks
- for (Thunk thunk : thunks) {
- try {
- // send succeeded: hand the metadata to the callback
- if (exception == null) {
- RecordMetadata metadata = thunk.future.value();
- if (thunk.callback != null)
- thunk.callback.onCompletion(metadata, null);
- } else {
- // send failed: hand the exception to the callback
- if (thunk.callback != null)
- thunk.callback.onCompletion(null, exception);
- }
- } catch (Exception e) {
- log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
- }
- }
-
- produceFuture.done();
- }
Kafka reads and writes data at partition granularity. Partitions can be spread across different brokers, each node can serve reads and writes independently, and the cluster's throughput can be raised by adding new brokers; placing partitions on multiple brokers is what achieves load balancing.
Kafka's partitioning strategy is simply the algorithm by which the producer decides which partition a message goes to. Kafka provides a default strategy and also lets us define our own; every strategy implements the org.apache.kafka.clients.producer.Partitioner interface.
- public interface Partitioner extends Configurable, Closeable {
-
- /**
-  * Compute the partition for the given record.
-  *
-  * @param topic      the topic name
-  * @param key        the record key (may be null)
-  * @param keyBytes   the serialized key bytes (may be null)
-  * @param value      the record value
-  * @param valueBytes the serialized value bytes
-  * @param cluster    the current cluster metadata
-  */
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
-
- /**
-  * Close this partitioner.
-  */
- public void close();
-
- }
Which partition a message lands on is governed by the partition selection mechanism: round-robin, random, key-based, and custom assignment are the main options, and each is implemented in the public int partition() method.
Round-robin
Round-robin spreads messages evenly, sending to each partition in turn so every partition receives its share; this is Kafka's default behaviour for records without a key.
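A minimal round-robin Partitioner might mirror the per-topic counter in DefaultPartitioner; the sketch below is illustrative (the class name and counter layout are ours, not Kafka's):
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.atomic.AtomicInteger;
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
- import org.apache.kafka.common.PartitionInfo;
-
- public class SimpleRoundRobinPartitioner implements Partitioner {
-     // one monotonically increasing counter per topic
-     private final ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
-
-     @Override
-     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
-         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
-         int next = counters.computeIfAbsent(topic, t -> new AtomicInteger(0)).getAndIncrement();
-         // clear the sign bit before the modulo, as Utils.toPositive does
-         return (next & 0x7fffffff) % partitions.size();
-     }
-
-     @Override
-     public void close() {
-     }
-
-     @Override
-     public void configure(Map<String, ?> configs) {
-     }
- }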
Random strategy
A random strategy first obtains the topic's total partition count and then uses ThreadLocalRandom.current().nextInt() to draw a value below that count. Random assignment can distribute messages unevenly; still, even though placement is random, messages within any single partition remain ordered.
Strategy code
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- return ThreadLocalRandom.current().nextInt(partitions.size());
Key-based strategy
This strategy is also called the key-ordering strategy. Every Kafka message can carry a key, and once messages are given keys, all messages with the same key are guaranteed to land in the same partition. Since messages within a partition are processed in order, this is also known as per-key ordering.
Strategy code
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- // Math.abs() because hashCode() may be negative; note that Math.abs(Integer.MIN_VALUE)
- // is itself negative, which is why Kafka's own code uses Utils.toPositive instead
- return Math.abs(key.hashCode()) % partitions.size();
Custom strategy
Here you are free to do whatever you like: just implement the Partitioner interface.
application.properties
- # the org.apache.kafka.clients.producer.ProducerConfig class defines all the available configuration keys
- spring.kafka.properties.partitioner.class=cc.kevinlu.springboot.kafka.partitioners.CcPartitioner
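Outside Spring Boot, the same setting is applied through the plain producer config key partitioner.class, for which ProducerConfig provides a constant:
- // plain-Java equivalent of the Spring Boot property above
- props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CcPartitioner.class.getName());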
CcPartitioner
- package cc.kevinlu.springboot.kafka.partitioners;
-
- import java.util.Map;
-
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
-
- import lombok.extern.slf4j.Slf4j;
-
- @Slf4j
- public class CcPartitioner implements Partitioner {
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- if (log.isDebugEnabled()) {
- log.debug("{}------------{}", topic, cluster.availablePartitionsForTopic(topic).size());
- }
- // always route to partition 0
- return 0;
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public void configure(Map<String, ?> configs) {
- }
- }
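With this partitioner configured, routing is easy to verify from a send callback (a quick sketch reusing the earlier topic):
- producer.send(new ProducerRecord<>("cc_test", "cc", "chuanchuan"), (metadata, exception) -> {
-     if (exception == null) {
-         System.out.println("landed on partition " + metadata.partition()); // always 0
-     }
- });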