当前位置:   article > 正文

ThreadPoolExecutor 源码解析

threadpoolexcetor源码解析

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序 都可以使用线程池。合理地使用线程池能够带来3个好处:

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源, 还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

线程池的主要处理流程

线程池主要处理流程

ThreadPoolExecutor 类图

ThreadPoolExecutor

java中的线程池都是基于ThreadPoolExecutor 来实现的。

核心属性

  1. // 状态控制属性:高3位表示线程池的运行状态,剩下的29位表示当前有效的线程数量
  2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  3. // 线程池的基本大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,
  4. // 即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于
  5. // 线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,
  6. // 线程池会提前创建并启动所有基本线程。
  7. private volatile int corePoolSize;
  8. // 线程池线程最大数量,如果队列满了,并且已创建的线程数小于最大线程数,
  9. // 则线程池会再创建新的线程执行任务。如果使用了无界的任务队列这个参数就没什么效果。
  10. private volatile int maximumPoolSize;
  11. // 用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设 置更有意义的名字。
  12. private volatile ThreadFactory threadFactory;
  13. // 饱和策略,默认情况下是AbortPolicy。
  14. private volatile RejectedExecutionHandler handler;
  15. // 线程池的工作线程空闲后,保持存活的时间。如果任务很多,并且每个任务执行的时间比较短,
  16. // 可以调大时间,提高线程的利用率。
  17. private volatile long keepAliveTime;
  18. // 用于保存等待执行的任务的阻塞队列,具体可以参考[JAVA并发容器-阻塞队列](https://www.jianshu.com/p/5646fb5faee1)
  19. private final BlockingQueue<Runnable> workQueue;
  20. // 存放工作线程的容器,必须获取到锁才能访问
  21. private final HashSet<Worker> workers = new HashSet<Worker>();
  22. // ctl的拆包和包装
  23. private static int runStateOf(int c) { return c & ~CAPACITY; }
  24. private static int workerCountOf(int c) { return c & CAPACITY; }
  25. private static int ctlOf(int rs, int wc) { return rs | wc; }
  • 阻塞队列可以参考JAVA并发容器-阻塞队列
  • ctl状态控制属性,高3位表示线程池的运行状态(runState),剩下的29位表示当前有效的线程数量(workerCount
  • 线程池最大线程数是 (1 << COUNT_BITS) - 1 = 536 870 911

线程池的运行状态runState

状态解释
RUNNING运行态,可处理新任务并执行队列中的任务
SHUTDOW关闭态,不接受新任务,但处理队列中的任务
STOP停止态,不接受新任务,不处理队列中任务,且打断运行中任务
TIDYING整理态,所有任务已经结束,workerCount = 0 ,将执行terminated()方法
TERMINATED结束态,terminated() 方法已完成
线程池运行状态

RejectedExecutionHandler(拒绝策略)

  • AbortPolicy:直接抛出异常。
  • CallerRunsPolicy:只用调用者所在线程来运行任务。
  • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
  • DiscardPolicy:不处理,丢弃掉。

核心内部类 Worker

  1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  2. // 正在执行任务的线程
  3. final Thread thread;
  4. // 线程创建时初始化的任务
  5. Runnable firstTask;
  6. // 完成任务计数器
  7. volatile long completedTasks;
  8. Worker(Runnable firstTask) {
  9. // 在runWorker方法运行之前禁止中断,要中断线程必须先获取worker内部的互斥锁
  10. setState(-1); // inhibit interrupts until runWorker
  11. this.firstTask = firstTask;
  12. this.thread = getThreadFactory().newThread(this);
  13. }
  14. /** delegates main run loop to outer runworker */
  15. // 直接委托给外部runworker方法
  16. public void run() {
  17. runWorker(this);
  18. }
  19. ...
  20. }

Worker 类他将执行任务的线程封装到了内部,在初始化Worker 的时候,会调用ThreadFactory初始化新线程;Worker 继承了AbstractQueuedSynchronizer,在内部实现了一个互斥锁,主要目的是控制工作线程的中断状态。

