当前位置:   article > 正文

ThreadPoolExecutor源码解析_threadpoolexecutor 创建 worker 的时候为什么会 setstate

threadpoolexecutor 创建 worker 的时候为什么会 setstate

在Java中,如果每个请求到来我们就创建一个新的线程,创建和销毁线程花费的时间和消耗的系统资源都非常之大,甚至有可能比在处理实际的用户请求所使用的资源还要多。

如果在一个jvm里面创建太多的线程,那么可能会使得系统由于过度消耗内存或者频繁切换线程而导致系统资源不足。

所以这时有了线程池这个概念,非常好的解决了上面两个问题。另外线程池也提供了许多可调参数和可扩展性接口,以满足不同的情景需求。程序员可以使用Excutors的工厂方法,比如newCachedThreadPool(线程池线程个数最多可达Integer.MAX_VALUE,线程自动回收),newFixedThreadPool(固定大小的线程池)和newSingleThreadExecutor(单个线程)来创建线程池。

如下面所示类图中,Executors其实是一个工具类,里面提供了很多静态方法返回不同的线程池实例。

1、线程池ThreadPoolExecutor中的成员变量:

(1)ctl : 是一个Integer的原子变量,用来记录线程池状态和线程池中的线程个数,类似于ReentrantReadWriteLock锁使用一个变量来保存两种信息。 这里假设Integer类型是32位的二进制,则其中高3位用来表示线程池的状态,后面29位用来表示线程池的线程个数。

  1. //默认是Running状态,线程个数为0
  2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  3. //并不是所有的平台的int类型都是32位的,所以准确的说就是当前平台的Integer的二进制位数-3的剩余位数来表示线程的个数
  4. private static final int COUNT_BITS = Integer.SIZE - 3;
  5. //线程最大个数(低29位为1) 00011111111111111111111111111111
  6. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  7. //RUNNING状态 11100000000000000000000000000000 负数 = 原码取反+1
  8. private static final int RUNNING = -1 << COUNT_BITS;
  9. //SHUTDOWN状态 00000000000000000000000000000000
  10. private static final int SHUTDOWN = 0 << COUNT_BITS;
  11. //STOP状态 00100000000000000000000000000000
  12. private static final int STOP = 1 << COUNT_BITS;
  13. //TIDYING状态 01000000000000000000000000000000
  14. private static final int TIDYING = 2 << COUNT_BITS;
  15. //TERMINATED状态 01100000000000000000000000000000
  16. private static final int TERMINATED = 3 << COUNT_BITS;
  17. //获取高3位 线程状态
  18. private static int runStateOf(int c) { return c & ~CAPACITY; }
  19. //获取低29位 线程个数
  20. private static int workerCountOf(int c) { return c & CAPACITY; }
  21. //计算ctl值 线程状态与线程个数
  22. private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池的状态有下面几种:

RUNNING :  接受新任务并且处理阻塞队列里面的任务。

SHUTDOWN : 拒绝新任务但是处理阻塞队列里面的任务。

STOP : 拒绝新任务并且抛弃阻塞队列里面的任务。

TIDYING : 所有任务都执行完毕(包含阻塞队列里面的任务)后当前线程池活动线程位0,将要调用terminated方法。

TERMINATED : 终止状态,termincated方法调用完毕以后的状态,那么什么是termincated方法呢?我们可以留个小疑问后面讲

线程池的状态转换如下所示:

RUNNING -> SHUTDOWN : 显式调用shutdown()方法。

(RUNNING或者SHUTDOWN) - > STOP : 显式调用shutdownNow()方法。

SHUTDOWN -> TIDYING : 当线程池和任务队列为空的时候。

STOP -> TIDYING : 当线程池为空的时候(为什么这里不用队列为空呢?因为STOP状态下会主动抛弃所有阻塞的任务,必然为空了)。

TYDING -> TERMINATED : 当termincated方法执行完毕。

(2)corePoolsize :  线程池核心线程个数。

(3)workQueue : 用户保存等待执行的任务的阻塞队列,它可以是基于数组的有界的ArrayBlockingQueue、也可以是基于链表的无界的LinkedBlockingQueue、最多只有一个元素的同步队列SynchronizedQueue等等。

(4)maximunPoolSize : 线程池中最大线程的数量

(5)ThreadFactory : 创建线程的工厂,后面在Worker构造函数的时候会用到,可以选择自定义,也可以使用默认

(6)handler : 拒绝策略,是RejectedExecutionHandler类型的变量。当队列满并且线程个数达到了maximunPoolSize后采取的策略。其实现的子类有AbortPolicy(抛出异常),DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)等等策略。

