当前位置:   article > 正文

ThreadPoolExecutor源码分析

threadpoolexecutor源码分析

一、线程和任务的概念

首先区分概念,任务和线程。可以简单理解为任务为Runnable,线程为Thread。ThreadPoolExecutor内部维持的是线程池,因为创建线程比较耗时耗资源。而内部维护任务使用的是BlockingQueue。

 

二、ThreadPoolExecutor源码

2.1 ctl变量

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl是AtomicInteger的,所以是线程安全的。ctl维护两个概念上的参数:workCount和runState。workCount表示有效的线程数量,runState表示线程池的运行状态。运行状态只要有五个,分别是RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED。AtomicInteger是一个32位的整数,为了将状态和数量放在一起,所以高3位用于表示表示状态,低29位表示数量。下面是状态和一些参数定义:

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  2. private static final int COUNT_BITS = Integer.SIZE - 3;
  3. // 低29位表示线程数量
  4. private static final int CAPACITY = (1 << COUNT_BITS) - 1; //即二进制11111111111111111111111111111 (29位)
  5. // runState is stored in the high-order bits 线程池运行状态存储在32位的高3位
  6. private static final int RUNNING = -1 << COUNT_BITS; // 即二进制11100000000000000000000000000000(32位)
  7. private static final int SHUTDOWN = 0 << COUNT_BITS; // 即二进制00000000000000000000000000000000(32位)
  8. private static final int STOP = 1 << COUNT_BITS; // 即二进制00100000000000000000000000000000(32位)
  9. private static final int TIDYING = 2 << COUNT_BITS; // 即二进制01000000000000000000000000000000(32位)
  10. private static final int TERMINATED = 3 << COUNT_BITS; // 即二进制01100000000000000000000000000000(32位)

 

下面介绍各个线程池各个状态的含义:

RUNNING 
接受新任务并且处理已经进入队列的任务 
SHUTDOWN 
不接受新任务,但是处理已经进入队列的任务 
STOP 
不接受新任务,不处理已经进入队列的任务,并且中断正在执行的任务 
TIDYING 
所有任务执行完成,workerCount为0。线程转到了状态TIDYING会执行terminated()钩子方法 
TERMINATED 
terminated()已经执行完成 
状态之间可以相互转换

RUNNING -> SHUTDOWN 
调用了shutdown()方法 
(RUNNING 或 SHUTDOWN) -> STOP 
调用了shutdownNow() 
SHUTDOWN -> TIDYING 
当队列和线程池为空 
STOP -> TIDYING 
当线程池为空 
TIDYING -> TERMINATED 
当terminated()钩子方法执行完成 

å¨è¿éæå¥å¾çæè¿°

  1. private static int runStateOf(int c) { return c & ~CAPACITY; }
  2. private static int workerCountOf(int c) { return c & CAPACITY; }
  3. private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl的初始值是:ctlOf(RUNNING, 0)即11100000000000000000000000000000,如果新建一个线程,ctl就增加1,等于11100000000000000000000000000001,workerCountOf计算工作线程数量,即11100000000000000000000000000001 &11111111111111111111111111111 = 1,runStateOf计算状态值,~CAPACITY(取反) = 11100000000000000000000000000000;即11100000000000000000000000000001 & 11100000000000000000000000000000 = 11100000000000000000000000000000,等于RUNNING 状态。JDK源码中有不少地方巧妙的运用了位运算,熟悉很有必要。

 

2.2 ThreadPoolExecutor构造方法

ThreadPoolExecutor有几个构造方法,构造方法中有几个参数,分别是corePoolSize、maximunPoolSize、keepAliveTime、unit、workQueue、threadFactory和handler。下面分别介绍这个几个参数: 
corePoolSize 
核心线程的数量。默认是没有超时的,也就是说就算线程闲置,也不会被处理。但是如果设置了allowCoreTimeOut为true,那么当核心线程闲置时,会被回收。 
maximumPoolSize 
最大线程池尺寸,被CAPACITY限制(2^29-1)。 
keepAliveTime 
闲置线程被回收的时间限制 
unit 
keepAliveTime的单位 
workQueue 
用于存放任务的队列 
threadFactory 
创建线程的工厂类 
handler 
当任务执行失败时,使用handler通知调用者
 

