当前位置:   article > 正文

ThreadPoolExecutor 源码解析_threadpoolexecutor源码分析

threadpoolexecutor源码分析

ThreadPoolExecutor 源码解析



前言

ThreadPoolExecutor 结构图如下:

在这里插入图片描述


一、字段分析

  • private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)):原子整型,高三位:线程池状态。低29位:Worker数量,即当前线程池中正在执行任务的线程数。todo

  • 线程状态字段:

    • private static final int RUNNING = -1 << COUNT_BITS:线程池正在运行,可以接收新任务并处理已有任务。
    • private static final int SHUTDOWN = 0 << COUNT_BITS:线程池处于关闭状态,不再接受新任务,但会继续处理已有任务,直到任务队列为空。
    • private static final int STOP = 1 << COUNT_BITS:线程池立即停止,正在执行的任务会被中断,尚未执行的任务会被移出队列。
    • private static final int TIDYING = 2 << COUNT_BITS:线程池正在整理线程,等待终止状态。
    • private static final int TERMINATED = 3 << COUNT_BITS:线程池已终止,不再执行任何任务。
  • private static final int COUNT_BITS = Integer.SIZE - 3:表示线程池中线程数量的位数 (32 - 3)。

  • private static final int CAPACITY = (1 << COUNT_BITS) - 1:表示线程中的最大线程数量(2^29 - 1)。

  • private final BlockingQueue<Runnable> workQueue:用于存放已提交待执行的任务。

  • private final ReentrantLock mainLock = new ReentrantLock():可重入锁,ThreadPoolExecutor 存放线程池中的线程集合使用 HashSet< Worker > 存储的,而HashSet 是非线程安全的,所以操作前会先获取锁,在进行操作。

  • private final HashSet<Worker> workers = new HashSet<Worker>():存放线程池中线程的集合,在访问这个集合之前,必须获得 mainLock 锁。

  • private final Condition termination = mainLock.newCondition():用来分装线程的 Worker 继承了 AQS,termination 就是用来关联等待队列(或称呼条件队列)的。

  • private volatile ThreadFactory threadFactory:七大参数之一,用来创建线程的线程工厂。

  • private volatile RejectedExecutionHandler handler:七大参数之一,拒绝策略。

  • private static final RejectedExecutionHandler defaultHandler = new AbortPolicy():默认的拒绝策略。

  • private int largestPoolSize:用来追踪线程池达到的最大线程个数,更新之前也需要获取 mainLock 锁。通过这个数据可以知道线程池是
    否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。

  • private long completedTaskCount:线程池在运行过程中已完成的任务数量。

  • private volatile long keepAliveTime:七大参数之二,使用 unit.toNanos(keepAliveTime) 计算得来,为 纳秒。表示线程没有任务执行,空闲的最大时间,当线程数量大于核心线程数量(corePoolSize)时,超出的部分线程空闲超过这个时间,则会被销毁。如果allowCoreThreadTimeOut = true,核心线程超过时间也会被销毁。

  • private volatile boolean allowCoreThreadTimeOut:

    • 当设置为 true 时:任何线程空闲时间超过kepAliveTime后就会被销毁(重点:无需超过核心线程数、包括核心线程只要是超时的空闲线程都会被销毁)。
    • 当设置为 false 时(默认为false):如果线程数量小于等于核心线程数(corePoolSize),即便这些线程空闲,并且超出了空闲时间,也不会被销毁。只有超出核心线程数的部分线程会被销毁。
  • private volatile int corePoolSize:七大参数之一:核心线程数量。

  • private volatile int maximumPoolSize:七大参数之一:最大线程数。

  • private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"):权限属性,用于检查是否设置了安全管理器,是则看当前调用shutdown命令的线程是否具有关闭线程的权限,如果有还要看调用线程是否有中断工作线程的权限,如果没有权限则抛出异常。

  • private static final boolean ONLY_ONE = true:中断时默认只中断一个工作者线程,tryTerminate方法会用到此常量。

源码如下:

 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

	private final BlockingQueue<Runnable> workQueue;
	private final ReentrantLock mainLock = new ReentrantLock();
	private final HashSet<Worker> workers = new HashSet<Worker>();
	private final Condition termination = mainLock.newCondition();
	private int largestPoolSize;
	private long completedTaskCount;
	private volatile ThreadFactory threadFactory;
	private volatile RejectedExecutionHandler handler;
	private volatile long keepAliveTime;
	private volatile boolean allowCoreThreadTimeOut;
	private volatile int corePoolSize;
	private volatile int maximumPoolSize;
	private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
	private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
	private static final boolean ONLY_ONE = true;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

