赞
踩
- @Configuration
- public class KafkaConfiguration {
-
- /**
- * 解决批量消费的问题
- * @param properties 配置信息,springboot 从配置文件获取, 自动注入
- * @return 批量工厂类
- */
- @Bean
- public KafkaListenerContainerFactory<?> batchFactory(KafkaProperties properties) {
- Map<String, Object> consumerProperties = properties.buildConsumerProperties();
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new
- ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProperties));
- factory.setBatchListener(true); // 开启批量监听
- factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
- return factory;
- }
-
- }
kafka listener
- @KafkaListener(topics = "test", groupId = "group-1",containerFactory = "batchFactory")
- public void listenGroups(ConsumerRecords<Object,Object> records, Acknowledgment ack){
- for (ConsumerRecord<Object, Object> record : records) {
- System.out.println(record.value());
- }
- ack.acknowledge(); // 默认一次poll 500 条消息,消费完所有的 records 之后再commit,手动提交
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。