2.3 ThreadPoolExecutor流程

当创建好一个ThreadPoolExecutor对象后,调用execute(Runnable r)方法执行任务。下面是execute方法的实现:

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. /*
  5. * Proceed in 3 steps:
  6. *
  7. * 1. If fewer than corePoolSize threads are running, try to
  8. * start a new thread with the given command as its first
  9. * task. The call to addWorker atomically checks runState and
  10. * workerCount, and so prevents false alarms that would add
  11. * threads when it shouldn't, by returning false.
  12. *
  13. * 2. If a task can be successfully queued, then we still need
  14. * to double-check whether we should have added a thread
  15. * (because existing ones died since last checking) or that
  16. * the pool shut down since entry into this method. So we
  17. * recheck state and if necessary roll back the enqueuing if
  18. * stopped, or start a new thread if there are none.
  19. *
  20. * 3. If we cannot queue task, then we try to add a new
  21. * thread. If it fails, we know we are shut down or saturated
  22. * and so reject the task.
  23. */
  24. int c = ctl.get();
  25. // 1.
  26. if (workerCountOf(c) < corePoolSize) {
  27. if (addWorker(command, true))
  28. return;
  29. c = ctl.get();
  30. }
  31. // 2.
  32. if (isRunning(c) && workQueue.offer(command)) {
  33. int recheck = ctl.get();
  34. if (! isRunning(recheck) && remove(command))
  35. reject(command);
  36. else if (workerCountOf(recheck) == 0)
  37. /* 线程池处于Running状态,创建一个非核心线程,没有把任务command交给它呢是因为任务已经
  38. 进入队列了,null表示新建线程,但不会启动它,false表示最大线程树与非核心线程数为界限*/
  39. addWorker(null, false);
  40. }
  41. // 3.
  42. else if (!addWorker(command, false))
  43. reject(command);
  44. }

一个任务command进入线程池的执行流程,方法execute(command)分3步进行:

1,判断当前线程数是否大于核心线程数,小于则通过addWorker方法创建一个新线程(核心线程)执行任务;

2,如果当前线程数大于核心线程数,判断线程池状态,处于RUNNING,则将任务添加到阻塞队列;再次检查线程池状态,

  • 2.1,如果不在RUNNING状态,从阻塞队列移除任务command,并执行相关拒绝策略;
  • 2.2,如果处于RUNNING状态,但当前没有线程,则新建非核心线程,addWorker(null, false),使用null是因为任务已经进入队列,使用false是将线程池的最大线程数从corePoolSize调整为maximumPoolSize。

3,如果任务command添加到阻塞队列失败,直接创建一个非核心线程执行它,失败则执行拒绝策略。

为什么需要double check线程池的状态? 
 在多线程环境下,线程池的状态时刻在变化,而ctl.get()是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将command加入workque是线程池之前的状态。倘若没有double check,万一线程池处于非running状态(在多线程环境下很有可能发生),那么command永远不会执行。

任务进入线程池的大体流程是,先创建核心线程去执行它,如果核心线程已超限,那么就把任务添加进队列;队列满了就创建非核心线程,非核心线程也超限了,则执行相关拒绝策略。

