当前位置:   article > 正文

Java并发编程实战(chapter_3)(线程池ThreadPoolExecutor源码分析)

do {} while (! compareanddecrementworkercount(ctl.get()));

这个系列一直没再写,很多原因,中间经历了换工作,熟悉项目,熟悉新团队等等一系列的事情。并发课题对于Java来说是一个又重要又难的一大块,除非气定神闲、精力满满,否则我本身是不敢随便写这个话题的。随便出一个生涩、浮于表面的文章,我觉得意义不大。所以一直就搁置到现在。这一次开启,有一个小小的契机:我自己面试中,已经被问烂了的构造函数的几个参数有什么意义,这种问题,发现其实很多人并不了解。就着这次的机会,我就重开这个课题。

一、基本的一些准备知识

李老爷子的线程池堪称经典,老爷子也因此风靡全球开发者圈子,阅读了源码,你才能感受到什么叫做编程思想,我们普普通通的CRUD简直都弱爆了!老爷子牛逼点也在于,源码中的注释非常完备,这不得不佩服:思想牛逼一方面,能把思想完善、由浅入深的表述出来,我觉得更牛逼!其中对于这个ThreadPoolExecutor的基础知识的了解,我觉得完全可以看注释就可以学全了。要想了解线程池源码,我们先要了解如下几个方面:

  • 线程池的几种状态
  • 线程池的状态表述
  • 状态的使用的方式
  • 线程池的构造函数

1、线程池的几种状态

最关键的是几种扭转的状态,让我们直接上老爷子的注释:

  1. /* The runState provides the main lifecycle control, taking on values:
  2. *
  3. * RUNNING: Accept new tasks and process queued tasks
  4. * SHUTDOWN: Don't accept new tasks, but process queued tasks
  5. * STOP: Don't accept new tasks, don't process queued tasks,
  6. * and interrupt in-progress tasks
  7. * TIDYING: All tasks have terminated, workerCount is zero,
  8. * the thread transitioning to state TIDYING
  9. * will run the terminated() hook method
  10. * TERMINATED: terminated() has completed
  11. *
  12. * The numerical order among these values matters, to allow
  13. * ordered comparisons. The runState monotonically increases over
  14. * time, but need not hit each state. The transitions are:
  15. *
  16. *
  17. * (下面是几种转态转换的根本的基本方式,很简单的英文,不用翻译)
  18. * RUNNING -> SHUTDOWN
  19. * On invocation of shutdown(), perhaps implicitly in finalize()
  20. * (RUNNING or SHUTDOWN) -> STOP
  21. * On invocation of shutdownNow()
  22. * SHUTDOWN -> TIDYING
  23. * When both queue and pool are empty
  24. * STOP -> TIDYING
  25. * When pool is empty
  26. * TIDYING -> TERMINATED
  27. * When the terminated() hook method has completed
  28. *
  29. * Threads waiting in awaitTermination() will return when the
  30. * state reaches TERMINATED.
  31. */
  • RUNNING:接受新的任务,并且也继续运行阻塞队列里面的任务
  • SHUTDOWN:不接受新的任务了,但是可以继续执行阻塞队列里面的任务
  • STOP:不接受新的任务了,也不运行阻塞队列里面的任务了,并且去打断正在执行的任务
  • TIDYING:所有的任务都已经终止了,workerCount(任务数)是0,线程池运行到了这个状态之后,会去调terminated()这个方法
  • TERMINATED:terminated()这个方法执行完成

2、线程池的状态表述

同样,上源码:

  1. // ctl这是一个很重要的参数,使用位标记方式,将当前线程池状态和当前线程池创建的任务多少杂糅到了一起
  2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  3. // 预留三位
  4. private static final int COUNT_BITS = Integer.SIZE - 3;
  5. // 线程池最大线程大小:(2^29)-1 (about 500 million)
  6. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  7. // 线程池状态位,使用int的高三位进行储存
  8. private static final int RUNNING = -1 << COUNT_BITS;
  9. private static final int SHUTDOWN = 0 << COUNT_BITS;
  10. private static final int STOP = 1 << COUNT_BITS;
  11. private static final int TIDYING = 2 << COUNT_BITS;
  12. private static final int TERMINATED = 3 << COUNT_BITS;
  13. // 通过ctl值计算出运行线程池状态值
  14. private static int runStateOf(int c) { return c & ~CAPACITY; }
  15. // 通过ctl值计算出线程池当前任务多少的值
  16. private static int workerCountOf(int c) { return c & CAPACITY; }
  17. // 通过运行状态和任务多少的两个值,生成ctl这个包装的值
  18. private static int ctlOf(int rs, int wc) { return rs | wc; }

