赞
踩
Kafka消费程序每次重启都会出现重复消费的情况,考虑是在kill掉程序的时候,有部分消费完的数据没有提交offsect。
props.setProperty("enable.auto.commit", "true");
此处表明自动提交,即延迟提交(poll的时候会根据配置的自动提交时间间隔去进行检测并提交)。当kill掉程序的时候,可能消费完的数据还没有到达提交的时间点程序就被kill掉了。
关闭自动提交,采用异步提交+同步提交的方式来提交offsect。
// 关闭自动提交 props.setProperty("enable.auto.commit", "false"); // 消费逻辑 try { while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, byte[]> record : records) { // 具体业务逻辑 } consumer.commitAsync(); } System.out.println("while end."); } catch (Exception e) { System.err.println("consume error..." + e.getMessage()); } finally { try { consumer.commitSync(); System.out.println("commit sync suc."); } catch (Exception e) { System.err.println("commit sync error." + e.getMessage()); } finally { consumer.close(); System.out.println("close."); } }
这样还不够,当kill掉程序的时候,会发现并没有走到finally中。说明线程非正常停止。
1.使用线程池来运行线程
2.在实例销毁前使用结束标志手动停止线程
3.使用CountDownLatch等待线程停止
第一步:定义线程池
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
int cpuCoreNum = Runtime.getRuntime().availableProcessors();
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(cpuCoreNum);
threadPoolTaskExecutor.setMaxPoolSize(cpuCoreNum * 2);
threadPoolTaskExecutor.setQueueCapacity(2000);
threadPoolTaskExecutor.setKeepAliveSeconds(60);
threadPoolTaskExecutor.setThreadNamePrefix("global_thread_pool_task_executor");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskExecutor.setAwaitTerminationSeconds(10);// 确保该值是线程池中各个线程阻塞的最大时长
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
此处两个配置参数至关重要
setWaitForTasksToCompleteOnShutdown(true)表示等待正在进行和排队的任务完成。
threadPoolTaskExecutor.setAwaitTerminationSeconds(10)虽然我们已经配置为等待正在进行和排队的任务完成,但Spring仍然会继续关闭容器的其余部分。这可能会释放任务执行器所需的资源,并导致任务失败。配置这个最大等待时间可以确保在指定的时间段内,容器级别的关闭过程将被阻止。
等待时间设置多少具体看线程池中业务线程最大耗时来定。
如果不停止线程,就会超过线程池的等待时间。通过以下WARN日志可以发现,在停止线程池的时候仍然存在业务线程没有停掉的情况,所以还需要定义一个标志来手动停止线程。
WARN 11472 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Timed out while waiting for executor 'threadPoolTaskExecutor' to terminate
第二步:定义结束标志,并在对象销毁前停止线程
// 线程中断标志
public volatile boolean running = true;
while (running) {
...
}
然后再实现DisposableBean接口中的destroy方法,在实例销毁之前将running置为false停止线程
@Override
public void destroy() throws Exception {
this.running = false; // 循环并非立即停止,而是等到当前执行的循环体执行结束才会停止,所以这个地方的等待时间需要与线程池中的setAwaitTerminationSeconds参数相对应
}
当destroy方法运行结束,系统就会销毁掉当前实例,接着就会开始销毁当前实例的依赖(没有被其它实例所引用的话),而此时需要注意的是线程其实并没有运行结束。所以问题出现了:线程还在运行中,而运行所需要的资源(比如Redis连接资源)被提前关闭了,就会导致异常出现。所以在将running置为false之后还需要使用CountDwonLatch等待线程结束,再接着销毁其它依赖。
此处省略第三步,直接上完整的样例代码:
@Component public class ConsumerClosedSafely implements CommandLineRunner, DisposableBean { private volatile boolean running = true; private final CountDownLatch latch = new CountDownLatch(1); private final String[] topics = new String[]{"test"}; @Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor; public void consume() throws Exception{ Properties props = new Properties(); //TODO 其它属性 props.setProperty("enable.auto.commit", "false"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topics)); // 消费逻辑 try { while (running) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { //TODO 具体业务逻辑 } consumer.commitAsync(); } System.out.println("while end."); } catch (Exception e) { System.err.println("consume error..." + e.getMessage()); } finally { try { consumer.commitSync(); System.out.println("commit sync suc."); } catch (Exception e) { System.err.println("commit sync error." + e.getMessage()); } finally { consumer.close(); System.out.println("close."); // 计数器减一 latch.countDown(); System.out.println("latch count down ."); } } } @Override public void run(String... args) throws Exception { Runnable r = ()->{ try { consume(); } catch (Exception e) { System.exit(1); } }; threadPoolTaskExecutor.execute(r); } @Override public void destroy() throws Exception { // 终止循环 this.running= false; // 等待运行结束 latch.await(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。