2.4 Worker

  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable
  4. {
  5. // 非重入锁
  6. protected boolean tryAcquire(int unused) {
  7. if (compareAndSetState(0, 1)) {
  8. setExclusiveOwnerThread(Thread.currentThread());
  9. return true;
  10. }
  11. return false;
  12. }

       Worker包装了任务线程,主要是为了维护中断控制状态和其他次要状态记录,及任务的执行。Worker同时继承了AQS,在任务线程执行前lock,任务执行完unlock。加锁的目的主要是保护任务线程的执行,线程池唤醒一个任务线程等待任务,而不是中断当前正在执行任务的线程去执行任务。Worker使用了一个 非重入互质锁,而不是ReentrantLock,这样做的目的是以防在任务执行的过程,线程池控制方法的改变,对任务线程执行的影响,比如setCorePoolSize方法。另外为了防止任务线程在实际执行前被中断,我们初始化锁状态为-1,在runWorker方法中,我们会清除它。runWorker执行任务时,首先释放锁,此时锁打开,允许中断,如果线程池正在stop,确保线程池已中断,否则 做执行前工作,执行任务,做执行后工作,如果任务被中断,则工作线程数量减1; 如果任务完成,则更新完成任务数量,从工作任务集中移除工作线程,尝试结束线程池。

 addWorker方法

从方法execute的实现可以看出:addWorker主要负责创建新的线程并执行任务 
线程池创建新线程执行任务时,需要获取全局锁:

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. //外循环死循环
  3. retry:
  4. for (;;) {
  5. int c = ctl.get();
  6. //得到运行状态
  7. int rs = runStateOf(c);
  8. // 检查状态
  9. if (rs >= SHUTDOWN &&
  10. ! (rs == SHUTDOWN &&
  11. firstTask == null &&
  12. ! workQueue.isEmpty()))
  13. return false; // addWorker失败,退出方法
  14. //状态符合跳球,死循环
  15. for (;;) {
  16. int wc = workerCountOf(c);
  17. //如果worker数量超过了容量或者超过了corePoolSize或者maximumPoolSize,直接返回false
  18. if (wc >= CAPACITY ||
  19. wc >= (core ? corePoolSize : maximumPoolSize))
  20. return false;
  21. //如果成功将worker数+1,那么跳出外循环
  22. if (compareAndIncrementWorkerCount(c))
  23. break retry;
  24. //否则,重新读取ctl
  25. c = ctl.get(); // Re-read ctl
  26. if (runStateOf(c) != rs)
  27. continue retry;
  28. // else CAS failed due to workerCount change; retry inner loop
  29. }
  30. }
  31. //添加Worker
  32. boolean workerStarted = false;
  33. boolean workerAdded = false;
  34. Worker w = null;
  35. try {
  36. //以firstTask作为Worker的第一个任务创建Worker
  37. w = new Worker(firstTask);
  38. final Thread t = w.thread;
  39. if (t != null) {
  40. //对整个线程池加锁
  41. final ReentrantLock mainLock = this.mainLock;
  42. mainLock.lock();
  43. try {
  44. //再次检查ctl状态
  45. int rs = runStateOf(ctl.get());
  46. if (rs < SHUTDOWN ||
  47. (rs == SHUTDOWN && firstTask == null)) {
  48. if (t.isAlive()) // precheck that t is startable
  49. throw new IllegalThreadStateException();
  50. workers.add(w);
  51. int s = workers.size();
  52. if (s > largestPoolSize)
  53. largestPoolSize = s;
  54. workerAdded = true;
  55. }
  56. } finally {
  57. mainLock.unlock();
  58. }
  59. if (workerAdded) {
  60. t.start();
  61. workerStarted = true;
  62. }
  63. }
  64. } finally {
  65. if (! workerStarted)
  66. addWorkerFailed(w);
  67. }
  68. return workerStarted;
  69. }

首先是两个死循环,外循环主要检查线程池运行状态,内循环检查workerCount之后再检查运行状态。下面简单分析一下哪些情况下才可以进入到内循环,否则就直接返回false了。下面是可以进入到内循环的情况: 

  1. rs>=SHUTDOWN为false,即线程池处于RUNNING状态 
  2. rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()这个条件为true,也就意味着三个条件同时满足,即线程池状态为SHUTDOWN且firstTask为null且队列不为空,这种情况为处理队列中剩余任务。上面提到过当处于SHUTDOWN状态时,不接受新任务,但是会处理完队列里面的任务。如果firstTask不为null,那么就属于添加新任务;如果firstTask为null,并且队列为空,那么就不需要再处理了。 

当进入到内循环后,会首先获取当前运行的线程数量。首先判断当前运行线程数量是否大于等于CAPACITYA(2^29-1),其次根据是否是核心线程与corePoolSize或者maximumPoolSize比较。所以线程的数量不会超过CAPACITY和maximumPoolSize的较小值。如果数量符合条件,那么就让ctl加1,然后跳出外部循环。如果线程数量达到了最大,那么回再判断当前状态,如果状态和之前的不一致了,那么继续外循环。下面是可以跳出外循环的情况:

  1. 如果是核心线程,当前线程数量小于CAPACITY和corePoolSize中的较小值 
  2. 如果是非核心线程,当前线程数量小于CAPACITY和maximumPoolSize中的较小值。 

