当前位置:   article > 正文

ThreadPoolExecutor解析四(线程池关闭)

用threadpoolexecutor关闭线程池后提示resourcewarning: unclosed
Executor接口的定义:[url]http://donald-draper.iteye.com/blog/2365625[/url]
ExecutorService接口定义:[url]http://donald-draper.iteye.com/blog/2365738[/url]
Future接口定义:[url]http://donald-draper.iteye.com/blog/2365798[/url]
FutureTask解析:[url]http://donald-draper.iteye.com/blog/2365980[/url]
CompletionService接口定义:[url]http://donald-draper.iteye.com/blog/2366239[/url]
ExecutorCompletionService解析:[url]http://donald-draper.iteye.com/blog/2366254[/url]
AbstractExecutorService解析:[url]http://donald-draper.iteye.com/blog/2366348[/url]
ScheduledExecutorService接口定义:[url]http://donald-draper.iteye.com/blog/2366436[/url]
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等) :
[url]http://donald-draper.iteye.com/blog/2366934[/url]
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等):
[url]http://donald-draper.iteye.com/blog/2367064[/url]
ThreadPoolExecutor解析三(线程池执行提交任务):
[url]http://donald-draper.iteye.com/blog/2367199[/url]
上一篇看了线程池执行提交任务,先回顾一下:
执行任务的过程为,如果工作线程数量小于核心线程池数量,添加工作线程,执行任务;如果添加工作线程失败,则添加任务到队列,并进行双检查,如果在上述期间,线程池关闭,回滚任务,从队列中移除任务;如果任务入队列失败,根据工作线程数量是否大于最大线程池数量,来判断是否应该添加工作线程执行任务;如果工作线程小于最大线程池数量,则CAS操作workCount,成功创建工作线程执行任务。添加工作线程的过程为,如果应该添加工作线程,则CAS更新工作线程数,如果更新成功,则创建工作线程,执行任务。如果线程是已关闭或正在关闭,则添加工作线程失败。如果线程工厂创建线程失败,则返回false,如果由于线程工厂返回null或OutOfMemoryError等原因,执行回滚清除工作。回滚清除工作主要是工作线程和工作线程数。最后检查线程是是否关闭,如果线程池正在运行,或正在关闭且队列不为空,则直接返回,否则及线程池已关闭,检查工作线程是否为0,不为零根据ONLY_ONE判断中断一个空闲线程还是多个。
今天来看一下线程池的关闭:
/**     * Initiates an orderly shutdown in which previously submitted     * tasks are executed, but no new tasks will be accepted.     * Invocation has no additional effect if already shut down.     *     先前提交的任务将会被工作线程执行,新的线程将会被拒绝。这个方法     不会等待提交的任务执行完,我们可以用awaitTermination来等待任务执行完。     * <p>This method does not wait for previously submitted tasks to     * complete execution.  Use {@link #awaitTermination awaitTermination}     * to do that.     *     * @throws SecurityException {@inheritDoc}     */    public void shutdown() {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {	    //检查线程访问权限            checkShutdownAccess();	    //更新线程池状态为SHUTDOWN            advanceRunState(SHUTDOWN);	    //中断空闲工作线程            interruptIdleWorkers();	    //线程池关闭hook            onShutdown(); // hook for ScheduledThreadPoolExecutor        } finally {            mainLock.unlock();        }	//尝试结束线程池,这个前面以说,这里不再说        tryTerminate();    }

分四步来看:
1.
//检查线程访问权限checkShutdownAccess();

2.
//更新线程池状态为SHUTDOWNadvanceRunState(SHUTDOWN);

3.
//中断空闲工作线程interruptIdleWorkers();

4.
//线程池关闭hookonShutdown(); // hook for ScheduledThreadPoolExecutor

下面来看每一点:
1.
//检查线程访问权限
checkShutdownAccess();

 /**     * If there is a security manager, makes sure caller has     * permission to shut down threads in general (see shutdownPerm).     * If this passes, additionally makes sure the caller is allowed     * to interrupt each worker thread. This might not be true even if     * first check passed, if the SecurityManager treats some threads     * specially.     */    private void checkShutdownAccess() {        SecurityManager security = System.getSecurityManager();        if (security != null) {            security.checkPermission(shutdownPerm);            final ReentrantLock mainLock = this.mainLock;            mainLock.lock();            try {                for (Worker w : workers)		    //遍历工作线程集,检查任务线程访问权限                    security.checkAccess(w.thread);            } finally {                mainLock.unlock();            }        }    }

2.
//更新线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);

 /**     * Transitions runState to given target, or leaves it alone if     * already at least the given target.     *     * @param targetState the desired state, either SHUTDOWN or STOP     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)     */    private void advanceRunState(int targetState) {        for (;;) {            int c = ctl.get();            if (runStateAtLeast(c, targetState) ||                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))                break;        }    }

3.
//中断空闲工作线程
interruptIdleWorkers();


/**     * Common form of interruptIdleWorkers, to avoid having to     * remember what the boolean argument means.     */    private void interruptIdleWorkers() {       //遍历工作线程集合,中断空闲工作线程,前面已将,这里不再说        interruptIdleWorkers(false);    }

private void interruptIdleWorkers(boolean onlyOne) {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            for (Worker w : workers) {	        //遍历工作线程集                Thread t = w.thread;                if (!t.isInterrupted() && w.tryLock()) {//锁打开说明,工作线程空闲                    try {		        //如果工作线程非中断,且空闲,尝试获取锁,获取锁成功,则中断工作线程                        t.interrupt();                    } catch (SecurityException ignore) {                    } finally {                        w.unlock();                    }                }                if (onlyOne)		    //如果是只中断一个空闲线程,则结束本次中断空闲线程任务                    break;            }        } finally {            mainLock.unlock();        }    }

//只中断一个空闲工作线程
private static final boolean ONLY_ONE = true;


4.
//线程池关闭hook
onShutdown(); // hook for ScheduledThreadPoolExecutor   /**     * Performs any further cleanup following run state transition on     * invocation of shutdown.  A no-op here, but used by     * ScheduledThreadPoolExecutor to cancel delayed tasks.     */    void onShutdown() {    //待子类扩展    }


我们也把tryTerminate代码贴出来,以便理解:
final void tryTerminate() {        //自旋尝试关闭线程池        for (;;) {            int c = ctl.get();	    //如果线程池正在运行,或正在关闭且任务队列不为空,则返回            if (isRunning(c) ||                runStateAtLeast(c, TIDYING) ||                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))                return;            if (workerCountOf(c) != 0) { // Eligible to terminate	        //如果工作线程不为空,则中断空闲工作线程                interruptIdleWorkers(ONLY_ONE);                return;            }            final ReentrantLock mainLock = this.mainLock;            mainLock.lock();            try {	        //线程池已关闭,任务队列为空,工作线程为0,更新线程池状态为TIDYING                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {                    try {		        //执行结束工作                        terminated();                    } finally {		        //线程池已结束                        ctl.set(ctlOf(TERMINATED, 0));			//唤醒等待线程池结束的线程                        termination.signalAll();                    }                    return;                }            } finally {                mainLock.unlock();            }            // else retry on failed CAS        }    }

//执行结束工作
terminated();

/**     * Method invoked when the Executor has terminated.  Default     * implementation does nothing. Note: To properly nest multiple     * overridings, subclasses should generally invoke     * {@code super.terminated} within this method.     */    protected void terminated() {     //待子类扩展}

从上面可以看出关闭线程池,首先检查工作线程运行时访问权限,
更新线程状态为SHUTDOWN,中断空闲工作线程,最后尝试关闭线程池。
再看来以及关闭线程池:
/**     * Attempts to stop all actively executing tasks, halts the     * processing of waiting tasks, and returns a list of the tasks     * that were awaiting execution. These tasks are drained (removed)     * from the task queue upon return from this method.     *     尝试停止正在执行的任务,停止等待任务线程的处理,任务队列将会被排空,     并返回任务队列中的任务集。     这个方法不会等待已执行的任务结束,我们用awaitTermination来等待任务执行完     * <p>This method does not wait for actively executing tasks to     * terminate.  Use {@link #awaitTermination awaitTermination} to     * do that.     *     * <p>There are no guarantees beyond best-effort attempts to stop     * processing actively executing tasks.  This implementation     * cancels tasks via {@link Thread#interrupt}, so any task that     * fails to respond to interrupts may never terminate.     *     * @throws SecurityException {@inheritDoc}     */    public List<Runnable> shutdownNow() {        List<Runnable> tasks;        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {	    //检查工作线程权限            checkShutdownAccess();	    //更新线程池状态为STOP            advanceRunState(STOP);	    //中断空闲工作线程            interruptWorkers();            //清空任务队列,并放到tasks集合中            tasks = drainQueue();        } finally {            mainLock.unlock();        }	//尝试结束线程池        tryTerminate();        return tasks;    }

来看清空任务队列,并放到tasks集合中
 tasks = drainQueue();


/**     * Drains the task queue into a new list, normally using     * drainTo. But if the queue is a DelayQueue or any other kind of     * queue for which poll or drainTo may fail to remove some     * elements, it deletes them one by one.     */    private List<Runnable> drainQueue() {        //这个方法很简单,不再说了        BlockingQueue<Runnable> q = workQueue;        List<Runnable> taskList = new ArrayList<Runnable>();        q.drainTo(taskList);        if (!q.isEmpty()) {            for (Runnable r : q.toArray(new Runnable[0])) {                if (q.remove(r))                    taskList.add(r);            }        }        return taskList;    }

立即关闭线程与关闭线程池的不同是,对于关闭线程池,先前提交的任务将会被工作线程执行,新的线程将会被拒绝;而立即关闭线程,尝试停止正在执行的任务,停止等待任务线程的处理,任务队列将会被排空,并返回任务队列中的任务集。这两个方法都不会等待任务执行完或任务结束。
我们可以用awaitTermination来等待任务执行完
public boolean awaitTermination(long timeout, TimeUnit unit)        throws InterruptedException {        long nanos = unit.toNanos(timeout);        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {	    //自旋等待线程线程结束条件            for (;;) {                if (runStateAtLeast(ctl.get(), TERMINATED))                    return true;                if (nanos <= 0)                    return false;                nanos = termination.awaitNanos(nanos);            }        } finally {            mainLock.unlock();        }    }

来看一些其他的方法:
/**     * Invokes {@code shutdown} when this executor is no longer     * referenced and it has no threads.     */    //线程池不在被应用时,关闭线程池    protected void finalize() {        shutdown();    }//创建一个核心空闲工作线程等待任务 /**     * Starts a core thread, causing it to idly wait for work. This     * overrides the default policy of starting core threads only when     * new tasks are executed. This method will return {@code false}     * if all core threads have already been started.     *     * @return {@code true} if a thread was started     */    public boolean prestartCoreThread() {        //addWorker,core参数为true使用核心线程池数量,否则最大线程池数量        return workerCountOf(ctl.get()) < corePoolSize &&            addWorker(null, true);    }//确保至少有一个空闲工作线程等待任务/**     * Same as prestartCoreThread except arranges that at least one     * thread is started even if corePoolSize is 0.     */    void ensurePrestart() {        int wc = workerCountOf(ctl.get());        if (wc < corePoolSize)            addWorker(null, true);        else if (wc == 0)            addWorker(null, false);    }//创建核心线程池数量的空闲工作线程等待任务    /**     * Starts all core threads, causing them to idly wait for work. This     * overrides the default policy of starting core threads only when     * new tasks are executed.     *     * @return the number of threads started     */    public int prestartAllCoreThreads() {        int n = 0;        while (addWorker(null, true))            ++n;        return n;    }//尝试移除任务取消的工作线程 /**     * Tries to remove from the work queue all {@link Future}     * tasks that have been cancelled. This method can be useful as a     * storage reclamation operation, that has no other impact on     * functionality. Cancelled tasks are never executed, but may     * accumulate in work queues until worker threads can actively     * remove them. Invoking this method instead tries to remove them now.     * However, this method may fail to remove tasks in     * the presence of interference by other threads.     */    public void purge() {        final BlockingQueue<Runnable> q = workQueue;        try {            Iterator<Runnable> it = q.iterator();            while (it.hasNext()) {                Runnable r = it.next();                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())                    it.remove();            }        } catch (ConcurrentModificationException fallThrough) {            // Take slow path if we encounter interference during traversal.            // Make copy for traversal and call remove for cancelled entries.            // The slow path is more likely to be O(N*N).            for (Object r : q.toArray())                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())                    q.remove(r);        }        tryTerminate(); // In case SHUTDOWN and now empty    } //是否关闭,大于运行状态,即为关闭  public boolean isShutdown() {        return ! isRunning(ctl.get());    }   private static boolean isRunning(int c) {        return c < SHUTDOWN;    }//线程池关闭,但还没有完全结束/**     * Returns true if this executor is in the process of terminating     * after {@link #shutdown} or {@link #shutdownNow} but has not     * completely terminated.  This method may be useful for     * debugging. A return of {@code true} reported a sufficient     * period after shutdown may indicate that submitted tasks have     * ignored or suppressed interruption, causing this executor not     * to properly terminate.     *     * @return true if terminating but not yet terminated     */    public boolean isTerminating() {        int c = ctl.get();        return ! isRunning(c) && runStateLessThan(c, TERMINATED);    } //线程池是否结束  public boolean isTerminated() {        return runStateAtLeast(ctl.get(), TERMINATED);    }//设置工作线程保活时间   /**     * Sets the time limit for which threads may remain idle before     * being terminated.  If there are more than the core number of     * threads currently in the pool, after waiting this amount of     * time without processing a task, excess threads will be     * terminated.  This overrides any value set in the constructor.     *     * @param time the time to wait.  A time value of zero will cause     *        excess threads to terminate immediately after executing tasks.     * @param unit the time unit of the {@code time} argument     * @throws IllegalArgumentException if {@code time} less than zero or     *         if {@code time} is zero and {@code allowsCoreThreadTimeOut}     * @see #getKeepAliveTime     */    public void setKeepAliveTime(long time, TimeUnit unit) {        if (time < 0)            throw new IllegalArgumentException();        if (time == 0 && allowsCoreThreadTimeOut())            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");        long keepAliveTime = unit.toNanos(time);        long delta = keepAliveTime - this.keepAliveTime;        this.keepAliveTime = keepAliveTime;        if (delta < 0)	    //如果保证时间小于当前保证时间,中断空闲工作线程            interruptIdleWorkers();    }//运行工作线程超时等待/**     * Sets the policy governing whether core threads may time out and     * terminate if no tasks arrive within the keep-alive time, being     * replaced if needed when new tasks arrive. When false, core     * threads are never terminated due to lack of incoming     * tasks. When true, the same keep-alive policy applying to     * non-core threads applies also to core threads. To avoid     * continual thread replacement, the keep-alive time must be     * greater than zero when setting {@code true}. This method     * should in general be called before the pool is actively used.     *     * @param value {@code true} if should time out, else {@code false}     * @throws IllegalArgumentException if value is {@code true}     *         and the current keep-alive time is not greater than zero     *     * @since 1.6     */    public void allowCoreThreadTimeOut(boolean value) {        if (value && keepAliveTime <= 0)            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");        if (value != allowCoreThreadTimeOut) {            allowCoreThreadTimeOut = value;            if (value)                interruptIdleWorkers();        }    }//设置核心线程池数量  /**     * Sets the core number of threads.  This overrides any value set     * in the constructor.  If the new value is smaller than the     * current value, excess existing threads will be terminated when     * they next become idle.  If larger, new threads will, if needed,     * be started to execute any queued tasks.     *     * @param corePoolSize the new core size     * @throws IllegalArgumentException if {@code corePoolSize < 0}     * @see #getCorePoolSize     */    public void setCorePoolSize(int corePoolSize) {        if (corePoolSize < 0)            throw new IllegalArgumentException();        int delta = corePoolSize - this.corePoolSize;        this.corePoolSize = corePoolSize;        if (workerCountOf(ctl.get()) > corePoolSize)	    //如果当前工作线程数量大于corePoolSize,中断空闲工作线程            interruptIdleWorkers();        else if (delta > 0) {            // We don't really know how many new threads are "needed".            // As a heuristic, prestart enough new workers (up to new            // core size) to handle the current number of tasks in            // queue, but stop if queue becomes empty while doing so.            int k = Math.min(delta, workQueue.size());	    //如果核心线程池数量增大,且工作线程未达到核心线程池数量,则添加等待执行的任务数量	    //和新增核心线程池数量中最小者数量的空闲工作线程            while (k-- > 0 && addWorker(null, true)) {                if (workQueue.isEmpty())                    break;            }        }    }//设置最大线程池数量/**     * Sets the maximum allowed number of threads. This overrides any     * value set in the constructor. If the new value is smaller than     * the current value, excess existing threads will be     * terminated when they next become idle.     *     * @param maximumPoolSize the new maximum     * @throws IllegalArgumentException if the new maximum is     *         less than or equal to zero, or     *         less than the {@linkplain #getCorePoolSize core pool size}     * @see #getMaximumPoolSize     */    public void setMaximumPoolSize(int maximumPoolSize) {        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)            throw new IllegalArgumentException();        this.maximumPoolSize = maximumPoolSize;        if (workerCountOf(ctl.get()) > maximumPoolSize)	    //如果当前工作线程数量大于最大线程池数量,则中断空闲工作线程            interruptIdleWorkers();    }

总结:
[color=blue]关闭线程池,首先检查工作线程运行时访问权限,更新线程状态为SHUTDOWN,中断空闲工作线程,最后尝试关闭线程池。立即关闭线程与关闭线程池的不同是,对于关闭线程池,先前提交的任务将会被工作线程执行,新的线程将会被拒绝;而立即关闭线程,尝试停止正在执行的任务,停止等待任务线程的处理,任务队列将会被排空,并返回任务队列中的任务集。这两个方法都不会等待任务执行完或任务结束。我们可以用awaitTermination来等待任务执行完。[/color]
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/812574
推荐阅读
相关标签