(7)keepAliveTime : 存活时间。如果当前线程池中的线程数量比核心线程数量多,并且是闲置的状态,则这些闲置的线程能存活的最大时间。在后面getTask方法中可以使用到,该线程要是超时获取不到任务则就会被丢弃。

(8)TimeUnit : 存活时间的单位。

(9)mainLock : 独占锁,用来控制新增worker线程操作的原子性。

(10)termination : 该锁的条件队列,在线程调用awaitTermination方法时候用来存放阻塞的线程。在tryTermination的时候会唤醒所有阻塞的线程,这个后面再具体看下。

(11)Worker : 继承AQS和Runnable接口,是具体承载任务的对象。Worker继承了AQS实现了简单不可重入独占锁,其中state=0表示锁未被获取状态,state=1表示锁已经被获取的状态,state=-1是创建Worker时默认的状态,设置为-1是为了避免该线程在运行runWorker方法前就被中断了。

2、讲完了ThreadPoolExecutors的主要参数,我们接下来讲下创建线程池的主要几种工具

(1)newFixedThreadPool : 创建一个核心线程数和最大线程个数都为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE(LinkedBlockingQueue的最大个数)。keepAliveTime=0说明只要线程个数比核心线程个数个数多并且当前空闲就回收

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

(2)newSingleThreadExecutor : 创建一个核心线程个数和最大线程个数都为1 的线程池,阻塞队列长度同样为Integer.MAX_VALUE。

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

 (3)newCachedThreadPool : 创建一个按需创建线程的线程池,初始化线程个数为0,最多的线程个数为Integer.MAX_VALUE,并且阻塞队列是为同步队列。keepAvlieTime=60表示只要当前线程在60s内空闲的话则回收。加入同步队列的任务会被马上执行,同步队列里面最多就只有一个任务。

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

3、下面开始分析实现原理,我们把一个任务提交给线程池去处理的时候,线程池的处理过程是怎么样的?

