赞
踩
当 auto.commit.enable 设置为false时,表示kafak的offset由customer手动维护,spring-kafka提供了通过ackMode的值表示不同的手动提交方式;
ackMode有以下7种值:
- public enum AckMode {
- // 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
- RECORD,
- // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
- BATCH,
- // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
- TIME,
- // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
- COUNT,
- // TIME | COUNT 有一个条件满足时提交
- COUNT_TIME,
- // 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
- MANUAL,
- // 手动调用Acknowledgment.acknowledge()后立即提交
- MANUAL_IMMEDIATE,
- }
如果设置 AckMode 模式为 MANUAL
或者 MANUAL_IMMEDIATE
,则需要对监听消息的方法中,引入 Acknowledgment 对象参数,并调用 acknowledge() 方法进行手动提交;
在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer的run()方法和processCommits()方法使用:
- public void run() {
- ......
- while (isRunning()) {
- try {
- if (!this.autoCommit && !this.isRecordAck) {
- // 当 autoCommit 为false时且 ackMode不是record时 调用 processCommits 方法,判断如何手动提交
- processCommits();
- }
- processSeeks();
- ......
- } }
- }
- private void processCommits() {
- // acks 是一个LinkedBlockingQueue类型的阻塞队列,存放从kafka读取到的record数据
- this.count += this.acks.size();
- handleAcks();
- long now;
- AckMode ackMode = this.containerProperties.getAckMode();
- if (!this.isManualImmediateAck) { // 不是使用者手动调用
- if (!this.isManualAck) {
- updatePendingOffsets();
- }
- boolean countExceeded = this.count >= this.containerProperties.getAckCount();
- if (this.isManualAck || this.isBatchAck || this.isRecordAck
- || (ackMode.equals(AckMode.COUNT) && countExceeded)) {
- ......
- commitIfNecessary();
- this.count = 0;
- }
- else {
- now = System.currentTimeMillis();
- boolean elapsed = now - this.last > this.containerProperties.getAckTime();
- if (ackMode.equals(AckMode.TIME) && elapsed) {
- ......
- commitIfNecessary();
- this.last = now;
- }
- else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {
- ......
- commitIfNecessary();
- this.last = now;
- this.count = 0;
- }
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。