当前位置:   article > 正文

【Java】线程池(五)shutdown shutdownnow_scheduledexecutorservice的shutdown会阻塞吗

scheduledexecutorservice的shutdown会阻塞吗

以ExecutorService的实现类ThreadPoolExecutor为例,看下关于线程池管理方法的实原理:

注意shutdown shutdownnow方法均不是阻塞的,仅仅完成状态的设置,不会等待任务执行完毕。

1.shutdown:调用该方法后会拒绝接收新任务。

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. checkShutdownAccess();
  6. advanceRunState(SHUTDOWN);
  7. interruptIdleWorkers();
  8. onShutdown(); // hook for ScheduledThreadPoolExecutor
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. tryTerminate();
  13. }

调用advanceRunState方法先设置线程池的状态为SHUTDOWN,实现方式为cas自旋,该状态的线程池将不会接收新的任务。为什么不会接收?

Worker是一个Ruannble接口的实现类,其run方法中,有一处调用是不断从任务队列中取任务,每一次poll,都会先检测一遍当前线程池的状态:

  1. int rs = runStateOf(c);
  2. // Check if queue empty only if necessary.
  3. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  4. decrementWorkerCount();
  5. return null;
  6. }

如果是SHUTDOWN并且任务队列为空,则直接返回null,表示不再有新的任务了,worker线程会退出。

设置完状态后,接着调用interruptIdleWorkers方法,中断那些空闲worker线程:

  1. private void interruptIdleWorkers(boolean onlyOne) {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers) {
  6. Thread t = w.thread;
  7. if (!t.isInterrupted() && w.tryLock()) {
  8. try {
  9. t.interrupt();
  10. } catch (SecurityException ignore) {
  11. } finally {
  12. w.unlock();
  13. }
  14. }
  15. if (onlyOne)
  16. break;
  17. }
  18. } finally {
  19. mainLock.unlock();
  20. }
  21. }

其实就是遍历当前工作线程,并依次调用其interrpt方法。为什么需要这步?

因为Worker的run方法里,会不断从任务队列里poll任务,这是一个阻塞操作,如果不打断阻塞poll,则有的空闲的worker会一直等在那里,导致回收不及时。一旦打断阻塞,会结束当前循环,下一次再开始循环时,就可以检测到线程池状态已经变为了SHUTDOWN,所以就可以直接返回了。下面是getTask源码:

  1. private Runnable getTask() {
  2. boolean timedOut = false; // Did the last poll() time out?
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // Check if queue empty only if necessary.
  7. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  8. decrementWorkerCount();
  9. return null;
  10. }
  11. int wc = workerCountOf(c);
  12. // Are workers subject to culling?
  13. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  14. if ((wc > maximumPoolSize || (timed && timedOut))
  15. && (wc > 1 || workQueue.isEmpty())) {
  16. if (compareAndDecrementWorkerCount(c))
  17. return null;
  18. continue;
  19. }
  20. try {
  21. Runnable r = timed ?
  22. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  23. workQueue.take();
  24. if (r != null)
  25. return r;
  26. timedOut = true;
  27. } catch (InterruptedException retry) {
  28. timedOut = false;
  29. }
  30. }
  31. }

最后一步是调用tryTerminate方法,尝试将线程池状态转换为TERMINATED,该状态才是线程池真正终止的标志:

  1. final void tryTerminate() {
  2. for (;;) {
  3. int c = ctl.get();
  4. if (isRunning(c) ||
  5. runStateAtLeast(c, TIDYING) ||
  6. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
  7. return;
  8. if (workerCountOf(c) != 0) { // Eligible to terminate
  9. interruptIdleWorkers(ONLY_ONE);
  10. return;
  11. }
  12. final ReentrantLock mainLock = this.mainLock;
  13. mainLock.lock();
  14. try {
  15. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  16. try {
  17. terminated();
  18. } finally {
  19. ctl.set(ctlOf(TERMINATED, 0));
  20. termination.signalAll();
  21. }
  22. return;
  23. }
  24. } finally {
  25. mainLock.unlock();
  26. }
  27. // else retry on failed CAS
  28. }
  29. }

该方法也是在一个for循环内部,通过不断尝试来terminate线程池。该方法会在多处被调用,这里看shutdown的情况。

