赞
踩
在Spring Kafka中,使用@KafkaListener
注解处理批量信息时,首先需要开启批量监听模式,并配置相应的consumer参数来控制批量消费行为。以下是配置和处理批量消息的基本步骤:
配置Kafka消费者工厂:
设置batchListener
属性为true
,使@KafkaListener
支持批量消费。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 开启批量监听模式
factory.setBatchListener(true);
// 其他相关配置,比如并发度、错误处理等
return factory;
}
配置消费者参数:
设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
,指定每次poll
请求从Kafka服务器获取的最大记录数。并且关闭offset自动提交enable-auto-commit: false
# application.properties 或 application.yml
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: my-group
max-poll-records: 100
# 其他配置项,如enable-auto-commit, auto-offset-reset等
编写批量处理方法:
定义一个方法,其参数是一个包含多条消息的列表,@KafkaListener
注解下的方法将会接收到批量的消息。
@KafkaListener(topics = "my-topic") public void processMessages(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) { try { // 处理批量消息 for (ConsumerRecord<String, String> record : records) { // 对每条消息进行处理 } // 成功处理后手动提交偏移量 acknowledgment.acknowledge(); } catch (Exception e) { // 错误处理,记录错误,考虑是否重试或者有其他补偿措施 log.error("Error processing message batch", e); } }
处理异常和偏移量提交:
当批量处理消息时,需要注意的是,一旦消息处理完成且没有错误,应当手动提交偏移量,以确认这些消息已经被成功消费。如果有消息处理失败,则可能需要根据业务需求选择不同的策略,比如重新尝试处理整个批次、跳过错误消息或者记录错误信息稍后处理。
通过以上步骤,@KafkaListener
就能按照批处理的方式接收并处理Kafka主题中的消息了。
批量消费Kafka中的消息,然后将这些消息放入队列中,最后利用线程池异步处理这些队列中的消息。这种方式有助于优化资源利用率,尤其是当消息处理逻辑较为耗时或者IO密集型时,可以有效提升系统的并行处理能力和吞吐量。
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @Component public class BatchMessageProcessor { private final ThreadPoolTaskExecutor taskExecutor; private final BlockingQueue<ConsumerRecord<String, String>> messageQueue = new LinkedBlockingQueue<>(); public BatchMessageProcessor(ThreadPoolTaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } @KafkaListener(topics = "my-topic", batch = true) public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) { for (ConsumerRecord<String, String> record : records) { // 将消费到的消息放入队列 messageQueue.offer(record); } // 异步处理消息队列 processMessageQueue(acknowledgment); } private void processMessageQueue(Acknowledgment acknowledgment) { List<ConsumerRecord<String, String>> messagesToProcess; synchronized (messageQueue) { // 从队列中批量取出消息 messagesToProcess = new ArrayList<>(messageQueue.size()); messageQueue.drainTo(messagesToProcess, 100); // 假设批量处理100条 } if (!messagesToProcess.isEmpty()) { ListenableFuture<?> future = taskExecutor.submit(() -> { for (ConsumerRecord<String, String> record : messagesToProcess) { // 实际处理消息的逻辑 processSingleMessage(record); } // 所有消息处理完毕后提交偏移量 acknowledgment.acknowledge(); }); // 可以添加回调函数,用于处理线程池任务执行后的结果 future.addCallback(new ListenableFutureCallback<Object>() { @Override public void onSuccess(Object result) { // 处理成功逻辑 } @Override public void onFailure(Throwable ex) { // 处理失败逻辑,如日志记录、重试等 } }); } } private void processSingleMessage(ConsumerRecord<String, String> record) { // 这里实现单个消息的具体处理逻辑 } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。