赞
踩
目录
3.12、interruptIdleWorkers方法源码分析
一般在开发过程中,一个功能是运行时长太久了,一般是通过什么方式去优化的? 异步/多线程,对于一个业务方法而言,如果其中的调用链太长势必会引起程序运行时间延长,导致整个系统吞吐来量下降,而我们使用多线程方式来对该方法的调用链进行优化,对于一些耦合度不是特别高的调用关系可以直接通过多线程来走异步的方式进行处理,大大的缩短了程序的运行时长,但是如果我们的多线程创建方式是通过 new Thread();
这种方式去进行显式创建的话它真的可以吗?答案是不可以,Why?答案如下:
如果在生产环境使用new Thread();
这种方式去进行显式创建线程会带来什么后果?
OOM: 如果当前方法突遇高并发情况,假设此时来了1000个请求,而按传统的网络模型是BIO,此时服务器会开1000个线程来处理这1000个请求(不考虑WEB容器的最大线程数配置),当1000个请求执行时又会发现此方法中存在new Thread();
创建线程,此时每个执行请求的线程又会创建一个线程,此时就会出现1000*2=2000个线程的情况出现,而在一个程序中创建线程是需要向JVM申请内存分配的,但是此时大量线程在同一瞬间向JVM申请分配内存,此时会很容易造成内存溢出(OOM)的情况发生。
资源开销与耗时: Java对象的生命周期大致包括三个阶段:对象的创建,对象的使用,对象的清除。因此,对象的生命周期长度可用如下的表达式表示:Object = O1 + O2 +O3。其中O1表示对象的创建时间,O2表示对象的使用时间,而O3则表示其清除(垃圾回收)时间。由此,我们可以看出,只有O2是真正有效的时间,而O1、O3则是对象本身的开销。当我们去创建一个线程时也是一样,因为线程在Java中其实也是一个Thread类的实例,所以对于线程而言,其实它的创建(申请内存分配、JVM向OS提交线程映射进程申请、OS真实线程映射)和销毁对资源是开销非常大的并且非常耗时的。
不可管理性: 对于new Thread();
的显示创建出来的线程是无法管理的,一旦CPU调度成功,此线程的可管理性几乎为零。
那么我们使用线程池能给我们带来什么好处?
降低资源消耗:通过重用已经创建的线程来降低线程创建和销毁的消耗。
提高响应速度:任务到达时不需要等待线程创建就可以立即执行。
提高线程的可管理性:线程池可以统一管理、分配、调优和监控。
而在Java中为我们提供四种原生线程池,它们都是基于ThreadPoolExecutor类实现的,所以ThreadPoolExecutor类这也是我们待会儿分析线程池原理时的重点~
当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?下面就先来看一下它的主要处理流程。
首先判断核心的线程数是否已满,如果没有满,那么就去创建一个线程去执行该任务;否则请看下一步
如果线程池的核心线程数已满,那么就继续判断任务队列是否已满,如果没满,那么就将任务放到任务队列中;否则请看下一步
如果任务队列已满,那么就判断线程池是否已满,如果没满,那么就创建线程去执行该任务;否则请看下一步;
如果线程池已满,那么就根据拒绝策略来做出相应的处理;
在Java中,JDK通过Executors类为我们提供了四种封装好的线程池类型(ForkJoinPool不在本章探讨范围之内),源码如下:
- //创建一个定长的线程池
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
- }
- //创建一个单线程的线程池
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
- }
- //创建一个可缓存支持灵活回收的线程池
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
- }
- //创建一个支持周期执行任务的线程池
- public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
- return new ScheduledThreadPoolExecutor(corePoolSize);
- }

