当前位置:   article > 正文

kafka enable.auto.commit和auto.offset.reset使用说明_kafka auto.offset.reset

kafka auto.offset.reset

enable.auto.commit

是否自动提交offset,默认是true。

auto.offset.reset

表示自动重置 offset。

auto.offset.reset 参数定义了当无法获取消费分区的位移时从何处开始消费。例如:当 Broker 端没有 offset(如第一次消费或 offset 超过7天过期)时如何初始化 offset,当收到 OFFSET_OUT_OF_RANGE 错误时如何重置 Offset。

earliest:自动重置到 partition 的最小 offset。
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
latest:默认为 latest,表示自动重置到 partition 的最大 offset。
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。
none:不自动进行 offset 重置,抛出 OffsetOutOfRangeException 异常。
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。

auto.offset.reset=none 使用说明

使用背景

不希望发生 offset 自动重置的情况,因为业务不允许发生大规模的重复消费。

注意:

此时消费组在第一次消费的时候就会找不到 offset 而报错,这时就需要在 catch 里手动设置 offset。

使用说明

auto.offset.reset 设置为 None 以后,可以避免 offset 自动重置的问题,但是当增加分区的时候,因为关闭了自动重置机制,客户端不知道新的分区要从哪里开始消费,则会产生异常,此时需要人工去设置消费分组 offset 并消费。

使用方式

消费者在消费时,当 consumer 设置 auto.offset.reset=none, 捕获到 NoOffsetForPartitionException 异常,在 catch 里自己设置 offset。您可以根据自身业务情况选择以下方式中的其中一种。

指定 offset,这里需要自己维护 offset,方便重试。

指定从头开始消费。

指定 offset 为最近可用的 offset。

根据时间戳获取 offset,设置 offset。

