赞
踩
注:上篇博客说了怎么搭建kafka集群,并实现springboot集成kafka实现消息单条消费,但是如果有场景需要批量消费,那么就需要对消费者进行配置了,并可以实现并发消费,即多个KafkaListener同时消费消息,以此达到批量消费和提高速度的目的。
1.上消费者配置
- @Configuration
- @EnableKafka
- public class KafkaConsumerConfig {
-
- @Value("${spring.kafka.bootstrap-servers}")
- private String service;
-
- @Value("${spring.kafka.consumer.group-id}")
- private String groupid;
-
- @Value("${spring.kafka.consumer.enable-auto-commit}")
- private String autoCommit;
-
- @Value("${spring.kafka.consumer.auto-commit-interval}")
- private String interval;
-
- // 默认发送心跳时间为10000ms,超时时间需要大于发送心跳时间
- @Value("10000")
- private String timeout;
-
- @Value("${spring.kafka.consumer.key-deserializer}")
- private String keyDeserializer;
-
- @Value("${spring.kafka.consumer.value-deserializer}")
- private String valueDeserializer;
-
- @Value("${spring.kafka.consumer.auto-offset-reset}")
- private String offsetReset;
-
-
- /**
- * 获取kafka配置
- * @return 配置map
- */
- private Map<String,Object> consumerConfig(){
- Map<String,Object> props = new HashMap<>();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,service);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
- props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, interval);
- props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,timeout);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,offsetReset);
- return props;
- }
-
- /**
- * 获取工厂
- * @return kafka工厂
- */
- private ConsumerFactory<String,String> consumerFactory(){
- Map<String, Object> props = consumerConfig();
- // 日志过滤入库一批量为1500条消息
- props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1500);// 单次poll的数量,批量消费时配置
- return new DefaultKafkaConsumerFactory<>(consumerConfig());
- }
-
- /**
- * 实时推送使用的消费者工厂
- * @return kafka消费者工厂
- */
- private ConsumerFactory<String,String> infoPushConsumerFactory(){
- Map<String, Object> props = consumerConfig();
- // 实时推送单次批量拉取数据设置为150
- props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,150);
- return new DefaultKafkaConsumerFactory<>(props);
- }
-
- /**
- * 获取kafka实例,该方法为单条消费
- * @return kafka实例
- */
- @Bean(name = "kafkaListenerContainerFactory1")
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory1(){
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- factory.setConcurrency(1); // 连接池中消费者数量
- factory.getContainerProperties().setPollTimeout(4000); //拉取topic的超时时间
- factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 关闭ack自动提交偏移
- System.out.println("调用的是自定义消费者池");
- return factory;
- }
-
- /**
- * 获取kafka实例,该实例为批量消费
- * @return kafka实例
- */
- @Bean(name = "kafkaListenerContainerFactory2")
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory2(){
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- factory.setConcurrency(2); // 连接池中消费者数量
- factory.setBatchListener(true); // 是否并发消费
- factory.getContainerProperties().setPollTimeout(4000); //拉取topic的超时时间
- factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 关闭ack自动提交偏移
- return factory;
- }
-
- /**
- * 实时推送获取的kafka实例,该实例为批量消费
- * @return kafka实例
- */
- @Bean(name = "infoPushKafkaListenerContainerFactory")
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> infoPushKafkaListenerContainerFactory(){
- ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(infoPushConsumerFactory());
- factory.setConcurrency(10); // 连接池中消费者数量
- factory.setBatchListener(true); // 是否并发消费
- factory.getContainerProperties().setPollTimeout(4000); // 拉取topic的超时时间
- factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 每次poll之前提交一次偏移
- // factory.getConsumerFactory().getConfigurationProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,50); // 单次poll的数量
- return factory;
- }
- }
上述配置中,有多个消费者实例,在使用时指定bean名称即可
factory.setBatchListener(true)这行代码开启并发消费
factory.setConcurrency(2)这行代码声明有几个KafkaListener同时监听,达到多线程目的
如何手动管理偏移量?
先将自动提交设置为false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);//这里的autoCommit为变量,我设置的false
然后设置消费工厂的提交模式
- factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 关闭ack自动提交偏移
-
-
-
- //
- AckMode 如下:
-
- RECORD :当listener一读到消息,就提交offset
-
- BATCH : poll() 函数读取到的所有消息,就提交offset
-
- TIME : 当超过设置的ackTime ,即提交Offset
-
- COUNT :当超过设置的COUNT,即提交Offset
-
- COUNT_TIME :TIME和COUNT两个条件都满足,提交offset
-
- MANUAL : Acknowledgment.acknowledge()即提交Offset,和Batch类似
-
- MANUAL_IMMEDIATE: Acknowledgment.acknowledge()被调用即提交Offset
设置完后,可以再listen中进行回调,手动提交偏移量
- //批量消费
- @KafkaListener(topics = {"topicone"},containerFactory="kafkaListenerContainerFactory2")
- public void batchConsumer(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
- System.out.println("此线程消费"+records.size()+"条消息----线程名:"+Thread.currentThread().getName());
- records.forEach(record -> System.out.println("topic名称:"+record.topic()+"\n"+"分区位置:"+record.partition()+"\n"+"key:"+record.key()+"\n"+"偏移量:"+record.offset()+"\n"+"消息内容:"+record.value()));
- ack.acknowledge();
- }
下面看结果:
- topicone 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key2, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-1@45]
- topicone 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key3, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-1@46]
- topicone 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key4, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-1@47]
- topicone 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key5, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-1@48]
- topicone 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key0, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-0@37]
- topicone 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key1, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-0@38]
- 此线程消费4条消息----线程名:org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1
- 此线程消费2条消息----线程名:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。