当前位置:   article > 正文

【Kafka】SpringBoot 整合 Kafka 以及 @KafkaListener 注解的使用_kafkalistener注解

kafkalistener注解

一、前提已经安装好 kafka

我是在 windows 中安装的 Kafka,用于在本地测试用的

Windows 安装 kafka

二、新建 SpringBoot 项目

1、添加项目依赖

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

2、添加配置文件 application.properties

配置中用了批量消费

  1. # 指定kafka server的地址,集群配多个,中间,逗号隔开
  2. spring.kafka.bootstrap-servers=127.0.0.1:9092
  3. #重试次数
  4. spring.kafka.producer.retries=3
  5. #批量发送的消息数量
  6. spring.kafka.producer.batch-size=1000
  7. #32MB的批处理缓冲区
  8. spring.kafka.producer.buffer-memory=33554432
  9. #默认消费者组
  10. spring.kafka.consumer.group-id=crm-microservice-newperformance
  11. #最早未被消费的offset
  12. spring.kafka.consumer.auto-offset-reset=earliest
  13. #批量一次最大拉取数据量
  14. spring.kafka.consumer.max-poll-records=4000
  15. #是否自动提交
  16. spring.kafka.consumer.enable-auto-commit=false
  17. #自动提交时间间隔,单位ms
  18. spring.kafka.consumer.auto-commit-interval=1000
  19. #批消费并发量,小于或等于Topic的分区数
  20. spring.kafka.consumer.batch.concurrency = 3

3、创建一个 KafkaConfiguration 配置类

  1. package com.example.kafkademo.config;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.common.serialization.StringDeserializer;
  5. import org.apache.kafka.common.serialization.StringSerializer;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  10. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  11. import org.springframework.kafka.core.*;
  12. import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
  13. import org.springframework.kafka.listener.ContainerProperties;
  14. import java.util.HashMap;
  15. import java.util.Map;
  16. /**
  17. * @author Frederic.Hu
  18. * @date 2022/05/25 18:00
  19. */
  20. @Configuration
  21. public class KafkaConfiguration {
  22. @Value("${spring.kafka.bootstrap-servers}")
  23. private String bootstrapServers;
  24. @Value("${spring.kafka.producer.retries}")
  25. private Integer retries;
  26. @Value("${spring.kafka.producer.batch-size}")
  27. private Integer batchSize;
  28. @Value("${spring.kafka.producer.buffer-memory}")
  29. private Integer bufferMemory;
  30. @Value("${spring.kafka.consumer.group-id}")
  31. private String groupId;
  32. @Value("${spring.kafka.consumer.auto-offset-reset}")
  33. private String autoOffsetReset;
  34. @Value("${spring.kafka.consumer.max-poll-records}")
  35. private Integer maxPollRecords;
  36. @Value("${spring.kafka.consumer.batch.concurrency}")
  37. private Integer batchConcurrency;
  38. @Value("${spring.kafka.consumer.enable-auto-commit}")
  39. private Boolean autoCommit;
  40. @Value("${spring.kafka.consumer.auto-commit-interval}")
  41. private Integer autoCommitInterval;
  42. /**
  43. * 生产者配置信息
  44. */
  45. @Bean
  46. public Map<String, Object> producerConfigs() {
  47. Map<String, Object> props = new HashMap<>();
  48. props.put(ProducerConfig.ACKS_CONFIG, "0");
  49. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  50. props.put(ProducerConfig.RETRIES_CONFIG, retries);
  51. props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  52. props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  53. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  54. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  55. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  56. return props;
  57. }
  58. /**
  59. * 生产者工厂
  60. */
  61. @Bean
  62. public ProducerFactory<String, String> producerFactory() {
  63. return new DefaultKafkaProducerFactory<>(producerConfigs());
  64. }
  65. /**
  66. * 生产者模板
  67. */
  68. @Bean
  69. public KafkaTemplate<String, String> kafkaTemplate() {
  70. return new KafkaTemplate<>(producerFactory());
  71. }
  72. /**
  73. * 消费者配置信息
  74. */
  75. @Bean
  76. public Map<String, Object> consumerConfigs() {
  77. Map<String, Object> props = new HashMap<>();
  78. props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  79. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
  80. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  81. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
  82. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
  83. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
  84. props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
  85. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  86. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  87. return props;
  88. }
  89. /**
  90. * 消费者批量工厂
  91. */
  92. @Bean
  93. public KafkaListenerContainerFactory<?> batchFactory() {
  94. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  95. factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
  96. //设置并发量,小于或等于Topic的分区数
  97. factory.setConcurrency(batchConcurrency);
  98. factory.getContainerProperties().setPollTimeout(1500);
  99. //配置监听手动提交 ack,消费一条数据完后,立即提交
  100. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  101. //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
  102. factory.setBatchListener(true);
  103. return factory;
  104. }
  105. /**
  106. * 异常处理器
  107. */
  108. @Bean
  109. public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler(){
  110. return (message,exception,consumer)->{
  111. System.out.println("消费异常:"+message.getPayload());
  112. return null;
  113. };
  114. }
  115. }