(1)execute方法的作用就是提交任务commond到线程池进行执行。

  1. public void execute(Runnable command) {
  2. //如果任务为空,则直接抛出异常
  3. if (command == null)
  4. throw new NullPointerException();
  5. //获取当前线程池的状态 + 线程个数变量的组合值
  6. int c = ctl.get();
  7. //判断当前线程池个数是否小于核心线程个数,如果小于则开启新线程运行
  8. if (workerCountOf(c) < corePoolSize) {
  9. if (addWorker(command, true))
  10. return;
  11. c = ctl.get();
  12. }
  13. //若是当前线程池个数大于等于核心线程个数,也就是核心线程池已满,那么就添加任务到阻塞队列中
  14. if (isRunning(c) && workQueue.offer(command)) {
  15. //还需要再次检查下当前线程池的状态,这是因为添加任务到阻塞队列后,但是在执行(1.1)代码
  16. 前有可能线程池的状态发生了变化
  17. int recheck = ctl.get();
  18. //(1.1)如果当前的状态已经不是Running状态的话,则从队列中删除该任务,并执行拒绝策略
  19. if (! isRunning(recheck) && remove(command))
  20. reject(command);
  21. // 如果当前还是Running状态那么当前线程池是否为空,如果为空则要添加一个null线程。只添加
  22. 线程不添加任务。
  23. else if (workerCountOf(recheck) == 0)
  24. addWorker(null, false);
  25. }
  26. //如果阻塞队列也满了(workQueue.offer(command)失败了),那么就新增线程,如果新增失败那么就
  27. 选择拒绝策略拒绝该任务。
  28. else if (!addWorker(command, false))
  29. reject(command);
  30. }

 (2)addWorker方法用于新增一个线程

  1. //添加线程 firstTask:任务 core : true表示线程池个数小于核心个数 false表示小于最大线程数
  2. //代码这么长,其实就分为两个部分:1、使用CAS循环操作使线程数加1 2、新建一个线程并且启用它
  3. private boolean addWorker(Runnable firstTask, boolean core) {
  4. //go语句避免死循环,例如处于双循环下的语句只要break retry就可以跳出双循环
  5. retry:
  6. for (;;) {
  7. //获取线程池的状态
  8. int c = ctl.get();
  9. int rs = runStateOf(c);
  10. // Check if queue empty only if necessary.
  11. //第一个条件理解为线程池的状态若是为STOP、TIDYING或者TERMINATED状态的时候返回false
  12. //第二个以及第三个条件理解为shutDown状态下是不接受新的任务,但任然会处理阻塞队列里
  13. //面的任务,所以该状态下如果传进来的任务(firstTask)为空并且任务队列(workQueue)不
  14. //为空的时候,是允许添加任务的,但是反之就是不准也就是返回false。
  15. if (rs >= SHUTDOWN &&
  16. ! (rs == SHUTDOWN &&
  17. firstTask == null &&
  18. ! workQueue.isEmpty()))
  19. return false;
  20. //第二个循环
  21. for (;;) {
  22. //获取当前线程的个数
  23. int wc = workerCountOf(c);
  24. //如果线程个数大于线程的最大数就返回false,至于大于核心线程数还是大于规定的最大
  25. //线程数就根据传进来的core值了。
  26. if (wc >= CAPACITY ||
  27. wc >= (core ? corePoolSize : maximumPoolSize))
  28. return false;
  29. //如果验证成功,则处于第二个循环里面自旋CAS加1,如果自旋成功,则退出双循环,也就
  30. //是去执行下一步创建新线程了。
  31. if (compareAndIncrementWorkerCount(c))
  32. break retry;
  33. c = ctl.get(); // Re-read ctl
  34. //检查线程池的状态,若是状态已经改变了,则要从第一个循环开始执行重新检查当前线程
  35. //池状态是否符合了。
  36. if (runStateOf(c) != rs)
  37. continue retry;
  38. // else CAS failed due to workerCount change; retry inner loop
  39. }
  40. }
  41. //工作线程是否启动的标识
  42. boolean workerStarted = false;
  43. //工作线程是否添加成功的标识
  44. boolean workerAdded = false;
  45. Worker w = null;
  46. try {
  47. //构建一个worker对象,在Worker构造器里面做了什么操作呢?其实就是传入一个Runnable对
  48. //象并新建一个线程。我们后面可以继续看下Worker对象的源码
  49. w = new Worker(firstTask);
  50. //从worker对象中取出线程
  51. final Thread t = w.thread;
  52. if (t != null) {
  53. final ReentrantLock mainLock = this.mainLock;
  54. //加独占锁,为了实现workers同步,避免多个线程调用execute的时候会同时调用
  55. //addWorker而导致workers不会同步。
  56. mainLock.lock();
  57. try {
  58. int rs = runStateOf(ctl.get());
  59. //当线程池的状态小于SHUTDOWN也就是RUNNING状态,或者当前状态是SHUTDOWN但
  60. //是任务为空也就是只添加线程,不添加新的任务
  61. if (rs < SHUTDOWN ||
  62. (rs == SHUTDOWN && firstTask == null)) {
  63. if (t.isAlive()) // precheck that t is startable
  64. throw new IllegalThreadStateException();
  65. //workers的set集合添加workere
  66. workers.add(w);
  67. int s = workers.size();
  68. //设置线程池中出现过的最大线程数
  69. if (s > largestPoolSize)
  70. largestPoolSize = s;
  71. //表示工作线程创建成功了
  72. workerAdded = true;
  73. }
  74. } finally {
  75. //释放锁
  76. mainLock.unlock();
  77. }
  78. //如果worker添加成功的话,则启动线程
  79. if (workerAdded) {
  80. t.start();
  81. workerStarted = true;
  82. }
  83. }
  84. } finally {
  85. //如果添加失败,就需要调用addWorkerFailed方法,也就是递减工作线程数量(我们在第一步
  86. //中增加了工程数了)
  87. if (! workerStarted)
  88. addWorkerFailed(w);
  89. }
  90. return workerStarted;
  91. }