如果当前状态是SHUTDOWN并且workQueue不为空,那么第一层if就直接return了,意味着当前时刻还无法terminate,因为workQueue里还有未执行完成的任务。否则如果当前状态是SHUTDOWN并且workQueue为空了,那么可以通过第一层if判断,这意味着线程池的等待队列里已经没有需要被处理的任务了,只需要等待正在被处理的任务结束即可。所以第二个if就是在判断正在执行任务的worker数目是不是0,如果不是0,就尝试执行interruptIdleWorkers方法,因为一旦执行完了,就可以通过该方法来停止工作线程的运行,完成shutdown。一旦worker数目为0了,就会进入到真正的TERMINATED状态的设置了,在terminate方法调用前,先设置为TIDYING状态,terminate方法调用后,再设置为TERMINATED状态。

为什么要有TIDYING状态而不是直接TERMINATED状态?

因为只要进入到TIDYING状态,说明工作线程数为0并且workQueue为0(如果状态是SHUTDOWN,那么有一个判断可以保证workQueue为0,如果状态是STOP,那么会在shutdownnow方法内清空workQueue),这可以定义为一个标志性的状态,即线程池为空,所以TIDYING状态就是为了表征这个case的。

与TERMINATED的区别是,TERMINATED状态在TIDYING基础上增加了terminate方法的调用。该状态是线程池关闭的真正状态。

总结shutdown方法做了什么事情:

1.设置线程池状态为SHUTDOWN,这将导致线程池不会接收新任务;

2.中断idle的worker线程,回收;

3.尝试terminate;

 

2.shutdownnow:调用该方法后会停止接收新的任务,并且停止当前正在执行的任务,清空workQueue。

  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. checkShutdownAccess();
  7. advanceRunState(STOP);
  8. interruptWorkers();
  9. tasks = drainQueue();
  10. } finally {
  11. mainLock.unlock();
  12. }
  13. tryTerminate();
  14. return tasks;
  15. }

这里会先调用advanceRunState方法设置线程池状态为STOP。然后调用interruptWorkers方法中断所有worker线程,实现为依次调用interrupt方法打断worker线程:

  1. private void interruptWorkers() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers)
  6. w.interruptIfStarted();
  7. } finally {
  8. mainLock.unlock();
  9. }
  10. }

然后再调用drainQueue方法,将workQueue里等待执行的任务清空并导入到结果集tasks队列里:

  1. private List<Runnable> drainQueue() {
  2. BlockingQueue<Runnable> q = workQueue;
  3. ArrayList<Runnable> taskList = new ArrayList<Runnable>();
  4. q.drainTo(taskList);
  5. if (!q.isEmpty()) {
  6. for (Runnable r : q.toArray(new Runnable[0])) {
  7. if (q.remove(r))
  8. taskList.add(r);
  9. }
  10. }
  11. return taskList;
  12. }

最后调用tryTerminate方法,尝试将线程池状态转换为TERMINATED。

对应到shutdownnow的情况,线程池状态会是STOP,所以第一个if可以通过,第二个if如果所有任务都有对中断信号的响应处理,那么理论上来说是可以退出的,workQueue数目应该是0了,如果不是,那么也会return,认为无法中断线程池。最后设置TERMINATED状态和之前的逻辑一致。

最后返回workQueue里的任务;

总结shutdownnow做了哪些事情:

1.设置线程池状态为STOP,拒绝接收新任务;

2.打断所有worker线程,回收;

3.清空并返回workQueue里的任务;

4.尝试terminate;

 

3.waitTermination

最后看一个阻塞方法,当调用shutdown方法后,可以再调用该waitTermination方法来等待线程池的退出:

  1. public boolean awaitTermination(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. long nanos = unit.toNanos(timeout);
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. try {
  7. for (;;) {
  8. if (runStateAtLeast(ctl.get(), TERMINATED))
  9. return true;
  10. if (nanos <= 0)
  11. return false;
  12. nanos = termination.awaitNanos(nanos);
  13. }
  14. } finally {
  15. mainLock.unlock();
  16. }
  17. }

循环检测线程池状态是否是TERMINATED。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/83724
推荐阅读
相关标签
  

闽ICP备14008679号