总结:

  1. package com.tencent.tcb.operation.ckafka.plain;
  2. import com.google.common.collect.Lists;
  3. import com.tencent.tcb.operation.ckafka.JavaKafkaConfigurer;
  4. import java.time.Instant;
  5. import java.time.temporal.ChronoUnit;
  6. import java.util.ArrayList;
  7. import java.util.Collection;
  8. import java.util.HashMap;
  9. import java.util.List;
  10. import java.util.Map;
  11. import java.util.Map.Entry;
  12. import java.util.Properties;
  13. import org.apache.kafka.clients.CommonClientConfigs;
  14. import org.apache.kafka.clients.consumer.ConsumerConfig;
  15. import org.apache.kafka.clients.consumer.ConsumerRecord;
  16. import org.apache.kafka.clients.consumer.ConsumerRecords;
  17. import org.apache.kafka.clients.consumer.KafkaConsumer;
  18. import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
  19. import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
  20. import org.apache.kafka.clients.producer.ProducerConfig;
  21. import org.apache.kafka.common.PartitionInfo;
  22. import org.apache.kafka.common.TopicPartition;
  23. import org.apache.kafka.common.config.SaslConfigs;
  24. public class KafkaPlainConsumerDemo {
  25. public static void main(String args[]) {
  26. //设置JAAS配置文件的路径。
  27. JavaKafkaConfigurer.configureSaslPlain();
  28. //加载kafka.properties。
  29. Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
  30. Properties props = new Properties();
  31. //设置接入点,请通过控制台获取对应Topic的接入点。
  32. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
  33. //接入协议。
  34. props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
  35. //Plain方式。
  36. props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
  37. //两次Poll之间的最大允许间隔。
  38. //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。
  39. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
  40. //每次Poll的最大数量。
  41. //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
  42. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
  43. //消息的反序列化方式。
  44. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  45. "org.apache.kafka.common.serialization.StringDeserializer");
  46. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  47. "org.apache.kafka.common.serialization.StringDeserializer");
  48. //当前消费实例所属的消费组,请在控制台申请之后填写。
  49. //属于同一个组的消费实例,会负载消费消息。
  50. props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
  51. //消费offset的位置。注意!如果auto.offset.reset=none这样设置,消费组在第一次消费的时候 就会报错找不到offset,第一次这时候就需要在catch里手动设置offset。
  52. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
  53. //构造消费对象,也即生成一个消费实例。
  54. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
  55. //设置消费组订阅的Topic,可以订阅多个。
  56. //如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样。
  57. List<String> subscribedTopics = new ArrayList<String>();
  58. //如果需要订阅多个Topic,则在这里添加进去即可。
  59. //每个Topic需要先在控制台进行创建。
  60. String topicStr = kafkaProperties.getProperty("topic");
  61. String[] topics = topicStr.split(",");
  62. for (String topic : topics) {
  63. subscribedTopics.add(topic.trim());
  64. }
  65. consumer.subscribe(subscribedTopics);
  66. //循环消费消息。
  67. while (true) {
  68. try {
  69. ConsumerRecords<String, String> records = consumer.poll(1000);
  70. //必须在下次Poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。 建议开一个单独的线程池来消费消息,然后异步返回结果。
  71. for (ConsumerRecord<String, String> record : records) {
  72. System.out.println(
  73. String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
  74. }
  75. } catch (NoOffsetForPartitionException e) {
  76. System.out.println(e.getMessage());
  77. //当auto.offset.reset设置为 none时,需要捕获异常 自己设置offset。您可以根据自身业务情况选择以下方式中的其中一种。
  78. //e.g 1 :指定offset, 这里需要自己维护offset,方便重试。
  79. Map<Integer, Long> partitionBeginOffsetMap = getPartitionOffset(consumer, topicStr, true);
  80. Map<Integer, Long> partitionEndOffsetMap = getPartitionOffset(consumer, topicStr, false);
  81. consumer.seek(new TopicPartition(topicStr, 0), 0);
  82. //e.g 2:从头开始消费
  83. consumer.seekToBeginning(Lists.newArrayList(new TopicPartition(topicStr, 0)));
  84. //e.g 3:指定offset为最近可用的offset。
  85. consumer.seekToEnd(Lists.newArrayList(new TopicPartition(topicStr, 0)));
  86. //e.g 4: 根据时间戳获取offset,就是根据时间戳去设置offset。例如重置到10分钟前的offset
  87. Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
  88. Long value = Instant.now().minus(300, ChronoUnit.SECONDS).toEpochMilli();
  89. timestampsToSearch.put(new TopicPartition(topicStr, 0), value);
  90. Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer
  91. .offsetsForTimes(timestampsToSearch);
  92. for (Entry<TopicPartition, OffsetAndTimestamp> entry : topicPartitionOffsetAndTimestampMap
  93. .entrySet()) {
  94. TopicPartition topicPartition = entry.getKey();
  95. OffsetAndTimestamp entryValue = entry.getValue();
  96. consumer.seek(topicPartition, entryValue.offset()); // 指定offset, 这里需要自己维护offset,方便重试。
  97. }
  98. }
  99. }
  100. }
  101. /**
  102. * 获取topic的最早、最近的offset
  103. * @param consumer
  104. * @param topicStr
  105. * @param beginOrEnd true begin; false end
  106. * @return
  107. */
  108. private static Map<Integer, Long> getPartitionOffset(KafkaConsumer<String, String> consumer, String topicStr,
  109. boolean beginOrEnd) {
  110. Collection<PartitionInfo> partitionInfos = consumer.partitionsFor(topicStr);
  111. List<TopicPartition> tp = new ArrayList<>();
  112. Map<Integer, Long> map = new HashMap<>();
  113. partitionInfos.forEach(str -> tp.add(new TopicPartition(topicStr, str.partition())));
  114. Map<TopicPartition, Long> topicPartitionLongMap;
  115. if (beginOrEnd) {
  116. topicPartitionLongMap = consumer.beginningOffsets(tp);
  117. } else {
  118. topicPartitionLongMap = consumer.endOffsets(tp);
  119. }
  120. topicPartitionLongMap.forEach((key, beginOffset) -> {
  121. int partition = key.partition();
  122. map.put(partition, beginOffset);
  123. });
  124. return map;
  125. }
  126. }

springboot项目下

  1. /**
  2. * enable-auto-commit: false 由spring提交
  3. * enable-auto-commit: true 由kafka提交
  4. */
  5. /**
  6. * enable-auto-commit: true 相同组下 (换组 会重置数据)
  7. * 如果这个topic某个分区有已经提交的offset,那么无论是把auto.offset.reset=earliest还是latest,都将失效,消费者会从已经提交的offset开始消费.
  8. */
  9. /**
  10. * enable-auto-commit: fasle 相同组下 (换组 会重置数据)
  11. * 如果这个topic某个分区没有提交的offset,那么把auto.offset.reset=latest,将没消费的设置为提交消费,然后从最后开始消费
  12. * 如果这个topic某个分区没有提交的offset,那么把auto.offset.reset=earliest,从没开始消费的offset开始消费
  13. */

非springboot项目下

  1. enable.auto.commit false
  2. auto.offset.reset earliest 第一次消费, 重启后消费 都会从第一条开始重新消费全部数据
  3. enable.auto.commit true
  4. auto.offset.reset earliest 第一次消费全部数据,重启后从提交处开始消费
  5. enable.auto.commit false
  6. auto.offset.reset latest 第一次,重启后会从最后一条开始消费,但没有提交,换成earliest 重新消费全部数据
  7. enable.auto.commit true
  8. auto.offset.reset latest 第一次从最后一条开始消费,重启后从提交处开始消费

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/923541
推荐阅读
相关标签
  

闽ICP备14008679号