(3)接下来我们看下Work的构造函数以及addWorkerFailed函数

  1. //addWorker方法中如果添加worker并且启动线程失败的话,则需要做失败后的处理
  2. private void addWorkerFailed(Worker w) {
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. //需要从workers集合中移除这个worker
  7. if (w != null)
  8. workers.remove(w);
  9. //原子递减核心线程数(因为在前面新增了原子增加)
  10. decrementWorkerCount();
  11. //尝试结束线程池(这个方法在后面详细讲解)
  12. tryTerminate();
  13. } finally {
  14. mainLock.unlock();
  15. }
  16. }
  1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
  2. private static final long serialVersionUID = 6138294804551838833L;
  3. //真正执行任务的线程
  4. final Thread thread;
  5. //这就是要执行的任务
  6. Runnable firstTask;
  7. //完成的任务数,用于线程池统计
  8. volatile long completedTasks;
  9. //构造函数
  10. Worker(Runnable firstTask) {
  11. //初始状态 -1,防止在调用 runWorker(),也就是真正执行 task前中断 thread。
  12. setState(-1);
  13. this.firstTask = firstTask;
  14. //使用默认的ThreadFactory线程工程生产线程
  15. this.thread = getThreadFactory().newThread(this);
  16. }
  17. //执行 runWorker方法,真正的执行体
  18. public void run() {
  19. runWorker(this);
  20. }
  21. protected boolean isHeldExclusively() {
  22. return getState() != 0;
  23. }
  24. //重写了tryAcquire方法,这里worker是自己使用AQS来实现独占锁的,为什么不用
  25. //ReentrantLock锁呢,因为该锁是允许重入的,但是我们这里自己实现的锁是不要重入的
  26. protected boolean tryAcquire(int unused) {
  27. if (compareAndSetState(0, 1)) {
  28. setExclusiveOwnerThread(Thread.currentThread());
  29. return true;
  30. }
  31. return false;
  32. }
  33. //释放锁的时候设置state状态为0,也就是未上锁
  34. protected boolean tryRelease(int unused) {
  35. setExclusiveOwnerThread(null);
  36. setState(0);
  37. return true;
  38. }
  39. //worker变量上锁的操作
  40. public void lock() { acquire(1); }
  41. public boolean tryLock() { return tryAcquire(1); }
  42. //worker变量解锁的操作
  43. public void unlock() { release(1); }
  44. public boolean isLocked() { return isHeldExclusively(); }
  45. //shutDownNow操作时候会调用该函数,其实是中断所有的线程
  46. void interruptIfStarted() {
  47. Thread t;
  48. //但是这里也只会中断state>=0的线程,如果初始化worker的时候状态等于-1的话是不会被中
  49. //断的,并且该线程不为空,而且中断状态为未中断的
  50. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  51. try {
  52. //就可以选择中断它了,那么这里为什么可以中断它呢?interrupt方法只不过是设置
  53. //了这个线程的中断标志位而已,其实在后面的getTask方法时候如果取不到任务必定
  54. //会被阻塞所以必定会被中断的。这个在后面的shutDownNow会详细讲解。
  55. t.interrupt();
  56. } catch (SecurityException ignore) {
  57. }
  58. }
  59. }
  60. }

(4)runWorker方法

上面我们已经了解了addWorker方法,主要作用就是增加工作线程 ,而Worker简单的理解其实就是一个线程,里面重写了run方法,这块是线程池中执行任务的真正处理逻辑。主要步骤:

1、如果task不为空则开始执行task

2、如果task为空则通过getTask()方法再去取任务,并赋值给task,如果取到的Runnable不为空则执行该任务

3、执行完毕后通过while循环继续去getTask()取任务,有可能阻塞队列中无任务就会暂时阻塞在这里

