当前位置:   article > 正文

Java线程池ThreadPoolExecutor使用和分析(二) - execute()原理

java线程池threadpool

    相关文章目录:

    Java线程池ThreadPoolExecutor使用和分析(一)

    Java线程池ThreadPoolExecutor使用和分析(二) - execute()原理

    Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理

 

    execute()是 java.util.concurrent.Executor接口中唯一的方法,JDK注释中的描述是“在未来的某一时刻执行命令command”,即向线程池中提交任务,在未来某个时刻执行,提交的任务必须实现Runnable接口,该提交方式不能获取返回值。下面是对execute()方法内部原理的分析,分析前先简单介绍线程池有哪些状态,在一系列执行过程中涉及线程池状态相关的判断。以下分析基于JDK 1.7

 

    以下是本文的目录大纲:

    一、线程池执行流程

    二、线程池状态

    三、任务提交内部原理

        1、execute()  --  提交任务

        2、addWorker()  --  添加worker线程

        3、内部类Worker

        4、runWorker()  --  执行任务

        5、getTask()  --  获取任务

        6、processWorkerExit()  --  worker线程退出

 

    若有不正之处请多多谅解,欢迎批评指正、互相讨论。

    请尊重作者劳动成果,转载请标明原文链接:

    http://www.cnblogs.com/trust-freedom/p/6681948.html

一、线程池的执行流程

1、如果线程池中的线程数量少于corePoolSize,就创建新的线程来执行新添加的任务
2、如果线程池中的线程数量大于等于corePoolSize,但队列workQueue未满,则将新添加的任务放到workQueue中
3、如果线程池中的线程数量大于等于corePoolSize,且队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务
4、如果线程池中的线程数量等于了maximumPoolSize,就用RejectedExecutionHandler来执行拒绝策略

二、线程池状态

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  2. private static final int COUNT_BITS = Integer.SIZE - 3;
  3. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  4. // runState is stored in the high-order bits
  5. private static final int RUNNING = -1 << COUNT_BITS;
  6. private static final int SHUTDOWN = 0 << COUNT_BITS;
  7. private static final int STOP = 1 << COUNT_BITS;
  8. private static final int TIDYING = 2 << COUNT_BITS;
  9. private static final int TERMINATED = 3 << COUNT_BITS;
  10. // Packing and unpacking ctl
  11. private static int runStateOf(int c) { return c & ~CAPACITY; }
  12. private static int workerCountOf(int c) { return c & CAPACITY; }
  13. private static int ctlOf(int rs, int wc) { return rs | wc; }

其中ctl这个AtomicInteger的功能很强大,其高3位用于维护线程池运行状态,低29位维护线程池中线程数量

1、RUNNING:-1<<COUNT_BITS,即高3位为1,低29位为0,该状态的线程池会接收新任务,也会处理在阻塞队列中等待处理的任务

2、SHUTDOWN:0<<COUNT_BITS,即高3位为0,低29位为0,该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务

3、STOP:1<<COUNT_BITS,即高3位为001,低29位为0,该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务

4、TIDYING:2<<COUNT_BITS,即高3位为010,低29位为0,所有任务都被终止了,workerCount为0,为此状态时还将调用terminated()方法

5、TERMINATED:3<<COUNT_BITS,即高3位为100,低29位为0,terminated()方法调用完成后变成此状态

这些状态均由int型表示,大小关系为 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,这个顺序基本上也是遵循线程池从 运行 到 终止这个过程。

 

runStateOf(int c)  方法:c & 高3位为1,低29位为0的~CAPACITY,用于获取高3位保存的线程池状态

workerCountOf(int c)方法:c & 高3位为0,低29位为1的CAPACITY,用于获取低29位的线程数量

ctlOf(int rs, int wc)方法:参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl

三、任务提交内部原理

