当前位置:   article > 正文

Kafka 实战 - SpringBoot 中使用@KafkaListener详解与使用_springboot kafkalistener

springboot kafkalistener

在Spring Boot项目中使用@KafkaListener注解,可以方便地监听和处理从Kafka主题中流入的消息。以下是一个详细的实战指南:

引入依赖

首先,确保在Spring Boot项目中引入了Spring Kafka的相关依赖。在pom.xml中添加以下内容:

<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <!-- 替换为你使用的Spring Kafka版本 -->
        <version>2.8.0</version>
    </dependency>
    <!-- 如果需要自动配置Kafka工厂bean,可以添加以下依赖 -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <!-- 同上,确保版本一致 -->
        <version>2.8.0</version>
        <!-- 测试环境中才需要这个依赖 -->
        <scope>test</scope>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

配置Kafka连接

application.ymlapplication.properties中配置Kafka连接信息:

spring:
  kafka:
    bootstrap-servers: localhost:9092 # Kafka集群地址
    consumer:
      group-id: my-consumer-group # 消费者组ID
      auto-offset-reset: earliest # 默认偏移量位置(earliest/latest)
      enable-auto-commit: false # 是否自动提交offset,一般建议关闭以便手动控制
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 键序列化器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值序列化器
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

创建消费者类

创建一个Java类,使用@KafkaListener注解标注监听方法,该方法将在监听到相应主题的消息时被调用。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaConsumer {

    @KafkaListener(topics = "my-topic")
    public void listenToMessage(String message) {
        System.out.println("Received message from Kafka: " + message);
        // 在这里处理消息逻辑
    }

    // 如果需要监听多个主题,可以添加更多的@KafkaListener注解方法
    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}")
    public void listenToMultipleTopics(String message) {
        // ...
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

在上面的示例中:

  • @Component注解表明这是一个Spring Bean,因此会被Spring容器管理。
  • @KafkaListener(topics = "my-topic")注解标记了监听的主题名为my-topic的方法。
  • 方法参数类型根据实际消息内容进行设定,这里假设是字符串类型。
  • 若需要动态监听多个主题,可以使用SpEL表达式从配置文件中读取主题列表。

动态主题和并发消费

  • 动态主题:可以通过SpEL表达式或者其他Spring Expression Language的方式来动态指定监听的主题。

  • 并发消费:默认情况下,每个@KafkaListener注解方法将会在一个单独的线程中运行。如果希望在同一方法中并发处理多个主题或者分区,可以配置concurrency属性:

@KafkaListener(id = "multi-topic-listener", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "3")
public void listenToMultipleTopicsConcurrently(String message) {
    // ...
}
  • 1
  • 2
  • 3
  • 4

这里的concurrency = "3"意味着会创建3个工作线程并发处理来自指定主题的消息。

手动提交偏移量

如果在配置中关闭了自动提交,那么在处理完消息后需要手动提交偏移量:

import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.annotation.KafkaListener;

@KafkaListener(topics = "my-topic")
public void listenToMessage(String message, Acknowledgment acknowledgment) {
    System.out.println("Processing message: " + message);
    // ... 处理消息逻辑
    acknowledgment.acknowledge(); // 手动提交偏移量
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

错误处理与重试

可以配置监听方法的异常处理器,以及消息重试策略,这些都是Spring Kafka提供的高级功能,可以根据业务需求进行配置。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/926467
推荐阅读
相关标签
  

闽ICP备14008679号