赞
踩
本文是使用Spring体系中的Kafka库(官网地址),里面有一个比较重要的信息是Spring-kafka、kafka-clients、Spring Boot三者的版本对应关系,版本对应不上会出现奇奇怪怪问题
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
消费者consumer的属性配置包含内容比较多,会有单独的章节进行详细介绍
kafka.properties
#这里配置单机集群都可以 spring.kafka.consumer.bootstrap-servers=10.17.70.6:9092,10.17.70.6:9093,10.17.70.6:9094 spring.kafka.consumer.topic=centerm-cluster-topic # consumer group 标识 spring.kafka.consumer.group-id=centerm-cluster-topic-group # 是否自动提交offset(告知kafka当前consumer group读取到的消息位置) spring.kafka.consumer.enable-auto-commit=true # 提交offset频率 spring.kafka.consumer.auto-commit-interval-ms=1000 # 并发数 spring.kafka.consumer.concurrency=5 # 是否开启批量获取消息 spring.kafka.consumer.batch-listener=true # 一次获取最大记录数 spring.kafka.consumer.max-poll-records=10000 # 最小获取消息字节数 spring.kafka.consumer.fetch-min-bytes=1048576 # 最大的等待获取消息时间(最多多久就要获取一次)与fetch-min-bytes满足一个条件,broker就会发送消息给consumer spring.kafka.consumer.fetch-max-wait-ms=1000 # 获取记录超时时间 spring.kafka.consumer.max-poll-time-out=30000 # Session超时时间(用于断点检测,心跳) spring.kafka.consumer.session-time-out-ms=30000 # 偏移量失效后latest-从最新的消息读 earliest-从最早的消息开始读 spring.kafka.consumer.auto-offset-reset=earliest # 轮询策略org.apache.kafka.clients.consumer.RangeAssignor 和 org.apache.kafka.clients.consumer.RoundRobinAssignor spring.kafka.consumer.partition-assignment-strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
KafkaConsumerConfig.java
@Configuration @EnableKafka @Slf4j @Profile({ "kafkaReceiver-single","kafkaReceiver-batch"}) public class KafkaConsumerConfig { private static final Integer CONSUMER_CONFIGS_COUNT = 15; @Value("${spring.kafka.consumer.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Value("${spring.kafka.consumer.enable-auto-commit}") private boolean autoCommit; @Value("${spring.kafka.consumer.auto-commit-interval-ms}") private Integer autoCommitIntervalMs; @Value("${spring.kafka.consumer.concurrency}") private Integer concurrency; @Value("${spring.kafka.consumer.batch-listener}") private boolean batchListener; @Value
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。