在上面的源码中,其实我们通过观察发现JDK为我们提供的四种线程池内部都是通过封装ThreadPoolExecutor类的构造函数来进行线程池的初始化的,所以我们先来理清楚线程池“家族”体系。
从上图中我们可以得知,线程池的最上层接口是Executor,而这个接口定义了一个核心方法execute(Runnable command),当我们使用它时,需要传递一个Runnable类型的异步任务作为参数。我们看一下Executor接口的定义:
- public interface Executor {
-
- /**
- * Executes the given command at some time in the future. The command
- * may execute in a new thread, in a pooled thread, or in the calling
- * thread, at the discretion of the {@code Executor} implementation.
- *
- * @param command the runnable task
- * @throws RejectedExecutionException if this task cannot be
- * accepted for execution
- * @throws NullPointerException if command is null
- */
- void execute(Runnable command);
- }
而Executor接口是一个函数式接口,其中只定义了一个方法,但是我们在使用线程池的时候为什么能够调用的方法却会有那么多呢?因为还有一个ExecutorService接口,它继承了Executor接口作为Executor接口的子接口,为Executor接口提供了很多拓展方法。我们接着看ExecutorService接口的实现:
- public interface ExecutorService extends Executor {
- // 等待线程池执行完成已接收的任何后关闭线程池,将线程池置为SHUNTDOWM状态
- void shutdown();
- // 尝试主动终止线程池中的所有正在执行的任务并返回未执行的任务列表,
- // 将线程池置为STOP状态
- List<Runnable> shutdownNow();
- // 判断线程池是否已关闭:线程池调用过shutdown或者shutdownNow后返回true
- boolean isShutdown();
- // 判断线程池中的子线程是否已全部终止
- // 当调用shutdown后全部任务执行完成返回true或调用shutdownNow成功后返回true
- boolean isTerminated();
- // 配合shutdown使用,在调用shutdown后调用该方法,让线程池在指定时间内关闭,
- // 不管任务是否执行完成,在指定时间内还在执行任务则抛出异常中断线程
- // 注意:有时能够关闭线程池单并不能完全保证线程池中子线程停止执行
- // 比如子线程中用到 BufferedReader,那么需要配合shutdownNow主动中断所有子线程
- boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException;
- // 向线程池提交一个Callable类型的异步任务,当线程池执行后返回执行结果
- <T> Future<T> submit(Callable<T> task);
- // 向线程池提交一个Runnable类型的异步任务,线程池执行完成后将返回指定类型的执行结果
- <T> Future<T> submit(Runnable task, T result);
- // 向线程池提交一个Runnable类型的异步任务,线程池执行完成后执行的结果
- Future<?> submit(Runnable task);
- // 传入一个Collection类型的异步任务集合,批量执行并返回执行结果
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException;
- // 传入一个Collection类型的异步任务集合,在指定的时间内批量执行并返回执行
- // 结果,如果超时则抛出异常中断线程
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException;
- // 传入一个Collection类型的异步任务集合,返回第一个执行完成的结果并终止其他线程
- <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException;
- // 传入一个Collection类型的异步任务集合,在指定的时间内返回第一个执行完成的结果
- // 并终止其他线程,如果超时则抛出异常中断线程
- <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
- }

Executor接口有一个子接口ExecutorService,而AbstracExecutorService类又实现了ExecutorService接口,而ThreadPoolExcutor正是AbstrcExecutorService的子类。
到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。
Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
然后ThreadPoolExecutor继承了类AbstractExecutorService。
- public class ThreadPoolExecutor extends AbstractExecutorService {
-
- // 控制变量-存放状态和线程数
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
-
- // 任务队列,必须是阻塞队列
- private final BlockingQueue<Runnable> workQueue;
-
- // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
- private final HashSet<Worker> workers = new HashSet<>();
-
- // 全局锁
- private final ReentrantLock mainLock = new ReentrantLock();
-
- // awaitTermination方法使用的等待条件变量
- private final Condition termination = mainLock.newCondition();
-
- // 线程工厂,用于创建新的线程实例
- private volatile ThreadFactory threadFactory;
-
- // 拒绝执行处理器,对应不同的拒绝策略
- private volatile RejectedExecutionHandler handler;
-
- // 空闲线程等待任务的时间周期,单位是纳秒
- private volatile long keepAliveTime;
-
- // 是否允许核心线程超时,如果为true则keepAliveTime对核心线程也生效
- private volatile boolean allowCoreThreadTimeOut;
-
- // 核心线程数
- private volatile int corePoolSize;
-
- // 线程池容量
- private volatile int maximumPoolSize;
-
- // 省略其他代码
- }

下面看参数列表最长的构造函数:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- if (corePoolSize < 0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }

可以自定义核心线程数、线程池容量(最大线程数)、空闲线程等待任务周期、任务队列、线程工厂、拒绝策略。下面简单分析一下每个参数的含义和作用:
corePoolSize:int类型,核心线程数量。
maximumPoolSize:int类型,最大线程数量,也就是线程池的容量。
keepAliveTime:long类型,线程空闲等待时间,也和工作线程的生命周期有关,下文会分析。
unit:TimeUnit类型,keepAliveTime参数的时间单位,实际上keepAliveTime最终会转化为纳秒。
workQueue:BlockingQueue<Runnable>类型,等待队列或者叫任务队列。
threadFactory:ThreadFactory类型,线程工厂,用于创建工作线程(包括核心线程和非核心线程),默认使用Executors.defaultThreadFactory()作为内建线程工厂实例,一般自定义线程工厂才能更好地跟踪工作线程。
handler:RejectedExecutionHandler类型,线程池的拒绝执行处理器,更多时候称为拒绝策略,拒绝策略执行的时机是当阻塞队列已满、没有空闲的线程(包括核心线程和非核心线程)并且继续提交任务。提供了4种内建的拒绝策略实现:
AbortPolicy:直接拒绝策略,也就是不会执行任务,直接抛出RejectedExecutionException,这是默认的拒绝策略。
DiscardPolicy:抛弃策略,也就是直接忽略提交的任务(通俗来说就是空实现)。
DiscardOldestPolicy:抛弃最老任务策略,也就是通过poll()方法取出任务队列队头的任务抛弃,然后执行当前提交的任务。
CallerRunsPolicy:调用者执行策略,也就是当前调用Executor#execute()的线程直接调用任务Runnable#run(),一般不希望任务丢失会选用这种策略,但从实际角度来看,原来的异步调用意图会退化为同步调用。
要了解线程池,我们首先要了解的线程池里面的状态控制的参数 ctl,这个线程池的状态控制参数是一个原子操作的 AtomicInteger,这个ctl包含两个参数 :
runState
:当前线程池的状态
workerCount
:激活(工作)的线程数
ctl
是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,它的低29位用于存放当前的线程数, 因此一个线程池在理论上最大的线程数是 536870911; 高 3 位是用于表示当前线程池的状态。
状态控制主要围绕原子整型成员变量ctl,关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- private static final int COUNT_BITS = Integer.SIZE - 3;
- private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
-
- private static final int RUNNING = -1 << COUNT_BITS;
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- private static final int STOP = 1 << COUNT_BITS;
- private static final int TIDYING = 2 << COUNT_BITS;
- private static final int TERMINATED = 3 << COUNT_BITS;
-
- // 通过ctl值获取运行状态
- private static int runStateOf(int c) { return c & ~COUNT_MASK; }
- // 通过ctl值获取工作线程数
- private static int workerCountOf(int c) { return c & COUNT_MASK; }
-
- // 通过运行状态和工作线程数计算ctl的值,或运算
- private static int ctlOf(int rs, int wc) { return rs | wc; }
-
- private static boolean runStateLessThan(int c, int s) {
- return c < s;
- }
-
- private static boolean runStateAtLeast(int c, int s) {
- return c >= s;
- }
-
- private static boolean isRunning(int c) {
- return c < SHUTDOWN;
- }
-
- // CAS操作线程数增加1
- private boolean compareAndIncrementWorkerCount(int expect) {
- return ctl.compareAndSet(expect, expect + 1);
- }
-
- // CAS操作线程数减少1
- private boolean compareAndDecrementWorkerCount(int expect) {
- return ctl.compareAndSet(expect, expect - 1);
- }
-
- // 线程数直接减少1
- private void decrementWorkerCount() {
- ctl.addAndGet(-1);
- }

