
Kafka multithreaded concurrent consumption and batch consumption


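Note: the previous post covered how to set up a Kafka cluster and how to integrate Kafka with Spring Boot to consume messages one at a time. When a scenario calls for batch consumption, the consumer needs some extra configuration. The same configuration can also enable concurrent consumption, that is, several KafkaListener consumer threads consuming at the same time. Together, batching and concurrency raise consumption throughput.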

1. Consumer configuration

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.listener.ContainerProperties;

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {

        @Value("${spring.kafka.bootstrap-servers}")
        private String service;
        @Value("${spring.kafka.consumer.group-id}")
        private String groupid;
        @Value("${spring.kafka.consumer.enable-auto-commit}")
        private String autoCommit;
        @Value("${spring.kafka.consumer.auto-commit-interval}")
        private String interval;
        // Session timeout in ms. It must be greater than heartbeat.interval.ms (3000 ms by default).
        @Value("10000")
        private String timeout;
        @Value("${spring.kafka.consumer.key-deserializer}")
        private String keyDeserializer;
        @Value("${spring.kafka.consumer.value-deserializer}")
        private String valueDeserializer;
        @Value("${spring.kafka.consumer.auto-offset-reset}")
        private String offsetReset;

        /**
         * Builds the common Kafka consumer configuration.
         * @return configuration map
         */
        private Map<String, Object> consumerConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, service);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, interval);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, timeout);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
            return props;
        }

        /**
         * Default consumer factory.
         * @return Kafka consumer factory
         */
        private ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = consumerConfig();
            // Log filtering and persistence handles up to 1500 messages per batch.
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1500); // max records per poll, used for batch consumption
            // Pass props (not a fresh consumerConfig()), otherwise max.poll.records is silently dropped.
            return new DefaultKafkaConsumerFactory<>(props);
        }

        /**
         * Consumer factory used for real-time push.
         * @return Kafka consumer factory
         */
        private ConsumerFactory<String, String> infoPushConsumerFactory() {
            Map<String, Object> props = consumerConfig();
            // Real-time push pulls at most 150 records per poll.
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 150);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        /**
         * Listener container factory for single-record consumption.
         * @return listener container factory
         */
        @Bean(name = "kafkaListenerContainerFactory1")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory1() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(1); // number of consumer threads in the container
            factory.getContainerProperties().setPollTimeout(4000); // poll timeout in ms
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // offsets are committed manually
            System.out.println("Using the custom consumer container factory");
            return factory;
        }

        /**
         * Listener container factory for batch consumption.
         * @return listener container factory
         */
        @Bean(name = "kafkaListenerContainerFactory2")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory2() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(2); // number of consumer threads in the container
            factory.setBatchListener(true); // deliver each poll result to the listener as a batch
            factory.getContainerProperties().setPollTimeout(4000); // poll timeout in ms
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // offsets are committed manually
            return factory;
        }

        /**
         * Listener container factory for real-time push, using batch consumption.
         * @return listener container factory
         */
        @Bean(name = "infoPushKafkaListenerContainerFactory")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> infoPushKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(infoPushConsumerFactory());
            factory.setConcurrency(10); // number of consumer threads in the container
            factory.setBatchListener(true); // deliver each poll result to the listener as a batch
            factory.getContainerProperties().setPollTimeout(4000); // poll timeout in ms
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // commit once all records from a poll have been processed
            // factory.getConsumerFactory().getConfigurationProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // max records per poll
            return factory;
        }
    }

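The configuration above defines several listener container factories. To use one, name its bean in the containerFactory attribute of @KafkaListener.

factory.setBatchListener(true) enables batch consumption: the listener receives the records returned by one poll as a list instead of one record at a time. It does not by itself make consumption concurrent.

factory.setConcurrency(2) sets how many consumer threads listen at the same time, which is what makes consumption multithreaded. A partition is read by only one consumer in a group, so a concurrency higher than the topic's partition count leaves the extra threads idle.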
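
To see why the concurrency value is worth sizing against the number of partitions, here is a minimal sketch in plain Java. It is a simplified round-robin model, not Kafka's actual partition assignor, and the names `PartitionSpread`, `assign` and `idleConsumers` are hypothetical. It assumes a topic with two partitions, which is what the output at the end of this post suggests for topicone.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (an assumption, not Kafka's real assignor): partitions are
// handed out round-robin to the consumer threads created by setConcurrency(n).
class PartitionSpread {

    // Returns, for each consumer thread, the list of partitions it would own.
    static List<List<Integer>> assign(int partitions, int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be at least 1");
        }
        List<List<Integer>> owners = new ArrayList<>();
        for (int c = 0; c < concurrency; c++) {
            owners.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owners.get(p % concurrency).add(p);
        }
        return owners;
    }

    // Counts consumer threads that own no partition and therefore stay idle.
    static long idleConsumers(int partitions, int concurrency) {
        return assign(partitions, concurrency).stream()
                .filter(List::isEmpty)
                .count();
    }

    public static void main(String[] args) {
        // Two partitions, two threads: each thread owns one partition.
        System.out.println(assign(2, 2));          // [[0], [1]]
        // Two partitions, ten threads: eight threads have nothing to read.
        System.out.println(idleConsumers(2, 10));  // 8
    }
}
```

Under this model, a concurrency of 10 (as in infoPushKafkaListenerContainerFactory) only pays off if the topic it listens to has at least ten partitions.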

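How do you manage offsets manually?

First, set auto-commit to false:

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); // autoCommit is a variable; I set it to false

Then set the ack mode on the listener container factory: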

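    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // offsets are committed manually

The AckMode values are:

    RECORD: commit the offset after the listener has processed each record
    BATCH: commit once all records returned by a poll() have been processed
    TIME: commit once all records returned by a poll() have been processed, provided the configured ackTime has elapsed since the last commit
    COUNT: commit once all records returned by a poll() have been processed, provided at least ackCount records have been received since the last commit
    COUNT_TIME: like TIME and COUNT, but the commit happens when either condition is met
    MANUAL: the listener calls Acknowledgment.acknowledge(); after that the commit is handled as in BATCH
    MANUAL_IMMEDIATE: the offset is committed as soon as the listener calls Acknowledgment.acknowledge()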
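
The count- and time-based modes are easy to mix up, so here is a minimal sketch of the decision in plain Java. It is a simplified model and not Spring Kafka's implementation: `AckDecision`, `shouldCommit` and the threshold values are hypothetical, and the model ignores the fact that the container only commits between polls. What it shows is that COUNT_TIME needs only one of the two thresholds to be reached.

```java
// Simplified model (an assumption, not Spring Kafka's code) of when the
// count- and time-based ack modes decide to commit offsets.
class AckDecision {

    enum Mode { COUNT, TIME, COUNT_TIME }

    // recordsSinceCommit: records processed since the last commit.
    // millisSinceCommit:  time elapsed since the last commit.
    // ackCount / ackTime: thresholds, named after the container properties.
    static boolean shouldCommit(Mode mode, int recordsSinceCommit, long millisSinceCommit,
                                int ackCount, long ackTime) {
        boolean countReached = recordsSinceCommit >= ackCount;
        boolean timeReached = millisSinceCommit >= ackTime;
        switch (mode) {
            case COUNT:
                return countReached;
            case TIME:
                return timeReached;
            case COUNT_TIME:
                // Either condition is enough; both are not required.
                return countReached || timeReached;
            default:
                throw new IllegalArgumentException("unsupported mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // 150 records after 1 s, thresholds of 100 records and 5 s: count is reached.
        System.out.println(shouldCommit(Mode.COUNT_TIME, 150, 1_000L, 100, 5_000L)); // true
        // 10 records after 1 s: neither threshold is reached yet.
        System.out.println(shouldCommit(Mode.COUNT_TIME, 10, 1_000L, 100, 5_000L));  // false
    }
}
```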

With that in place, the listener method can take an Acknowledgment parameter and commit the offsets manually:

    // Batch consumption
    @KafkaListener(topics = {"topicone"}, containerFactory = "kafkaListenerContainerFactory2")
    public void batchConsumer(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        System.out.println("This thread consumed " + records.size() + " messages ---- thread name: " + Thread.currentThread().getName());
        records.forEach(record -> System.out.println(
                "topic: " + record.topic() + "\n"
                + "partition: " + record.partition() + "\n"
                + "key: " + record.key() + "\n"
                + "offset: " + record.offset() + "\n"
                + "message: " + record.value()));
        // Commit the offsets for the whole batch once every record has been handled.
        ack.acknowledge();
    }

Here is the result:

    topicone producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key2, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-1@45]
    topicone producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key3, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-1@46]
    topicone producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key4, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-1@47]
    topicone producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key5, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-1@48]
    topicone producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key0, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-0@37]
    topicone producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key1, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-0@38]
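    This thread consumed 4 messages ---- thread name: org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1
    This thread consumed 2 messages ---- thread name: org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1

Six messages were sent: four went to partition 1 (offsets 45 to 48) and two to partition 0 (offsets 37 and 38). The two consumer threads each received one batch, of 4 and 2 records, which is consistent with each thread reading one partition.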
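
To make concrete what ack.acknowledge() commits for a batch, here is a minimal sketch in plain Java. `Rec` is a hypothetical stand-in for ConsumerRecord and `OffsetsToCommit` is an illustrative name, neither is part of Spring Kafka. The sketch relies on the Kafka convention that a committed offset is the position of the next record to read, that is, the last processed offset plus one, and it feeds in the partitions and offsets from the producer log above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for ConsumerRecord, reduced to the fields used here.
class Rec {
    final int partition;
    final long offset;

    Rec(int partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}

class OffsetsToCommit {

    // For each partition in the batch, the offset to commit is the highest
    // processed offset plus one (the position of the next record to read).
    static Map<Integer, Long> forBatch(List<Rec> batch) {
        Map<Integer, Long> next = new TreeMap<>();
        for (Rec r : batch) {
            next.merge(r.partition, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        // Offsets taken from the producer log above.
        List<Rec> partition1 = Arrays.asList(
                new Rec(1, 45), new Rec(1, 46), new Rec(1, 47), new Rec(1, 48));
        List<Rec> partition0 = Arrays.asList(new Rec(0, 37), new Rec(0, 38));
        System.out.println(forBatch(partition1)); // {1=49}
        System.out.println(forBatch(partition0)); // {0=39}
    }
}
```

If the application stops before acknowledge() is called, the offsets are not committed and the whole batch is delivered again after a restart, so batch processing should be safe to repeat.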

 