线程的中断一般是由其他线程发起的,比如ThreadPoolExecutor#interruptIdleWorkers(boolean)方法,它在调用过程中会去中断worker内部的工作线程,Work的互斥锁可以保证正在执行的任务不被打断。它是怎么保证的呢?在线程真正执行任务的时候,也就是runWorker方法被调用时,它会先获取到Work的锁,当我们在其他线程需要中断当前线程时也需要获取到work的互斥锁,否则不能中断。

构造函数

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.corePoolSize = corePoolSize;
  16. this.maximumPoolSize = maximumPoolSize;
  17. this.workQueue = workQueue;
  18. this.keepAliveTime = unit.toNanos(keepAliveTime);
  19. this.threadFactory = threadFactory;
  20. this.handler = handler;
  21. }

通过构造函数我们可以发现,构造函数就是在对线程池核心属性进行赋值,下面我们来介绍一下这些核心属性:

  • corePoolSize:核心线程数
  • maximumPoolSize:线程池最大数量
  • keepAliveTime:线程池的工作线程空闲后,保持存活的时间。
  • unit:线程活动保持时间的单位。
  • workQueue:用于保存等待执行的任务的阻塞队列,具体可以参考JAVA并发容器-阻塞队列
  • threadFactory:用于设置创建线程的工厂
  • handler:饱和策略,默认情况下是AbortPolicy。

execute() 提交线程

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. // 获取控制的值
  5. int c = ctl.get();
  6. // 判断工作线程数是否小于corePoolSize
  7. if (workerCountOf(c) < corePoolSize) {
  8. // 新创建核心线程
  9. if (addWorker(command, true))
  10. return;
  11. c = ctl.get();
  12. }
  13. // 工作线程数大于或等于corePoolSize
  14. // 判断线程池是否处于运行状态,如果是将任务command入队
  15. if (isRunning(c) && workQueue.offer(command)) {
  16. int recheck = ctl.get();
  17. // 再次检查线程池的运行状态,如果不在运行中,那么将任务从队列里面删除,并尝试结束线程池
  18. if (! isRunning(recheck) && remove(command))
  19. // 调用驱逐策略
  20. reject(command);
  21. // 检查活跃线程总数是否为0
  22. else if (workerCountOf(recheck) == 0)
  23. // 新创建非核心线程
  24. addWorker(null, false);
  25. }
  26. // 队列满了,新创建非核心线程
  27. else if (!addWorker(command, false))
  28. // 调用驱逐策略
  29. reject(command);
  30. }

该方法是没有返回值的

addWorker() 新创建线程

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // 仅在必要的时候检查队列是否为NULL
  7. // 检查队列是否处于非运行状态
  8. if (rs >= SHUTDOWN &&
  9. ! (rs == SHUTDOWN &&
  10. firstTask == null &&
  11. ! workQueue.isEmpty()))
  12. return false;
  13. for (;;) {
  14. // 获取活跃线程数
  15. int wc = workerCountOf(c);
  16. // 判断线程是否超过最大值,当队列满了则验证线程数是否大于maximumPoolSize,
  17. // 没有满则验证corePoolSize
  18. if (wc >= CAPACITY ||
  19. wc >= (core ? corePoolSize : maximumPoolSize))
  20. return false;
  21. // 增加活跃线程总数,否则重试
  22. if (compareAndIncrementWorkerCount(c))
  23. // 如果成功跳出外层循环
  24. break retry;
  25. c = ctl.get(); // Re-read ctl
  26. // 再次校验一下线程池运行状态
  27. if (runStateOf(c) != rs)
  28. continue retry;
  29. // else CAS failed due to workerCount change; retry inner loop
  30. }
  31. }
  32. // 工作线程是否启动
  33. boolean workerStarted = false;
  34. // 工作线程是否创建
  35. boolean workerAdded = false;
  36. Worker w = null;
  37. try {
  38. // 新创建线程
  39. w = new Worker(firstTask);
  40. // 获取新创建的线程
  41. final Thread t = w.thread;
  42. if (t != null) {
  43. // 创建线程要获得全局锁
  44. final ReentrantLock mainLock = this.mainLock;
  45. mainLock.lock();
  46. try {
  47. // Recheck while holding lock.
  48. // Back out on ThreadFactory failure or if
  49. // shut down before lock acquired.
  50. int rs = runStateOf(ctl.get());
  51. // 检查线程池的运行状态
  52. if (rs < SHUTDOWN ||
  53. (rs == SHUTDOWN && firstTask == null)) {
  54. // 检查线程的状态
  55. if (t.isAlive()) // precheck that t is startable
  56. throw new IllegalThreadStateException();
  57. // 将新建工作线程存放到容器
  58. workers.add(w);
  59. int s = workers.size();
  60. if (s > largestPoolSize) {
  61. // 跟踪线程池最大的工作线程总数
  62. largestPoolSize = s;
  63. }
  64. workerAdded = true;
  65. }
  66. } finally {
  67. mainLock.unlock();
  68. }
  69. // 启动工作线程
  70. if (workerAdded) {
  71. t.start();
  72. workerStarted = true;
  73. }
  74. }
  75. } finally {
  76. if (! workerStarted)
  77. // 启动新的工作线程失败,
  78. // 1. 将工作线程移除workers容器
  79. // 2. 还原工作线程总数(workerCount)
  80. // 3. 尝试结束线程
  81. addWorkerFailed(w);
  82. }
  83. return workerStarted;
  84. }

