当前位置:   article > 正文

@KafkaListener 详解及消息消费启停控制

@kafkalistener

参考:Kafka参数

一、@KafkaListener注解

  1. @KafkaListener(id = "11111", groupId = "demo-group",topics = Constants.TOPIC)
  2. public void listen(String msgData) {
  3. LOGGER.info("收到消息" + msgData);
  4. }
  5. @KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix",
  6. topics = Constants.TOPIC)
  7. public void listen2(String msgData) {
  8. LOGGER.info("收到消息" + msgData);
  9. }
  10. @KafkaListener(id = "3333", groupId = "demo-group2", topics = Constants.TOPIC)
  11. public void listen3(String msgData) {
  12. LOGGER.info("收到消息" + msgData);
  13. }
  14. @KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC)
  15. public void listen4(String msgData) {
  16. LOGGER.info("收到消息" + msgData);
  17. }

(1) id: 默认是每个Listener实例的重要标识。

对于整个日志的排查起着至关重要的作用。如果不指定groupId,那么id将直接作为groupId。可以使用另外一个属性 idIsGroup=false关闭,默认是true。

(2) goupId: 每个消费者所属的组。

每个消费者都有自己所属的组。一个组中可以有多个消费者。

一个Topic的分区只能被同一个组下的某个消费者消费。从日志上来看,侧面也反映的消费模式是 Subscribed 订阅模式,不是手动的assign模式。

  1. [Consumer clientId=consumer-1, groupId=demo-group2] Subscribed to topic(s): COLA
  2. [Consumer clientId=consumer-2, groupId=demo-group] Subscribed to topic(s): COLA
  3. [Consumer clientId=consumer-3, groupId=demo-group2] Subscribed to topic(s): COLA
  4. [Consumer clientId=prefix-0, groupId=demo-group] Subscribed to topic(s): COLA

(3) clientIdPrefix: 消费者clientId前缀

  1. @KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix", topics = Constants.TOPIC)
  2. public void listen2(String msgData) {
  3. LOGGER.info("收到消息" + msgData);
  4. }

如下图,共有4个消费者。有个消费者配置了clientIdPrefix属性为"prefix",所以该消费者的clientId以指定的"prefix"开头。如果没有配置,该实例的clientId默认为"consumer"。同时,每个实例的clientId后面挂了一个数字,来标示它在整个kafka集群中的消费者编号,编号从0开始。这里配置了4个消费者,所以消费者实例编号有0、 1、 2、 3。

(4) autoStartup

  1. public @interface KafkaListener ...
  2. /**
  3. * Set to true or false, to override the default setting in the container factory. May
  4. * be a property placeholder or SpEL expression that evaluates to a {@link Boolean} or
  5. * a {@link String}, in which case the {@link Boolean#parseBoolean(String)} is used to
  6. * obtain the value.
  7. * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
  8. * @return true to auto start, false to not auto start.
  9. * @since 2.2
  10. */
  11. String autoStartup() default "";

是否自动启动,如果是 false,默认不生效,需要手动唤醒。

看源代码上作者给的的注释:该注解指定的值优先级比工厂里指定的高。

另外可以使用 ${} 占位符的形式,支持配置。

  1. application.yaml:
  2. listener:
  3. auto:
  4. startup: true
  5. java :
  6. @KafkaListener(... containerFactory = "batchContainerFactory",
  7. autoStartup = "${listener.auto.startup}")
  8. public void listen4(List<ConsumerRecord> list, Acknowledgment acknowledgment)...
注:每个消费者实例对象内部持有两个属性。
boolean running
boolean paused
有几个改变状态的方法:
调用start()方法后,running转为true
调用stop()方法后,running转为false
调用pause()方法后,paused转为true
调用resume()方法后,paused转为false

只有running=true 、 paused=false 的消费者实例才能正常消费数据。
注解上的autoStartup改变的是running属性。
  1. @KafkaListener(id = "11111", groupId = "demo-group",
  2. topics = Constants.TOPIC, autoStartup = "false")
  3. public void listen(String msgData) throws InterruptedException {
  4. LOGGER.info("收到消息" + msgData);
  5. Thread.sleep(1000);
  6. }

二、Kafka Listener任务暂停及恢复

2.1 唤醒消费者实例, 示例代码:

  1. @Autowired
  2. private KafkaListenerEndpointRegistry registry;
  3. // 获取到id="11111" 的消费实例对象
  4. MessageListenerContainer listenerContainer =
  5. this.registry.getListenerContainer("11111");
  6. listenerContainer.pause(); //paused ==> true
  7. // listenerContainer.stop(); //running==> false

2.2 暂停消费者实例, 示例代码:

  1. @Autowired
  2. private KafkaListenerEndpointRegistry registry;
  3. // 获取到id="11111" 的消费实例对象
  4. MessageListenerContainer listenerContainer =
  5. this.registry.getListenerContainer("11111");
  6. listenerContainer.pause(); //paused ==> true
  7. // listenerContainer.stop(); //running==> false