小结一下线程池的运行状态常量:
采用变量ctl
作为状态控制的好处:
ctl
这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。
ThreadPoolExecutor的运行状态有5种,其生命周期转换如下图所示:
线程池异步执行任务的方法实现是ThreadPoolExecutor#execute(),源码如下:
- public void execute(Runnable command) {
- //如果传递的任务为空则抛出空指针异常
- if (command == null)
- throw new NullPointerException();
- int c = ctl.get();
- //如果工作线程数小于核心线程数,
- if (workerCountOf(c) < corePoolSize) {
- //执行addWork,提交为核心线程,提交成功return。提交失败重新获取ctl
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- //如果工作线程数大于核心线程数,则检查线程池状态是否是正在运行,且将新线程向阻塞队列提交。
- if (isRunning(c) && workQueue.offer(command)) {
- //这里是向任务队列投放任务成功,对线程池的运行中状态做二次检查
- //recheck 需要再次检查,主要目的是判断加入到阻塞队里中的线程是否可以被执行
- int recheck = ctl.get();
- //如果线程池状态不为running,将任务从阻塞队列里面移除,启用拒绝策略
- if (!isRunning(recheck) && remove(command))
- reject(command);
- // 如果线程池的工作线程为零,则调用addWoker提交任务
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- //添加非核心线程失败,拒绝
- else if (!addWorker(command, false))
- reject(command);
- }

这里简单分析一下整个流程:
如果当前工作线程总数小于corePoolSize,则直接创建核心线程执行任务(任务实例会传入直接用于构造工作线程实例)。
如果当前工作线程总数大于等于corePoolSize,判断线程池是否处于运行中状态,同时尝试用非阻塞方法向任务队列放入任务,这里会二次检查线程池运行状态,如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null。
如果向任务队列投放任务失败(任务队列已经满了),则会尝试创建非核心线程传入任务实例执行。
如果创建非核心线程失败,此时需要拒绝执行任务,调用拒绝策略处理任务。
这里是一个疑惑点:为什么需要二次检查线程池的运行状态,当前工作线程数量为0,尝试创建一个非核心线程并且传入的任务对象为null?这个可以看API注释:
如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程(因为所有存活的工作线程有可能在最后一次检查之后已经终结)或者执行当前方法的时候线程池是否已经shutdown了。所以我们需要二次检查线程池的状态,必须时把任务从任务队列中移除或者在没有可用的工作线程的前提下新建一个工作线程。
boolean addWorker(Runnable firstTask, boolean core)
方法的第一的参数可以用于直接传入任务实例,第二个参数用于标识将要创建的工作线程是否核心线程。方法源码如下:
- // 添加工作线程,如果返回false说明没有新创建工作线程,如果返回true说明创建和启动工作线程成功
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- // 注意这是一个死循环 - 最外层循环
- for (int c = ctl.get();;) {
- // 这个是十分复杂的条件,这里先拆分多个与(&&)条件:
- // 1. 线程池状态至少为SHUTDOWN状态,也就是rs >= SHUTDOWN(0)
- // 2. 线程池状态至少为STOP状态,也就是rs >= STOP(1),或者传入的任务实例firstTask不为null,或者任务队列为空
- // 其实这个判断的边界是线程池状态为shutdown状态下,不会再接受新的任务,在此前提下如果状态已经到了STOP、或者传入任务不为空、或者任务队列为空(已经没有积压任务)都不需要添加新的线程
- if (runStateAtLeast(c, SHUTDOWN)
- && (runStateAtLeast(c, STOP)
- || firstTask != null
- || workQueue.isEmpty()))
- return false;
- // 注意这也是一个死循环 - 二层循环
- for (;;) {
- // 这里每一轮循环都会重新获取工作线程数wc
- // 1. 如果传入的core为true,表示将要创建核心线程,通过wc和corePoolSize判断,如果wc >= corePoolSize,则返回false表示创建核心线程失败
- // 1. 如果传入的core为false,表示将要创非建核心线程,通过wc和maximumPoolSize判断,如果wc >= maximumPoolSize,则返回false表示创建非核心线程失败
- if (workerCountOf(c)
- >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
- return false;
- // 成功通过CAS更新工作线程数wc,则break到最外层的循环
- if (compareAndIncrementWorkerCount(c))
- break retry;
- // 走到这里说明了通过CAS更新工作线程数wc失败,这个时候需要重新判断线程池的状态是否由RUNNING已经变为SHUTDOWN
- c = ctl.get(); // Re-read ctl
- // 如果线程池状态已经由RUNNING已经变为SHUTDOWN,则重新跳出到外层循环继续执行
- if (runStateAtLeast(c, SHUTDOWN))
- continue retry;
- // 如果线程池状态依然是RUNNING,CAS更新工作线程数wc失败说明有可能是并发更新导致的失败,则在内层循环重试即可
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
- // 标记工作线程是否启动成功
- boolean workerStarted = false;
- // 标记工作线程是否创建成功
- boolean workerAdded = false;
- Worker w = null;
- try {
- // 传入任务实例firstTask创建Worker实例,Worker构造里面会通过线程工厂创建新的Thread对象,所以下面可以直接操作Thread t = w.thread
- // 这一步Worker实例已经创建,但是没有加入工作线程集合或者启动它持有的线程Thread实例
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- // 这里需要全局加锁,因为会改变一些指标值和非线程安全的集合
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int c = ctl.get();
- // 这里主要在加锁的前提下判断ThreadFactory创建的线程是否存活或者判断获取锁成功之后线程池状态是否已经更变为SHUTDOWN
- // 1. 如果线程池状态依然为RUNNING,则只需要判断线程实例是否存活,需要添加到工作线程集合和启动新的Worker
- // 2. 如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
- // 对于2,换言之,如果线程池处于SHUTDOWN状态下,同时传入的任务实例firstTask不为null,则不会添加到工作线程集合和启动新的Worker
- // 这一步其实有可能创建了新的Worker实例但是并不启动(临时对象,没有任何强引用),这种Worker有可能成功下一轮GC被收集的垃圾对象
- if (isRunning(c) ||
- (runStateLessThan(c, STOP) && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- // 把创建的工作线程实例添加到工作线程集合
- workers.add(w);
- int s = workers.size();
- // 尝试更新历史峰值工作线程数,也就是线程池峰值容量
- if (s > largestPoolSize)
- largestPoolSize = s;
- // 这里更新工作线程是否启动成功标识为true,后面才会调用Thread#start()方法启动真实的线程实例
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- // 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
- if (workerAdded) {
- t.start();
- // 标记线程启动成功
- workerStarted = true;
- }
- }
- } finally {
- // 线程启动失败,需要从工作线程集合移除对应的Worker
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
-
- // 添加Worker失败
- private void addWorkerFailed(Worker w) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 从工作线程集合移除之
- if (w != null)
- workers.remove(w);
- // wc数量减1
- decrementWorkerCount();
- // 基于状态判断尝试终结线程池
- tryTerminate();
- } finally {
- mainLock.unlock();
- }
- }

整个流程如图所示:
先看看 addWork()
的两个参数,第一个是需要提交的线程Runnable firstTask
,第二个参数是 boolean
类型,表示是否为核心线程。 execute()
中有三处调用了 addWork()
我们逐一分析。
第一次,条件if (workerCountOf(c) < corePoolSize)
这个很好理解,工作线程数少于核心线程数,提交任务。所以addWorker(command, true)
。
第二次,如果 workerCountOf(recheck) == 0
如果worker的数量为0,那就 addWorker(null,false)
。为什么这里是 null ?之前已经把command
提交到阻塞队列了workQueue.offer(command)
。所以提交一个空线程,直接从阻塞队列里面取就可以了。
第三次,如果线程池没有RUNNING
或者offer
阻塞队列失败,addWorker(command,false)
,很好理解,对应的就是,阻塞队列满了,将任务提交到,非核心线程池。
上面的分析逻辑中需要注意一点,Worker实例创建的同时,在其构造函数中会通过ThreadFactory创建一个Java线程Thread实例,后面会加锁后二次检查是否需要把Worker实例添加到工作线程集合workers中和是否需要启动Worker中持有的Thread实例,只有启动了Thread实例实例,Worker才真正开始运作,否则只是一个无用的临时对象。Worker本身也实现了Runnable接口,它可以看成是一个Runnable的适配器。
线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,看一下Worker的定义:
- private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
- /**
- * This class will never be serialized, but we provide a
- * serialVersionUID to suppress a javac warning.
- */
- private static final long serialVersionUID = 6138294804551838833L;
-
- // 保存ThreadFactory创建的线程实例,如果ThreadFactory创建线程失败则为null
- final Thread thread;
- // 保存传入的Runnable任务实例
- Runnable firstTask;
- // 记录每个线程完成的任务总数
- volatile long completedTasks;
-
- // 唯一的构造函数,传入任务实例firstTask,注意可以为null
- Worker(Runnable firstTask) {
- // 禁止线程中断,直到runWorker()方法执行
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- // 通过ThreadFactory创建线程实例,注意一下Worker实例自身作为Runnable用于创建新的线程实例
- this.thread = getThreadFactory().newThread(this);
- }
-
- // 委托到外部的runWorker()方法,注意runWorker()方法是线程池的方法,而不是Worker的方法
- public void run() {
- runWorker(this);
- }
-
- // Lock methods
- //
- // The value 0 represents the unlocked state.
- // The value 1 represents the locked state.
- // 是否持有独占锁,state值为1的时候表示持有锁,state值为0的时候表示已经释放锁
- protected boolean isHeldExclusively() {
- return getState() != 0;
- }
-
- // 独占模式下尝试获取资源,这里没有判断传入的变量,直接CAS判断0更新为1是否成功,成功则设置独占线程为当前线程
- protected boolean tryAcquire(int unused) {
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
-
- // 独占模式下尝试是否资源,这里没有判断传入的变量,直接把state设置为0
- protected boolean tryRelease(int unused) {
- setExclusiveOwnerThread(null);
- setState(0);
- return true;
- }
-
- // 加锁
- public void lock() { acquire(1); }
-
- // 尝试加锁
- public boolean tryLock() { return tryAcquire(1); }
-
- // 解锁
- public void unlock() { release(1); }
-
- // 是否锁定
- public boolean isLocked() { return isHeldExclusively(); }
-
- // 启动后进行线程中断,注意这里会判断线程实例的中断标志位是否为false,只有中断标志位为false才会中断
- void interruptIfStarted() {
- Thread t;
- if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- }
- }

Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。
在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);
来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
如果正在执行任务,则不应该中断线程;
如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。
此外,在构造方法中执行了setState(-1);
,把state变量设置为-1,为什么这么做呢?是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:
- protected boolean tryAcquire(int unused) {
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);
将state设置为-1是为了禁止在执行任务前对线程进行中断。
正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0。
通过前面分析线程池的工作原理我们可以得知一个结论:在线程池内部关于线程的调度执行都是被封装成一个Worker对象来操作的。而当我们使用Worker.thread.start()启动线程时,JVM会调用Worker中重写的run()方法执行,而Worker.run()方法源码如下:
- /** Delegates main run loop to outer runWorker */
- // 将线程运行主逻辑交给外部 Worker.runWorker()
- public void run() {runWorker(this);}
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码如下:
- final void runWorker(Worker w) {
- // 获取当前线程,实际上和Worker持有的线程实例是相同的
- Thread wt = Thread.currentThread();
- // 获取Worker中持有的初始化时传入的任务对象,这里注意存放在临时变量task中
- Runnable task = w.firstTask;
- // 设置Worker中持有的初始化时传入的任务对象为null
- w.firstTask = null;
- // 由于Worker初始化时AQS中state设置为-1,这里要先做一次解锁把state更新为0,允许线程中断
- w.unlock(); // allow interrupts
- // 记录线程是否因为用户异常终结,默认是true
- boolean completedAbruptly = true;
- try {
- // 1. 如果线程池外部传递了任务则直接执行外部传递的任务
- // 2. 如果没有获取到外部传递进来的任务则调用getTask()去队列中获取任务并执行
- // 2.1. 如果在任务队列中获取到了任务则直接执行已经获取的任务
- // 2.2. 如果任务队列为空,没有任务则反复执行空循环阻塞当前线程死亡
- // getTask()由于使用了阻塞队列,这个while循环如果命中后半段会处于阻塞或者超时阻塞状态,getTask()返回为null会导致线程跳出死循环使线程终结
- while (task != null || (task = getTask()) != null) {
- // Worker加锁,本质是AQS获取资源并且尝试CAS更新state由0更变为1
- w.lock();
- // If pool is stopping, ensure thread is interrupted;
- // if not, ensure thread is not interrupted. This
- // requires a recheck in second case to deal with
- // shutdownNow race while clearing interrupt
- // 如果线程池正在停止(也就是由RUNNING或者SHUTDOWN状态向STOP状态变更),那么要确保当前工作线程是中断状态
- // 否则,要保证当前线程不是中断状态
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- // 钩子方法,任务执行前
- beforeExecute(wt, task);
- try {
- // 调用任务的run方法,而不是start()方法,因为Worker本身就是一个线程类
- task.run();
- // 钩子方法,任务执行后 - 正常情况
- afterExecute(task, null);
- } catch (Throwable ex) {
- // 钩子方法,任务执行后 - 异常情况
- afterExecute(task, ex);
- throw ex;
- }
- } finally {
- // 清空task临时变量,这个很重要,否则while会死循环执行同一个task
- task = null;
- // 执行完成后自增当前工作线程执行的任务数量
- w.completedTasks++;
- // Worker解锁,本质是AQS释放资源,设置state为0
- w.unlock();
- }
- }
- // 如果线程能够执行到最后一行代表线程执行过程中没有由于发生异常导致跳出循环,将 突然结束 标志改为false
- completedAbruptly = false;
- } finally {
- // 处理线程退出,completedAbruptly为true说明由于用户异常导致线程非正常退出
- // 执行回收工作线程的逻辑
- processWorkerExit(w, completedAbruptly);
- }
- }

这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。
completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。
总结一下runWorker方法的执行过程:
while循环不断地通过getTask()方法获取任务;
getTask()方法从阻塞队列中取任务;
如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
调用task.run()
执行任务;
如果task为null则跳出循环,执行processWorkerExit()方法;
runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
如上就是关于线程池复用的原理,简单来说就是通过一个死循环让当前线程一直处于运行状态,阻止OS将当前工作线程回收,从而做到线程的复用。而关于死循环的条件则比较简单,判断task是否为空,在调用方法执行的时候会先获取外部传递的任务,如果没有获取到外部传递的任务则调用getTask()方法获取任务队列中的任务并执行。
接下来分析一下从任务队列中获取任务的getTask()方法和处理线程退出的后续工作的方法processWorkerExit()。
getTask方法用来从阻塞队列中取任务,代码如下:
- private Runnable getTask() {
- // timeOut变量的值表示上次从阻塞队列中取任务时是否超时
- boolean timedOut = false; // Did the last poll() time out?
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- /*
- * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
- * 1. rs >= STOP,线程池是否正在stop;
- * 2. 阻塞队列是否为空。
- * 如果以上条件满足,则将workerCount减1并返回null。
- * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
- */
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- // Are workers subject to culling?
- // timed变量用于判断是否需要进行超时控制。
- // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
- // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
- // 对于超过核心线程数量的这些线程,需要进行超时控制
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
-
- /*
- * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
- * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
- * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
- * 如果减1失败,则返回重试。
- * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
- */
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- /*
- * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
- * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
- *
- */
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- // 如果 r == null,说明已经超时,timedOut设置为true
- timedOut = true;
- } catch (InterruptedException retry) {
- // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
- timedOut = false;
- }
- }
- }

在getTask()方法中的逻辑也比较简单,前期校验线程池状态,这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。
值得注意的是这里使用的是阻塞时获取方式,也就代表如果任务队列中没有任务,当前线程会阻塞等待,直到任务队列中有新的任务时才会获取并返回执行,不过如果线程池设置了存活时间,那么当前线程会阻塞到存活时间的阈值,如果超出存活时间会返回null。而如果返回null,则在runWorker方法中会执行processWorkerExit,即关闭该工作线程,从而实现了线程池的另一个功能: 线程池内线程空闲时间超过给定的存活时间时自动回收该线程资源。
什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。
getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法。
下面我们再来看看processWorkerExit方法的实现:
processWorkerExit()方法是为将要终结的Worker做一次清理和数据记录工作(因为processWorkerExit()方法也包裹在runWorker()方法finally代码块中,其实工作线程在执行完processWorkerExit()方法才算真正的终结)。
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- // 如果completedAbruptly=false,说明是由getTask返回null导致的,WorkerCount递减的操作已经执行
- // 如果completedAbruptly=true,说明是由执行任务的过程中发生异常导致,需要进行WorkerCount递减的操作
- if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
- decrementWorkerCount();
-
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 全局的已完成任务记录数加上此将要终结的Worker中的已完成任务数
- completedTaskCount += w.completedTasks;
- // 从workers中删除当前worker,对workers更新需要加mainLock锁
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
-
- // 根据线程池状态判断是否结束线程池
- // 见下一小点分析,用于根据当前线程池的状态判断是否需要进行线程池terminate处理
- tryTerminate();
-
- int c = ctl.get();
- // 如果线程池的状态小于STOP,也就是处于RUNNING或者SHUTDOWN状态的前提下:
- // 1.如果线程不是由于抛出用户异常终结,如果允许核心线程超时,则保持线程池中至少存在一个工作线程
- // 2.如果线程由于抛出用户异常终结,或者当前工作线程数,那么直接添加一个新的非核心线程
- if (runStateLessThan(c, STOP)) {
- if (!completedAbruptly) {
- // 如果允许核心线程超时,最小值为0,否则为corePoolSize
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- // 如果最小值为0,同时任务队列不空,则更新最小值为1
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- // 工作线程数大于等于最小值,直接返回不新增非核心线程
- if (workerCountOf(c) >= min)
- return; // replacement not needed
- }
- addWorker(null, false);
- }
- }