一旦跳出外循环,表示可以创建创建线程,这里具体是Worker对象,Worker实现了Runnable接口并且继承AbstractQueueSynchronizer,内部维持一个Runnbale的队列。try块中主要就是创建Worker对象,然后将其保存到workers中,workers是一个HashSet,表示工作线程的集合。然后如果添加成功,则开启Worker所在的线程。如果开启线程失败,则调用addWorkerFailed方法,addWokerFailed用于回滚worker线程的创建。下面是addWorkerFailed的实现:

  1. private void addWorkerFailed(Worker w) {
  2. //对整个线程成绩加锁
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. //移除Worker对象
  7. if (w != null)
  8. workers.remove(w);
  9. //减小worker数量
  10. decrementWorkerCount();
  11. //检查termination状态
  12. tryTerminate();
  13. } finally {
  14. mainLock.unlock();
  15. }
  16. }

从代码中可以看出,addWorkerFailed首先从workers集合中移除线程,然后将wokerCount减1,最后检查终结。下面是tryTerminate的实现,该方法用于检查是否有必要将线程池状态转移到TERMINATED。

  1. final void tryTerminate() {
  2. for (;;) {
  3. int c = ctl.get();
  4. if (isRunning(c) ||
  5. runStateAtLeast(c, TIDYING) ||
  6. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
  7. return;
  8. if (workerCountOf(c) != 0) { // Eligible to terminate
  9. interruptIdleWorkers(ONLY_ONE);
  10. return;
  11. }
  12. final ReentrantLock mainLock = this.mainLock;
  13. mainLock.lock();
  14. try {
  15. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  16. try {
  17. terminated();
  18. } finally {
  19. ctl.set(ctlOf(TERMINATED, 0));
  20. termination.signalAll();
  21. }
  22. return;
  23. }
  24. } finally {
  25. mainLock.unlock();
  26. }
  27. // else retry on failed CAS
  28. }
  29. }

 tryTerminate内部是一个死循环,首先判断状态,下面是跳出循环的情况: 

  1. 线程池处于RUNNING状态 
  2. 线程池状态处于TIDYING状态 
  3. 线程池状态处于SHUTDOWN状态并且队列不为空 

如果不满足上述的情况,那么目前状态属于SHUTDOWN且队列为空,或者状态属于STOP,那么调用interruptIdleWorkers方法停止一个Worker线程,然后退出。 
接下来如果没有退出循环的话,那么就首先将状态设置成TIDYING,然后调用terminated方法,最后设置状态为TERMINATED。terminated方法是个空实现,用于当线程池终结时处理一些事情。 
下面看Worker的实现,
 

  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable
  4. {
  5. private static final long serialVersionUID = 6138294804551838833L;
  6. /** Thread this worker is running in. Null if factory fails. */
  7. final Thread thread;
  8. /** Initial task to run. Possibly null. */
  9. Runnable firstTask;
  10. /** Per-thread task counter */
  11. volatile long completedTasks;
  12. /**
  13. * Creates with given first task and thread from ThreadFactory.
  14. * @param firstTask the first task (null if none)
  15. */
  16. Worker(Runnable firstTask) {
  17. setState(-1); // inhibit interrupts until runWorker
  18. this.firstTask = firstTask;
  19. this.thread = getThreadFactory().newThread(this);
  20. }
  21. /** Delegates main run loop to outer runWorker */
  22. public void run() {
  23. runWorker(this);
  24. }
  25. // Lock methods
  26. //
  27. // The value 0 represents the unlocked state.
  28. // The value 1 represents the locked state.
  29. protected boolean isHeldExclusively() {
  30. return getState() != 0;
  31. }
  32. protected boolean tryAcquire(int unused) {
  33. if (compareAndSetState(0, 1)) {
  34. setExclusiveOwnerThread(Thread.currentThread());
  35. return true;
  36. }
  37. return false;
  38. }
  39. protected boolean tryRelease(int unused) {
  40. setExclusiveOwnerThread(null);
  41. setState(0);
  42. return true;
  43. }
  44. public void lock() { acquire(1); }
  45. public boolean tryLock() { return tryAcquire(1); }
  46. public void unlock() { release(1); }
  47. public boolean isLocked() { return isHeldExclusively(); }
  48. void interruptIfStarted() {
  49. Thread t;
  50. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  51. try {
  52. t.interrupt();
  53. } catch (SecurityException ignore) {
  54. }
  55. }
  56. }
  57. }