2.3 定时任务自动启动

  1. @Autowired
  2. private KafkaListenerEndpointRegistry registry;
  3. // 定时器,每天凌晨0点开启监听
  4. @Scheduled(cron = "0 0 0 * * ?")
  5. public void startListener() {
  6. log.info("开启监听");
  7. // 判断监听容器是否启动,未启动则将其启动
  8. if (!registry.getListenerContainer("11111").isRunning()) {
  9. registry.getListenerContainer("11111").start();
  10. }
  11. registry.getListenerContainer("11111").resume();
  12. }
  13. // 定时器,每天早上10点关闭监听
  14. @Scheduled(cron = "0 0 10 * * ?")
  15. public void shutDownListener() {
  16. log.info("关闭监听");
  17. registry.getListenerContainer("11111").pause();
  18. }

三、@KafkaListener注解方法参数汇总

@KafkaListener注解能够使用到如下8种方法上面。至于监听单条数据的前4种方法,与批量监听多条数据的后4种方法,主要依据kafka的具体配置。

  1. @KafkaListener(....)
  2. public void listen1(String data)
  3. @KafkaListener(....)
  4. public void listen2(ConsumerRecord<K,V> data)
  5. @KafkaListener(....)
  6. public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment)
  7. @KafkaListener(....)
  8. public void listen4(ConsumerRecord<K,V> data,
  9. Acknowledgment acknowledgment, Consumer<K,V> consumer)
  10. @KafkaListener(....)
  11. public void listen5(List<String> data)
  12. @KafkaListener(....)
  13. public void listen6(List<ConsumerRecord<K,V>> data)
  14. @KafkaListener(....)
  15. public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment)
  16. @KafkaListener(....)
  17. public void listen8(List<ConsumerRecord<K,V>> data,
  18. Acknowledgment acknowledgment, Consumer<K,V> consumer)

四、KafkaListenerContainerFactory配置

在application.yaml中配置的kafka参数,以spring.kafka开头的参数族,全部用于kafka默认对象的创建。

4.1 kafka参数默认封装对象

所有kafka参数默认封装到对象:KafkaProperties对象中,可使用@Autowired自动注入。

  1. @Autowired
  2. private KafkaProperties properties;

4.2 @KakfkaListener注解标记监听实例对象

如不特殊指定,默认使用在yaml中的所有spring.kafka.consumer与spring.kafka.listener下的参数。

监听器实例对象自动绑定到上述配置文件,是由于它默认使用的"containerFactory" 是名为"kafkaListenerContainerFactory"的bean。

源码注释如下,如果不特殊指定,则默认的容器工厂将会被使用。

  1. package org.springframework.kafka.annotation;
  2. public @interface KafkaListener ...
  3. /**
  4. * The bean name of the {@link
  5. org.springframework.kafka.config.KafkaListenerContainerFactory}
  6. * to use to create the message listener container
  7. responsible to serve this endpoint.
  8. * <p>If not specified, the default container factory is used, if any.
  9. * @return the container factory bean name.
  10. */
  11. String containerFactory() default "";
