当前位置:   article > 正文

ThreadPoolExecutor源码解读(三)——如何优雅的关闭线程池(shutdown、shutdownNow、awaitTermination)_threadpoolexecutor.awaittermination

threadpoolexecutor.awaittermination

更多JUC源码解读系列文章请持续关注JUC源码解读文章目录JDK8


一、前言

学会了如何提交任务,还需要知道如何正确的关闭线程池。当关闭一个线程池时,有的工作线程还正在执行任务,有的调用者正在向线程池提交任务,并且工作队列中可能还有未执行的任务。因此,关闭过程不可能是瞬时的,而是一个平滑过渡的过程。

二、shutdown()将runState流转为SHUTDOWN

shutdown()主要做了4步:

  1. 检查是否有中断线程池的权限。
  2. 自旋CAS设置runStatusSHUTDOWN
  3. 中断空闲线程。
  4. 尝试终止线程池。
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //1.检查是否有Shutdown权限
        checkShutdownAccess();
        //2.cas设置runStatus为SHUTDOWN
        advanceRunState(SHUTDOWN);
        //3.中断空闲线程
        interruptIdleWorkers();
        //空的钩子函数
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    //4.尝试终止线程池
    tryTerminate();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

检查权限和设置状态都很简单,shutdown是如何只中断空闲线程?为什么要调用tryTerminate()

1、shutdown是如何只中断空闲线程的?

从源码中可以看出在中断线程前会尝试获取Worker的锁,如果获得锁,说明当前的Worker是空闲的,可以中断,这也验证了Worker执行任务代码时加锁,确保除了线程池销毁导致中断外,没有其他中断的设置。

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            //循环中断空闲worder
            Thread t = w.thread;
            //w.tryLock()尝试获取worker的锁,如果获得锁说明当前工作线程是空闲的
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

2、为什么要调用tryTerminate()?

tryTerminate()不会强行终止线程池,当workerCount为0,workerQueue为空时:

  1. 状态流转到TIDYING
  2. 然后调用钩子函数terminated()
  3. 状态从TIDYING流转到TERMINATED
  4. 调用termination.sinaglAll(),通知前面阻塞在awaitTermination的所有调用者线程。
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        //1.当workQueue为空,workerCount为空时,cas流转状态为TIDYING
        //2.并调用了一个空钩子函数terminated
        //3.最终将状态流转为TERMINATED,并通知awaitTermination
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS  自旋
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

3、termination.signalAll()唤醒的是什么?

awaitTermination()逻辑很简单,就是重复判断runState是否到达最终状态TERMINATED,如果是直接返回true,如果不是,调用termination.awaitNanos(nanos)阻塞一段时间,苏醒后再判断一次,如果runStateTERMINATED返回true,否则返回false。

/**
 * Wait condition to support awaitTermination
 */
private final Condition termination = mainLock.newCondition();

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            判断当前的runstate是否大于等于TERMINATED
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            //如果不是TERMINATED,将等待nanos
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

三、shutdownNow()将runState流转为STOP

shutdownNow()shutdown()相似,主要做了5步:

  1. 检查是否有中断线程池权限。
  2. 设置runStatusSTOP
  3. 中断所有工作线程。
  4. 清空工作队列。
  5. 尝试终止线程池。
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //1.检查是否有shutdown权限
        checkShutdownAccess();
        //2.设置runStatus为stop
        advanceRunState(STOP);
        //3.给worders发送终止信号
        interruptWorkers();
        //4.清空阻塞队列
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    //5.尝试终止线程池
    tryTerminate();
    return tasks;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

四、shutdown()和shutdownNow()的区别

shutdownNow()shutdown()不同之处在于:

  • shutdownNow()会终止所有工作线程,不管是空闲还是正在运行。
  • shutdownNow()会清空阻塞队列。
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    //删除所有元素并加入到taskList
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        //如果q此时又被加入了任务再将任务删除并加入taskList
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

五、关闭线程池的正确姿势

分析源码之后可知只调用shutdown()或者shutdownNow()是不够的,因为线程池并不一定立刻终止,还需要调用awaitTermination,循环检查runState是否到了最终状态TERMINATED

故正确关闭线程池的姿势如下:

threadPoolExecutor.shutdown();
//threadPoolExecutor.shutdownNow();
try {
    boolean loop = true;
    do {
        loop = !threadPoolExecutor.awaitTermination(2, TimeUnit.SECONDS);
    } while (loop);
} catch (InterruptedException e) {
    e.printStackTrace();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

六、总结

  • shutdown()会中断空闲工作线程,不会中断正在执行任务的工作线程,也不会清空工作队列,会等待所有已提交的任务执行完,但是拒绝新提交的任务。
  • shutdownNow(),会中断所有工作线程,并清空工作队列,拒绝新提交的任务。
  • 关闭线程池,只调用shutdown()或者shutdownNow()是不够的,因为线程池并不一定立刻终止,还需要调用awaitTermination,循环检查runState是否到了最终状态TERMINATED

PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/812576
推荐阅读
相关标签
  

闽ICP备14008679号