赞
踩
Spring Boot是一个用于构建独立的、生产级的Java应用程序的框架,而Kafka是一种高吞吐量的分布式发布订阅消息系统。在本文中,我们将详细解释如何在Spring Boot项目中集成Kafka。
首先,我们需要在项目的pom.xml文件中添加Spring Boot和Kafka的依赖。
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- </dependencies>
接下来,我们需要在application.properties文件中配置Kafka的相关参数。
- # Kafka服务器地址
- spring.kafka.bootstrap-servers=localhost:9092
-
- # 消费者组ID
- spring.kafka.consumer.group-id=myGroup
-
- # 消费者自动提交偏移量
- spring.kafka.consumer.enable-auto-commit=true
现在我们可以创建一个Kafka生产者来发送消息。首先,我们需要注入KafkaTemplate。
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
然后,我们可以创建一个方法来发送消息。
- public void sendMessage(String topic, String message) {
- kafkaTemplate.send(topic, message);
- }
同样,我们也可以创建一个Kafka消费者来接收消息。首先,我们需要注入ConsumerFactory和ConsumerConfig。
- @Autowired
- private ConsumerFactory<String, String> consumerFactory;
- @Autowired
- private ConsumerConfig<String, String> consumerConfig;
然后,我们可以创建一个方法来接收消息。
- public void consumeMessages() {
- this.consumer = consumerFactory.createConsumer(consumerConfig);
- this.consumer.subscribe(Arrays.asList("myTopic"));
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records) {
- System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- }
- }
- }
最后,我们可以创建一个主方法来测试我们的Kafka集成。首先,我们需要发送一条消息,然后消费这条消息。
- @SpringBootApplication
- public class KafkaDemoApplication implements CommandLineRunner {
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
- @Autowired
- private ConsumerFactory<String, String> consumerFactory;
- @Autowired
- private ConsumerConfig<String, String> consumerConfig;
- private final AtomicInteger count = new AtomicInteger(); //记录每个分区的消息数,用于后续统计各分区的消息总数。
-
- public static void main(String[] args) {
- SpringApplication.run(KafkaDemoApplication.class, args);
- }
-
- @Override
- public void run(String... args) throws Exception {
- // 向主题"test"发送消息
- sendMessage("test", "hello");
- // 启动消费者
- consumeMessages();
- }
-
- /**
- * 发送消息到Kafka
- */
- public void sendMessage(String topic, String message) {
- kafkaTemplate.send(topic, message);
- count.incrementAndGet();
- }
-
- /**
- * 从Kafka消费消息
- */
- public void consumeMessages() {
- this.consumer = consumerFactory.createConsumer(consumerConfig);
- this.consumer.subscribe(Arrays.asList("test"));
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records) {
- System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- }
- }
- }
- }
这是Spring Boot集成Kafka栗子,在开发中根据实际情况调整。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。