当前位置:   article > 正文

Kafka (三) acks 与 手动提交 offset_kafka offset ack

kafka offset ack

新手解除 kafka,很容易把 acks 和提交 ack搞混了。

acks (0, 1, 'all')

代表kafka收到消息的答复数,0就是不要答复,爱收到没收到.1就是有一个leader broker答复就行,all是所有broker都要收到才行

  • 0: Producer不等待kafka服务器的答复,消息立刻发往socket buffer,这种方式不能保证kafka收到消息,设置成这个值的时候retries参数就失效了,因为producer不知道kafka收没收到消息,所以所谓的重试就没有意义了,发送返回值的offset全默认是-1.

  • 1: 等待leader记录数据到broker本地log即可.不等待leader同步到其他followers,那么假如此时刚好leader收到消息并答复后,leader突然挂了,其他fowller还没来得及复制消息呢,那么这条消息就会丢失了.

  • all:等待所有broker记录消息.保证消息不会丢失(只要从节点没全挂),这种方式是最高可用的 acks默认值是1.

而手动提交 offset,就是手动提交偏移量,这个和其他的消息队列是完全不同的。

代码如下,开启手动提交必须关闭自动提交,并且设置监听提交方式:

  1. spring:
  2. kafka:
  3. bootstrap-servers: localhost:9092
  4. producer:
  5. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  6. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  7. #重试次数.
  8. retries: 10
  9. # 只需要 leader 许可
  10. acks: 1
  11. # 开启手动 ack
  12. listener:
  13. ack-mode: manual
  14. consumer:
  15. group-id: pay
  16. auto-offset-reset: latest
  17. enable-auto-commit: false
  18. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  19. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

其中 ack-mode,我要稍微补充一句,这个是提交方式,解释如下

  • RECORD
    每处理一条commit一次
  • BATCH(默认)
    每次poll的时候批量提交一次,频率取决于每次poll的调用频率
  • TIME 
    每次间隔ackTime的时间去commit
  • COUNT 
    累积达到ackCount次的ack去commit
  • COUNT_TIME
    ackTime或ackCount哪个条件先满足,就commit
  • MANUAL
    listener负责ack,但是背后也是批量上去
  • MANUAL_IMMEDIATE
    listner负责ack,每调用一次,就立即commit

此外,消费端代码如下:

  1. /**
  2. * 这是手动提交的消费方式
  3. * @param record
  4. * @param ack
  5. * @throws Exception
  6. */
  7. @KafkaListener(topics = TopicConstants.COMMON_PAY,groupId = "写自己的消费组 id")
  8. public void listenXXXPay(ConsumerRecord<String, String> record , Acknowledgment ack) throws Exception {
  9. String msg = JSONObject.parseObject(record.value(), String.class);
  10. System.out.println(msg);
  11. if (new Random().nextInt(100)<50){
  12. logger.info(String.format("kafka 综合收费消费消息成功---------------- listen1 topic = %s, offset = %d, value = %s ", record.topic(), record.offset(), record.value()));
  13. ack.acknowledge();
  14. }
  15. }

接下来问题来了, 如果代码中没有进行ack.acknowledge();会怎么办呢??

对于kafka消费后不提交offset情况的分析总结


  最近在使用kafka的过程中遇到了一个疑问,在查阅了一些资料和相关blog之后,做一下总结和记录。
  Question:消费者在消费消息的过程中,配置参数设置为不自动提交offset,在消费完数据之后如果不手动提交offset,那么在程序中和kafak中的数据会如何被处理呢?


  首先简单的介绍一下消费者对topic的订阅。客户端的消费者订阅了topic后,如果是单个消费者,那么消费者会顺序消费这些topic分区中的数据,如果是创建了消费组有多个消费者,那么kafak的服务端将这些topic平均分配给每个消费者。比如有2个topic,每个topic有2个分区,总共有4个分区,如果一个消费组开了四个消费者线程,那么每个消费者将被分配一个分区进行消费。一般建议是一个消费组里的消费者的个数与订阅的topic的总分区数相等,这样可以达到最高的吞吐量。如果消费者的个数大于订阅的topic的总分区,那么多出的消费者将分配不到topic的分区,等于是白白创建了一个消费者线程,浪费资源。

  我们进入正题,对开头提出的问题的总结如些:
  注意:一下情况均基于kafka的消费者关闭自动提交offset的条件下。

  1.如果在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。

  2.如果在消费的过程中有几条或者一批数据数据没有提交offset,后面其他的消息消费后正常提交offset,那么服务端会更新为消费后最新的offset,不会重新消费,就算重启程序也不会重新消费。

  3.消费者如果没有提交offset,程序不会阻塞或者重复消费,除非在消费到这个你不想提交offset的消息时你尝试重新初始化一个客户端消费者,即可再次消费这个未提交offset的数据。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在你重新初始化客户端消费者之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置开始消费。
 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/914574
推荐阅读
相关标签
  

闽ICP备14008679号