如果启动新线程失败那么addWorkerFailed()这个方法将做一下三件事:

  1. 将工作线程移除workers容器
  2. 还原工作线程总数(workerCount)
  3. 尝试结束线程

execute() 执行过程

  1. 如果当前运行的线程少于corePoolSize,即使有空闲线程也会创建新线程来执行任务,(注意,执行这一步骤 需要获取全局锁)。如果调用了线程池的restartAllCoreThreads()方法, 线程池会提前创建并启动所有基本线程。
  2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执 行这一步骤需要获取全局锁)。
  4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。

线程任务的执行

线程的正在执行是ThreadPoolExecutor.Worker#run()方法,但是这个方法直接委托给了外部的runWorker()方法,源码如下:

  1. // 直接委托给外部runworker方法
  2. public void run() {
  3. runWorker(this);
  4. }

runWorker() 执行任务

  1. final void runWorker(Worker w) {
  2. // 当前Work中的工作线程
  3. Thread wt = Thread.currentThread();
  4. // 获取初始任务
  5. Runnable task = w.firstTask;
  6. // 初始任务置NULL(表示不是建线程)
  7. w.firstTask = null;
  8. // 修改锁的状态,使需发起中断的线程可以获取到锁(使工作线程可以响应中断)
  9. w.unlock(); // allow interrupts
  10. // 工作线程是否是异常结束
  11. boolean completedAbruptly = true;
  12. try {
  13. // 循环的从队列里面获取任务
  14. while (task != null || (task = getTask()) != null) {
  15. // 每次执行任务时需要获取到内置的互斥锁
  16. w.lock();
  17. // 1. 当前工作线程不是中断状态,且线程池是STOP,TIDYING,TERMINATED状态,我们需要中断当前工作线程
  18. // 2. 当前工作线程是中断状态,且线程池是STOP,TIDYING,TERMINATED状态,我们需要中断当前工作线程
  19. if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
  20. && !wt.isInterrupted())
  21. // 中断线程,中断标志位设置成true
  22. wt.interrupt();
  23. try {
  24. // 执行任务前置方法,扩展用
  25. beforeExecute(wt, task);
  26. Throwable thrown = null;
  27. try {
  28. // 执行任务
  29. task.run();
  30. } catch (RuntimeException x) {
  31. thrown = x; throw x;
  32. } catch (Error x) {
  33. thrown = x; throw x;
  34. } catch (Throwable x) {
  35. thrown = x; throw new Error(x);
  36. } finally {
  37. // 执行任务后置方法,扩展用
  38. afterExecute(task, thrown);
  39. }
  40. } finally {
  41. // 任务NULL表示已经处理了
  42. task = null;
  43. w.completedTasks++;
  44. w.unlock();
  45. }
  46. }
  47. completedAbruptly = false;
  48. } finally {
  49. // 将工作线程从容器中剔除
  50. processWorkerExit(w, completedAbruptly);
  51. }
  52. }

