
Kafka Producer

In Kafka, the party that produces messages is called the producer. Although producing a message is very simple, the sending process behind it is fairly involved.


Sending a message starts with creating a ProducerRecord object. This core Kafka class represents the key-value pair to be sent and records the target topic, partition, key, value, and timestamp.

  public class ProducerRecord<K, V> {
      private final String topic;
      private final Integer partition;
      private final Headers headers;
      private final K key;
      private final V value;
      private final Long timestamp;
  }

Before a ProducerRecord is sent, its key and value are serialized into byte arrays so they can travel over the network. The record then reaches the partitioner: if a partition number was specified on the record, that partition is used directly; otherwise a partition is chosen from the topic's partitions in the cluster (by key hash, or round-robin when there is no key). The partition selector is an implementation of the org.apache.kafka.clients.producer.Partitioner interface.

org.apache.kafka.clients.producer.KafkaProducer

  private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
      Integer partition = record.partition();
      return partition != null ?
              partition :
              partitioner.partition(
                      record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
  }

org.apache.kafka.clients.producer.internals.DefaultPartitioner

  // choose a partition for the record
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
      List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
      int numPartitions = partitions.size();
      if (keyBytes == null) {
          // no key: use a per-topic round-robin counter
          int nextValue = nextValue(topic);
          List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
          if (availablePartitions.size() > 0) {
              // modulo over the available partitions
              int part = Utils.toPositive(nextValue) % availablePartitions.size();
              return availablePartitions.get(part).partition();
          } else {
              // no partitions are available, give a non-available partition
              return Utils.toPositive(nextValue) % numPartitions;
          }
      } else {
          // hash the keyBytes to choose a partition
          return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
      }
  }

A ProducerRecord also carries a timestamp. If the user does not set one, KafkaProducer fills in its own clock (time.milliseconds()) at send time. Which timestamp Kafka ultimately keeps depends on the timestamp type configured on the topic (a sketch of setting the timestamp explicitly follows the list):

  • CreateTime: the broker keeps the timestamp carried in the producer record
  • LogAppendTime: the broker overwrites the timestamp when the record is appended to the log

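A minimal sketch of supplying the timestamp on the record itself (cc_test is the example topic used throughout this article); leaving the timestamp null lets KafkaProducer fall back to its own clock as described above:

  // ProducerRecord(topic, partition, timestamp, key, value):
  // a null partition lets the partitioner decide, a null timestamp defaults to the producer's clock
  ProducerRecord<String, String> record =
          new ProducerRecord<>("cc_test", null, System.currentTimeMillis(), "cc", "chuanchuan");
  producer.send(record);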

The record is placed into a record batch (ProducerBatch); all records in a batch go to the same topic and partition. Each append returns a FutureRecordMetadata that tracks the outcome, while a separate Sender thread actually ships the batch to the broker.

After the broker receives the messages it returns a response. On success the future completes with a RecordMetadata object containing the topic, partition, offset, timestamp, and so on; on failure the error is recorded and, if it is retriable, the messages are re-sent later.

org.apache.kafka.clients.producer.KafkaProducer

  private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
      TopicPartition tp = null;
      try {
          throwIfProducerClosed();
          // first make sure the metadata for the topic is available
          ClusterAndWaitTime clusterAndWaitTime;
          try {
              clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
          } catch (KafkaException e) {
              if (metadata.isClosed())
                  throw new KafkaException("Producer closed while send in progress", e);
              throw e;
          }
          long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
          Cluster cluster = clusterAndWaitTime.cluster;
          // serialize the key
          byte[] serializedKey;
          try {
              serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
          } catch (ClassCastException cce) {
              throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                      " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                      " specified in key.serializer", cce);
          }
          // serialize the value
          byte[] serializedValue;
          try {
              serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
          } catch (ClassCastException cce) {
              throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                      " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                      " specified in value.serializer", cce);
          }
          // determine the target partition
          int partition = partition(record, serializedKey, serializedValue, cluster);
          tp = new TopicPartition(record.topic(), partition);
          // mark the headers as read-only
          setReadOnly(record.headers());
          Header[] headers = record.headers().toArray();
          int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                  compressionType, serializedKey, serializedValue, headers);
          ensureValidRecordSize(serializedSize);
          // set the record timestamp (falls back to the current time when the user did not supply one)
          long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
          log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
          // producer callback will make sure to call both 'callback' and interceptor callback
          Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
          // transactional handling
          if (transactionManager != null && transactionManager.isTransactional())
              transactionManager.maybeAddPartitionToTransaction(tp);
          // append the record to the accumulator (see RecordAccumulator#append below)
          RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                  serializedValue, headers, interceptCallback, remainingWaitMs);
          if (result.batchIsFull || result.newBatchCreated) {
              log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
              this.sender.wakeup();
          }
          return result.future;
      } catch (ApiException e) {
          log.debug("Exception occurred during message send:", e);
          if (callback != null)
              callback.onCompletion(null, e);
          // record the error
          this.errors.record();
          this.interceptors.onSendError(record, tp, e);
          return new FutureFailure(e);
      } catch (InterruptedException e) {
          this.errors.record();
          this.interceptors.onSendError(record, tp, e);
          throw new InterruptException(e);
      } catch (BufferExhaustedException e) {
          this.errors.record();
          this.metrics.sensor("buffer-exhausted-records").record();
          this.interceptors.onSendError(record, tp, e);
          throw e;
      } catch (KafkaException e) {
          this.errors.record();
          this.interceptors.onSendError(record, tp, e);
          throw e;
      } catch (Exception e) {
          this.interceptors.onSendError(record, tp, e);
          throw e;
      }
  }

org.apache.kafka.clients.producer.internals.RecordAccumulator

  public RecordAppendResult append(TopicPartition tp,
                                   long timestamp,
                                   byte[] key,
                                   byte[] value,
                                   Header[] headers,
                                   Callback callback,
                                   long maxTimeToBlock) throws InterruptedException {
      appendsInProgress.incrementAndGet();
      ByteBuffer buffer = null;
      if (headers == null) headers = Record.EMPTY_HEADERS;
      try {
          Deque<ProducerBatch> dq = getOrCreateDeque(tp);
          synchronized (dq) {
              if (closed)
                  throw new KafkaException("Producer closed while send in progress");
              RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
              if (appendResult != null)
                  return appendResult;
          }
          byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
          int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
          log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
          // allocate a buffer for a new batch; may block for up to maxTimeToBlock
          buffer = free.allocate(size, maxTimeToBlock);
          synchronized (dq) {
              if (closed)
                  throw new KafkaException("Producer closed while send in progress");
              RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
              if (appendResult != null) {
                  return appendResult;
              }
              MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
              // wrap the buffer in a new ProducerBatch and append the record to it
              ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
              FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
              dq.addLast(batch);
              incomplete.add(batch);
              // ownership of the buffer has passed to the batch, so don't deallocate it in the finally block
              buffer = null;
              return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
          }
      } finally {
          if (buffer != null)
              free.deallocate(buffer);
          appendsInProgress.decrementAndGet();
      }
  }

Message sending modes

  1. Simple send (fire-and-forget)

    The simplest way to send a message is to specify only the topic, key, and value, leaving the partition and timestamp at their defaults. send() returns a Future<RecordMetadata>; if you do not care about the result you can simply ignore it, otherwise you should inspect it. send() may surface an InterruptedException (the sending thread was interrupted), a BufferExhaustedException (the buffer pool is full), or a SerializationException (serialization failed).

    ProducerRecord<String, String> record =
            new ProducerRecord<>("cc_test", "cc", "chuanchuan");
    producer.send(record);
  2. Synchronous send

    The fire-and-forget style assumes we do not care about the outcome, but normally we want the broker's acknowledgement. As the send path above shows, send() returns a Future<RecordMetadata>; calling get() on that future blocks the calling thread until the broker responds. If the send failed, get() throws an exception; otherwise we obtain a RecordMetadata object with the message details: topic, serialized key and value sizes, offset, and partition.

    Two kinds of errors can occur while sending: those that retries can fix and those they cannot. Connection errors and missing-leader errors, for example, are typically resolved by retrying, whereas a record-too-large error makes KafkaProducer throw immediately without retrying, because no number of retries will make the message smaller.

    ProducerRecord<String, String> record = new ProducerRecord<>("cc_test", "cc", "chuanchuan");
    try {
        RecordMetadata rm = producer.send(record).get();
        System.out.println(rm.offset());
    } catch (Exception e) {
        log.error("occur error", e);
    }
  3. Asynchronous send

    With synchronous sending only one message is in flight at a time; every other message waits for the previous response, which creates a backlog and prevents Kafka from reaching its full throughput. If one send takes 20 ms, fifty messages take 1 s; sent asynchronously, those fifty messages might take 30 ms or less. Asynchronous sending works by passing an implementation of the org.apache.kafka.clients.producer.Callback interface to send(); the callback is eventually invoked by ProducerBatch's private method completeFutureAndFireCallbacks.

    ProducerRecord<String, String> record = new ProducerRecord<>("cc_test", "cc", "chuanchuan");
    producer.send(record, new CcProducerCallback());

    class CcProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                exception.printStackTrace();
            }
        }
    }
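
    Callback declares a single method, so on Java 8+ the same callback can also be passed as a lambda; a minimal sketch of the same logic:

    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            log.error("send failed", exception);
        }
    });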

    org.apache.kafka.clients.producer.internals.ProducerBatch

    private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
        produceFuture.set(baseOffset, logAppendTime, exception);
        // execute callbacks
        for (Thunk thunk : thunks) {
            try {
                if (exception == null) {
                    // success: pass the metadata to the user callback
                    RecordMetadata metadata = thunk.future.value();
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(metadata, null);
                } else {
                    // failure: pass the exception to the user callback
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(null, exception);
                }
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
            }
        }
        produceFuture.done();
    }

Partitioning mechanism

Kafka reads and writes data at the granularity of a partition. Partitions can be spread across different brokers, every node can serve reads and writes independently, and cluster throughput can be raised by adding brokers; placing partitions on multiple brokers balances the load.

Kafka's partitioning strategy is simply the algorithm the producer uses to decide which partition a message goes to. Kafka ships with a default strategy and also lets us plug in our own; every strategy implements the org.apache.kafka.clients.producer.Partitioner interface.

  public interface Partitioner extends Configurable, Closeable {

      /**
       * Compute the partition for the given record.
       *
       * @param topic      topic name
       * @param key        record key
       * @param keyBytes   serialized key bytes
       * @param value      record value
       * @param valueBytes serialized value bytes
       * @param cluster    current cluster metadata
       */
      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

      /**
       * Close this partitioner.
       */
      public void close();
  }

Which partition a message lands on is determined by the partition selection mechanism. The main approaches are round-robin, random, key-based, and custom assignment, and the concrete logic lives in the partition() method.

  1. Round-robin

    Round-robin assignment spreads messages evenly across all partitions, one after another; it is Kafka's default strategy for records without a key, as the sketch below illustrates.

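    A minimal hand-rolled round-robin Partitioner sketch (the class name RoundRobinCcPartitioner is made up for illustration; it mirrors the keyless branch of DefaultPartitioner shown earlier):

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.utils.Utils;

    public class RoundRobinCcPartitioner implements Partitioner {

        private final AtomicInteger counter = new AtomicInteger(0);

        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            // take the next counter value and wrap it around the partition count
            return Utils.toPositive(counter.getAndIncrement()) % partitions.size();
        }

        @Override
        public void close() {
        }

        @Override
        public void configure(Map<String, ?> configs) {
        }
    }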
  2. Random

    The random strategy first looks up the topic's total partition count and then uses ThreadLocalRandom.current().nextInt() to pick a random value below that count. Random assignment can leave messages unevenly distributed across partitions; even so, messages within a single partition remain ordered.


    Strategy code

    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    return ThreadLocalRandom.current().nextInt(partitions.size());
  3. Key-based assignment

    This is also known as the key-ordering strategy. Every Kafka message can carry a key; once a key is set, all messages with the same key are guaranteed to land in the same partition, and because messages within a partition are processed in order, this strategy is also described as preserving order by message key.


    Strategy code

    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    // Math.abs() because hashCode() may be negative
    // (note: Math.abs(Integer.MIN_VALUE) is still negative; DefaultPartitioner uses Utils.toPositive instead)
    return Math.abs(key.hashCode()) % partitions.size();
  4. Custom assignment

    Do whatever suits your needs here: implementing the Partitioner interface is all it takes.

    application.properties

    # producer configuration keys are defined in org.apache.kafka.clients.producer.ProducerConfig
    spring.kafka.properties.partitioner.class=cc.kevinlu.springboot.kafka.partitioners.CcPartitioner

    CcPartitioner

    package cc.kevinlu.springboot.kafka.partitioners;

    import java.util.Map;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    public class CcPartitioner implements Partitioner {

        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            if (log.isDebugEnabled()) {
                log.debug("{}------------{}", topic, cluster.availablePartitionsForTopic(topic).size());
            }
            // always route to partition 0
            return 0;
        }

        @Override
        public void close() {
        }

        @Override
        public void configure(Map<String, ?> configs) {
        }
    }
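
    If the producer is built by hand instead of through Spring Boot auto-configuration, the same partitioner can be registered via the partitioner.class producer property; a minimal sketch (broker address and serializers are illustrative assumptions):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // equivalent to spring.kafka.properties.partitioner.class in application.properties
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CcPartitioner.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);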

Producer Properties

  1. retries: the number of times a failed send is retried. If an error occurs that a resend can fix, such as a missing leader, the producer keeps resending until the retry count reaches this value, then gives up and returns the error. By default the producer waits 100 ms between retries; the interval is set by retry.backoff.ms (see the configuration sketch after this list).
  2. acks: how many partition replicas must receive the message before the producer considers the write successful; acks controls the likelihood of message loss.
    • acks=0: the producer does not wait for any acknowledgement from the server, so messages are very easily lost
    • acks=1: the producer gets an acknowledgement as soon as the partition leader receives the message; messages can still be lost
    • acks=all: the broker acknowledges only after all in-sync replicas have received the message, which effectively prevents loss at the cost of higher latency
  3. key.serializer: the serializer class for keys; it must implement the org.apache.kafka.common.serialization.Serializer interface
  4. value.serializer: the serializer class for values; it must implement the org.apache.kafka.common.serialization.Serializer interface
  5. compression.type: the message compression type, none by default; valid values are none, gzip, snappy, lz4, and zstd
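
With Spring Boot these settings go into application.properties. A minimal sketch with illustrative values (not recommendations); note that max.block.ms, passed through via spring.kafka.properties.*, is the maxBlockTimeMs / maxTimeToBlock budget seen in doSend() and RecordAccumulator.append() above:

  spring.kafka.producer.acks=all
  spring.kafka.producer.retries=3
  spring.kafka.producer.compression-type=lz4
  spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  # any native producer config can be passed through via spring.kafka.properties.*
  spring.kafka.properties.retry.backoff.ms=100
  spring.kafka.properties.max.block.ms=60000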