4、如果getTask()取到的任务最终是空的,那么整个runWorker()方法执行完毕

                   

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. //获取当前需要执行的任务
  4. Runnable task = w.firstTask;
  5. //设置为空拉
  6. w.firstTask = null;
  7. //然后解锁,这里为什么要解锁呢??因为 new Worker默认的 state=-1,此处是调用Worker类的
  8. //tryRelease()方法将state设置为0,允许该线程被中断,
  9. w.unlock();
  10. boolean completedAbruptly = true;
  11. try {
  12. //线程在这里会进行一个循环取任务然后执行任务,有可能阻塞队列为空,getTask阻塞在此处
  13. //也有可能getTask为null然后就跳出循环
  14. while (task != null || (task = getTask()) != null) {
  15. //为worker上锁,其实这里上锁并不是为了并发,而是为了在shutdown()时不终止正在运
  16. //行的worker
  17. w.lock();
  18. //这个地方我理解的有点困难,纠结了超级久。字面上的意思就是确保只有在线程stop状
  19. //态下才会被设置中断标示,否则就清除中断标示。
  20. //1、这里设置了中断标示有什么用呢,在哪里会真正的被中断呢?stop状态下不是说会清
  21. //除正在执行的任务吗???其实我们这里设置了中断标识,在后面task.run()也就是执
  22. //行任务的时候,自己去判断中断标识位是否要中断任务而已,至于stop状态下也不一定
  23. //就会中断正在运行的任务,就要看任务里面自己有没有对中断标识位判断并处理了,没有
  24. //的话就一直执行下去
  25. //2、为什么Thread.interrupted()清除了中断标识位后,又去设置中断标识位?
  26. //我们可以先看下runStateAtLeast(ctl.get(), STOP)这里是判断当前线程池的状态是
  27. //否>=stop状态,然后当前线程如果没有设置中断状态的话,那绝逼要设置一下中断状态
  28. //啊,保证stop状态下线程是中断状态的。如果一开始判断线程池状态时小于stop的,也
  29. //就是running状态,那么我们判断此时Thread.interrupted()的当前线程中断状态,如
  30. //果是中断状态,那么也就是线程被中断了,然后就清除了中断标志位,我们此时就需要再
  31. //次判断一下当前的状态是否是stop了,如果是stop状态的话就需要再次设置中断标识位
  32. //了(有可能线程池调用了shutdownNow了),如果还是running状态,那么为了保证
  33. //runnning状态且加锁状态下不会被中断的,我们就不需要管了。这里有可能这个线程在
  34. //上次执行任务的时候调用了interrupt()方法,所以我们为了不能让接下来执行的任务受
  35. //到之前任务的影响,就必须设置为false。
  36. if ((runStateAtLeast(ctl.get(), STOP) ||
  37. (Thread.interrupted() &&
  38. runStateAtLeast(ctl.get(), STOP))) &&
  39. !wt.isInterrupted())
  40. wt.interrupt();
  41. try {
  42. //在执行任务之前所做的事情,这里默认是没有实现的,在一些特定的场景中我们可
  43. //以自己继承 ThreadpoolExecutor 自己重写
  44. beforeExecute(wt, task);
  45. Throwable thrown = null;
  46. try {
  47. //执行真正的任务
  48. task.run();
  49. } catch (RuntimeException x) {
  50. thrown = x; throw x;
  51. } catch (Error x) {
  52. thrown = x; throw x;
  53. } catch (Throwable x) {
  54. thrown = x; throw new Error(x);
  55. } finally {
  56. //这里默认也是没有实现
  57. afterExecute(task, thrown);
  58. }
  59. } finally {
  60. //置空任务(这样下次循环开始时,task 依然为 null,需要再通过 getTask()取) +
  61. //记录该 Worker 完成任务数量 + 解锁)
  62. task = null;
  63. w.completedTasks++;
  64. w.unlock();
  65. }
  66. }
  67. //不会抛异常
  68. completedAbruptly = false;
  69. } finally {
  70. //线程结束了(也就是该线程可能要被回收了),需要做的操作
  71. processWorkerExit(w, completedAbruptly);
  72. }
  73. }

(5)getTask方法

我们在上面的runWorker中已经可以知道,线程通常会while循环阻塞通过getTask方法获取任务

  1. private Runnable getTask() {
  2. //这里表示在下面的for循环中,上一次的task是否超时了
  3. boolean timedOut = false;
  4. for (;;) {
  5. int c = ctl.get();
  6. //获取当前线程池的状态
  7. int rs = runStateOf(c);
  8. //如果当前线程池等于shutDown状态并且任务队列是空的 或者 当前线程池状态至少为stop状
  9. //态那么直接返回空的任务了,runWorker方法一旦getTask为null的话,直接跳出while循环
  10. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  11. //其实就是ctl减1
  12. decrementWorkerCount();
  13. return null;
  14. }
  15. //查询它的工作线程数
  16. int wc = workerCountOf(c);
  17. //timed变量是用于判断是否需要进行超时控制的
  18. //allowCoreThreadTimeOut默认为false,也就是核心线程不允许进行超时;
  19. //wc > corePoolSize表示当前线程数量超过核心线程数,对于超过的核心线程的这些线程也需
  20. //要超时控制,不超过的话(而且当前还是SHUTDOWN状态,workQueue不为空的话)就会维护核
  21. //心线程数直到队列为空,才会回收。
  22. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  23. //1、线程数量超过了maximumPoolSize可能是线程池在运行时被调用了
  24. //setMaximumPoolSize()改变了大小,否则已经addWorker成功是不可能会超过最大线程数的
  25. //2、timed表示当前操作需要进行超时判断,而timedOut表示内循环的上一次操作已经被设置
  26. //为超时了,这里就是体现了空闲线程的存活时间,超过一段时间处于空闲状态就会被回收。
  27. if ((wc > maximumPoolSize || (timed && timedOut))
  28. && (wc > 1 || workQueue.isEmpty())) {
  29. //worker数量-1(操作ctl),减一是用了cas操作,然后返回null,在runWorker()会去
  30. //减少worker线程的,这里无需操作,如果本次操作失败(减1不成功),就继续内层循环
  31. //再次尝试减1
  32. if (compareAndDecrementWorkerCount(c))
  33. return null;
  34. continue;
  35. }
  36. try {
  37. //若是需要超时判断则采用poll方法,如果不需要则采用take方法
  38. //1、poll:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null,
  39. //interrupt()方法时候不会抛异常,但是会有中断响应。
  40. //2、take:如果阻塞队列为空,当前线程会被一直阻塞;当队列中有任务加入的时才会被
  41. //唤醒。interrupt()时不会抛异常,但会有中断响应。
  42. Runnable r = timed ?
  43. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  44. workQueue.take();
  45. //如果任务不为空,则直接返回到runWorker方法去执行
  46. if (r != null)
  47. return r;
  48. //如果任务为空,则设置为已经超时了,在下一次循环的时候就会将worker数量减一
  49. timedOut = true;
  50. } catch (InterruptedException retry) {
  51. //响应中断interrupt()有可能阻塞的时候被中断了,这里进行捕获异常,并重置超时为
  52. //false,然后进行下一次循环。有可能是shutDown对空闲线程进行中断,那么下次循环的
  53. //时候就会返回null了,然后runWorker方法就会对该线程进行回收了。
  54. timedOut = false;
  55. }
  56. }
  57. }

