
Spring-Kafka 3.0: Handling Consumer Failures (and the Default Retry Count)

I. Background

When consuming from Kafka, we inevitably have to decide how many times a failed message should be retried, and what to do once the retries are exhausted: block further consumption, discard the message, or persist it somewhere for later handling.

II. Configuring the Retry Count for Failed Consumption

1 Where to find the default retry count

In Spring-Kafka 3.0 a failed record is delivered 10 times by default; strictly speaking, that is 1 normal delivery plus 9 retries. You can see this in org.springframework.kafka.listener.SeekUtils (the DEFAULT_MAX_FAILURES constant).

2 How to change the retry count

As far as my experiments go, configuring this through application.yml does not work in spring-kafka 3.0, and I could not find any property that changes the retry count. (Many posts online claim that spring.kafka.consumer.retries does it; I tried, and at least in 3.0 it does not. If anyone has successfully configured the consumer retry count through application.yml, please leave a comment and let me know, thanks.)

After much trial and error, the only way I found is Java configuration, which is in any case more flexible and fine-grained than application.yml. Here is the code:

```java
public CommonErrorHandler commonErrorHandler() {
    // Retry every 5 seconds, at most 3 retries
    BackOff backOff = new FixedBackOff(5000L, 3L);
    return new DefaultErrorHandler(backOff);
}
```
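A note on the FixedBackOff(5000L, 3L) arguments: the first is the interval between attempts in milliseconds, the second is the number of retries after the initial delivery, so a record is delivered at most 4 times in total. A dependency-free sketch of that arithmetic (the helper names here are illustrative, not part of Spring):

```java
public class BackOffMath {
    // FixedBackOff(intervalMs, maxRetries): maxRetries counts the retries
    // AFTER the first failed delivery, so total deliveries = 1 + maxRetries
    static long totalDeliveries(long maxRetries) {
        return 1 + maxRetries;
    }

    // Total time spent waiting between attempts
    static long totalBackOffMillis(long intervalMs, long maxRetries) {
        return intervalMs * maxRetries;
    }

    public static void main(String[] args) {
        System.out.println(totalDeliveries(3L));           // 4
        System.out.println(totalBackOffMillis(5000L, 3L)); // 15000
        // The spring-kafka default of 10 deliveries corresponds to 9 retries
        System.out.println(totalDeliveries(9L));           // 10
    }
}
```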

Then register this handler on your ConcurrentKafkaListenerContainerFactory and you are done.
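Registering it could look roughly like this (a minimal sketch; the complete factory configuration appears later in this article, and the injected consumerFactory parameter here is assumed to be defined elsewhere):

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Register the retry/error handler built above
    factory.setCommonErrorHandler(commonErrorHandler());
    return factory;
}
```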

III. Configuring What Happens When Retries Are Exhausted

1 Save the message to the database for later retry

All we need to do is pass a ConsumerAwareRecordRecoverer when creating the DefaultErrorHandler; when the message still fails after 3 retries, it gets saved to the database. Note that once the "save to db" succeeds, I see no need to call consumer.commitSync here. First, the no-argument consumer.commitSync commits the largest offset of the current batch by default, which can lose messages; second, even without committing, the consumer keeps consuming the following messages, and as soon as a later message is consumed successfully its offset is committed, superseding this one.

```java
public CommonErrorHandler commonErrorHandler() {
    // Create the FixedBackOff: 5-second interval, 3 retries
    BackOff backOff = new FixedBackOff(5000L, 3L);
    return new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
        // Called once the retries are exhausted; persist the failed record
        log.info("save to db " + record.value().toString());
    }, backOff);
}
```

If you really do want to commit, you can try the variant below, which explicitly commits the offset for the recovered record:

```java
public CommonErrorHandler commonErrorHandler() {
    // Create the FixedBackOff: 5-second interval, 3 retries
    BackOff backOff = new FixedBackOff(5000L, 3L);
    return new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
        log.info("save to db " + record.value().toString());
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        // The committed offset is the offset of the NEXT record to consume,
        // so commit record.offset() + 1 to mark this record as done
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        consumer.commitSync(offsets);
    }, backOff);
}
```

2 Send the message to a Kafka dead-letter topic

Again, just pass a DeadLetterPublishingRecoverer when creating the DefaultErrorHandler. By default it uses the configured kafkaTemplate to publish the failed message to a topic named your_topic + ".DLT".

```java
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public CommonErrorHandler commonErrorHandler() {
    // Create the FixedBackOff: 5-second interval, 3 retries
    BackOff backOff = new FixedBackOff(5000L, 3L);
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), backOff);
}
```
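With the default naming, records from order-message-topic that exhaust their retries end up on order-message-topic.DLT. A sketch of a listener that drains that dead-letter topic (the groupId and method name here are illustrative, not from the original configuration):

```java
@KafkaListener(topics = "order-message-topic.DLT", groupId = "goods-center-dlt")
public void listenDlt(ConsumerRecord<String, String> record) {
    // Inspect, alert on, or manually re-queue the dead-lettered message
    log.error("DLT message: partition={}, offset={}, value={}",
            record.partition(), record.offset(), record.value());
}
```

Note that by default DeadLetterPublishingRecoverer publishes to the same partition number as the failed record, so the .DLT topic should have at least as many partitions as the source topic.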
These are the two standard implementations of the ConsumerRecordRecoverer interface.

IV. Complete Consumer Code

1 application.yml

```yaml
kafka-consumer:
  bootstrapServers: 192.168.31.114:9092
  groupId: goods-center
  # The background heartbeat thread must report within 30 seconds, otherwise a rebalance is triggered
  sessionTimeOut: 30000
  autoOffsetReset: latest
  # Disable auto-commit; even so, Spring will commit on our behalf
  enableAutoCommit: false
  # Auto-commit interval
  autoCommitInterval: 1000
  # Minimum bytes per fetch
  fetchMinSize: 1
  # Maximum wait time when the minimum fetch size has not been reached
  fetchMaxWait: 500
  maxPollRecords: 50
  # 300-second max poll interval; if processing takes longer than this, an error occurs
  maxPollInterval: 300000
  # Heartbeat interval
  heartbeatInterval: 10000
  keyDeserializer: org.apache.kafka.common.serialization.LongDeserializer
  valueDeserializer: org.springframework.kafka.support.serializer.JsonDeserializer
```

2 KafkaListenerProperties

```java
package com.ychen.goodscenter.fafka;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
// Prefix of the configuration keys in application.yml
@ConfigurationProperties(prefix = "kafka-consumer")
@Getter
@Setter
public class KafkaListenerProperties {
    private String groupId;
    private String sessionTimeOut;
    private String bootstrapServers;
    private String autoOffsetReset;
    private boolean enableAutoCommit;
    private String autoCommitInterval;
    private String fetchMinSize;
    private String fetchMaxWait;
    private String maxPollRecords;
    private String maxPollInterval;
    private String heartbeatInterval;
    private String keyDeserializer;
    private String valueDeserializer;
}
```

3 KafkaConsumerConfig

```java
package com.ychen.goodscenter.fafka;

import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.*;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableConfigurationProperties(KafkaListenerProperties.class)
@Slf4j
public class KafkaConsumerConfig {

    @Autowired
    private KafkaListenerProperties kafkaListenerProperties;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Concurrency; partitions are shared evenly across multiple service instances
        factory.setConcurrency(2);
        // factory.setBatchListener(true);
        factory.setCommonErrorHandler(commonErrorHandler());
        ContainerProperties containerProperties = factory.getContainerProperties();
        // Use manual acknowledgment
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerConfigs = consumerConfigs();
        log.info("Consumer configuration: {}", JSONObject.toJSONString(consumerConfigs));
        return new DefaultKafkaConsumerFactory<>(consumerConfigs);
    }

    public CommonErrorHandler commonErrorHandler() {
        // Create the FixedBackOff: 5-second interval, 3 retries
        BackOff backOff = new FixedBackOff(5000L, 3L);
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), backOff);
        // Alternative: save to the database instead of dead-lettering
        // DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
        //     log.info("save to db " + record.value().toString());
        //     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        //     offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
        //     consumer.commitSync(offsets);
        // }, backOff);
        return defaultErrorHandler;
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // Broker addresses
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerProperties.getBootstrapServers());
        // Whether offsets are auto-committed
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaListenerProperties.isEnableAutoCommit());
        // Auto-commit interval
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getAutoCommitInterval());
        // Session timeout
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaListenerProperties.getSessionTimeOut());
        // Key deserializer
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getKeyDeserializer());
        // Value deserializer
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getValueDeserializer());
        // Heartbeat interval
        propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getHeartbeatInterval());
        // Consumer group id
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerProperties.getGroupId());
        // Offset reset policy
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaListenerProperties.getAutoOffsetReset());
        // Max records per poll
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaListenerProperties.getMaxPollRecords());
        // Max interval between polls
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaListenerProperties.getMaxPollInterval());
        propsMap.put("spring.json.trusted.packages", "com.ychen.**");
        return propsMap;
    }
}
```

4 MessageListener

```java
package com.ychen.goodscenter.fafka;

import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageListener {

    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-message-topic", containerFactory = "kafkaListenerContainerFactory")
    public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {
        log.info("order-message-topic message Listener, Thread ID: " + Thread.currentThread().getId());
        try {
            log.info("order-message-topic message received, orderId: {}", record.value().getOrderId());
            orderService.submitOrder(record.value());
            // Manual acknowledgment (commit)
            acknowledgment.acknowledge();
            log.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());
        } catch (DuplicateKeyException dupe) {
            log.error("order-message-topic message error DuplicateKeyException", dupe);
            // Duplicate record: ignore it and acknowledge anyway
            acknowledgment.acknowledge();
        }
    }
}
```
