当前位置:   article > 正文

使用ScheduledExecutorService执行周期性任务需要注意的地方_scheduledexecutorservice shutdown

scheduledexecutorservice shutdown

使用spring框架开发程序时基本上很少有人会使用ScheduledExecutorService来执行周期性任务,但是ScheduledExecutorService在某些场景下可能会用到,使用JDK自带的这个周期性调度器时一定要确保任务内部不能抛出运行时异常,否则后续任务将不会执行,至于原因,接下来将从源码角度分析下:

使用Excutors工厂类创建的ScheduledExecutorService,其实现类为ScheduledThreadPoolExecutor,

从上面代码可以看到,ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以执行任务调度还是使用的ThreadPoolExecutor类的runWorker方法,该方法不断从任务队列中取任务并执行,ScheduledExecutorService scheduleAtFixedRate方法内部创建了经过包装的任务ScheduledFutureTask,任务调度方法:

  1. final void runWorker(Worker w) {
  2. /**
  3. * RUNNING: 接收新任务并且处理任务队列中任务
  4. * SHUTDOWN: 不接收新任务但处理任务队列中任务
  5. * STOP: 不接收新任务且不处理任务队列中任务,中断所有任务,ShutdownNow方法调用时更新池状态为STOP
  6. * TIDYING: 所有任务已经结束,workerCount=0,线程状态转换为TIDYING,接下来执行terminated()方法
  7. * TERMINATED: terminated()方法已结束
  8. */
  9. Thread wt = Thread.currentThread();
  10. Runnable task = w.firstTask;
  11. w.firstTask = null;
  12. w.unlock(); // allow interrupts
  13. boolean completedAbruptly = true;
  14. try {
  15. //getTask方法可能会阻塞或者等待超时等待任务
  16. while (task != null || (task = getTask()) != null) {
  17. //每个工作线程一把锁
  18. w.lock();
  19. // If pool is stopping, ensure thread is interrupted;
  20. // if not, ensure thread is not interrupted. This
  21. // requires a recheck in second case to deal with
  22. // shutdownNow race while clearing interrupt
  23. /**
  24. private static final int RUNNING = -1 << COUNT_BITS; //0xe0000000
  25. private static final int SHUTDOWN = 0 << COUNT_BITS;
  26. private static final int STOP = 1 << COUNT_BITS; //0x20000000
  27. private static final int TIDYING = 2 << COUNT_BITS; //0x40000000
  28. private static final int TERMINATED = 3 << COUNT_BITS; //0x60000000
  29. * 线程池状态切换
  30. *
  31. * RUNNING -> SHUTDOWN
  32. * shutdown方法
  33. * (RUNNING or SHUTDOWN) -> STOP
  34. * 调用shutdownNow
  35. * SHUTDOWN -> TIDYING
  36. * When both queue and pool are empty
  37. * STOP -> TIDYING
  38. * When pool is empty
  39. * TIDYING -> TERMINATED
  40. * When the terminated() hook method has completed
  41. *
  42. 1. 如果线程池已经处于STOP状态并且当前线程没有被中断,中断线程
  43. 2. 如果线程池还处于RUNNING或SHUTDOWN状态,并且当前线程已经被中断了,重新检查一下线程池状态,如果处于STOP状态并且没有被中断,那么中断线程
  44. */
  45. //STOP只在shutDownNow中出现
  46. if ((runStateAtLeast(ctl.get(), STOP) ||(
  47. Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
  48. && !wt.isInterrupted())
  49. wt.interrupt();
  50. try {
  51. beforeExecute(wt, task);
  52. Throwable thrown = null;
  53. try {
  54. //此处直接调用Runnable的run方法:在当前线程执行run方法
  55. task.run();
  56. } catch (RuntimeException x) {
  57. thrown = x; throw x;
  58. } catch (Error x) {
  59. thrown = x; throw x;
  60. } catch (Throwable x) {
  61. thrown = x; throw new Error(x);
  62. } finally {
  63. afterExecute(task, thrown);
  64. }
  65. } finally {
  66. task = null;
  67. w.completedTasks++;
  68. w.unlock();
  69. }
  70. }
  71. completedAbruptly = false;
  72. } finally {
  73. processWorkerExit(w, completedAbruptly);
  74. }
  75. }

getTask返回的就是ScheduledFutureTask,所以task.run()根据多态性执行的就是ScheduledFutureTask的run方法:

  1. public void run() {
  2. boolean periodic = isPeriodic();
  3. if (!canRunInCurrentRunState(periodic))
  4. cancel(false);
  5. else if (!periodic) //不是周期性任务
  6. ScheduledFutureTask.super.run(); //调用FutureTask类run方法
  7. else if (ScheduledFutureTask.super.runAndReset()) {//周期性任务,没有返回结果,调用FutureTask类runAndReset方法
  8. //设置下次执行的时间
  9. setNextRunTime();
  10. //重新将该任务放入任务队列
  11. reExecutePeriodic(outerTask);
  12. }
  13. }

ScheduledFutureTask将实际任务委托给父类完成,从上面代码可以看出,只有当ScheduledFutureTask.super.runAndReset()返回true时,才会设置任务下次执行时间并重新把该任务放入任务等待队列中,

周期性任务调用的是FutureTask.runAndReset()方法,下面就是揭开问题面纱的部分:

  1. protected boolean runAndReset() {
  2. if (state != NEW ||
  3. !UNSAFE.compareAndSwapObject(this, runnerOffset,
  4. null, Thread.currentThread()))
  5. return false;
  6. boolean ran = false;
  7. int s = state;
  8. try {
  9. Callable<V> c = callable;
  10. if (c != null && s == NEW) {
  11. try {
  12. c.call(); //如果此处抛出异常,那么ran将不会被设置为true,该方法返回false
  13. ran = true;
  14. } catch (Throwable ex) {
  15. setException(ex);
  16. }
  17. }
  18. } finally {
  19. // runner must be non-null until state is settled to
  20. // prevent concurrent calls to run()
  21. runner = null;
  22. // state must be re-read after nulling runner to prevent
  23. // leaked interrupts
  24. s = state;
  25. if (s >= INTERRUPTING)
  26. handlePossibleCancellationInterrupt(s);
  27. }
  28. return ran && s == NEW;
  29. }

 

收工~~~~~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/83701
推荐阅读
相关标签
  

闽ICP备14008679号