当前位置:   article > 正文

ThreadPoolExecutor核心源码分析_缺省拒绝

缺省拒绝

目录

看前须知

关于线程回收的情况:

走拒绝策略的情况

线程池的状态

shutdown方法和shutdownNow方法的区别:

4种拒绝策略

重要属性和小方法

内部类 Worker

ThreadPoolExecutor构造方法

execute方法

addWorker方法

runWorker方法

getTask方法

shutdown方法和shutdownNow方法


看前须知

方法

关于线程回收的情况:

 在线程池running状态下,

  1. 非核心线程,在指定的空闲时间内,如果获取不到任务,就会被回收,走退出逻辑。

  2. 核心线程,如果 allowCoreThreadTimeOut = true,在空闲时间keepAliveTime内,获取不到任务,也会被回收。

  3. 核心线程,如果allowCoreThreadTimeOut = false,即使空闲,也不会回收。

如果调用了shutdown()方法,会中断所有空闲线程,去回收它们。

如果调用了shutdownNow()方法,会中断所有线程,去回收它们。

走拒绝策略的情况

        走到拒绝策略的情况:①线程池里的线程数达到了maximunPoolSize。②调用了shutdown或shutdowNow方法来关闭线程池,线程池的状态不是running。

线程池的状态

 

调用shutdown方法后,处于shutdown状态的线程池,提交的任务会被拒绝。shutdown状态下,会先把任务队列中的任务消费完,任务队列为空后,线程池的线程才会依次退出。最后一个退出的线程会把线程池的状态改为tidying,然后调用 terminated方法,最后将线程池改为terminated状态。

shutdown方法和shutdownNow方法的区别:

        shutdown方法将线程池改为shutdown状态,拒绝之后提交的任务。然后会遍历线程池中的所有线程,尝试获取线程的独占锁。如果获取失败,就什么也不做。如果拿到了某个线程的独占锁,说明这个线程处于空闲状态,就给这个线程一个中断信号(调用interrupt方法)。收到中断信号的线程会从waiting状态切换为runnable状态。这个线程一看线程池是shutdown状态了,就会执行退出相关的逻辑。shutdown方法会中断所有没有正在执行任务的线程,但会保证线程池以及任务队列中的任务都执行完毕。

        shutdownNow方法将线程池改为stop状态,拒绝之后提交的任务。遍历线程池中的线程,不检查线程的独占锁,直接给线程一个中断信号(调用interrupt方法)。此时的线程可能是空闲的,可能是正在执行任务。 正在执行任务的线程,收到中断信号后,还是会继续执行这个task,除非task会响应中断。如果不响应中断,就什么也不会发生。处于空闲状态的线程收到中断信号后,一看当前线程池状态为stop,它就不管任务队列中还有没有task,直接就退出了。

4种拒绝策略

  1. AbortPolicy(默认的,常用的):直接抛出异常
  2. CallerRunsPolicy:只用调用者所在线程来运行任务。线程a 向线程池提交了一个任务,但线程池拒绝了,最后还是得线程a来执行task。
  3. DiscardOldestPolicy:丢弃队列里最近的任务,并执行当前任务。线程池是running状态,任务队列已经满了,线程池也满了。会把任务队列中最老的任务丢掉。
  4. DiscardPolicy:不处理,丢掉。

