赞
踩
目录
核心的话,我们的Java自带的线程池就是由两种类型(那些Executors提供的固定类型只是对这两种类型改了一些参数而已):
下面我们就先来分析普通线程池 ThreadPoolExecutor 的源码。
- public class ThreadPoolExecutor extends AbstractExecutorService {
- // 线程池的状态和线程数量都是通过一个原子类型AtomicInteger的变量ctl来表示的,创建线程池的时候就会将ctl初始化为RUNNING状态和0个工作线程
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- // 表示低29位的值,用于表示线程池中工作线程的数量
- private static final int COUNT_BITS = Integer.SIZE - 3; // =29
- // 用于表示线程池中工作线程的数量的最大值,等于2^29-1
- private static final int CAPACITY = (1 << COUNT_BITS) - 1; // =000 11111...
- // 线程池的五种状态,高3位表示 下面的状态值依次增高,是根据他们状态流程的顺序依次增高的
- private static final int RUNNING = -1 << COUNT_BITS; // 111 00000...
- private static final int SHUTDOWN = 0 << COUNT_BITS; // 000 00000...
- private static final int STOP = 1 << COUNT_BITS; // 001 00000...
- private static final int TIDYING = 2 << COUNT_BITS; // 010 00000...
- private static final int TERMINATED = 3 << COUNT_BITS; // 011 00000...
-
- // 传入ctl,计算返回线程池的状态
- // 通过与运算,将高3位的值保留,低29位的值置为0
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- // 传入ctl,计算返回线程池中工作线程的数量
- // 通过与运算,将高3位的值置为0,低29位的值保留
- private static int workerCountOf(int c) { return c & CAPACITY; }
- // 计算ctl的值,等于运行状态“加上”线程数量
- // 通过或运算,将高3位和低29位的值都保留,合并为ctl并返回
- private static int ctlOf(int rs, int wc) { return rs | wc; }
-
-
- // runStateLessThan方法用于判断线程池的状态是否小于某个状态
- private static boolean runStateLessThan(int c, int s) {
- return c < s;
- }
- // runStateAtLeast方法用于判断线程池的状态是否大于等于某个状态
- private static boolean runStateAtLeast(int c, int s) {
- return c >= s;
- }
- // isRunning方法用于判断线程池的状态是否是RUNNING状态
- private static boolean isRunning(int c) {
- return c < SHUTDOWN;
- }
- // 该方法用于增加工作线程的数量,如果增加成功则返回true,否则返回false
- private boolean compareAndIncrementWorkerCount(int expect) {
- return ctl.compareAndSet(expect, expect + 1);
- }
- // 该方法用于减少工作线程的数量,如果减少成功则返回true,否则返回false
- private boolean compareAndDecrementWorkerCount(int expect) {
- return ctl.compareAndSet(expect, expect - 1);
- }
- // 该方法用于设置线程池的状态,如果设置成功则返回true,否则返回false
- private void decrementWorkerCount() {
- do {} while (! compareAndDecrementWorkerCount(ctl.get()));
- }
- // 用于保存工作线程的集合
- private final HashSet<Worker> workers = new HashSet<Worker>();
- // 线程池的状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
- private final ReentrantLock mainLock = new ReentrantLock();
- // 线程池的条件锁,用mainLock创建的,在线程池的状态发生改变时,会使用这个条件锁来通知所有等待的线程
- private final Condition termination = mainLock.newCondition();
-
-
- // 核心参数:任务阻塞队列
- private final BlockingQueue<Runnable> workQueue;
- // 核心参数:线程池的线程工厂,用于创建线程
- private volatile ThreadFactory threadFactory;
- // 核心参数:拒绝策略,用于在线程池已经关闭或者线程数已经达到最大值的情况下,对新任务的处理策略
- private volatile RejectedExecutionHandler handler;
- // 核心参数:当线程数大于核心线程数时,多余的空闲线程存活的最长时间
- private volatile long keepAliveTime;
- // 是否允许核心线程空闲退出,默认值为false
- private volatile boolean allowCoreThreadTimeOut;
- // 核心参数:线程池的核心线程数
- private volatile int corePoolSize;
- // 核心参数:线程池的最大线程数
- private volatile int maximumPoolSize;
-
- // 这是一个动态的变量。
- // largestPoolSize表示线程池中曾经出现过的最大线程数,即在线程池的整个生命周期中,曾经有多少个线程同时处于活动状态。所以说largestPoolSize是小于等于maximumPoolSize的。
- // 它的值可以通过ThreadPoolExecutor的getLargestPoolSize()方法获取。
- private int largestPoolSize;
- // 表示已经完成的任务数量
- private long completedTaskCount;
- // 默认的拒绝策略,默认的拒绝策略就是直接抛出异常。如果构造方法中没有转入指定的拒绝策略,就会将defaultHandler赋值给handler
- private static final RejectedExecutionHandler defaultHandler =
- new AbortPolicy();
- ...
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
ThreadPoolExecutor中有五个内部类,总的来说其实就两类:1、拒绝策略(Policy) 2、工作线程类(Worker)
当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢?这里的拒绝策略,就是解决这个问题的。
拒绝策略表示当任务队列满了且线程数也达到最大了,这时候再新加任务,线程池已经无法承受了,这些新来的任务应该按什么逻辑来处理。
关于拒绝策略相关的源码分析见:【线程池】线程池的拒绝策略(饱和策略)
Worker类,每一个Worker类就会绑定一个线程(Worker类有一个成员属性持有一个线程对象),可以将Worker理解为线程池中执行任务的线程。但是实际上它并不仅仅是一个线程,Worker里面还会有一些统计信息,存储一些相关的数据。
关于Worker工作线程相关的源码分析见:【线程池】Java线程池的内部类Worker详解
下面我们以执行普通任务为例,讲解ThreadPoolExecutor的源码。
之前我们学过线程是有自己的生命周期的,其实线程池也是存在生命周期的。在我们讲线程池体系结构的时候,讲了一些方法,比如shutDown()/shutDownNow(),它们都是与线程池的生命周期相关联的。
我们先来看一下线程池ThreadPoolExecutor中定义的生命周期中的状态及相关方法:
- // 线程池的状态和线程数量都是通过一个原子类型AtomicInteger的变量ctl来表示的,创建线程池的时候就会将ctl初始化为RUNNING状态和0个工作线程
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- // 表示低29位的值,用于表示线程池中工作线程的数量
- private static final int COUNT_BITS = Integer.SIZE - 3; // =29
- // 用于表示线程池中工作线程的数量的最大值,等于2^29-1
- private static final int CAPACITY = (1 << COUNT_BITS) - 1; // =000 11111...
- // 线程池的五种状态,高3位表示 下面的状态值依次增高,是根据他们状态流程的顺序依次增高的
- private static final int RUNNING = -1 << COUNT_BITS; // 111 00000...
- private static final int SHUTDOWN = 0 << COUNT_BITS; // 000 00000...
- private static final int STOP = 1 << COUNT_BITS; // 001 00000...
- private static final int TIDYING = 2 << COUNT_BITS; // 010 00000...
- private static final int TERMINATED = 3 << COUNT_BITS; // 011 00000...
- // 传入ctl,计算返回线程池的状态
- // 通过与运算,将高3位的值保留,低29位的值置为0
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- // 传入ctl,计算返回线程池中工作线程的数量
- // 通过与运算,将高3位的值置为0,低29位的值保留
- private static int workerCountOf(int c) { return c & CAPACITY; }
- // 计算ctl的值,等于运行状态“加上”线程数量
- // 通过或运算,将高3位和低29位的值都保留,合并为ctl并返回
- private static int ctlOf(int rs, int wc) { return rs | wc; }
-
-
- // runStateLessThan方法用于判断线程池的状态是否小于某个状态
- private static boolean runStateLessThan(int c, int s) {
- return c < s;
- }
- // runStateAtLeast方法用于判断线程池的状态是否大于等于某个状态
- private static boolean runStateAtLeast(int c, int s) {
- return c >= s;
- }
- // isRunning方法用于判断线程池的状态是否是RUNNING状态
- private static boolean isRunning(int c) {
- return c < SHUTDOWN;
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
从上面这段代码,我们可以得出:
(1)线程池的状态和工作线程的数量共同保存在控制变量ctl中,类似于AQS中的state变量,不过这里是直接使用的AtomicInteger,这里换成unsafe+volatile也是可以的;
(2)ctl的高三位保存运行状态,低29位保存工作线程的数量,也就是说线程的数量最多只能有(2^29-1)个,也就是上面的CAPACITY;关于ctl的详细讲解见:【线程池】线程池的ctl属性详解
(3)线程池的状态一共有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED;
(4)RUNNING,表示可接受新任务,且可执行队列中的任务;
(5)SHUTDOWN,表示不接受新任务,但可执行队列中的任务;
(6)STOP,表示不接受新任务,且不再执行队列中的任务,且中断正在执行的任务;
(7)TIDYING,所有任务已经中止,且工作线程数量为0,最后变迁到这个状态的线程将要执行terminated()钩子方法,只会有一个线程执行这个方法;
(8)TERMINATED,中止状态,已经执行完terminated()钩子方法;
下面我们再通过流程图来看看这些状态之间是怎么流转的:
(1)新建线程池时,它的初始状态为RUNNING,这个在上面定义ctl的时候可以看到;
(2)RUNNING->SHUTDOWN,执行shutdown()方法时;
(3)RUNNING->STOP,执行shutdownNow()方法时;
(4)SHUTDOWN->STOP,执行shutdownNow()方法时;
(5)STOP->TIDYING,执行了shutdown()或者shutdownNow()后,所有任务已中止,且工作线程数量为0时,此时会执行terminated()方法;
(6)TIDYING->TERMINATED,执行完terminated()方法后;
下面让我们一起来看看源码中是怎么控制线程池状态的。
RUNNING,比较简单,创建线程池的时候就会初始化ctl,而ctl初始化为RUNNING状态,所以线程池的初始状态就为RUNNING状态。
- // 初始状态为RUNNING
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
执行shutdown()方法时把状态修改为SHUTDOWN,这里肯定会成功,因为advanceRunState()方法中是个自旋,不成功不会退出。
- public void shutdown() {
- // 获取线程池锁
- final ReentrantLock mainLock = this.mainLock;
- // 加锁,修改线程池状态
- mainLock.lock();
- try {
- // 检查是否有权限关闭线程池
- checkShutdownAccess();
- // 修改状态为SHUTDOWN,自旋操作,只有状态修改成功才会返回
- advanceRunState(SHUTDOWN);
- // 标记空闲线程为中断状态
- interruptIdleWorkers();
- onShutdown();
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- }
-
- private void advanceRunState(int targetState) {
- // 自旋修改线程池状态
- for (;;) {
- // 获取ctl
- int c = ctl.get();
- // 如果状态大于SHUTDOWN(因为只有RUNNING状态才能转化为SHUTDOWN状态,而只有RUNNING状态是小于SHUTDOWN状态的),
- // 或者修改为SHUTDOWN成功了,才会break跳出自旋
- if (runStateAtLeast(c, targetState) ||
- ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
- break;
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
执行shutdownNow()方法时,会把线程池状态修改为STOP状态,同时标记所有线程为中断状态。
- public List<Runnable> shutdownNow() {
- List<Runnable> tasks;
- // 获取线程池的锁
- final ReentrantLock mainLock = this.mainLock;
- // 加锁,进行状态修改操作
- mainLock.lock();
- try {
- // 检查是否有权限关闭线程池
- checkShutdownAccess();
- // 修改为STOP状态
- advanceRunState(STOP);
- // 标记所有线程为中断状态
- interruptWorkers();
- // 将队列中的任务全部移除,并返回
- tasks = drainQueue();
- } finally {
- // 解锁
- mainLock.unlock();
- }
- // 尝试终止线程池
- tryTerminate();
- // 返回队列中的任务
- return tasks;
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
至于线程是否响应中断其实是在队列的take()或poll()方法中响应的,最后会到AQS中,它们检测到线程中断了会抛出一个InterruptedException异常,然后getTask()中捕获这个异常,并且在下一次的自旋时退出当前线程并减少工作线程的数量。
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // 如果状态为STOP了,这里会直接退出循环,且减少工作线程数量
- // 退出循环了也就相当于这个线程的生命周期结束了
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- // Are workers subject to culling?
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- // 真正响应中断是在poll()方法或者take()方法中
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- // 这里捕获中断异常
- timedOut = false;
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
这里有一个问题,就是已经通过getTask()取出来且返回的任务怎么办?
实际上它们会正常执行完毕,这个知识点涉及到runWorker()这个方法,我们在后面会分析这个方法。
当执行shutdown()或shutdownNow()之后,如果所有任务已中止,且工作线程数量为0,就会进入这个状态。
- final void tryTerminate() {
- // 自旋修改状态
- for (;;) {
- int c = ctl.get();
- // 下面几种情况不会执行后续代码
- // 1. 运行中
- // 2. 状态的值比TIDYING还大,也就是TERMINATED
- // 3. SHUTDOWN状态且任务队列不为空
- if (isRunning(c) ||
- runStateAtLeast(c, TIDYING) ||
- (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
- return;
- // 工作线程数量不为0,也不会执行后续代码
- if (workerCountOf(c) != 0) {
- // 尝试中断空闲的线程
- interruptIdleWorkers(ONLY_ONE);
- return;
- }
-
- // 获取线程池的锁
- final ReentrantLock mainLock = this.mainLock;
- // 加锁
- mainLock.lock();
- try {
- // CAS修改状态为TIDYING状态
- if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
- try {
- // 更新成功,执行terminated钩子方法
- terminated();
- // 确保terminated钩子方法执行完毕后,再次修改状态为TERMINATED(最终的终止状态),线程池彻底关闭
- } finally {
- // 强制更新状态为TERMINATED,这里不需要CAS了
- ctl.set(ctlOf(TERMINATED, 0));
- // 通知所有等待线程
- termination.signalAll();
- }
- return;
- }
- } finally {
- // 解锁
- mainLock.unlock();
- }
- // else retry on failed CAS
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
实际更新状态为TIDYING和TERMINATED状态的代码都在tryTerminate()方法中,实际上tryTerminated()方法在很多地方都有调用,比如shutdown()、shutdownNow()、线程退出时,所以说几乎每个线程最后消亡的时候都会调用tryTerminate()方法,但最后只会有一个线程真正执行到修改状态为TIDYING的地方。
修改状态为TIDYING后执行terminated()方法,最后修改状态为TERMINATED,标志着线程池真正消亡了。
见上面TIDYING中分析。
本章节我们讲的是普通线程池,以固定线程池为例,下面就是线程池提交任务的基本用法。我们可以使用Executors框架来直接创建线程池,但是本质它还是通过ThreadPoolExecutor的构造方法创建的线程池。
- // 固定线程池
- ExecutorService executorService = Executors.newFixedThreadPool(2);
- // 提交单个任务,无返回值
- executorService.execute(new Runnable() {
- public void run() {
- }
- });
- // 提交单个任务,有返回值,返回Future
- executorService.submit(new Callable<String>() {
- public String call() throws Exception {
- return "abc";
- }
- });
ThreadPoolExecutor的多个构造方法,都是调用的下面这个构造方法来实现的。
- /**
- * 用给定的初始参数创建一个新的ThreadPoolExecutor。
- * 所有的构造方法最终都会调用这个构造方法,都是基于这个方法实现的
- */
- public ThreadPoolExecutor( int corePoolSize, // 线程池的核心线程数量
- int maximumPoolSize, // 线程池的最大线程数
- long keepAliveTime, // 当线程数大于核心线程数时,多余的空闲线程存活的最长时间
- TimeUnit unit, // 存活时间的时间单位
- BlockingQueue<Runnable> workQueue, // 任务队列,用来储存等待执行任务的队列
- ThreadFactory threadFactory, // 线程工厂,用来创建线程,一般默认即可
- RejectedExecutionHandler handler // 拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
- ) {
- if (corePoolSize < 0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
-
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
-
- this.acc = System.getSecurityManager() == null ?
- null :
- AccessController.getContext();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
关于构造方法核心参数的详细笔记见:【线程池】Java线程池的核心参数
下面我们通过这个构造方法创建一个线程池,它的核心数量为5,最大数量为10,空闲时间为1秒,队列长度为5,拒绝策略打印一句话。如果使用它运行20个任务,会是什么结果呢?
- public class ThreadPoolTest01 {
- public static void main(String[] args) {
- // 通过构造方法新建一个线程池
- // 核心数量为5,最大数量为10,空闲时间为1秒,队列长度为5,拒绝策略打印一句话
- ExecutorService threadPool = new ThreadPoolExecutor(5, 10,
- 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
- Executors.defaultThreadFactory(),
- new RejectedExecutionHandler() { // 自定义拒绝策略
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- System.out.println(currentThreadName() + ", discard task");
- }
- }
- );
- // 循环提交20个任务,注意观察num
- for (int i = 0; i < 20; i++) {
- int num = i;
- threadPool.execute(()->{
- try {
- // 打印执行任务的线程信息 和 num的大小 以及执行的时间
- System.out.println(currentThreadName() + ", "+ num + " running, " + System.currentTimeMillis());
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- }
- }
-
- // 打印当前线程的名字
- private static String currentThreadName() {
- return Thread.currentThread().getName();
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
我们一起来看看一次运行的结果:
pool-1-thread-1, 0 running, 1572678434411
pool-1-thread-3, 2 running, 1572678434411
pool-1-thread-2, 1 running, 1572678434411
pool-1-thread-4, 3 running, 1572678434411
pool-1-thread-5, 4 running, 1572678434411
pool-1-thread-6, 10 running, 1572678434412
pool-1-thread-7, 11 running, 1572678434412
pool-1-thread-8, 12 running, 1572678434412
main, discard task
main, discard task
main, discard task
main, discard task
main, discard task
pool-1-thread-9, 13 running, 1572678434412
pool-1-thread-10, 14 running, 1572678434412
pool-1-thread-3, 5 running, 1572678436411
pool-1-thread-1, 6 running, 1572678436411
pool-1-thread-6, 7 running, 1572678436412
pool-1-thread-2, 8 running, 1572678436412
pool-1-thread-7, 9 running, 1572678436412
注意,观察num值的打印信息,先是打印了0~4,再打印了10~14,最后打印了5~9,竟然不是按顺序打印的,为什么呢?
下面我们基于上面这段代码示例,一步一步debug进去查看ThreadPoolExecutor中地相关源码。
execute()方法是线程池提交任务的方法之一,也是最核心的方法。该方法是在Executor线程池顶层接口中定义的,用于提交Runnable无返回值任务到线程池中执行,该方法也没有返回值。该方法在ThreadPoolExecutor中实现。
java.util.concurrent.ThreadPoolExecutor#execute
- // 提交任务,任务并非立即执行,所以翻译成执行任务似乎不太合适,其实是将任务提交到任务队列中,然后线程池再从任务队列中取出任务执行
- public void execute(Runnable command) {
- // 任务不能为空
- if (command == null)
- throw new NullPointerException();
-
- // 获取ctl(高3位存储状态,低29位存储工作线程的数量)
- int c = ctl.get();
- // 1. 如果工作线程数量小于核心数量
- if (workerCountOf(c) < corePoolSize) {
- // 就添加一个工作线程(核心) 并将该任务作为该工作线程的第一个任务,创建线程完成后直接返回
- if (addWorker(command, true))
- return;
- // 如果创建工作线程失败,则重新获取下ctl
- c = ctl.get();
- }
- // 2. 如果达到了核心数量且线程池是运行状态,则将任务加入队列(workQueue是构造方法的核心参数之一)
- if (isRunning(c) && workQueue.offer(command)) {
- // 再次获取ctl
- int recheck = ctl.get();
- // 再次检查线程池状态,如果不是运行状态,就移除任务并执行拒绝策略
- if (! isRunning(recheck) && remove(command))
- reject(command);
- // 容错检查工作线程数量是否为0,如果为0就创建一个,此时就不用为工作线程绑定任务了,因为任务已经加入到队列中了
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 3. 任务入队列失败(例如队列满了),尝试创建非核心工作线程,将该任务和非核心工作线程绑定在一起
- else if (!addWorker(command, false))
- // 非核心工作线程创建失败,执行拒绝策略
- reject(command);
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
提交任务的过程大致如下:
(1)工作线程数量小于核心数量,创建核心线程,并将任务交给该核心线程去执行;
(2)达到核心数量,将任务加入任务队列;
(3)任务队列满了,创建非核心线程,并将任务交给该非核心线程去执行;
(4)线程数达到最大数量,队列也满了,则执行拒绝策略;
其实,就是三道坎——核心数量、任务队列、最大数量,这样就比较好记了。
流程图大致如下:
任务流转的过程我们知道了,但是任务是在哪里执行的呢?继续往下看。
讲解这个方法之前先了解下 retry: 因为源码中有这个retry标记
先看一个简单的例子
- public void testRetry() {
- int i = 0;
- retry: //①
- while (true) {
- i++;
- System.out.println("i=" + i);
- int j = 0;
- // retry: //②
- for (; ; ) {
- j++;
- System.out.println("j=" + j);
- if (j == 2) {
- break retry;
- }
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
首先需要说明的是,retry:可以理解为java中的一种特殊的标记,其中retry可以换成任何合法的命名。
a:、b:、A13: 等等都是可以的。该标记其实就是":",符号前面是自己定义的任意合法名字。
i=1
j=1
j=2
这里有一点需要注意,这里和C语言的goto不一样,如果break到了循环外面(就是说再也没有循环套在retry外面了),那么这个程序就永远不会再进入到这个循环了,而C语言的goto只是重新调整了程序的执行位置,程序会接着按照代码向下执行,进入循环。如果是用的continue retry就还可以再进入循环。
....
j=1
j=2
i=132348
j=1
j=2
i=132349
j=1
j=2
i=132350
j=1
j=2
...一直循环打印
retry相当于一个标记,只用在循环里面,很像goto语句,break到retry字符处。如果retry没有在循环(for,while)里面,在执行到retry时,就会跳出整个循环。如果retry在循环里面,可以理解为跳到了关键字处执行,不管几层循环。continue理解也是一样。
注意:retry:需要放在for,whlie,do...while的前面声明,变量只跟在break和continue后面。
如果工作线程数小于核心线程数的话,会调用 addWorker,顾名思义,这个方法主要用来创建一个工作线程,并启动之,其中会做线程池状态、工作线程数量等各种检测。
源码很长,但其实就做了两件事:
java.util.concurrent.ThreadPoolExecutor#addWorker
- // 该方法用于创建新的工作线程添加到线程池中,如果添加成功则启动线程
- // 参数firstTask是一个Runnable对象,表示要给该线程的第一个任务
- // 参数core表示创建的是否是核心线程
- // 返回值表示是否成功创建并启动了工作线程
- private boolean addWorker(Runnable firstTask, boolean core) {
- // 判断有没有资格创建新的工作线程
- // 主要是一些状态/数量的检查等等
- retry:
- for (;;) {
- // 获取ctl
- int c = ctl.get();
- // 获取当前线程池运行状态
- int rs = runStateOf(c);
- // 线程池状态检查
- if (rs >= SHUTDOWN && // 大于等于SHUTDOWN说明线程池不是RUNNING状态
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- // 两种情况会直接返回false 1.如果非运行状态 2.不是【线程池是停止状态并且传入的任务是null并且workQueue不等于空】这种情况
- return false;
- // 工作线程数量检查
- for (;;) {
- // 获取当前线程数
- int wc = workerCountOf(c);
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- // 判断是否饱和容量了,如果已经达到最大线程数则不能再新建线程,直接返回false
- return false;
- // 完成了上面的判断后,说明现在线程池可以创建新线程,则将线程数量加1,因为后面要创建线程了,并跳出循环
- if (compareAndIncrementWorkerCount(c))
- // 因为外面的retry:已经不在循环内了,跳到retry:位置后也不会再进入到该循环了,其实就相当于跳出这两层循环并且不再进入这两层循环,执行后续的流程
- break retry;
- // 如果上面的compareAndIncrementWorkerCount(c)方法返回false,则说明有其他线程在操作线程池的线程数量,所以需要重新获取ctl
- c = ctl.get(); // Re-read ctl
- // 如果当前的运行状态已经和最开始获取的状态不一样了
- if (runStateOf(c) != rs)
- // 回到retry:,但是因为这里用的是continue,所以程序往后执行还是会再次进入到该循环继续执行上面的判断
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
-
- // 如果上面的条件满足,则会把工作线程数量加1,然后执行下面创建线程的动作
- // 标记是否启动了工作线程
- boolean workerStarted = false;
- // 标记是否成功添加了工作线程
- boolean workerAdded = false;
- // 要创建的Worker对象
- Worker w = null;
- try {
- // 创建工作线程
- // 增加一个worker 这个firstTask就是一个线程(任务线程,每一个Worker都和一个任务线程绑定),Runable,具体可以看这Worker源码
- w = new Worker(firstTask);
- // 这个是绑定在Worker上的工作线程,并不是任务线程,工作线程是用来执行任务的,
- final Thread t = w.thread;
- // 判断t是否为null
- if (t != null) {
- // 获取线程池的锁
- final ReentrantLock mainLock = this.mainLock;
- // 在读取线程池状态的时候就应该上锁,防止有并发操作破坏程序的一致性,上下不一致
- mainLock.lock();
- try {
- // 锁定后并重新检查线程池的状态,查看是否存在线程工厂的失败或者锁定前的关闭
- // 获取线程池运行状态
- int rs = runStateOf(ctl.get());
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
-
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
-
- // 添加到工作线程队列。 workers是线程池中存放工作线程的集合
- // 增加work 每一个线程池都持有一个workers集合,里面存储着线程池的所有worker,也就是所有线程
- workers.add(w);
- // 还在池子中的线程数量(只能在mainLock中使用)
- int s = workers.size();
- // 不能超过线程池的最大线程数量
- if (s > largestPoolSize)
- largestPoolSize = s;
-
- // 标记线程添加成功
- workerAdded = true;
- }
- } finally {
- // 解锁
- mainLock.unlock();
- }
- if (workerAdded) {
- // 线程添加成功之后启动线程
- // 启动绑定在worker上的线程。启动了之后该工作线程就会开始从任务队列中拿任务去执行了
- t.start();
- // 标记工作线程启动成功
- workerStarted = true;
- }
- }
- } finally {
- // 线程启动失败,执行失败方法(线程数量减1,执行tryTerminate()方法等)
- if (! workerStarted)
- addWorkerFailed(w);
- }
- // 返回新创建的工作线程是否启动成功
- return workerStarted;
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
这里其实还没到任务执行的地方,上面我们可以看到线程是包含在Worker这个类中的,我们可以推测出其实执行任务的操作是在内部类Worker中完成的。
通过源码,可以总结出创建 Worker 失败的原因:
addWorker 方法中,如果添加 Worker 并且启动线程失败,则会做失败后的处理:
- private void addWorkerFailed(Worker w) {
- final ReentrantLock mainLock = this.mainLock;
- // 加锁
- mainLock.lock();
- try {
- // 移除添加失败的worker
- if (w != null)
- workers.remove(w);
- // 减少worker数量
- decrementWorkerCount();
- // 尝试终止线程池
- tryTerminate();
- } finally {
- // 解锁
- mainLock.unlock();
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
这个方法主要做三件事
线程池添加线程流程图:
其实真正执行任务的操作,是在runWorker()方法中完成的。该方法是ThreadPoolExecutor中的核心方法之一,但是调用该方法确实通过内部类Worker实现的。
上面在addWorker()方法的源码中,ThreadPoolExecutor线程池调用了绑定在Worker对象上的t工作线程的start()方法,也就相当于启动了Worker工作线程。Worker实现了Runnable方法,所以它其中存在run()方法,当启动了工作线程之后,就会自动执行run()方法,而run()方法其实调用的就是ThreadPoolExecutor的runWorker()方法,该方法就会让工作线程持续地去队列中拿任务执行。
至于ThreadPoolExecutor如何执行任务和对其源码的分析,详见:Worker源码分析。这里就不再赘述了。
通过上面对线程池的重要方法地分析,我们了解了线程池中普通任务执行的流程。
(1)execute(),提交任务的方法,根据核心数量、任务队列大小、最大数量,分成四种情况判断任务应该往哪去;
(2)addWorker(),添加工作线程的方法,通过Worker内部类封装一个Thread实例维护工作线程的执行;
(3)runWorker(),真正执行任务的地方,先执行第一个任务,再源源不断从任务队列中取任务来执行;
(4)getTask(),真正从队列取任务的地方,默认情况下,根据工作线程数量与核心数量的关系判断使用队列的poll()还是take()方法,keepAliveTime参数也是在这里使用的。
线程池运作的总流程图:
下面这个流程图我感觉可能更能直观的看出ThreadPoolExecutor每一部分的分工:
下面这个流程我我感觉能更直观展现每一个方法的调用流程:
至此,我们再看最开始给出的那个使用例子带来的问题:
观察num值的打印信息,先是打印了0~4,再打印了10~14,最后打印了5~9,竟然不是按顺序打印的,为什么呢?
线程池的参数:核心数量5个,最大数量10个,任务队列5个。
答:执行前5个任务执行时,正好还不到核心数量,所以新建核心线程并执行了他们;
执行中间的5个任务时,已达到核心数量,所以他们先入队列;
执行后面5个任务时,已达核心数量且队列已满,所以新建非核心线程并执行了他们;
再执行最后5个任务时,线程池已达到满负荷状态,所以执行了拒绝策略。
最后再来一个ThreadPoolExecutor运行机制图示:
前面我们一起学习了线程池中普通任务的执行流程,但其实ThreadPoolExecutor线程池中还有一种任务,叫作未来任务(future task),使用它我们可以获取任务执行的结果。其实线程池就是利用FutureTask实现了异步提交执行任务的功能,可以拿到异步任务结果。那么它是怎么实现的呢?请看下面的分析。
我们还是从一个例子入手,来讲解来章的内容。
我们定义一个线程池,并使用它提交5个任务,这5个任务分别返回0、1、2、3、4,在未来的某一时刻,我们再取用它们的返回值,做一个累加操作。
- public class ThreadPoolTest02 {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- // 新建一个固定5个线程的线程池
- ExecutorService threadPool = Executors.newFixedThreadPool(5);
- List<Future<Integer>> futureList = new ArrayList<>();
- // 提交5个任务,分别返回0、1、2、3、4
- for (int i = 0; i < 5; i++) {
- int num = i;
- // 使用submit()方法来提交任务,任务执行的结果用Future包装
- Future<Integer> future = threadPool.submit(() -> {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("return: " + num);
- // 返回值
- return num;
- });
- // 把future添加到list中
- futureList.add(future);
- }
- // 任务全部提交完再从future中get返回值,并做累加
- int sum = 0;
- for (Future<Integer> future : futureList) {
- sum += future.get();
- }
- System.out.println("sum=" + sum);
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
这里我们思考两个问题:
(1)如果这里使用普通任务,要怎么写,时间大概是多少?
如果使用普通任务,那么就要把累加操作放到任务里面,而且并不是那么好写(final的问题),总时间大概是1秒多一点。但是,这样有一个缺点,就是累加操作跟任务本身的内容耦合到一起了,后面如果改成累乘,还要修改任务的内容。
(2)如果这里把future.get()放到for循环里面,时间大概是多少?
这个问题我们先不回答,先来看源码分析。
我们如果想用未来任务(future task),就需要使用ThreadPoolExecutor的submit()方法来提交任务,不能再用execute()方法提交了。可以通过该方法拿到异步任务结果。
调用抽象类中的submit方法,它是提交有返回值任务的一种方式,内部使用未来任务(FutureTask)包装,再交给execute()去执行,最后返回未来任务本身。FutureTask包装的都是有返回值的Callable类型的任务。
java.util.concurrent.AbstractExecutorService#submit
- public <T> Future<T> submit(Callable<T> task) {
- // 非空检测
- if (task == null) throw new NullPointerException();
- // 将任务包装成FutureTask
- RunnableFuture<T> ftask = newTaskFor(task);
- // 交给execute()方法去执行
- execute(ftask);
- // 返回futureTask
- return ftask;
- }
-
- protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- // 将普通任务包装成FutureTask
- return new FutureTask<T>(callable);
- }
这里的设计很巧妙,实际上这两个方法都是在AbstractExecutorService这个抽象类中完成的,这是模板方法的一种运用。
我们来看看FutureTask的继承体系:
FutureTask实现了RunnableFuture接口,而RunnableFuture接口组合了Runnable接口和Future接口的能力,而Future接口提供了get任务返回值的能力。
问题:submit()方法返回的为什么是Future接口而不是RunnableFuture接口或者FutureTask类呢?
答:这是因为submit()返回的结果,对外部调用者只想暴露其get()的能力(Future接口),而不想暴露其run()的能力(Runaable接口)。
关于整个Future体系的详细讲解见(里面有Future、FutureTask的源码分析):【并发编程】Java的Future机制详解(Future接口和FutureTask类)
经过之前的学习,我们知道execute()方法最后调用的是task的run()方法。上节讲了我们传进去的任务,最后被包装成了FutureTask,也就是说execute()方法最后会调用到FutureTask的run()方法,所以我们直接看这个方法就可以了。
java.util.concurrent.FutureTask#run
- public void run() {
- // FutureTask任务的状态不为NEW,或者修改为当前线程来运行这个任务失败,则直接返回
- if (state != NEW ||
- !UNSAFE.compareAndSwapObject(this, runnerOffset,
- null, Thread.currentThread()))
- return;
-
- try {
- // 真正的任务,存储在FutureTask类的成员属性callable中
- Callable<V> c = callable;
- // 任务的state必须为NEW时才运行
- if (c != null && state == NEW) {
- // 运行的结果
- V result;
- // 是否已经执行完毕
- boolean ran;
- try {
- // 执行任务,并获取任务返回结果
- result = c.call();
- // 已执行完毕
- ran = true;
- // 如果执行失败抛出异常
- } catch (Throwable ex) {
- // 清空
- result = null;
- ran = false;
- // 保存任务执行过程中出现的异常
- setException(ex);
- }
- // 如果成功执行完任务
- if (ran)
- // 保存任务执行结果
- set(result);
- }
- } finally {
- // 置空runner,runner是Future的成员属性,用来保存执行任务的线程
- runner = null;
- // 获取任务状态
- int s = state;
- // 如果任务执行过程中被中断了,则处理中断
- if (s >= INTERRUPTING)
- handlePossibleCancellationInterrupt(s);
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
可以看到代码也比较简单,先做状态的检测,再执行任务,最后处理结果或异常。任务的异步执行结果就是在run()方法中被回填到FutureTask中的。
执行任务这里没啥问题,让我们看看处理结果和异常的代码。
- // 保存任务执行过程中出现的异常
- // t为要保存的异常信息
- protected void setException(Throwable t) {
- // 将状态从NEW置为COMPLETING
- if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
- // 返回值置为传进来的异常(outcome就是FutureTask调用get()方法时返回的内容)
- outcome = t;
- // 最终的状态设置为EXCEPTIONAL
- UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
- // 调用完成方法
- finishCompletion();
- }
- }
-
- // 保存任务执行结果
- // v为任务执行完返回的结果
- protected void set(V v) {
- // 将状态从NEW置为COMPLETING
- if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
- // 返回值置为传进来的结果(outcome就是FutureTask调用get()方法时返回的内容)
- outcome = v;
- // 最终的状态设置为NORMAL
- UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
- // 调用完成方法
- finishCompletion();
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
咋一看,这两个方法似乎差不多,不同的是返回的结果不一样且状态不一样,最后都调用了finishCompletion()方法。
- private void finishCompletion() {
- // 如果队列不为空(waiters是FutureTask中的成员属性,也就是get方法阻塞的线程队列,这个队列实际上为调用者线程)
- for (WaitNode q; (q = waiters) != null;) {
- // 置空队列
- if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
- for (;;) {
- // 调用者线程,例如调用FutureTask对象的get()方法的线程就是调用者线程
- Thread t = q.thread;
- if (t != null) {
- q.thread = null;
- // 如果调用者线程不为空,则唤醒它。也就是当任务还没有执行玩,调用者就去执行任务FutureTask.get(),调用者就会被阻塞在get()中,
- // 当任务执行完成后,FutureTask.run()方法就会调用finishCompletion()方法中的LockSupport.unpark(t),唤醒调用者线程,这样调用者就能得到任务执行结果了。
- LockSupport.unpark(t);
- }
- // 获取下一个节点
- WaitNode next = q.next;
- if (next == null)
- break;
- q.next = null; // unlink to help gc
- // 后移一个节点
- q = next;
- }
- break;
- }
- }
- // 钩子方法,子类重写
- done();
- // 置空任务
- callable = null; // to reduce footprint
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
整个run()方法总结下来:
(1)FutureTask有一个状态state控制任务的运行过程,从创建任务到正常运行结束state从NEW->COMPLETING->NORMAL,异常运行结束state从NEW->COMPLETING->EXCEPTIONAL;
(2)FutureTask保存了运行任务的线程Runner,它是线程池中的某个线程;
(3)调用者线程是保存在waiters队列中的,它是什么时候设置进去的呢?
(4)任务执行完毕,除了设置状态state变化之外,还要唤醒调用者线程。
调用者线程是什么时候保存在FutureTask中(waiters)的呢?查看构造方法:
- public FutureTask(Callable<V> callable) {
- if (callable == null)
- throw new NullPointerException();
- // 绑定任务到FutureTask
- this.callable = callable;
- // 初始化任务状态
- this.state = NEW; // ensure visibility of callable
- }
发现并没有相关信息,我们再试想一下,如果调用者不调用get()方法,那么这种未来任务是不是跟普通任务没有什么区别?确实是的,所以只有调用get()方法了才有必要保存调用者线程到FutureTask中。
所以,我们来看看get()方法的源码。
get()方法调用时如果任务未执行完毕,会阻塞直到任务结束。
- public V get() throws InterruptedException, ExecutionException {
- int s = state;
- // 如果状态小于等于COMPLETING,则进入队列等待
- if (s <= COMPLETING)
- s = awaitDone(false, 0L);
- // 返回结果(异常)
- return report(s);
- }
是不是很清楚,如果任务状态小于等于COMPLETING,则将调用者线程加入队列中等待。
- private int awaitDone(boolean timed, long nanos)
- throws InterruptedException {
- // 我们这里假设不带超时
- final long deadline = timed ? System.nanoTime() + nanos : 0L;
- WaitNode q = null;
- boolean queued = false;
- // 自旋
- for (;;) {
- // 处理中断
- if (Thread.interrupted()) {
- removeWaiter(q);
- throw new InterruptedException();
- }
- // 4. 如果状态大于COMPLETING了,则跳出循环并返回
- // 这是自旋的出口
- int s = state;
- if (s > COMPLETING) {
- if (q != null)
- q.thread = null;
- return s;
- }
- // 如果状态等于COMPLETING,说明任务快完成了,就差设置状态到NORMAL或EXCEPTIONAL和设置结果了
- // 这时候就让出CPU,优先完成任务
- else if (s == COMPLETING) // cannot time out yet
- Thread.yield();
- // 1. 如果队列为空
- else if (q == null)
- // 初始化队列(WaitNode中记录了调用者线程)
- q = new WaitNode();
- // 2. 未进入队列
- else if (!queued)
- // 尝试入队
- queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
- q.next = waiters, q);
- // 超时处理
- else if (timed) {
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) {
- removeWaiter(q);
- return state;
- }
- LockSupport.parkNanos(this, nanos);
- }
- // 3. 阻塞当前线程(调用者线程)
- else
- LockSupport.park(this);
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
这里我们假设调用get()时任务还未执行,也就是其状态为NEW,我们试着按上面标示的1、2、3、4走一遍逻辑:
(1)第一次循环,状态为NEW,直接到1处,初始化队列并把调用者线程封装在WaitNode中;
(2)第二次循环,状态为NEW,队列不为空,到2处,让包含调用者线程的WaitNode入队;
(3)第三次循环,状态为NEW,队列不为空,且已入队,到3处,阻塞调用者线程;
(4)假设过了一会任务执行完毕了,根据run()方法的分析最后会unpark调用者线程,也就是3处会被唤醒;
(5)第四次循环,状态肯定大于COMPLETING了,退出循环并返回;
问题:为什么要在for循环中控制整个流程呢,把这里的每一步单独拿出来写行不行?
答:因为每一次动作都需要重新检查状态state有没有变化,如果拿出去写也是可以的,只是代码会非常冗长。这里只分析了get()时状态为NEW,其它的状态也可以自行验证,都是可以保证正确的,甚至两个线程交叉运行(断点的技巧)。
OK,这里返回之后,再看看是怎么处理最终的结果的。
report 方法就是根据传入的状态值 s,来决定是抛出异常,还是返回结果值。这个两种情况都 表示 FutureTask 完结了。
- private V report(int s) throws ExecutionException {
- // 获取任务执行结果 outcome是FutureTask类的成员属性,用来存储任务执行结果
- Object x = outcome;
- // 任务正常结束
- if (s == NORMAL)
- return (V)x;
- // 被取消了
- if (s >= CANCELLED)
- throw new CancellationException();
- // 执行异常,则将异常结果抛出
- throw new ExecutionException((Throwable)x);
- }
还记得前面分析run的时候吗,任务执行异常时是把异常放在outcome里面的,这里就用到了。
(1)如果正常执行结束,则返回任务的返回值;
(2)如果异常结束,则包装成ExecutionException异常抛出;
通过这种方式,线程中出现的异常也可以返回给调用者线程(调用FutureTask对象相关方法的线程)了,不会像执行普通任务那样调用者是不知道任务执行到底有没有成功的。
FutureTask除了可以获取任务的返回值以外,还能够取消任务的执行。
java.util.concurrent.FutureTask#cancel
- public boolean cancel(boolean mayInterruptIfRunning) {
- // 如果任务已经完成或者已经被取消,直接返回
- if (!(state == NEW &&
- UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
- mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
- return false;
- try { // in case call to interrupt throws exception
- if (mayInterruptIfRunning) {
- try {
- // 执行任务的线程
- Thread t = runner;
- // 取消任务是通过中断执行任务的线程实现的
- if (t != null)
- t.interrupt();
- } finally { // final state
- // 设置任务状态为中断
- UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
- }
- }
- } finally {
- finishCompletion();
- }
- return true;
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
这里取消任务是通过中断执行线程来处理的,有兴趣的同学可以自己分析一下。
(1)未来任务是通过把普通任务包装成FutureTask来实现的。
(2)通过FutureTask不仅能够获取任务执行的结果,还有感知到任务执行的异常,甚至还可以取消任务;
(3)AbstractExecutorService中定义了很多模板方法,这是一种很重要的设计模式;
(4)FutureTask其实就是典型的异常调用的实现方式,后面我们学习到Netty、Dubbo的时候还会见到这种设计思想的。
答:实际上并没有什么区别,主要是根据corePoolSize来判断任务该去哪里,两者在执行任务的过程中并没有任何区别。有可能新建的时候是核心线程,而keepAliveTime时间到了结束了的也可能是刚开始创建的核心线程。
前面我们看了Worker内部类的定义,它继承自AQS,天生自带锁的特性,那么,它的锁是用来干什么的呢?跟任务的执行有关系吗?
答:既然是跟锁(同步)有关,说明Worker类跨线程使用了,此时我们查看它的lock()方法发现只在runWorker()方法中使用了,但是其tryLock()却是在interruptIdleWorkers()方法中使用的。
- private void interruptIdleWorkers(boolean onlyOne) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers) {
- Thread t = w.thread;
- if (!t.isInterrupted() && w.tryLock()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- } finally {
- w.unlock();
- }
- }
- if (onlyOne)
- break;
- }
- } finally {
- mainLock.unlock();
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
interruptIdleWorkers()方法的意思是中断空闲线程的意思,它只会中断BlockingQueue的poll()或take()方法,而不会中断正在执行的任务。
一般来说,interruptIdleWorkers()方法的调用不是在本工作线程,而是在主线程中调用的,还记得shutdown()和shutdownNow()方法吗?
观察两个方法中中断线程的方法,shutdown()中就是调用了interruptIdleWorkers()方法来中断线程地,而且interruptIdleWorkers()方法中就用到了tryLock(),只有获取到锁了才能中断线程,如果没有获取到锁则不中断。而调用tryLock()后没获取到锁只有一种原因,那就是lock()所在的地方runWorker()方法中,有任务正在执行。这样shutdown()方法就实现了只中断空闲线程,不会中断正在执行任务的线程。
而shutdownNow()中中断线程则很暴力,并没有tryLock(),而是直接中断了线程,所以调用shutdownNow()可能会中断正在执行的任务。
所以,Worker继承自AQS实际是要使用其锁的能力,这个锁主要是用来控制调用shutdown()时不要中断正在执行任务的线程。
那么为什么Worker使用AQS实现锁,而不直接用ReentrantLock呢?我们可以看到Worker的tryAcquire 方法,它是不允许重入的,而 ReentrantLock是允许重入的。所以这是为了实现不可重入的特性去反应线程现在的执行状态。
很多同学应该关注到了。线程池的执行任务有两种方法,一种是 submit、一种是 execute; 这两个方法是有区别的。
execute:
1. execute 只可以接收一个 Runnable 的参数
2. execute 如果出现异常会抛出
3. execute 没有返回值
submit:
1. submit 可以接收 Runable 和 Callable 这两种类型的参数,
2. 对于 submit 方法,如果传入一个 Callable,可以得到一个 Future 的返回值
3. submit 方法调用不会抛异常,除非调用 Future.get()才会在调用get()方法时抛出异常。
答:RPC框架常用的调用方式有同步调用、异步调用,其实它们本质上都是异步调用,它们就是用FutureTask的方式来实现的。
一般地,通过一个线程(我们叫作远程线程)去调用远程接口,如果是同步调用,则直接让调用者线程阻塞着等待远程线程调用的结果,待结果返回了再返回;如果是异步调用,则先返回一个未来可以获取到远程结果的东西FutureXxx,当然,如果这个FutureXxx在远程结果返回之前调用了get()方法一样会阻塞着调用者线程。
有兴趣的同学可以先去预习一下dubbo的异步调用(它是把Future扔到RpcContext中的)。
相关文章:【线程池】Java的线程池
【线程池】Java线程池的核心参数
【线程池】Executors框架创建线程池
【线程池】ScheduledExecutorService接口和ScheduledThreadPoolExecutor定时任务线程池使用详解 【线程池】线程池的拒绝策略(饱和策略)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。