赞
踩
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * 先前提交的任务将会被工作线程执行,新的线程将会被拒绝。这个方法 不会等待提交的任务执行完,我们可以用awaitTermination来等待任务执行完。 * <p>This method does not wait for previously submitted tasks to * complete execution. Use {@link #awaitTermination awaitTermination} * to do that. * * @throws SecurityException {@inheritDoc} */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //检查线程访问权限 checkShutdownAccess(); //更新线程池状态为SHUTDOWN advanceRunState(SHUTDOWN); //中断空闲工作线程 interruptIdleWorkers(); //线程池关闭hook onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //尝试结束线程池,这个前面以说,这里不再说 tryTerminate(); }
//检查线程访问权限checkShutdownAccess();
//更新线程池状态为SHUTDOWNadvanceRunState(SHUTDOWN);
//中断空闲工作线程interruptIdleWorkers();
//线程池关闭hookonShutdown(); // hook for ScheduledThreadPoolExecutor
checkShutdownAccess();
/** * If there is a security manager, makes sure caller has * permission to shut down threads in general (see shutdownPerm). * If this passes, additionally makes sure the caller is allowed * to interrupt each worker thread. This might not be true even if * first check passed, if the SecurityManager treats some threads * specially. */ private void checkShutdownAccess() { SecurityManager security = System.getSecurityManager(); if (security != null) { security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) //遍历工作线程集,检查任务线程访问权限 security.checkAccess(w.thread); } finally { mainLock.unlock(); } } }
advanceRunState(SHUTDOWN);
/** * Transitions runState to given target, or leaves it alone if * already at least the given target. * * @param targetState the desired state, either SHUTDOWN or STOP * (but not TIDYING or TERMINATED -- use tryTerminate for that) */ private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
interruptIdleWorkers();
/** * Common form of interruptIdleWorkers, to avoid having to * remember what the boolean argument means. */ private void interruptIdleWorkers() { //遍历工作线程集合,中断空闲工作线程,前面已将,这里不再说 interruptIdleWorkers(false); }
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { //遍历工作线程集 Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) {//锁打开说明,工作线程空闲 try { //如果工作线程非中断,且空闲,尝试获取锁,获取锁成功,则中断工作线程 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) //如果是只中断一个空闲线程,则结束本次中断空闲线程任务 break; } } finally { mainLock.unlock(); } }
private static final boolean ONLY_ONE = true;
onShutdown(); // hook for ScheduledThreadPoolExecutor /** * Performs any further cleanup following run state transition on * invocation of shutdown. A no-op here, but used by * ScheduledThreadPoolExecutor to cancel delayed tasks. */ void onShutdown() { //待子类扩展 }
final void tryTerminate() { //自旋尝试关闭线程池 for (;;) { int c = ctl.get(); //如果线程池正在运行,或正在关闭且任务队列不为空,则返回 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate //如果工作线程不为空,则中断空闲工作线程 interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //线程池已关闭,任务队列为空,工作线程为0,更新线程池状态为TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //执行结束工作 terminated(); } finally { //线程池已结束 ctl.set(ctlOf(TERMINATED, 0)); //唤醒等待线程池结束的线程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
terminated();
/** * Method invoked when the Executor has terminated. Default * implementation does nothing. Note: To properly nest multiple * overridings, subclasses should generally invoke * {@code super.terminated} within this method. */ protected void terminated() { //待子类扩展}
/** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. These tasks are drained (removed) * from the task queue upon return from this method. * 尝试停止正在执行的任务,停止等待任务线程的处理,任务队列将会被排空, 并返回任务队列中的任务集。 这个方法不会等待已执行的任务结束,我们用awaitTermination来等待任务执行完 * <p>This method does not wait for actively executing tasks to * terminate. Use {@link #awaitTermination awaitTermination} to * do that. * * <p>There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * * @throws SecurityException {@inheritDoc} */ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //检查工作线程权限 checkShutdownAccess(); //更新线程池状态为STOP advanceRunState(STOP); //中断空闲工作线程 interruptWorkers(); //清空任务队列,并放到tasks集合中 tasks = drainQueue(); } finally { mainLock.unlock(); } //尝试结束线程池 tryTerminate(); return tasks; }
tasks = drainQueue();
/** * Drains the task queue into a new list, normally using * drainTo. But if the queue is a DelayQueue or any other kind of * queue for which poll or drainTo may fail to remove some * elements, it deletes them one by one. */ private List<Runnable> drainQueue() { //这个方法很简单,不再说了 BlockingQueue<Runnable> q = workQueue; List<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //自旋等待线程线程结束条件 for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
/** * Invokes {@code shutdown} when this executor is no longer * referenced and it has no threads. */ //线程池不在被应用时,关闭线程池 protected void finalize() { shutdown(); }//创建一个核心空闲工作线程等待任务 /** * Starts a core thread, causing it to idly wait for work. This * overrides the default policy of starting core threads only when * new tasks are executed. This method will return {@code false} * if all core threads have already been started. * * @return {@code true} if a thread was started */ public boolean prestartCoreThread() { //addWorker,core参数为true使用核心线程池数量,否则最大线程池数量 return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); }//确保至少有一个空闲工作线程等待任务/** * Same as prestartCoreThread except arranges that at least one * thread is started even if corePoolSize is 0. */ void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }//创建核心线程池数量的空闲工作线程等待任务 /** * Starts all core threads, causing them to idly wait for work. This * overrides the default policy of starting core threads only when * new tasks are executed. * * @return the number of threads started */ public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; }//尝试移除任务取消的工作线程 /** * Tries to remove from the work queue all {@link Future} * tasks that have been cancelled. This method can be useful as a * storage reclamation operation, that has no other impact on * functionality. Cancelled tasks are never executed, but may * accumulate in work queues until worker threads can actively * remove them. Invoking this method instead tries to remove them now. * However, this method may fail to remove tasks in * the presence of interference by other threads. */ public void purge() { final BlockingQueue<Runnable> q = workQueue; try { Iterator<Runnable> it = q.iterator(); while (it.hasNext()) { Runnable r = it.next(); if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) it.remove(); } } catch (ConcurrentModificationException fallThrough) { // Take slow path if we encounter interference during traversal. // Make copy for traversal and call remove for cancelled entries. // The slow path is more likely to be O(N*N). for (Object r : q.toArray()) if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) q.remove(r); } tryTerminate(); // In case SHUTDOWN and now empty } //是否关闭,大于运行状态,即为关闭 public boolean isShutdown() { return ! isRunning(ctl.get()); } private static boolean isRunning(int c) { return c < SHUTDOWN; }//线程池关闭,但还没有完全结束/** * Returns true if this executor is in the process of terminating * after {@link #shutdown} or {@link #shutdownNow} but has not * completely terminated. This method may be useful for * debugging. A return of {@code true} reported a sufficient * period after shutdown may indicate that submitted tasks have * ignored or suppressed interruption, causing this executor not * to properly terminate. * * @return true if terminating but not yet terminated */ public boolean isTerminating() { int c = ctl.get(); return ! isRunning(c) && runStateLessThan(c, TERMINATED); } //线程池是否结束 public boolean isTerminated() { return runStateAtLeast(ctl.get(), TERMINATED); }//设置工作线程保活时间 /** * Sets the time limit for which threads may remain idle before * being terminated. If there are more than the core number of * threads currently in the pool, after waiting this amount of * time without processing a task, excess threads will be * terminated. This overrides any value set in the constructor. * * @param time the time to wait. A time value of zero will cause * excess threads to terminate immediately after executing tasks. * @param unit the time unit of the {@code time} argument * @throws IllegalArgumentException if {@code time} less than zero or * if {@code time} is zero and {@code allowsCoreThreadTimeOut} * @see #getKeepAliveTime */ public void setKeepAliveTime(long time, TimeUnit unit) { if (time < 0) throw new IllegalArgumentException(); if (time == 0 && allowsCoreThreadTimeOut()) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); long keepAliveTime = unit.toNanos(time); long delta = keepAliveTime - this.keepAliveTime; this.keepAliveTime = keepAliveTime; if (delta < 0) //如果保证时间小于当前保证时间,中断空闲工作线程 interruptIdleWorkers(); }//运行工作线程超时等待/** * Sets the policy governing whether core threads may time out and * terminate if no tasks arrive within the keep-alive time, being * replaced if needed when new tasks arrive. When false, core * threads are never terminated due to lack of incoming * tasks. When true, the same keep-alive policy applying to * non-core threads applies also to core threads. To avoid * continual thread replacement, the keep-alive time must be * greater than zero when setting {@code true}. This method * should in general be called before the pool is actively used. * * @param value {@code true} if should time out, else {@code false} * @throws IllegalArgumentException if value is {@code true} * and the current keep-alive time is not greater than zero * * @since 1.6 */ public void allowCoreThreadTimeOut(boolean value) { if (value && keepAliveTime <= 0) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); if (value != allowCoreThreadTimeOut) { allowCoreThreadTimeOut = value; if (value) interruptIdleWorkers(); } }//设置核心线程池数量 /** * Sets the core number of threads. This overrides any value set * in the constructor. If the new value is smaller than the * current value, excess existing threads will be terminated when * they next become idle. If larger, new threads will, if needed, * be started to execute any queued tasks. * * @param corePoolSize the new core size * @throws IllegalArgumentException if {@code corePoolSize < 0} * @see #getCorePoolSize */ public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) throw new IllegalArgumentException(); int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize) //如果当前工作线程数量大于corePoolSize,中断空闲工作线程 interruptIdleWorkers(); else if (delta > 0) { // We don't really know how many new threads are "needed". // As a heuristic, prestart enough new workers (up to new // core size) to handle the current number of tasks in // queue, but stop if queue becomes empty while doing so. int k = Math.min(delta, workQueue.size()); //如果核心线程池数量增大,且工作线程未达到核心线程池数量,则添加等待执行的任务数量 //和新增核心线程池数量中最小者数量的空闲工作线程 while (k-- > 0 && addWorker(null, true)) { if (workQueue.isEmpty()) break; } } }//设置最大线程池数量/** * Sets the maximum allowed number of threads. This overrides any * value set in the constructor. If the new value is smaller than * the current value, excess existing threads will be * terminated when they next become idle. * * @param maximumPoolSize the new maximum * @throws IllegalArgumentException if the new maximum is * less than or equal to zero, or * less than the {@linkplain #getCorePoolSize core pool size} * @see #getMaximumPoolSize */ public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); this.maximumPoolSize = maximumPoolSize; if (workerCountOf(ctl.get()) > maximumPoolSize) //如果当前工作线程数量大于最大线程池数量,则中断空闲工作线程 interruptIdleWorkers(); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。