重要属性和小方法

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. //高3位:表示当前线程池运行状态 除去高3位之后的低位:表示当前线程池中所拥有的线程数量
  3. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  4. //表示在ctl中,低COUNT_BITS位 是用于存放当前线程数量的位。不直接写29,是为了防止在某个JDK版本中Integer的大小不是4个字节,是8个字节的情况。
  5. private static final int COUNT_BITS = Integer.SIZE - 3;
  6. //低COUNT_BITS位 所能表达的最大线程数值。 000 1111111111111...111(29个1) => 5亿多。
  7. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  8. // runState is stored in the high-order bits
  9. //111 00000000...00000(29个0) 转换成整数,其实是一个负数
  10. private static final int RUNNING = -1 << COUNT_BITS;
  11. //000 00000000...00000(29个0)
  12. private static final int SHUTDOWN = 0 << COUNT_BITS;
  13. //001 0000000...00000(29个0)
  14. private static final int STOP = 1 << COUNT_BITS;
  15. //010 0000000...00000(29个0)
  16. private static final int TIDYING = 2 << COUNT_BITS;
  17. //011 0000000...00000(29个0)
  18. private static final int TERMINATED = 3 << COUNT_BITS;
  19. // Packing and unpacking ctl
  20. //获取当前线程池运行状态
  21. //~000 11111111111111...111(29个1) => 111 0000000000000...00000(29个0)
  22. //假设c == ctl = 111 00000000...000000111
  23. //111 000000000...000000111
  24. //111 000000000...000000000
  25. //111 000000000...000000000(一共32位)
  26. private static int runStateOf(int c) { return c & ~CAPACITY; }
  27. //获取当前线程池线程数量
  28. //假设c == ctl = 111 00000000...0000000111
  29. //111 000000000...000000111
  30. //000 111111111...111111111
  31. //000 000000000...000000111 => 7
  32. private static int workerCountOf(int c) { return c & CAPACITY; }
  33. //用在设置当前线程池ctl值时 会用到
  34. //rs 表示线程池状态 wc 表示当前线程池中worker(线程)数量
  35. //111 000000000000000000
  36. //000 000000000000000111
  37. //111 000000000000000111
  38. private static int ctlOf(int rs, int wc) { return rs | wc; }
  39. //比较当前线程池ctl所表示的状态,是否小于某个状态s
  40. //c = 111 000000000000000111 < 000 000000000000000000 == true
  41. //所有情况下,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
  42. private static boolean runStateLessThan(int c, int s) {
  43. return c < s;
  44. }
  45. //比较当前线程池ctl所表示的状态,是否大于等于某个状态s
  46. private static boolean runStateAtLeast(int c, int s) {
  47. return c >= s;
  48. }
  49. //小于SHUTDOWN 的一定是RUNNING。 SHUTDOWN == 0
  50. private static boolean isRunning(int c) {
  51. return c < SHUTDOWN;
  52. }
  53. /**
  54. * Attempts to CAS-increment the workerCount field of ctl.
  55. */
  56. //使用CAS方式 让ctl值+1 ,成功返回true, 失败返回false
  57. private boolean compareAndIncrementWorkerCount(int expect) {
  58. return ctl.compareAndSet(expect, expect + 1);
  59. }
  60. /**
  61. * Attempts to CAS-decrement the workerCount field of ctl.
  62. */
  63. //使用CAS方式 让ctl值-1 ,成功返回true, 失败返回false
  64. private boolean compareAndDecrementWorkerCount(int expect) {
  65. return ctl.compareAndSet(expect, expect - 1);
  66. }
  67. /**
  68. * Decrements the workerCount field of ctl. This is called only on
  69. * abrupt termination of a thread (see processWorkerExit). Other
  70. * decrements are performed within getTask.
  71. */
  72. //将ctl值减1,这个方法一定成功
  73. private void decrementWorkerCount() {
  74. //这里会一直重试,直到成功为止。
  75. do {} while (! compareAndDecrementWorkerCount(ctl.get()));
  76. }
  77. //任务队列,当线程池中的线程达到核心线程数量时,再提交任务 就会直接提交到 workQueue
  78. //workQueue 4种阻塞队列,ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue,PriorityBlockingQueue
  79. private final BlockingQueue<Runnable> workQueue;
  80. //线程池全局锁,它是一个可重入锁,增加worker、减少worker、修改线程池运行状态时 ,需要持有mainLock
  81. //同一时刻中,只有一个线程可以往线程池中 加线程 或者 减线程 或者 改线程池的状态,无法并发
  82. private final ReentrantLock mainLock = new ReentrantLock();
  83. /**
  84. * Set containing all worker threads in pool. Accessed only when
  85. * holding mainLock.
  86. */
  87. //线程池中真正存放 worker->thread 的地方。放工作线程的地方
  88. private final HashSet<Worker> workers = new HashSet<Worker>();
  89. /**
  90. * Wait condition to support awaitTermination
  91. */
  92. //当外部线程调用 awaitTermination() 方法时,外部线程会等待当前线程池状态为 Terminated 为止。阻塞的外部线程会在tryTerminate方法中被唤醒termination.signalAll();
  93. //等待是如何实现的? 就是将外部线程 封装成 waitNode 放入到 Condition 队列中了, waitNode.Thread 就是外部线程,会被park掉(处于WAITING状态)。
  94. //当线程池 状态 变为 Termination时,会去唤醒这些线程。通过 termination.signalAll() ,唤醒之后这些线程会进入到 阻塞队列,然后头结点会去抢占mainLock。
  95. //抢占到的线程,会继续执行awaitTermination() 后面的程序。这些线程最后,都会正常执行。
  96. //简单理解:termination.await() 会将线程阻塞在这。
  97. // termination.signalAll() 会将阻塞在这的线程依次唤醒
  98. private final Condition termination = mainLock.newCondition();
  99. /**
  100. * Tracks largest attained pool size. Accessed only under
  101. * mainLock.
  102. */
  103. //记录线程池生命周期内 线程数最大值
  104. private int largestPoolSize;
  105. /**
  106. * Counter for completed tasks. Updated only on termination of
  107. * worker threads. Accessed only under mainLock.
  108. */
  109. //记录线程池所完成任务总数 ,当worker退出时会将 worker完成的任务累积到completedTaskCount
  110. private long completedTaskCount;
  111. //创建线程时会使用 线程工厂,当我们使用 Executors.newFix... newCache... 创建线程池时,使用的是 DefaultThreadFactory
  112. //一般不建议使用Default线程池,推荐自己实现ThreadFactory,这样我们可以自定义命名规则,方便找出是哪块业务逻辑出现了问题。
  113. //因为Default线程池工厂的命名规则不好用,万一出现了bug,无法通过命名来判断是哪条业务线上出现了问题
  114. //DefaultThreadFactory implements ThreadFactory 在Executors 工厂类中
  115. private volatile ThreadFactory threadFactory;
  116. /**
  117. * Handler called when saturated or shutdown in execute.
  118. */
  119. //拒绝策略,juc包提供了4中方式,默认采用 Abort..抛出异常的方式。AbortPolicy,CallerRunsPolicy,DiscardOldestPolicy,DiscardPolicy
  120. private volatile RejectedExecutionHandler handler;
  121. /**
  122. * Timeout in nanoseconds for idle threads waiting for work.
  123. * Threads use this timeout when there are more than corePoolSize
  124. * present or if allowCoreThreadTimeOut. Otherwise they wait
  125. * forever for new work.
  126. */
  127. //空闲线程存活时间,当allowCoreThreadTimeOut == false 时,会维护核心线程数量内的线程存活,超出部分会被超时。
  128. //allowCoreThreadTimeOut == true 核心数量内的线程 空闲时 也会被回收。
  129. private volatile long keepAliveTime;
  130. /**
  131. * If false (default), core threads stay alive even when idle.
  132. * If true, core threads use keepAliveTime to time out waiting
  133. * for work.
  134. */
  135. //控制核心线程数量内的线程 是否可以被回收。true 可以,false不可以。
  136. //false时,线程池会维护线程数量为corePoolSize个
  137. //true时,核心线程获取任务超时 且 (当前线程数量大于1 || 工作队列为空 )时,核心线程会被回收
  138. private volatile boolean allowCoreThreadTimeOut;
  139. //核心线程数量限制。
  140. private volatile int corePoolSize;
  141. //线程池最大线程数量限制。
  142. private volatile int maximumPoolSize;
  143. /**
  144. * The default rejected execution handler
  145. */
  146. //缺省拒绝策略,采用的是AbortPolicy 抛出异常的方式。
  147. private static final RejectedExecutionHandler defaultHandler =
  148. new AbortPolicy();
  149. ......
  150. }