二、核心类分析

1、Worker

  • 继承了 AQS,实现了 Runnable,本质上是线程池中执行任务的线程。分装了 任务 和 用 来执行任务的线程。

源码如下:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        //用于执行任务的线程,一般不为空,这由ThreadFactory决定  
        final Thread thread;
        //worker执行的第一个任务(对于后来的其他任务它会到任务队列中去获取)
        Runnable firstTask;
        //此线程完成的任务数
        volatile long completedTasks;

        //firstTask:需要执行的任务,可以为null
        Worker(Runnable firstTask) {
        	//将state置为负数-1,它的主要目的是防止线程被过早中断,直到runWorker开始执行任务时才清除state的负数状态
        	//什么叫过早的中断?
        	//通过 interruptIfStarted 方法可知,想要中断必须是 state >=0,才有可能执行t.interrupt();方法,
        	//而在 开始执行任务之前是没有必要中断的,所以设置了-1,在执行任务之前,即便调用了中断方法,也不会发生中断。
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //getThreadFactory是外部类的ThreadPoolExecutor的实例方法,返回一个实现ThreadFactory接口的对象.
  			//newThread根据当前worker所代表(Worker实现了Runnable接口,worker也是Runnable对象)的Runnable对象
    		//创建一个线程对象。所以this.thread线程启动就会执行worker.run().
            this.thread = getThreadFactory().newThread(this);
        }

       //当Worker.thread.start()调用后,Worker.thread线程启动,就会执行这里的run方法,run方法又委托给runWorker,
       //换句话说,worker线程启动后,最终会执行runWorker方法。
        public void run() {
            runWorker(this);
        }

        //这3个方法均是父类AQS的模板方法所调用的应被重写的方法,
        //其具体原理在之前AbstractQueuedSynchronizer实现原理分析帖子中做过说明,这里不再细说。
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        //重写了 AQS 的 tryAcquiree 方法,成功将 state 从0设置为1,则认为获取锁成功
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
		
		//重写了 AQS 的 tryReleas 方法,释放锁
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
		
		//尝试获取锁,获取失败则加入到同步队列中
        public void lock()        { acquire(1); }
        //尝试获取锁,获取失败则直接返回false,不加入到同步队列中
        public boolean tryLock()  { return tryAcquire(1); }
        //释放锁
        public void unlock()      { release(1); }
        //判断锁是否空闲
        //true:共享资源已被加锁
        //false:共享资源未被加锁
        public boolean isLocked() { return isHeldExclusively(); }

		//调用此方法可以在任务启动后中断线程。
        void interruptIfStarted() {
            Thread t;
            //state不能小于0,为负数表明worker线程还未启动
            //(在构造方法中将state初始为-1,在worker线程启动后runWorker方法开头处的w.unLock会将state置为0)
            //!t.isInterrupted()如果线程已经是中断状态,也不需要再去中断它了。
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 通过源码分析可知,Worker 分装了三个内容:
    • worker执行的第一个任务(对于后来的其他任务它会到任务队列中去获取):firstTask。
    • 用来执行任务的线程:thread。
    • 该线程已经执行完的任务数量:completedTasks。
  • 可以通过 Worker 对象来调用获取锁和释放锁的操作。
  • 该类的具体用法和更多信息,继续看后面的方法。

三、核心方法分析

1、构造方法

//传入了五个参数
//1:核心线程数;2:最大线程数;3:空闲时间;4:空闲时间单位;5:工作队列。使用默认的拒绝策略与线程工厂
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

//传入了6个参数
//1:核心线程数;2:最大线程数;3:空闲时间;4:空闲时间单位;5:工作队列;6:线程工厂。使用默认的拒绝策略
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

//传入了6个参数
//1:核心线程数;2:最大线程数;3:空闲时间;4:空闲时间单位;5:工作队列;6:拒绝策略。使用默认的线程工厂
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

//传入了7个参数
//1:核心线程数;2:最大线程数;3:空闲时间;4:空闲时间单位;5:工作队列;6:线程工厂;7:拒绝策略
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        //判断参数有效性,违规则抛出异常                      
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        //一系列赋值        
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        //计算空闲时间,单位 纳秒
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

2、execute 方法

  • 将任务交给线程池去处理,一般都执行 execute 方法。
//传入的参数 command 就是交给线程池的待执行任务
public void execute(Runnable command) {
		//任务不可为空
        if (command == null)
            throw new NullPointerException();
        //获取 ctl
        int c = ctl.get();
        //workerCountOf:获取线程池中的线程数量
        //addWorker(command, true):创建核心线程
        //1:如果线程池中的线程数量 < 核心线程数,则创建新的新城用于执行该任务,并直接返回
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            //todo    
            c = ctl.get();
        }
        //到这里说明  2:线程池中的线程数量已经达到了核心线程数,则如果 工作队列没有满的话,加入到工作队列中
        //线程池正在运行 && 工作队列未满(offer 在队列满时返回false)
        if (isRunning(c) && workQueue.offer(command)) {
        	//因为在上面的操作过程中,ctl可能被别的线程更新过,所以这里再获取一次
            int recheck = ctl.get();
            //如果线程池已经不在运行,则无需将任务添加到工作队列中了,
            if (! isRunning(recheck) && remove(command))
            	//直接委托给拒绝策略handler调用rejectedExecution方法。
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //到这里说明 3:核心线程和工作队列都满了,则尝试创建非核心线程。
        else if (!addWorker(command, false))
        	//4:如果核心线程和工作队列满了,且线程已经达到最大线程数了,则执行拒绝策略
            reject(command);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 综上 execute 方法主要分四步走:
    1. 如果线程数量 < 核心线程数,则创建新的核心线程处理新添加的任务。
    2. 如果线程数量 >= 核心线程数,则将新任务添加到工作队列中。
      • 添加到工作队列之后,会检查线程池是否在运行,如果不在运行,则从工作队列中踢出该任务,并且将该任务直接交给拒绝策略执行。
    3. 如果线程数量 >= 核心线程数,且 工作队列满了,则创建 新的非核心线程处理新添加的任务。
    4. 如果工作队列满了,且线程数量 == 最大线程数量,则执行拒绝策略。

3、addWorker 方法

  • 用来新建 Worker,传入两个参数:

    • Runnable firstTask:需要执行的任务。
    • boolean core:true 则需要核心线程来执行,fales 则需要非核心线程来执行。
  • addWorker() 方法中只允许两种情况可以创建Worker:

    • 线程池状态为RUNNING,可以创建Worker。
    • 线程池状态为SHUTDOWN,且任务阻塞队列不为空,可以创建初始任务为null的Worker。
private boolean addWorker(Runnable firstTask, boolean core) {
		//标记,让里层的for循环跳到此处
        retry:
        for (;;) {
        	//获取 clt
            int c = ctl.get();
            //获取线程池的状态(ctl的高三位)
            int rs = runStateOf(c);

            //如果 rs >= SHUTDOWN ,说明线程池不在运行了
            // ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()):
            //翻译一下就是 线程池状态为 SHUTDOWN && 工作队列不为空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
			//到这里说明有两种情况:
			//1:线程池正在运行 或
			//2:线程池是 SHUTDOWN 状态 且 传入的任务为空 且 工作队列不为空 
            for (;;) {
            	//获取线程数量
                int wc = workerCountOf(c);
                //如果 线程数量达到了线程池所能承受的极限 || 线程数量达到了核心线程数量(core = true) 或 最大线程数 (core = false)
                //说明无法创建新的线程了,直接返回false,创建 Worker 失败
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //使用 cas 的方法,将线程数量 + 1,成功则跳出循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //失败说明又有其他线程修改了 ctl,这里重新获取ctl    
                c = ctl.get();  // Re-read ctl
                //如果线程池状态被修改了,则返回外层 for 循环重新执行 
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
		//综合上面的代码分析,到这里说明线程池可能有两种状态 RUNNING 或 SHUTDOWN(传入的任务为空),且在 ctl 中
		//将将记录的线程数量 + 1 了。
		//记录 工作线程是否启动
        boolean workerStarted = false;
        //记录是否成功创建了工作线程Worker
        boolean workerAdded = false;
        //记录新建的工作线程Worker
        Worker w = null;
        try {
        	//使用传入的任务创建 Worker
            w = new Worker(firstTask);
            //获取 新建Worker 的线程
            final Thread t = w.thread;
            //如果线程不为空
            if (t != null) {
            	//获取重入锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //获取线程池状态
                    int rs = runStateOf(ctl.get());
					//如果线程池是运行状态 或 是SHUTDOWN 状态且传入的任务为空
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //检查一下Worker的线程是否可以启动(处于活动状态的线程无法再启动)
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //将 新建的 Worker 加入到工作队列中
                        workers.add(w);
                        //获取工作队列的大小
                        int s = workers.size();
                        //如果大小超过了已记录的最大值,更新下
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //标记创建 Worker 成功    
                        workerAdded = true;
                    }
                } finally {
                	//释放锁
                    mainLock.unlock();
                }
                //如果新建的 Worker 加入到工作队列成功了,则启动Worker 绑定的线程,并标记线程启动成功
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
        	//如果启动失败,则将 Worker 从工作队列 workers 移出,并且将 ctl 记录线程数 - 1
            if (! workerStarted)
                addWorkerFailed(w);
        }
        //返回线程启动结果
        return workerStarted;
    }

//现成启动失败后,将 w 从 工作队列中移除,并且 ctl 记录线程数量的位数 - 1,并且尝试终止线程池
rivate void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            //尝试终止线程池    
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

	//两种情况下会终止线程池
	//1:线程池的状态为SHUTDOWN,Worker数量为0,任务阻塞队列为空;
	//2:线程池的状态为STOP,Worker数量为0。
	//官方注释中还说明在所有可能导致线程池终止的操作中都应该调用tryTerminate() 方法来尝试终止线程池,
	//因此线程池中Worker被删除时和任务阻塞队列中任务被删除时会调用tryTerminate(),以达到在线程池符合终止条件时及时终止线程池。
    final void tryTerminate() {
        for (;;) {
        	//获取ctl
            int c = ctl.get();
            // 如果线程池状态为RUNNING,则没有资格终止线程池
        	// 如果线程池状态大于等于TIDYING,则没有资格终止线程池
        	// 如果线程池状态为SHUTDOWN但任务阻塞队列不为空,则没有资格终止线程池	
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            //到这里说明线程池状态为 SHUTDOWN 且工作队列为空  或 线程池状态为 STOP
            //如果成立,说明工作队列中的 Worker 数量不为0,说明还有正在执行或空闲的 Worker,此时中断一个Worker。todo,为何中断的是一个。
            if (workerCountOf(c) != 0) { // Eligible to terminate
            	//中断空闲的Worker,因为 ONLY_ONE 为 true,只中断一个
            	//其实就是通过中断信号,唤醒阻塞的线程(getTask()阻塞的)
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
			//到这里说明没有现成了,则进行关闭线程池的操作
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
            	//使用cas方式将线程池状态设置为 TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                    	//终止线程池 todo
                        terminated();
                    } finally {
                    	// 将线程池状态最终置为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        //todo
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 分析完上面个的源码,总结下 execute 的流程图(包含addWorker)如下:

在这里插入图片描述

小结:

  • execute 整体分四步走:

    • 如果线程池中的线程数量 < 核心线程数,则创建新的新城用于执行该任务,并直接返回。
    • 线程池中的线程数量已经达到了核心线程数,则如果 工作队列没有满的话,加入到工作队列中。
    • 核心线程和工作队列都满了,则尝试创建非核心线程。
    • 如果核心线程和工作队列满了,且线程已经达到最大线程数了,则执行拒绝策略
  • 提交的 command 是如何执行它的run方法的?

    • 提交的 command任务会被分装到 Worker 里,而 Worker 也是实现了 Runnable 接口,并且 Worker 还分装了 Thread 线程,在创建 Thread 线程时传入的 this(指的就是 Worker 对象)。
    • 创建好 Worker 对象后,会加入到工作队列中,成功加入后会调用分装的 Thread 的 start 方法启动线程,由于 Thread 创建时传入的是this(Worker 对象),所以会调用 Worker 的 run方法,从而会调用 runWorker(this)方法,从而调用了 我们传入的 command 的 run 方法。

4、runWorker 方法

  • 通过 addWorker 方法可知,无论是创建核心线程,还是非核心线程,都是通过创建 Worker 对象,封装了 command 和 Thread。并启动Thread 线程,通过调用 Worker.run 方法 -> runWorker 方法,runWorker 中来调用 command方法的。接下来详细解析下 runWorker 方法。
//w 中分装了 Thread 和 需要执行的 firstTask (其实就是调用 execute 传入的 command)
final void runWorker(Worker w) {
		//获取当前线程,和w.thread 是同一个
        Thread wt = Thread.currentThread();
        //获取需要执行的任务
        Runnable task = w.firstTask;
        //firstTast已被去除,可设为空,
        w.firstTask = null;
        //因为Worker创建时state设为-1,防止过早被中断,现在准备执行任务了,会进行加锁解锁或中断操作,所以设为0
        w.unlock(); // allow interrupts
        //worker 有没有发生异常
        //false:没有
        //true:有
        boolean completedAbruptly = true;
        try {
        	//循环执行任务,第一个先执行拿到的 task,后面的通过 getTask 从工作队列中获取任务,直到工作队列中的任务全部执行完毕
            while (task != null || (task = getTask()) != null) {
            	//这里不应该理解为上锁,因为不同的Worker 对应不同的线程,也对应不同的锁,不会出现多线程同时访问同一个任务的情况。
            	//这里其实有以下几个作用:
            	//1:表示当前线程正在执行任务,则不应该被中断。
            	//2:如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可 以对该线程进行中断; 
            	//3:线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来 中断空闲的线程,
            	//interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程 是否是空闲状态
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 正如英文描述的一样,为了确保 线程池停止了(线程池状态是大于等于STOP的话(执行了shutdownNow)),线程也需要中断。
                //可拆分两种情况:
                //1: runStateAtLeast(ctl.get(), STOP) &&  !wt.isInterrupted()
                //2: (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()
                //可以看出,如果线程池停止了,现成没有中断,则将线程进行中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    //走到这里说明线程池为停止状态,而线程不为中断,则需要中断线程,确保线程池被中断的情况下,线程也需要中断
                    wt.interrupt();
                try {
                	//钩子函数,可用于扩展。执行任务之前
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    	//执行我们调用 execute传入的command里的run方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                   		//钩子函数,可用于扩展。执行任务之后
                        afterExecute(task, thrown);
                    }
                } finally {
                	//任务执行完后置空,方便下一轮循环
                    task = null;
                    //当前worker 执行完的任务数量 + 1
                    w.completedTasks++;
                    //解锁
                    w.unlock();
                }
            }
            //正常结束,没有发生异常
            completedAbruptly = false;
        } finally {
        	//执行Worker的退出
            processWorkerExit(w, completedAbruptly);
        }
    }


private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
        	//获取ctl的值
            int c = ctl.get();
            //获取线程池状态
            int rs = runStateOf(c);
			
            // Check if queue empty only if necessary.
            //if 为 true 有两种情况
            //1:线程池状态为 SHUTDOWN 工作队列为空,说明没有任务可处理了。
            //2:线程池状态为 STOP 或 TIDYING 或 TERMINATE,这些状态下不会在执行任务。
   			//所以当前Worker 需要被移除,移除前先 decrementWorkerCount 减少 worker 数量。
   			//并返回null,未获取到任务
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
			//获取 worker 数量
            int wc = workerCountOf(c);
			
            // Are workers subject to culling?
            //判断当前 Worker 收不收空闲时间的有影响。 
            //allowCoreThreadTimeOut = true:所有 worker 都受影响,超过空闲时间需要被回收。
            //allowCoreThreadTimeOut = false:超出核心线程数的Worker 收影响,超过空闲时间需要被回收。
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
			//1:线程数量超过 maximumPoolSize 可能是线程池在运行时被调用了 setMaximumPoolSize()
			//被改变了大小,否则已经 addWorker()成功不会超过 maximumPoolSize 
			//2:timed && timedOut 如果为 true,上次for循环调用下面的 poll,并且超过了存活时间,返回了null,
			//那么这次for循环,才能保证timed && timedOut 为true
			//获取任务发生了超时.其实就是体现了空闲线程的存活时间 
			//3:wc > 1 || workQueue.isEmpty()   线程数量 > 1 或 工作队列为空
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //线程数量 - 1,返回null,未获取到任务
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            	//判断是用 poll 还是 take
            	//poll(keepAliveTime, TimeUnit.NANOSECONDS):从BlockingQueue取出一个队首的对象,如果在指定时间内,
            	//队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回null。
            	//take:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入,
            	//注意 take 会阻塞
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                //从工作对了中获取到任务则直接返回
                if (r != null)
                    return r;
                //调用了 poll,超时了未获取到 任务,记录本次循环经历了超时未获取到任务,用于下次循环    
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }


//执行Worker的退出
private void processWorkerExit(Worker w, boolean completedAbruptly) {
		// 如果任务执行过程中发生了报错,则CAS的方式把任务数-1。ctl的低29位。
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        	// CAS的方式将任务数-1。
            decrementWorkerCount();
		 // 上锁 保证将任务移除队列的线程安全。
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 将任务移除队列,因为任务已经执行完了
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
		// 尝试中断线程池
        tryTerminate();

		/*
    	 * 下面这段代码的含义是如果线程池是RUNNING或者SHUTDOWN状态的话,
    	 * 且任务顺利完成(completedAbruptly=false)的话,那么判断是否设置了允许核心线程超时
   	 	 * 如果允许核心线程超时,且任务队列不等于空的话,那么开启一个线程来执行任务。
    	 * 
    	 * 一言以蔽之:如果线程池是RUNNING或者SHUTDOWN状态的话,且任务队列不是空,那么至少保证线程池中有一个线程在执行任务
   		 */
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
            	//如果允许核心线程有空闲时间(allowCoreThreadTimeOut = true,默认false),最 最小线程数0,否则最小为核心线程数
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //两种情况
                //1:线程数量 >= 1。保证至少有一个 Worker 来处理任务
                //2:线程数量 >= 核心线程数   线程数还未达到核心线程数,再建一个 Worker 来处理任务
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //再建一个 Worker
            addWorker(null, false);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179

小结:

  • 如果核心线程数满了,工作队列没满,提交的任务是如何做到线程复用的呢?

    • 通过 runWorker 源码可以知道,Worker 首先执行分装的 firstTask 任务,执行完之后并没有停止线程,而是继续从 工作队列 workQueue 中去获取任务,获取到任务后继续执行,从而做到了一个线程执行多个任务。
  • 非核心线程超过了空闲时间,是如何回收的呢?核心线程超过了空闲时间会回收吗?

    • 如果是非核心线程,在获取工作队列 workQueue 中的任务是,调用的是 poll(time,timeUtil),在指定时间内如果没有获取到任务就会返回null,那么该线程所在的 Worker会从 使用了HashSet存放了所有的Worker的容器中移除该woker,同时代表了线程池线程数量的 ctl 会 减少1,从而达到线程回收的效果。
    • 通过设置 allowCoreThreadTimeOut 变量来决定核心线程超过了空闲时间是否被回收,默认为false:不回收,设置为true:则回收。

5、shutdown 方法

  • 源码如下:
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 循环通过CAS方式将线程池状态置为SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中断所有空闲的 Worker
        interruptIdleWorkers();
        //钩子函数,可扩展
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    // 尝试终止线程池
    tryTerminate();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 在shutdown() 方法中首先会将线程池状态置为SHUTDOWN,然后调用interruptIdleWorkers() 方法中断空闲Worker,最后调用tryTerminate() 方法来尝试终止线程池。那么这里要解释一下什么是空闲Worker,先看一下interruptIdleWorkers() 的实现。
//中断空闲Worker
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 中断线程前需要先尝试获取Worker的锁
            	// 只能获取到空闲Worker的锁,所以shutdown()方法只会中断空闲Worker
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 调用interruptIdleWorkers() 方法中断Worker前首先需要尝试获取Worker的锁,已知Worker除了实现Runnable接口外,还继承于AbstractQueuedSynchronizer,因此Worker本身是一把锁,然后在runWorker() 中Worker执行任务前都会先获取Worker的锁,这里看一下Worker的lock() 方法的实现。
public void lock() {
    acquire(1);
}

protected boolean tryAcquire(int unused) {
    // 以CAS方式将state从0设置为1
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 可以发现,Worker在lock() 中调用了acquire() 方法,该方法由AbstractQueuedSynchronizer抽象类提供,在acquire() 中会调用其子类实现的tryAcquire() 方法,tryAcquire() 方法会以CAS方式将state从0设置为1,因此这样的设计让Worker是一把不可重入锁。
  • 回到interruptIdleWorkers() 方法,前面提到该方法中断Worker前会尝试获取Worker的锁,能够获取到锁才会中断Worker(说明该Worker 不是空闲的),而因为Worker是不可重入锁,所以正在执行任务的Worker是无法获取到锁的,只有那些没有执行任务的Worker的锁才能够被获取,因此所谓的中断空闲Worker(这里不分核心和非核心,只要空闲就中断),实际就是中断没有执行任务的Worker(在getTask中无法获取任务而阻塞了),那些执行任务的Worker在shutdown() 方法被调用时不会被中断,这些Worker执行完任务后会继续从任务阻塞队列中获取任务来执行,直到任务阻塞队列为空,此时没有被中断过的Worker也会被删除掉,等到线程池中没有Worker以及任务阻塞队列没有任务后,线程池才会被终止掉。
  • 对于shutdown() 方法,一句话总结就是:将线程池状态置为SHUTDOWN并拒绝接受新任务,等到线程池Worker数量为0,任务阻塞队列为空时,关闭线程池。

6、shutdownNow 方法

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //将线程池状态修改为 STOP,则无需工作队列中未处理完的任务的任务,以及拿到任务的 Worker,在执行前也会先判断是都为STOP状态,
            //是则会停止,进行中断操作(体现在runWorker方法中),但是仍会执行后面的  task.run() 方法。但是下一次 getTask 
            //无法获取到任务了,移除Worker 和 停止线程
            advanceRunState(STOP);
 			//中断所有 worker
            interruptWorkers();
            //获取所有在工作队列中没有处理的任务进行返回
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        //尝试中断线程池
        tryTerminate();
        return tasks;
    }

//获取所有工作队列中的任务
private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        //一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,
        //可以提升获取数据效率;不需要多次分批加锁或释放锁。
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

小结:

  • ThreadPoolExecutor 是如何关闭线程池的?
    • 通过 shutdown 和 shutdownNow 方法来关闭线程。
    • shutdown: 首先将线程池状态更新为 SHUTDOWN,紧接着中断所有空闲线程,因为 在 runWorker 中,处理任务之前会先上锁,所以是通过 worker.tryLock 来判断是否空闲线程,是 则进行 interrupt 中断操作,通过getTask可知,空闲线程是在获取工作队列任务时进行了阻塞,而这里调用了中断方法,阻塞的线程就会抛出异常,返回null,从而调用processWorkerExit 方法,移除worker 和 线程数量 - 1。非空闲的线程则不中断,继续执行工作队列的任务,直到工作队列为空。接着尝试关闭线程池,当所有线程都关闭后,将线程池设为 TIDYING 在设置为 TERMINATED,关闭完成。
      • 那么是如何关闭哪些非空闲线程,达到所有线程都关闭呢?
        • 正在运行的线程在所有的任务完成后,执行getTask无法获取任务,也会执行processWorkerExit 从而移除worker 和 线程数量 - 1。
    • shutdownNow: 首先将线程池状态更新为 STOP,并且中断所有的线程,从而导致所有的空闲线程都会报InterruptedException,并从getTask返回 null,从而调用 processWorkerExit进行关闭,而正在执行的线程,同样的在调用 getTask 无法获取到任务,也会调用 processWorkerExit进行关闭。所以哪些在工作队列中的任务不会被执行了,当然,如果一个任务正在执行,它是不会中断的,因为 interrupt只是打上中断标记,可以保障下一次无法获取到任务而中断。shutdownNow 最终会返回所有在工作队列中未执行的任务。

四、实战案例:定制一个监控线程池运行状态的ThreadPoolExecutor

  • 现在让我们来看一个实战案例,其中我们将创建一个监控线程池的 ThreadPoolExecutor 并实施自定义扩展。其实就是实现钩子函数。
public class MonitoringThreadPoolExecutor extends ThreadPoolExecutor {
    private final ConcurrentHashMap<String, Long> timing = new ConcurrentHashMap<>();

    // 构造方法和其他必要的方法

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        timing.put(String.valueOf(r.hashCode()), System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Long startTime = timing.remove(String.valueOf(r.hashCode()));
        long taskDuration = System.nanoTime() - startTime;
        // Record or log the task duration
    }

    @Override
    protected void terminated() {
        super.terminated();
        // Do some final logging or resource release
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

参考资料:

冰河:《深入理解高并发变成 JDK核心技术》
方腾飞:《Java并发编程的艺术》
文章:一文搞懂ThreadPoolExecutor原理
文章:BlockingQueue(阻塞队列)详解

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/944560
推荐阅读
相关标签
  

闽ICP备14008679号