赞
踩
补充了和Thread的interrupt操作相关的知识,回头再来看ThreadPoolExecutor中interrupt,关闭线程池等相关操作。
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * * <p>This method does not wait for previously submitted tasks to * complete execution. Use {@link #awaitTermination awaitTermination} * to do that. * * @throws SecurityException {@inheritDoc} */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
开始一个顺序的shutdown操作,shutdown之前被执行的已提交任务,新的任务不会再被接收了。如果线程池已经被shutdown了,该方法的调用没有其他任何效果了。
该方法不会等待之前已经提交的任务执行完毕,awaitTermination方法才有这个效果。
具体看内部逻辑,checkShutdownAccess这个方法是确保允许调用发interrupt每个Worker线程的,具体就不看了。
/**
* Transitions runState to given target, or leaves it alone if
* already at least the given target.
*
* @param targetState the desired state, either SHUTDOWN or STOP
* (but not TIDYING or TERMINATED -- use tryTerminate for that)
*/
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
设置的逻辑是,如果当前的线程池状态已经是要设置的状态,或者已经超过了要设置状态(runStateAtLeast方法返回值是true),就保持不做任何操作了,直接break。
如果线程池当前状态比要设置的状态小,比如当前是RUNNING,要设置是的SHUTDOWN,那么runStateAtLeast方法返回false,继续走第二个判断,原子设置rs,如果失败的话继续这个流程。
/**
* Common form of interruptIdleWorkers, to avoid having to
* remember what the boolean argument means.
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
这里简单说一下传false的效果,就是检查当前所有worker线程,在获取Worker锁的情况下,把所有没有interrupt的线程都执行interrupt操作。
/** * Interrupts threads that might be waiting for tasks (as * indicated by not being locked) so they can check for * termination or configuration changes. Ignores * SecurityExceptions (in which case some threads may remain * uninterrupted). * * @param onlyOne If true, interrupt at most one worker. This is * called only from tryTerminate when termination is otherwise * enabled but there are still other workers. In this case, at * most one waiting worker is interrupted to propagate shutdown * signals in case all threads are currently waiting. * Interrupting any arbitrary thread ensures that newly arriving * workers since shutdown began will also eventually exit. * To guarantee eventual termination, it suffices to always * interrupt only one idle worker, but shutdown() interrupts all * idle workers so that redundant workers exit promptly, not * waiting for a straggler task to finish. */ private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
中断那些可能在等待执行任务的线程(没有被锁住的——idle语义,tryLock成功,如果Worker线程在执行任务,runWorker方法中的执行任务的Worker是占有的锁的,所以这里是无法获取锁的,也就是非idle的了),让他们能检查是否可以terminate。这里直接吞了SecurityException异常,防止某些线程在interrupt之后仍然处于uninterrupted状态。
onlyOne参数,如果是true,最多只中断一个Worker。这种情况只有在tryTerminate调用的时候才会出现,表示可以termination,但是还有其他的Worker存在。在这种情况下,最多只有一个处于等待的Worker被中断,来保证shutdown信号的繁衍传递(propagate语义),以便能处理所有信号都处于等待状态的情况,这个情景是什么,代码块在哪儿?。
中断任意一个随机的线程都能保证从shutdown操作开始之后新添加的Worker最终都能退出(哪个代码块有这个功能?)。
为了保证最终的termination,永远只interrupt一个线程就足够了(为什么足够),但是shutdown操作总是所有idle的workers,这样冗余的workers可以立即退出,而不是等待一个straggler任务来完成操作。
只看这一个方法和这一段注释可能会有点云里雾里,还需要结合其他方法一起看。
/** * Interrupts all threads, even if active. Ignores SecurityExceptions * (in which case some threads may remain uninterrupted). */ private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { try { w.thread.interrupt(); } catch (SecurityException ignore) { } } } finally { mainLock.unlock(); } }
和interruptIdleWorkers和区别从代码上看就是后者在进行中断之前进行了一个而判断:
if (!t.isInterrupted() && w.tryLock())
纵观整个ThreadPoolExecutor类代码,只有runWorker方法中会尝试持有Worker锁(调用Worker的lock方法)。而Worker之所以继承AbstractQueuedSynchronizer类的语义也是为了保护一个正在等待执行任务的Worker线程不被中断操作影响。interruptIdleWorkers方法会因为这层保护而放弃对某个Worker线程的中断(tryLock为false)。
但是对于interruptWorkers方法,没有这个判断,是无差别的中断操作(除非中断是抛出了SecurityException异常进入catch块并被吞掉),在shutdownNow方法中调用。
/** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. These tasks are drained (removed) * from the task queue upon return from this method. * * <p>This method does not wait for actively executing tasks to * terminate. Use {@link #awaitTermination awaitTermination} to * do that. * * <p>There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * * @throws SecurityException {@inheritDoc} */ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
尝试stop所有actively executing线程,halt所有正处于等待状态的任务,并返回一个等待执行的task列表。返回列表之后,这些任务已经从task队列中移除了,通过drained (removed)操作。
该方法不会等待actively executing tasks终止,而是立即结束。如果想等待指定时间,可以调用awaitTermination方法。
该方法和shutdownNow方法的区别有三个:
第一个是把状态设置为STOP而不是SHUTDOWN。
第二个是调用interruptWorkers方法而不是interruptIdleWorkers,这两个方法的区别上面已经说过了。
第三个是drainQueue把所有任务从队列中移除。
/** * Transitions to TERMINATED state if either (SHUTDOWN and pool * and queue empty) or (STOP and pool empty). If otherwise * eligible to terminate but workerCount is nonzero, interrupts an * idle worker to ensure that shutdown signals propagate. This * method must be called following any action that might make * termination possible -- reducing worker count or removing tasks * from the queue during shutdown. The method is non-private to * allow access from ScheduledThreadPoolExecutor. */ final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
在线程状态是SHUTDOWN而且线程池和任务队列都是空的,或者线程池处于STOP状态,并且线程池是空的,把线程池的状态改为TERMINATED。
如果线程池状态是可以被terminate,但是wc不是0,那么用interruptIdleWorkers(true)来中断一个idle worker来确保shutdown操作的繁衍(propagate语义)。
该方法一定要跟在任何使termination可行的操作之后——减少wc的值或者在shutdown过程中从任务队列中移除任务。目前已知调用:
1.addWorker中
if (t == null ||
(rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null))) {
decrementWorkerCount();
tryTerminate();
return false;
}
2.shutdown操作
3.shutdownNow操作
4.remove操作
remove操作的执行在execute方法中double check的时候,
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
如果在成功添加task之后线程池shutdown了,需要回滚入队列操作——remove。
5.purge操作
6.processWorkerExit操作
该操作在runWorkerd的finally块中执行。
先看第一个if判断:
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
如果线程池状态是SHUTDOWN,而且任务队列不是空的,也直接返回。
如果线程池状态是SHUTDOWN,而且任务队列是空的,向下进行。
如果线程池状态是STOP,向下进行。
再看第二个if判断:
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
所以能走到下面流程的条件是:
1.线程池状态是STOP且wc是0
2.线程池状态是SHUTDOWN而且wc(pool)和任务队列(queue)都是空的
只有这两个情况,线程池的状态会被原子操作ctl.compareAndSet(c, ctlOf(TIDYING, 0)将状态设置为TIDYING,并在成功之后(因为tryTerminate方法会在多出调用,存在竞争)进一步在terminated结束之后的finally块中通过ctl.set(ctlOf(TERMINATED, 0))设置为TERMINATED。
最后执行termination.signalAll(),会唤醒awaitTermination方法中由于执行termination.awaitNanos(nanos)操作进入等待状态的线程。
/** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * * @param w the worker * @param completedAbruptly if the worker died due to user exception */ private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
shutdown会把状态改为SHUTDOWN,advanceRunState(SHUTDOWN)
shutdownNow会把状态改为STOP,advanceRunState(STOP)。
tryTerminate中会在两重if判断都过了之后,原子操作ctl.compareAndSet(c, ctlOf(TIDYING, 0)将状态设置为TIDYING,并在成功之后进一步在terminated结束之后的finally块中通过ctl.set(ctlOf(TERMINATED, 0))设置为TERMINATED。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。