内部类 Worker

  1. /** 内部类 Worker*/
  2. private final class Worker
  3. extends AbstractQueuedSynchronizer
  4. implements Runnable
  5. {
  6. //Worker采用了AQS的独占模式
  7. //独占模式:两个重要属性 state 和 ExclusiveOwnerThread
  8. //state:0时表示未被占用,=0时,才会尝试去抢锁; > 0时表示被占用,加锁了; < 0 时 表示初始状态,这种情况下不能被抢锁。
  9. //ExclusiveOwnerThread:表示独占锁的线程。谁抢到锁了,ExclusiveOwnerThread(独占者线程)就指向谁
  10. /**
  11. * This class will never be serialized, but we provide a
  12. * serialVersionUID to suppress a javac warning.
  13. */
  14. private static final long serialVersionUID = 6138294804551838833L;
  15. /** Thread this worker is running in. Null if factory fails. */
  16. //worker内部封装的工作线程
  17. final Thread thread;
  18. /** Initial task to run. Possibly null. */
  19. //假设firstTask不为空,那么当worker启动(内部的线程启动)后会优先执行firstTask,当执行完firstTask后,会到queue中去获取下一个任务。
  20. Runnable firstTask;
  21. /** Per-thread task counter */
  22. //记录当前worker所完成任务数量。
  23. volatile long completedTasks;
  24. /**
  25. * Creates with given first task and thread from ThreadFactory.
  26. * @param firstTask the first task (null if none)
  27. */
  28. //firstTask可以为null。为null 启动后会到queue中获取任务。
  29. Worker(Runnable firstTask) {
  30. //设置AQS独占模式为初始化中状态,这个时候 不能被抢占锁。
  31. setState(-1); // inhibit interrupts until runWorker
  32. this.firstTask = firstTask;
  33. //使用线程工厂创建了一个线程,并且将当前worker 指定为 Runnable,也就是说当thread启动的时候,会以worker.run()为入口。
  34. //把当前Worker对象 传入了Thread里面。这个Worker对象会被放到 Thread.java 的private Runnable target; 属性中。
  35. //Worker类继承了AbstractQueuedSynchronizer,实现了Runnable接口
  36. //thread.start()后,会调用start0()方法,start0()是一个native方法,它会在操作系统底层申请资源,开启一个真正的线程(Windows)或者轻量级的进程 (Linux)
  37. //这个线程启动后,就会调用 thread.run() ,run()方法会调用 target属性的 run()方法,这其实就是 worker.run()。就会来到 runWorker方法中
  38. this.thread = getThreadFactory().newThread(this);
  39. }
  40. /** Delegates main run loop to outer runWorker */
  41. //当worker启动时,会执行run()
  42. public void run() {
  43. //ThreadPoolExecutor->runWorker() 这个是核心方法,等后面分析worker启动后逻辑时会以这里切入。
  44. runWorker(this);
  45. }
  46. // Lock methods
  47. //
  48. // The value 0 represents the unlocked state.
  49. // The value 1 represents the locked state.
  50. //判断当前worker的独占锁是否被独占。
  51. //0 表示未被占用
  52. //1 表示已占用
  53. protected boolean isHeldExclusively() {
  54. return getState() != 0;
  55. }
  56. //尝试去占用worker的独占锁
  57. //返回值 表示是否抢占成功
  58. protected boolean tryAcquire(int unused) {
  59. //使用CAS修改 AQS中的 state ,期望值为0(0时表示未被占用),修改成功表示当前线程抢占成功
  60. //那么则设置 ExclusiveOwnerThread 为当前线程。
  61. if (compareAndSetState(0, 1)) {
  62. setExclusiveOwnerThread(Thread.currentThread());
  63. return true;
  64. }
  65. return false;
  66. }
  67. //外部不会直接调用这个方法 这个方法是AQS 内调用的,外部调用unlock时 ,unlock->AQS.release() ->tryRelease()
  68. protected boolean tryRelease(int unused) {
  69. //将当前独占的线程置为null
  70. setExclusiveOwnerThread(null);
  71. //state 改为 0
  72. setState(0);
  73. return true;
  74. }
  75. //加锁,加锁失败时,会阻塞当前线程,直到获取到锁位置。
  76. public void lock() { acquire(1); }
  77. //尝试去加锁,如果当前锁是未被持有状态,那么加锁成功后 会返回true,否则不会阻塞当前线程,直接返回false.
  78. public boolean tryLock() { return tryAcquire(1); }
  79. //一般情况下,咱们调用unlock 要保证 当前线程是持有锁的。
  80. //特殊情况,当worker的state == -1 时,调用unlock 表示初始化state 设置state == 0
  81. //启动worker之前会先调用unlock()这个方法。会强制刷新ExclusiveOwnerThread == null State==0
  82. //worker对象.unlock-> AQS.release -> worker对象.tryRelease。因为方法重写
  83. public void unlock() { release(1); }
  84. //就是返回当前worker的lock是否被占用。
  85. public boolean isLocked() { return isHeldExclusively(); }
  86. void interruptIfStarted() {
  87. Thread t;
  88. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  89. try {
  90. t.interrupt();
  91. } catch (SecurityException ignore) {
  92. }
  93. }
  94. }
  95. }

ThreadPoolExecutor构造方法

  1. /**构造方法 */
  2. public ThreadPoolExecutor(int corePoolSize,//核心线程数限制
  3. int maximumPoolSize,//最大线程限制
  4. long keepAliveTime,//空闲线程存活时间
  5. TimeUnit unit,//时间单位 seconds nano..
  6. BlockingQueue<Runnable> workQueue,//任务队列
  7. ThreadFactory threadFactory,//线程工厂
  8. RejectedExecutionHandler handler/*拒绝策略*/) {
  9. //判断参数是否越界
  10. if (corePoolSize < 0 ||
  11. maximumPoolSize <= 0 ||
  12. maximumPoolSize < corePoolSize ||
  13. keepAliveTime < 0)
  14. throw new IllegalArgumentException();
  15. //工作队列 和 线程工厂 和 拒绝策略 都不能为空。
  16. if (workQueue == null || threadFactory == null || handler == null)
  17. throw new NullPointerException();
  18. this.acc = System.getSecurityManager() == null ?
  19. null :
  20. AccessController.getContext();
  21. this.corePoolSize = corePoolSize;
  22. this.maximumPoolSize = maximumPoolSize;
  23. this.workQueue = workQueue;
  24. this.keepAliveTime = unit.toNanos(keepAliveTime);
  25. this.threadFactory = threadFactory;
  26. this.handler = handler;
  27. }

