Scenario:
We need a feature that decouples writes from the database: if the database goes down, no data is lost, and once the database recovers, the data is written as normal, with the whole process transparent to the client. With the scenario clear, let's implement it.
1. Add the Kafka dependency to the pom
Note: the retry mechanism differs between versions, so adjust it to match the version you use.
```xml
<!-- Kafka dependency -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.9.13</version>
</dependency>
```
2. Configure Kafka in the yml file
```yaml
kafka:
  bootstrap-servers: <kafka-address>:<port>
  producer:
    bootstrap-servers: <kafka-address>:<port>
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    retries: 3
    acks: 0
  consumer:
    group-id: kafka-pm-groupt
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```
Note that `acks: 0` means the producer does not wait for any broker acknowledgment, so sends can be lost silently; for a durability-focused scenario like this one, `acks: 1` or `acks: all` is the safer choice.
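With this configuration in place, the client writes to Kafka instead of the database directly. A minimal sketch of the producer side, assuming a `KafkaTemplate<String, String>` bean built from the yml above (the service and method names here are illustrative, not from the original article); in spring-kafka 2.9.x, `send` returns a `ListenableFuture`, so failures can be handled in a callback without blocking the caller:

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class KafkaMessageProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Publish a payload; the callback reports broker-side failures asynchronously. */
    public void send(String topic, String payload) {
        kafkaTemplate.send(topic, payload).addCallback(
                result -> { /* sent OK; nothing to do */ },
                ex -> { /* log and decide whether to retry or spool locally */ });
    }
}
```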
3. With the basic Kafka configuration done, let's configure the consumer retry mechanism
First, define the retry count and retry interval (in milliseconds) in the yml file:
```yaml
mq:
  kafkaConfig:
    # maximum number of retries
    max-attempts: 3
    # retry interval in milliseconds
    interval: 120000
```
Then define the Kafka configuration class:
```java
package com.rainnytech.dataapi.config;

import cn.hutool.core.util.IdUtil;
import com.rainnytech.dataapi.common.util.FileUtils;
import com.rainnytech.dataapi.consumer.kafka.KafkaConsumer;
import com.rainnytech.dataapi.service.KafkaMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

import javax.annotation.Resource;

/**
 * @author Larkin.Long
 * @description Kafka message queue configuration
 * @date 2024/5/17 11:31
 **/
@Slf4j
@Configuration
public class KafkaConfig {

    @Value("${mq.kafkaConfig.topic}")
    private String topic;

    @Value("${scheduled.sharding.agentStatePath}")
    private String agentStatePath;

    /**
     * Retry interval in milliseconds
     */
    @Value("${mq.kafkaConfig.interval}")
    private Long interval;

    /**
     * Maximum number of retries
     */
    @Value("${mq.kafkaConfig.max-attempts}")
    private Long maxAttempts;

    @Resource
    private KafkaTemplate<Object, Object> template;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        // Attach the retry/error handler to every listener container
        factory.setCommonErrorHandler(commonErrorHandler());
        return factory;
    }

    public CommonErrorHandler commonErrorHandler() {
        // Fixed back-off: retry up to maxAttempts times, waiting `interval` ms between attempts
        BackOff backOff = new FixedBackOff(interval, maxAttempts);
        // The recoverer passed here runs once all retry attempts are exhausted
        return new DefaultErrorHandler((consumerRecord, exception) -> {
            // After the final failure, spool the message to a local file
            if (consumerRecord.topic().equals(topic)) {
                String filePath = agentStatePath + topic + IdUtil.getSnowflakeNextId() + ".txt";
                FileUtils.uploadFile(filePath, consumerRecord.value().toString());
            }
        }, backOff);
    }

    @Bean
    public KafkaConsumer kafkaConsumer(KafkaMessageService kafkaMessageService) {
        log.info("[consumer] initializing Kafka");
        return new KafkaConsumer(kafkaMessageService);
    }
}
```
(Note that the `@Value` keys must match the yml defined above, i.e. `mq.kafkaConfig.*`; a `topic` entry is also expected under `mq.kafkaConfig` even though it is not shown in the yml snippet.)
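The `KafkaConsumer` bean registered above is not shown in the original article. A minimal sketch of what it might look like, assuming a `KafkaMessageService` with a hypothetical `save` method that writes the message to the database; the key point is that letting the exception propagate out of the listener is what triggers the `DefaultErrorHandler` retries:

```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;

@Slf4j
public class KafkaConsumer {

    private final KafkaMessageService kafkaMessageService;

    public KafkaConsumer(KafkaMessageService kafkaMessageService) {
        this.kafkaMessageService = kafkaMessageService;
    }

    @KafkaListener(topics = "${mq.kafkaConfig.topic}")
    public void onMessage(String message) {
        // Do NOT swallow exceptions here: if the database is down and save()
        // throws, the container's error handler applies the FixedBackOff
        // retries and, after they are exhausted, spools the message to a file.
        kafkaMessageService.save(message);
    }
}
```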
At this point the retry mechanism is fully configured: when the consumer fails to process a message, the configured retries kick in, and once all attempts are exhausted the message is written to a file on the server.
From there, we only need a scheduled task that periodically picks up the local files and republishes them to Kafka for re-consumption, until the database recovers and the writes succeed, closing the loop.
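The scheduled re-send step could be sketched as follows. This is a minimal illustration, not the article's actual implementation: the class name, the five-minute `fixedDelay`, and the delete-on-success callback are all assumptions, and `Files.readString` requires Java 11+:

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

@Component
public class FailedMessageRetryTask {

    @Value("${mq.kafkaConfig.topic}")
    private String topic;

    @Value("${scheduled.sharding.agentStatePath}")
    private String agentStatePath;

    @Resource
    private KafkaTemplate<Object, Object> template;

    /** Every 5 minutes, republish spooled messages; delete each file only after a successful send. */
    @Scheduled(fixedDelay = 300_000)
    public void resendFailedMessages() throws IOException {
        try (DirectoryStream<Path> files = Files.newDirectoryStream(Paths.get(agentStatePath), "*.txt")) {
            for (Path file : files) {
                String payload = Files.readString(file);
                template.send(topic, payload).addCallback(
                        ok -> deleteQuietly(file),  // message is back in Kafka; the consumer retries the DB write
                        ex -> { /* broker unreachable: keep the file for the next run */ });
            }
        }
    }

    private void deleteQuietly(Path file) {
        try {
            Files.delete(file);
        } catch (IOException ignored) {
            // best effort; a leftover file is re-sent (at-least-once) on the next run
        }
    }
}
```

Because a file is only deleted after a successful send, this gives at-least-once delivery: the consumer side should tolerate occasional duplicate writes (for example via an idempotent insert).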