正在执行线程的方法,执行流程:

  1. 获取到当前的工作线程
  2. 获取初始化的线程任务
  3. 修改锁的状态,使工作线程可以响应中断
  4. 获取工作线程的锁(保证在任务执行过程中工作线程不被外部线程中断),如果获取到的任务是NULL,则结束当前工作线程
  5. 判断先测试状态,看是否需要中断当前工作线程
  6. 执行任务前置方法beforeExecute(wt, task);
  7. 执行任务(执行提交到线程池的线程)task.run();
  8. 执行任务后置方法afterExecute(task, thrown);,处理异常信息
  9. 修改完成任务的总数
  10. 解除当前工作线程的锁
  11. 获取队列里面的任务,循环第4步
  12. 将工作线程从容器中剔除
  • wt.isInterrupted():获取中断状态,无副作用
  • Thread.interrupted():获取中断状态,并将中断状态恢重置成false(不中断)
  • beforeExecute(wt, task);:执行任务前置方法,扩展用。如果这个方法在执行过程中抛出异常,那么会导致当前工作线程直接死亡而被回收,工作线程异常结束标记位completedAbruptly被设置成true,任务线程不能被执行
  • task.run();: 执行任务
  • afterExecute(task, thrown);:执行任务后置方法,扩展用。这个方法可以收集到任务运行的异常信息,这个方法如果有异常抛出,也会导致当前工作线程直接死亡而被回收,工作线程异常结束标记位completedAbruptly被设置成true
  • 任务运行过程中的异常信息除了RuntimeException以外,其他全部封装成Error,然后被afterExecute方法收集
  • terminated()这也是一个扩展方法,在线程池结束的时候调用

getTask() 获取任务

  1. private Runnable getTask() {
  2. // 记录最后一次获取任务是不是超时了
  3. boolean timedOut = false; // Did the last poll() time out?
  4. for (;;) {
  5. int c = ctl.get();
  6. // 获取线程池状态
  7. int rs = runStateOf(c);
  8. // 线程池是停止状态或者状态是关闭并且队列为空
  9. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  10. // 扣减工作线程总数
  11. decrementWorkerCount();
  12. return null;
  13. }
  14. // 获取工作线程总数
  15. int wc = workerCountOf(c);
  16. // 工作线程是否需要剔除
  17. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  18. if ((wc > maximumPoolSize || (timed && timedOut))
  19. && (wc > 1 || workQueue.isEmpty())) {
  20. // 扣减工作线程总数
  21. if (compareAndDecrementWorkerCount(c))
  22. // 剔除工作线程,当返回为NULL的时候,runWorker方法的while循环会结束
  23. return null;
  24. continue;
  25. }
  26. try {
  27. Runnable r = timed ?
  28. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  29. workQueue.take();
  30. if (r != null)
  31. return r;
  32. timedOut = true;
  33. } catch (InterruptedException retry) {
  34. timedOut = false;
  35. }
  36. }
  37. }

getTask() 阻塞或定时获取任务。当该方法返回NULL时,当前工作线程会结束,最后被回收,下面是返回NULL的几种情况:

  1. 当前工作线程总数wc大于maximumPoolSize最大工作线程总数。maximumPoolSize可能被setMaximumPoolSize方法改变。
  2. 当线程池处于停止状态时。
  3. 当线程池处于关闭状态且阻塞队列为空。
  4. 当前工作线程超时等待任务,并且当前工作线程总数wc大于corePoolSize或者allowCoreThreadTimeOut=true允许核心线程超时被回收,默认是false。

线程池在运行过程中可以调用setMaximumPoolSize()方法来修改maximumPoolSize值,新的值必须大于corePoolSize,如果新的maximumPoolSize小于原来的值,那么在该方法会去中断当前的空闲线程(工作线程内置锁的是解锁状态的线程为空闲线程)。