execute方法

  1. //command 可以是普通的Runnable 实现类,也可以是 FutureTask
  2. /*execute 方法就是为了提交任务到线程池,创建线程去执行它。execute方法的核心是addWorker方法、workQueue.offer方法、reject方法
  3. 情况1.当前线程数量小于核心线程数,此次提交任务,直接创建一个新的worker,去执行任务。
  4. 如果addWorker成功,则返回。
  5. 如果失败了,就有2种情况,
  6. 1.1.线程池running状态下,存在并发现象。多个线程同时调用execute方法,其他线程创建worker后,线程数量又达到核心线程数了。
  7. 尝试将task放入任务队列中。
  8. 1.1.1 如果放入队列成功,就获取ctl值,再次检查线程池状态。
  9. 如果任务提交到工作队列后,调用了shutdown、shutdownNow,就需要remove刚刚提交的任务。
  10. 如果提交的任务还没有被线程池中的线程消费,就remove成功,将任务从工作队列中删除,再执行拒绝策略。
  11. 如果提交之后,在shutdown() shutdownNow()之前,就任务被线程池中的线程 给处理,就remove失败。
  12. 如果任务提交后,线程池还是running状态,就会有一个担保机制:
  13. 当前线程池是running,但是线程池中的存活线程数量是0,就会调用addWorker方法,创建一个线程去执行任务。
  14. 1.1.2 如果放入队列失败,则说明使用的是有界队列,且队列满了。
  15. 此时就看maximumPoolSize了,如果线程数大于等于maximum,就走拒绝策略。否则,就创建一个新的Worker去执行任务。
  16. 1.2.当线程池状态是非RUNNING状态时,addWorker(firstTask!=null, core=true|false) 一定会失败。
  17. 此时,会采用拒绝策略,拒绝提交任务了。
  18. 情况2.当前线程数量大于等于核心线程数,分2类情况:
  19. 2.1.线程池是running状态,尝试将task放入工作队列中,后面的情况之前说过,就不说了。
  20. 2.2.线程池不是running状态,会采用拒绝策略,拒绝提交任务了。
  21. * */
  22. public void execute(Runnable command) {
  23. //非空判断..
  24. if (command == null)
  25. throw new NullPointerException();
  26. //获取ctl最新值赋值给c,ctl :高3位 表示线程池状态,低位表示当前线程池线程数量。
  27. int c = ctl.get();
  28. //workerCountOf(c) 获取出当前线程池中的线程数量
  29. //条件成立:表示当前线程数量小于核心线程数,此次提交任务,直接创建一个新的worker,对应线程池中多了一个新的线程。
  30. if (workerCountOf(c) < corePoolSize) {
  31. //addWorker 即为创建线程的过程,会创建worker对象,并且将command作为firstTask
  32. //core == true 表示采用核心线程数量限制 false表示采用 maximumPoolSize
  33. if (addWorker(command, true))
  34. //创建成功后,直接返回。addWorker方法里面会启动新创建的worker,将firstTask执行。
  35. return;
  36. //执行到这条语句,说明addWorker一定是失败了...
  37. //有几种可能呢??
  38. //1.存在并发现象,execute方法是可能有多个线程同时调用的,当workerCountOf(c) < corePoolSize成立后,
  39. //其它线程可能也成立了,并且向线程池中创建了worker。这个时候线程池中的核心线程数已经达到,所以...
  40. //2.当前线程池状态发生改变了。 RUNNING SHUTDOWN STOP TIDYING TERMINATED
  41. //当线程池状态是非RUNNING状态时,addWorker(firstTask!=null, true|false) 一定会失败。
  42. //SHUTDOWN 状态下,也有可能创建成功。前提 firstTask == null 而且当前 queue 不为空。特殊情况。
  43. c = ctl.get();
  44. }
  45. //执行到这里有几种情况?
  46. //1.当前线程数量已经达到corePoolSize
  47. //2.addWorker失败..
  48. //条件成立:说明当前线程池处于running状态,则尝试将 task 放入到workQueue中。
  49. if (isRunning(c) && workQueue.offer(command)) {
  50. //执行到这里,说明offer提交任务成功了..
  51. //再次获取ctl保存到recheck。
  52. int recheck = ctl.get();
  53. //条件一:! isRunning(recheck) 成立:说明你提交到队列之后,线程池状态被外部线程给修改 比如:shutdown() shutdownNow()
  54. //这种情况 需要把刚刚提交的任务删除掉。
  55. //条件二:remove(command) 有可能成功,也有可能失败
  56. //成功:提交之后,线程池中的线程还未消费(处理)
  57. //失败:提交之后,在shutdown() shutdownNow()之前,就被线程池中的线程 给处理。就返回了。
  58. if (! isRunning(recheck) && remove(command))
  59. //提交之后线程池状态为 非running 且 任务出队成功,走个拒绝策略。
  60. reject(command);
  61. //有几种情况会到这里?
  62. //1.当前线程池是running状态(这个概率最大)
  63. //2.线程池状态是非running状态 但是remove提交的任务失败.
  64. //担心 当前线程池是running状态,但是线程池中的存活线程数量是0,这个时候,如果是0的话,会很尴尬,任务没线程去跑了,
  65. //这里其实是一个担保机制,保证线程池在running状态下,最起码得有一个线程在工作。
  66. else if (workerCountOf(recheck) == 0)
  67. addWorker(null, false);
  68. }
  69. //执行到这里,有几种情况?
  70. //1.offer失败,使用的是有界队列,队列已经满了
  71. //2.当前线程池是非running状态
  72. //1.offer失败,需要做什么? 说明当前queue 满了!这个时候 如果当前线程数量尚未达到maximumPoolSize的话,会创建新的worker直接执行command
  73. //假设当前线程数量达到maximumPoolSize的话,即使线程池running,addWorker也会失败,也走拒绝策略。
  74. //2.线程池状态为非running状态,这个时候因为 command != null addWorker 一定是返回false。
  75. else if (!addWorker(command, false))
  76. reject(command);
  77. }