思想也很简单:大家熟知的int类型,是占四字节,32位的。为了状态操作的高效与空间节约,老爷子使用了位操作来控制。其中32位的高三位用来存储线程池的状态;低29位用来控制当前线程池有多少个线程。上面的源码就是对位操作的基本实现(都是基本的位操作,我这里不在累赘)

3、状态的使用的方式

这里会给出几个源码中,对状态和线程数量操控的方式:

  1. // (c:ctl,s:state)当前线程池的状态,是不是小于给定的状态
  2. private static boolean runStateLessThan(int c, int s) {
  3. return c < s;
  4. }
  5. // (c:ctl,s:state)当前线程池的状态,是不是大于等于给定的状态
  6. private static boolean runStateAtLeast(int c, int s) {
  7. return c >= s;
  8. }
  9. // 当前线程池的状态是RUNNING的吗
  10. private static boolean isRunning(int c) {
  11. return c < SHUTDOWN;
  12. }
  13. // 使用CAS原理对当前线程池线程数量值加一
  14. private boolean compareAndIncrementWorkerCount(int expect) {
  15. return ctl.compareAndSet(expect, expect + 1);
  16. }
  17. // 使用CAS原理对当前线程池线程数量值减一
  18. private boolean compareAndDecrementWorkerCount(int expect) {
  19. return ctl.compareAndSet(expect, expect - 1);
  20. }
  21. // 使用CAS原理对当前线程池线程数量值减一,直到成功为止
  22. private void decrementWorkerCount() {
  23. do {} while (! compareAndDecrementWorkerCount(ctl.get()));
  24. }

下面的源码是对线程状态修改源码:

  1. private void advanceRunState(int targetState) {
  2. // 这是一个死循环,直到修改成功才break
  3. for (;;) {
  4. int c = ctl.get();
  5. if (runStateAtLeast(c, targetState) ||
  6. ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
  7. break;
  8. }
  9. }

这里有两个判断条件,只要一个成功就会break循环

  • runStateAtLeast:如果当前状态和要设置的状态相等,或者比要设置的状态大。说明线程池状态的不可逆,说明,如果一个线程池已经是SHUTDOWN了,是不能设置回RUNNING状态的
  • compareAndSet:CAS设置ctl值。根据短路原理,到了这个方法执行已经说明当前状态是小于要设置状态了,所以可以修改ctl的状态位值。如果设置失败,返回false,继续死循环。成功,break

3、线程池的构造函数

常用的JDK推荐的,或者各大“api使用”书籍介绍的,无非都是下面的几个方法,进行创建线程池:

  • Executors.newCachedThreadPool
  • Executors.newFixedThreadPool
  • Executors.newScheduledThreadPool
  • Executors.newSingleThreadExecutor

可是当我们深入源码,才发现:这几个方法的内部无非都调用了ThreadPoolExecutor的构造函数,即使是newScheduledThreadPool这个方法,表面调用了ScheduledThreadPoolExecutor类,可是深入源码才发现:ScheduledThreadPoolExecutor类继承了ThreadPoolExecutor,并且构造函数使用了super进行了构建。这就给我们了一个很好的切入口:只要研究ThreadPoolExecutor构造函数就行。进一步,还会发现,ThreadPoolExecutor有四个构造函数,入参不一样,也都不约而同,最终调用了入参最多的那个(入参少的时候使用默认值),我们看看ThreadPoolExecutor中入参最多的构造函数的源码:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. // 对入参进行合法性校验
  9. if (corePoolSize < 0 ||
  10. maximumPoolSize <= 0 ||
  11. maximumPoolSize < corePoolSize ||
  12. keepAliveTime < 0)
  13. throw new IllegalArgumentException();
  14. if (workQueue == null || threadFactory == null || handler == null)
  15. throw new NullPointerException();
  16. // 获取系统安全管理器(不做分析)
  17. this.acc = System.getSecurityManager() == null ?
  18. null : AccessController.getContext();
  19. // 核心几大参数的赋值操作
  20. this.corePoolSize = corePoolSize;
  21. this.maximumPoolSize = maximumPoolSize;
  22. this.workQueue = workQueue;
  23. this.keepAliveTime = unit.toNanos(keepAliveTime);
  24. this.threadFactory = threadFactory;
  25. this.handler = handler;
  26. }
  • corePoolSize:核心运行线程数
  • maximumPoolSize:最大运行运行程
  • workQueue:阻塞队列
  • keepAliveTime:当线程大于核心线程数时,且阻塞队列没有元素,最大等待时间
  • threadFactory:生成线程的工厂类
  • handler:超出线程池最大承受能力之后的失败策略方法对象

