当前位置:   article > 正文

kafka配置消费者重试机制,设置最大重试次数,重试间隔毫秒_kafka重试机制配置

kafka重试机制配置

场景表述:

当我们有一个功能,需要脱离数据库,也就是说当我们数据库宕机后数据不受影响,数据库恢复后数据需要正常写入。而客户端是无感的。好了,现在场景明确咱就动手实现。

1.pom引入kafka依赖

注意:版本不同重试机制也需要调整

  1. <!-- Kafka依赖 -->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. <version>2.9.13</version>
  6. </dependency>

2.yml文件配置kafka

  1. kafka:
  2. bootstrap-servers: kafka地址:端口
  3. producer:
  4. bootstrap-servers: kafka地址:端口
  5. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  6. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  7. retries: 3
  8. acks: 0
  9. consumer:
  10. group-id: kafka-pm-groupt
  11. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  12. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.现在kafka基础配置已经完成了,现在开始配置我们的消费者重试机制啦

首先咱们需要在yml里面定义我们的重试机制与重试间隔毫秒(如下图)

  1. mq:
  2. kafkaConfig:
  3. # 重试次数
  4. max-attempts: 3
  5. # 重试间隔时间
  6. interval: 120000

然后再定义kafka实例(如下图)

  1. package com.rainnytech.dataapi.config;
  2. import cn.hutool.core.util.IdUtil;
  3. import com.rainnytech.dataapi.common.util.FileUtils;
  4. import com.rainnytech.dataapi.consumer.kafka.KafkaConsumer;
  5. import com.rainnytech.dataapi.service.KafkaMessageService;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.beans.factory.annotation.Value;
  8. import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  12. import org.springframework.kafka.core.ConsumerFactory;
  13. import org.springframework.kafka.core.KafkaTemplate;
  14. import org.springframework.kafka.listener.CommonErrorHandler;
  15. import org.springframework.kafka.listener.DefaultErrorHandler;
  16. import org.springframework.util.backoff.BackOff;
  17. import org.springframework.util.backoff.FixedBackOff;
  18. import javax.annotation.Resource;
  19. /**
  20. * @author Larkin.Long
  21. * @description kafka消息队列配置
  22. * @date 2024/5/17 11:31
  23. **/
  24. @Slf4j
  25. @Configuration
  26. public class KafkaConfig {
  27. @Value("${mq.config.kafkaConfig.topic}")
  28. private String topic;
  29. @Value("${scheduled.sharding.agentStatePath}")
  30. private String agentStatePath;
  31. /**
  32. * 间隔毫秒
  33. */
  34. @Value("${mq.config.kafkaConfig.interval}")
  35. private Long interval;
  36. /**
  37. * 最大重试次数
  38. */
  39. @Value("${mq.config.kafkaConfig.max-attempts}")
  40. private Long maxAttempts;
  41. @Resource
  42. private KafkaTemplate<Object, Object> template;
  43. @Bean
  44. public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
  45. ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
  46. ConsumerFactory<Object, Object> kafkaConsumerFactory,
  47. KafkaTemplate<Object, Object> template) {
  48. ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
  49. configurer.configure(factory, kafkaConsumerFactory);
  50. //最大重试次数
  51. factory.setCommonErrorHandler(commonErrorHandler());
  52. return factory;
  53. }
  54. public CommonErrorHandler commonErrorHandler() {
  55. // 创建 FixedBackOff 对象
  56. BackOff backOff = new FixedBackOff(interval, maxAttempts);
  57. // 当所有重试尝试都用尽时执行的逻辑
  58. return new DefaultErrorHandler((consumerRecord, exception) -> {
  59. // 重试失败后将消息生成文件保存到本地
  60. if (consumerRecord.topic().equals(topic)){
  61. String filePath = agentStatePath + topic + IdUtil.getSnowflakeNextId() + ".txt";
  62. FileUtils.uploadFile(filePath, consumerRecord.value().toString());
  63. }
  64. }, backOff);
  65. }
  66. @Bean
  67. public KafkaConsumer kafkaConsumer(KafkaMessageService kafkaMessageService) {
  68. log.info("【消费者】初始化kafka");
  69. return new KafkaConsumer(kafkaMessageService);
  70. }
  71. }

到这一步其实我们的重试机制已经配置完成了,我们的消费者消费失败后会触发我们配置的消费重试机制,当我们消费次数都用尽后则会将我们的消息生成文件保存到服务器上。

这时候我们只需要使用定时器定时获取本地文件然后重新发送kafka消费,直到数据库恢复数据写入成功,实现闭环。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/965606
推荐阅读
相关标签
  

闽ICP备14008679号