当前位置:   article > 正文

ThreadPoolExecutor源代码解析_threadpoolexecutor 源码

threadpoolexecutor 源码

目录

一、线性池

二、字段信息

 三、构造函数

四、Execute()方法

五、addWorker()方法


一、线性池

        线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

        线程池的工作原理大致分为4步,如下图所示:

        

  • 当有任务要执行的时候,会计算线程池中存在的线程数量与核心线程数量(corePoolSize)进行比较,如果小于,则在线程池中创建线程,否则,进行下一步判断。
  • 如果不满足上面的条件,则会将任务添加到阻塞队列,等待线程池中的线程空闲下来后,获取队列中的任务进行执行。
  • 如果队列中也塞满了任务,那么会计算线程池中存在的线程数量与最大线程数量(maxnumPoolSize)进行比较,如果小于,则在线程池中创建线程。
  • 如果上面都不满足,则会执行对应的拒绝策略。

二、字段信息

  1. //标记线程池状态,同时记录线程个数,默认running,然后线程个数0,
  2. // 高三位用来记录线程池状态,低29位存放线程个数
  3. //直接使用位运算来进行计算,32位的情况下能保证操作的原子性
  4. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  5. //个数掩码
  6. private static final int COUNT_BITS = Integer.SIZE - 3;//32 - 3 =29位
  7. /**
  8. * 计算CAPACITY = (1 << COUNT_BITS) - 1
  9. * 第一步:(1 << COUNT_BITS) = 1 << 29
  10. * 00100000000000000000000000000000 (1后面有29个0)
  11. * 第二步:00100000000000000000000000000000 - 1
  12. * 00011111111111111111111111111111 (29个1)
  13. * 结果:
  14. * int CAPACITY = 00011111111111111111111111111111 = 536870911 (十进制)
  15. */
  16. //线程最大个数,低29位都是1
  17. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  18. // runState存储在高阶位中
  19. //接受新任务,并且处理任务队列中的数据 对应 1110 0000 0000 0000 0000 0000 0000 0000
  20. private static final int RUNNING = -1 << COUNT_BITS;
  21. //拒绝新任务但是能能处理任务队列中的数据 对应 0000 0000 0000 0000 0000 0000 0000 0000
  22. private static final int SHUTDOWN = 0 << COUNT_BITS;
  23. //拒绝任务,抛弃阻塞队列任务,中断正在处理的任务 0010 0000 0000 0000 0000 0000 0000 0000
  24. private static final int STOP = 1 << COUNT_BITS;
  25. //将要调用terminated方法,线程池和任务队列都为空 0100 0000 0000 0000 0000 0000 0000 0000
  26. private static final int TIDYING = 2 << COUNT_BITS;
  27. //terminated方法完后进入该状态 0110 0000 0000 0000 0000 0000 0000 0000
  28. private static final int TERMINATED = 3 << COUNT_BITS;
  29. //重要参数,工作队列,用于保存等待执行的阻塞队列,
  30. private final BlockingQueue<Runnable> workQueue;
  31. //主锁
  32. private final ReentrantLock mainLock = new ReentrantLock();
  33. /**
  34. * 包含池中所有工作线程的集合。仅当持有mainLock时访问。
  35. */
  36. //线程池里面的线程
  37. private final HashSet<Worker> workers = new HashSet<Worker>();
  38. /**
  39. * 等待条件以支持awaitterminate
  40. */
  41. private final Condition termination = mainLock.newCondition();
  42. /**
  43. * 跟踪所获得的最大池大小。只能在主锁下访问。
  44. */
  45. private int largestPoolSize;
  46. /**
  47. * 已完成任务的计数器。仅在工作线程终止时更新。只能在主锁下访问。
  48. */
  49. private long completedTaskCount;
  50. //创建线程的工厂
  51. private volatile ThreadFactory threadFactory;
  52. //拒绝策略
  53. private volatile RejectedExecutionHandler handler;
  54. //最大生存时间
  55. private volatile long keepAliveTime;
  56. /**
  57. * 如果为false(默认值),核心线程即使在空闲时也会保持活动状态。
  58. * 如果为true,核心线程使用keepAliveTime超时等待工作。
  59. */
  60. private volatile boolean allowCoreThreadTimeOut;
  61. //核心线程数量
  62. private volatile int corePoolSize;
  63. //最大线程数量
  64. private volatile int maximumPoolSize;
  65. //默认拒绝策略,可以看到默认的是丢弃任务并且抛弃异常
  66. private static final RejectedExecutionHandler defaultHandler =
  67. new AbortPolicy();

 三、构造函数

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue) {
  6. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  7. Executors.defaultThreadFactory(), defaultHandler);
  8. }

corePoolSize:核心线程池的大小

maximumPoolSize:最大线程池的大小

keepAliveTime:最大线程空闲时间

unit:时间单位

