赞
踩
ThreadPoolExecutor 结构图如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)):
原子整型,高三位:线程池状态。低29位:Worker数量,即当前线程池中正在执行任务的线程数。todo
线程状态字段:
private static final int RUNNING = -1 << COUNT_BITS:
线程池正在运行,可以接收新任务并处理已有任务。private static final int SHUTDOWN = 0 << COUNT_BITS:
线程池处于关闭状态,不再接受新任务,但会继续处理已有任务,直到任务队列为空。private static final int STOP = 1 << COUNT_BITS:
线程池立即停止,正在执行的任务会被中断,尚未执行的任务会被移出队列。private static final int TIDYING = 2 << COUNT_BITS:
线程池正在整理线程,等待终止状态。private static final int TERMINATED = 3 << COUNT_BITS:
线程池已终止,不再执行任何任务。private static final int COUNT_BITS = Integer.SIZE - 3
:表示线程池中线程数量的位数 (32 - 3)。
private static final int CAPACITY = (1 << COUNT_BITS) - 1
:表示线程中的最大线程数量(2^29 - 1)。
private final BlockingQueue<Runnable> workQueue:
用于存放已提交待执行的任务。
private final ReentrantLock mainLock = new ReentrantLock():
可重入锁,ThreadPoolExecutor 存放线程池中的线程集合使用 HashSet< Worker > 存储的,而HashSet 是非线程安全的,所以操作前会先获取锁,在进行操作。
private final HashSet<Worker> workers = new HashSet<Worker>():
存放线程池中线程的集合,在访问这个集合之前,必须获得 mainLock 锁。
private final Condition termination = mainLock.newCondition():
用来分装线程的 Worker 继承了 AQS,termination 就是用来关联等待队列(或称呼条件队列)的。
private volatile ThreadFactory threadFactory:
七大参数之一,用来创建线程的线程工厂。
private volatile RejectedExecutionHandler handler:
七大参数之一,拒绝策略。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy():
默认的拒绝策略。
private int largestPoolSize:
用来追踪线程池达到的最大线程个数,更新之前也需要获取 mainLock 锁。通过这个数据可以知道线程池是
否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
private long completedTaskCount:
线程池在运行过程中已完成的任务数量。
private volatile long keepAliveTime:
七大参数之二,使用 unit.toNanos(keepAliveTime) 计算得来,为 纳秒。表示线程没有任务执行,空闲的最大时间,当线程数量大于核心线程数量(corePoolSize)时,超出的部分线程空闲超过这个时间,则会被销毁。如果allowCoreThreadTimeOut = true,核心线程超过时间也会被销毁。
private volatile boolean allowCoreThreadTimeOut:
private volatile int corePoolSize:
七大参数之一:核心线程数量。
private volatile int maximumPoolSize:
七大参数之一:最大线程数。
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"):
权限属性,用于检查是否设置了安全管理器,是则看当前调用shutdown命令的线程是否具有关闭线程的权限,如果有还要看调用线程是否有中断工作线程的权限,如果没有权限则抛出异常。
private static final boolean ONLY_ONE = true:
中断时默认只中断一个工作者线程,tryTerminate方法会用到此常量。
源码如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<Worker>(); private final Condition termination = mainLock.newCondition(); private int largestPoolSize; private long completedTaskCount; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize; private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); private static final boolean ONLY_ONE = true;
任务
和 用 来执行任务的线程。
源码如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; //用于执行任务的线程,一般不为空,这由ThreadFactory决定 final Thread thread; //worker执行的第一个任务(对于后来的其他任务它会到任务队列中去获取) Runnable firstTask; //此线程完成的任务数 volatile long completedTasks; //firstTask:需要执行的任务,可以为null Worker(Runnable firstTask) { //将state置为负数-1,它的主要目的是防止线程被过早中断,直到runWorker开始执行任务时才清除state的负数状态 //什么叫过早的中断? //通过 interruptIfStarted 方法可知,想要中断必须是 state >=0,才有可能执行t.interrupt();方法, //而在 开始执行任务之前是没有必要中断的,所以设置了-1,在执行任务之前,即便调用了中断方法,也不会发生中断。 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //getThreadFactory是外部类的ThreadPoolExecutor的实例方法,返回一个实现ThreadFactory接口的对象. //newThread根据当前worker所代表(Worker实现了Runnable接口,worker也是Runnable对象)的Runnable对象 //创建一个线程对象。所以this.thread线程启动就会执行worker.run(). this.thread = getThreadFactory().newThread(this); } //当Worker.thread.start()调用后,Worker.thread线程启动,就会执行这里的run方法,run方法又委托给runWorker, //换句话说,worker线程启动后,最终会执行runWorker方法。 public void run() { runWorker(this); } //这3个方法均是父类AQS的模板方法所调用的应被重写的方法, //其具体原理在之前AbstractQueuedSynchronizer实现原理分析帖子中做过说明,这里不再细说。 protected boolean isHeldExclusively() { return getState() != 0; } //重写了 AQS 的 tryAcquiree 方法,成功将 state 从0设置为1,则认为获取锁成功 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } //重写了 AQS 的 tryReleas 方法,释放锁 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } //尝试获取锁,获取失败则加入到同步队列中 public void lock() { acquire(1); } //尝试获取锁,获取失败则直接返回false,不加入到同步队列中 public boolean tryLock() { return tryAcquire(1); } //释放锁 public void unlock() { release(1); } //判断锁是否空闲 //true:共享资源已被加锁 //false:共享资源未被加锁 public boolean isLocked() { return isHeldExclusively(); } //调用此方法可以在任务启动后中断线程。 void interruptIfStarted() { Thread t; //state不能小于0,为负数表明worker线程还未启动 //(在构造方法中将state初始为-1,在worker线程启动后runWorker方法开头处的w.unLock会将state置为0) //!t.isInterrupted()如果线程已经是中断状态,也不需要再去中断它了。 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
worker执行的第一个任务(对于后来的其他任务它会到任务队列中去获取):firstTask。
用来执行任务的线程:thread。
该线程已经执行完的任务数量:completedTasks。
//传入了五个参数 //1:核心线程数;2:最大线程数;3:空闲时间;4:空闲时间单位;5:工作队列。使用默认的拒绝策略与线程工厂 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } //传入了6个参数 //1:核心线程数;2:最大线程数;3:空闲时间;4:空闲时间单位;5:工作队列;6:线程工厂。使用默认的拒绝策略 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } //传入了6个参数 //1:核心线程数;2:最大线程数;3:空闲时间;4:空闲时间单位;5:工作队列;6:拒绝策略。使用默认的线程工厂 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } //传入了7个参数 //1:核心线程数;2:最大线程数;3:空闲时间;4:空闲时间单位;5:工作队列;6:线程工厂;7:拒绝策略 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { //判断参数有效性,违规则抛出异常 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); //一系列赋值 this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; //计算空闲时间,单位 纳秒 this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
//传入的参数 command 就是交给线程池的待执行任务 public void execute(Runnable command) { //任务不可为空 if (command == null) throw new NullPointerException(); //获取 ctl int c = ctl.get(); //workerCountOf:获取线程池中的线程数量 //addWorker(command, true):创建核心线程 //1:如果线程池中的线程数量 < 核心线程数,则创建新的新城用于执行该任务,并直接返回 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; //todo c = ctl.get(); } //到这里说明 2:线程池中的线程数量已经达到了核心线程数,则如果 工作队列没有满的话,加入到工作队列中 //线程池正在运行 && 工作队列未满(offer 在队列满时返回false) if (isRunning(c) && workQueue.offer(command)) { //因为在上面的操作过程中,ctl可能被别的线程更新过,所以这里再获取一次 int recheck = ctl.get(); //如果线程池已经不在运行,则无需将任务添加到工作队列中了, if (! isRunning(recheck) && remove(command)) //直接委托给拒绝策略handler调用rejectedExecution方法。 reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //到这里说明 3:核心线程和工作队列都满了,则尝试创建非核心线程。 else if (!addWorker(command, false)) //4:如果核心线程和工作队列满了,且线程已经达到最大线程数了,则执行拒绝策略 reject(command); }
综上 execute 方法主要分四步走:
用来新建 Worker,传入两个参数:
Runnable firstTask:
需要执行的任务。boolean core:
true 则需要核心线程来执行,fales 则需要非核心线程来执行。addWorker() 方法中只允许两种情况可以创建Worker:
private boolean addWorker(Runnable firstTask, boolean core) { //标记,让里层的for循环跳到此处 retry: for (;;) { //获取 clt int c = ctl.get(); //获取线程池的状态(ctl的高三位) int rs = runStateOf(c); //如果 rs >= SHUTDOWN ,说明线程池不在运行了 // ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()): //翻译一下就是 线程池状态为 SHUTDOWN && 工作队列不为空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //到这里说明有两种情况: //1:线程池正在运行 或 //2:线程池是 SHUTDOWN 状态 且 传入的任务为空 且 工作队列不为空 for (;;) { //获取线程数量 int wc = workerCountOf(c); //如果 线程数量达到了线程池所能承受的极限 || 线程数量达到了核心线程数量(core = true) 或 最大线程数 (core = false) //说明无法创建新的线程了,直接返回false,创建 Worker 失败 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //使用 cas 的方法,将线程数量 + 1,成功则跳出循环 if (compareAndIncrementWorkerCount(c)) break retry; //失败说明又有其他线程修改了 ctl,这里重新获取ctl c = ctl.get(); // Re-read ctl //如果线程池状态被修改了,则返回外层 for 循环重新执行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //综合上面的代码分析,到这里说明线程池可能有两种状态 RUNNING 或 SHUTDOWN(传入的任务为空),且在 ctl 中 //将将记录的线程数量 + 1 了。 //记录 工作线程是否启动 boolean workerStarted = false; //记录是否成功创建了工作线程Worker boolean workerAdded = false; //记录新建的工作线程Worker Worker w = null; try { //使用传入的任务创建 Worker w = new Worker(firstTask); //获取 新建Worker 的线程 final Thread t = w.thread; //如果线程不为空 if (t != null) { //获取重入锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //获取线程池状态 int rs = runStateOf(ctl.get()); //如果线程池是运行状态 或 是SHUTDOWN 状态且传入的任务为空 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //检查一下Worker的线程是否可以启动(处于活动状态的线程无法再启动) if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //将 新建的 Worker 加入到工作队列中 workers.add(w); //获取工作队列的大小 int s = workers.size(); //如果大小超过了已记录的最大值,更新下 if (s > largestPoolSize) largestPoolSize = s; //标记创建 Worker 成功 workerAdded = true; } } finally { //释放锁 mainLock.unlock(); } //如果新建的 Worker 加入到工作队列成功了,则启动Worker 绑定的线程,并标记线程启动成功 if (workerAdded) { t.start(); workerStarted = true; } } } finally { //如果启动失败,则将 Worker 从工作队列 workers 移出,并且将 ctl 记录线程数 - 1 if (! workerStarted) addWorkerFailed(w); } //返回线程启动结果 return workerStarted; } //现成启动失败后,将 w 从 工作队列中移除,并且 ctl 记录线程数量的位数 - 1,并且尝试终止线程池 rivate void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); //尝试终止线程池 decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } } //两种情况下会终止线程池 //1:线程池的状态为SHUTDOWN,Worker数量为0,任务阻塞队列为空; //2:线程池的状态为STOP,Worker数量为0。 //官方注释中还说明在所有可能导致线程池终止的操作中都应该调用tryTerminate() 方法来尝试终止线程池, //因此线程池中Worker被删除时和任务阻塞队列中任务被删除时会调用tryTerminate(),以达到在线程池符合终止条件时及时终止线程池。 final void tryTerminate() { for (;;) { //获取ctl int c = ctl.get(); // 如果线程池状态为RUNNING,则没有资格终止线程池 // 如果线程池状态大于等于TIDYING,则没有资格终止线程池 // 如果线程池状态为SHUTDOWN但任务阻塞队列不为空,则没有资格终止线程池 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //到这里说明线程池状态为 SHUTDOWN 且工作队列为空 或 线程池状态为 STOP //如果成立,说明工作队列中的 Worker 数量不为0,说明还有正在执行或空闲的 Worker,此时中断一个Worker。todo,为何中断的是一个。 if (workerCountOf(c) != 0) { // Eligible to terminate //中断空闲的Worker,因为 ONLY_ONE 为 true,只中断一个 //其实就是通过中断信号,唤醒阻塞的线程(getTask()阻塞的) interruptIdleWorkers(ONLY_ONE); return; } //到这里说明没有现成了,则进行关闭线程池的操作 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //使用cas方式将线程池状态设置为 TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //终止线程池 todo terminated(); } finally { // 将线程池状态最终置为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //todo termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
小结:
execute 整体分四步走:
如果线程池中的线程数量 < 核心线程数,
则创建新的新城用于执行该任务,并直接返回。线程池中的线程数量已经达到了核心线程数,则如果 工作队列没有满的话
,加入到工作队列中。核心线程和工作队列都满了
,则尝试创建非核心线程。如果核心线程和工作队列满了,且线程已经达到最大线程数了
,则执行拒绝策略提交的 command 是如何执行它的run方法的?
//w 中分装了 Thread 和 需要执行的 firstTask (其实就是调用 execute 传入的 command) final void runWorker(Worker w) { //获取当前线程,和w.thread 是同一个 Thread wt = Thread.currentThread(); //获取需要执行的任务 Runnable task = w.firstTask; //firstTast已被去除,可设为空, w.firstTask = null; //因为Worker创建时state设为-1,防止过早被中断,现在准备执行任务了,会进行加锁解锁或中断操作,所以设为0 w.unlock(); // allow interrupts //worker 有没有发生异常 //false:没有 //true:有 boolean completedAbruptly = true; try { //循环执行任务,第一个先执行拿到的 task,后面的通过 getTask 从工作队列中获取任务,直到工作队列中的任务全部执行完毕 while (task != null || (task = getTask()) != null) { //这里不应该理解为上锁,因为不同的Worker 对应不同的线程,也对应不同的锁,不会出现多线程同时访问同一个任务的情况。 //这里其实有以下几个作用: //1:表示当前线程正在执行任务,则不应该被中断。 //2:如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可 以对该线程进行中断; //3:线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来 中断空闲的线程, //interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程 是否是空闲状态 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 正如英文描述的一样,为了确保 线程池停止了(线程池状态是大于等于STOP的话(执行了shutdownNow)),线程也需要中断。 //可拆分两种情况: //1: runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted() //2: (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted() //可以看出,如果线程池停止了,现成没有中断,则将线程进行中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //走到这里说明线程池为停止状态,而线程不为中断,则需要中断线程,确保线程池被中断的情况下,线程也需要中断 wt.interrupt(); try { //钩子函数,可用于扩展。执行任务之前 beforeExecute(wt, task); Throwable thrown = null; try { //执行我们调用 execute传入的command里的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //钩子函数,可用于扩展。执行任务之后 afterExecute(task, thrown); } } finally { //任务执行完后置空,方便下一轮循环 task = null; //当前worker 执行完的任务数量 + 1 w.completedTasks++; //解锁 w.unlock(); } } //正常结束,没有发生异常 completedAbruptly = false; } finally { //执行Worker的退出 processWorkerExit(w, completedAbruptly); } } private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { //获取ctl的值 int c = ctl.get(); //获取线程池状态 int rs = runStateOf(c); // Check if queue empty only if necessary. //if 为 true 有两种情况 //1:线程池状态为 SHUTDOWN 工作队列为空,说明没有任务可处理了。 //2:线程池状态为 STOP 或 TIDYING 或 TERMINATE,这些状态下不会在执行任务。 //所以当前Worker 需要被移除,移除前先 decrementWorkerCount 减少 worker 数量。 //并返回null,未获取到任务 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //获取 worker 数量 int wc = workerCountOf(c); // Are workers subject to culling? //判断当前 Worker 收不收空闲时间的有影响。 //allowCoreThreadTimeOut = true:所有 worker 都受影响,超过空闲时间需要被回收。 //allowCoreThreadTimeOut = false:超出核心线程数的Worker 收影响,超过空闲时间需要被回收。 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //1:线程数量超过 maximumPoolSize 可能是线程池在运行时被调用了 setMaximumPoolSize() //被改变了大小,否则已经 addWorker()成功不会超过 maximumPoolSize //2:timed && timedOut 如果为 true,上次for循环调用下面的 poll,并且超过了存活时间,返回了null, //那么这次for循环,才能保证timed && timedOut 为true //获取任务发生了超时.其实就是体现了空闲线程的存活时间 //3:wc > 1 || workQueue.isEmpty() 线程数量 > 1 或 工作队列为空 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { //线程数量 - 1,返回null,未获取到任务 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //判断是用 poll 还是 take //poll(keepAliveTime, TimeUnit.NANOSECONDS):从BlockingQueue取出一个队首的对象,如果在指定时间内, //队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回null。 //take:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入, //注意 take 会阻塞 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //从工作对了中获取到任务则直接返回 if (r != null) return r; //调用了 poll,超时了未获取到 任务,记录本次循环经历了超时未获取到任务,用于下次循环 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } //执行Worker的退出 private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果任务执行过程中发生了报错,则CAS的方式把任务数-1。ctl的低29位。 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted // CAS的方式将任务数-1。 decrementWorkerCount(); // 上锁 保证将任务移除队列的线程安全。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 将任务移除队列,因为任务已经执行完了 workers.remove(w); } finally { mainLock.unlock(); } // 尝试中断线程池 tryTerminate(); /* * 下面这段代码的含义是如果线程池是RUNNING或者SHUTDOWN状态的话, * 且任务顺利完成(completedAbruptly=false)的话,那么判断是否设置了允许核心线程超时 * 如果允许核心线程超时,且任务队列不等于空的话,那么开启一个线程来执行任务。 * * 一言以蔽之:如果线程池是RUNNING或者SHUTDOWN状态的话,且任务队列不是空,那么至少保证线程池中有一个线程在执行任务 */ int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { //如果允许核心线程有空闲时间(allowCoreThreadTimeOut = true,默认false),最 最小线程数0,否则最小为核心线程数 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; //两种情况 //1:线程数量 >= 1。保证至少有一个 Worker 来处理任务 //2:线程数量 >= 核心线程数 线程数还未达到核心线程数,再建一个 Worker 来处理任务 if (workerCountOf(c) >= min) return; // replacement not needed } //再建一个 Worker addWorker(null, false); } }
小结:
如果核心线程数满了,工作队列没满,提交的任务是如何做到线程复用的呢?
非核心线程超过了空闲时间,是如何回收的呢?核心线程超过了空闲时间会回收吗?
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 循环通过CAS方式将线程池状态置为SHUTDOWN advanceRunState(SHUTDOWN); // 中断所有空闲的 Worker interruptIdleWorkers(); //钩子函数,可扩展 onShutdown(); } finally { mainLock.unlock(); } // 尝试终止线程池 tryTerminate(); }
//中断空闲Worker private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // 中断线程前需要先尝试获取Worker的锁 // 只能获取到空闲Worker的锁,所以shutdown()方法只会中断空闲Worker if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
public void lock() {
acquire(1);
}
protected boolean tryAcquire(int unused) {
// 以CAS方式将state从0设置为1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
tryAcquire() 方法会以CAS方式将state从0设置为1
,因此这样的设计让Worker是一把不可重入锁。所谓的中断空闲Worker(这里不分核心和非核心,只要空闲就中断),实际就是中断没有执行任务的Worker(在getTask中无法获取任务而阻塞了)
,那些执行任务的Worker在shutdown() 方法被调用时不会被中断,这些Worker执行完任务后会继续从任务阻塞队列中获取任务来执行,直到任务阻塞队列为空,此时没有被中断过的Worker也会被删除掉,等到线程池中没有Worker以及任务阻塞队列没有任务后,线程池才会被终止掉。将线程池状态置为SHUTDOWN并拒绝接受新任务,等到线程池Worker数量为0,任务阻塞队列为空时,关闭线程池。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //将线程池状态修改为 STOP,则无需工作队列中未处理完的任务的任务,以及拿到任务的 Worker,在执行前也会先判断是都为STOP状态, //是则会停止,进行中断操作(体现在runWorker方法中),但是仍会执行后面的 task.run() 方法。但是下一次 getTask //无法获取到任务了,移除Worker 和 停止线程 advanceRunState(STOP); //中断所有 worker interruptWorkers(); //获取所有在工作队列中没有处理的任务进行返回 tasks = drainQueue(); } finally { mainLock.unlock(); } //尝试中断线程池 tryTerminate(); return tasks; } //获取所有工作队列中的任务 private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); //一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法, //可以提升获取数据效率;不需要多次分批加锁或释放锁。 q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }
小结:
ThreadPoolExecutor 是如何关闭线程池的?
shutdown:
首先将线程池状态更新为 SHUTDOWN,紧接着中断所有空闲线程,因为 在 runWorker 中,处理任务之前会先上锁,所以是通过 worker.tryLock 来判断是否空闲线程,是 则进行 interrupt 中断操作,通过getTask可知,空闲线程是在获取工作队列任务时进行了阻塞,而这里调用了中断方法,阻塞的线程就会抛出异常,返回null,从而调用processWorkerExit 方法,移除worker 和 线程数量 - 1。非空闲的线程则不中断,继续执行工作队列的任务,直到工作队列为空。接着尝试关闭线程池,当所有线程都关闭后,将线程池设为 TIDYING 在设置为 TERMINATED,关闭完成。
那么是如何关闭哪些非空闲线程,达到所有线程都关闭呢?
shutdownNow:
首先将线程池状态更新为 STOP,并且中断所有的线程,从而导致所有的空闲线程都会报InterruptedException,并从getTask返回 null,从而调用 processWorkerExit进行关闭,而正在执行的线程,同样的在调用 getTask 无法获取到任务,也会调用 processWorkerExit进行关闭。所以哪些在工作队列中的任务不会被执行了,当然,如果一个任务正在执行,它是不会中断的,因为 interrupt只是打上中断标记,可以保障下一次无法获取到任务而中断。shutdownNow 最终会返回所有在工作队列中未执行的任务。public class MonitoringThreadPoolExecutor extends ThreadPoolExecutor { private final ConcurrentHashMap<String, Long> timing = new ConcurrentHashMap<>(); // 构造方法和其他必要的方法 @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); timing.put(String.valueOf(r.hashCode()), System.nanoTime()); } @Override protected void afterExecute(Runnable r, Throwable t) { Long startTime = timing.remove(String.valueOf(r.hashCode())); long taskDuration = System.nanoTime() - startTime; // Record or log the task duration } @Override protected void terminated() { super.terminated(); // Do some final logging or resource release } }
参考资料:
冰河:《深入理解高并发变成 JDK核心技术》
方腾飞:《Java并发编程的艺术》
文章:一文搞懂ThreadPoolExecutor原理
文章:BlockingQueue(阻塞队列)详解
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。