赞
踩
Apache Kafka是一个分布式流处理平台。它最初由LinkedIn开发,后来成为Apache软件基金会的一部分,并在开源社区中得到了广泛应用。Kafka的核心概念包括Producer、Consumer、Broker、Topic、Partition和Offset。
Kafka适用于以下场景:
springboot-kafka │ ├── src │ ├── main │ │ ├── java │ │ │ └── com.example.kafka │ │ │ ├── KafkaApplication.java │ │ │ ├── config │ │ │ │ └── KafkaConfig.java │ │ │ ├── producer │ │ │ │ └── KafkaProducer.java │ │ │ ├── consumer │ │ │ │ └── KafkaConsumer.java │ │ │ └── controller │ │ │ └── KafkaController.java │ │ └── resources │ │ ├── application.yml │ │ └── logback-spring.xml (可选) │ └── test │ └── java │ └── com.example.kafka │ └── KafkaApplicationTests.java └── pom.xml
pom.xml
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-kafka</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
application.yml
- spring:
- kafka:
- bootstrap-servers: localhost:9092
- consumer:
- group-id: my-group
- auto-offset-reset: earliest
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- producer:
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
KafkaConfig.java
- package com.example.kafka.config;
-
- import org.apache.kafka.clients.admin.NewTopic;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class KafkaConfig {
-
- @Bean
- public NewTopic myTopic() {
- return new NewTopic("my-topic", 1, (short) 1);
- }
- }
KafkaProducer.java
- package com.example.kafka.producer;
-
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Service;
-
- @Service
- public class KafkaProducer {
-
- private final KafkaTemplate<String, String> kafkaTemplate;
-
- public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
- this.kafkaTemplate = kafkaTemplate;
- }
-
- public void sendMessage(String topic, String message) {
- kafkaTemplate.send(topic, message);
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
KafkaConsumer.java
- package com.example.kafka.consumer;
-
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Service;
-
- @Service
- public class KafkaConsumer {
-
- @KafkaListener(topics = "my-topic", groupId = "my-group")
- public void listen(String message) {
- System.out.println("Received message: " + message);
- }
- }
KafkaController.java
- package com.example.kafka.controller;
-
- import com.example.kafka.producer.KafkaProducer;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestParam;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- public class KafkaController {
-
- private final KafkaProducer kafkaProducer;
-
- public KafkaController(KafkaProducer kafkaProducer) {
- this.kafkaProducer = kafkaProducer;
- }
-
- @GetMapping("/send")
- public String sendMessage(@RequestParam String message) {
- kafkaProducer.sendMessage("my-topic", message);
- return "Message sent";
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
KafkaApplication.java
- package com.example.kafka;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- @SpringBootApplication
- public class KafkaApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(KafkaApplication.class, args);
- }
- }
通过访问以下URL来发送消息:
http://localhost:8080/send?message=HelloKafka
为了更好地查看Kafka的日志,可以添加logback-spring.xml
配置:
logback-spring.xml
- <configuration>
- <springProfile name="default">
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{yyyy-MM-dd HH:mm:ss} - %msg%n</pattern>
- </encoder>
- </appender>
- <logger name="org.apache.kafka" level="INFO"/>
- <root level="INFO">
- <appender-ref ref="STDOUT"/>
- </root>
- </springProfile>
- </configuration>
KafkaApplicationTests.java
- package com.example.kafka;
-
- import org.junit.jupiter.api.Test;
- import org.springframework.boot.test.context.SpringBootTest;
-
- @SpringBootTest
- class KafkaApplicationTests {
-
- @Test
- void contextLoads() {
- }
- }
至此,你已经完成了Spring Boot整合Kafka的详细配置和代码示例。你可以根据实际需求进一步扩展和修改这个基础代码。
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。