当前位置:   article > 正文

线程池(二) -- 源码解析ThreadPoolExecutor中的execute()方法_threadpoolexecutor.execute

threadpoolexecutor.execute

ThreadPoolExector实际上就是一个生产消费模式,当调用execute()方法添加任务线程时,相当于生产者生产数据元素,workers线程池中的线程执行任务线程或从阻塞队列中获取任务线程执行时,相当于消费者消费数据元素。

 

在源码分析execute方法之前,需要弄清楚下面几个问题。

1)、线程池中的线程是什么样的线程对象?

2)、线程池中的线程用什么来进行存储?

3)、处于等待状态的任务线程又放在那里?

4)、线程池中的线程是如何获取并执行这些处于等待状态中的任务线程的?

首先区分一下任务线程和线程池中的线程,任务线程就是调用execute(Runnable  command)方法中的command线程。而线程池中的线程是一个个的Worker对象。而这些Worker对象存放在workers属性中。

  1. //workers用来存放线程池中的线程
  2. private final HashSet<Worker> workers = new HashSet<Worker>();

处于等待状态中的任务线程会存放到workQueue中

  1. //存放处于等待状态的任务线程
  2. private final BlockingQueue<Runnable> workQueue;

执行等待中的线程会在Work类中的runWorker()方法进行,会在后面进行讲解

 

execute()方法的源码解析

execute()方法的作用时把任务线程提交到线程池,由线程池来执行该任务线程。

源码如下:

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. //workerCountof(c)获取运行状态的线程数
  6. if (workerCountOf(c) < corePoolSize) {
  7. if (addWorker(command, true))
  8. return;
  9. c = ctl.get();
  10. }
  11. //isRunning(c)判处线程池处于运行状态。
  12. if (isRunning(c) && workQueue.offer(command)) {
  13. int recheck = ctl.get();
  14. if (! isRunning(recheck) && remove(command))
  15. reject(command);
  16. else if (workerCountOf(recheck) == 0)
  17. addWorker(null, false);
  18. }
  19. else if (!addWorker(command, false))
  20. reject(command);
  21. }

分析execute()方法需要考虑两种情况,一种是corePoolSize大于0的情况,一种是corePoolSize等于0的情况。这里只对ThreadPoolExector如何实现生产消费模式进行分析,而状态只是确保线程池线程的正常运行,这里不进行分析讲解,所以选择忽略来提取骨干代码分析说明。

当corePoolSize大于0,忽略状态分析后的execute()方法源码可简化如下:

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. //第一种场景
  6. if (workerCountOf(c) < corePoolSize) {
  7. if (addWorker(command, true))
  8. return;
  9. //......
  10. }
  11. //第二种场景
  12. if (isRunning(c) && workQueue.offer(command)) {
  13. //........
  14. }
  15. //第三种场景
  16. else if (!addWorker(command, false))
  17. reject(command);
  18. }

可以分为在三种不同场景下使用execute()方法提交任务线程的处理方式。

第一种场景:当线程池中正在运行的线程数量小于corePoolSize(核心线程数量)时,调用execute()方法添加任务线程的场景,处理方式是,调用addWorker()方法,往线程池workers中添加Worker线程,并执行任务线程,下面会对addWoker()方法进行说明。

第二种场景:当线程池中正在运行的线程数量等于corePoolSize(核心线程数量)时,调用execute()方法添加任务线程的场景。处理方式是,直接调用workQueue.offer(command)往阻塞队列中添加任务线程。

第三种场景:当线程池中正在运行的线程数量等于corePoolSize,且workQueue阻塞队列队满的情况下,调用execute()方法添加任务线程的场景。处理方式是,直接调用饱和策略对该任务线程进行处理。

 

addWorker()方法源码分析

boolean addWorker(Runnable firstTask, boolean core):往线程池workers种添加Worker线程,并启动Worker线程

