赞
踩
@KafkaListener源码:
- package org.springframework.kafka.annotation;
-
- import java.lang.annotation.Documented;
- import java.lang.annotation.ElementType;
- import java.lang.annotation.Repeatable;
- import java.lang.annotation.Retention;
- import java.lang.annotation.RetentionPolicy;
- import java.lang.annotation.Target;
- import org.springframework.messaging.handler.annotation.MessageMapping;
-
- @Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
- @Retention(RetentionPolicy.RUNTIME)
- @MessageMapping
- @Documented
- @Repeatable(KafkaListeners.class)
- public @interface KafkaListener {
- String id() default "";
-
- String containerFactory() default "";
-
- String[] topics() default {};
-
- String topicPattern() default "";
-
- TopicPartition[] topicPartitions() default {};
-
- String containerGroup() default "";
-
- String errorHandler() default "";
-
- String groupId() default "";
-
- boolean idIsGroup() default true;
-
- String clientIdPrefix() default "";
-
- String beanRef() default "__listener";
-
- String concurrency() default "";
-
- String autoStartup() default "";
-
- String[] properties() default {};
-
- boolean splitIterables() default true;
-
- String contentTypeConverter() default "";
-
- String batch() default "";
- }
@KafkaListener是Spring Framework中的一个注解,用于标识一个方法是一个Kafka消息监听器。当应用程序订阅了一个或多个Kafka主题时,可以使用@KafkaListener注解来指定要处理的消息。该注解可以放置在类级别或方法级别上。
在类级别上使用@KafkaListener注解,可以为所有方法定义一个默认的Kafka主题和其他配置属性。在方法级别上使用@KafkaListener注解,可以为每个方法定义自己的Kafka主题和其他配置属性。
当使用@KafkaListener注解标识一个方法时,Spring会自动创建一个Kafka消息监听器容器来监听指定的Kafka主题。当从该主题接收到消息时,监听器容器将调用带有@KafkaListener注解的方法来处理消息。
在底层,Spring使用Kafka Consumer API来实现Kafka消息消费者。当应用程序启动时,Spring会创建一个或多个Kafka Consumer实例,并将其配置为订阅指定的Kafka主题。当从该主题接收到消息时,Kafka Consumer将把消息传递给Spring的Kafka消息监听器容器。
具体来说,当使用@KafkaListener注解标识一个方法时,Spring会自动将该方法包装成一个Kafka消息监听器。该监听器会被注册到Kafka消息监听器容器中,以便在从指定的Kafka主题接收到消息时被调用。
当使用@KafkaListener注解时,需要配置Kafka相关的属性,例如Kafka服务器地址、消费者组ID、序列化器等。下面是一个示例代码:
- @Configuration
- @EnableKafka
- public class KafkaConsumerConfig {
-
- @Value("${spring.kafka.bootstrap-servers}")
- private String bootstrapServers;
-
- @Value("${spring.kafka.consumer.group-id}")
- private String groupId;
-
- @Bean
- public Map<String, Object> consumerConfigs() {
- Map<String, Object> props = new HashMap<>();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return props;
- }
-
- @Bean
- public ConsumerFactory<String, String> consumerFactory() {
- return new DefaultKafkaConsumerFactory<>(consumerConfigs());
- }
-
- @Bean
- public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- return factory;
- }
-
- @Bean
- public MyKafkaListener myKafkaListener() {
- return new MyKafkaListener();
- }
- }
在这个示例中,我们首先使用@EnableKafka注解启用Kafka支持。然后,我们定义了一个consumerConfigs()方法来设置Kafka消费者的属性,并使用@Bean注解将其注册到Spring容器中。接着,我们定义了一个consumerFactory()方法来创建Kafka消费者工厂,并使用@Bean注解将其注册到Spring容器中。最后,我们定义了一个kafkaListenerContainerFactory()方法来创建Kafka消息监听器容器,并使用@Bean注解将其注册到Spring容器中。
在这个示例中,我们还定义了一个MyKafkaListener类,并使用@Bean注解将其注册到Spring容器中。这个类带有@KafkaListener注解,并指定了要监听的Kafka主题。当从该主题接收到消息时,将调用带有@KafkaListener注解的方法来处理消息。
- public class MyKafkaListener {
-
- @KafkaListener(topics = "myTopic")
- public void listen(String message) {
- System.out.println("Received message: " + message);
- }
- }
在上面代码中,我们使用@KafkaListener注解来标识listen()方法,以便在从名为“myTopic”的Kafka主题接收到消息时调用该方法来处理消息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。