4、写一个向 Kafka 推送消费的测试类(生产者 producer)

  1. package com.example.kafkademo;
  2. import com.alibaba.fastjson.JSONObject;
  3. import org.junit.Test;
  4. import org.junit.runner.RunWith;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.boot.test.context.SpringBootTest;
  9. import org.springframework.kafka.core.KafkaTemplate;
  10. import org.springframework.test.context.junit4.SpringRunner;
  11. import java.util.*;
  12. /**
  13. * @author Frederic.Hu
  14. * @Description
  15. * @date 2022/05/25 17:46
  16. */
  17. @RunWith(SpringRunner.class)
  18. @SpringBootTest
  19. public class KafkaProducerTest {
  20. private final Logger logger = LoggerFactory.getLogger(getClass());
  21. @Autowired
  22. private KafkaTemplate<String, String> kafkaTemplate;
  23. @Test
  24. public void testSend(){
  25. Map<String, Object> map = new LinkedHashMap<>();
  26. map.put("username", "小明");
  27. map.put("userid", 1);
  28. map.put("age", 12);
  29. kafkaTemplate.send("test4", JSONObject.toJSONString(map)).addCallback(success -> {
  30. // 消息在分区内的offset
  31. long offset = success.getRecordMetadata().offset();
  32. logger.info("产线发送消息到kafka队列成功:{}, offset为:{}", JSONObject.toJSONString(map), offset);
  33. }, failure -> {
  34. logger.error("产线发送消息到kafka队列失败:{}, 报错信息为:{}", JSONObject.toJSONString(map), failure.getMessage());
  35. });
  36. }
  37. }

5、创建一个消费者(消费者 consumer)

  1. package com.example.kafkademo.listener;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.kafka.annotation.KafkaListener;
  6. import org.springframework.kafka.support.Acknowledgment;
  7. import org.springframework.stereotype.Component;
  8. import java.util.List;
  9. /**
  10. * @author Frederic.Hu
  11. * @Description
  12. * @date 2022/05/25 17:43
  13. */
  14. @Component
  15. public class BigDataTopicListener {
  16. private final Logger logger = LoggerFactory.getLogger(getClass());
  17. /**
  18. * 监听kafka数据(批量消费)
  19. * @param consumerRecords
  20. * @param ack
  21. */
  22. @KafkaListener(id = "operation", topics = {"test4"}, containerFactory = "batchFactory", errorHandler="consumerAwareErrorHandler")
  23. public void batchConsumer(List<ConsumerRecord<?, ?>> consumerRecords, Acknowledgment ack) {
  24. long start = System.currentTimeMillis();
  25. //...
  26. //db.batchSave(consumerRecords);//批量插入或者批量更新数据
  27. for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
  28. logger.info("消费的每条数据为:{}", consumerRecord.value());
  29. }
  30. //手动提交
  31. ack.acknowledge();
  32. logger.info("收到bigData推送的数据,拉取数据量:{},消费时间:{}ms", consumerRecords.size(), (System.currentTimeMillis() - start));
  33. }
  34. }

6、启动测试类,查看控制台

三、过程中的遇到的一些坑及总结

1、Kafka 中 topic 不存在的话,启动项目会报错

解决办法:启动项目之前,先在 Kafka 中创建好自己定义的 topic 名称,也可以在配置类中写一个自动创建 topic,但是出现一个问题,项目上线每个 Kafka 的集群数都不一样,自动创建 topic 时,分区数和副本数不好设置,设置不合理,启动项目是会报错的。

2、生产者生产消息是否成功怎么看?

解决办法:kafkaTemplate 提供了一个回调方法 addCallback,我们可以在回调方法中监控消息是否发送成功或失败时做补偿处理。

3、消费者消费消息报错了怎么办?

解决办法:新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用 @Bean 注入,BeanName 默认就是方法名,然后我们将这个异常处理器的 BeanName 放到 @KafkaListener 注解的 errorHandler 属性里面,当监听抛出异常的时候,则会自动调用异常处理器。

4、消费不同的 topic 中的数据,消费者组(group id)如果用的是同一个,消费时会报错的

解决办法:@KafkaListener 中的 id 监听器使用不同的名称,如果配置文属性配置了默认消费组(group id),注解中的 监听器 id 会覆盖默认的消费组(group id)。

5、重复消费了数据,怎么办?

原因:消费者宕机、重启或者被强行 kill 进程,导致消费者消费的 offset 没有提交。或者消费后的数据,当 offset 还没有提交时,Partition 就断开连接。

解决办法:我目前项目中,是消费的数据插入到 MySQL 中的,如果重复消费了,插入到数据库中的时候,会查询该主键已经在数据库存在,则更新该条数据。

四、参考文档

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/879722
推荐阅读
相关标签
  

闽ICP备14008679号