当前位置:   article > 正文

Kafka批量消费_java kafka批量消费

java kafka批量消费

在Spring Kafka中,使用@KafkaListener注解处理批量信息时,首先需要开启批量监听模式,并配置相应的consumer参数来控制批量消费行为。以下是配置和处理批量消息的基本步骤:

  1. 配置Kafka消费者工厂
    设置batchListener属性为true,使@KafkaListener支持批量消费。

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 开启批量监听模式
        factory.setBatchListener(true);
        // 其他相关配置,比如并发度、错误处理等
        return factory;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  2. 配置消费者参数
    设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG,指定每次poll请求从Kafka服务器获取的最大记录数。并且关闭offset自动提交enable-auto-commit: false

    # application.properties 或 application.yml
    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
          group-id: my-group
          max-poll-records: 100
          # 其他配置项,如enable-auto-commit, auto-offset-reset等
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
  3. 编写批量处理方法
    定义一个方法,其参数是一个包含多条消息的列表,@KafkaListener注解下的方法将会接收到批量的消息。

    @KafkaListener(topics = "my-topic")
    public void processMessages(List<ConsumerRecord<String, String>> records,
                                Acknowledgment acknowledgment) {
        try {
            // 处理批量消息
            for (ConsumerRecord<String, String> record : records) {
                // 对每条消息进行处理
            }
    
            // 成功处理后手动提交偏移量
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 错误处理,记录错误,考虑是否重试或者有其他补偿措施
            log.error("Error processing message batch", e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
  4. 处理异常和偏移量提交
    当批量处理消息时,需要注意的是,一旦消息处理完成且没有错误,应当手动提交偏移量,以确认这些消息已经被成功消费。如果有消息处理失败,则可能需要根据业务需求选择不同的策略,比如重新尝试处理整个批次、跳过错误消息或者记录错误信息稍后处理。

通过以上步骤,@KafkaListener就能按照批处理的方式接收并处理Kafka主题中的消息了。

批量消费Kafka中的消息,然后将这些消息放入队列中,最后利用线程池异步处理这些队列中的消息。这种方式有助于优化资源利用率,尤其是当消息处理逻辑较为耗时或者IO密集型时,可以有效提升系统的并行处理能力和吞吐量。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Component
public class BatchMessageProcessor {

    private final ThreadPoolTaskExecutor taskExecutor;
    private final BlockingQueue<ConsumerRecord<String, String>> messageQueue = new LinkedBlockingQueue<>();

    public BatchMessageProcessor(ThreadPoolTaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @KafkaListener(topics = "my-topic", batch = true)
    public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
        for (ConsumerRecord<String, String> record : records) {
            // 将消费到的消息放入队列
            messageQueue.offer(record);
        }

        // 异步处理消息队列
        processMessageQueue(acknowledgment);
    }

    private void processMessageQueue(Acknowledgment acknowledgment) {
        List<ConsumerRecord<String, String>> messagesToProcess;
        synchronized (messageQueue) {
            // 从队列中批量取出消息
            messagesToProcess = new ArrayList<>(messageQueue.size());
            messageQueue.drainTo(messagesToProcess, 100); // 假设批量处理100条
        }

        if (!messagesToProcess.isEmpty()) {
            ListenableFuture<?> future = taskExecutor.submit(() -> {
                for (ConsumerRecord<String, String> record : messagesToProcess) {
                    // 实际处理消息的逻辑
                    processSingleMessage(record);
                }
                // 所有消息处理完毕后提交偏移量
                acknowledgment.acknowledge();
            });

            // 可以添加回调函数,用于处理线程池任务执行后的结果
            future.addCallback(new ListenableFutureCallback<Object>() {
                @Override
                public void onSuccess(Object result) {
                    // 处理成功逻辑
                }

                @Override
                public void onFailure(Throwable ex) {
                    // 处理失败逻辑,如日志记录、重试等
                }
            });
        }
    }

    private void processSingleMessage(ConsumerRecord<String, String> record) {
        // 这里实现单个消息的具体处理逻辑
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/391450
推荐阅读
相关标签
  

闽ICP备14008679号