Worker继承自AbstractQueuedSynchronizer并实现Runnbale接口。AbstractQueuedSynchronizer提供了一个实现阻塞锁和其他同步工具,比如信号量、事件等依赖于等待队列的框架。Worker的构造方法中会使用threadFactory构造线程变量并持有,run方法调用了runWorker方法,将线程委托给主循环线程。runWorker方法的实现如下所示:

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // allow interrupts
  6. boolean completedAbruptly = true;
  7. try {
  8. //当任务不为null时
  9. while (task != null || (task = getTask()) != null) {
  10. //对Worker加锁
  11. w.lock();
  12. //如果线程池停止了,那么中断线程
  13. if ((runStateAtLeast(ctl.get(), STOP) ||
  14. (Thread.interrupted() &&
  15. runStateAtLeast(ctl.get(), STOP))) &&
  16. !wt.isInterrupted())
  17. wt.interrupt();
  18. try {
  19. beforeExecute(wt, task);
  20. Throwable thrown = null;
  21. try {
  22. //执行任务
  23. task.run();
  24. } catch (RuntimeException x) {
  25. thrown = x; throw x;
  26. } catch (Error x) {
  27. thrown = x; throw x;
  28. } catch (Throwable x) {
  29. thrown = x; throw new Error(x);
  30. } finally {
  31. afterExecute(task, thrown);
  32. }
  33. } finally {
  34. task = null;
  35. w.completedTasks++;
  36. w.unlock();
  37. }
  38. }
  39. completedAbruptly = false;
  40. } finally {
  41. processWorkerExit(w, completedAbruptly);
  42. }
  43. }

runWoker方法主要不断从队列中取得任务并执行。首先获得Worker所在的线程,在addWorker中获得Worker的Thread变量并调用start方法,所以Worker是运行在Worker的Thread中,而thread变量是通过threadFactory创建的。可以看到首先获取Worker的firstTask对象,该对象有可能为空,初始时对于核心线程不为空,但是对于非核心线程就为空,下面是一个循环,跳出循环的条件为task==null&&(task=getTask())==null,也就是说当没有任何任务的时候,就跳出循环了,跳出循环也就意味着Worker的run方法执行结束,也就意味着线程结束;否则会一直尝试着从队列中获取任务来执行,getTask会阻塞,一旦获取到任务,就对Worker加锁,然后判断状态,如果状态处于STOP状态及之上,就不处理任务了;否则处理任务,在处理任务之前,首先会调用beforeExecute,然后调用Runnbale方法的run方法,最后调用afterExecute,其中beforeExecute和afterExecute都是空实现,继承ThreadPoolExecutor时可以实现,在每个任务运行之前和之后做一些处理工作。一旦一个任务执行完毕后,将task置为null,然后继续尝试从队列中取出任务。( while (task != null || (task = getTask()) != null) 这里并发下会不会导致任务的重复执行呢?比如两个线程在这里拿到了同一个task,其中一个执行任务,另外一个等待获取锁再执行,答案肯定是不会,但没看明白这里是怎么实现的)下面看一下getTask方法的实现:

  1. private Runnable getTask() {
  2. boolean timedOut = false; // Did the last poll() time out?
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // 必要时检查队列是否为空
  7. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  8. decrementWorkerCount();
  9. return null;
  10. }
  11. int wc = workerCountOf(c);
  12. // 是否允许线程超时
  13. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  14. //如果worker数量大于maximumPoolSize或者允许超时
  15. if ((wc > maximumPoolSize || (timed && timedOut))
  16. && (wc > 1 || workQueue.isEmpty())) {
  17. if (compareAndDecrementWorkerCount(c))
  18. return null;
  19. continue;
  20. }
  21. try {
  22. Runnable r = timed ?
  23. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  24. workQueue.take();
  25. if (r != null)
  26. return r;
  27. timedOut = true;
  28. } catch (InterruptedException retry) {
  29. timedOut = false;
  30. }
  31. }
  32. }