workQueue:阻塞队列,当核心线程数无空闲时,将新任务放入阻塞队列

Handle:拒绝策略,当最大线程数无空闲时,执行拒绝策略

四、Execute()方法

  1. public void execute(Runnable command) {
  2. //不能提交任务
  3. if (command == null)
  4. throw new NullPointerException();
  5. //获取到我们的线程ctl,包含线程状态以及线程的数量
  6. int c = ctl.get();
  7. //判断线程池个数是否小于corePoolSize,小于就会新建一个核心线程
  8. //workerCountOf:运行的线程个数
  9. if (workerCountOf(c) < corePoolSize) {
  10. //添加一个Worker
  11. if (addWorker(command, true))
  12. //直接返回了
  13. return;
  14. //添加失败了,更新我们的ctl
  15. c = ctl.get();
  16. }
  17. //如果线程池处于running并且往工作队列里面添加成功
  18. if (isRunning(c) && workQueue.offer(command)) {
  19. //重新检查
  20. /**
  21. * 为啥要进行双重检查?
  22. * 因为在多线程下,ctl方法不是线程安全的,可能会出现获取了以后就改变了
  23. * 所以需要判断加完以后的状态,是不是在加的过程中发生了改变
  24. */
  25. int recheck = ctl.get();
  26. //如果不是running状态并且删除成功,开始使用拒绝策略来执行,进行回滚
  27. if (! isRunning(recheck) && remove(command))
  28. reject(command);
  29. //说明是running状态,同时线程池个数是空的
  30. else if (workerCountOf(recheck) == 0)
  31. //如果阻塞队列里面有任务,但是线程池没有线程去执行
  32. // 这就是建立个新线程去执行队列里面的任务
  33. addWorker(null, false);
  34. }
  35. //如果队列是满的,尝试添加一个新的线程进去
  36. else if (!addWorker(command, false))
  37. //失败的话就会进行拒绝策略
  38. reject(command);
  39. }

        ctl讲解(是如何算的):以下的代码和注解就是ctl所包含的方法

  1. //标记线程池状态,同时记录线程个数,默认running,然后线程个数0,
  2. // 高三位用来记录线程池状态,低29位存放线程个数
  3. //直接使用位运算来进行计算,32位的情况下能保证操作的原子性
  4. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  5. //个数掩码
  6. private static final int COUNT_BITS = Integer.SIZE - 3;//32 - 3 =29位
  7. /**
  8. * 计算CAPACITY = (1 << COUNT_BITS) - 1
  9. * 第一步:(1 << COUNT_BITS) = 1 << 29
  10. * 00100000000000000000000000000000 (1后面有29个0)
  11. * 第二步:00100000000000000000000000000000 - 1
  12. * 00011111111111111111111111111111 (29个1)
  13. * 结果:
  14. * int CAPACITY = 00011111111111111111111111111111 = 536870911 (十进制)
  15. */
  16. //线程最大个数,低29位都是1
  17. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  18. // runState存储在高阶位中
  19. //接受新任务,并且处理任务队列中的数据 对应 1110 0000 0000 0000 0000 0000 0000 0000
  20. private static final int RUNNING = -1 << COUNT_BITS;
  21. //拒绝新任务但是能能处理任务队列中的数据 对应 0000 0000 0000 0000 0000 0000 0000 0000
  22. private static final int SHUTDOWN = 0 << COUNT_BITS;
  23. //拒绝任务,抛弃阻塞队列任务,中断正在处理的任务 0010 0000 0000 0000 0000 0000 0000 0000
  24. private static final int STOP = 1 << COUNT_BITS;
  25. //将要调用terminated方法,线程池和任务队列都为空 0100 0000 0000 0000 0000 0000 0000 0000
  26. private static final int TIDYING = 2 << COUNT_BITS;
  27. //terminated方法完后进入该状态 0110 0000 0000 0000 0000 0000 0000 0000
  28. private static final int TERMINATED = 3 << COUNT_BITS;
  29. // 获取运行状态 RUNNING/SHUTDOWN/STOP/TIDYING/TERMINATED
  30. private static int runStateOf(int c){
  31. return c & ~CAPACITY; //~CAPACITY = 11100000000000000000000000000000
  32. }
  33. //取出低位29位的值,表示获得当前活动的线程数
  34. private static int workerCountOf(int c){
  35. return c & CAPACITY; //CAPACITY = 00011111111111111111111111111111
  36. }
  37. //计算ctl值 ctl = [3位]线程池状态 + [29位]线程池中线程数量
  38. private static int ctlOf(int rs, int wc){
  39. return rs | wc;//按位取或,即:同为0时为0,否则为1.
  40. }

        可以将execute()方法总结为以下流程:

  1. 首先检测线程池运行状态,如果不是running,则直接拒绝,线程池要保证在running的状态下执行任务。
  2. 如果workerCount<corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount>=corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumP oolSize,且线程池内的阻塞队列已经满了,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount>=maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务,默认的处理方式是直接抛异常。