对于线程池表现出来的各种特性,就是通过这几个参数控制的,所以很关键!

二、线程池的基本执行图解

对于线程池源码,我们先主要从execute执行方法入手进行分析,下面主要用一个图进行大致流程的展示:

ThreadPoolExecutor基本运行模型

配合上代码,我们先指出对应代码的大致位置,我们有个大体的概念比较好:

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. // 下面大约就是①的过程
  6. if (workerCountOf(c) < corePoolSize) {
  7. if (addWorker(command, true))
  8. return;
  9. c = ctl.get();
  10. }
  11. // 下面大约就是②的过程
  12. if (isRunning(c) && workQueue.offer(command)) {
  13. int recheck = ctl.get();
  14. if (! isRunning(recheck) && remove(command))
  15. reject(command);
  16. else if (workerCountOf(recheck) == 0)
  17. addWorker(null, false);
  18. }
  19. // 下面大约就是③的过程
  20. else if (!addWorker(command, false))
  21. // 下面大约就是④的过程
  22. reject(command);
  23. }

三、线程池细节源码分析

1、addWorker方法

a、addWorker,我们先来看看
  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. // 死循环,在某些条件下,会返回挥着break
  3. retry:
  4. for (;;) {
  5. int c = ctl.get();
  6. // 当下线程池运行状态
  7. int rs = runStateOf(c);
  8. // 下面是对线程池状态的一些列判断
  9. // 这个判断稍微有点绕,返回false的条件是:
  10. // 线程池是SHUTDOWN、STOP、TIDYING、TERMINATED其中的任意一个状态
  11. // 且(线程池状态为STOP、TIDYING、TERMINATED 或者 firstTask不为空 或者 阻塞队列为空)
  12. // 同样是返回false,添加失败
  13. if (rs >= SHUTDOWN &&
  14. ! (rs == SHUTDOWN &&
  15. firstTask == null &&
  16. ! workQueue.isEmpty()))
  17. return false;
  18. for (;;) {
  19. // 下面是对线程池当下线程数的一系列判断
  20. int wc = workerCountOf(c);
  21. // 线程数如果大于等于最大线程池允许数量((2^29)-1)或者大于等于设置的
  22. // 核心线程数或者最大线程数
  23. // 同样是返回false,添加失败
  24. if (wc >= CAPACITY ||
  25. // 这里也是一个玄妙之处:
  26. // 如果传入的core为true情况,可见线程数量依赖值为核心线程数
  27. // 如果为false,数量依赖于最大的线程数。通过这个core值,就可以
  28. // 控制什么时候,依赖什么值进行创建线程
  29. wc >= (core ? corePoolSize : maximumPoolSize))
  30. return false;
  31. // 下面是CAS的经典操作:
  32. // 这个第一个if如果设置成功,就结束整体的外部循环。没成功说明有竟态
  33. if (compareAndIncrementWorkerCount(c))
  34. break retry;
  35. // 再次获取一遍ctl,算是double check
  36. c = ctl.get();
  37. // 这里判断,如果为true,说明线程池当下状态已经被修改
  38. // 要重新通过外层循环的状态判断来确定返回值,所以continue了
  39. if (runStateOf(c) != rs)
  40. continue retry;
  41. // 到了这里,说明线程池状态没有被翻转,那就是说当前线程数因为竟态
  42. // 原因没有设置成功,那直接内部循环在执行一次,继续进行CAS的设置
  43. }
  44. }
  45. // 下面是启动线程的主要代码
  46. // 线程是否启动成功
  47. boolean workerStarted = false;
  48. // 线程是否添加成功
  49. boolean workerAdded = false;
  50. // 封装传入的线程对象Worker,这个也是很关键的对象,接下来会分析
  51. Worker w = null;
  52. try {
  53. // 封装线程的初始化工作,下面会分析
  54. w = new Worker(firstTask);
  55. final Thread t = w.thread;
  56. if (t != null) {
  57. // 当下线程池的主锁,最大的一把锁,上锁期间主要对线程池容器进行维护
  58. // 这个容器是一个HashSet,保存当前运行的封装线程Worker
  59. final ReentrantLock mainLock = this.mainLock;
  60. mainLock.lock();
  61. try {
  62. // 再次获取线程池当前状态,因为很有可能期间被人更改了
  63. int rs = runStateOf(ctl.get());
  64. // rs < SHUTDOWN:线程池是RUNNING状态
  65. // rs == SHUTDOWN && firstTask == null:
  66. // 线程池是SHUNTDOWN且firstTask为空,这种情况主要是因为
  67. // 线程池再SHUNDOWN状态了,可是阻塞队列还有没运行完的线程
  68. if (rs < SHUTDOWN ||
  69. (rs == SHUTDOWN && firstTask == null)) {
  70. if (t.isAlive()) // precheck that t is startable
  71. throw new IllegalThreadStateException();
  72. workers.add(w);
  73. int s = workers.size();
  74. if (s > largestPoolSize)
  75. // 保持一个线程最大线程池状态
  76. largestPoolSize = s;
  77. // 到这里线程添加到容器成功
  78. workerAdded = true;
  79. }
  80. } finally {
  81. mainLock.unlock();
  82. }
  83. // 如果添加容器成功,就启动封装的线程,且设置启动标识位为true
  84. if (workerAdded) {
  85. t.start();
  86. workerStarted = true;
  87. }
  88. }
  89. } finally {
  90. // 如果封装线程启动失败,会进行一系列的失败处理
  91. if (! workerStarted)
  92. addWorkerFailed(w);
  93. }
  94. return workerStarted;
  95. }
