当前位置:   article > 正文

【Kafka】SpringBoot整合Kafka详细介绍及代码示例_spring boot kafka

spring boot kafka

Kafka介绍

Apache Kafka是一个分布式流处理平台。它最初由LinkedIn开发,后来成为Apache软件基金会的一部分,并在开源社区中得到了广泛应用。Kafka的核心概念包括Producer、Consumer、Broker、Topic、Partition和Offset。

  • Producer:生产者,负责将数据发送到Kafka集群。
  • Consumer:消费者,从Kafka集群中读取数据。
  • Broker:Kafka服务器实例,Kafka集群通常由多个Broker组成。
  • Topic:主题,数据按主题进行分类。
  • Partition:分区,每个主题可以有多个分区,用于实现并行处理和提高吞吐量。
  • Offset:偏移量,每个消息在其分区中的唯一标识。

使用场景

Kafka适用于以下场景:

  1. 日志收集:集中收集系统日志和应用日志,通过Kafka传输到大数据处理系统。
  2. 消息队列:作为高吞吐量、低延迟的消息队列系统。
  3. 数据流处理:实时处理数据流,用于实时分析、监控和处理。
  4. 事件源架构:将所有的变更事件存储在Kafka中,实现事件溯源和回放。
  5. 流数据管道:构建数据管道,连接数据源和数据存储系统。

Spring Boot整合Kafka 

项目结构

  1. springboot-kafka
  2. ├── src
  3. │ ├── main
  4. │ │ ├── java
  5. │ │ │ └── com.example.kafka
  6. │ │ │ ├── KafkaApplication.java
  7. │ │ │ ├── config
  8. │ │ │ │ └── KafkaConfig.java
  9. │ │ │ ├── producer
  10. │ │ │ │ └── KafkaProducer.java
  11. │ │ │ ├── consumer
  12. │ │ │ │ └── KafkaConsumer.java
  13. │ │ │ └── controller
  14. │ │ │ └── KafkaController.java
  15. │ │ └── resources
  16. │ │ ├── application.yml
  17. │ │ └── logback-spring.xml (可选)
  18. │ └── test
  19. │ └── java
  20. │ └── com.example.kafka
  21. │ └── KafkaApplicationTests.java
  22. └── pom.xml

1. 创建Spring Boot项目并添加依赖

pom.xml
  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-web</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-kafka</artifactId>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.springframework.boot</groupId>
  16. <artifactId>spring-boot-starter-test</artifactId>
  17. <scope>test</scope>
  18. </dependency>
  19. </dependencies>

2. 配置Kafka

application.yml
  1. spring:
  2. kafka:
  3. bootstrap-servers: localhost:9092
  4. consumer:
  5. group-id: my-group
  6. auto-offset-reset: earliest
  7. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  8. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  9. producer:
  10. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11. value-serializer: org.apache.kafka.common.serialization.StringSerializer

3. 创建Kafka配置类

KafkaConfig.java
  1. package com.example.kafka.config;
  2. import org.apache.kafka.clients.admin.NewTopic;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class KafkaConfig {
  7. @Bean
  8. public NewTopic myTopic() {
  9. return new NewTopic("my-topic", 1, (short) 1);
  10. }
  11. }

4. 创建Kafka生产者

KafkaProducer.java
  1. package com.example.kafka.producer;
  2. import org.springframework.kafka.core.KafkaTemplate;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class KafkaProducer {
  6. private final KafkaTemplate<String, String> kafkaTemplate;
  7. public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
  8. this.kafkaTemplate = kafkaTemplate;
  9. }
  10. public void sendMessage(String topic, String message) {
  11. kafkaTemplate.send(topic, message);
  12. }
  13. }

5. 创建Kafka消费者

KafkaConsumer.java
  1. package com.example.kafka.consumer;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class KafkaConsumer {
  6. @KafkaListener(topics = "my-topic", groupId = "my-group")
  7. public void listen(String message) {
  8. System.out.println("Received message: " + message);
  9. }
  10. }

6. 创建控制器发送消息

KafkaController.java
  1. package com.example.kafka.controller;
  2. import com.example.kafka.producer.KafkaProducer;
  3. import org.springframework.web.bind.annotation.GetMapping;
  4. import org.springframework.web.bind.annotation.RequestParam;
  5. import org.springframework.web.bind.annotation.RestController;
  6. @RestController
  7. public class KafkaController {
  8. private final KafkaProducer kafkaProducer;
  9. public KafkaController(KafkaProducer kafkaProducer) {
  10. this.kafkaProducer = kafkaProducer;
  11. }
  12. @GetMapping("/send")
  13. public String sendMessage(@RequestParam String message) {
  14. kafkaProducer.sendMessage("my-topic", message);
  15. return "Message sent";
  16. }
  17. }

7. 创建Spring Boot主类

KafkaApplication.java
  1. package com.example.kafka;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class KafkaApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(KafkaApplication.class, args);
  8. }
  9. }

8. 测试应用

通过访问以下URL来发送消息:

http://localhost:8080/send?message=HelloKafka

9. 日志配置(可选)

为了更好地查看Kafka的日志,可以添加logback-spring.xml配置:

logback-spring.xml
  1. <configuration>
  2. <springProfile name="default">
  3. <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
  4. <encoder>
  5. <pattern>%d{yyyy-MM-dd HH:mm:ss} - %msg%n</pattern>
  6. </encoder>
  7. </appender>
  8. <logger name="org.apache.kafka" level="INFO"/>
  9. <root level="INFO">
  10. <appender-ref ref="STDOUT"/>
  11. </root>
  12. </springProfile>
  13. </configuration>

10. 测试类(可选)

KafkaApplicationTests.java
  1. package com.example.kafka;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.boot.test.context.SpringBootTest;
  4. @SpringBootTest
  5. class KafkaApplicationTests {
  6. @Test
  7. void contextLoads() {
  8. }
  9. }

至此,你已经完成了Spring Boot整合Kafka的详细配置和代码示例。你可以根据实际需求进一步扩展和修改这个基础代码。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/863894
推荐阅读
相关标签
  

闽ICP备14008679号