Spring Boot Kafka Integration and Configuration

Native Client Mode

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

Custom Partitioner

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

/**
 * Custom partitioner
 *
 * @Author: chen yang
 * @Date: 2023/5/7 11:34
 */
public class CustomerPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        // Demo logic only: returns partitionCount % 2, i.e. a fixed partition
        // (0 or 1) determined by the topic's partition count, ignoring the record itself.
        int partition = 2;
        return partitionInfos.size() % partition;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
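
The demo above always resolves to the same partition for a given topic. A more typical partitioner spreads records across partitions by hashing the key. The following is a minimal sketch of that pattern (the class name KeyHashPartitioner is hypothetical, not from the original article); it uses the same Partitioner interface and the murmur2 hash that the default partitioner applies to keyed records:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;

// Hypothetical example: route records by key hash across all partitions.
public class KeyHashPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0; // no key: fall back to a fixed partition
        }
        // murmur2 + toPositive is the same hashing scheme the default partitioner uses for keyed records
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}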

Producer

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @Author: chen yang
 * @Date: 2023/5/6 21:59
 */
public class Producer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        // Cluster connection
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Serializer types
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomerPartitioner.class.getName());
        // batch.size: batch size, default 16 KB
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // linger.ms: how long to wait for a batch to fill, default 0
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // buffer.memory: RecordAccumulator buffer size, default 32 MB
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // compression.type: default none; valid values are gzip, snappy, lz4 and zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // acks
        // 0: no broker acknowledgement; 1: the leader must persist the record; -1 (all): the leader and all followers in the ISR must persist it
        // type: String, valid values [all, -1, 0, 1], default: all
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retry count, default Integer.MAX_VALUE
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Transactional id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "default_transactional_id_23");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // Start a transaction
        producer.initTransactions();
        producer.beginTransaction();
        try {
            // Asynchronous send
            producer.send(new ProducerRecord<>("test", "this is async message"));
            producer.send(new ProducerRecord<>("test", "this is a async rollback message!"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // Failed sends are retried automatically; no manual retry is needed in the callback
                    if (Objects.isNull(e)) {
                        System.out.println("result: " + recordMetadata.topic() + ", partitions: " + recordMetadata.partition());
                    }
                }
            });
            // Synchronous send: same as the async send, just call get() on the returned Future
            producer.send(new ProducerRecord<>("test", 1, "", "this is sync message")).get();
            producer.commitTransaction();
        } catch (Exception e) {
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}

Consumer

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @Author: chen yang
 * @Date: 2023/7/9 10:37
 */
public class Consumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_01");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // Subscribe to topics:
            // ArrayList<String> topics = new ArrayList<>();
            // consumer.subscribe(topics);

            // Or assign specific partitions of a topic
            ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
            topicPartitions.add(new TopicPartition("night_topic", 1));
            consumer.assign(topicPartitions);
            // Start consuming from a given offset
            // consumer.seek(new TopicPartition("night_topic", 1), 3);

            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(2));
            consumerRecords.forEach(System.out::println);
            // Commit offsets manually
            consumer.commitAsync();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
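
The example above polls only once and exits. A real consumer usually polls in a loop and commits after each processed batch. The following is a minimal sketch of such a loop, reusing the `properties` built above (subscribe instead of assign; the `while (true)` shutdown condition is a placeholder, and java.util.Collections must be imported):

// Minimal continuous poll loop (sketch), a drop-in replacement for the try block above
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
    consumer.subscribe(Collections.singletonList("night_topic"));
    while (true) { // replace with a real shutdown flag
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        records.forEach(record ->
                System.out.println(record.partition() + "-" + record.offset() + ": " + record.value()));
        if (!records.isEmpty()) {
            // commit only after the whole batch has been processed
            consumer.commitAsync();
        }
    }
}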

KafkaTemplate

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configuration File

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      batch-size: 16384
      acks: -1
      retries: 10
      transaction-id-prefix: transaction_05
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger:
          ms: 2000
        partitioner:
          class: com.night.config.CustomerPartitionHandler
    consumer:
      group-id: g_01
      enable-auto-commit: false
      auto-offset-reset: latest
      max-poll-records: 500
      # auto-commit-interval: 2000  # only takes effect when enable-auto-commit is true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session:
          timeout:
            ms: 120000 # session timeout: if the consumer sends no heartbeat within this window, a rebalance is triggered
        request:
          timeout:
            ms: 18000
    listener:
      missing-topics-fatal: false # when true, application startup fails if a listened-to topic does not exist
      type: batch

Custom Partitioner

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @Author: chen yang
 * @Date: 2023/7/8 11:02
 */
@Component
public class CustomerPartitionHandler implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Route by message content: values containing "二" go to partition 2,
        // values containing "一" go to partition 1, everything else to partition 0
        if (value.toString().contains("二")) {
            return 2;
        } else if (value.toString().contains("一")) {
            return 1;
        } else {
            return 0;
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

Producer

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class HelloController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send")
    @Transactional // transaction-id-prefix is set in the config file, so sends must run in a transaction: either this annotation or Kafka's own transaction handling
    public Boolean send(String msg) {
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send("night.topic", null, "night key - " + i, msg + " - " + i).addCallback(success -> {
                // success callback
                if (success == null || success.getRecordMetadata() == null) {
                    return;
                }
                String topic = success.getRecordMetadata().topic();
                int partition = success.getRecordMetadata().partition();
                long offset = success.getRecordMetadata().offset();
                String key = success.getProducerRecord().key();
                System.out.println("send topic:" + topic + ", partition: " + partition + ", key:" + key + ", offset: " + offset);
            }, failure -> {
                // failure callback
                System.out.println("failed to send message: " + failure.getMessage());
            });
        }
        return true;
    }
}
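
As an alternative to @Transactional, the sends can be wrapped in a Kafka-local transaction via KafkaTemplate.executeInTransaction. A minimal sketch, assuming the same kafkaTemplate bean (the method name sendInTransaction is hypothetical):

// Alternative: run the sends inside a Kafka-local transaction instead of @Transactional
public Boolean sendInTransaction(String msg) {
    return kafkaTemplate.executeInTransaction(template -> {
        for (int i = 0; i < 10; i++) {
            // all sends in this callback commit or abort together
            template.send("night.topic", null, "night key - " + i, msg + " - " + i);
        }
        return true;
    });
}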

Consumer Configuration

Record filtering
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.stereotype.Component;

/**
 * Filters consumed records
 *
 * @Author: chen yang
 * @Date: 2023/7/8 12:20
 */
@Component
public class ConsumerFilterStrategy implements RecordFilterStrategy<String, String> {

    @Override
    public boolean filter(ConsumerRecord<String, String> consumerRecord) {
        // return true: discard the record ("无效数据" means "invalid data")
        return consumerRecord.value().contains("无效数据");
    }
}

Consumer error handler
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * Handles exceptions thrown by listeners
 *
 * @Author: chen yang
 * @Date: 2023/7/8 11:55
 */
@Component
public class ConsumerExceptionHandler implements ConsumerAwareListenerErrorHandler {

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
        System.out.println("consume error: " + message.getPayload() + ", ex: " + e.getMessage());
        return null;
    }
}

Consumer configuration class
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.HashMap;

/**
 * @Author: chen yang
 * @Date: 2023/7/8 12:22
 */
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {

    private final KafkaTemplate<String, String> kafkaTemplate;

    private final ConsumerFilterStrategy consumerFilterStrategy;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // Required for @SendTo; otherwise the console shows: "a KafkaTemplate is required to support replies"
        factory.setReplyTemplate(kafkaTemplate);
        factory.setConsumerFactory(initConsumerFactory());
        // Concurrency must be <= the topic's partition count; the per-poll fetch size is set on the consumer factory
        factory.setConcurrency(1);
        // Batch listening
        factory.setBatchListener(true);
        // Used together with RecordFilterStrategy: filtered records are discarded
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(consumerFilterStrategy);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> initConsumerFactory() {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_id_02");
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        return new DefaultKafkaConsumerFactory<>(configs);
    }
}

@KafkaListener

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class HelloListener {

    @KafkaListener(topicPartitions = {
            @TopicPartition(topic = "night.topic", partitions = {"0"})
    }, errorHandler = "consumerExceptionHandler", containerFactory = "filterContainerFactory")
    public void consumer0(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer) {
        System.out.println("first consumer receive list size: " + records.size());
        records.forEach(System.out::println);
        consumer.commitSync();
    }

    @KafkaListener(topicPartitions = {
            @TopicPartition(topic = "night.topic", partitions = {"1"})
    }, errorHandler = "consumerExceptionHandler")
    @SendTo("singleTopic")
    public String consumer1(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer) {
        System.out.println("second consumer receive list size: " + records.size());
        records.forEach(System.out::println);
        consumer.commitSync();
        return "@SendTo annotation msg";
    }

    @KafkaListener(topicPartitions = {
            @TopicPartition(topic = "night.topic", partitions = {"2"})
    }, errorHandler = "consumerExceptionHandler")
    public void consumer2(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer) {
        System.out.println("third consumer receive list size: " + records.size());
        records.forEach(System.out::println);
        consumer.commitSync();
    }
}
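
The @SendTo on consumer1 forwards its return value to the singleTopic topic. A listener for that reply might look like the sketch below (the method name and group id are placeholders; the List parameter assumes the batch listener type configured in application.yml):

// Hypothetical listener that receives the @SendTo reply from consumer1
@KafkaListener(topics = "singleTopic", groupId = "reply_consumer_group")
public void replyConsumer(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer) {
    records.forEach(record -> System.out.println("reply received: " + record.value()));
    consumer.commitSync();
}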