代码的后面部分区域,会判断线程池的状态,如果线程池是RUNNING或者SHUTDOWN状态的前提下,如果当前的工作线程由于抛出用户异常被终结,那么会新创建一个非核心线程。如果当前的工作线程并不是抛出用户异常被终结(正常情况下的终结),那么会这样处理:
allowCoreThreadTimeOut = true
:也就是允许核心线程超时的前提下,如果任务队列空,则会通过创建一个非核心线程保持线程池中至少有一个工作线程。
allowCoreThreadTimeOut = false
:如果工作线程总数大于corePoolSize则直接返回,否则创建一个非核心线程,也就是会趋向于保持线程池中的工作线程数量趋向于corePoolSize。
processWorkerExit()执行完毕之后,意味着该工作线程的生命周期已经完结。
每个工作线程终结的时候都会调用tryTerminate()方法:
- //根据线程池状态判断是否结束线程池
- final void tryTerminate() {
- for (;;) {
- int c = ctl.get(); // 获取ctl
- // 如果线程池运行状态是RUNNING,或者大于等于TIDYING,或者运行状态为
- // SHUTDOWN且队列为空,则直接return返回
- if (isRunning(c) ||
- runStateAtLeast(c, TIDYING) ||
- (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
- return;
- // 如果工作线程数不为0,则中断一个空闲线程并return
- if (workerCountOf(c) != 0) { // Eligible to terminate
- interruptIdleWorkers(ONLY_ONE);
- return;
- }
-
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 尝试将线程池状态设置为TIDYING状态
- if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
- try {
- //如果CAS成功,执行terminated()钩子方法
- terminated();
- } finally {
- ctl.set(ctlOf(TERMINATED, 0));
- termination.signalAll();
- }
- return;
- }
- } finally {
- mainLock.unlock();
- }
- // else retry on failed CAS
- }
- }

