当前位置:   article > 正文

Kafka开启批量消费_kafka批量消费

kafka批量消费

  1. @Configuration
  2. public class KafkaConfiguration {
  3. /**
  4. * 解决批量消费的问题
  5. * @param properties 配置信息,springboot 从配置文件获取, 自动注入
  6. * @return 批量工厂类
  7. */
  8. @Bean
  9. public KafkaListenerContainerFactory<?> batchFactory(KafkaProperties properties) {
  10. Map<String, Object> consumerProperties = properties.buildConsumerProperties();
  11. ConcurrentKafkaListenerContainerFactory<String, String> factory = new
  12. ConcurrentKafkaListenerContainerFactory<>();
  13. factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProperties));
  14. factory.setBatchListener(true); // 开启批量监听
  15. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
  16. return factory;
  17. }
  18. }

kafka listener

  1. @KafkaListener(topics = "test", groupId = "group-1",containerFactory = "batchFactory")
  2. public void listenGroups(ConsumerRecords<Object,Object> records, Acknowledgment ack){
  3. for (ConsumerRecord<Object, Object> record : records) {
  4. System.out.println(record.value());
  5. }
  6. ack.acknowledge(); // 默认一次poll 500 条消息,消费完所有的 records 之后再commit,手动提交
  7. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/419811
推荐阅读
相关标签
  

闽ICP备14008679号