addWorker方法

  1. /**
  2. addWorker方法逻辑,线程池中的线程不分核心,不核心,都是线程,只不过创建的时候,会根据线程池数量阈值(corePoolSize/maximumPoolSize),去执行不同的判断逻辑。
  3. 1.retry自旋中。判断 当前线程池状态 是否允许创建worker(添加线程)。
  4. 1.1.不允许创建worker的情况为
  5. 一、线程池状态rs > SHUTDOWN (STOP/TIDYING/TERMINATION);
  6. 二、rs == SHUTDOWN 但是队列中已经没有任务了 或者 当前状态是SHUTDOWN且队列未空,但是firstTask不为null;
  7. 1.2.如果不是这些情况,就开始内部自旋,获取创建线程的令牌。
  8. (所谓的令牌就是看能不能通过CAS的方式将ctl +1,这代表增加一个worker)
  9. 内部自旋中,会根据形参core,来判断当前线程池中线程数量 是否 大于等于corePoolSize/maximumPoolSize。如果达到了指定限制,就创建worker失败,return false。
  10. 如果没达到限制,就通过CAS的方式将线程数量 +1.
  11. 1.2.1. CAS成功,就跳出retry自旋。
  12. 1.2.2. CAS失败,重新检查线程池的状态,会跳到对应的自旋中。发生竞争/其他线程调用了shutdown、shutdownNow导致线程池状态改变了,都会使CAS失败。
  13. 2.已经通过CAS的方式将ctl +1了,线程数量加1。开始尝试创建worker,并启动对应的thread。
  14. 2.1.先获取全局锁 mainLock,
  15. 2.2.检查当前线程池的状态。确保添加worker 发生在running状态 或 (shutdown状态 且 firstTask为空)下。
  16. 2.3.检查当前worker的thread 是否 已经启动了。如果启动了,就抛出异常。
  17. 2.4.将worker添加到workers(HashSet)中。
  18. 2.5.判断一下最大线程数量是否突破历史最大值?largestPoolSize
  19. 2.6.将workerAdded 标志设为true。释放全局锁。将当前worker对应的线程 start。将workerStarted设为 true;
  20. 2.7.如果thread启动失败,就要做一些清理工作。
  21. 2.7.1.获取全局锁
  22. 2.7.2.在workers中remove worker
  23. 2.7.3.通过CAS的方式将线程数量-1
  24. 2.7.4.释放全局锁。
  25. 方法返回值总结:
  26. true 表示创建worker成功,把这个worker放入workers(HashSet)中了,且线程启动。
  27. false 表示创建失败。
  28. 1.线程池状态rs > SHUTDOWN (STOP/TIDYING/TERMINATION)
  29. 2.rs == SHUTDOWN 但是队列中已经没有任务了 或者 当前状态是SHUTDOWN且队列未空,但是firstTask不为null
  30. 3.当前线程池已经达到指定指标(coprePoolSize 或者 maximumPoolSize)
  31. 4.threadFactory 创建的线程是null
  32. */
  33. private boolean addWorker(Runnable firstTask, boolean core) {
  34. //自旋 判断当前线程池状态是否允许创建线程的事情。
  35. retry:
  36. for (;;) {
  37. //获取当前ctl值保存到c
  38. int c = ctl.get();
  39. //获取当前线程池运行状态 保存到rs长
  40. int rs = runStateOf(c);
  41. //条件一:rs >= SHUTDOWN 成立:说明当前线程池状态不是running状态
  42. //条件二:前置条件,当前的线程池状态不是running状态 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
  43. //rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
  44. //表示:当前线程池状态是SHUTDOWN状态 & 提交的任务是空,addWorker这个方法可能不是execute调用的。 & 当前任务队列不是空
  45. //排除掉这种情况,当前线程池是SHUTDOWN状态,但是队列里面还有任务尚未处理完,这个时候是允许添加worker,但是不允许再次提交task。
  46. if (rs >= SHUTDOWN &&
  47. ! (rs == SHUTDOWN &&
  48. firstTask == null &&
  49. ! workQueue.isEmpty()))
  50. //什么情况下回返回false?
  51. //线程池状态 rs > SHUTDOWN
  52. //rs == SHUTDOWN 但是队列中已经没有任务了 或者 rs == SHUTDOWN 且 firstTask != null
  53. return false;
  54. //上面这些代码,就是判断 当前线程池状态 是否允许添加线程。
  55. //内部自旋 获取创建线程令牌的过程。
  56. // 所谓的创建线程的令牌,就是看能不能通过CAS的方式将ctl +1,代表增加一个worker
  57. for (;;) {
  58. //获取当前线程池中线程数量 保存到wc中
  59. int wc = workerCountOf(c);
  60. //条件一:wc >= CAPACITY 永远不成立,因为CAPACITY是一个5亿多大的数字
  61. //条件二:wc >= (core ? corePoolSize : maximumPoolSize)
  62. //core == true ,判断当前线程数量是否>=corePoolSize,会拿核心线程数量做限制。
  63. //core == false,判断当前线程数量是否>=maximumPoolSize,会拿最大线程数量做限制。
  64. if (wc >= CAPACITY ||
  65. wc >= (core ? corePoolSize : maximumPoolSize))
  66. //执行到这里,说明当前无法添加线程了,已经达到指定限制了
  67. return false;
  68. //条件成立:说明记录线程数量已经加1成功,相当于申请到了一块令牌。
  69. //条件失败:说明可能有其它线程,修改过ctl这个值了。
  70. //可能发生过什么事?
  71. //1.其它线程execute() 申请过令牌了,在这之前。导致CAS失败
  72. //2.外部线程可能调用过 shutdown() 或者 shutdownNow() 导致线程池状态发生变化了,咱们知道 ctl 高3位表示状态
  73. //状态改变后,cas也会失败。
  74. if (compareAndIncrementWorkerCount(c))
  75. //进入到这里面,一定是cas成功啦!申请到令牌了
  76. //直接跳出了 retry 外部这个for自旋。
  77. break retry;
  78. //CAS失败,没有成功的申请到令牌
  79. //获取最新的ctl值
  80. c = ctl.get(); // Re-read ctl
  81. //判断当前线程池状态是否发生过变化,如果外部在这之前调用过shutdown. shutdownNow 会导致状态变化。
  82. if (runStateOf(c) != rs)
  83. //状态发生变化后,直接返回到外层循环,外层循环负责判断当前线程池状态,是否允许创建线程。
  84. continue retry;
  85. // else CAS failed due to workerCount change; retry inner loop
  86. }
  87. }
  88. //现在可以允许尝试创建worker了
  89. //表示创建的worker是否已经启动,false未启动 true启动
  90. boolean workerStarted = false;
  91. //表示创建的worker是否添加到池子中了 默认false 未添加 true是添加。
  92. boolean workerAdded = false;
  93. //w表示后面创建worker的一个引用。
  94. Worker w = null;
  95. try {
  96. //创建Worker,执行完后,线程应该是已经创建好了。
  97. w = new Worker(firstTask);
  98. //将新创建的worker节点的线程 赋值给 t
  99. final Thread t = w.thread;
  100. //为什么要做 t != null 这个判断?
  101. //为了防止ThreadFactory 实现类有bug,因为ThreadFactory 是一个接口,谁都可以实现。
  102. //万一哪个 小哥哥 脑子一热,有bug,创建出来的线程 是null、、
  103. //Doug lea考虑的比较全面。肯定会防止他自己的程序报空指针,所以这里一定要做!
  104. if (t != null) {
  105. //将全局锁的引用保存到mainLock
  106. final ReentrantLock mainLock = this.mainLock;
  107. //持有全局锁,可能会阻塞,直到获取成功为止,同一时刻 操纵 线程池内部相关的操作,都必须持锁。
  108. mainLock.lock();
  109. //从这里加锁之后,其它线程 是无法修改当前线程池状态的。
  110. try {
  111. // Recheck while holding lock.
  112. // Back out on ThreadFactory failure or if
  113. // shut down before lock acquired.
  114. //获取最新线程池运行状态保存到rs中
  115. int rs = runStateOf(ctl.get());
  116. //条件一:rs < SHUTDOWN 成立:最正常状态,当前线程池为RUNNING状态.
  117. //条件二:前置条件:当前线程池状态不是RUNNING状态。
  118. //(rs == SHUTDOWN && firstTask == null) 当前状态为SHUTDOWN状态且firstTask为空。其实判断的就是SHUTDOWN状态下的特殊情况,
  119. //只不过这里不再判断队列是否为空了
  120. if (rs < SHUTDOWN ||
  121. (rs == SHUTDOWN && firstTask == null)) {
  122. //t.isAlive() 当线程start后,线程isAlive会返回true。
  123. //防止脑子发热的程序员,ThreadFactory创建线程返回给外部之前,将线程start了。。
  124. if (t.isAlive()) // precheck that t is startable
  125. throw new IllegalThreadStateException();
  126. //将咱们创建的worker添加到线程池中。
  127. workers.add(w);
  128. //获取最新当前线程池线程数量
  129. int s = workers.size();
  130. //条件成立:说明当前线程数量是一个新高。更新largestPoolSize
  131. if (s > largestPoolSize)
  132. largestPoolSize = s;
  133. //表示线程已经追加进线程池中了。
  134. //其实线程池中的线程不分核心,不核心,都是线程,
  135. // 只不过创建的时候,会根据线程池数量阈值(corePoolSize/maximumPoolSize),去执行不同的判断逻辑。
  136. workerAdded = true;
  137. }
  138. } finally {
  139. //释放线程池全局锁。
  140. mainLock.unlock();
  141. }
  142. //条件成立:说明 添加worker成功
  143. //条件失败:说明线程池在lock之前,线程池状态发生了变化,导致添加失败。
  144. if (workerAdded) {
  145. //成功后,则将创建的worker启动,线程启动。
  146. t.start();
  147. //启动标记设置为true
  148. workerStarted = true;
  149. }
  150. }
  151. }finally {
  152. //条件成立:! workerStarted 说明启动失败,需要做清理工作。
  153. if (! workerStarted)
  154. //失败时做什么清理工作?
  155. //1.释放令牌,就是通过CAS的方式将 ctl -1
  156. //2.将当前worker清理出workers集合
  157. addWorkerFailed(w);
  158. }
  159. //返回新创建的线程是否启动。
  160. return workerStarted;
  161. }