这里有疑惑的地方是tryTerminate()方法的第二个if代码逻辑:工作线程数不为0,则中断工作线程集合中的第一个空闲的工作线程。方法API注释中有这样一段话:
If otherwise eligible to terminate but workerCount is nonzero, interrupts an idle worker to ensure that shutdown signals propagate.
当满足终结线程池的条件但是工作线程数不为0,这个时候需要中断一个空闲的工作线程去确保线程池关闭的信号得以传播。
下面将会分析的shutdown()方法中会通过interruptIdleWorkers()中断所有的空闲线程,如果在执行shutdown时工作线程没有空闲,然后又去调用了getTask方法,这时如果workQueue中没有任务了,调用workQueue.take()
时就会一直阻塞。所以每次在工作线程结束时调用tryTerminate方法来尝试中断一个空闲工作线程,避免在队列为空时取任务一直阻塞的情况。为了避免这种情况,每个工作线程退出的时候都会尝试中断工作线程集合中的某一个空闲的线程,确保所有空闲的线程都能够正常退出。
- public void shutdown() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 权限校验,安全策略相关判断
- checkShutdownAccess();
- // 设置SHUTDOWN状态
- advanceRunState(SHUTDOWN);
- // 中断所有的空闲的工作线程
- interruptIdleWorkers();
- // 钩子方法
- onShutdown(); // hook for ScheduledThreadPoolExecutor
- } finally {
- mainLock.unlock();
- }
- // 调用上面分析果敢的尝试terminate方法,使状态更变为TIDYING,执行钩子方法terminated()后,最终状态更新为TERMINATED
- tryTerminate();
- }

