当前位置:   article > 正文

ThreadPoolExecutor源码分析_kwaiflow

kwaiflow

1.写在前面

本文整理自组内的技术分享,主要参考了以下文档。

  • JDK版本1.8.0_202
  • 美团点评的文章《Java线程池实现原理及其在美团业务中的实践》https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
    我们在代码中增加了详细的注释供大家理解,有不正确的地方欢迎大家多多指正~

2. 线程池是什么

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能确定在任意时刻有多少任务需要执行,有多少资源需要投入。线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。本文中我们描述线程池是JDK中提供的ThreadPoolExecutor类。

当然,使用线程池可以带来一系列好处:

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

这里也给出查看进程中线程的数量的几种方法:

  • cat /proc/$pid/status | grep Threads
  • pstree -p $pid
  • top -Hp $pid (该命令可以看到占用cpu最高的线程,然后将pid转成16进制在jstack中搜索,例如pid是209,转成16进制是0xd1,在jstack中搜nid=0xd1即可找到占用cpu最高线程的调用栈)

3. JDK中线程池的实现(ThreadPoolExecutor

3.1 总体设计

Java中的线程池核心实现类是ThreadPoolExecutor,我们基于JDK 1.8的源码来分析Java线程池的核心设计与实现。ThreadPoolExecutor的UML类图
如图1所示。
file

  • Executor接口:ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。

  • ExecutorService接口:接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。

void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task)
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 抽象类AbstractExecutorService:将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。线程池部分功能的基本实现,例如submit方法,invokeAll/invokeAny等等,例如submit方法底层也是依赖Executor#execute() 方法实现的执行,其内部包装了一个FutureTask来获取执行结果。从这里也可以看出,AbstractExecutorService中实现的这几个方法是一个模板方法,或者理解为标准,其他的线程池想使用这些方法只需要实现Executor#execute() 方法即可。
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 实现类ThreadPoolExecutor:最基础的线程池实现版本,我们常用的线程池的核心实现就在这个类中。实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。ThreadPoolExecutor是如何运行,如何同时维护线程和执行任务的呢?

3.2 线程池的构造方法与基础属性

构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • corePoolSize: 核心线程数
  • maximumPoolSize: 最大线程数
  • keepAliveTime: 表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0
  • unit:上个参数的时间单位
  • workerQueue:任务队列
  • threadFactory:线程工厂,可以定义线程创建的方法
  • handler:拒绝策略,可以决定如果线程池溢出了这个任务要怎么做

状态管理

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量ctl维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在int中,高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 00000000000000000000000000011101
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 00011111111111111111111111111111

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS; // 11100000000000000000000000000000 , -1 * 2^29 = -536870912
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 00000000000000000000000000000000 , 0 * 2 ^ 29 = 0
private static final int STOP       =  1 << COUNT_BITS; // 00100000000000000000000000000000 , 1 * 2 ^ 29 = 536870912
private static final int TIDYING    =  2 << COUNT_BITS; // 01000000000000000000000000000000 , 2 * 2 ^ 29 = 1073741824
private static final int TERMINATED =  3 << COUNT_BITS; // 01100000000000000000000000000000 , 3 * 2 ^ 29 = 1610612736
private static int runStateOf(int c)     { return c & ~CAPACITY; } // 得到高3位,即线程池状态
private static int workerCountOf(int c)  { return c & CAPACITY; } // 得到低29位,即当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } // 通过状态和线程数生成ctl
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
为什么这样设计?

ctl这个变量使用AtomicInteger实现能够保证数值在各个线程的可见性,也就能保证线程状态和数量在线程池全局可见性。
用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。
位运算的速度比基本运算会快很多。
状态设计的参考

启示

设计状态时,状态编号的大小也可以表示含义,例如线程池的runState>=0时表示线程池正在关闭。Kwaiflow的Ti以及DR状态也可以参考这样的设计。采用这样的设计后,我们也可以用比较运算来判断ti或者dr是否处于终态。

ThreadPoolExecutor的运行状态有5种,分别为:
file

线程池生命周期转换如图3所示:
file

3.3 任务执行机制

任务调度

任务调度流程如图3所示。

  • 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  • 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  • 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  • 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  • 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

file

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // 当前线程数量小于corePoolSize,尝试新增线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // offer是非阻塞的,队列满直接返回false
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 任务加入队列后查看线程池状态是否已经不是RUNNING状态了,如果不是则拒绝任务
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 看下是否需要增加线程,因为之前的线程可能已经停止了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 线程池状态不是RUNNING或者向队列中添加任务失败,则尝试增加线程(例如corePoolSize是3,队列长度是5,maxPoolSize是10。当队列满了之后,会走这里)
    else if (!addWorker(command, false))
        // 线程池状态不是RUNNING或者线程数大于等于最大线程数(即CAPACITY,536870912)
        reject(command);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

