当前位置:   article > 正文

spring kafka消费者配置介绍----ackMode

ackmode

当 auto.commit.enable 设置为false时,表示kafak的offset由customer手动维护,spring-kafka提供了通过ackMode的值表示不同的手动提交方式;

ackMode有以下7种值:

  1. public enum AckMode {
  2. // 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
  3. RECORD,
  4. // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
  5. BATCH,
  6. // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
  7. TIME,
  8. // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
  9. COUNT,
  10. // TIME | COUNT 有一个条件满足时提交
  11. COUNT_TIME,
  12. // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
  13. MANUAL,
  14. // 手动调用Acknowledgment.acknowledge()后立即提交
  15. MANUAL_IMMEDIATE,
  16. }

如果设置 AckMode 模式为 MANUAL 或者 MANUAL_IMMEDIATE,则需要对监听消息的方法中,引入 Acknowledgment 对象参数,并调用 acknowledge() 方法进行手动提交;

在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer的run()方法和processCommits()方法使用:

  1. public void run() {
  2.   ......
  3.   while (isRunning()) {
  4.   try {
  5.   if (!this.autoCommit && !this.isRecordAck) { 
  6. // 当 autoCommit 为false时且 ackMode不是record时 调用 processCommits 方法,判断如何手动提交
  7.           processCommits();
  8.         }
  9.         processSeeks();
  10.         ......
  11.     }  }
  12. }

 

  1. private void processCommits() {
  2. // acks 是一个LinkedBlockingQueue类型的阻塞队列,存放从kafka读取到的record数据
  3. this.count += this.acks.size();  
  4. handleAcks();
  5. long now;
  6. AckMode ackMode = this.containerProperties.getAckMode();
  7. if (!this.isManualImmediateAck) {  // 不是使用者手动调用
  8. if (!this.isManualAck) {
  9. updatePendingOffsets();
  10. }
  11. boolean countExceeded = this.count >= this.containerProperties.getAckCount(); 
  12. if (this.isManualAck || this.isBatchAck || this.isRecordAck
  13. || (ackMode.equals(AckMode.COUNT) && countExceeded)) {
  14. ......
  15. commitIfNecessary();
  16. this.count = 0;
  17. }
  18. else {
  19. now = System.currentTimeMillis();
  20. boolean elapsed = now - this.last > this.containerProperties.getAckTime();
  21. if (ackMode.equals(AckMode.TIME) && elapsed) {
  22. ......
  23.                commitIfNecessary();
  24. this.last = now;
  25. }
  26. else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {
  27.               ......
  28. commitIfNecessary();
  29. this.last = now;
  30. this.count = 0;
  31. }
  32. }
  33. }
  34. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/623393
推荐阅读
相关标签
  

闽ICP备14008679号