当前位置:   article > 正文

服务如何优雅关闭

如何优雅的kill 服务

背景

很多时候服务都有平滑退出的需求,例如RPC服务在停止之后需要从注册服务摘除节点、从消息队列已经消费的消息需要正常处理完成等。一般地我们希望能让服务在退出前能执行完当前正在执行的任务,这个时候就需要我们在JVM关闭的时候运行一些清理现场的代码。

方案

ShutdownHook

JDK提供了Java.Runtime.addShutdownHook(Thread hook)方法,允许用户注册一个JVM关闭的钩子。这个钩子可以在以下几种场景被调用:

  • 程序正常退出;
  • 使用System.exit();
  • 终端使用Ctrl+C触发的终端;
  • 系统关闭;
  • 使用kill pid命令干掉进程;

一般地发布系统会通过kill命令来停止服务。这个时候服务可以接收到关闭信号并执行钩子程序进行清理工作。

场景示例

假设以下场景,有个生产者往内部队列发消息,有个消费者读取队列消息并执行。当我们停止服务的时候,希望队列的消息都能正常处理完成,代码示例如下:

  1. /**
  2. * 服务关闭测试
  3. */
  4. public class ShutDownTest {
  5. private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50);
  6. private static AtomicLong taskId = new AtomicLong(0);
  7. // 生产任务
  8. private static class ProduceTask implements Runnable {
  9. private AtomicBoolean stopped = new AtomicBoolean(false);
  10. @Override
  11. public void run() {
  12. while (!stopped.get()) {
  13. long element = taskId.incrementAndGet();
  14. queue.add(element);
  15. System.out.println("add element : " + element);
  16. try {
  17. Thread.sleep(50);
  18. } catch (InterruptedException e) {
  19. }
  20. }
  21. }
  22. public void setStopped() {
  23. stopped.compareAndSet(false, true);
  24. System.out.println("stop producer.");
  25. }
  26. }
  27. // 消费任务
  28. private static class ConsumeTask implements Runnable {
  29. private AtomicBoolean stopped = new AtomicBoolean(false);
  30. @Override
  31. public void run() {
  32. while (!stopped.get() || queue.size() > 0) {
  33. try {
  34. long element = queue.take();
  35. System.out.println("consume element : " + element);
  36. doWork();
  37. } catch (InterruptedException e) {
  38. }
  39. }
  40. }
  41. private void doWork() {
  42. try {
  43. // 消费速度比生产速度稍慢,模拟积压情况
  44. Thread.sleep(60);
  45. } catch (InterruptedException e) {
  46. }
  47. }
  48. public void setStopped() {
  49. stopped.compareAndSet(false, true);
  50. System.out.println("stop consumer.");
  51. }
  52. }
  53. public static void main(String[] args) {
  54. final ProduceTask producerTask = new ProduceTask();
  55. final Thread producerThread = new Thread(producerTask);
  56. final ConsumeTask consumeTask = new ConsumeTask();
  57. Thread consumeThread = new Thread(consumeTask);
  58. // 先启动消费
  59. consumeThread.start();
  60. // 再启动生产
  61. producerThread.start();
  62. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  63. System.out.println("try close...");
  64. // 先关闭生产
  65. producerTask.setStopped();
  66. // 再关闭消费
  67. consumeTask.setStopped();
  68. try {
  69. System.out.println("close wait...");
  70. Thread.sleep(5000);
  71. } catch (InterruptedException e) {
  72. }
  73. System.out.println("close finished...");
  74. }));
  75. }
  76. }
  77. 复制代码

执行结果如下所示,可以看到服务关闭的时候钩子程序执行成功,在等待消息处理完成后才退出。

潜在问题

在使用ShutdownHook的时候,我们往往控制不了钩子的执行顺序。Java.Runtime.addShutdownHook是对外公开的API接口。在前述场景里面,假若是独立注册钩子,在更复杂的项目里面是不是就没办法保证执行的顺序呢?曾在实际场景中遇到过这样的问题,从kafka队列消费消息,交给内部线程池去处理,我们自定义了线程池的拒绝策略为一直等待(为了保证消息确实处理),然后就会偶尔出现服务无法关闭的问题。原因正是线程池先被关闭,kafka队列却还在消费消息,导致消费线程一直在等待。

Signal

Java同时提供了signal信号机制,我们的服务也可以接收到关闭信号。

使用Signal机制有以下原因:

  • ShutdownHook执行顺序无法保障,第三方组件也可能注册,导致业务自定义的退出流程依赖的资源会被提前关闭和清理;
  • Signal是非公开API,第三方组件基本很少使用,我们可以在内部托管服务关闭的执行顺序;
  • 在完成清理工作后可以执行exit调用,保证资源清理不会影响ShutdownHook的退出清理逻辑;

