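I have recently been working with Kafka. When a consumer throws an exception while processing a message, the same message keeps being re-delivered, and the retries only stop after the 10th failure.
Because I have alerting configured, a single failing message produces 10 alerts in a row. I wanted to tune this, so following material I found online I added the configuration below: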
@Bean
public ConcurrentKafkaListenerContainerFactory containerFactory(ConsumerFactory consumerFactory) {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory);
    // At most 3 retries, 5 seconds apart
    SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
        log.error("Retries exhausted, discarding record: {}", consumerRecord.toString(), e);
    }, new FixedBackOff(5000, 3));
    factory.setErrorHandler(seekToCurrentErrorHandler);
    // Set the offset commit mode. Otherwise, when an exception occurs, this error is reported:
    // No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}
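This did not take effect at first. After starting the application and stepping through it with breakpoints, I eventually found that Spring created two instances of this factory: mine, and the default one from
org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration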
@Bean
@ConditionalOnMissingBean(
name = {"kafkaListenerContainerFactory"}
)
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory();
configurer.configure(factory, (ConsumerFactory)kafkaConsumerFactory.getIfAvailable(() -> {
return new DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties());
}));
return factory;
}
The problem lies in this condition:
@ConditionalOnMissingBean(
name = {"kafkaListenerContainerFactory"}
)
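The factory I defined was not registered under that name, so the condition finds no such bean and the auto-configuration instantiates its own. Since @KafkaListener uses the bean named kafkaListenerContainerFactory unless told otherwise, my listeners ended up on the default factory.
The fix is to set the bean name: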
@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory containerFactory(ConsumerFactory consumerFactory) {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory);
    // At most 3 retries, 5 seconds apart
    SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
        log.error("Retries exhausted, discarding record: {}", consumerRecord.toString(), e);
    }, new FixedBackOff(5000, 3));
    factory.setErrorHandler(seekToCurrentErrorHandler);
    // Set the offset commit mode. Otherwise, when an exception occurs, this error is reported:
    // No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}
After that it worked. In a test, the message was indeed retried 3 times at 5-second intervals.
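To make the naming issue concrete, here is a toy model in plain Java of what a name-based "register only if missing" check does. It is only a sketch: the map stands in for the Spring context, the strings stand in for factory instances, and the class and method names are made up for illustration. It is not how Spring implements @ConditionalOnMissingBean.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of name-based conditional bean registration (hypothetical, not Spring code).
class ConditionalOnMissingBeanDemo {

    // The bean name the auto-configuration checks for, as seen in the source above.
    static final String DEFAULT_NAME = "kafkaListenerContainerFactory";

    // Registers the user's factory under userBeanName, then applies the name-based condition.
    static Map<String, String> buildContext(String userBeanName) {
        Map<String, String> beans = new LinkedHashMap<>();
        beans.put(userBeanName, "custom factory, FixedBackOff(5000, 3)");
        // The default factory is only added when no bean named DEFAULT_NAME exists yet.
        beans.putIfAbsent(DEFAULT_NAME, "default factory, no error handler configured");
        return beans;
    }

    public static void main(String[] args) {
        // Custom bean under another name: two factories end up in the context.
        System.out.println(buildContext("containerFactory"));
        // Custom bean under the expected name: the default one is never created.
        System.out.println(buildContext(DEFAULT_NAME));
    }
}
```

An alternative that should also work is to keep the custom bean name and point the listener at it with @KafkaListener(containerFactory = "containerFactory"). I went with renaming the bean so that every listener picks it up without extra attributes.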
Next, let's go back to the auto-configuration and trace where the 10 attempts come from:
@Bean
@ConditionalOnMissingBean(
name = {"kafkaListenerContainerFactory"}
)
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory();
configurer.configure(factory, (ConsumerFactory)kafkaConsumerFactory.getIfAvailable(() -> {
return new DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties());
}));
return factory;
}
This creates the default factory.
Step into configurer.configure:
public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerFactory, ConsumerFactory<Object, Object> consumerFactory) {
listenerFactory.setConsumerFactory(consumerFactory);
this.configureListenerFactory(listenerFactory);
this.configureContainer(listenerFactory.getContainerProperties());
}
Step into configureListenerFactory:
public class ConcurrentKafkaListenerContainerFactoryConfigurer {
    private KafkaProperties properties;
    private MessageConverter messageConverter;
    private KafkaTemplate<Object, Object> replyTemplate;
    private KafkaAwareTransactionManager<Object, Object> transactionManager;
    private ConsumerAwareRebalanceListener rebalanceListener;
    private ErrorHandler errorHandler;
    private BatchErrorHandler batchErrorHandler;
    private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
    private RecordInterceptor<Object, Object> recordInterceptor;

    private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<Object, Object> factory) {
        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
        Listener properties = this.properties.getListener();
        properties.getClass();
        map.from(properties::getConcurrency).to(factory::setConcurrency);
        map.from(this.messageConverter).to(factory::setMessageConverter);
        map.from(this.replyTemplate).to(factory::setReplyTemplate);
        if (properties.getType().equals(Type.BATCH)) {
            factory.setBatchListener(true);
            factory.setBatchErrorHandler(this.batchErrorHandler);
        } else {
            factory.setErrorHandler(this.errorHandler);
        }
        map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);
        map.from(this.recordInterceptor).to(factory::setRecordInterceptor);
    }
    ....
}
As you can see, the ErrorHandler is set here. ConcurrentKafkaListenerContainerFactoryConfigurer is itself a Spring-managed bean.
Now look at where Spring initializes this bean:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {
    ...
    private final ErrorHandler errorHandler;
    ...
    KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
            ObjectProvider<RecordMessageConverter> messageConverter,
            ObjectProvider<BatchMessageConverter> batchMessageConverter,
            ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
            ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
            ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,
            ObjectProvider<ErrorHandler> errorHandler,
            ObjectProvider<BatchErrorHandler> batchErrorHandler,
            ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
            ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
        this.properties = properties;
        this.messageConverter = messageConverter.getIfUnique();
        this.batchMessageConverter = batchMessageConverter
                .getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));
        this.kafkaTemplate = kafkaTemplate.getIfUnique();
        this.transactionManager = kafkaTransactionManager.getIfUnique();
        this.rebalanceListener = rebalanceListener.getIfUnique();
        this.errorHandler = errorHandler.getIfUnique();
        this.batchErrorHandler = batchErrorHandler.getIfUnique();
        this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
        this.recordInterceptor = recordInterceptor.getIfUnique();
    }

    @Bean
    @ConditionalOnMissingBean
    ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
        configurer.setKafkaProperties(this.properties);
        MessageConverter messageConverterToUse = this.properties.getListener().getType().equals(Type.BATCH)
                ? this.batchMessageConverter : this.messageConverter;
        configurer.setMessageConverter((MessageConverter)messageConverterToUse);
        configurer.setReplyTemplate(this.kafkaTemplate);
        configurer.setTransactionManager(this.transactionManager);
        configurer.setRebalanceListener(this.rebalanceListener);
        configurer.setErrorHandler(this.errorHandler);
        configurer.setBatchErrorHandler(this.batchErrorHandler);
        configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
        configurer.setRecordInterceptor(this.recordInterceptor);
        return configurer;
    }
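Stepping through in the debugger shows that the ErrorHandler is null at this point, because no ErrorHandler bean is defined and errorHandler.getIfUnique() therefore returns null.
So the call factory.setErrorHandler(this.errorHandler); seen above also receives null.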
That wraps up the factory configuration. Next, look at the consumer side, in org.springframework.kafka.listener.KafkaMessageListenerContainer:
    if (isRunning()) {
        return;
    }
    if (this.clientIdSuffix == null) { // stand-alone container
        checkTopics();
    }
    ContainerProperties containerProperties = getContainerProperties();
    checkAckMode(containerProperties);
    Object messageListener = containerProperties.getMessageListener();
    if (containerProperties.getConsumerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }
    GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
    ListenerType listenerType = determineListenerType(listener);
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    ...
}
The last line creates the listener consumer:
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
    Properties consumerProperties = propertiesFromProperties();
    checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
    this.autoCommit = determineAutoCommit(consumerProperties);
    ...
    GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
    ...
    this.errorHandler = determineErrorHandler(errHandler);
    ...
}
...
protected ErrorHandler determineErrorHandler(GenericErrorHandler<?> errHandler) {
    return errHandler != null ? (ErrorHandler) errHandler
            : this.transactionManager != null ? null : new SeekToCurrentErrorHandler();
}
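The handler obtained here is also null by default, so only determineErrorHandler finally supplies a non-null ErrorHandler.
The key is therefore new SeekToCurrentErrorHandler().
This is also the point where our custom configuration affects the retry count: if we configure our own ConcurrentKafkaListenerContainerFactory with a maximum number of retries, the call
GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
returns the handler we set, which carries the retry settings.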
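The fallback in determineErrorHandler is a nested ternary, which is easy to misread. Here is the same decision written out in plain Java as a small sketch. Handlers are represented by strings, and the class name and the boolean parameter are stand-ins I made up, so treat it as a reading aid rather than the library's code.

```java
// Simplified stand-in for the fallback logic in determineErrorHandler (hypothetical names).
class ErrorHandlerFallbackDemo {

    static String determineErrorHandler(String configured, boolean hasTransactionManager) {
        if (configured != null) {
            // A handler set on the container factory always wins.
            return configured;
        }
        // Without a configured handler: none when a transaction manager is present,
        // otherwise the default SeekToCurrentErrorHandler with its default back-off.
        return hasTransactionManager ? null : "SeekToCurrentErrorHandler(DEFAULT_BACK_OFF)";
    }

    public static void main(String[] args) {
        System.out.println(determineErrorHandler(null, false));
        System.out.println(determineErrorHandler("SeekToCurrentErrorHandler(FixedBackOff(5000, 3))", false));
        System.out.println(determineErrorHandler(null, true));
    }
}
```

The first call is the situation from this post before the fix: nothing configured, no transaction manager, so the default handler is used.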
The no-argument constructor of SeekToCurrentErrorHandler does the following:
public SeekToCurrentErrorHandler() {
this(null, SeekUtils.DEFAULT_BACK_OFF);
}
public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {
super(recoverer, backOff);
}
Let's see where the constant SeekUtils.DEFAULT_BACK_OFF comes from:
public final class SeekUtils {
/**
* The number of times a topic/partition/offset can fail before being rejected.
*/
public static final int DEFAULT_MAX_FAILURES = 10;
/**
* The default back off - a {@link FixedBackOff} with 0 interval and
* {@link #DEFAULT_MAX_FAILURES} - 1 retry attempts.
*/
public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff(0, DEFAULT_MAX_FAILURES - 1);
...
}
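Here, finally, is the origin of the 10 attempts: the default back-off allows DEFAULT_MAX_FAILURES - 1 = 9 retries with no delay, which together with the first delivery makes 10.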
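To double-check the arithmetic, here is a minimal sketch that counts how often a permanently failing record reaches the listener for a given back-off. The inner class copies the nextBackOff() logic of Spring's FixedBackOff. Everything else (class name, the deliveries helper) is my own scaffolding and assumes the record fails on every attempt.

```java
// Counts listener invocations for a record that always fails (illustrative sketch).
class DefaultBackOffDemo {

    static final long STOP = -1L;

    // Same counting logic as FixedBackOff's execution: maxAttempts intervals, then STOP.
    static final class FixedBackOffExecution {
        private final long interval;
        private final long maxAttempts;
        private long currentAttempts;

        FixedBackOffExecution(long interval, long maxAttempts) {
            this.interval = interval;
            this.maxAttempts = maxAttempts;
        }

        long nextBackOff() {
            ++this.currentAttempts;
            return this.currentAttempts <= this.maxAttempts ? this.interval : STOP;
        }
    }

    // Each loop iteration is one delivery that throws; the back-off is asked after every failure.
    static int deliveries(long interval, long maxAttempts) {
        FixedBackOffExecution execution = new FixedBackOffExecution(interval, maxAttempts);
        int deliveries = 0;
        while (true) {
            deliveries++;
            if (execution.nextBackOff() == STOP) {
                return deliveries;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(deliveries(0, 9));    // default back-off: prints 10
        System.out.println(deliveries(5000, 3)); // back-off from this post: prints 4
    }
}
```

So FixedBackOff(5000, 3) means one initial delivery plus 3 retries, which matches what I saw in the test.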
Continuing from the SeekToCurrentErrorHandler constructor, it calls the constructor of its superclass FailedRecordProcessor:
protected FailedRecordProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {
this.failureTracker = new FailedRecordTracker(recoverer, backOff, this.logger);
this.classifier = configureDefaultClassifier();
}
The retry configuration is handed to failureTracker, which plays the leading role from here on.
When consuming a message fails, the method org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#invokeErrorHandler is called:
private void invokeErrorHandler(final ConsumerRecord<K, V> record,
        Iterator<ConsumerRecord<K, V>> iterator, RuntimeException e) {
    if (this.errorHandler instanceof RemainingRecordsErrorHandler) {
        if (this.producer == null) {
            processCommits();
        }
        List<ConsumerRecord<?, ?>> records = new ArrayList<>();
        records.add(record);
        while (iterator.hasNext()) {
            records.add(iterator.next());
        }
        this.errorHandler.handle(decorateException(e), records, this.consumer,
                KafkaMessageListenerContainer.this.thisOrParentContainer);
    } else {
        this.errorHandler.handle(decorateException(e), record, this.consumer);
    }
}
This in turn calls the handle method of the errorHandler, which we already know is an org.springframework.kafka.listener.SeekToCurrentErrorHandler:
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container) {
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(),
getSkipPredicate(records, thrownException), this.logger, getLogLevel());
}
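The source of SeekUtils.seekOrRecover is not shown in this post. As far as I understand it, the idea is this: the failed record comes first in the list, the skip predicate is asked about that record only, and every partition is then repositioned to the first record that was not skipped so that it gets polled again. The sketch below models just that idea in plain Java. Rec and seekTargets are hypothetical names, and details such as committing the offset of a recovered record are left out.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Simplified model of "seek the remaining records, optionally skipping the failed one".
class SeekOrRecoverDemo {

    // Minimal stand-in for ConsumerRecord: only partition and offset matter here.
    static final class Rec {
        final int partition;
        final long offset;

        Rec(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Returns, per partition, the offset the consumer would be repositioned to.
    static Map<Integer, Long> seekTargets(List<Rec> records, Predicate<Rec> skip) {
        Map<Integer, Long> targets = new LinkedHashMap<>();
        boolean first = true;
        for (Rec rec : records) {
            // Only the first (failed) record is a candidate for skipping.
            boolean skipped = first && skip.test(rec);
            first = false;
            if (!skipped) {
                // The earliest remaining offset of each partition wins.
                targets.putIfAbsent(rec.partition, rec.offset);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Rec> records = Arrays.asList(new Rec(0, 5), new Rec(0, 6), new Rec(1, 2));
        System.out.println(seekTargets(records, r -> false)); // retry: {0=5, 1=2}
        System.out.println(seekTargets(records, r -> true));  // give up on offset 5: {0=6, 1=2}
    }
}
```

While the predicate returns false, the failed offset is sought again and therefore re-delivered. Once it returns true, consumption moves on to the next record.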
Look at the method getSkipPredicate:
protected BiPredicate<ConsumerRecord<?, ?>, Exception> getSkipPredicate(List<ConsumerRecord<?, ?>> records,
Exception thrownException) {
if (getClassifier().classify(thrownException)) {
return this.failureTracker::skip;
}
.....
}
It returns a reference to the skip method of failureTracker:
boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
    if (this.noRetries) {
        attemptRecovery(record, exception, null);
        return true;
    }
    Map<TopicPartition, FailedRecord> map = this.failures.get();
    if (map == null) {
        this.failures.set(new HashMap<>());
        map = this.failures.get();
    }
    TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
    FailedRecord failedRecord = map.get(topicPartition);
    if (failedRecord == null || failedRecord.getOffset() != record.offset()) {
        failedRecord = new FailedRecord(record.offset(), this.backOff.start());
        map.put(topicPartition, failedRecord);
    } else {
        failedRecord.getDeliveryAttempts().incrementAndGet();
    }
    long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
    if (nextBackOff != BackOffExecution.STOP) {
        try {
            Thread.sleep(nextBackOff);
        } catch (@SuppressWarnings("unused") InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    } else {
        attemptRecovery(record, exception, topicPartition);
        map.remove(topicPartition);
        if (map.isEmpty()) {
            this.failures.remove();
        }
        return true;
    }
}
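The key line is: long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
It returns the interval to wait before the next retry, or -1 (BackOffExecution.STOP) once the maximum number of attempts has been exceeded.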
public long nextBackOff() {
++this.currentAttempts;
return this.currentAttempts <= FixedBackOff.this.getMaxAttempts() ? FixedBackOff.this.getInterval() : -1L;
}
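If the result is -1, skip calls attemptRecovery, which logs the exception.
That method simply invokes the recoverer, a BiConsumer functional interface, which is set up in the FailedRecordTracker constructor: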
FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
        LogAccessor logger) {
    Assert.notNull(backOff, "'backOff' cannot be null");
    if (recoverer == null) {
        this.recoverer = (rec, thr) -> {
            Map<TopicPartition, FailedRecord> map = this.failures.get();
            FailedRecord failedRecord = null;
            if (map != null) {
                failedRecord = map.get(new TopicPartition(rec.topic(), rec.partition()));
            }
            logger.error(thr, "Backoff "
                    + (failedRecord == null ? "none" : failedRecord.getBackOffExecution())
                    + " exhausted for " + ListenerUtils.recordToString(rec));
        };
    } else {
        this.recoverer = recoverer;
    }
    this.noRetries = backOff.start().nextBackOff() == BackOffExecution.STOP;
    this.backOff = backOff;
}
As you can see, this log message is indeed printed once the last retry has failed.
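Putting the pieces together, here is a stripped-down model of the tracker that can be run without Kafka. It keeps one failure entry per partition, hands out maxAttempts retries, and calls the recoverer exactly once when they are used up. It is a sketch under simplifying assumptions: partitions are plain ints, the recoverer receives a description string instead of a ConsumerRecord, there is no sleeping and no thread-local state, and all names are mine.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Stripped-down model of the failure tracking described above (not the real class).
class FailedRecordTrackerDemo {

    // Per-partition state: which offset is failing and how many back-offs were requested.
    static final class FailedRecord {
        final long offset;
        long attempts;

        FailedRecord(long offset) {
            this.offset = offset;
        }
    }

    private final Map<Integer, FailedRecord> failures = new HashMap<>();
    private final long maxAttempts;
    private final BiConsumer<String, Exception> recoverer;

    FailedRecordTrackerDemo(long maxAttempts, BiConsumer<String, Exception> recoverer) {
        this.maxAttempts = maxAttempts;
        this.recoverer = recoverer;
    }

    // Returns false to request another delivery, true once the record should be skipped.
    boolean skip(int partition, long offset, Exception exception) {
        FailedRecord failed = failures.get(partition);
        if (failed == null || failed.offset != offset) {
            failed = new FailedRecord(offset); // a different offset starts a fresh count
            failures.put(partition, failed);
        }
        if (++failed.attempts <= maxAttempts) {
            return false; // the real code sleeps for the back-off interval here
        }
        recoverer.accept("partition " + partition + ", offset " + offset, exception);
        failures.remove(partition);
        return true;
    }

    // Drives one permanently failing record; returns {deliveries, recovererCalls}.
    static int[] run(long maxAttempts) {
        int[] counts = new int[2];
        FailedRecordTrackerDemo tracker =
                new FailedRecordTrackerDemo(maxAttempts, (rec, ex) -> counts[1]++);
        do {
            counts[0]++; // the listener is invoked and throws
        } while (!tracker.skip(0, 42L, new IllegalStateException("boom")));
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(9))); // [10, 1]
        System.out.println(Arrays.toString(run(3))); // [4, 1]
    }
}
```

For alerting, this is the part that matters: if the alert is raised from the recoverer, it fires once per poisoned record, while an alert raised inside the listener fires on every delivery.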