赞
踩
整理kafka消费者批量消费消息开发笔记。
kafka使用的是2.1.11.RELEASE版本
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <version>2.1.11.RELEASE</version>
- </dependency>
Springboot项目启动类屏蔽掉自动配置
@SpringBootApplication(scanBasePackages ={"com.pengyingjun"},exclude = {KafkaAutoConfiguration.class})
新增kafka相关配置项
- kafka.bootstrap-servers = kakfa.*.*.com:9092
- kafka.consumer.auto-commit-interval = 1000
- kafka.consumer.max-poll-records = 1000
- kafka.consumer.enable-auto-commit = true
- kafka.consumer.concurrency = 5
- kafka.consumer.group-id = pengyingjun_log
- kafka.consumer.auto-offset-reset = earliest
- kafka.consumer.log_topic = pengyingjun
新增kafka消费者配置类
- @Configuration
- @EnableKafka
- @Slf4j
- public class KafkaConsumerConfig {
- /** 以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接 */
- @Value("${kafka.bootstrap-servers}")
- private String servers;
- /** 如果为true,则消费者的偏移量将在后台定期提交,默认值为true */
- @Value("${kafka.consumer.enable-auto-commit}")
- private boolean enableAutoCommit;
- /** 心跳与消费者协调员之间的预期时间(以毫秒为单位),默认值为3000 */
- @Value("${kafka.consumer.auto-commit-interval}")
- pri
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。