getTask从队列中取出任务;但是在以下几种情况下会返回null,上面说过如果返回null也就标识了runWorker中循环跳出,上面说过,runWorker中的循环跳出意味着Worker线程执行完毕会回收,所以调用了decrementWorkerCount将Worker数量减1。下面返回null的情况 :

  1. 由于调用了setMaximumPoolSize导致Worker的数量超过maximumPoolSize 
  2. 线程池处于STOP状态,STOP状态不再处理队列中的任务 
  3. 线程池处于SHUTDOWN并且queue为空。SHUTDOWN状态仍然处理已经在队列中的任务,但是如果queue为空,自然就不再处理了 
  4. Worker在等待队列时超时 getTask内部依然是一个死循环,首先依然是判断状态,如果状态是STOP及以上,那么返回null;如果状态是SHUTDOWN且队列为空,那么也返回null。这对应于情况2和3。 接下来是比较Worker的数量,首先获取Worker的数量以及是否需要超时标志,如果设置了allCoreThreadTimeOut为true,那么就意味着所以线程都得检验超时;而如果没有设置为true,那么只需要在Worker数量超过corePoolSize时检查超时。接下来是判断数量是否超过maximumPoolSize,如果超过了,则需要结束多余的Worker;如果超时了并且有时间限制,也需要停止线程。如果没有进入到if语句中,那么将会尝试从队列中获取任务,如果需要有时间限制,那么就调用workQueue的poll方法,如果没有则调用take方法,如果可以从队列中取到任务,那么就返回任务交由runWorker中去执行;但是如果返回失败,那么需要设置timeOut为true,这样在下一次进入循环时,会清除一个Worker。 

上面是ThreadPoolExecutor调用execute方法提交任务后的执行流程,下面总结一下: 

  1. 当Worker数量小于corePoolSize时,新建核心Worker,并将任务作为firstTask参数传入,然后返回;由于runWorker方法中firstTask不为null,所以核心线程在第一次进入循环时会将firstTask执行完成后,再进入循环时getTask时会阻塞,因为此时队列里面任务为空 
  2. 如果Worker数量超过corePoolSize,那么会首先将任务加入队列;如果可以成功加入队列,那么就再判断是否还在运行状态,如果不在运行状态,那么就从队列中删除任务并且调用reject方法;否则如果因为Worker数量为0,那么就创建一个非核心线程处理队列中的任务。 
  3. 如果2中由于队列已满不能加入队列,那么就尝试着开启一个非核心线程,如果开启非核心线程失败了,那么就调用reject处理;否则就等待着非核心线程从队列中取数据。 

addWorker方法中会两个死循环,外循环检查线程池状态是否还可以接受新任务;内循环根据是否是核心线程与corePoolSize或maximumPoolSize比较,如果数量符合则创建线程,否则添加失败。
 

三、常见线程池

创建线程池一般都通过Executors的工厂方法创建线程,一般有四种线程,分别是FixedSizeThreadPoolc、SingleThreadPool、CachedThreadPool和ScheduledThreadPool。

3.1 FixedSizeThreadPool

FixedSizeThreadPoolExecutor只有核心线程,没有非核心线程,并且核心线程的数量固定。下面是创建该线程池的方法:

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

可以看到corePoolSize和maximumPoolSize均为nThreads参数,并且没有超时且队列是没有边界的。所以该线程池一旦开启,最多会有nThreads个线程,且线程一旦创建,就不会销毁(getTask方法使用workQueue.take()阻塞获取任务,runWorker会一直执行)。只要有任务提交,就会添加给核心线程或加入队列。

3.2 SingleThreadPool

SingleThreadPool创建一个Worker线程操作一个无边界的队列。如果使用该线程池,那么所有提交的任务将会按照顺序被执行。

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

3.3 CachedThreadPool

CachedThreadPool只要需要新线程就会创建线程,如果之前创建的线程还可以复用,那么就会复用之前的线程。

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

从构造方法中可以看出,corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,超时为60s且队列为SynchronousQueue。从前面的分析我们知道,线程池中线程的最大数量为CAPACITY,所以就算这边设置了Integer.MAX_VALUE,但是最大数量也只能达到2^29-1个线程。SynchronousQueue不会持有任务,一旦拥有任务就会将任务交给线程。所以说会不断创建线程,而如果线程没有销毁的话,就会从调用getTask尝试从队列中获取任务,如果长时间没有新任务,那么之前的线程会由于超时而销毁;而如果在这期间新加了任务,那么getTask就可以获取到任务,那么之前创建的线程也就可以得到复用。