五、addWorker()方法

        addWorker()方法主要分为两部分,第一部分:判断+CAS设置线程数+1;第二部分:workerCount成功+1后,创建Worker(也是一个Runnable),加入集合workers中,并启动Worker线程。

        首先看第一部分的源码和注释:

  1. retry:
  2. /** 步骤一:判断+CAS设置线程数+1 */
  3. for (;;) {
  4. int c = ctl.get();
  5. //获取运行状态runStateOf
  6. int rs = runStateOf(c);
  7. /**
  8. * 只有如下两种情况可以新增worker,继续执行下去:
  9. * case one: rs == RUNNING
  10. * case two: rs == SHUTDOWN && fristTask == null && !WorkQueue.isEmpty()
  11. * */
  12. if (rs >= SHUTDOWN && //即: 非RUNNING状态。线程池异常,表示不再去接收新的线程任务,返回false
  13. /**
  14. * 当线程是SHUTDOWN状态时,表示不再接收新的任务了,所以:
  15. * case1:如果firstTask != null,表示要添加新任务,则新增worker失败,返回false
  16. * case2:如果firstTask == null并且workQueue为空,表示队列中的任务已经处理完毕,不需要添加新任务了
  17. * 则:新增worker失败返回false
  18. */
  19. ! (rs == SHUTDOWN &&
  20. firstTask == null &&
  21. ! workQueue.isEmpty()))
  22. return false;
  23. for (;;) {
  24. //获取当前线程池里的线程数
  25. int wc = workerCountOf(c);
  26. /**
  27. * 满足如下任意情况,则新增worker失败,返回false
  28. * case1:大于等于最大线程容量
  29. * case2:当core为true时:>=核心线程数
  30. * 当core是false时:>=最大线程数
  31. * */
  32. if (wc >= CAPACITY ||
  33. wc >= (core ? corePoolSize : maximumPoolSize))
  34. return false;
  35. //当前工作线程数加1
  36. if (compareAndIncrementWorkerCount(c))
  37. break retry;//成功加1,则跳出retry标识的这两层for循环
  38. //如果线程数加1失败,则获取当前最新的线程池运行状态
  39. c = ctl.get(); // Re-read ctl
  40. //判断线程池运行状态(rs)是否改变;如果不同,则说明方法处理期间线程池运行状态发生了变化
  41. //重新获取最新runState
  42. if (runStateOf(c) != rs)
  43. continue retry;//跳出内层循环,继续从第一个for循环执行
  44. // else CAS failed due to workerCount change; retry inner loop
  45. }
  46. }

        第二部分源码和注释:

  1. /**
  2. * 步骤二:workerCount成功+1后,创建Workers后,并且启动Worker线程
  3. * */
  4. boolean workerStarted = false;//用于判断新的worker实列是否已经开始执行Thread.start()
  5. boolean workerAdded = false;//用于判断新的worker实列是否已经被添加到线程池的workers队列中
  6. Worker w = null;
  7. try {
  8. w = new Worker(firstTask);//创建Worker实列,每个Worker对象都会针对入参firstTask来创建一个线程
  9. final Thread t = w.thread;//从Worker中获得新建得线程
  10. if (t != null) {
  11. final ReentrantLock mainLock = this.mainLock;//加入锁
  12. mainLock.lock();//lock():尝试加锁操作,获得锁后继续执行,没获得则等待直到获得锁为止
  13. try {
  14. int rs = runStateOf(ctl.get());//获得线程池当前得运行状态runStatus
  15. /**
  16. * 满足如下任意条件,即可向线程池中添加线程:
  17. * case1:线程池状态为RUNNING。
  18. * case2:线程池状态为SHUTDOWN并且firstTask为null.
  19. * */
  20. if (rs < SHUTDOWN ||
  21. (rs == SHUTDOWN && firstTask == null)) {
  22. if (t.isAlive()) // 因为t是新构建得线程,还没有启动。所以,如果是alive状态,说明已经被启动了,则抛出异常
  23. throw new IllegalThreadStateException();
  24. workers.add(w);//workers中保存线程池中存在得所有work实列集合
  25. int s = workers.size();
  26. if (s > largestPoolSize)//largestPoolSize用于记录线程池中曾经存在得最大得线程数量
  27. largestPoolSize = s;
  28. workerAdded = true;
  29. }
  30. } finally {
  31. mainLock.unlock();//unlock解锁操作
  32. }
  33. if (workerAdded) {
  34. t.start();//开启线程,执行Worker.run()
  35. workerStarted = true;
  36. }
  37. }
  38. } finally {
  39. if (! workerStarted)//如果没有开启线程
  40. addWorkerFailed(w);//往线程池中添加worker失败了
  41. }
  42. return workerStarted;
  43. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/944565
推荐阅读
相关标签
  

闽ICP备14008679号