b、下面是对addWorkerFailed方法的解说
  1. private void addWorkerFailed(Worker w) {
  2. final ReentrantLock mainLock = this.mainLock;
  3. // 同样的,获取主锁
  4. mainLock.lock();
  5. try {
  6. // 不为空的情况将封装线程从容器中移除
  7. // 为空的情况,主要是new Worker的时候报错
  8. if (w != null)
  9. workers.remove(w);
  10. // 循环登陆,减少一个线程数
  11. decrementWorkerCount();
  12. // 试着看看,能不能结束线程池,就是把线程池TERMINASTE了
  13. tryTerminate();
  14. } finally {
  15. mainLock.unlock();
  16. }
  17. }
c、下面是tryTerminate方法的解说
  1. final void tryTerminate() {
  2. // 发现没,又是个死循环,老爷子很喜欢这种方式啊,而且是用for!
  3. for (;;) {
  4. int c = ctl.get();
  5. // 三种情况直接方法返回:
  6. // 1、正处在RUNNING状态的线程池
  7. // 2、线程池的状态是TIDYING或者是TERMINATE
  8. // 3、线程池是SHUNDOWN状态的,但是阻塞队列不为空
  9. if (isRunning(c) ||
  10. runStateAtLeast(c, TIDYING) ||
  11. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
  12. return;
  13. // 走到这里,线程池的状态可能是:SHUTDOWN(且阻塞队列空)、STOP
  14. // 如果此时线程数不为0的话,要进行打断操作了
  15. if (workerCountOf(c) != 0) {
  16. // 这里入参的意思是只打断容器里第一个封装线程里面的线程
  17. interruptIdleWorkers(ONLY_ONE);
  18. return;
  19. }
  20. // 执行到这里,说明线程池的状态是:SHUTDOWN(阻塞队列为空)、STOP
  21. // 此时线程数为0,说明线程池可以进行终结操作了
  22. final ReentrantLock mainLock = this.mainLock;
  23. mainLock.lock();
  24. try {
  25. // CAS先将线程池设置成TIDYING的状态
  26. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  27. try {
  28. // 执行用户实现的terminated方法
  29. terminated();
  30. } finally {
  31. // 无论怎么样都会将线程池设置成TERMINATED状态
  32. ctl.set(ctlOf(TERMINATED, 0));
  33. termination.signalAll();
  34. }
  35. // 到这里说明终结成功,不过根据Java原理,返回前
  36. // 先执行finally里面的解主锁的方法
  37. return;
  38. }
  39. } finally {
  40. mainLock.unlock();
  41. }
  42. // 如果能执行到这里,说明CAS设置TIDYING状态失败
  43. // 说明是竟态状态
  44. }
  45. }
  46. private void interruptIdleWorkers(boolean onlyOne) {
  47. final ReentrantLock mainLock = this.mainLock;
  48. mainLock.lock();
  49. try {
  50. for (Worker w : workers) {
  51. Thread t = w.thread;
  52. //线程没有被打断且获取到封装线程的锁
  53. if (!t.isInterrupted() && w.tryLock()) {
  54. try {
  55. t.interrupt();
  56. } catch (SecurityException ignore) {
  57. } finally {
  58. w.unlock();
  59. }
  60. }
  61. if (onlyOne)
  62. break;
  63. }
  64. } finally {
  65. mainLock.unlock();
  66. }
  67. }
  68. // 用户自己实现的结束方法
  69. protected void terminated() { }