3.4 ScheduledThreadPool

ScheduledThreadPool核心线程数量固定,非核心线程数量为Integer.MAX_VALUE,该线程池主要用于执行周期性的任务或在延时一段时间后执行任务。

  1. public static ScheduledExecutorService newScheduledThreadPool(
  2. int corePoolSize, ThreadFactory threadFactory) {
  3. return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
  4. }

四、总结

1. ThreadPoolExecutor构造器中各个参数的含义
在addWorker、getTask等方法中都需要使用到构造器中的参数,主要包括四部分,第一部分是线程数量,corePoolSize和maximumPoolSize;第二部分是超时,unit和keepAliveTime;第三部分是创建运行Worker的线程,即threadFactory;第四部分是RejectedExecutionHandler,即handler。 
有些线程池会设置maximumPoolSize为Integer.MAX_VALUE,但是由于高三位需要作为线程池的状态,所以线程池中线程的最大数量为CAPACITY

2. ThreadPoolExecutor各个状态的含义以及状态转换
ThreadPoolExecutor的状态保存在ctl变量的高三位,具有五种状态,分别是RUNNING、SHUTDOWN、STOP、TIDYING和TERMITERNED。理解每个状态下线程池对任务和线程的操作,才能清楚在各个方法中为什么那么处理。

3. 执行流程
执行流程分为三步, 
1. 如果当前线程数小于corePoolSize,那么创建线程 
2. 如果当前线程数大于等于corePoolSize,那么将任务加入队列;如果成功加入,那么就等待线程的getTask获取到任务再去执行; 
3. 如果第2步中加入队列失败,那么尝试开启线程。如果当前线程数小于maximumPoolSize,那么创建线程成功,如果大于等于maximumPoolSize,那么创建线程失败。 
在上面为了讲解,区分出核心线程和非核心线程的区别,但是其实都一样,只不过是一个有初始的任务,一个firstTask为null,一旦当核心线程执行完初始的任务后,它就变得和非核心线程一样。如果设置了超时,那么并不会因为它是所谓的“核心线程”而不销毁,那么个时候所有线程都一样,一旦哪个线程阻塞在getTask那儿,就可能因为超时而销毁。 
在整个执行流程中,各个方法中会有多个死循环,要清楚在哪些状态下会跳出那些死循环。
 