processWorkerExit() 工作线程结束

  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2. // 判断是否是异常情况导致工作线程被回收
  3. if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
  4. // 如果是扣减工作线程总数,如果不是在getTask()方法就已经扣减了
  5. decrementWorkerCount();
  6. final ReentrantLock mainLock = this.mainLock;
  7. mainLock.lock();
  8. try {
  9. // 将当前工作线程完成任务的总数加到completedTaskCount标志位上
  10. completedTaskCount += w.completedTasks;
  11. // 剔除当前工作线程
  12. workers.remove(w);
  13. } finally {
  14. mainLock.unlock();
  15. }
  16. // 尝试结束线程池
  17. tryTerminate();
  18. // 判刑是否需要新实例化工程线程
  19. int c = ctl.get();
  20. if (runStateLessThan(c, STOP)) {
  21. if (!completedAbruptly) {
  22. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  23. if (min == 0 && ! workQueue.isEmpty())
  24. min = 1;
  25. if (workerCountOf(c) >= min)
  26. return; // replacement not needed
  27. }
  28. addWorker(null, false);
  29. }
  30. }

剔除线程流程:

  1. 判断是否是异常情况导致工作线程被回收,如果是workerCount--
  2. 获取到全局锁
  3. 将当前工作线程完成任务的总数加到completedTaskCount标志位上
  4. 剔除工作线程
  5. 解锁
  6. 尝试结束线程池tryTerminate()
  7. 判刑是否需要重新实例化工程线程放到workers容器

结束线程池

shutdown() 关闭线程池

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. // 检查权限
  6. checkShutdownAccess();
  7. // 设置线程池状态为关闭
  8. advanceRunState(SHUTDOWN);
  9. // 中断线程
  10. interruptIdleWorkers();
  11. // 扩展方法
  12. onShutdown(); // hook for ScheduledThreadPoolExecutor
  13. } finally {
  14. mainLock.unlock();
  15. }
  16. // 尝试结束线池
  17. tryTerminate();
  18. }
  • 通过遍历工作线程容器workers,然后逐个中断工作线程,如果无法响应中断的任务可能永远无法终止
  • shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

shutdown() 关闭线程池

  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. // 检查权限
  7. checkShutdownAccess();
  8. // 设置线程池状态为停止状态
  9. advanceRunState(STOP);
  10. // 中断线程
  11. interruptIdleWorkers();
  12. // 将所有任务移动到list容器
  13. tasks = drainQueue();
  14. } finally {
  15. mainLock.unlock();
  16. }
  17. // 尝试结束线池
  18. tryTerminate();
  19. // 返回所有未执行的任务
  20. return tasks;
  21. }
  • 通过遍历工作线程容器workers,然后逐个中断工作线程,如果无法响应中断的任务可能永远无法终止
  • shutdownNow首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表

tryTerminate() 尝试结束线程池

  1. final void tryTerminate() {
  2. for (;;) {
  3. int c = ctl.get();
  4. // 判断是否在运行中,如果是直接返回
  5. if (isRunning(c) ||
  6. // 判断是否进入整理状态,如果进入了直接返回
  7. runStateAtLeast(c, TIDYING) ||
  8. // 如果是状态是关闭并且队列非空,也直接返回(关闭状态需要等到队列里面的线程处理完)
  9. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
  10. return;
  11. // 判断工作线程是否都关闭了
  12. if (workerCountOf(c) != 0) { // Eligible to terminate
  13. // 中断空闲线程
  14. interruptIdleWorkers(ONLY_ONE);
  15. return;
  16. }
  17. final ReentrantLock mainLock = this.mainLock;
  18. mainLock.lock();
  19. try {
  20. // 将状态替换成整理状态
  21. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  22. try {
  23. // 整理发放执行
  24. terminated();
  25. } finally {
  26. // 状态替换成结束状态
  27. ctl.set(ctlOf(TERMINATED, 0));
  28. termination.signalAll();
  29. }
  30. return;
  31. }
  32. } finally {
  33. mainLock.unlock();
  34. }
  35. // else retry on failed CAS
  36. }
  37. }