任务缓冲

线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

使用不同的队列可以实现不一样的任务存取策略。在这里,我们再介绍下阻塞队列的成员:

file

ThreadPoolExecutor中使用了BlockingQueue的offer、remove、poll、take方法。可以看到,ThreadPoolExecutor向队列中放元素的时候并没有使用阻塞方法,从队列取元素的时候用到了阻塞和非阻塞的方法,在下面解析getTask方法时我们会再进行详细的说明。

public interface BlockingQueue<E> extends Queue<E> {
    // 队列满时抛异常
    boolean add(E e);

    // 队列满时返回false
    boolean offer(E e);

    // 队列满时阻塞
    void put(E e) throws InterruptedException;

    // 队列空时阻塞
    E take() throws InterruptedException;

    // 等待超时返回null
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    // 不包含o则返回false
    boolean remove(Object o);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

任务申请

任务的执行有两种可能:一种是任务直接由新创建的线程执行。另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。
线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现,流程如图5所示。getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收。

file

// 从队列获取任务,总体是先判断线程池状态和线程数量,然后再从队列获取任务
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out? 上一次获取是否超时了?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // rs >=STOP 或者 rs == SHUTDOWN并且队列是空,返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling? 是否允许worker线程被回收?如果允许核心线程超时或者线程数超过corePoolSize则允许
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 当前线程数大于maximumPoolSize或者允许线程回收并且上次获取超时了,并且当前线程数大于1或者队列为空
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            // 注意从队列获取任务这段代码是catch了InterruptedException的(BlockQueue的poll、take中都使用了lock.lockInterruptibly),在线程池停止时,会中断worker,
            // Worker设置interrupt之后从队列取数据的行为会终止,再回到循环开始如果判断线程池状态终止了,则返回null,随即Worker退出。
            try {
                // worker允许被停掉,则从队列里获取任务时最多等待keepAliveTime;否则,获取不到任务就一直阻塞
                Runnable r = timed ?
                    // 超时后poll会返回null
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    // 只有allowCoreThreadTimeOut是false并且 wc <= corePoolSize,timed才是false,也就是不允许核心线程被回收的前提下,核心线程会走take
                    // 这也就是一般情况下,我们不允许核心线程超时,如果不关闭线程池会发现main方法一直不退出的原因
                    workQueue.take();
                if (r != null)
                    return r;
                // r == null才会走到这里,也就是没走上面的workQueue.take()和return r,表示本轮获取任务超时了,因此将timeOut置为true
                timedOut = true;
            } catch (InterruptedException retry) {
                // workerQueue.poll 或者 workerQueue.take被中断,线程池停止时会发出中断
                timedOut = false;
            }
        }
    }


// ArrayBlockingQueue的poll方法,使用lock.lockInterruptibly()加锁,能够响应中断
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        // It's a coding style made popular by Doug Lea.
        // It's an extreme optimization that probably isn't necessary;
        // you can expect the JIT to make the same optimizations.
        // (you can try to check the machine code yourself!)
        // Nevertheless, copying to locals produces the smallest
        // bytecode, and for low-level code it's nice to write code
        // that's a little closer to the machine.
        // Doug Lea的极致优化
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

任务拒绝

任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。拒绝策略是一个接口,其设计如下:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
  • 1
  • 2
  • 3

用户可以通过实现这个接口去定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略,其特点如下:

file

3.4 Worker线程管理

Worker线程

Worker实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。

  • thread是在调用构造方法时通过ThreadFactory来创建的线程,thread持有的任务就是this。
  • firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建或者队列满了添加非核心线程时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

Worker执行任务的模型如图6所示:
file

Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

  • lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
  • 如果正在执行任务,则不应该中断线程。
  • 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
  • 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。
// 通过继承AQS实现不可重入的独占锁
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            // state表示worker的状态. -1 是初始状态(禁止中断直到runWorker) 0表示未锁状态 1表示锁住状态
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

Worker线程增加

addWorker方法的功能是增加一个线程,addWorker方法有两个参数:firstTask、core。

  • firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;
  • core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。

总体来讲,addWorker分为两个步骤

  • 对线程池的状态和线程数等信息进行校验,不符合创建线程条件则返回false
  • 创建worker