到这里,已经讲完了一个很主要的内部方法:addWorker。下面我们对封装线程对象Worker进行讲解

2、Worker对象

这个东西,是一个很很很很很很很很经典的Java并发模型:AQS。这片文章不做AQS的讲解,放到后续

a、具体的Worker对象张什么样
  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable
  4. {
  5. private static final long serialVersionUID = 6138294804551838833L;
  6. // 这个就是最终启动的线程,看到了吧
  7. final Thread thread;
  8. // 我们传入的Runnable对象被放到了这里
  9. Runnable firstTask;
  10. // 这里记录完成的任务数。
  11. // 这里说明下一个理念:一个Worker,是最终被运行的Runnanle对象
  12. // 在很大的情况下(下面做分析)Worker这个线程会一直存在
  13. // 存在的意义是不断读取阻塞队列里面存储的我们传进来的Runnable对象
  14. // 然后运行。所以我们实现的Runnable对象的run方法,最终不是被
  15. // start方法调用执行的,而是通过直接调用执行的!
  16. volatile long completedTasks;
  17. Worker(Runnable firstTask) {
  18. setState(-1); // AQS对象状态!也是一大难的东西!
  19. this.firstTask = firstTask;
  20. // 这里的getThreadFactory方法使用的就是我们传入的threadFactory
  21. // 对象
  22. this.thread = getThreadFactory().newThread(this);
  23. }
  24. public void run() {
  25. // 看到了吧,这里执行了外层对象的方法,去直接调用传入的
  26. // Runnable中的run方法,等下解说
  27. runWorker(this);
  28. }
  29. // 下面的几个函数都是AQS必须要实现的方法,这里不累赘
  30. protected boolean isHeldExclusively() {
  31. return getState() != 0;
  32. }
  33. protected boolean tryAcquire(int unused) {
  34. if (compareAndSetState(0, 1)) {
  35. setExclusiveOwnerThread(Thread.currentThread());
  36. return true;
  37. }
  38. return false;
  39. }
  40. protected boolean tryRelease(int unused) {
  41. setExclusiveOwnerThread(null);
  42. setState(0);
  43. return true;
  44. }
  45. public void lock() { acquire(1); }
  46. public boolean tryLock() { return tryAcquire(1); }
  47. public void unlock() { release(1); }
  48. public boolean isLocked() { return isHeldExclusively(); }
  49. void interruptIfStarted() {
  50. Thread t;
  51. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  52. try {
  53. t.interrupt();
  54. } catch (SecurityException ignore) {
  55. }
  56. }
  57. }
  58. }
  59. public ThreadFactory getThreadFactory() {
  60. return threadFactory;
  61. }
