赞
踩
以ExecutorService的实现类ThreadPoolExecutor为例,看下关于线程池管理方法的实原理:
注意shutdown shutdownnow方法均不是阻塞的,仅仅完成状态的设置,不会等待任务执行完毕。
1.shutdown:调用该方法后会拒绝接收新任务。
- public void shutdown() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- checkShutdownAccess();
- advanceRunState(SHUTDOWN);
- interruptIdleWorkers();
- onShutdown(); // hook for ScheduledThreadPoolExecutor
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- }
调用advanceRunState方法先设置线程池的状态为SHUTDOWN,实现方式为cas自旋,该状态的线程池将不会接收新的任务。为什么不会接收?
Worker是一个Ruannble接口的实现类,其run方法中,有一处调用是不断从任务队列中取任务,每一次poll,都会先检测一遍当前线程池的状态:
- int rs = runStateOf(c);
-
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
如果是SHUTDOWN并且任务队列为空,则直接返回null,表示不再有新的任务了,worker线程会退出。
设置完状态后,接着调用interruptIdleWorkers方法,中断那些空闲worker线程:
- private void interruptIdleWorkers(boolean onlyOne) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers) {
- Thread t = w.thread;
- if (!t.isInterrupted() && w.tryLock()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- } finally {
- w.unlock();
- }
- }
- if (onlyOne)
- break;
- }
- } finally {
- mainLock.unlock();
- }
- }
其实就是遍历当前工作线程,并依次调用其interrpt方法。为什么需要这步?
因为Worker的run方法里,会不断从任务队列里poll任务,这是一个阻塞操作,如果不打断阻塞poll,则有的空闲的worker会一直等在那里,导致回收不及时。一旦打断阻塞,会结束当前循环,下一次再开始循环时,就可以检测到线程池状态已经变为了SHUTDOWN,所以就可以直接返回了。下面是getTask源码:
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
-
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
-
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
-
- int wc = workerCountOf(c);
-
- // Are workers subject to culling?
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
-
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
-
- try {
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
最后一步是调用tryTerminate方法,尝试将线程池状态转换为TERMINATED,该状态才是线程池真正终止的标志:
- final void tryTerminate() {
- for (;;) {
- int c = ctl.get();
- if (isRunning(c) ||
- runStateAtLeast(c, TIDYING) ||
- (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
- return;
- if (workerCountOf(c) != 0) { // Eligible to terminate
- interruptIdleWorkers(ONLY_ONE);
- return;
- }
-
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
- try {
- terminated();
- } finally {
- ctl.set(ctlOf(TERMINATED, 0));
- termination.signalAll();
- }
- return;
- }
- } finally {
- mainLock.unlock();
- }
- // else retry on failed CAS
- }
- }
该方法也是在一个for循环内部,通过不断尝试来terminate线程池。该方法会在多处被调用,这里看shutdown的情况。
如果当前状态是SHUTDOWN并且workQueue不为空,那么第一层if就直接return了,意味着当前时刻还无法terminate,因为workQueue里还有未执行完成的任务。否则如果当前状态是SHUTDOWN并且workQueue为空了,那么可以通过第一层if判断,这意味着线程池的等待队列里已经没有需要被处理的任务了,只需要等待正在被处理的任务结束即可。所以第二个if就是在判断正在执行任务的worker数目是不是0,如果不是0,就尝试执行interruptIdleWorkers方法,因为一旦执行完了,就可以通过该方法来停止工作线程的运行,完成shutdown。一旦worker数目为0了,就会进入到真正的TERMINATED状态的设置了,在terminate方法调用前,先设置为TIDYING状态,terminate方法调用后,再设置为TERMINATED状态。
为什么要有TIDYING状态而不是直接TERMINATED状态?
因为只要进入到TIDYING状态,说明工作线程数为0并且workQueue为0(如果状态是SHUTDOWN,那么有一个判断可以保证workQueue为0,如果状态是STOP,那么会在shutdownnow方法内清空workQueue),这可以定义为一个标志性的状态,即线程池为空,所以TIDYING状态就是为了表征这个case的。
与TERMINATED的区别是,TERMINATED状态在TIDYING基础上增加了terminate方法的调用。该状态是线程池关闭的真正状态。
总结shutdown方法做了什么事情:
1.设置线程池状态为SHUTDOWN,这将导致线程池不会接收新任务;
2.中断idle的worker线程,回收;
3.尝试terminate;
2.shutdownnow:调用该方法后会停止接收新的任务,并且停止当前正在执行的任务,清空workQueue。
- public List<Runnable> shutdownNow() {
- List<Runnable> tasks;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- checkShutdownAccess();
- advanceRunState(STOP);
- interruptWorkers();
- tasks = drainQueue();
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- return tasks;
- }
这里会先调用advanceRunState方法设置线程池状态为STOP。然后调用interruptWorkers方法中断所有worker线程,实现为依次调用interrupt方法打断worker线程:
- private void interruptWorkers() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers)
- w.interruptIfStarted();
- } finally {
- mainLock.unlock();
- }
- }
然后再调用drainQueue方法,将workQueue里等待执行的任务清空并导入到结果集tasks队列里:
- private List<Runnable> drainQueue() {
- BlockingQueue<Runnable> q = workQueue;
- ArrayList<Runnable> taskList = new ArrayList<Runnable>();
- q.drainTo(taskList);
- if (!q.isEmpty()) {
- for (Runnable r : q.toArray(new Runnable[0])) {
- if (q.remove(r))
- taskList.add(r);
- }
- }
- return taskList;
- }
最后调用tryTerminate方法,尝试将线程池状态转换为TERMINATED。
对应到shutdownnow的情况,线程池状态会是STOP,所以第一个if可以通过,第二个if如果所有任务都有对中断信号的响应处理,那么理论上来说是可以退出的,workQueue数目应该是0了,如果不是,那么也会return,认为无法中断线程池。最后设置TERMINATED状态和之前的逻辑一致。
最后返回workQueue里的任务;
总结shutdownnow做了哪些事情:
1.设置线程池状态为STOP,拒绝接收新任务;
2.打断所有worker线程,回收;
3.清空并返回workQueue里的任务;
4.尝试terminate;
3.waitTermination
最后看一个阻塞方法,当调用shutdown方法后,可以再调用该waitTermination方法来等待线程池的退出:
- public boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException {
- long nanos = unit.toNanos(timeout);
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (;;) {
- if (runStateAtLeast(ctl.get(), TERMINATED))
- return true;
- if (nanos <= 0)
- return false;
- nanos = termination.awaitNanos(nanos);
- }
- } finally {
- mainLock.unlock();
- }
- }
循环检测线程池状态是否是TERMINATED。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。