默认的容器工厂代码如下,均为Springboot与Kafka框架提供的类。
这两个bean将spring.kafka.listener与spring.kafka.consumer下的参数全部组装到名为"kafkaListenerContainerFactory"这个bean中。该bean供@KafkaListener标记的监听实例使用。
因此可以得出结论:
如果不想使用默认的"kafkaListenerContainerFactory"容器工厂,则必须手动创建一个"ConcurrentKafkaListenerContainerFactory"类的实例,并且其bean name 不能叫"kafkaListenerContainerFactory"(不然与默认的工厂实例重名了),然后把该对象加入spring容器中。当在使用@KafkaListener标注的监听实例对象时,手动指定该注解"containerFactory"属性为刚才自定义的容器工厂实例bean name。
  1. package org.springframework.boot.autoconfigure.kafka;
  2. class KafkaAnnotationDrivenConfiguration {
  3. @Bean
  4. @ConditionalOnMissingBean
  5. ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
  6. ConcurrentKafkaListenerContainerFactoryConfigurer configurer =
  7. new ConcurrentKafkaListenerContainerFactoryConfigurer();
  8. configurer.setKafkaProperties(this.properties);
  9. MessageConverter messageConverterToUse =
  10. (this.properties.getListener().getType().equals(Type.BATCH))
  11. ? this.batchMessageConverter : this.messageConverter;
  12. configurer.setMessageConverter(messageConverterToUse);
  13. configurer.setReplyTemplate(this.kafkaTemplate);
  14. configurer.setTransactionManager(this.transactionManager);
  15. configurer.setRebalanceListener(this.rebalanceListener);
  16. configurer.setErrorHandler(this.errorHandler);
  17. configurer.setBatchErrorHandler(this.batchErrorHandler);
  18. configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
  19. configurer.setRecordInterceptor(this.recordInterceptor);
  20. return configurer;
  21. }
  22. @Bean
  23. @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
  24. ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
  25. ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
  26. ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
  27. ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
  28. new ConcurrentKafkaListenerContainerFactory<>();
  29. configurer.configure(factory, kafkaConsumerFactory
  30. .getIfAvailable(() ->
  31. new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
  32. return factory;
  33. }

4.3 自定义容器工厂实例代码示例:

  1. @Autowired
  2. private KafkaProperties properties;
  3. @Bean("batchContainerFactory")
  4. public ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainer() {
  5. ConcurrentKafkaListenerContainerFactory<?, ?> container =
  6. new ConcurrentKafkaListenerContainerFactory<>();
  7. Map<String, Object> stringObjectMap = this.properties.buildConsumerProperties();
  8. stringObjectMap.put("enable.auto.commit", false);
  9. container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(stringObjectMap));
  10. // 没有topic是否禁止系统启动
  11. container.setMissingTopicsFatal(true);
  12. // 并发
  13. container.setConcurrency(1);
  14. // 批量接收
  15. container.setBatchListener(true);
  16. // 如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。
  17. container.getContainerProperties().setPollTimeout(5000);
  18. // 设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
  19. container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  20. // 设置kafka 异常重试次数 第一个参数等待重试时间,第二个参数数提交次数,这里设置不重试,默认重试10次 抛出异常后调用
  21. // factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 0L)));
  22. return container;
  23. }
  24. @KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC, containerFactory = "batchContainerFactory")
  25. public void listen4(List<ConsumerRecord> list, Acknowledgment acknowledgment) {
  26. LOGGER.info("4444收到消息" + list.size());
  27. acknowledgment.acknowledge();
  28. }

五、吞吐量

如下,这里我只列出了影响本例的几条参数。

  1. spring:
  2. kafka:
  3. consumer:
  4. enable-auto-commit: true
  5. # max-poll-records: 20
  6. listener:
  7. ack-mode: batch
  8. type: batch
  9. concurrency: 5

如果设置spring.kafka.listener.concurrency为5,共两个消费者,Topic名为"COLA",共8个分区。代码如下。

  1. @KafkaListener(id = "4444", groupId = "demo-group2", topics = "COLA")
  2. public void listen4(List<String> msgData) {
  3. LOGGER.info("收到消息" + msgData);
  4. }
  5. @KafkaListener(id = "5555", groupId = "demo-group2", topics = "COLA")
  6. public void listen5(List<String> msgData) {
  7. LOGGER.info("收到消息" + msgData);
  8. }
  9. @Bean
  10. public NewTopic newTopic() {
  11. return new NewTopic(Constants.TOPIC, 8, (short) 1);
  12. }
系统每个消费者都创建了5个线程,共10个线程。换句话说,每个消费者实例(@KafkaListener标记的方法)同时都会有5个线程在跑。每个线程接收的分区都不一样。
另外,这两个消费者属于同一个组,Topic只有8个分区,2个消费者共10个线程,一个线程消费一个分区,所以必然有两个线程最后属于空闲状态。
从实际结果上来看(下面的日志),没想到系统为id="4444"的消费者实际只分配到了3个分区,有两个线程处于空闲状态。id="5555"的消费者达到了预期,共消费了5个分区,分配到了5个线程!
  1. [4444-2-C-1]: demo-group2: partitions assigned: []
  2. [4444-3-C-1]: demo-group2: partitions assigned: []
  3. [4444-4-C-1]: demo-group2: partitions assigned: [COLA-1]
  4. [4444-1-C-1]: demo-group2: partitions assigned: [COLA-7]
  5. [5555-2-C-1]: demo-group2: partitions assigned: [COLA-3]
  6. [5555-4-C-1]: demo-group2: partitions assigned: [COLA-5]
  7. [5555-3-C-1]: demo-group2: partitions assigned: [COLA-4]
  8. [4444-0-C-1]: demo-group2: partitions assigned: [COLA-6]
  9. [5555-0-C-1]: demo-group2: partitions assigned: [COLA-0]
  10. [5555-1-C-1]: demo-group2: partitions assigned: [COLA-2]

六、 结论:

  1. concurrency值对应@KafkaListener的消费者实例线程数目,如果concurrency数量大于partition数量,多出的部分分配不到partition,会被闲置。

  1. 设置的并发量不能大于partition的数量,如果需要提高吞吐量,可以通过增加partition的数量达到快速提升吞吐量的效果。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/483971
推荐阅读
相关标签
  

闽ICP备14008679号