b、默认的线程工厂DefaultThreadFactory:
  1. static class DefaultThreadFactory implements ThreadFactory {
  2. private static final AtomicInteger poolNumber = new AtomicInteger(1);
  3. private final ThreadGroup group;
  4. private final AtomicInteger threadNumber = new AtomicInteger(1);
  5. private final String namePrefix;
  6. DefaultThreadFactory() {
  7. SecurityManager s = System.getSecurityManager();
  8. group = (s != null) ? s.getThreadGroup() :
  9. Thread.currentThread().getThreadGroup();
  10. // 这里记录了线程名的前缀,可见会将线程池序号进行递增操作
  11. namePrefix = "pool-" +
  12. poolNumber.getAndIncrement() +
  13. "-thread-";
  14. }
  15. public Thread newThread(Runnable r) {
  16. // 这里就是生成喜闻乐见的Thread对象了,结合上面这里的r就是我们的Worker对象
  17. Thread t = new Thread(group, r,
  18. namePrefix + threadNumber.getAndIncrement(),
  19. 0);
  20. if (t.isDaemon())
  21. t.setDaemon(false);
  22. if (t.getPriority() != Thread.NORM_PRIORITY)
  23. t.setPriority(Thread.NORM_PRIORITY);
  24. return t;
  25. }
  26. }

3、runWorker方法

a、接下来又是一个关键性方法runWorker
  1. final void runWorker(Worker w) {
  2. // 获取当前运行着的Worker线程
  3. Thread wt = Thread.currentThread();
  4. // 这个就是我们当下传入的Runnable
  5. Runnable task = w.firstTask;
  6. // 置空的意思是:Worker其实是一个壳子,以后会一直运行着,不断执行其他阻塞队列
  7. // 里面的Runnable对象的run方法
  8. w.firstTask = null;
  9. // 这里做解锁操作,是表示下面所有操作是可以被打断的
  10. // 另外AQS默认情况下不做unlock操作,lock会阻塞
  11. w.unlock();
  12. // 这个标志位表示线程执行过程中有没有被打断,或者运行异常
  13. boolean completedAbruptly = true;
  14. try {
  15. // 这个While循环里面的语句相当关键,包含了线程池执行流程的枢纽!
  16. // 我先大致说一下,下面会详细分析getTask方法:
  17. // 主要就是判断如果当前Worker里面的Runnable对象不为空
  18. // 就会执行这个对象的run方法;执行完之后,还会回到这个循环
  19. // 再下面的finally块里面将task置空了,所以就去调用getTask方法
  20. // 而getTask方法是一个很大可能阻塞的方法,阻塞的原因就是等待
  21. // 阻塞队列里面放入对象!所以也就形成了,一个Worker对象,循环
  22. // 不停的执行传入的Runnable对象run方法。这也就构成了corePoolSize
  23. // 与maxPoolSize两个参数控制系统级别的线程多少的目的!也就是说,
  24. // 这就是线程池里面,“池”这个概念的由来~
  25. while (task != null || (task = getTask()) != null) {
  26. w.lock();
  27. // 这里主要是判断是否要打断当前Worker所在的线程
  28. // 要满足两个个条件:
  29. // 1、当前线程池是STOP、TIDYING、TERMINATED
  30. // 2、当前线程是没有被打断的情况
  31. // 其中Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)
  32. // 主要用于清除线程终端标志,因为很大可能线程池刚刚转换成STOP
  33. if ((runStateAtLeast(ctl.get(), STOP) ||
  34. (Thread.interrupted() &&
  35. runStateAtLeast(ctl.get(), STOP))) &&
  36. !wt.isInterrupted())
  37. wt.interrupt();
  38. try {
  39. // 在执行线程体之前执行的方法,用户实现
  40. beforeExecute(wt, task);
  41. Throwable thrown = null;
  42. try {
  43. task.run();
  44. } catch (RuntimeException x) {
  45. // 请看下面几个异常,都是直接抛了出去
  46. // 而并没有处理,所以处理内部异常也是
  47. // 线程池的一个关键点
  48. thrown = x; throw x;
  49. } catch (Error x) {
  50. thrown = x; throw x;
  51. } catch (Throwable x) {
  52. thrown = x; throw new Error(x);
  53. } finally {
  54. // 在执行完线程体之后的方法,用户实现
  55. // 异常同时也传入这里了,所以可以自己实现一个子类
  56. // 自己实现这个方法,进行异常处理
  57. afterExecute(task, thrown);
  58. }
  59. } finally {
  60. // 这个地方肯定会被执行,所以无论run方法怎么样
  61. // Worker运行完成线程数都会加一
  62. task = null;
  63. w.completedTasks++;
  64. // 这里进行解锁操作
  65. w.unlock();
  66. }
  67. }
  68. // 注意代码执行到了这里说明while循环跳出来了
  69. // 大致有如下几种情况:
  70. // 1、阻塞队列里面没值了
  71. // 2、线程池状态翻转,便成了大于等于SHUTDOWN状态的了
  72. // 由于是正常结束,所以异常结束标志是false
  73. completedAbruptly = false;
  74. } finally {
  75. // 这里肯定会被执行,但是有两种情况跳入这个代码块
  76. // 1、run方法没有抛异常,completedAbruptly为false
  77. // 2、run方法抛异常,completedAbruptly为true
  78. // 下面也会进行解说
  79. processWorkerExit(w, completedAbruptly);
  80. }
  81. }