1、execute()  --  提交任务

  1. /**
  2. * Executes the given task sometime in the future. The task
  3. * may execute in a new thread or in an existing pooled thread.
  4. * 在未来的某个时刻执行给定的任务。这个任务用一个新线程执行,或者用一个线程池中已经存在的线程执行
  5. *
  6. * If the task cannot be submitted for execution, either because this
  7. * executor has been shutdown or because its capacity has been reached,
  8. * the task is handled by the current {@code RejectedExecutionHandler}.
  9. * 如果任务无法被提交执行,要么是因为这个Executor已经被shutdown关闭,要么是已经达到其容量上限,任务会被当前的RejectedExecutionHandler处理
  10. *
  11. * @param command the task to execute
  12. * @throws RejectedExecutionException at discretion of
  13. * {@code RejectedExecutionHandler}, if the task
  14. * cannot be accepted for execution RejectedExecutionException是一个RuntimeException
  15. * @throws NullPointerException if {@code command} is null
  16. */
  17. public void execute(Runnable command) {
  18. if (command == null)
  19. throw new NullPointerException();
  20. /*
  21. * Proceed in 3 steps:
  22. *
  23. * 1. If fewer than corePoolSize threads are running, try to
  24. * start a new thread with the given command as its first
  25. * task. The call to addWorker atomically checks runState and
  26. * workerCount, and so prevents false alarms that would add
  27. * threads when it shouldn't, by returning false.
  28. * 如果运行的线程少于corePoolSize,尝试开启一个新线程去运行command,command作为这个线程的第一个任务
  29. *
  30. * 2. If a task can be successfully queued, then we still need
  31. * to double-check whether we should have added a thread
  32. * (because existing ones died since last checking) or that
  33. * the pool shut down since entry into this method. So we
  34. * recheck state and if necessary roll back the enqueuing if
  35. * stopped, or start a new thread if there are none.
  36. * 如果任务成功放入队列,我们仍需要一个双重校验去确认是否应该新建一个线程(因为可能存在有些线程在我们上次检查后死了) 或者 从我们进入这个方法后,pool被关闭了
  37. * 所以我们需要再次检查state,如果线程池停止了需要回滚入队列,如果池中没有线程了,新开启 一个线程
  38. *
  39. * 3. If we cannot queue task, then we try to add a new
  40. * thread. If it fails, we know we are shut down or saturated
  41. * and so reject the task.
  42. * 如果无法将任务入队列(可能队列满了),需要新开区一个线程(自己:往maxPoolSize发展)
  43. * 如果失败了,说明线程池shutdown 或者 饱和了,所以我们拒绝任务
  44. */
  45. int c = ctl.get();
  46. /**
  47. * 1、如果当前线程数少于corePoolSize(可能是由于addWorker()操作已经包含对线程池状态的判断,如此处没加,而入workQueue前加了)
  48. */
  49. if (workerCountOf(c) < corePoolSize) {
  50. //addWorker()成功,返回
  51. if (addWorker(command, true))
  52. return;
  53. /**
  54. * 没有成功addWorker(),再次获取c(凡是需要再次用ctl做判断时,都会再次调用ctl.get())
  55. * 失败的原因可能是:
  56. * 1、线程池已经shutdown,shutdown的线程池不再接收新任务
  57. * 2、workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize
  58. */
  59. c = ctl.get();
  60. }
  61. /**
  62. * 2、如果线程池RUNNING状态,且入队列成功
  63. */
  64. if (isRunning(c) && workQueue.offer(command)) {
  65. int recheck = ctl.get();//再次校验位
  66. /**
  67. * 再次校验放入workerQueue中的任务是否能被执行
  68. * 1、如果线程池不是运行状态了,应该拒绝添加新任务,从workQueue中删除任务
  69. * 2、如果线程池是运行状态,或者从workQueue中删除任务失败(刚好有一个线程执行完毕,并消耗了这个任务),确保还有线程执行任务(只要有一个就够了)
  70. */
  71. //如果再次校验过程中,线程池不是RUNNING状态,并且remove(command)--workQueue.remove()成功,拒绝当前command
  72. if (! isRunning(recheck) && remove(comman
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/1008812
推荐阅读
相关标签
  

闽ICP备14008679号