这里核心的原因还是希望能完全保证服务关闭的顺序,避免出现问题。我们在服务内部按顺序维护关闭任务,上述代码调整后如下所示:

  1. public class TermHelper {
  2. private static AtomicBoolean signalTriggered = new AtomicBoolean(false);
  3. private static AtomicBoolean stopping = new AtomicBoolean(false);
  4. private static AtomicBoolean registeredHolder = new AtomicBoolean(false);
  5. private static Deque<Runnable> terms = new ConcurrentLinkedDeque<>();
  6. private static void tryRegisterOnlyOnce() {
  7. boolean previousRegistered = registeredHolder.getAndSet(true);
  8. if (!previousRegistered) {
  9. registerTermSignal();
  10. }
  11. }
  12. private static void registerTermSignal() {
  13. Signal.handle(new Signal("TERM"), signal -> {
  14. boolean previous = signalTriggered.getAndSet(true);
  15. if (previous) {
  16. System.out.println("Term has been triggered.");
  17. return;
  18. }
  19. termAndExit();
  20. });
  21. }
  22. public static void addTerm(Runnable runnable) {
  23. tryRegisterOnlyOnce();
  24. terms.addLast(runnable);
  25. }
  26. public static void addFirstTerm(Runnable runnable) {
  27. tryRegisterOnlyOnce();
  28. terms.addFirst(runnable);
  29. }
  30. private static void termAndExit() {
  31. try {
  32. Thread current = Thread.currentThread();
  33. current.setName(current.getName() + "(退出线程)");
  34. System.out.println("do term cleanup....");
  35. doTerm();
  36. System.out.println("exit success.");
  37. System.exit(0);
  38. } catch (Throwable e) {
  39. e.printStackTrace();
  40. System.exit(1);
  41. }
  42. }
  43. public static void doTerm() {
  44. boolean previousStopping = stopping.getAndSet(true);
  45. if (previousStopping) {
  46. System.out.println("Term routine already running, wait until done!");
  47. return;
  48. }
  49. for (Runnable runnable : terms) {
  50. try {
  51. System.out.println("execute term runnable : " + runnable);
  52. runnable.run();
  53. } catch (Throwable e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. try {
  58. Thread.sleep(5000);
  59. } catch (InterruptedException e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. }
  64. 复制代码

TermHelper内部使用队列维护关闭任务,在服务关闭的时候串行执行相关任务,保证其顺序。我们也可以在此基础上维护关闭任务的优先级,实现按优先级高低依次执行关闭任务。

  1. public class ShutDownTest {
  2. private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50);
  3. private static AtomicLong taskId = new AtomicLong(0);
  4. // 生产任务
  5. private static class ProduceTask implements Runnable {
  6. private AtomicBoolean stopped = new AtomicBoolean(false);
  7. @Override
  8. public void run() {
  9. while (!stopped.get()) {
  10. long element = taskId.incrementAndGet();
  11. queue.add(element);
  12. System.out.println("add element : " + element);
  13. try {
  14. Thread.sleep(50);
  15. } catch (InterruptedException e) {
  16. }
  17. }
  18. }
  19. public void setStopped() {
  20. stopped.compareAndSet(false, true);
  21. System.out.println("stop producer.");
  22. }
  23. }
  24. // 消费任务
  25. private static class ConsumeTask implements Runnable {
  26. private AtomicBoolean stopped = new AtomicBoolean(false);
  27. @Override
  28. public void run() {
  29. while (!stopped.get() || queue.size() > 0) {
  30. try {
  31. long element = queue.take();
  32. System.out.println("consume element : " + element);
  33. doWork();
  34. } catch (InterruptedException e) {
  35. }
  36. }
  37. }
  38. private void doWork() {
  39. try {
  40. // 消费速度比生产速度稍慢,模拟积压情况
  41. Thread.sleep(60);
  42. } catch (InterruptedException e) {
  43. }
  44. }
  45. public void setStopped() {
  46. stopped.compareAndSet(false, true);
  47. System.out.println("stop consumer.");
  48. }
  49. }
  50. public static void main(String[] args) {
  51. final ProduceTask producerTask = new ProduceTask();
  52. final Thread producerThread = new Thread(producerTask);
  53. final ConsumeTask consumeTask = new ConsumeTask();
  54. Thread consumeThread = new Thread(consumeTask);
  55. // 先启动消费
  56. consumeThread.start();
  57. // 再启动生产
  58. producerThread.start();
  59. TermHelper.addFirstTerm(() -> {
  60. // 关闭生产
  61. producerTask.setStopped();
  62. });
  63. TermHelper.addTerm(() -> {
  64. // 再关闭消费
  65. consumeTask.setStopped();
  66. });
  67. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  68. System.out.println("shut down hook...");
  69. }));
  70. }
  71. }
  72. 复制代码

执行结果如下所示。需要注意的是我们只注册了TERM信号,所以需要通过kill -TERM的方式关闭服务。从图中可以看到我们测试的生产者和消费者都正常退出了,内部的消息最后也处理完成。

小结

若需要平滑停止服务,我们一般可以通过ShutdownHook和Signal来实现。ShutdownHook一般比较难保证关闭任务的执行顺序,这个时候可以考虑使用Signal机制来完全托管我们关闭服务的执行顺序。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/745536
推荐阅读
相关标签
  

闽ICP备14008679号