这里shutdown()中除了tryTerminate(),其他它方法都是包裹在锁里面执行,确保工作线程集合稳定性以及关闭权限、确保状态变更串行化,中断所有工作线程并且避免工作线程“中断风暴”(多次并发调用shutdown()如果不加锁,会反复中断工作线程)。
下面就来分析一下interruptIdleWorkers方法。
- private void interruptIdleWorkers() {
- interruptIdleWorkers(false);
- }
- private void interruptIdleWorkers(boolean onlyOne) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers) {
- Thread t = w.thread;
- if (!t.isInterrupted() && w.tryLock()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- } finally {
- w.unlock();
- }
- }
- if (onlyOne)
- break;
- }
- } finally {
- mainLock.unlock();
- }
- }

interruptIdleWorkers遍历workers中所有的工作线程,若线程没有被中断tryLock成功,就中断该线程。
为什么需要持有mainLock?因为workers是HashSet类型的,不能保证线程安全。
reject(Runnable command)方法很简单:
- final void reject(Runnable command) {
- handler.rejectedExecution(command, this);
- }
调用线程池持有的成员RejectedExecutionHandler实例回调任务实例和当前线程池实例。
本文比较详细的分析了线程池的工作流程,总体来说有如下几个内容:
分析了线程的创建,任务的提交,状态的转换以及线程池的关闭;
这里通过execute方法来展开线程池的工作流程,execute方法通过corePoolSize,maximumPoolSize以及阻塞队列的大小来判断决定传入的任务应该被立即执行,还是应该添加到阻塞队列中,还是应该拒绝任务。
介绍了线程池关闭时的过程,也分析了shutdown方法与getTask方法存在竞态条件;
在获取任务时,要通过线程池的状态来判断应该结束工作线程还是阻塞线程等待新的任务,也解释了为什么关闭线程池时要中断工作线程以及为什么每一个worker都需要lock。
线程池的复用原理,只需要理解死循环+getTask即可大致明白线程池复用的思维。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。