(6)processWorkerExit方法

runWorker的while循环执行完毕之后,也就是该线程已经获取不到任务了或者超时等等原因,这时要被回收了,所以在finally执行processWorkerExit方法。

  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2. //如果退出的时候是突然终止的,说明是task执行的时候导致的也就是run方法执行的时候发生的异
  3. //常,那么这时候work线程数量就必须要减一,如果不是突然终止的,说明worker线程是没有任务执
  4. //行了,它在getTask()方法里面线程数量已经减1了
  5. if (completedAbruptly)
  6. decrementWorkerCount();
  7. final ReentrantLock mainLock = this.mainLock;
  8. mainLock.lock();
  9. try {
  10. //worker的完成任务数添加到线程池的完成任务数量
  11. completedTaskCount += w.completedTasks;
  12. //从HashSet<Worker>中移除掉worker
  13. workers.remove(w);
  14. } finally {
  15. mainLock.unlock();
  16. }
  17. //线程终止的方法,对线程池进行减数的时候都必须要执行该操作。
  18. //该方法主要是判断线程池是否满足终止状态了,如果已经满足了那么判断线程池中是否还有线程,
  19. //若是有线程则对其发出中断响应,让其退出线程。若是没有线程则直接更新状态为tidying-
  20. //>terminated
  21. tryTerminate();
  22. //这里主要是说明是否需要增加worker线程
  23. //首先第一个条件是线程状态必须是RUNNING或者SHUTDOWN状态
  24. //如果线程是突然终止的也就是说是执行任务的时候抛异常的,那么不管三七二十一必须加线程回去
  25. //如果线程不是突然终止的,那么判断当前线程数量是否小于核心线程数量,如果是则addWorker,
  26. //为什么呢?因为就算线程池调用shutDown(),但是直到workQueue任务队列为空之前,线程池都会
  27. //维护corePoolSize个线程,然后再慢慢的销毁corePoolSize个线程
  28. int c = ctl.get();
  29. if (runStateLessThan(c, STOP)) {
  30. if (!completedAbruptly) {
  31. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  32. if (min == 0 && ! workQueue.isEmpty())
  33. min = 1;
  34. if (workerCountOf(c) >= min)
  35. return; // replacement not needed
  36. }
  37. //添加一个没有firstTask的worker
  38. addWorker(null, false);
  39. }
  40. }

(7)shutdown方法

调用shutdown方法后,线程池就不会再接受新的任务了,但是工作队列里面的任务还是会执行完的,该方法会立刻返回不会等待队列任务完成后再返回的。

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. //mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,
  4. //需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数
  5. //据时需要先获取mainLock
  6. mainLock.lock();
  7. try {
  8. //权限检查,检查当前是否设置了安全管理器,是则查看当前调用shutdown的线程是否有关闭线程的权
  9. //限,如果有权限则看调用线程是否有中断工作线程的权限,如果没有则抛出异常。
  10. checkShutdownAccess();
  11. //使用CAS设置当前线程池为SHUTDOWN状态
  12. advanceRunState(SHUTDOWN);
  13. //设置所有空闲的线程中断标志
  14. interruptIdleWorkers();
  15. //留给子类去实现
  16. onShutdown(); // hook for ScheduledThreadPoolExecutor
  17. } finally {
  18. mainLock.unlock();
  19. }
  20. //尝试终止线程就像在processWorkerExit方法说的每个线程在做减数操作的时候都可能导致线程状态变
  21. //为TERMINATED状态的。
  22. //那么什么时候就会触发这个条件呢??
  23. //1、当前状态为SHUTDOWN并且线程池和任务队列都有空
  24. //2、当前状态为STOP并且线程池为空。
  25. tryTerminate();
  26. }