便于更直接的了解线程池的执行流程,测试案例走起,核心线程数1,非核心线程数2,非核心线程空闲结束时间5秒,阻塞队列大小8:

  1. public class ThreadPoolExecutorTest {
  2. public static void main(String[] args) throws InterruptedException {
  3. ThreadPoolExecutor pool = new ThreadPoolExecutor( 1, 2, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(8));
  4. for (int i = 1; i <=10; i++) {
  5. //创建线程类对象
  6. Test myTask = new Test(i);
  7. //开启线程
  8. pool.execute(myTask);
  9. //获取线程池中线程的相应参数
  10. System.out.println("任务"+ myTask.num + "进入线程池"+",线程池中线程数目场景1:" +pool.getPoolSize() + ",队列中等待执行的任务数目:"+pool.getQueue().size() + ",已执行完的任务数目:"+pool.getCompletedTaskCount());
  11. }
  12. Thread.sleep(2000);
  13. System.out.println("线程池中线程数目场景2:" +pool.getPoolSize() + ",队列中等待执行的任务数目:"+pool.getQueue().size() + ",已执行完的任务数目:"+pool.getCompletedTaskCount());
  14. Thread.sleep(5000);
  15. System.out.println("线程池中线程数目场景3:" +pool.getPoolSize() + ",队列中等待执行的任务数目:"+pool.getQueue().size() + ",已执行完的任务数目:"+pool.getCompletedTaskCount());
  16. Thread.sleep(5000);
  17. System.out.println("线程池中线程数目场景4:" +pool.getPoolSize() + ",队列中等待执行的任务数目:"+pool.getQueue().size() + ",已执行完的任务数目:"+pool.getCompletedTaskCount());
  18. pool.shutdown();
  19. System.out.println("线程池中线程数目场景5:" +pool.getPoolSize() + ",队列中等待执行的任务数目:"+pool.getQueue().size() + ",已执行完的任务数目:"+pool.getCompletedTaskCount());
  20. }
  21. static class Test implements Runnable {
  22. private int num;
  23. public Test(int num) {
  24. this.num = num;
  25. }
  26. @Override
  27. public void run() {
  28. try {
  29. Thread.sleep(1000);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. System.out.println(Thread.currentThread().getName());
  34. }
  35. }
  36. }

打印结果:

  1. //任务1进来,创建一个核心线程,执行任务1
  2. 任务1进入线程池,线程池中线程数目场景11,队列中等待执行的任务数目:0,已执行完的任务数目:0
  3. 任务2进入线程池,线程池中线程数目场景11,队列中等待执行的任务数目:1,已执行完的任务数目:0
  4. 任务3进入线程池,线程池中线程数目场景11,队列中等待执行的任务数目:2,已执行完的任务数目:0
  5. 任务4进入线程池,线程池中线程数目场景11,队列中等待执行的任务数目:3,已执行完的任务数目:0
  6. 任务5进入线程池,线程池中线程数目场景11,队列中等待执行的任务数目:4,已执行完的任务数目:0
  7. 任务6进入线程池,线程池中线程数目场景11,队列中等待执行的任务数目:5,已执行完的任务数目:0
  8. 任务7进入线程池,线程池中线程数目场景11,队列中等待执行的任务数目:6,已执行完的任务数目:0
  9. 任务8进入线程池,线程池中线程数目场景11,队列中等待执行的任务数目:7,已执行完的任务数目:0
  10. //任务2-任务9共8个任务进入阻塞队列
  11. 任务9进入线程池,线程池中线程数目场景11,队列中等待执行的任务数目:8,已执行完的任务数目:0
  12. //阻塞队列已满,任务10进来,创建一个非核心线程去执行它,此时工作线程2
  13. 任务10进入线程池,线程池中线程数目场景12,队列中等待执行的任务数目:8,已执行完的任务数目:0
  14. // 如果将for循环10改成11,这里报异常,但前面10个任务照样进行
  15. pool-1-thread-2
  16. pool-1-thread-1
  17. // 队列中还有任务,工作线程为2
  18. 线程池中线程数目场景22,队列中等待执行的任务数目:6,已执行完的任务数目:2
  19. pool-1-thread-2
  20. pool-1-thread-1
  21. pool-1-thread-2
  22. pool-1-thread-1
  23. pool-1-thread-2
  24. pool-1-thread-1
  25. pool-1-thread-1
  26. pool-1-thread-2
  27. // 任务全部完成,非核心线程还未超过空闲时间,工作线程为2
  28. 线程池中线程数目场景32,队列中等待执行的任务数目:0,已执行完的任务数目:10
  29. // 任务全部完成,非核心线程超过空闲时间,工作线程为1
  30. 线程池中线程数目场景41,队列中等待执行的任务数目:0,已执行完的任务数目:10
  31. // 关闭线程池,工作线程为0
  32. 线程池中线程数目场景50,队列中等待执行的任务数目:0,已执行完的任务数目:10

非核心线程超时结束分析,getTask部分代码:

  1. /* 所有任务结束,此时工作线程大于核心线程wc > corePoolSize为true,
  2. 队列以非阻塞方法poll获取任务,队列为空,故r == null,timedOut = true,进入for下一次循环,
  3. 此时timed && timedOut为true,队列为空,执行compareAndDecrementWorkerCount方法,退出循环,线程结束*/
  4. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  5. if ((wc > maximumPoolSize || (timed && timedOut))
  6. && (wc > 1 || workQueue.isEmpty())) {
  7. if (compareAndDecrementWorkerCount(c))
  8. return null;
  9. continue;
  10. }
  11. try {
  12. Runnable r = timed ?
  13. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  14. workQueue.take();
  15. if (r != null)
  16. return r;
  17. timedOut = true;
  18. } catch (InterruptedException retry) {
  19. timedOut = false;
  20. }

 

Java线程池实现原理与源码解析(jdk1.8)

线程池源码分析

ThreadPoolExecutor源码分析

线程池运行原理分析

ThreadPoolExecutor解析三(线程池执行提交任务)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/944573
推荐阅读
  

闽ICP备14008679号