赞
踩
目录
在线程池running状态下,
非核心线程,在指定的空闲时间内,如果获取不到任务,就会被回收,走退出逻辑。
核心线程,如果 allowCoreThreadTimeOut = true,在空闲时间keepAliveTime内,获取不到任务,也会被回收。
核心线程,如果allowCoreThreadTimeOut = false,即使空闲,也不会回收。
如果调用了shutdown()方法,会中断所有空闲线程,去回收它们。
如果调用了shutdownNow()方法,会中断所有线程,去回收它们。
走到拒绝策略的情况:①线程池里的线程数达到了maximunPoolSize。②调用了shutdown或shutdowNow方法来关闭线程池,线程池的状态不是running。
调用shutdown方法后,处于shutdown状态的线程池,提交的任务会被拒绝。shutdown状态下,会先把任务队列中的任务消费完,任务队列为空后,线程池的线程才会依次退出。最后一个退出的线程会把线程池的状态改为tidying,然后调用 terminated方法,最后将线程池改为terminated状态。
shutdown方法将线程池改为shutdown状态,拒绝之后提交的任务。然后会遍历线程池中的所有线程,尝试获取线程的独占锁。如果获取失败,就什么也不做。如果拿到了某个线程的独占锁,说明这个线程处于空闲状态,就给这个线程一个中断信号(调用interrupt方法)。收到中断信号的线程会从waiting状态切换为runnable状态。这个线程一看线程池是shutdown状态了,就会执行退出相关的逻辑。shutdown方法会中断所有没有正在执行任务的线程,但会保证线程池以及任务队列中的任务都执行完毕。
shutdownNow方法将线程池改为stop状态,拒绝之后提交的任务。遍历线程池中的线程,不检查线程的独占锁,直接给线程一个中断信号(调用interrupt方法)。此时的线程可能是空闲的,可能是正在执行任务。 正在执行任务的线程,收到中断信号后,还是会继续执行这个task,除非task会响应中断。如果不响应中断,就什么也不会发生。处于空闲状态的线程收到中断信号后,一看当前线程池状态为stop,它就不管任务队列中还有没有task,直接就退出了。
- public class ThreadPoolExecutor extends AbstractExecutorService {
- //高3位:表示当前线程池运行状态 除去高3位之后的低位:表示当前线程池中所拥有的线程数量
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- //表示在ctl中,低COUNT_BITS位 是用于存放当前线程数量的位。不直接写29,是为了防止在某个JDK版本中Integer的大小不是4个字节,是8个字节的情况。
- private static final int COUNT_BITS = Integer.SIZE - 3;
- //低COUNT_BITS位 所能表达的最大线程数值。 000 1111111111111...111(29个1) => 5亿多。
- private static final int CAPACITY = (1 << COUNT_BITS) - 1;
-
- // runState is stored in the high-order bits
- //111 00000000...00000(29个0) 转换成整数,其实是一个负数
- private static final int RUNNING = -1 << COUNT_BITS;
- //000 00000000...00000(29个0)
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- //001 0000000...00000(29个0)
- private static final int STOP = 1 << COUNT_BITS;
- //010 0000000...00000(29个0)
- private static final int TIDYING = 2 << COUNT_BITS;
- //011 0000000...00000(29个0)
- private static final int TERMINATED = 3 << COUNT_BITS;
-
- // Packing and unpacking ctl
- //获取当前线程池运行状态
- //~000 11111111111111...111(29个1) => 111 0000000000000...00000(29个0)
- //假设c == ctl = 111 00000000...000000111
- //111 000000000...000000111
- //111 000000000...000000000
- //111 000000000...000000000(一共32位)
- private static int runStateOf(int c) { return c & ~CAPACITY; }
-
- //获取当前线程池线程数量
- //假设c == ctl = 111 00000000...0000000111
- //111 000000000...000000111
- //000 111111111...111111111
- //000 000000000...000000111 => 7
- private static int workerCountOf(int c) { return c & CAPACITY; }
-
- //用在设置当前线程池ctl值时 会用到
- //rs 表示线程池状态 wc 表示当前线程池中worker(线程)数量
- //111 000000000000000000
- //000 000000000000000111
- //111 000000000000000111
- private static int ctlOf(int rs, int wc) { return rs | wc; }
-
- //比较当前线程池ctl所表示的状态,是否小于某个状态s
- //c = 111 000000000000000111 < 000 000000000000000000 == true
- //所有情况下,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
- private static boolean runStateLessThan(int c, int s) {
- return c < s;
- }
-
- //比较当前线程池ctl所表示的状态,是否大于等于某个状态s
- private static boolean runStateAtLeast(int c, int s) {
- return c >= s;
- }
-
- //小于SHUTDOWN 的一定是RUNNING。 SHUTDOWN == 0
- private static boolean isRunning(int c) {
- return c < SHUTDOWN;
- }
-
- /**
- * Attempts to CAS-increment the workerCount field of ctl.
- */
- //使用CAS方式 让ctl值+1 ,成功返回true, 失败返回false
- private boolean compareAndIncrementWorkerCount(int expect) {
- return ctl.compareAndSet(expect, expect + 1);
- }
-
- /**
- * Attempts to CAS-decrement the workerCount field of ctl.
- */
- //使用CAS方式 让ctl值-1 ,成功返回true, 失败返回false
- private boolean compareAndDecrementWorkerCount(int expect) {
- return ctl.compareAndSet(expect, expect - 1);
- }
-
- /**
- * Decrements the workerCount field of ctl. This is called only on
- * abrupt termination of a thread (see processWorkerExit). Other
- * decrements are performed within getTask.
- */
- //将ctl值减1,这个方法一定成功
- private void decrementWorkerCount() {
- //这里会一直重试,直到成功为止。
- do {} while (! compareAndDecrementWorkerCount(ctl.get()));
- }
-
- //任务队列,当线程池中的线程达到核心线程数量时,再提交任务 就会直接提交到 workQueue
- //workQueue 4种阻塞队列,ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue,PriorityBlockingQueue
- private final BlockingQueue<Runnable> workQueue;
-
- //线程池全局锁,它是一个可重入锁,增加worker、减少worker、修改线程池运行状态时 ,需要持有mainLock
- //同一时刻中,只有一个线程可以往线程池中 加线程 或者 减线程 或者 改线程池的状态,无法并发
- private final ReentrantLock mainLock = new ReentrantLock();
-
- /**
- * Set containing all worker threads in pool. Accessed only when
- * holding mainLock.
- */
- //线程池中真正存放 worker->thread 的地方。放工作线程的地方
- private final HashSet<Worker> workers = new HashSet<Worker>();
-
- /**
- * Wait condition to support awaitTermination
- */
- //当外部线程调用 awaitTermination() 方法时,外部线程会等待当前线程池状态为 Terminated 为止。阻塞的外部线程会在tryTerminate方法中被唤醒termination.signalAll();
- //等待是如何实现的? 就是将外部线程 封装成 waitNode 放入到 Condition 队列中了, waitNode.Thread 就是外部线程,会被park掉(处于WAITING状态)。
- //当线程池 状态 变为 Termination时,会去唤醒这些线程。通过 termination.signalAll() ,唤醒之后这些线程会进入到 阻塞队列,然后头结点会去抢占mainLock。
- //抢占到的线程,会继续执行awaitTermination() 后面的程序。这些线程最后,都会正常执行。
- //简单理解:termination.await() 会将线程阻塞在这。
- // termination.signalAll() 会将阻塞在这的线程依次唤醒
- private final Condition termination = mainLock.newCondition();
-
- /**
- * Tracks largest attained pool size. Accessed only under
- * mainLock.
- */
- //记录线程池生命周期内 线程数最大值
- private int largestPoolSize;
-
- /**
- * Counter for completed tasks. Updated only on termination of
- * worker threads. Accessed only under mainLock.
- */
- //记录线程池所完成任务总数 ,当worker退出时会将 worker完成的任务累积到completedTaskCount
- private long completedTaskCount;
- //创建线程时会使用 线程工厂,当我们使用 Executors.newFix... newCache... 创建线程池时,使用的是 DefaultThreadFactory
- //一般不建议使用Default线程池,推荐自己实现ThreadFactory,这样我们可以自定义命名规则,方便找出是哪块业务逻辑出现了问题。
- //因为Default线程池工厂的命名规则不好用,万一出现了bug,无法通过命名来判断是哪条业务线上出现了问题
- //DefaultThreadFactory implements ThreadFactory 在Executors 工厂类中
- private volatile ThreadFactory threadFactory;
-
- /**
- * Handler called when saturated or shutdown in execute.
- */
- //拒绝策略,juc包提供了4中方式,默认采用 Abort..抛出异常的方式。AbortPolicy,CallerRunsPolicy,DiscardOldestPolicy,DiscardPolicy
- private volatile RejectedExecutionHandler handler;
-
- /**
- * Timeout in nanoseconds for idle threads waiting for work.
- * Threads use this timeout when there are more than corePoolSize
- * present or if allowCoreThreadTimeOut. Otherwise they wait
- * forever for new work.
- */
- //空闲线程存活时间,当allowCoreThreadTimeOut == false 时,会维护核心线程数量内的线程存活,超出部分会被超时。
- //allowCoreThreadTimeOut == true 核心数量内的线程 空闲时 也会被回收。
- private volatile long keepAliveTime;
-
- /**
- * If false (default), core threads stay alive even when idle.
- * If true, core threads use keepAliveTime to time out waiting
- * for work.
- */
- //控制核心线程数量内的线程 是否可以被回收。true 可以,false不可以。
- //false时,线程池会维护线程数量为corePoolSize个
- //true时,核心线程获取任务超时 且 (当前线程数量大于1 || 工作队列为空 )时,核心线程会被回收
- private volatile boolean allowCoreThreadTimeOut;
-
-
- //核心线程数量限制。
- private volatile int corePoolSize;
-
- //线程池最大线程数量限制。
- private volatile int maximumPoolSize;
-
- /**
- * The default rejected execution handler
- */
- //缺省拒绝策略,采用的是AbortPolicy 抛出异常的方式。
- private static final RejectedExecutionHandler defaultHandler =
- new AbortPolicy();
-
- ......
-
- }
- /** 内部类 Worker*/
- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable
- {
- //Worker采用了AQS的独占模式
- //独占模式:两个重要属性 state 和 ExclusiveOwnerThread
- //state:0时表示未被占用,=0时,才会尝试去抢锁; > 0时表示被占用,加锁了; < 0 时 表示初始状态,这种情况下不能被抢锁。
- //ExclusiveOwnerThread:表示独占锁的线程。谁抢到锁了,ExclusiveOwnerThread(独占者线程)就指向谁
- /**
- * This class will never be serialized, but we provide a
- * serialVersionUID to suppress a javac warning.
- */
- private static final long serialVersionUID = 6138294804551838833L;
-
- /** Thread this worker is running in. Null if factory fails. */
- //worker内部封装的工作线程
- final Thread thread;
- /** Initial task to run. Possibly null. */
- //假设firstTask不为空,那么当worker启动(内部的线程启动)后会优先执行firstTask,当执行完firstTask后,会到queue中去获取下一个任务。
- Runnable firstTask;
-
- /** Per-thread task counter */
- //记录当前worker所完成任务数量。
- volatile long completedTasks;
-
- /**
- * Creates with given first task and thread from ThreadFactory.
- * @param firstTask the first task (null if none)
- */
- //firstTask可以为null。为null 启动后会到queue中获取任务。
- Worker(Runnable firstTask) {
- //设置AQS独占模式为初始化中状态,这个时候 不能被抢占锁。
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- //使用线程工厂创建了一个线程,并且将当前worker 指定为 Runnable,也就是说当thread启动的时候,会以worker.run()为入口。
- //把当前Worker对象 传入了Thread里面。这个Worker对象会被放到 Thread.java 的private Runnable target; 属性中。
- //Worker类继承了AbstractQueuedSynchronizer,实现了Runnable接口
- //thread.start()后,会调用start0()方法,start0()是一个native方法,它会在操作系统底层申请资源,开启一个真正的线程(Windows)或者轻量级的进程 (Linux)
- //这个线程启动后,就会调用 thread.run() ,run()方法会调用 target属性的 run()方法,这其实就是 worker.run()。就会来到 runWorker方法中
- this.thread = getThreadFactory().newThread(this);
- }
-
- /** Delegates main run loop to outer runWorker */
- //当worker启动时,会执行run()
- public void run() {
- //ThreadPoolExecutor->runWorker() 这个是核心方法,等后面分析worker启动后逻辑时会以这里切入。
- runWorker(this);
- }
- // Lock methods
- //
- // The value 0 represents the unlocked state.
- // The value 1 represents the locked state.
- //判断当前worker的独占锁是否被独占。
- //0 表示未被占用
- //1 表示已占用
- protected boolean isHeldExclusively() {
- return getState() != 0;
- }
- //尝试去占用worker的独占锁
- //返回值 表示是否抢占成功
- protected boolean tryAcquire(int unused) {
- //使用CAS修改 AQS中的 state ,期望值为0(0时表示未被占用),修改成功表示当前线程抢占成功
- //那么则设置 ExclusiveOwnerThread 为当前线程。
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
- //外部不会直接调用这个方法 这个方法是AQS 内调用的,外部调用unlock时 ,unlock->AQS.release() ->tryRelease()
- protected boolean tryRelease(int unused) {
- //将当前独占的线程置为null
- setExclusiveOwnerThread(null);
- //state 改为 0
- setState(0);
- return true;
- }
- //加锁,加锁失败时,会阻塞当前线程,直到获取到锁位置。
- public void lock() { acquire(1); }
- //尝试去加锁,如果当前锁是未被持有状态,那么加锁成功后 会返回true,否则不会阻塞当前线程,直接返回false.
- public boolean tryLock() { return tryAcquire(1); }
- //一般情况下,咱们调用unlock 要保证 当前线程是持有锁的。
- //特殊情况,当worker的state == -1 时,调用unlock 表示初始化state 设置state == 0
- //启动worker之前会先调用unlock()这个方法。会强制刷新ExclusiveOwnerThread == null State==0
- //worker对象.unlock-> AQS.release -> worker对象.tryRelease。因为方法重写
- public void unlock() { release(1); }
- //就是返回当前worker的lock是否被占用。
- public boolean isLocked() { return isHeldExclusively(); }
-
- void interruptIfStarted() {
- Thread t;
- if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- }
- }
- /**构造方法 */
- public ThreadPoolExecutor(int corePoolSize,//核心线程数限制
- int maximumPoolSize,//最大线程限制
- long keepAliveTime,//空闲线程存活时间
- TimeUnit unit,//时间单位 seconds nano..
- BlockingQueue<Runnable> workQueue,//任务队列
- ThreadFactory threadFactory,//线程工厂
- RejectedExecutionHandler handler/*拒绝策略*/) {
- //判断参数是否越界
- if (corePoolSize < 0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
-
- //工作队列 和 线程工厂 和 拒绝策略 都不能为空。
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
-
-
- this.acc = System.getSecurityManager() == null ?
- null :
- AccessController.getContext();
-
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
- //command 可以是普通的Runnable 实现类,也可以是 FutureTask
- /*execute 方法就是为了提交任务到线程池,创建线程去执行它。execute方法的核心是addWorker方法、workQueue.offer方法、reject方法
- 情况1.当前线程数量小于核心线程数,此次提交任务,直接创建一个新的worker,去执行任务。
- 如果addWorker成功,则返回。
- 如果失败了,就有2种情况,
- 1.1.线程池running状态下,存在并发现象。多个线程同时调用execute方法,其他线程创建worker后,线程数量又达到核心线程数了。
- 尝试将task放入任务队列中。
- 1.1.1 如果放入队列成功,就获取ctl值,再次检查线程池状态。
- 如果任务提交到工作队列后,调用了shutdown、shutdownNow,就需要remove刚刚提交的任务。
- 如果提交的任务还没有被线程池中的线程消费,就remove成功,将任务从工作队列中删除,再执行拒绝策略。
- 如果提交之后,在shutdown() shutdownNow()之前,就任务被线程池中的线程 给处理,就remove失败。
-
- 如果任务提交后,线程池还是running状态,就会有一个担保机制:
- 当前线程池是running,但是线程池中的存活线程数量是0,就会调用addWorker方法,创建一个线程去执行任务。
- 1.1.2 如果放入队列失败,则说明使用的是有界队列,且队列满了。
- 此时就看maximumPoolSize了,如果线程数大于等于maximum,就走拒绝策略。否则,就创建一个新的Worker去执行任务。
- 1.2.当线程池状态是非RUNNING状态时,addWorker(firstTask!=null, core=true|false) 一定会失败。
- 此时,会采用拒绝策略,拒绝提交任务了。
- 情况2.当前线程数量大于等于核心线程数,分2类情况:
- 2.1.线程池是running状态,尝试将task放入工作队列中,后面的情况之前说过,就不说了。
- 2.2.线程池不是running状态,会采用拒绝策略,拒绝提交任务了。
- * */
- public void execute(Runnable command) {
- //非空判断..
- if (command == null)
- throw new NullPointerException();
- //获取ctl最新值赋值给c,ctl :高3位 表示线程池状态,低位表示当前线程池线程数量。
- int c = ctl.get();
- //workerCountOf(c) 获取出当前线程池中的线程数量
- //条件成立:表示当前线程数量小于核心线程数,此次提交任务,直接创建一个新的worker,对应线程池中多了一个新的线程。
- if (workerCountOf(c) < corePoolSize) {
- //addWorker 即为创建线程的过程,会创建worker对象,并且将command作为firstTask
- //core == true 表示采用核心线程数量限制 false表示采用 maximumPoolSize
- if (addWorker(command, true))
- //创建成功后,直接返回。addWorker方法里面会启动新创建的worker,将firstTask执行。
- return;
- //执行到这条语句,说明addWorker一定是失败了...
- //有几种可能呢??
- //1.存在并发现象,execute方法是可能有多个线程同时调用的,当workerCountOf(c) < corePoolSize成立后,
- //其它线程可能也成立了,并且向线程池中创建了worker。这个时候线程池中的核心线程数已经达到,所以...
- //2.当前线程池状态发生改变了。 RUNNING SHUTDOWN STOP TIDYING TERMINATED
- //当线程池状态是非RUNNING状态时,addWorker(firstTask!=null, true|false) 一定会失败。
- //SHUTDOWN 状态下,也有可能创建成功。前提 firstTask == null 而且当前 queue 不为空。特殊情况。
- c = ctl.get();
- }
- //执行到这里有几种情况?
- //1.当前线程数量已经达到corePoolSize
- //2.addWorker失败..
- //条件成立:说明当前线程池处于running状态,则尝试将 task 放入到workQueue中。
- if (isRunning(c) && workQueue.offer(command)) {
- //执行到这里,说明offer提交任务成功了..
- //再次获取ctl保存到recheck。
- int recheck = ctl.get();
- //条件一:! isRunning(recheck) 成立:说明你提交到队列之后,线程池状态被外部线程给修改 比如:shutdown() shutdownNow()
- //这种情况 需要把刚刚提交的任务删除掉。
- //条件二:remove(command) 有可能成功,也有可能失败
- //成功:提交之后,线程池中的线程还未消费(处理)
- //失败:提交之后,在shutdown() shutdownNow()之前,就被线程池中的线程 给处理。就返回了。
- if (! isRunning(recheck) && remove(command))
- //提交之后线程池状态为 非running 且 任务出队成功,走个拒绝策略。
- reject(command);
- //有几种情况会到这里?
- //1.当前线程池是running状态(这个概率最大)
- //2.线程池状态是非running状态 但是remove提交的任务失败.
- //担心 当前线程池是running状态,但是线程池中的存活线程数量是0,这个时候,如果是0的话,会很尴尬,任务没线程去跑了,
- //这里其实是一个担保机制,保证线程池在running状态下,最起码得有一个线程在工作。
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- //执行到这里,有几种情况?
- //1.offer失败,使用的是有界队列,队列已经满了
- //2.当前线程池是非running状态
-
- //1.offer失败,需要做什么? 说明当前queue 满了!这个时候 如果当前线程数量尚未达到maximumPoolSize的话,会创建新的worker直接执行command
- //假设当前线程数量达到maximumPoolSize的话,即使线程池running,addWorker也会失败,也走拒绝策略。
- //2.线程池状态为非running状态,这个时候因为 command != null addWorker 一定是返回false。
- else if (!addWorker(command, false))
- reject(command);
- }
- /**
- addWorker方法逻辑,线程池中的线程不分核心,不核心,都是线程,只不过创建的时候,会根据线程池数量阈值(corePoolSize/maximumPoolSize),去执行不同的判断逻辑。
- 1.retry自旋中。判断 当前线程池状态 是否允许创建worker(添加线程)。
- 1.1.不允许创建worker的情况为
- 一、线程池状态rs > SHUTDOWN (STOP/TIDYING/TERMINATION);
- 二、rs == SHUTDOWN 但是队列中已经没有任务了 或者 当前状态是SHUTDOWN且队列未空,但是firstTask不为null;
- 1.2.如果不是这些情况,就开始内部自旋,获取创建线程的令牌。
- (所谓的令牌就是看能不能通过CAS的方式将ctl +1,这代表增加一个worker)
- 内部自旋中,会根据形参core,来判断当前线程池中线程数量 是否 大于等于corePoolSize/maximumPoolSize。如果达到了指定限制,就创建worker失败,return false。
- 如果没达到限制,就通过CAS的方式将线程数量 +1.
- 1.2.1. CAS成功,就跳出retry自旋。
- 1.2.2. CAS失败,重新检查线程池的状态,会跳到对应的自旋中。发生竞争/其他线程调用了shutdown、shutdownNow导致线程池状态改变了,都会使CAS失败。
- 2.已经通过CAS的方式将ctl +1了,线程数量加1。开始尝试创建worker,并启动对应的thread。
- 2.1.先获取全局锁 mainLock,
- 2.2.检查当前线程池的状态。确保添加worker 发生在running状态 或 (shutdown状态 且 firstTask为空)下。
- 2.3.检查当前worker的thread 是否 已经启动了。如果启动了,就抛出异常。
- 2.4.将worker添加到workers(HashSet)中。
- 2.5.判断一下最大线程数量是否突破历史最大值?largestPoolSize
- 2.6.将workerAdded 标志设为true。释放全局锁。将当前worker对应的线程 start。将workerStarted设为 true;
- 2.7.如果thread启动失败,就要做一些清理工作。
- 2.7.1.获取全局锁
- 2.7.2.在workers中remove worker
- 2.7.3.通过CAS的方式将线程数量-1
- 2.7.4.释放全局锁。
- 方法返回值总结:
- true 表示创建worker成功,把这个worker放入workers(HashSet)中了,且线程启动。
- false 表示创建失败。
- 1.线程池状态rs > SHUTDOWN (STOP/TIDYING/TERMINATION)
- 2.rs == SHUTDOWN 但是队列中已经没有任务了 或者 当前状态是SHUTDOWN且队列未空,但是firstTask不为null
- 3.当前线程池已经达到指定指标(coprePoolSize 或者 maximumPoolSize)
- 4.threadFactory 创建的线程是null
-
- */
- private boolean addWorker(Runnable firstTask, boolean core) {
- //自旋 判断当前线程池状态是否允许创建线程的事情。
- retry:
- for (;;) {
- //获取当前ctl值保存到c
- int c = ctl.get();
- //获取当前线程池运行状态 保存到rs长
- int rs = runStateOf(c);
- //条件一:rs >= SHUTDOWN 成立:说明当前线程池状态不是running状态
- //条件二:前置条件,当前的线程池状态不是running状态 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
- //rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
- //表示:当前线程池状态是SHUTDOWN状态 & 提交的任务是空,addWorker这个方法可能不是execute调用的。 & 当前任务队列不是空
- //排除掉这种情况,当前线程池是SHUTDOWN状态,但是队列里面还有任务尚未处理完,这个时候是允许添加worker,但是不允许再次提交task。
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- //什么情况下回返回false?
- //线程池状态 rs > SHUTDOWN
- //rs == SHUTDOWN 但是队列中已经没有任务了 或者 rs == SHUTDOWN 且 firstTask != null
- return false;
- //上面这些代码,就是判断 当前线程池状态 是否允许添加线程。
-
- //内部自旋 获取创建线程令牌的过程。
- // 所谓的创建线程的令牌,就是看能不能通过CAS的方式将ctl +1,代表增加一个worker
- for (;;) {
- //获取当前线程池中线程数量 保存到wc中
- int wc = workerCountOf(c);
- //条件一:wc >= CAPACITY 永远不成立,因为CAPACITY是一个5亿多大的数字
- //条件二:wc >= (core ? corePoolSize : maximumPoolSize)
- //core == true ,判断当前线程数量是否>=corePoolSize,会拿核心线程数量做限制。
- //core == false,判断当前线程数量是否>=maximumPoolSize,会拿最大线程数量做限制。
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- //执行到这里,说明当前无法添加线程了,已经达到指定限制了
- return false;
- //条件成立:说明记录线程数量已经加1成功,相当于申请到了一块令牌。
- //条件失败:说明可能有其它线程,修改过ctl这个值了。
- //可能发生过什么事?
- //1.其它线程execute() 申请过令牌了,在这之前。导致CAS失败
- //2.外部线程可能调用过 shutdown() 或者 shutdownNow() 导致线程池状态发生变化了,咱们知道 ctl 高3位表示状态
- //状态改变后,cas也会失败。
- if (compareAndIncrementWorkerCount(c))
- //进入到这里面,一定是cas成功啦!申请到令牌了
- //直接跳出了 retry 外部这个for自旋。
- break retry;
- //CAS失败,没有成功的申请到令牌
- //获取最新的ctl值
- c = ctl.get(); // Re-read ctl
- //判断当前线程池状态是否发生过变化,如果外部在这之前调用过shutdown. shutdownNow 会导致状态变化。
- if (runStateOf(c) != rs)
- //状态发生变化后,直接返回到外层循环,外层循环负责判断当前线程池状态,是否允许创建线程。
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
- //现在可以允许尝试创建worker了
- //表示创建的worker是否已经启动,false未启动 true启动
- boolean workerStarted = false;
- //表示创建的worker是否添加到池子中了 默认false 未添加 true是添加。
- boolean workerAdded = false;
- //w表示后面创建worker的一个引用。
- Worker w = null;
- try {
- //创建Worker,执行完后,线程应该是已经创建好了。
- w = new Worker(firstTask);
- //将新创建的worker节点的线程 赋值给 t
- final Thread t = w.thread;
- //为什么要做 t != null 这个判断?
- //为了防止ThreadFactory 实现类有bug,因为ThreadFactory 是一个接口,谁都可以实现。
- //万一哪个 小哥哥 脑子一热,有bug,创建出来的线程 是null、、
- //Doug lea考虑的比较全面。肯定会防止他自己的程序报空指针,所以这里一定要做!
- if (t != null) {
- //将全局锁的引用保存到mainLock
- final ReentrantLock mainLock = this.mainLock;
- //持有全局锁,可能会阻塞,直到获取成功为止,同一时刻 操纵 线程池内部相关的操作,都必须持锁。
- mainLock.lock();
- //从这里加锁之后,其它线程 是无法修改当前线程池状态的。
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- //获取最新线程池运行状态保存到rs中
- int rs = runStateOf(ctl.get());
- //条件一:rs < SHUTDOWN 成立:最正常状态,当前线程池为RUNNING状态.
- //条件二:前置条件:当前线程池状态不是RUNNING状态。
- //(rs == SHUTDOWN && firstTask == null) 当前状态为SHUTDOWN状态且firstTask为空。其实判断的就是SHUTDOWN状态下的特殊情况,
- //只不过这里不再判断队列是否为空了
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- //t.isAlive() 当线程start后,线程isAlive会返回true。
- //防止脑子发热的程序员,ThreadFactory创建线程返回给外部之前,将线程start了。。
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- //将咱们创建的worker添加到线程池中。
- workers.add(w);
- //获取最新当前线程池线程数量
- int s = workers.size();
- //条件成立:说明当前线程数量是一个新高。更新largestPoolSize
- if (s > largestPoolSize)
- largestPoolSize = s;
- //表示线程已经追加进线程池中了。
- //其实线程池中的线程不分核心,不核心,都是线程,
- // 只不过创建的时候,会根据线程池数量阈值(corePoolSize/maximumPoolSize),去执行不同的判断逻辑。
- workerAdded = true;
- }
- } finally {
- //释放线程池全局锁。
- mainLock.unlock();
- }
- //条件成立:说明 添加worker成功
- //条件失败:说明线程池在lock之前,线程池状态发生了变化,导致添加失败。
- if (workerAdded) {
- //成功后,则将创建的worker启动,线程启动。
- t.start();
- //启动标记设置为true
- workerStarted = true;
- }
- }
- }finally {
- //条件成立:! workerStarted 说明启动失败,需要做清理工作。
- if (! workerStarted)
- //失败时做什么清理工作?
- //1.释放令牌,就是通过CAS的方式将 ctl -1
- //2.将当前worker清理出workers集合
- addWorkerFailed(w);
- }
- //返回新创建的线程是否启动。
- return workerStarted;
- }
- /**
- 从整体来看,workers.size() 与 ctl低位表示的线程数量 是一致的。
- runWorker方法的调用逻辑是
- 在addWorker里面启动thread后,w.thread.start -> start0方法 -> 操作系统中开启真正的线程
- -> thread.run方法 ->worker.run方法 -> ThreadPoolExecutor.runWorker方法
- runWorker的大体逻辑:
- 1.调用unlock方法,初始化state == 0 和 exclusiveOwnerThread ==null
- 2.while(当前worker.firstTask不为空 || getTask()的返回值不为空,当前线程从工作队列中成功获取到了任务) 时,即将开始执行任务。getTask()这个方法是会阻塞的。
- 2.1.当前线程获取worker的独占锁
- 2.2.如果线程池目前处于STOP/TIDYING/TERMINATED且当前线程中断标记位为false,则给当前线程一个中断信号
- 2.3.调用钩子方法 beforeExecute(wt, task)
- 2.4.调用task.run()方法。
- task 可能是FutureTask(调用submit方法提交的任务) 也可能是 普通的Runnable接口实现类(调用execute方法提交的任务)
- 2.5.调用钩子方法afterExecute(task, thrown)
- 2.6.在finally块中,更新worker完成任务数量(包括task.run()抛出异常的任务)。w.completedTasks++; 并 释放掉worker的独占锁。
- 此时,正常情况下,会再次回到getTask()从工作队列中获取任务 while(getTask...)
- 但也可能,task.run()时内部抛出异常了。注意:try/catch仅仅捕捉异常,是不会跳出while循环的。但用了throw关键字,就会跳出去。
- 3.如果getTask方法返回空,说明当前线程应该执行退出逻辑了。
- 4.当前线程可能是正常退出 completedAbruptly == false;也可能异常退出 completedAbruptly == true。
- worker退出的逻辑在processWorkerExit方法里
- 对于正常退出,
- 1.获取全局锁,将当前worker完成的task数量,汇总到线程池的completedTaskCount;将worker从池子中移除
- 2.释放全局锁。调用 tryTerminate()方法
- 3.如果当前线程池为running 或 (shutdown 但工作队列不空),
- 3.1.根据allowCoreThreadTimeOut,看看核心线程是否支持超时机制,
- 以此来计算线程池需要维持的最低线程数量。有3种可能,0,1,corePoolSize。
- 如果当前线程数量 大于等于 最低值,则return。
- 如果当前线程数量 小于 最低值,就会 addWorker(null, false)
- 对于异常退出,
- 1.通过CAS的方式,将ctl -1.
- 2.获取全局锁,将当前worker完成的task数量,汇总到线程池的completedTaskCount;将worker从池子中移除
- 3.释放全局锁。调用 tryTerminate()方法
- 4.如果当前线程池状态为 RUNNING 或 (shutdown 但工作队列不空),就需要创建一个新worker顶上去,addWorker(null, false)
- */
- final void runWorker(Worker w) {
- //wt == w.thread
- Thread wt = Thread.currentThread();
- //将初始执行task赋值给task
- Runnable task = w.firstTask;
- //清空当前w.firstTask引用
- w.firstTask = null;
- //这里为什么先调用unlock? 就是为了初始化worker state == 0 和 exclusiveOwnerThread ==null
- w.unlock();
- //是否是突然退出,true->发生异常了,当前线程是突然退出,回头需要做一些处理
- //false->正常退出。
- boolean completedAbruptly = true;
-
- try {
- //条件一:task != null 指的就是firstTask是不是null,如果不是null,直接执行循环体里面。
- //条件二:(task = getTask()) != null 条件成立:说明当前线程在queue中获取任务成功,getTask这个方法是一个会阻塞线程的方法
- //getTask如果返回null,当前线程需要执行结束逻辑。
- while (task != null || (task = getTask()) != null) {
- //worker设置独占锁 为当前线程
- //为什么要设置独占锁呢?shutdown时会判断当前worker状态,根据独占锁是否空闲来判断当前worker是否正在工作。
- //调用shutdown方法时,会去判断一下能不能拿到当前worker的独占锁,拿不到就去检查下一个worker。如果拿到了,就会给对应的线程一个中断信号。
- //线程收到中断信号后,会被唤醒。
- w.lock();
- //条件一:runStateAtLeast(ctl.get(), STOP) 说明线程池目前处于STOP/TIDYING/TERMINATED 此时线程一定要给它一个中断信号
- //中断是线程的一个标记位。如果程序是响应中断的话,就会去检查这个标记位,然后做一些相关的逻辑。如果程序不响应中断,那这个中断信号,就没用。
- //条件一成立:runStateAtLeast(ctl.get(), STOP)&& !wt.isInterrupted()
- //上面如果成立:说明当前线程池状态是>=STOP 且 当前线程是未设置中断状态的,此时需要进入到if里面,给当前线程一个中断。
-
- //假设:runStateAtLeast(ctl.get(), STOP) == false
- // (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) 在干吗呢?
- // Thread.interrupted() 获取当前中断状态,且设置中断位为false。连续调用两次,这个interrupted()方法 第二次一定是返回false.
- // runStateAtLeast(ctl.get(), STOP) 大概率这里还是false.
- // 其实它在强制刷新当前线程的中断标记位 false,因为有可能上一次执行task时,业务代码里面将当前线程的中断标记位 设置为了 true,且没有处理
- // 这里一定要强制刷新一下。不会再影响到后面的task了。
- //假设:Thread.interrupted() == true 且 runStateAtLeast(ctl.get(), STOP)) == true
- //这种情况有发生几率么?
- //有可能,因为外部线程在 第一次 (runStateAtLeast(ctl.get(), STOP) == false 后,有机会调用shutdown 、shutdownNow方法,将线程池状态修改
- //这个时候,也会将当前线程的中断标记位 再次设置回 中断状态。
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- //钩子方法,留给子类实现的
- beforeExecute(wt, task);
- //表示异常情况,如果thrown不为空,表示 task运行过程中 向上层抛出异常了。
- Throwable thrown = null;
- try {
- //task 可能是FutureTask(调用submit方法提交的任务) 也可能是 普通的Runnable接口实现类(调用execute方法提交的任务)。
- //如果前面是通过submit()提交的 runnable/callable 会被封装成 FutureTask。这个不清楚,请看上一期,在b站。
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- //钩子方法,留给子类实现的
- afterExecute(task, thrown);
- }
- } finally {
- //将局部变量task置为Null
- task = null;
- //更新worker完成任务数量
- w.completedTasks++;
-
- //worker处理完一个任务后,会释放掉独占锁
- //1.正常情况下,会再次回到getTask()那里获取任务 while(getTask...)
- //2.task.run()时内部抛出异常了..
- w.unlock();
- }
- }
- //什么情况下,会来到这里?
- //getTask()方法返回null时,说明当前线程应该执行退出逻辑了。
- completedAbruptly = false;
- }finally {
-
- //task.run()内部抛出异常时,直接从 w.unlock() 那里 跳到这一行。
- //正常退出 completedAbruptly == false
- //异常退出 completedAbruptly == true
- processWorkerExit(w, completedAbruptly);
- }
- }
- /**
- getTask方法是用来给线程池中的线程从工作队列中获取任务的。
- getTask方法的大体逻辑:
- 开启for自旋
- 1.获取ctl值、当前线程池状态。
- 如果状态>= STOP || 状态为SHUTDOWN 且 工作队列为空,则通过CAS的方式将ctl -1。CAS一定成功。返回null。
- 2.获取线程数量wc,判断当前线程获取task 是否支持超时机制,timed。
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- 为什么这样写?因为当前线程可能是核心线程,可能是非核心线程。核心、非核心都是worker
- 核心线程对应 allowCoreThreadTimeOut。非核心对应wc > corePoolSize
- timed为true 表示支持超时机制,使用queue.poll(xxx,xxx);获取任务。如果超时了,那线程拿到的Runnable 为空。下次自旋,当前线程就可能会被回收。
- timed为false 表示不支持超时机制的,当前线程会使用 queue.take();获取任务。无限期的阻塞。
- 2.1.如果线程池中的线程数量 超过 最大限制maximumPoolSize时,通过CAS的方式让ctl-1(线程数量-1)。即将回收当前worker,当前线程要退出。
- CAS成功则返回null。CAS失败(可能是竞争,可能是状态变了),则再次自旋。
- 2.2.如果当前线程 获取任务超时,且当前线程支持超时机制,
- 此时,如果线程数量 大于1 或 工作队列为空,通过CAS的方式让ctl-1(线程数量-1)。即将回收当前worker,当前线程要退出。
- 3.获取任务。
- 如果线程拿到的Runnable不为空,则返回。
- 如果为空,说明当前线程超时了。自旋。
- 什么情况下会返回null?
- 1.rs >= STOP 成立说明:当前的状态最低也是STOP状态,
- 2. SHUTDOWN 且 工作队列为空
- 3.线程池中的线程数量 超过 最大限制maximumPoolSize时,会有一部分线程返回Null
- 4.线程池中的线程数超过corePoolSize时,会有一部分线程 超时后,返回null。
- */
- private Runnable getTask() {
- //表示当前线程获取任务是否超时 默认false true表示已超时
- boolean timedOut = false;
- //自旋
- for (;;) {
- //获取最新ctl值保存到c中。
- int c = ctl.get();
- //获取线程池当前运行状态
- int rs = runStateOf(c);
-
- //条件一:rs >= SHUTDOWN 条件成立:说明当前线程池是非RUNNING状态,可能是 SHUTDOWN/STOP....
- //条件二:(rs >= STOP || workQueue.isEmpty())
- //2.1:rs >= STOP 成立说明:当前的状态最低也是STOP状态,一定要返回null了
- //2.2:前置条件 状态是 SHUTDOWN ,workQueue.isEmpty()条件成立:说明当前线程池状态为SHUTDOWN状态 且 任务队列已空,此时一定返回null。
- //返回null,runWorker方法就会将返回Null的线程执行线程退出线程池的逻辑。
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- //使用CAS+死循环的方式让 ctl值 -1
- decrementWorkerCount();
- return null;
- }
- //执行到这里,有几种情况?
- //1.线程池是RUNNING状态
- //2.线程池是SHUTDOWN状态 但是队列还未空,此时可以创建线程。
-
- //获取线程池中的线程数量
- int wc = workerCountOf(c);
- //timed == true 表示当前这个线程 获取 task 时 是支持超时机制的,使用queue.poll(xxx,xxx); 当获取task超时的情况下,下一次自旋就可能返回null了。
- //timed == false 表示当前这个线程 获取 task 时 是不支持超时机制的,当前线程会使用 queue.take();
-
- //情况1:allowCoreThreadTimeOut == true 表示核心线程数量内的线程 也可以被回收。
- //所有线程 都是使用queue.poll(xxx,xxx) 超时机制这种方式获取task.
- //情况2:allowCoreThreadTimeOut == false 表示当前线程池会维护核心数量内的线程。
- //wc > corePoolSize
- //条件成立:当前线程池中的线程数量是大于核心线程数的,此时让所有路过这里的线程,都是用poll 支持超时的方式去获取任务,
- //这样,就会可能有一部分线程获取不到任务,获取不到任务 返回Null,然后..runWorker会执行线程退出逻辑。
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- //条件一:(wc > maximumPoolSize || (timed && timedOut))
- //1.1:wc > maximumPoolSize 为什么会成立?setMaximumPoolSize()方法,可能外部线程将线程池最大线程数设置为比初始化时的要小
- //1.2: (timed && timedOut) 条件成立:前置条件,当前线程使用 poll方式获取task。上一次循环时 使用poll方式获取任务时,超时了
- //条件一 为true 表示 线程可以被回收,达到回收标准,当确实需要回收时再回收。
-
- //条件二:(wc > 1 || workQueue.isEmpty())
- //2.1: wc > 1 条件成立,说明当前线程池中还有其他线程,当前线程可以直接回收,返回null
- //2.2: workQueue.isEmpty() 前置条件 wc == 1, 条件成立:说明当前任务队列 已经空了,最后一个线程,也可以放心的退出。
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- //使用CAS机制 将 ctl值 -1 ,减1成功的线程,返回null
- //CAS成功的,返回Null
- //CAS失败? 为什么会CAS失败?
- //1.其它线程先你一步退出了
- //2.线程池状态发生变化了。
- if (compareAndDecrementWorkerCount(c))
- return null;
- //再次自旋时,timed有可能就是false了,因为当前线程cas失败,很有可能是因为其它线程成功退出导致的,再次咨询时
- //检查发现,当前线程 就可能属于 不需要回收范围内了。
- continue;
- }
- try {
- //获取任务的逻辑
- //1.timed为true,表示前线程获取 task 时,是支持超时机制的。如果超时了,那线程拿到的Runnable 为空。
- // 下次自旋,当前线程就可能会被回收。
- //2.timed为false,表示当前线程是不支持超时机制的,就会调用take()方法,无限期的阻塞。
-
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
-
- //条件成立:返回任务
- if (r != null)
- return r;
-
- //说明当前线程超时了...
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
- public void shutdown() {
- shutdown方法的大体逻辑
- 1.获取全局锁mainLock。做权限判断checkShutdownAccess。假设有权限:
- 2.设置线程池状态为SHUTDOWN
- 3.中断所有空闲线程,interruptIdleWorkers方法。
- 如果worker.thread正在执行任务,就不管。
- 如果worker.thread在getTask中处于阻塞状态,则给一个中断信号,去执行退出逻辑。
- 4.调用钩子方法onShutdown()
- 5.释放全局锁
- 6.调用tryTerminate()方法
- }
- public List<Runnable> shutdownNow() {
- //返回值引用
- List<Runnable> tasks;
- 1.获取线程池全局锁mainLock。做权限判断checkShutdownAccess。假设有权限:
- 2.设置线程池状态为STOP
- 3.中断线程池中所有线程,interruptWorkers方法。如果worker内的thread 是启动状态,则给它一个中断信号
- 4.导出未处理的tasks列表
- 5.调用tryTerminate()方法
- 6.返回当前任务队列中未处理的tasks列表
- }
- /**
- 尝试终止
- */
- final void tryTerminate() {
- for (;;) {
- 获取最新的ctl值
- if(
- 线程池状态为running
- || 状态大于等于 TIDYING
- || 状态为 SHUTDOWN && 工作队列不空
- ){
- return;
- }
- //如果线程池状态为STOP 或 SHUTDOWN 且 队列已经空了,线程唤醒后,都会执行退出逻辑。
- // 线程在queue.poll()/.take()处唤醒后,先在getTask中ctl-1 -> runWorker
- // -> processWorkerExit中 移除worker 并调用tryTerminate方法,唤醒下一空闲线程。
- if (当前线程池中的线程数量 > 0) { // Eligible to terminate
- //中断一个空闲线程。
- //空闲线程,在哪空闲呢? 在getTask方法中断queue.take() | queue.poll() 处阻塞
- //1.唤醒后的线程 会在getTask()方法返回null
- //2.执行退出逻辑(processWorkerExit)的时候会再次调用tryTerminate() 唤醒下一个空闲线程
- //3.因为线程池状态是 (线程池状态 >= STOP || 线程池状态为 SHUTDOWN 且 队列已经空了) 最终调用addWorker时,会失败。
- //最终空闲线程都会在这里退出,非空闲线程 当执行完当前task时,也会调用tryTerminate方法,有可能会走到这里。
- interruptIdleWorkers(ONLY_ONE);
- return;
- }
- //workerCountOf(c) == 0时,代表当前线程是 最后一个要退出的线程
- 获取全局锁mainLock
- 通过CAS的方式,将线程池状态设为TIDYING
- 调用钩子方法terminated()
- 将线程池状态设为TERMINATED
- 唤醒调用 awaitTermination()而阻塞的 外部线程。termination.signalAll()
- 释放全局锁
- }
- }
本文图片来源于小刘老师(哔哩哔哩:小刘讲源码)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。