runWorker方法

  1. /**
  2. 从整体来看,workers.size() 与 ctl低位表示的线程数量 是一致的。
  3. runWorker方法的调用逻辑是
  4. 在addWorker里面启动thread后,w.thread.start -> start0方法 -> 操作系统中开启真正的线程
  5. -> thread.run方法 ->worker.run方法 -> ThreadPoolExecutor.runWorker方法
  6. runWorker的大体逻辑:
  7. 1.调用unlock方法,初始化state == 0 和 exclusiveOwnerThread ==null
  8. 2.while(当前worker.firstTask不为空 || getTask()的返回值不为空,当前线程从工作队列中成功获取到了任务) 时,即将开始执行任务。getTask()这个方法是会阻塞的。
  9. 2.1.当前线程获取worker的独占锁
  10. 2.2.如果线程池目前处于STOP/TIDYING/TERMINATED且当前线程中断标记位为false,则给当前线程一个中断信号
  11. 2.3.调用钩子方法 beforeExecute(wt, task)
  12. 2.4.调用task.run()方法。
  13. task 可能是FutureTask(调用submit方法提交的任务) 也可能是 普通的Runnable接口实现类(调用execute方法提交的任务)
  14. 2.5.调用钩子方法afterExecute(task, thrown)
  15. 2.6.在finally块中,更新worker完成任务数量(包括task.run()抛出异常的任务)。w.completedTasks++; 并 释放掉worker的独占锁。
  16. 此时,正常情况下,会再次回到getTask()从工作队列中获取任务 while(getTask...)
  17. 但也可能,task.run()时内部抛出异常了。注意:try/catch仅仅捕捉异常,是不会跳出while循环的。但用了throw关键字,就会跳出去。
  18. 3.如果getTask方法返回空,说明当前线程应该执行退出逻辑了。
  19. 4.当前线程可能是正常退出 completedAbruptly == false;也可能异常退出 completedAbruptly == true。
  20. worker退出的逻辑在processWorkerExit方法里
  21. 对于正常退出,
  22. 1.获取全局锁,将当前worker完成的task数量,汇总到线程池的completedTaskCount;将worker从池子中移除
  23. 2.释放全局锁。调用 tryTerminate()方法
  24. 3.如果当前线程池为running 或 (shutdown 但工作队列不空),
  25. 3.1.根据allowCoreThreadTimeOut,看看核心线程是否支持超时机制,
  26. 以此来计算线程池需要维持的最低线程数量。有3种可能,0,1,corePoolSize。
  27. 如果当前线程数量 大于等于 最低值,则return。
  28. 如果当前线程数量 小于 最低值,就会 addWorker(null, false)
  29. 对于异常退出,
  30. 1.通过CAS的方式,将ctl -1.
  31. 2.获取全局锁,将当前worker完成的task数量,汇总到线程池的completedTaskCount;将worker从池子中移除
  32. 3.释放全局锁。调用 tryTerminate()方法
  33. 4.如果当前线程池状态为 RUNNING 或 (shutdown 但工作队列不空),就需要创建一个新worker顶上去,addWorker(null, false)
  34. */
  35. final void runWorker(Worker w) {
  36. //wt == w.thread
  37. Thread wt = Thread.currentThread();
  38. //将初始执行task赋值给task
  39. Runnable task = w.firstTask;
  40. //清空当前w.firstTask引用
  41. w.firstTask = null;
  42. //这里为什么先调用unlock? 就是为了初始化worker state == 0 和 exclusiveOwnerThread ==null
  43. w.unlock();
  44. //是否是突然退出,true->发生异常了,当前线程是突然退出,回头需要做一些处理
  45. //false->正常退出。
  46. boolean completedAbruptly = true;
  47. try {
  48. //条件一:task != null 指的就是firstTask是不是null,如果不是null,直接执行循环体里面。
  49. //条件二:(task = getTask()) != null 条件成立:说明当前线程在queue中获取任务成功,getTask这个方法是一个会阻塞线程的方法
  50. //getTask如果返回null,当前线程需要执行结束逻辑。
  51. while (task != null || (task = getTask()) != null) {
  52. //worker设置独占锁 为当前线程
  53. //为什么要设置独占锁呢?shutdown时会判断当前worker状态,根据独占锁是否空闲来判断当前worker是否正在工作。
  54. //调用shutdown方法时,会去判断一下能不能拿到当前worker的独占锁,拿不到就去检查下一个worker。如果拿到了,就会给对应的线程一个中断信号。
  55. //线程收到中断信号后,会被唤醒。
  56. w.lock();
  57. //条件一:runStateAtLeast(ctl.get(), STOP) 说明线程池目前处于STOP/TIDYING/TERMINATED 此时线程一定要给它一个中断信号
  58. //中断是线程的一个标记位。如果程序是响应中断的话,就会去检查这个标记位,然后做一些相关的逻辑。如果程序不响应中断,那这个中断信号,就没用。
  59. //条件一成立:runStateAtLeast(ctl.get(), STOP)&& !wt.isInterrupted()
  60. //上面如果成立:说明当前线程池状态是>=STOP 且 当前线程是未设置中断状态的,此时需要进入到if里面,给当前线程一个中断。
  61. //假设:runStateAtLeast(ctl.get(), STOP) == false
  62. // (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) 在干吗呢?
  63. // Thread.interrupted() 获取当前中断状态,且设置中断位为false。连续调用两次,这个interrupted()方法 第二次一定是返回false.
  64. // runStateAtLeast(ctl.get(), STOP) 大概率这里还是false.
  65. // 其实它在强制刷新当前线程的中断标记位 false,因为有可能上一次执行task时,业务代码里面将当前线程的中断标记位 设置为了 true,且没有处理
  66. // 这里一定要强制刷新一下。不会再影响到后面的task了。
  67. //假设:Thread.interrupted() == true 且 runStateAtLeast(ctl.get(), STOP)) == true
  68. //这种情况有发生几率么?
  69. //有可能,因为外部线程在 第一次 (runStateAtLeast(ctl.get(), STOP) == false 后,有机会调用shutdown 、shutdownNow方法,将线程池状态修改
  70. //这个时候,也会将当前线程的中断标记位 再次设置回 中断状态。
  71. if ((runStateAtLeast(ctl.get(), STOP) ||
  72. (Thread.interrupted() &&
  73. runStateAtLeast(ctl.get(), STOP))) &&
  74. !wt.isInterrupted())
  75. wt.interrupt();
  76. try {
  77. //钩子方法,留给子类实现的
  78. beforeExecute(wt, task);
  79. //表示异常情况,如果thrown不为空,表示 task运行过程中 向上层抛出异常了。
  80. Throwable thrown = null;
  81. try {
  82. //task 可能是FutureTask(调用submit方法提交的任务) 也可能是 普通的Runnable接口实现类(调用execute方法提交的任务)。
  83. //如果前面是通过submit()提交的 runnable/callable 会被封装成 FutureTask。这个不清楚,请看上一期,在b站。
  84. task.run();
  85. } catch (RuntimeException x) {
  86. thrown = x; throw x;
  87. } catch (Error x) {
  88. thrown = x; throw x;
  89. } catch (Throwable x) {
  90. thrown = x; throw new Error(x);
  91. } finally {
  92. //钩子方法,留给子类实现的
  93. afterExecute(task, thrown);
  94. }
  95. } finally {
  96. //将局部变量task置为Null
  97. task = null;
  98. //更新worker完成任务数量
  99. w.completedTasks++;
  100. //worker处理完一个任务后,会释放掉独占锁
  101. //1.正常情况下,会再次回到getTask()那里获取任务 while(getTask...)
  102. //2.task.run()时内部抛出异常了..
  103. w.unlock();
  104. }
  105. }
  106. //什么情况下,会来到这里?
  107. //getTask()方法返回null时,说明当前线程应该执行退出逻辑了。
  108. completedAbruptly = false;
  109. }finally {
  110. //task.run()内部抛出异常时,直接从 w.unlock() 那里 跳到这一行。
  111. //正常退出 completedAbruptly == false
  112. //异常退出 completedAbruptly == true
  113. processWorkerExit(w, completedAbruptly);
  114. }
  115. }

