当前位置:   article > 正文

@KafkaListener

@kafkalistener

@KafkaListener源码:

  1. package org.springframework.kafka.annotation;
  2. import java.lang.annotation.Documented;
  3. import java.lang.annotation.ElementType;
  4. import java.lang.annotation.Repeatable;
  5. import java.lang.annotation.Retention;
  6. import java.lang.annotation.RetentionPolicy;
  7. import java.lang.annotation.Target;
  8. import org.springframework.messaging.handler.annotation.MessageMapping;
  9. @Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
  10. @Retention(RetentionPolicy.RUNTIME)
  11. @MessageMapping
  12. @Documented
  13. @Repeatable(KafkaListeners.class)
  14. public @interface KafkaListener {
  15. String id() default "";
  16. String containerFactory() default "";
  17. String[] topics() default {};
  18. String topicPattern() default "";
  19. TopicPartition[] topicPartitions() default {};
  20. String containerGroup() default "";
  21. String errorHandler() default "";
  22. String groupId() default "";
  23. boolean idIsGroup() default true;
  24. String clientIdPrefix() default "";
  25. String beanRef() default "__listener";
  26. String concurrency() default "";
  27. String autoStartup() default "";
  28. String[] properties() default {};
  29. boolean splitIterables() default true;
  30. String contentTypeConverter() default "";
  31. String batch() default "";
  32. }

@KafkaListener是Spring Framework中的一个注解,用于标识一个方法是一个Kafka消息监听器。当应用程序订阅了一个或多个Kafka主题时,可以使用@KafkaListener注解来指定要处理的消息。该注解可以放置在类级别或方法级别上。

在类级别上使用@KafkaListener注解,可以为所有方法定义一个默认的Kafka主题和其他配置属性。在方法级别上使用@KafkaListener注解,可以为每个方法定义自己的Kafka主题和其他配置属性。

当使用@KafkaListener注解标识一个方法时,Spring会自动创建一个Kafka消息监听器容器来监听指定的Kafka主题。当从该主题接收到消息时,监听器容器将调用带有@KafkaListener注解的方法来处理消息。

在底层,Spring使用Kafka Consumer API来实现Kafka消息消费者。当应用程序启动时,Spring会创建一个或多个Kafka Consumer实例,并将其配置为订阅指定的Kafka主题。当从该主题接收到消息时,Kafka Consumer将把消息传递给Spring的Kafka消息监听器容器。

具体来说,当使用@KafkaListener注解标识一个方法时,Spring会自动将该方法包装成一个Kafka消息监听器。该监听器会被注册到Kafka消息监听器容器中,以便在从指定的Kafka主题接收到消息时被调用。

当使用@KafkaListener注解时,需要配置Kafka相关的属性,例如Kafka服务器地址、消费者组ID、序列化器等。下面是一个示例代码:

  1. @Configuration
  2. @EnableKafka
  3. public class KafkaConsumerConfig {
  4. @Value("${spring.kafka.bootstrap-servers}")
  5. private String bootstrapServers;
  6. @Value("${spring.kafka.consumer.group-id}")
  7. private String groupId;
  8. @Bean
  9. public Map<String, Object> consumerConfigs() {
  10. Map<String, Object> props = new HashMap<>();
  11. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  12. props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  13. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  14. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  15. return props;
  16. }
  17. @Bean
  18. public ConsumerFactory<String, String> consumerFactory() {
  19. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  20. }
  21. @Bean
  22. public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  23. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  24. factory.setConsumerFactory(consumerFactory());
  25. return factory;
  26. }
  27. @Bean
  28. public MyKafkaListener myKafkaListener() {
  29. return new MyKafkaListener();
  30. }
  31. }

在这个示例中,我们首先使用@EnableKafka注解启用Kafka支持。然后,我们定义了一个consumerConfigs()方法来设置Kafka消费者的属性,并使用@Bean注解将其注册到Spring容器中。接着,我们定义了一个consumerFactory()方法来创建Kafka消费者工厂,并使用@Bean注解将其注册到Spring容器中。最后,我们定义了一个kafkaListenerContainerFactory()方法来创建Kafka消息监听器容器,并使用@Bean注解将其注册到Spring容器中。

在这个示例中,我们还定义了一个MyKafkaListener类,并使用@Bean注解将其注册到Spring容器中。这个类带有@KafkaListener注解,并指定了要监听的Kafka主题。当从该主题接收到消息时,将调用带有@KafkaListener注解的方法来处理消息。

  1. public class MyKafkaListener {
  2. @KafkaListener(topics = "myTopic")
  3. public void listen(String message) {
  4. System.out.println("Received message: " + message);
  5. }
  6. }

在上面代码中,我们使用@KafkaListener注解来标识listen()方法,以便在从名为“myTopic”的Kafka主题接收到消息时调用该方法来处理消息。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/484007
推荐阅读
相关标签
  

闽ICP备14008679号