背景
很多时候服务都有平滑退出的需求,例如RPC服务在停止之后需要从注册服务摘除节点、从消息队列已经消费的消息需要正常处理完成等。一般地我们希望能让服务在退出前能执行完当前正在执行的任务,这个时候就需要我们在JVM关闭的时候运行一些清理现场的代码。
方案
ShutdownHook
JDK提供了Java.Runtime.addShutdownHook(Thread hook)方法,允许用户注册一个JVM关闭的钩子。这个钩子可以在以下几种场景被调用:
- 程序正常退出;
- 使用System.exit();
- 终端使用Ctrl+C触发的终端;
- 系统关闭;
- 使用kill pid命令干掉进程;
一般地发布系统会通过kill命令来停止服务。这个时候服务可以接收到关闭信号并执行钩子程序进行清理工作。
场景示例
假设以下场景,有个生产者往内部队列发消息,有个消费者读取队列消息并执行。当我们停止服务的时候,希望队列的消息都能正常处理完成,代码示例如下:
- /**
- * 服务关闭测试
- */
- public class ShutDownTest {
-
- private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50);
-
- private static AtomicLong taskId = new AtomicLong(0);
-
- // 生产任务
- private static class ProduceTask implements Runnable {
-
- private AtomicBoolean stopped = new AtomicBoolean(false);
-
- @Override
- public void run() {
- while (!stopped.get()) {
- long element = taskId.incrementAndGet();
- queue.add(element);
- System.out.println("add element : " + element);
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- }
- }
- }
-
- public void setStopped() {
- stopped.compareAndSet(false, true);
- System.out.println("stop producer.");
- }
- }
-
- // 消费任务
- private static class ConsumeTask implements Runnable {
-
- private AtomicBoolean stopped = new AtomicBoolean(false);
-
- @Override
- public void run() {
- while (!stopped.get() || queue.size() > 0) {
- try {
- long element = queue.take();
- System.out.println("consume element : " + element);
- doWork();
- } catch (InterruptedException e) {
- }
- }
- }
-
- private void doWork() {
- try {
- // 消费速度比生产速度稍慢,模拟积压情况
- Thread.sleep(60);
- } catch (InterruptedException e) {
- }
- }
-
- public void setStopped() {
- stopped.compareAndSet(false, true);
- System.out.println("stop consumer.");
- }
- }
-
- public static void main(String[] args) {
- final ProduceTask producerTask = new ProduceTask();
- final Thread producerThread = new Thread(producerTask);
-
- final ConsumeTask consumeTask = new ConsumeTask();
- Thread consumeThread = new Thread(consumeTask);
-
- // 先启动消费
- consumeThread.start();
- // 再启动生产
- producerThread.start();
-
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- System.out.println("try close...");
- // 先关闭生产
- producerTask.setStopped();
- // 再关闭消费
- consumeTask.setStopped();
- try {
- System.out.println("close wait...");
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- }
- System.out.println("close finished...");
- }));
- }
- }
- 复制代码
执行结果如下所示,可以看到服务关闭的时候钩子程序执行成功,在等待消息处理完成后才退出。
潜在问题
在使用ShutdownHook的时候,我们往往控制不了钩子的执行顺序。Java.Runtime.addShutdownHook是对外公开的API接口。在前述场景里面,假若是独立注册钩子,在更复杂的项目里面是不是就没办法保证执行的顺序呢?曾在实际场景中遇到过这样的问题,从kafka队列消费消息,交给内部线程池去处理,我们自定义了线程池的拒绝策略为一直等待(为了保证消息确实处理),然后就会偶尔出现服务无法关闭的问题。原因正是线程池先被关闭,kafka队列却还在消费消息,导致消费线程一直在等待。
Signal
Java同时提供了signal信号机制,我们的服务也可以接收到关闭信号。
使用Signal机制有以下原因:
- ShutdownHook执行顺序无法保障,第三方组件也可能注册,导致业务自定义的退出流程依赖的资源会被提前关闭和清理;
- Signal是非公开API,第三方组件基本很少使用,我们可以在内部托管服务关闭的执行顺序;
- 在完成清理工作后可以执行exit调用,保证资源清理不会影响ShutdownHook的退出清理逻辑;
这里核心的原因还是希望能完全保证服务关闭的顺序,避免出现问题。我们在服务内部按顺序维护关闭任务,上述代码调整后如下所示:
- public class TermHelper {
-
- private static AtomicBoolean signalTriggered = new AtomicBoolean(false);
- private static AtomicBoolean stopping = new AtomicBoolean(false);
- private static AtomicBoolean registeredHolder = new AtomicBoolean(false);
-
- private static Deque<Runnable> terms = new ConcurrentLinkedDeque<>();
-
- private static void tryRegisterOnlyOnce() {
- boolean previousRegistered = registeredHolder.getAndSet(true);
- if (!previousRegistered) {
- registerTermSignal();
- }
- }
-
- private static void registerTermSignal() {
- Signal.handle(new Signal("TERM"), signal -> {
- boolean previous = signalTriggered.getAndSet(true);
- if (previous) {
- System.out.println("Term has been triggered.");
- return;
- }
- termAndExit();
- });
- }
-
- public static void addTerm(Runnable runnable) {
- tryRegisterOnlyOnce();
- terms.addLast(runnable);
- }
-
- public static void addFirstTerm(Runnable runnable) {
- tryRegisterOnlyOnce();
- terms.addFirst(runnable);
- }
-
- private static void termAndExit() {
- try {
- Thread current = Thread.currentThread();
- current.setName(current.getName() + "(退出线程)");
- System.out.println("do term cleanup....");
- doTerm();
- System.out.println("exit success.");
- System.exit(0);
- } catch (Throwable e) {
- e.printStackTrace();
- System.exit(1);
- }
- }
-
- public static void doTerm() {
- boolean previousStopping = stopping.getAndSet(true);
- if (previousStopping) {
- System.out.println("Term routine already running, wait until done!");
- return;
- }
- for (Runnable runnable : terms) {
- try {
- System.out.println("execute term runnable : " + runnable);
- runnable.run();
- } catch (Throwable e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- }
- 复制代码
TermHelper内部使用队列维护关闭任务,在服务关闭的时候串行执行相关任务,保证其顺序。我们也可以在此基础上维护关闭任务的优先级,实现按优先级高低依次执行关闭任务。
- public class ShutDownTest {
-
- private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50);
-
- private static AtomicLong taskId = new AtomicLong(0);
-
- // 生产任务
- private static class ProduceTask implements Runnable {
-
- private AtomicBoolean stopped = new AtomicBoolean(false);
-
- @Override
- public void run() {
- while (!stopped.get()) {
- long element = taskId.incrementAndGet();
- queue.add(element);
- System.out.println("add element : " + element);
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- }
- }
- }
-
- public void setStopped() {
- stopped.compareAndSet(false, true);
- System.out.println("stop producer.");
- }
- }
-
- // 消费任务
- private static class ConsumeTask implements Runnable {
-
- private AtomicBoolean stopped = new AtomicBoolean(false);
-
- @Override
- public void run() {
- while (!stopped.get() || queue.size() > 0) {
- try {
- long element = queue.take();
- System.out.println("consume element : " + element);
- doWork();
- } catch (InterruptedException e) {
- }
- }
- }
-
- private void doWork() {
- try {
- // 消费速度比生产速度稍慢,模拟积压情况
- Thread.sleep(60);
- } catch (InterruptedException e) {
- }
- }
-
- public void setStopped() {
- stopped.compareAndSet(false, true);
- System.out.println("stop consumer.");
- }
- }
-
- public static void main(String[] args) {
- final ProduceTask producerTask = new ProduceTask();
- final Thread producerThread = new Thread(producerTask);
-
- final ConsumeTask consumeTask = new ConsumeTask();
- Thread consumeThread = new Thread(consumeTask);
-
- // 先启动消费
- consumeThread.start();
- // 再启动生产
- producerThread.start();
-
- TermHelper.addFirstTerm(() -> {
- // 关闭生产
- producerTask.setStopped();
- });
-
- TermHelper.addTerm(() -> {
- // 再关闭消费
- consumeTask.setStopped();
- });
-
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- System.out.println("shut down hook...");
- }));
- }
- }
- 复制代码
执行结果如下所示。需要注意的是我们只注册了TERM信号,所以需要通过kill -TERM的方式关闭服务。从图中可以看到我们测试的生产者和消费者都正常退出了,内部的消息最后也处理完成。
小结
若需要平滑停止服务,我们一般可以通过ShutdownHook和Signal来实现。ShutdownHook一般比较难保证关闭任务的执行顺序,这个时候可以考虑使用Signal机制来完全托管我们关闭服务的执行顺序。