当前位置:   article > 正文

springboot整合kafka_springboot+kafka

springboot+kafka

本教程将介绍如何在 Spring Boot 应用程序中使用 Kafka。Kafka是一个分布式的发布-订阅消息系统,它可以处理大量数据并提供高吞吐量。 在本教程中,我们将使用 Spring Boot 2.7.2 和Kafka 2.8.1。

步骤一:添加依赖项

在 pom.xml 中添加以下依赖项:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.1</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

步骤二:配置 Kafka

在 application.yml 文件中添加以下配置:

server:
  port: 8080
spring:
  kafka:
    bootstrap-servers: 192.168.79.104:9092
    producer: # 生产者
      retries: 3 # 设置大于 0 的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500
    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
      # MANUAL_IMMEDIATE
      ack-mode: MANUAL_IMMEDIATE
  redis:
    host: 192.168.79.104
    port: 6379
    password: 123321
    lettuce:
      pool:
        max-active: 10
        max-idle: 10
        min-idle: 1
        time-between-eviction-runs: 10s

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

这里我们配置了 Kafka 的服务地址为 192.168.79.104:9092,配置了一个消费者组 ID 为 default-group。在生产者方面,我们配置了消息序列化程序为 StringSerializer。

步骤三:创建一个生产者

现在,我们将创建一个 Kafka 生产者,用于发送消息到 Kafka 服务器。在这里,我们将创建一个 RESTful 端点,用于接收 POST 请求并将消息发送到 Kafka。
首先,我们将创建一个 KafkaProducerConfig 类,用于配置 Kafka 生产者:

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

在上面的代码中,我们使用 @Configuration 注解将 KafkaProducerConfig 类声明为配置类。然后,我们使用 @Value 注解注入配置文件中的 bootstrap-servers 属性。
接下来,我们创建了一个 producerConfigs 方法,用于设置 Kafka 生产者的配置。在这里,我们设置了 BOOTSTRAP_SERVERS_CONFIG、KEY_SERIALIZER_CLASS_CONFIG 和 VALUE_SERIALIZER_CLASS_CONFIG 三个属性。
然后,我们创建了一个 producerFactory 方法,用于创建 Kafka 生产者工厂。在这里,我们使用了 DefaultKafkaProducerFactory 类,并传递了我们的配置。
最后,我们创建了一个 kafkaTemplate 方法,用于创建 KafkaTemplate 实例。在这里,我们使用了刚刚创建的生产者工厂作为参数,然后返回 KafkaTemplate 实例。
接下来,我们将创建一个 RESTful 端点,用于接收 POST 请求并将消息发送到 Kafka。在这里,我们将使用 @RestController 注解创建一个 RESTful 控制器:

@RestController
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在上面的代码中,我们使用 @Autowired 注解将 KafkaTemplate 实例注入到 KafkaController 类中。然后,我们创建了一个 sendMessage 方法,用于发送消息到 Kafka。
在这里,我们使用 kafkaTemplate.send 方法发送消息到 my-topic 主题。send 方法返回一个 ListenableFuture 对象,用于异步处理结果。

步骤四:创建一个消费者

现在,我们将创建一个 Kafka 消费者,用于从 Kafka 服务器接收消息。在这里,我们将创建一个消费者组,并将其配置为从 my-topic 主题读取消息。
首先,我们将创建一个 KafkaConsumerConfig 类,用于配置 Kafka 消费者:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

在上面的代码中,我们使用 @Configuration 注解将 KafkaConsumerConfig 类声明为配置类,并使用 @EnableKafka 注解启用 Kafka。
然后,我们使用 @Value 注解注入配置文件中的 bootstrap-servers 和 consumer.group-id 属性。
接下来,我们创建了一个 consumerConfigs 方法,用于设置 Kafka 消费者的配置。在这里,我们设置了 BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIG、AUTO_OFFSET_RESET_CONFIG、KEY_DESERIALIZER_CLASS_CONFIG 和 VALUE_DESERIALIZER_CLASS_CONFIG 五个属性。
然后,我们创建了一个 consumerFactory 方法,用于创建 Kafka 消费者工厂。在这里,我们使用了 DefaultKafkaConsumerFactory 类,并传递了我们的配置。
最后,我们创建了一个 kafkaListenerContainerFactory 方法,用于创建一个 ConcurrentKafkaListenerContainerFactory 实例。在这里,我们将消费者工厂注入到 kafkaListenerContainerFactory 实例中。

接下来,我们将创建一个 Kafka 消费者类 KafkaConsumer,用于监听 my-topic 主题并接收消息:

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "default-group")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

在上面的代码中,我们使用 @KafkaListener 注解声明了一个消费者方法,用于接收从 my-topic 主题中读取的消息。在这里,我们将消费者组 ID 设置为default-group。
现在,我们已经完成了 Kafka 生产者和消费者的设置。我们可以使用 mvn spring-boot:run 命令启动应用程序,并使用 curl 命令发送 POST 请求到 http://localhost:8080/send 端点,以将消息发送到 Kafka。然后,我们可以在控制台上查看消费者接收到的消息。
这就是使用 Spring Boot 和 Kafka 的基本设置。我们可以根据需要进行更改和扩展,以满足特定的需求。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/951047
推荐阅读
相关标签
  

闽ICP备14008679号