源码如下:

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // Check if queue empty only if necessary.
  7. if (rs >= SHUTDOWN &&
  8. ! (rs == SHUTDOWN &&
  9. firstTask == null &&
  10. ! workQueue.isEmpty()))
  11. return false;
  12. for (;;) {
  13. int wc = workerCountOf(c);
  14. if (wc >= CAPACITY ||
  15. wc >= (core ? corePoolSize : maximumPoolSize))
  16. return false;
  17. if (compareAndIncrementWorkerCount(c))
  18. break retry;
  19. c = ctl.get(); // Re-read ctl
  20. if (runStateOf(c) != rs)
  21. continue retry;
  22. // else CAS failed due to workerCount change; retry inner loop
  23. }
  24. }
  25. boolean workerStarted = false;
  26. boolean workerAdded = false;
  27. Worker w = null;
  28. try {
  29. w = new Worker(firstTask);
  30. final Thread t = w.thread;
  31. if (t != null) {
  32. final ReentrantLock mainLock = this.mainLock;
  33. mainLock.lock();
  34. try {
  35. // Recheck while holding lock.
  36. // Back out on ThreadFactory failure or if
  37. // shut down before lock acquired.
  38. int rs = runStateOf(ctl.get());
  39. if (rs < SHUTDOWN ||
  40. (rs == SHUTDOWN && firstTask == null)) {
  41. if (t.isAlive()) // precheck that t is startable
  42. throw new IllegalThreadStateException();
  43. workers.add(w);
  44. int s = workers.size();
  45. if (s > largestPoolSize)
  46. largestPoolSize = s;
  47. workerAdded = true;
  48. }
  49. } finally {
  50. mainLock.unlock();
  51. }
  52. if (workerAdded) {
  53. t.start();
  54. workerStarted = true;
  55. }
  56. }
  57. } finally {
  58. if (! workerStarted)
  59. addWorkerFailed(w);
  60. }
  61. return workerStarted;
  62. }

忽略状态状态分析,提取骨干代码如下:

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. //第一部分
  3. retry:
  4. for (;;) {
  5. //....忽略.....
  6. for (;;) {
  7. //...忽略....
  8. if (compareAndIncrementWorkerCount(c))
  9. break retry;
  10. //....忽略...
  11. }
  12. }
  13. //第二部分
  14. boolean workerStarted = false;
  15. boolean workerAdded = false;
  16. Worker w = null;
  17. try {
  18. //使用传入的任务线程创建Worker线程
  19. w = new Worker(firstTask);
  20. final Thread t = w.thread;
  21. if (t != null) {
  22. final ReentrantLock mainLock = this.mainLock;
  23. mainLock.lock();
  24. try {
  25. int rs = runStateOf(ctl.get());
  26. if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) { //这里的直接看成true就可以了
  27. //......
  28. //把线程添加到线程池
  29. workers.add(w);
  30. //....
  31. }
  32. } finally {
  33. mainLock.unlock();
  34. }
  35. if (workerAdded) {
  36. //启动该线程
  37. t.start();
  38. workerStarted = true;
  39. }
  40. }
  41. } finally {
  42. //把worker线程添加到线程池失败的处理场景
  43. if (! workerStarted)
  44. addWorkerFailed(w);
  45. }
  46. return workerStarted;
  47. }

忽略状态后,可以把addworker()分为两部分

第一部分:调用CAS方法给正在运行的线程数量加1,但worker线程还没有启动执行

第二部分:主要可以分为如下三个步骤

1)、使用传入的任务线程创建worker线程

2)、把worker线程添加到workers线程池中

3)、启动该worker线程。

 

工作线程Worker的执行

上面已经的addWorker()方法已经启动Worker线程,那接下分析Worker线程的run方法。源码如下:

  1. public void run() {
  2. runWorker(this);
  3. }

runWorker()方法源码分析

