赞
踩
我们在使用Kafka时,必然要考虑消息消费失败后的重试次数,以及重试后仍然失败时该如何处理:要么阻塞,要么丢弃,要么保存起来
spring-kafka 3.0 版本默认的失败重试次数为10次,准确地讲是1次正常调用+9次重试,这一点可以在 org.springframework.kafka.listener.SeekUtils 这个类中看到
据我的实验,spring-kafka 3.0 版本通过application.yml 配置是行不通的,我也没有找到任何一项可以修改重试次数的配置(网上很多文章说通过 spring.kafka.consumer.retries 可以配置,我尝试过了,至少3.0版本是不行的;如果有人成功通过application.yml 配置过消费者的重试次数,欢迎留言告诉我,谢谢)
经过我不懈努力和尝试,只能通过Java代码配置的方式才可以,并且这种方式相对于application.yml配置更加灵活细致,上代码
- /**
-  * Builds the error handler used by the listener container when a record fails.
-  *
-  * @return a {@link DefaultErrorHandler} backed by a fixed back-off of
-  *         5000 ms between attempts and at most 3 retries
-  */
- public CommonErrorHandler commonErrorHandler() {
-     // FixedBackOff(interval in ms, max retry attempts)
-     return new DefaultErrorHandler(new FixedBackOff(5000L, 3L));
- }
然后把这个handler 通过 setCommonErrorHandler 方法添加到ConcurrentKafkaListenerContainerFactory中就行了;其中 FixedBackOff(5000L, 3L) 表示每次重试间隔5秒,最多重试3次
我们只需要在创建DefaultErrorHandler时加入一个ConsumerAwareRecordRecoverer参数就可以了,这样在重试3次后仍然失败时,消息就会被保存到数据库中。注意这里save to db成功之后,我认为没有必要再执行consumer.commitSync方法:首先,consumer.commitSync这个方法默认提交的是当前批次最大的offset(可能会导致丢失消息);其次,即使不提交,Kafka的消费者仍然会去消费后面的消息,只要后面的消息消费成功了,依然会提交offset,从而覆盖这条消息的offset
- /**
-  * Builds an error handler that retries a failed record and, once the retries
-  * are exhausted, hands the record to a recoverer that persists it.
-  *
-  * @return a {@link DefaultErrorHandler} using a 5000 ms fixed back-off with at
-  *         most 3 retries and a {@link ConsumerAwareRecordRecoverer}
-  */
- public CommonErrorHandler commonErrorHandler() {
-     // FixedBackOff(interval in ms, max retry attempts)
-     BackOff backOff = new FixedBackOff(5000L, 3L);
-     ConsumerAwareRecordRecoverer recoverer = (record, consumer, exception) -> {
-         // Parameterized logging instead of record.value().toString(): a record with a
-         // null value must not throw here, otherwise recovery itself fails and the
-         // record is retried again instead of being skipped.
-         log.info("save to db {}", record.value());
-     };
-     return new DefaultErrorHandler(recoverer, backOff);
- }
如果你硬要提交,也可以试试下面这种方式,手动指定要提交的offset(注意:按照Kafka的约定,提交的offset应该是下一条待消费消息的位置,即 record.offset() + 1)
- /**
-  * Builds an error handler that retries a failed record and, once the retries
-  * are exhausted, persists the record and explicitly commits its offset.
-  *
-  * @return a {@link DefaultErrorHandler} using a 5000 ms fixed back-off with at
-  *         most 3 retries and a committing {@link ConsumerAwareRecordRecoverer}
-  */
- public CommonErrorHandler commonErrorHandler() {
-     // FixedBackOff(interval in ms, max retry attempts)
-     BackOff backOff = new FixedBackOff(5000L, 3L);
-     DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
-         // Parameterized logging so that a null record value cannot raise an NPE here.
-         log.info("save to db {}", record.value());
-         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-         // A committed offset denotes the NEXT record to consume, so commit offset + 1;
-         // committing record.offset() would redeliver this record after a restart or rebalance.
-         offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
-         consumer.commitSync(offsets);
-     }, backOff);
-     return defaultErrorHandler;
- }
同样是在创建DefaultErrorHandler时传入一个DeadLetterPublishingRecoverer就行了,默认会通过kafkaTemplate把消息发送到名为 原topic名 + .DLT 的topic(例如 your_topic.DLT)
- // Template handed to the dead-letter recoverer below.
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
-
- /**
-  * Builds an error handler that retries a failed record and, once the retries
-  * are exhausted, passes it to a {@link DeadLetterPublishingRecoverer}.
-  *
-  * @return a {@link DefaultErrorHandler} using a 5000 ms fixed back-off with at
-  *         most 3 retries
-  */
- public CommonErrorHandler commonErrorHandler() {
-     DeadLetterPublishingRecoverer deadLetterRecoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
-     // FixedBackOff(interval in ms, max retry attempts)
-     return new DefaultErrorHandler(deadLetterRecoverer, new FixedBackOff(5000L, 3L));
- }
ConsumerRecordRecoverer 接口总共就这2种实现方式
- kafka-consumer:
- bootstrapServers: 192.168.31.114:9092
- groupId: goods-center
- # The background heartbeat thread must deliver a heartbeat within 30 seconds, otherwise a rebalance is triggered
- sessionTimeOut: 30000
- autoOffsetReset: latest
- # Turn off the Kafka client's own auto-commit; offset commits are then left to the Spring listener container
- enableAutoCommit: false
- # Auto-commit interval (ms)
- autoCommitInterval: 1000
- # Minimum number of bytes to fetch
- fetchMinSize: 1
- # Maximum time (ms) to wait for the minimum number of bytes
- fetchMaxWait: 500
- maxPollRecords: 50
- # Maximum interval between two polls: 300 seconds; NOTE(review): the original comment called this a commit interval and says exceeding it causes an error
- maxPollInterval: 300000
- # Heartbeat interval (ms)
- heartbeatInterval: 10000
- keyDeserializer: org.apache.kafka.common.serialization.LongDeserializer
- valueDeserializer: org.springframework.kafka.support.serializer.JsonDeserializer
- package com.ychen.goodscenter.fafka;
-
- import lombok.Getter;
- import lombok.Setter;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.context.annotation.Configuration;
-
- /**
-  * Holder for the consumer settings bound from the {@code kafka-consumer}
-  * configuration prefix. Accessors are generated by Lombok.
-  */
- @Configuration
- // Prefix of the configuration entries bound to this class
- @ConfigurationProperties(prefix = "kafka-consumer")
- @Getter
- @Setter
- public class KafkaListenerProperties {
-
-     // --- connection and group membership ---
-
-     /** Broker address list. */
-     private String bootstrapServers;
-
-     /** Consumer group id. */
-     private String groupId;
-
-     /** Session timeout. */
-     private String sessionTimeOut;
-
-     /** Heartbeat interval. */
-     private String heartbeatInterval;
-
-     // --- offset handling ---
-
-     /** Offset reset policy. */
-     private String autoOffsetReset;
-
-     /** Whether the client auto-commits offsets. */
-     private boolean enableAutoCommit;
-
-     /** Auto-commit interval. */
-     private String autoCommitInterval;
-
-     // --- fetching and polling ---
-
-     /** Minimum fetch size. */
-     private String fetchMinSize;
-
-     /** Maximum wait for the minimum fetch size. */
-     private String fetchMaxWait;
-
-     /** Maximum number of records per poll. */
-     private String maxPollRecords;
-
-     /** Maximum interval between polls. */
-     private String maxPollInterval;
-
-     // --- deserialization ---
-
-     /** Key deserializer class name. */
-     private String keyDeserializer;
-
-     /** Value deserializer class name. */
-     private String valueDeserializer;
-
- }
- package com.ychen.goodscenter.fafka;
-
- import com.alibaba.fastjson2.JSONObject;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.context.properties.EnableConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
- import org.springframework.kafka.config.KafkaListenerContainerFactory;
- import org.springframework.kafka.core.ConsumerFactory;
- import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.listener.*;
- import org.springframework.util.backoff.BackOff;
- import org.springframework.util.backoff.FixedBackOff;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
-  * Java-based consumer configuration: builds the listener container factory,
-  * the consumer factory, the retry/dead-letter error handler and the raw
-  * consumer property map from {@link KafkaListenerProperties}.
-  */
- @Configuration
- @EnableConfigurationProperties(KafkaListenerProperties.class)
- @Slf4j
- public class KafkaConsumerConfig {
-     @Autowired
-     private KafkaListenerProperties kafkaListenerProperties;
-
-     // NOTE(review): this template publishes failed records to the dead-letter topic, so its
-     // serializers must accept the key/value types of those records - confirm the producer config.
-     @Autowired
-     private KafkaTemplate<String, String> kafkaTemplate;
-
-     /**
-      * Creates the container factory referenced by {@code @KafkaListener} methods.
-      *
-      * @return a factory with concurrency 2, the common error handler installed and
-      *         {@code MANUAL_IMMEDIATE} ack mode
-      */
-     @Bean
-     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
-         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
-         factory.setConsumerFactory(consumerFactory());
-         // Concurrency; per the original author it is shared evenly across service instances
-         factory.setConcurrency(2);
-         // Batch listening is left disabled: factory.setBatchListener(true);
-         factory.setCommonErrorHandler(commonErrorHandler());
-
-         ContainerProperties containerProperties = factory.getContainerProperties();
-         // Manual acknowledgment: the listener must call Acknowledgment.acknowledge()
-         containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
-
-         return factory;
-     }
-
-     /**
-      * Creates the consumer factory from {@link #consumerConfigs()} and logs the
-      * effective settings.
-      */
-     private ConsumerFactory<String, String> consumerFactory() {
-         Map<String, Object> consumerConfigs = consumerConfigs();
-         log.info("消费者的配置信息:{}", JSONObject.toJSONString(consumerConfigs));
-         return new DefaultKafkaConsumerFactory<>(consumerConfigs);
-     }
-
-     /**
-      * Builds the error handler: a failed record is retried with a 5000 ms fixed
-      * back-off at most 3 times and then passed to a
-      * {@link DeadLetterPublishingRecoverer}.
-      *
-      * @return the configured {@link DefaultErrorHandler}
-      */
-     public CommonErrorHandler commonErrorHandler() {
-         // FixedBackOff(interval in ms, max retry attempts)
-         BackOff backOff = new FixedBackOff(5000L, 3L);
-
-         DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), backOff);
-         // The container uses MANUAL_IMMEDIATE acks and the listener never acknowledges a
-         // failed record, so let the handler commit the offset of a recovered record;
-         // otherwise it is redelivered (and dead-lettered again) after a restart or rebalance.
-         defaultErrorHandler.setCommitRecovered(true);
-         // Alternative: pass a ConsumerAwareRecordRecoverer lambda instead of the
-         // DeadLetterPublishingRecoverer to persist the failed record yourself.
-         return defaultErrorHandler;
-     }
-
-     /**
-      * Translates {@link KafkaListenerProperties} into Kafka consumer properties.
-      *
-      * @return a new mutable map of consumer configuration entries
-      */
-     public Map<String, Object> consumerConfigs() {
-         Map<String, Object> propsMap = new HashMap<>();
-         // Broker addresses
-         propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerProperties.getBootstrapServers());
-         // Whether the client auto-commits offsets
-         propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaListenerProperties.isEnableAutoCommit());
-         // Auto-commit interval
-         propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getAutoCommitInterval());
-
-         // Session timeout
-         propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaListenerProperties.getSessionTimeOut());
-         // Key deserializer
-         propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getKeyDeserializer());
-         // Value deserializer
-         propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getValueDeserializer());
-         // Heartbeat interval
-         propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getHeartbeatInterval());
-
-         // Consumer group id
-         propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerProperties.getGroupId());
-         // Offset reset policy
-         propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaListenerProperties.getAutoOffsetReset());
-         // Maximum records per poll
-         propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaListenerProperties.getMaxPollRecords());
-         // Maximum interval between polls
-         propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaListenerProperties.getMaxPollInterval());
-
-         // fetchMinSize / fetchMaxWait are bound by KafkaListenerProperties but were never
-         // forwarded to the consumer; apply them when they are configured.
-         putIfPresent(propsMap, ConsumerConfig.FETCH_MIN_BYTES_CONFIG, kafkaListenerProperties.getFetchMinSize());
-         putIfPresent(propsMap, ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, kafkaListenerProperties.getFetchMaxWait());
-
-         propsMap.put("spring.json.trusted.packages", "com.ychen.**");
-
-         return propsMap;
-     }
-
-     /** Adds the entry only when a value was configured, so the client default stays in effect otherwise. */
-     private static void putIfPresent(Map<String, Object> target, String key, Object value) {
-         if (value != null) {
-             target.put(key, value);
-         }
-     }
-
- }
- package com.ychen.goodscenter.fafka;
-
- import com.ychen.goodscenter.service.OrderService;
- import com.ychen.goodscenter.vo.req.SubmitOrderReq;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.dao.DuplicateKeyException;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.support.Acknowledgment;
- import org.springframework.stereotype.Component;
-
- @Component
- @Slf4j
- public class MessageListener {
- @Autowired
- private OrderService orderService;
-
- @KafkaListener(topics = "order-message-topic", containerFactory = "kafkaListenerContainerFactory")
- public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {
- log.info("order-message-topic message Listener, Thread ID: " + Thread.currentThread().getId());
-
- try {
- log.info("order-message-topic message received, orderId: {}", record.value().getOrderId());
-
- orderService.submitOrder(record.value());
- // 同步提交
- acknowledgment.acknowledge();
- log.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());
- } catch (DuplicateKeyException dupe) {
- // 处理异常情况
- log.error("order-message-topic message error DuplicateKeyException", dupe);
- // 重复数据,忽略掉,同步提交
- acknowledgment.acknowledge();
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。