shutdown操作中会调用interruptIdleWorkers方法,那么这个方法有什么用呢?

  1. private void interruptIdleWorkers() {
  2. //false表示终止所有的空闲线程,而不是只终止一个
  3. interruptIdleWorkers(false);
  4. }
  5. //shutDown操作会调用该方法来终止线程池中所有的空闲线程。
  6. //那么何为空闲线程呢?
  7. //其实空闲线程就是阻塞在getTask()方法的线程,从前面的代码中分析我们可以得知如果此时阻塞
  8. //队列中没有任务的话,我们的线程就会阻塞在getTask()方法中的take函数或者poll函数,那么
  9. //我们继续推断一下,该work也就被阻塞在getTask中了(runWorker方法不会继续往下运行了)。此时
  10. //该worker也不会上锁(继承了AQS自定义的锁),所以也可以这样子认为没有上锁的线程就是空闲线程
  11. private void interruptIdleWorkers(boolean onlyOne) {
  12. final ReentrantLock mainLock = this.mainLock;
  13. //当要操作worker时候需要上锁的(都不是volatile)
  14. mainLock.lock();
  15. try {
  16. for (Worker w : workers) {
  17. Thread t = w.thread;
  18. //这时候需要对所有的空闲线程进行设置中断标志位
  19. //1、如果该线程已经设置了中断标志位了则无需再设置
  20. //2、该worker尝试获取锁,如果获取成功说明之前没有上过锁,也就是该线程
  21. //没有在运行任务,是空闲的。
  22. if (!t.isInterrupted() && w.tryLock()) {
  23. try {
  24. //设置中断标志位
  25. t.interrupt();
  26. } catch (SecurityException ignore) {
  27. } finally {
  28. //worker解锁
  29. w.unlock();
  30. }
  31. }
  32. //如果只中断一个空闲线程,那么循环一次就退出循环
  33. if (onlyOne)
  34. break;
  35. }
  36. } finally {
  37. mainLock.unlock();
  38. }
  39. }
  40. //其实上面的tryLock实际上调用了Worker自身实现的tryAcquire()方法,这也是AQS规定子类实现的方法。
  41. //其实就是尝试设置AQS的state从0->1,返回true代表上锁成功,该锁是不可重入的。
  42. protected boolean tryAcquire(int unused) {
  43. if (compareAndSetState(0, 1)) {
  44. setExclusiveOwnerThread(Thread.currentThread());
  45. return true;
  46. }
  47. return false;
  48. }

其实上面正阻塞在getTask()获取任务的worker在被中断后,就会抛出InterruptedException的异常,不再阻塞任务,捕获中断异常后,将继续循环到getTask()最开始的判断线程池的状态,如果此时状态为shutDown状态,且workQueue.isEmpty为空就会return null,然后runWork线程就会退出逻辑了。

某些情况下,interruptdleWorkers()时还有多个worker正在运行呢,所以不会对其发出中断信号,假设此时workQueue也不为空,那么多个worker运行结束后,会继续到workQueue阻塞获取任务,获取到就执行任务,没获取到的话如果是核心线程那么就一直在workQueue.take()阻塞住,线程无法终止,因为workQueue已经空了啊,而且shutDown后也不会接收新任务了。那怎么办呢???这就需要在shutDown()后,还可以发出中断信号呢。所以它就在所有可能导致线程池终止的地方(1、减少worker的数量,2、shutdown时从queue中移除任务

)都安置了tryTerminate()尝试线程池终止的逻辑,并在其中判断如果线程池已经进入了终止流程的话(没有任务可以执行了,但是线程池还有线程),那么就必须中断唤醒一个空闲线程。

