赞
踩
在Spring Boot项目中使用@KafkaListener
注解,可以方便地监听和处理从Kafka主题中流入的消息。以下是一个详细的实战指南:
首先,确保在Spring Boot项目中引入了Spring Kafka的相关依赖。在pom.xml
中添加以下内容:
<dependencies> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <!-- 替换为你使用的Spring Kafka版本 --> <version>2.8.0</version> </dependency> <!-- 如果需要自动配置Kafka工厂bean,可以添加以下依赖 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <!-- 同上,确保版本一致 --> <version>2.8.0</version> <!-- 测试环境中才需要这个依赖 --> <scope>test</scope> </dependency> </dependencies>
在application.yml
或application.properties
中配置Kafka连接信息:
spring:
kafka:
bootstrap-servers: localhost:9092 # Kafka集群地址
consumer:
group-id: my-consumer-group # 消费者组ID
auto-offset-reset: earliest # 默认偏移量位置(earliest/latest)
enable-auto-commit: false # 是否自动提交offset,一般建议关闭以便手动控制
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 键序列化器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值序列化器
创建一个Java类,使用@KafkaListener
注解标注监听方法,该方法将在监听到相应主题的消息时被调用。
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class MyKafkaConsumer { @KafkaListener(topics = "my-topic") public void listenToMessage(String message) { System.out.println("Received message from Kafka: " + message); // 在这里处理消息逻辑 } // 如果需要监听多个主题,可以添加更多的@KafkaListener注解方法 @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}") public void listenToMultipleTopics(String message) { // ... } }
在上面的示例中:
@Component
注解表明这是一个Spring Bean,因此会被Spring容器管理。@KafkaListener(topics = "my-topic")
注解标记了监听的主题名为my-topic
的方法。动态主题:可以通过SpEL表达式或者其他Spring Expression Language的方式来动态指定监听的主题。
并发消费:默认情况下,每个@KafkaListener
注解方法将会在一个单独的线程中运行。如果希望在同一方法中并发处理多个主题或者分区,可以配置concurrency
属性:
@KafkaListener(id = "multi-topic-listener", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "3")
public void listenToMultipleTopicsConcurrently(String message) {
// ...
}
这里的concurrency = "3"
意味着会创建3个工作线程并发处理来自指定主题的消息。
如果在配置中关闭了自动提交,那么在处理完消息后需要手动提交偏移量:
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.annotation.KafkaListener;
@KafkaListener(topics = "my-topic")
public void listenToMessage(String message, Acknowledgment acknowledgment) {
System.out.println("Processing message: " + message);
// ... 处理消息逻辑
acknowledgment.acknowledge(); // 手动提交偏移量
}
可以配置监听方法的异常处理器,以及消息重试策略,这些都是Spring Kafka提供的高级功能,可以根据业务需求进行配置。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。