当前位置:   article > 正文

Spring Boot集成Kafka详解_spring boot 集成kafaka

spring boot 集成kafaka

Spring Boot是一个用于构建独立的、生产级的Java应用程序的框架,而Kafka是一种高吞吐量的分布式发布订阅消息系统。在本文中,我们将详细解释如何在Spring Boot项目中集成Kafka。

图片

1. 添加依赖

首先,我们需要在项目的pom.xml文件中添加Spring Boot和Kafka的依赖。

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.kafka</groupId>
  8. <artifactId>spring-kafka</artifactId>
  9. </dependency>
  10. </dependencies>

2. 配置Kafka

接下来,我们需要在application.properties文件中配置Kafka的相关参数。

  1. # Kafka服务器地址
  2. spring.kafka.bootstrap-servers=localhost:9092
  3. # 消费者组ID
  4. spring.kafka.consumer.group-id=myGroup
  5. # 消费者自动提交偏移量
  6. spring.kafka.consumer.enable-auto-commit=true

3. 创建Kafka生产者

现在我们可以创建一个Kafka生产者来发送消息。首先,我们需要注入KafkaTemplate。

  1. @Autowired
  2. private KafkaTemplate<String, String> kafkaTemplate;

然后,我们可以创建一个方法来发送消息。

  1. public void sendMessage(String topic, String message) {
  2. kafkaTemplate.send(topic, message);
  3. }

4. 创建Kafka消费者

同样,我们也可以创建一个Kafka消费者来接收消息。首先,我们需要注入ConsumerFactory和ConsumerConfig。

  1. @Autowired
  2. private ConsumerFactory<String, String> consumerFactory;
  3. @Autowired
  4. private ConsumerConfig<String, String> consumerConfig;

然后,我们可以创建一个方法来接收消息。

  1. public void consumeMessages() {
  2. this.consumer = consumerFactory.createConsumer(consumerConfig);
  3. this.consumer.subscribe(Arrays.asList("myTopic"));
  4. while (true) {
  5. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  6. for (ConsumerRecord<String, String> record : records) {
  7. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  8. }
  9. }
  10. }

5. 测试Kafka集成

最后,我们可以创建一个主方法来测试我们的Kafka集成。首先,我们需要发送一条消息,然后消费这条消息。

  1. @SpringBootApplication
  2. public class KafkaDemoApplication implements CommandLineRunner {
  3. @Autowired
  4. private KafkaTemplate<String, String> kafkaTemplate;
  5. @Autowired
  6. private ConsumerFactory<String, String> consumerFactory;
  7. @Autowired
  8. private ConsumerConfig<String, String> consumerConfig;
  9. private final AtomicInteger count = new AtomicInteger(); //记录每个分区的消息数,用于后续统计各分区的消息总数。
  10. public static void main(String[] args) {
  11. SpringApplication.run(KafkaDemoApplication.class, args);
  12. }
  13. @Override
  14. public void run(String... args) throws Exception {
  15. // 向主题"test"发送消息
  16. sendMessage("test", "hello");
  17. // 启动消费者
  18. consumeMessages();
  19. }
  20. /**
  21. * 发送消息到Kafka
  22. */
  23. public void sendMessage(String topic, String message) {
  24. kafkaTemplate.send(topic, message);
  25. count.incrementAndGet();
  26. }
  27. /**
  28. * 从Kafka消费消息
  29. */
  30. public void consumeMessages() {
  31. this.consumer = consumerFactory.createConsumer(consumerConfig);
  32. this.consumer.subscribe(Arrays.asList("test"));
  33. while (true) {
  34. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  35. for (ConsumerRecord<String, String> record : records) {
  36. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  37. }
  38. }
  39. }
  40. }

图片

这是Spring Boot集成Kafka栗子,在开发中根据实际情况调整。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/152013
推荐阅读
相关标签
  

闽ICP备14008679号