tryTerminate方法:线程终止方法

  1. final void tryTerminate() {
  2. for (;;) {
  3. int c = ctl.get();
  4. //如果是以下几种情况,那么不符合终止的条件,直接返回
  5. //1、当前线程池状态还是处于RUNNING
  6. //2、当前线程池状态至少是TIDYING状态
  7. //3、当前线程池状态为SHUTDOWN,但是阻塞队列还不为空
  8. if (isRunning(c) ||
  9. runStateAtLeast(c, TIDYING) ||
  10. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
  11. return;
  12. //走到了这一步说明已经符合了终止线程的条件,那么检测当前线程池是否还有线程,如果还有
  13. //线程的话,那么就选择进行中断线程了,但是这里只是中断唤醒一个空闲线程。因为这样可以
  14. //把中断信号一直传递下去,当一个线程中断唤醒之后就会进入processWorkerExit方法,接着
  15. //又会回到了tryTerminate()方法。直到线程中数量为0为止。
  16. if (workerCountOf(c) != 0) { // Eligible to terminate
  17. interruptIdleWorkers(ONLY_ONE);
  18. return;
  19. }
  20. final ReentrantLock mainLock = this.mainLock;
  21. mainLock.lock();
  22. try {
  23. //首先设置状态为TIDYING
  24. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  25. try {
  26. //由子类实现
  27. terminated();
  28. } finally {
  29. //最后设置状态为TERMINATED
  30. ctl.set(ctlOf(TERMINATED, 0));
  31. //然后唤醒所有等待线程池终止的线程 awaitTermination(),termination是
  32. //条件队列
  33. termination.signalAll();
  34. }
  35. return;
  36. }
  37. } finally {
  38. mainLock.unlock();
  39. }
  40. // else retry on failed CAS
  41. }
  42. }

那么接下来我们再讲下awaitTermination()方法是啥玩意呢???我们可以先看下怎么用法。

其实就是我们自己在代码中,直接调用awaitTermination()来判断当前线程是否已经终止

  1. //此时我们的代码对线程池发出了shutDown操作
  2. executorService.shutdown();
  3. try{
  4. //然后调用awaitTermination()方法阻塞在这里,判断当前线程池是否已经终止
  5. while(!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
  6. LOGGER.debug("Waiting for terminate");
  7. }
  8. }
  9. catch (InterruptedException e) {
  10. //中断处理
  11. }

其实就是阻塞判断线程池是否已经终止的方法而已,那么什么时候会阻塞呢?这就需要我们看下awaitTermination的源码了。

  1. public boolean awaitTermination(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. long nanos = unit.toNanos(timeout);
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. try {
  7. for (;;) {
  8. //至少是TERMINATED状态,直接返回true,说明已经终止了
  9. if (runStateAtLeast(ctl.get(), TERMINATED))
  10. return true;
  11. //超时时间到了,直接返回false,说明终止线程池失败
  12. if (nanos <= 0)
  13. return false;
  14. nanos = termination.awaitNanos(nanos);
  15. }
  16. } finally {
  17. mainLock.unlock();
  18. }
  19. }

从上面的结论可以得知,阻塞等待过程中发生以下具体情况会解除阻塞

1、如果发生了termination.signalAll()就会唤醒阻塞等待的线程,而且由于ThreadPoolExecutor只有在tryTerminated()方法中才会尝试终止线程池成功,将线程池更新为terminated状态后才会signalAll(),所以awaitTermination()会再次判断当前的状态,如果是已经终止了那么就返回true。

2、如果达到了超时时间termication.awaitNanos()时候也会返回,此时nano==0,再次循环判断return false,失败了。

3、当前线程被Thread.interrupt(),awaitNanos()就会向上抛异常,awaitTermination继续向上抛给调用的线程,以抛异常的方法解除阻塞。

(8)shutdownNow方法

调用shutdownNow方法之后,线程池就不会再接收新的任务了,并且会丢弃工作丢列里面的任务,正在运行的任务也会被设置中断标志,该方法会立刻返回的,并不会等待激活的任务执行完毕,返回值为此时被丢弃的任务列表。

  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. //权限检查
  7. checkShutdownAccess();
  8. //设置状态为stop
  9. advanceRunState(STOP);
  10. //中断所有的线程
  11. interruptWorkers();
  12. //将队列任务移动到tasks中
  13. tasks = drainQueue();
  14. } finally {
  15. mainLock.unlock();
  16. }
  17. tryTerminate();
  18. return tasks;
  19. }
  20. //中断所有的线程
  21. private void interruptWorkers() {
  22. final ReentrantLock mainLock = this.mainLock;
  23. mainLock.lock();
  24. try {
  25. //这里无论三七二十一了,不管它空不空闲或者有没有在执行任务,都直接设置中断标志了
  26. for (Worker w : workers)
  27. w.interruptIfStarted();
  28. } finally {
  29. mainLock.unlock();
  30. }
  31. }
  32. //但是呢至少worker的state也是要为0或者1的,如果是-1就相当于初始化完还没有进行runWorker的状态
  33. void interruptIfStarted() {
  34. Thread t;
  35. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  36. try {
  37. t.interrupt();
  38. } catch (SecurityException ignore) {
  39. }
  40. }
  41. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/944575
推荐阅读
  

闽ICP备14008679号