getTask方法

  1. /**
  2. getTask方法是用来给线程池中的线程从工作队列中获取任务的。
  3. getTask方法的大体逻辑:
  4. 开启for自旋
  5. 1.获取ctl值、当前线程池状态。
  6. 如果状态>= STOP || 状态为SHUTDOWN 且 工作队列为空,则通过CAS的方式将ctl -1。CAS一定成功。返回null。
  7. 2.获取线程数量wc,判断当前线程获取task 是否支持超时机制,timed。
  8. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  9. 为什么这样写?因为当前线程可能是核心线程,可能是非核心线程。核心、非核心都是worker
  10. 核心线程对应 allowCoreThreadTimeOut。非核心对应wc > corePoolSize
  11. timed为true 表示支持超时机制,使用queue.poll(xxx,xxx);获取任务。如果超时了,那线程拿到的Runnable 为空。下次自旋,当前线程就可能会被回收。
  12. timed为false 表示不支持超时机制的,当前线程会使用 queue.take();获取任务。无限期的阻塞。
  13. 2.1.如果线程池中的线程数量 超过 最大限制maximumPoolSize时,通过CAS的方式让ctl-1(线程数量-1)。即将回收当前worker,当前线程要退出。
  14. CAS成功则返回null。CAS失败(可能是竞争,可能是状态变了),则再次自旋。
  15. 2.2.如果当前线程 获取任务超时,且当前线程支持超时机制,
  16. 此时,如果线程数量 大于1 或 工作队列为空,通过CAS的方式让ctl-1(线程数量-1)。即将回收当前worker,当前线程要退出。
  17. 3.获取任务。
  18. 如果线程拿到的Runnable不为空,则返回。
  19. 如果为空,说明当前线程超时了。自旋。
  20. 什么情况下会返回null?
  21. 1.rs >= STOP 成立说明:当前的状态最低也是STOP状态,
  22. 2. SHUTDOWN 且 工作队列为空
  23. 3.线程池中的线程数量 超过 最大限制maximumPoolSize时,会有一部分线程返回Null
  24. 4.线程池中的线程数超过corePoolSize时,会有一部分线程 超时后,返回null。
  25. */
  26. private Runnable getTask() {
  27. //表示当前线程获取任务是否超时 默认false true表示已超时
  28. boolean timedOut = false;
  29. //自旋
  30. for (;;) {
  31. //获取最新ctl值保存到c中。
  32. int c = ctl.get();
  33. //获取线程池当前运行状态
  34. int rs = runStateOf(c);
  35. //条件一:rs >= SHUTDOWN 条件成立:说明当前线程池是非RUNNING状态,可能是 SHUTDOWN/STOP....
  36. //条件二:(rs >= STOP || workQueue.isEmpty())
  37. //2.1:rs >= STOP 成立说明:当前的状态最低也是STOP状态,一定要返回null了
  38. //2.2:前置条件 状态是 SHUTDOWN ,workQueue.isEmpty()条件成立:说明当前线程池状态为SHUTDOWN状态 且 任务队列已空,此时一定返回null。
  39. //返回null,runWorker方法就会将返回Null的线程执行线程退出线程池的逻辑。
  40. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  41. //使用CAS+死循环的方式让 ctl值 -1
  42. decrementWorkerCount();
  43. return null;
  44. }
  45. //执行到这里,有几种情况?
  46. //1.线程池是RUNNING状态
  47. //2.线程池是SHUTDOWN状态 但是队列还未空,此时可以创建线程。
  48. //获取线程池中的线程数量
  49. int wc = workerCountOf(c);
  50. //timed == true 表示当前这个线程 获取 task 时 是支持超时机制的,使用queue.poll(xxx,xxx); 当获取task超时的情况下,下一次自旋就可能返回null了。
  51. //timed == false 表示当前这个线程 获取 task 时 是不支持超时机制的,当前线程会使用 queue.take();
  52. //情况1:allowCoreThreadTimeOut == true 表示核心线程数量内的线程 也可以被回收。
  53. //所有线程 都是使用queue.poll(xxx,xxx) 超时机制这种方式获取task.
  54. //情况2:allowCoreThreadTimeOut == false 表示当前线程池会维护核心数量内的线程。
  55. //wc > corePoolSize
  56. //条件成立:当前线程池中的线程数量是大于核心线程数的,此时让所有路过这里的线程,都是用poll 支持超时的方式去获取任务,
  57. //这样,就会可能有一部分线程获取不到任务,获取不到任务 返回Null,然后..runWorker会执行线程退出逻辑。
  58. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  59. //条件一:(wc > maximumPoolSize || (timed && timedOut))
  60. //1.1:wc > maximumPoolSize 为什么会成立?setMaximumPoolSize()方法,可能外部线程将线程池最大线程数设置为比初始化时的要小
  61. //1.2: (timed && timedOut) 条件成立:前置条件,当前线程使用 poll方式获取task。上一次循环时 使用poll方式获取任务时,超时了
  62. //条件一 为true 表示 线程可以被回收,达到回收标准,当确实需要回收时再回收。
  63. //条件二:(wc > 1 || workQueue.isEmpty())
  64. //2.1: wc > 1 条件成立,说明当前线程池中还有其他线程,当前线程可以直接回收,返回null
  65. //2.2: workQueue.isEmpty() 前置条件 wc == 1, 条件成立:说明当前任务队列 已经空了,最后一个线程,也可以放心的退出。
  66. if ((wc > maximumPoolSize || (timed && timedOut))
  67. && (wc > 1 || workQueue.isEmpty())) {
  68. //使用CAS机制 将 ctl值 -1 ,减1成功的线程,返回null
  69. //CAS成功的,返回Null
  70. //CAS失败? 为什么会CAS失败?
  71. //1.其它线程先你一步退出了
  72. //2.线程池状态发生变化了。
  73. if (compareAndDecrementWorkerCount(c))
  74. return null;
  75. //再次自旋时,timed有可能就是false了,因为当前线程cas失败,很有可能是因为其它线程成功退出导致的,再次咨询时
  76. //检查发现,当前线程 就可能属于 不需要回收范围内了。
  77. continue;
  78. }
  79. try {
  80. //获取任务的逻辑
  81. //1.timed为true,表示前线程获取 task 时,是支持超时机制的。如果超时了,那线程拿到的Runnable 为空。
  82. // 下次自旋,当前线程就可能会被回收。
  83. //2.timed为false,表示当前线程是不支持超时机制的,就会调用take()方法,无限期的阻塞。
  84. Runnable r = timed ?
  85. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  86. workQueue.take();
  87. //条件成立:返回任务
  88. if (r != null)
  89. return r;
  90. //说明当前线程超时了...
  91. timedOut = true;
  92. } catch (InterruptedException retry) {
  93. timedOut = false;
  94. }
  95. }
  96. }