b、我们来看核心的getTask方法
  1. private Runnable getTask() {
  2. // 这个标志位主要用于后面的poll方法是否超时
  3. boolean timedOut = false;
  4. // 又来了,李老爷子!是一个死循环判断!
  5. for (;;) {
  6. int c = ctl.get();
  7. // 获取当前线程池运行状态
  8. int rs = runStateOf(c);
  9. // 如果同时符合下面两种情况,直接返回null,并减少线程数量
  10. // 1、线程池状态是:SHUTDOWN、STOP、TIDYING、TERMINTED
  11. // 2、线程池的状态是STOP、TIDYING、TERMINTED或者队列为空
  12. // 这预示着线程池要进行关闭操作了,此Worker要结束声明周期!
  13. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  14. // 这里是循环指导CAS成功设置
  15. decrementWorkerCount();
  16. return null;
  17. }
  18. // 这里获取当前线程池的线程数
  19. int wc = workerCountOf(c);
  20. // 这个标识位要解释解释:
  21. // 1、allowCoreThreadTimeOut成员变量,可设置
  22. // 2、wc > corePoolSize线程数是否大于核心线程数
  23. // 简单说就是:这个标志位控制线程池的收缩!
  24. // 很关键是不是!
  25. // 正常情况下只要超出核心线程数的线程才要进行收缩的
  26. // 收缩的条件是根据传入的阻塞队列超时时间
  27. // 但是我们可以通过设置allowCoreThreadTimeOut为true
  28. // 这样核心线程也可以收缩!
  29. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  30. // 这里maximumPoolSize不能为零和负数
  31. // 这里判断很复杂,简单理解就是:
  32. // 如果线程池线程数超出了设置的最大线程数或者阻塞队列被打断了
  33. // 且当前Worker所在线程不是最后一个线程或者阻塞队列空了。
  34. // 这里如果wc>maximumPoolSize,那一定大于1,那就说明
  35. // 一定会执行if方法体;如果小于等于maximumPoolSize情况,
  36. // 那就说明是线程合理的收缩,这种时候,只有allowCoreThreadTimeOut
  37. // 被置位或者线程数大于核心线程数,当然如果要是只有一个线程数且队列不为空
  38. // 的情况也不能收缩,要保证有封装线程能执行阻塞队列里面线程
  39. if ((wc > maximumPoolSize || (timed && timedOut))
  40. && (wc > 1 || workQueue.isEmpty())) {
  41. if (compareAndDecrementWorkerCount(c))
  42. // 这里如果设置成功说明可以终结这个Worker了
  43. return null;
  44. // 这里是continue,因为有竟态
  45. continue;
  46. }
  47. try {
  48. // 注意这里的timed的取值,timed为true的时候是:
  49. // 1、allowCoreThreadTimeOut被置位
  50. // 2、或者线程数大于核心线程数
  51. // 其他情况是直接take方法,直接阻塞的。除非被打断
  52. Runnable r = timed ?
  53. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  54. workQueue.take();
  55. if (r != null)
  56. // 正常情况是拿到了Runnable,直接返回了
  57. return r;
  58. // 这种是阻塞队列超时了
  59. timedOut = true;
  60. } catch (InterruptedException retry) {
  61. // 打断情况并非阻塞队列超时,所以这里设置成false
  62. timedOut = false;
  63. }
  64. }
  65. }