// 增加线程,并使它运行,最后返回是否成功
// firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;
// core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,
// false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
        // 检查线程池状态和线程数量是否符合预期,不符合预期直接返回false
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 1. 检查线程池状态
            // Check if queue empty only if necessary. RUNNING是最小的,rs >= SHUTDOWN表示线程池正在关闭
            // 这里写成(rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())) 更好理解些
            // 即线程池状态是 大于SHUTDOWN、线程池状态是SHUTDOWN并且firstTask不是空、线程池状态是SHUTDOWN并且队列是空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 2. 检查线程数量
                int wc = workerCountOf(c);
                // 线程数量大于最大值(2 ^ 29 - 1)或者 core为true且大于corePoolSize 或者 core为false且大于maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // WorkerCount增加成功则跳出循环往下走
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                   // 线程池状态改变了,重新执行大循环,即需要重新检查下线程池的状态 
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop,线程池状态没变,执行小循环,即不需要检查线程池状态了,只需要看能否增加线程数即可
            }
        }
        
        // 线程池状态和线程数量检查通过,开始创建Worker
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            // t等于null的话说明创建线程失败了
            if (t != null) {
                // Doug Lea的极致优化
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 添加一个worker
                        workers.add(w);
                      
                        // largestPoolSize记录线程池同时运行过的最多的线程数量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // worker添加成功,启动线程。
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // worker中的线程是null或者其他情况没将workerStarted置为true,则进行回滚
            if (! workerStarted)
                // workers.remove(w)、decrementWorkerCount()、tryTerminate();
                addWorkerFailed(w);
        }
        return workerStarted;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87

Worker线程执行任务

在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:

  • while循环不断地通过getTask()方法获取任务。
  • getTask()方法从阻塞队列中取任务。
  • 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
  • 执行任务。
  • 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // Worker的构造函数里可以看到Worker将AQS的state初始化为-1,
        // 在tryRelease中直接赋值为0,所以在Worker加锁之前要先重置为0。
        // 为什么不直接从0开始呢,可以看到线程池的interruptIdleWorkers方法里会尝试给worker加锁然后interrupt 线程,这
        // 里加锁肯定是会失败的,这么做也是禁止worker刚创建就被interrupt。
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 核心线程addWorker时会设置task,此外都是从队列获取task
            while (task != null || (task = getTask()) != null) {
                // 加锁,防止interruptIdleWorkers中断正在执行任务的worker
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 如果线程池处于STOP状态则要确保线程被中断,对应runStateAtLeast(ctl.get(), STOP)是true 并且 !wt.isInterrupted(),则wt.interrupt()
                // 否则,确保线程不被中断,也就是runStateAtLeast(ctl.get(), STOP)是false,Thread.interrupted()之后线程中断状态被置为false,再根据线程池状态判断是否要中断线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // beforeExecute抛异常的话,会跳出循环,并且completedAbruptly是true,这时不会处理task,并且会走到processWorkerExit杀掉当前线程,然后再把异常抛出去
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务,抛出异常的话会传递到afterExecute里,此时也会跳出循环杀掉当前的线程
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 抛异常也会跳出循环,杀掉当前的线程
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // 从队列里没取到任务
            completedAbruptly = false;
        } finally {
            // task是空并且getTask返回的也是空则销毁worker
            // 对于核心线程,如果设置了allowCoreThreadTimeOut为true,则核心线程在keepAliveTime后还获取不到任务,会释放核心线程
            // 对于非核心线程,在keepAliveTime后还获取不到任务,会释放非核心线程
            processWorkerExit(w, completedAbruptly);
        }
    }


private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

  			// 尝试让线程池进入TERMINATED状态
        tryTerminate();

        int c = ctl.get();
        // 状态小于STOP说明还有未执行完的任务要处理
        if (runStateLessThan(c, STOP)) {
            // 看下Worker线程退出是不是因为执行过程中发生了异常
            if (!completedAbruptly) {
                // 没发生异常,如果allowCoreThreadTimeOut是false,则min是corePoolSize; 否则,队列非空则min是1,队列是空则min是0
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 线程数大于min,则不需要增加worker
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 分配一个空的非核心worker去把剩余任务执行完
            addWorker(null, false);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93

线程池关闭

shutdown方法
// shutdown方法是先设置状态再interrupt的,这和getTask里面的处理是吻合的,
// 先设置状态确保让worker先退出,避免interrupt的过程中线程又去获取任务。
public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 如果配置了security manager,则检查下caller是否有关闭线程的权限
            checkShutdownAccess();
            // 循环CAS将线程池状态置为SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 只清理闲置的Worker(例如不在执行中且未被中断的非核心线程)
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

// 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 能获取到锁,说明线程是空闲状态,可以对该线程进行中断,使线程被安全地回收
                // 如果worker在执行runWorker,则会先加锁,那么这里的w.tryLock()就返回false了
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
shutdownNow方法
public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 循环CAS将线程池状态置为STOP
            advanceRunState(STOP);
            // 把所有的worker都interrupt
            interruptWorkers();
            // 把任务队列中尚未处理的任务拿出来交给调用线程自行处理,
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }


private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/944592
推荐阅读
相关标签
  

闽ICP备14008679号