源码如下:

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // allow interrupts
  6. boolean completedAbruptly = true;
  7. try {
  8. while (task != null || (task = getTask()) != null) {
  9. w.lock();
  10. // If pool is stopping, ensure thread is interrupted;
  11. // if not, ensure thread is not interrupted. This
  12. // requires a recheck in second case to deal with
  13. // shutdownNow race while clearing interrupt
  14. if ((runStateAtLeast(ctl.get(), STOP) ||
  15. (Thread.interrupted() &&
  16. runStateAtLeast(ctl.get(), STOP))) &&
  17. !wt.isInterrupted())
  18. wt.interrupt();
  19. try {
  20. beforeExecute(wt, task);
  21. Throwable thrown = null;
  22. try {
  23. task.run();
  24. } catch (RuntimeException x) {
  25. thrown = x; throw x;
  26. } catch (Error x) {
  27. thrown = x; throw x;
  28. } catch (Throwable x) {
  29. thrown = x; throw new Error(x);
  30. } finally {
  31. afterExecute(task, thrown);
  32. }
  33. } finally {
  34. task = null;
  35. w.completedTasks++;
  36. w.unlock();
  37. }
  38. }
  39. completedAbruptly = false;
  40. } finally {
  41. processWorkerExit(w, completedAbruptly);
  42. }
  43. }

忽略状态处理和部分异常提取骨干代码如下:

  1. final void runWorker(Worker w) {
  2. //....
  3. Runnable task = w.firstTask;
  4. // ...
  5. try {
  6. while (task != null || (task = getTask()) != null) {
  7. w.lock();
  8. // ....
  9. try {
  10. //..执行任务线程前干一些事情
  11. beforeExecute(wt, task);
  12. Throwable thrown = null;
  13. try {
  14. //任务线程执行
  15. task.run();
  16. }
  17. //...忽略异常..
  18. } finally {
  19. //执行任务线程后干一些事情
  20. afterExecute(task, thrown);
  21. }
  22. } finally {
  23. //任务线程设置为null
  24. task = null;
  25. w.completedTasks++;
  26. w.unlock();
  27. }
  28. }
  29. completedAbruptly = false;
  30. } finally {
  31. processWorkerExit(w, completedAbruptly);
  32. }
  33. }

骨干代码分析:这里主要关注的是while循环中的代码。

while第一次循环的时候,执行的任务线程是在第一种场景也就是线程池中正在运行的线程数量小于corePoolSize时调用execute()方法传入的任务线程,之后的每次循环执行都是调用getTask()方法获取到的阻塞队列workQueue中的任务线程。

每次任务线程结束,task都会设置为null,是为了能够调用getTask()方法获取任务线程。

 

getTask()方法的骨干代码如下:

  1. private Runnable getTask() {
  2. boolean timedOut = false; // Did the last poll() time out?
  3. for (;;) {
  4. //......
  5. // 队列为空时返回null,忽略状态判断
  6. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  7. decrementWorkerCount();
  8. return null;
  9. }
  10. //获取timed的值
  11. int wc = workerCountOf(c);
  12. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  13. //......
  14. try {
  15. //根据timed获取阻塞队列中任务线程
  16. Runnable r = timed ?
  17. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  18. workQueue.take();
  19. if (r != null)
  20. return r;
  21. timedOut = true;
  22. } catch (InterruptedException retry) {
  23. timedOut = false;
  24. }
  25. }
  26. }

忽略状态分析getTask()方法,大致可以分为三个步骤

1、如果阻塞队列workQueue中没有任务线程,返回null

2、获取timed的值时true还是false,allowCoreThreadTimeOut的初始值为false,而当corePoolSize的值大于0时,调用getTast()方法时正在运行的线程只能是等于或者小于corePoolsize的值,所以当corePoolSize的值大于0时,timed的值为false。

3、根据timed获取阻塞队列workQueue中的任务线程。当corePoolSize的值大于0时,timed的值为false,所以使用take()方法获取阻塞队列中的值。

 

这只是个人的理解和分析,如果有不正确的地方麻烦留言指正

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/1008835
推荐阅读
相关标签
  

闽ICP备14008679号