c、下面是对processWorkerExit分析
  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2. // 这个判断说明当前Worker所在的线程执行Runnable中的run方法抛了异常
  3. // 所以这个时候,要将线程数减一
  4. if (completedAbruptly)
  5. decrementWorkerCount();
  6. // 获取主锁
  7. final ReentrantLock mainLock = this.mainLock;
  8. mainLock.lock();
  9. try {
  10. // 将当前Worker存在期间一共执行了多少个Runnable累加到
  11. // 线程池的统计字段上面
  12. completedTaskCount += w.completedTasks;
  13. // 将封装线程从容器中移除
  14. workers.remove(w);
  15. } finally {
  16. mainLock.unlock();
  17. }
  18. // 上面的方法在这里执行了,分析请看上面
  19. tryTerminate();
  20. int c = ctl.get();
  21. // 如果现在线程池的状态是:RUNNING、SHUTDOWN,执行if代码块
  22. if (runStateLessThan(c, STOP)) {
  23. // 如果没有抛异常情况,执行这个if代码块
  24. if (!completedAbruptly) {
  25. // 这个代码块,主要是要保证如果阻塞队列中还有Runnable线程
  26. // 而又走到了即将结束当前WOrker的代码,线程池要保证,至少还有
  27. // 运行着的Worker对阻塞队列中的线程进行处理,执行
  28. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  29. // 为0的情况表示允许核心线程收缩,或者核心线程直接设成了0
  30. // 阻塞队列不为空要保证最小可用的Worker为1
  31. if (min == 0 && ! workQueue.isEmpty())
  32. min = 1;
  33. // 判断当前线程数是不是比最小的还要小
  34. if (workerCountOf(c) >= min)
  35. // 这里表明,有足够的Worker去执行
  36. return;
  37. // 代码运行到这里,表明没有足够的Worker了,下面去创建
  38. }
  39. // 这里添加一个Worker的原因是:
  40. // RUNNING和SHUTDOWN状态都是允许继续执行阻塞队列中的线程的
  41. // 所以这里创建一个firstTask为null,依赖getTast去获取队列中的
  42. // 线程去执行。false的原因是创建依据maximumPoolSize
  43. addWorker(null, false);
  44. }
  45. }

四、结尾

到此为止,线程池的主要源码,都分析了,剩下,还有几个附加功能源码,留着接下来有精力再一点点回补吧。当然,对于下一步的深入,就要到AQS的分析了。可见,这里的Worker本身就是一个AQS,在Worker上面调用lock或是unlock方法,都是进入一个内部的阻塞队列的管理的。其中最最底层,还会涉及到操作系统中线程的同步原语:mutex!接下来,我会分析那个,敬请期待!

转载于:https://my.oschina.net/UBW/blog/2055052

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/812569
推荐阅读
相关标签
  

闽ICP备14008679号