shutdown方法和shutdownNow方法

  1. public void shutdown() {
  2. shutdown方法的大体逻辑
  3. 1.获取全局锁mainLock。做权限判断checkShutdownAccess。假设有权限:
  4. 2.设置线程池状态为SHUTDOWN
  5. 3.中断所有空闲线程,interruptIdleWorkers方法。
  6. 如果worker.thread正在执行任务,就不管。
  7. 如果worker.thread在getTask中处于阻塞状态,则给一个中断信号,去执行退出逻辑。
  8. 4.调用钩子方法onShutdown()
  9. 5.释放全局锁
  10. 6.调用tryTerminate()方法
  11. }
  12. public List<Runnable> shutdownNow() {
  13. //返回值引用
  14. List<Runnable> tasks;
  15. 1.获取线程池全局锁mainLock。做权限判断checkShutdownAccess。假设有权限:
  16. 2.设置线程池状态为STOP
  17. 3.中断线程池中所有线程,interruptWorkers方法。如果worker内的thread 是启动状态,则给它一个中断信号
  18. 4.导出未处理的tasks列表
  19. 5.调用tryTerminate()方法
  20. 6.返回当前任务队列中未处理的tasks列表
  21. }
  22. /**
  23. 尝试终止
  24. */
  25. final void tryTerminate() {
  26. for (;;) {
  27. 获取最新的ctl值
  28. if(
  29. 线程池状态为running
  30. || 状态大于等于 TIDYING
  31. || 状态为 SHUTDOWN && 工作队列不空
  32. ){
  33. return;
  34. }
  35. //如果线程池状态为STOP 或 SHUTDOWN 且 队列已经空了,线程唤醒后,都会执行退出逻辑。
  36. // 线程在queue.poll()/.take()处唤醒后,先在getTask中ctl-1 -> runWorker
  37. // -> processWorkerExit中 移除worker 并调用tryTerminate方法,唤醒下一空闲线程。
  38. if (当前线程池中的线程数量 > 0) { // Eligible to terminate
  39. //中断一个空闲线程。
  40. //空闲线程,在哪空闲呢? 在getTask方法中断queue.take() | queue.poll() 处阻塞
  41. //1.唤醒后的线程 会在getTask()方法返回null
  42. //2.执行退出逻辑(processWorkerExit)的时候会再次调用tryTerminate() 唤醒下一个空闲线程
  43. //3.因为线程池状态是 (线程池状态 >= STOP || 线程池状态为 SHUTDOWN 且 队列已经空了) 最终调用addWorker时,会失败。
  44. //最终空闲线程都会在这里退出,非空闲线程 当执行完当前task时,也会调用tryTerminate方法,有可能会走到这里。
  45. interruptIdleWorkers(ONLY_ONE);
  46. return;
  47. }
  48. //workerCountOf(c) == 0时,代表当前线程是 最后一个要退出的线程
  49. 获取全局锁mainLock
  50. 通过CAS的方式,将线程池状态设为TIDYING
  51. 调用钩子方法terminated()
  52. 将线程池状态设为TERMINATED
  53. 唤醒调用 awaitTermination()而阻塞的 外部线程。termination.signalAll()
  54. 释放全局锁
  55. }
  56. }

本文图片来源于小刘老师(哔哩哔哩:小刘讲源码)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/812554
推荐阅读
相关标签
  

闽ICP备14008679号