结束线程池大致流程为:

  1. 判断是否在运行中,如果是则不结束线程
  2. 判断是否进入整理状态,如果是也不用执行后面内容了
  3. 判断如果线程池是关闭状态并且队列非空,则不结束线程池(关闭状态需要等到队列里面的线程处理完)
  4. 判断工作线程是否都关闭了,如果没有就发起中断工作线程的请求
  5. 获取全局锁将线程池状态替换成整理状态
  6. 调用terminated();扩展方法(这也是一个扩展方法,在线程池结束的时候调用)
  7. 将线程池状态替换成结束状态
  8. 解除全局锁
  • 注意:
  • 我们可以通过的shutdownshutdownNow方法来结束线程池。他们都是通过遍历工作线程容器,然后逐个中断工作线程,所以无法响应中断的任务 可能永远无法终止。
  • shutdownshutdownNow的区别在于:shutdownNow首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表;而 shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线 程。
  • 只要调用了shutdownshutdownNow那么isShutdown方法就会返回true
  • 当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true

线程池的监控

通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute和terminated方法,也可以在任务执行前、执行后和线程池关闭前执 行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。 这几个方法在线程池里是空方法。

getTaskCount()

  1. public long getTaskCount() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. long n = completedTaskCount;
  6. for (Worker w : workers) {
  7. n += w.completedTasks;
  8. if (w.isLocked())
  9. ++n;
  10. }
  11. return n + workQueue.size();
  12. } finally {
  13. mainLock.unlock();
  14. }
  15. }

获取线程池需要执行的任务数量。 总数=已经结束线工作程完成的任务数(completedTaskCount) + 还未结束线程工作线程完成的任务数(w.completedTasks)+正在执行的任务数(w.isLocked())+还未执行的任务数(workQueue.size())

getCompletedTaskCount()

  1. public long getCompletedTaskCount() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. long n = completedTaskCount;
  6. for (Worker w : workers)
  7. n += w.completedTasks;
  8. return n;
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. }

获取线程池在运行过程中已完成的任务数量。 总数=已经结束线工作程完成的任务数(completedTaskCount) + 还未结束线程工作线程完成的任务数(w.completedTasks)

getLargestPoolSize()

  1. public int getLargestPoolSize() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. return largestPoolSize;
  6. } finally {
  7. mainLock.unlock();
  8. }
  9. }

获取线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是 否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。

getPoolSize()

  1. public int getPoolSize() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. // Remove rare and surprising possibility of
  6. // isTerminated() && getPoolSize() > 0
  7. return runStateAtLeast(ctl.get(), TIDYING) ? 0
  8. : workers.size();
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. }

获取线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销 毁,所以这个大小只增不减。

getActiveCount()

  1. public int getActiveCount() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. int n = 0;
  6. for (Worker w : workers)
  7. if (w.isLocked())
  8. ++n;
  9. return n;
  10. } finally {
  11. mainLock.unlock();
  12. }
  13. }

获取活动的线程数。

合理地配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
  • 任务的优先级:高、中和低。
  • 任务的执行时间:长、中和短。
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的 线程,如配置Ncpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配 置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务 和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量 将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高 的任务先执行。

  • 如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能 执行。
  • 可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
  • **建议使用有界队列。**有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点 儿,比如几千。无界队列在某些异常情况下可能会撑爆内存。

N核服务器,通过执行业务的单线程分析出本地计算时间为x,等待时间为y,则工作线程数(线程池线程数)设置为 N*(x+y)/x,能让CPU的利用率最大化,详情可以参考线程数究竟设多少合理

参考

《java并发编程的艺术》

源码

https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-concurrent 工程

layering-cache

为监控而生的多级缓存框架 layering-cache这是我开源的一个多级缓存框架的实现,如果有兴趣可以看一下

转载于:https://my.oschina.net/u/3748347/blog/3082758

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/944570
推荐阅读
相关标签
  

闽ICP备14008679号