这个系列一直没再写,很多原因,中间经历了换工作,熟悉项目,熟悉新团队等等一系列的事情。并发课题对于Java来说是一个又重要又难的一大块,除非气定神闲、精力满满,否则我本身是不敢随便写这个话题的。随便出一个生涩、浮于表面的文章,我觉得意义不大。所以一直就搁置到现在。这一次开启,有一个小小的契机:我自己面试中,已经被问烂了的构造函数的几个参数有什么意义,这种问题,发现其实很多人并不了解。就着这次的机会,我就重开这个课题。
一、基本的一些准备知识
李老爷子的线程池堪称经典,老爷子也因此风靡全球开发者圈子,阅读了源码,你才能感受到什么叫做编程思想,我们普普通通的CRUD简直都弱爆了!老爷子牛逼点也在于,源码中的注释非常完备,这不得不佩服:思想牛逼一方面,能把思想完善、由浅入深的表述出来,我觉得更牛逼!其中对于这个ThreadPoolExecutor的基础知识的了解,我觉得完全可以看注释就可以学全了。要想了解线程池源码,我们先要了解如下几个方面:
- 线程池的几种状态
- 线程池的状态表述
- 状态的使用的方式
- 线程池的构造函数
1、线程池的几种状态
最关键的是几种扭转的状态,让我们直接上老爷子的注释:
- /* The runState provides the main lifecycle control, taking on values:
- *
- * RUNNING: Accept new tasks and process queued tasks
- * SHUTDOWN: Don't accept new tasks, but process queued tasks
- * STOP: Don't accept new tasks, don't process queued tasks,
- * and interrupt in-progress tasks
- * TIDYING: All tasks have terminated, workerCount is zero,
- * the thread transitioning to state TIDYING
- * will run the terminated() hook method
- * TERMINATED: terminated() has completed
- *
- * The numerical order among these values matters, to allow
- * ordered comparisons. The runState monotonically increases over
- * time, but need not hit each state. The transitions are:
- *
- *
- * (下面是几种转态转换的根本的基本方式,很简单的英文,不用翻译)
- * RUNNING -> SHUTDOWN
- * On invocation of shutdown(), perhaps implicitly in finalize()
- * (RUNNING or SHUTDOWN) -> STOP
- * On invocation of shutdownNow()
- * SHUTDOWN -> TIDYING
- * When both queue and pool are empty
- * STOP -> TIDYING
- * When pool is empty
- * TIDYING -> TERMINATED
- * When the terminated() hook method has completed
- *
- * Threads waiting in awaitTermination() will return when the
- * state reaches TERMINATED.
- */
- RUNNING:接受新的任务,并且也继续运行阻塞队列里面的任务
- SHUTDOWN:不接受新的任务了,但是可以继续执行阻塞队列里面的任务
- STOP:不接受新的任务了,也不运行阻塞队列里面的任务了,并且去打断正在执行的任务
- TIDYING:所有的任务都已经终止了,workerCount(任务数)是0,线程池运行到了这个状态之后,会去调terminated()这个方法
- TERMINATED:terminated()这个方法执行完成
2、线程池的状态表述
同样,上源码:
- // ctl这是一个很重要的参数,使用位标记方式,将当前线程池状态和当前线程池创建的任务多少杂糅到了一起
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- // 预留三位
- private static final int COUNT_BITS = Integer.SIZE - 3;
- // 线程池最大线程大小:(2^29)-1 (about 500 million)
- private static final int CAPACITY = (1 << COUNT_BITS) - 1;
-
- // 线程池状态位,使用int的高三位进行储存
- private static final int RUNNING = -1 << COUNT_BITS;
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- private static final int STOP = 1 << COUNT_BITS;
- private static final int TIDYING = 2 << COUNT_BITS;
- private static final int TERMINATED = 3 << COUNT_BITS;
-
- // 通过ctl值计算出运行线程池状态值
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- // 通过ctl值计算出线程池当前任务多少的值
- private static int workerCountOf(int c) { return c & CAPACITY; }
- // 通过运行状态和任务多少的两个值,生成ctl这个包装的值
- private static int ctlOf(int rs, int wc) { return rs | wc; }
思想也很简单:大家熟知的int类型,是占四字节,32位的。为了状态操作的高效与空间节约,老爷子使用了位操作来控制。其中32位的高三位用来存储线程池的状态;低29位用来控制当前线程池有多少个线程。上面的源码就是对位操作的基本实现(都是基本的位操作,我这里不在累赘)
3、状态的使用的方式
这里会给出几个源码中,对状态和线程数量操控的方式:
- // (c:ctl,s:state)当前线程池的状态,是不是小于给定的状态
- private static boolean runStateLessThan(int c, int s) {
- return c < s;
- }
-
- // (c:ctl,s:state)当前线程池的状态,是不是大于等于给定的状态
- private static boolean runStateAtLeast(int c, int s) {
- return c >= s;
- }
-
- // 当前线程池的状态是RUNNING的吗
- private static boolean isRunning(int c) {
- return c < SHUTDOWN;
- }
-
- // 使用CAS原理对当前线程池线程数量值加一
- private boolean compareAndIncrementWorkerCount(int expect) {
- return ctl.compareAndSet(expect, expect + 1);
- }
-
- // 使用CAS原理对当前线程池线程数量值减一
- private boolean compareAndDecrementWorkerCount(int expect) {
- return ctl.compareAndSet(expect, expect - 1);
- }
-
- // 使用CAS原理对当前线程池线程数量值减一,直到成功为止
- private void decrementWorkerCount() {
- do {} while (! compareAndDecrementWorkerCount(ctl.get()));
- }
下面的源码是对线程状态修改源码:
- private void advanceRunState(int targetState) {
- // 这是一个死循环,直到修改成功才break
- for (;;) {
- int c = ctl.get();
- if (runStateAtLeast(c, targetState) ||
- ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
- break;
- }
- }
这里有两个判断条件,只要一个成功就会break循环
- runStateAtLeast:如果当前状态和要设置的状态相等,或者比要设置的状态大。说明线程池状态的不可逆,说明,如果一个线程池已经是SHUTDOWN了,是不能设置回RUNNING状态的
- compareAndSet:CAS设置ctl值。根据短路原理,到了这个方法执行已经说明当前状态是小于要设置状态了,所以可以修改ctl的状态位值。如果设置失败,返回false,继续死循环。成功,break
3、线程池的构造函数
常用的JDK推荐的,或者各大“api使用”书籍介绍的,无非都是下面的几个方法,进行创建线程池:
- Executors.newCachedThreadPool
- Executors.newFixedThreadPool
- Executors.newScheduledThreadPool
- Executors.newSingleThreadExecutor
可是当我们深入源码,才发现:这几个方法的内部无非都调用了ThreadPoolExecutor的构造函数,即使是newScheduledThreadPool这个方法,表面调用了ScheduledThreadPoolExecutor类,可是深入源码才发现:ScheduledThreadPoolExecutor类继承了ThreadPoolExecutor,并且构造函数使用了super进行了构建。这就给我们了一个很好的切入口:只要研究ThreadPoolExecutor构造函数就行。进一步,还会发现,ThreadPoolExecutor有四个构造函数,入参不一样,也都不约而同,最终调用了入参最多的那个(入参少的时候使用默认值),我们看看ThreadPoolExecutor中入参最多的构造函数的源码:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- // 对入参进行合法性校验
- if (corePoolSize < 0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- // 获取系统安全管理器(不做分析)
- this.acc = System.getSecurityManager() == null ?
- null : AccessController.getContext();
- // 核心几大参数的赋值操作
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
- corePoolSize:核心运行线程数
- maximumPoolSize:最大运行运行程
- workQueue:阻塞队列
- keepAliveTime:当线程大于核心线程数时,且阻塞队列没有元素,最大等待时间
- threadFactory:生成线程的工厂类
- handler:超出线程池最大承受能力之后的失败策略方法对象
对于线程池表现出来的各种特性,就是通过这几个参数控制的,所以很关键!
二、线程池的基本执行图解
对于线程池源码,我们先主要从execute执行方法入手进行分析,下面主要用一个图进行大致流程的展示:
配合上代码,我们先指出对应代码的大致位置,我们有个大体的概念比较好:
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
-
- int c = ctl.get();
- // 下面大约就是①的过程
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- // 下面大约就是②的过程
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 下面大约就是③的过程
- else if (!addWorker(command, false))
- // 下面大约就是④的过程
- reject(command);
- }
三、线程池细节源码分析
1、addWorker方法
a、addWorker,我们先来看看
- private boolean addWorker(Runnable firstTask, boolean core) {
- // 死循环,在某些条件下,会返回挥着break
- retry:
- for (;;) {
- int c = ctl.get();
- // 当下线程池运行状态
- int rs = runStateOf(c);
-
- // 下面是对线程池状态的一些列判断
-
- // 这个判断稍微有点绕,返回false的条件是:
- // 线程池是SHUTDOWN、STOP、TIDYING、TERMINATED其中的任意一个状态
- // 且(线程池状态为STOP、TIDYING、TERMINATED 或者 firstTask不为空 或者 阻塞队列为空)
- // 同样是返回false,添加失败
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
-
- for (;;) {
-
- // 下面是对线程池当下线程数的一系列判断
-
- int wc = workerCountOf(c);
- // 线程数如果大于等于最大线程池允许数量((2^29)-1)或者大于等于设置的
- // 核心线程数或者最大线程数
- // 同样是返回false,添加失败
- if (wc >= CAPACITY ||
- // 这里也是一个玄妙之处:
- // 如果传入的core为true情况,可见线程数量依赖值为核心线程数
- // 如果为false,数量依赖于最大的线程数。通过这个core值,就可以
- // 控制什么时候,依赖什么值进行创建线程
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- // 下面是CAS的经典操作:
-
- // 这个第一个if如果设置成功,就结束整体的外部循环。没成功说明有竟态
- if (compareAndIncrementWorkerCount(c))
- break retry;
- // 再次获取一遍ctl,算是double check
- c = ctl.get();
- // 这里判断,如果为true,说明线程池当下状态已经被修改
- // 要重新通过外层循环的状态判断来确定返回值,所以continue了
- if (runStateOf(c) != rs)
- continue retry;
-
- // 到了这里,说明线程池状态没有被翻转,那就是说当前线程数因为竟态
- // 原因没有设置成功,那直接内部循环在执行一次,继续进行CAS的设置
- }
- }
-
- // 下面是启动线程的主要代码
- // 线程是否启动成功
- boolean workerStarted = false;
- // 线程是否添加成功
- boolean workerAdded = false;
- // 封装传入的线程对象Worker,这个也是很关键的对象,接下来会分析
- Worker w = null;
- try {
- // 封装线程的初始化工作,下面会分析
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- // 当下线程池的主锁,最大的一把锁,上锁期间主要对线程池容器进行维护
- // 这个容器是一个HashSet,保存当前运行的封装线程Worker
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 再次获取线程池当前状态,因为很有可能期间被人更改了
- int rs = runStateOf(ctl.get());
-
- // rs < SHUTDOWN:线程池是RUNNING状态
- // rs == SHUTDOWN && firstTask == null:
- // 线程池是SHUNTDOWN且firstTask为空,这种情况主要是因为
- // 线程池再SHUNDOWN状态了,可是阻塞队列还有没运行完的线程
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- workers.add(w);
- int s = workers.size();
- if (s > largestPoolSize)
- // 保持一个线程最大线程池状态
- largestPoolSize = s;
- // 到这里线程添加到容器成功
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- // 如果添加容器成功,就启动封装的线程,且设置启动标识位为true
- if (workerAdded) {
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- // 如果封装线程启动失败,会进行一系列的失败处理
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
b、下面是对addWorkerFailed方法的解说
- private void addWorkerFailed(Worker w) {
- final ReentrantLock mainLock = this.mainLock;
- // 同样的,获取主锁
- mainLock.lock();
- try {
- // 不为空的情况将封装线程从容器中移除
- // 为空的情况,主要是new Worker的时候报错
- if (w != null)
- workers.remove(w);
- // 循环登陆,减少一个线程数
- decrementWorkerCount();
- // 试着看看,能不能结束线程池,就是把线程池TERMINASTE了
- tryTerminate();
- } finally {
- mainLock.unlock();
- }
- }
c、下面是tryTerminate方法的解说
- final void tryTerminate() {
- // 发现没,又是个死循环,老爷子很喜欢这种方式啊,而且是用for!
- for (;;) {
- int c = ctl.get();
- // 三种情况直接方法返回:
- // 1、正处在RUNNING状态的线程池
- // 2、线程池的状态是TIDYING或者是TERMINATE
- // 3、线程池是SHUNDOWN状态的,但是阻塞队列不为空
- if (isRunning(c) ||
- runStateAtLeast(c, TIDYING) ||
- (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
- return;
- // 走到这里,线程池的状态可能是:SHUTDOWN(且阻塞队列空)、STOP
- // 如果此时线程数不为0的话,要进行打断操作了
- if (workerCountOf(c) != 0) {
- // 这里入参的意思是只打断容器里第一个封装线程里面的线程
- interruptIdleWorkers(ONLY_ONE);
- return;
- }
-
- // 执行到这里,说明线程池的状态是:SHUTDOWN(阻塞队列为空)、STOP
- // 此时线程数为0,说明线程池可以进行终结操作了
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // CAS先将线程池设置成TIDYING的状态
- if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
- try {
- // 执行用户实现的terminated方法
- terminated();
- } finally {
- // 无论怎么样都会将线程池设置成TERMINATED状态
- ctl.set(ctlOf(TERMINATED, 0));
- termination.signalAll();
- }
- // 到这里说明终结成功,不过根据Java原理,返回前
- // 先执行finally里面的解主锁的方法
- return;
- }
- } finally {
- mainLock.unlock();
- }
- // 如果能执行到这里,说明CAS设置TIDYING状态失败
- // 说明是竟态状态
- }
- }
- private void interruptIdleWorkers(boolean onlyOne) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers) {
- Thread t = w.thread;
- //线程没有被打断且获取到封装线程的锁
- if (!t.isInterrupted() && w.tryLock()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- } finally {
- w.unlock();
- }
- }
- if (onlyOne)
- break;
- }
- } finally {
- mainLock.unlock();
- }
- }
- // 用户自己实现的结束方法
- protected void terminated() { }
到这里,已经讲完了一个很主要的内部方法:addWorker。下面我们对封装线程对象Worker进行讲解
2、Worker对象
这个东西,是一个很很很很很很很很经典的Java并发模型:AQS。这片文章不做AQS的讲解,放到后续
a、具体的Worker对象张什么样
- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable
- {
- private static final long serialVersionUID = 6138294804551838833L;
- // 这个就是最终启动的线程,看到了吧
- final Thread thread;
- // 我们传入的Runnable对象被放到了这里
- Runnable firstTask;
- // 这里记录完成的任务数。
- // 这里说明下一个理念:一个Worker,是最终被运行的Runnanle对象
- // 在很大的情况下(下面做分析)Worker这个线程会一直存在
- // 存在的意义是不断读取阻塞队列里面存储的我们传进来的Runnable对象
- // 然后运行。所以我们实现的Runnable对象的run方法,最终不是被
- // start方法调用执行的,而是通过直接调用执行的!
- volatile long completedTasks;
-
- Worker(Runnable firstTask) {
- setState(-1); // AQS对象状态!也是一大难的东西!
- this.firstTask = firstTask;
- // 这里的getThreadFactory方法使用的就是我们传入的threadFactory
- // 对象
- this.thread = getThreadFactory().newThread(this);
- }
-
- public void run() {
- // 看到了吧,这里执行了外层对象的方法,去直接调用传入的
- // Runnable中的run方法,等下解说
- runWorker(this);
- }
-
- // 下面的几个函数都是AQS必须要实现的方法,这里不累赘
- protected boolean isHeldExclusively() {
- return getState() != 0;
- }
-
- protected boolean tryAcquire(int unused) {
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
-
- protected boolean tryRelease(int unused) {
- setExclusiveOwnerThread(null);
- setState(0);
- return true;
- }
-
- public void lock() { acquire(1); }
- public boolean tryLock() { return tryAcquire(1); }
- public void unlock() { release(1); }
- public boolean isLocked() { return isHeldExclusively(); }
-
- void interruptIfStarted() {
- Thread t;
- if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- }
- }
-
- public ThreadFactory getThreadFactory() {
- return threadFactory;
- }
b、默认的线程工厂DefaultThreadFactory:
- static class DefaultThreadFactory implements ThreadFactory {
- private static final AtomicInteger poolNumber = new AtomicInteger(1);
- private final ThreadGroup group;
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final String namePrefix;
-
- DefaultThreadFactory() {
- SecurityManager s = System.getSecurityManager();
- group = (s != null) ? s.getThreadGroup() :
- Thread.currentThread().getThreadGroup();
- // 这里记录了线程名的前缀,可见会将线程池序号进行递增操作
- namePrefix = "pool-" +
- poolNumber.getAndIncrement() +
- "-thread-";
- }
-
- public Thread newThread(Runnable r) {
- // 这里就是生成喜闻乐见的Thread对象了,结合上面这里的r就是我们的Worker对象
- Thread t = new Thread(group, r,
- namePrefix + threadNumber.getAndIncrement(),
- 0);
- if (t.isDaemon())
- t.setDaemon(false);
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- }
3、runWorker方法
a、接下来又是一个关键性方法runWorker
- final void runWorker(Worker w) {
- // 获取当前运行着的Worker线程
- Thread wt = Thread.currentThread();
- // 这个就是我们当下传入的Runnable
- Runnable task = w.firstTask;
- // 置空的意思是:Worker其实是一个壳子,以后会一直运行着,不断执行其他阻塞队列
- // 里面的Runnable对象的run方法
- w.firstTask = null;
- // 这里做解锁操作,是表示下面所有操作是可以被打断的
- // 另外AQS默认情况下不做unlock操作,lock会阻塞
- w.unlock();
- // 这个标志位表示线程执行过程中有没有被打断,或者运行异常
- boolean completedAbruptly = true;
- try {
- // 这个While循环里面的语句相当关键,包含了线程池执行流程的枢纽!
- // 我先大致说一下,下面会详细分析getTask方法:
- // 主要就是判断如果当前Worker里面的Runnable对象不为空
- // 就会执行这个对象的run方法;执行完之后,还会回到这个循环
- // 再下面的finally块里面将task置空了,所以就去调用getTask方法
- // 而getTask方法是一个很大可能阻塞的方法,阻塞的原因就是等待
- // 阻塞队列里面放入对象!所以也就形成了,一个Worker对象,循环
- // 不停的执行传入的Runnable对象run方法。这也就构成了corePoolSize
- // 与maxPoolSize两个参数控制系统级别的线程多少的目的!也就是说,
- // 这就是线程池里面,“池”这个概念的由来~
- while (task != null || (task = getTask()) != null) {
- w.lock();
- // 这里主要是判断是否要打断当前Worker所在的线程
- // 要满足两个个条件:
- // 1、当前线程池是STOP、TIDYING、TERMINATED
- // 2、当前线程是没有被打断的情况
- // 其中Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)
- // 主要用于清除线程终端标志,因为很大可能线程池刚刚转换成STOP
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- // 在执行线程体之前执行的方法,用户实现
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- task.run();
- } catch (RuntimeException x) {
- // 请看下面几个异常,都是直接抛了出去
- // 而并没有处理,所以处理内部异常也是
- // 线程池的一个关键点
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- // 在执行完线程体之后的方法,用户实现
- // 异常同时也传入这里了,所以可以自己实现一个子类
- // 自己实现这个方法,进行异常处理
- afterExecute(task, thrown);
- }
- } finally {
- // 这个地方肯定会被执行,所以无论run方法怎么样
- // Worker运行完成线程数都会加一
- task = null;
- w.completedTasks++;
- // 这里进行解锁操作
- w.unlock();
- }
- }
- // 注意代码执行到了这里说明while循环跳出来了
- // 大致有如下几种情况:
- // 1、阻塞队列里面没值了
- // 2、线程池状态翻转,便成了大于等于SHUTDOWN状态的了
- // 由于是正常结束,所以异常结束标志是false
- completedAbruptly = false;
- } finally {
- // 这里肯定会被执行,但是有两种情况跳入这个代码块
- // 1、run方法没有抛异常,completedAbruptly为false
- // 2、run方法抛异常,completedAbruptly为true
- // 下面也会进行解说
- processWorkerExit(w, completedAbruptly);
- }
- }
b、我们来看核心的getTask方法
- private Runnable getTask() {
- // 这个标志位主要用于后面的poll方法是否超时
- boolean timedOut = false;
-
- // 又来了,李老爷子!是一个死循环判断!
- for (;;) {
- int c = ctl.get();
- // 获取当前线程池运行状态
- int rs = runStateOf(c);
-
- // 如果同时符合下面两种情况,直接返回null,并减少线程数量
- // 1、线程池状态是:SHUTDOWN、STOP、TIDYING、TERMINTED
- // 2、线程池的状态是STOP、TIDYING、TERMINTED或者队列为空
- // 这预示着线程池要进行关闭操作了,此Worker要结束声明周期!
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- // 这里是循环指导CAS成功设置
- decrementWorkerCount();
- return null;
- }
-
- // 这里获取当前线程池的线程数
- int wc = workerCountOf(c);
-
- // 这个标识位要解释解释:
- // 1、allowCoreThreadTimeOut成员变量,可设置
- // 2、wc > corePoolSize线程数是否大于核心线程数
- // 简单说就是:这个标志位控制线程池的收缩!
- // 很关键是不是!
- // 正常情况下只要超出核心线程数的线程才要进行收缩的
- // 收缩的条件是根据传入的阻塞队列超时时间
- // 但是我们可以通过设置allowCoreThreadTimeOut为true
- // 这样核心线程也可以收缩!
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
-
- // 这里maximumPoolSize不能为零和负数
- // 这里判断很复杂,简单理解就是:
- // 如果线程池线程数超出了设置的最大线程数或者阻塞队列被打断了
- // 且当前Worker所在线程不是最后一个线程或者阻塞队列空了。
- // 这里如果wc>maximumPoolSize,那一定大于1,那就说明
- // 一定会执行if方法体;如果小于等于maximumPoolSize情况,
- // 那就说明是线程合理的收缩,这种时候,只有allowCoreThreadTimeOut
- // 被置位或者线程数大于核心线程数,当然如果要是只有一个线程数且队列不为空
- // 的情况也不能收缩,要保证有封装线程能执行阻塞队列里面线程
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- // 这里如果设置成功说明可以终结这个Worker了
- return null;
- // 这里是continue,因为有竟态
- continue;
- }
-
- try {
- // 注意这里的timed的取值,timed为true的时候是:
- // 1、allowCoreThreadTimeOut被置位
- // 2、或者线程数大于核心线程数
- // 其他情况是直接take方法,直接阻塞的。除非被打断
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- // 正常情况是拿到了Runnable,直接返回了
- return r;
- // 这种是阻塞队列超时了
- timedOut = true;
- } catch (InterruptedException retry) {
- // 打断情况并非阻塞队列超时,所以这里设置成false
- timedOut = false;
- }
- }
- }
c、下面是对processWorkerExit分析
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- // 这个判断说明当前Worker所在的线程执行Runnable中的run方法抛了异常
- // 所以这个时候,要将线程数减一
- if (completedAbruptly)
- decrementWorkerCount();
-
- // 获取主锁
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 将当前Worker存在期间一共执行了多少个Runnable累加到
- // 线程池的统计字段上面
- completedTaskCount += w.completedTasks;
- // 将封装线程从容器中移除
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
- // 上面的方法在这里执行了,分析请看上面
- tryTerminate();
-
- int c = ctl.get();
- // 如果现在线程池的状态是:RUNNING、SHUTDOWN,执行if代码块
- if (runStateLessThan(c, STOP)) {
- // 如果没有抛异常情况,执行这个if代码块
- if (!completedAbruptly) {
- // 这个代码块,主要是要保证如果阻塞队列中还有Runnable线程
- // 而又走到了即将结束当前WOrker的代码,线程池要保证,至少还有
- // 运行着的Worker对阻塞队列中的线程进行处理,执行
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- // 为0的情况表示允许核心线程收缩,或者核心线程直接设成了0
- // 阻塞队列不为空要保证最小可用的Worker为1
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- // 判断当前线程数是不是比最小的还要小
- if (workerCountOf(c) >= min)
- // 这里表明,有足够的Worker去执行
- return;
- // 代码运行到这里,表明没有足够的Worker了,下面去创建
- }
- // 这里添加一个Worker的原因是:
- // RUNNING和SHUTDOWN状态都是允许继续执行阻塞队列中的线程的
- // 所以这里创建一个firstTask为null,依赖getTast去获取队列中的
- // 线程去执行。false的原因是创建依据maximumPoolSize
- addWorker(null, false);
- }
- }
四、结尾
到此为止,线程池的主要源码,都分析了,剩下,还有几个附加功能源码,留着接下来有精力再一点点回补吧。当然,对于下一步的深入,就要到AQS的分析了。可见,这里的Worker本身就是一个AQS,在Worker上面调用lock或是unlock方法,都是进入一个内部的阻塞队列的管理的。其中最最底层,还会涉及到操作系统中线程的同步原语:mutex!接下来,我会分析那个,敬请期待!