赞
踩
众所周知,Java 提供的线程池(ThreadPoolExecutor)构造函数中有几个重要的参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
allowCoreThreadTimeOut
参数Java 自带了四种拒绝策略,分别是:
CallerRunsPolicy
:让调用者直接在自己的线程里面执行,线程池不做处理AbortPolicy
:线程池直接抛出异常(默认)DiscardPolicy
:线程池直接把任务丢掉,当做什么也没发生DiscardOldestPolicy
:把队列中最老的任务删除掉,把该任务放到队列中其实除了这几个构造参数以外,还有几个非常重要的参数需要我们理解。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
线程池的状态有五种,分别是 RUNNING(-1)、SHUTDOWN(0)、STOP(1)、TIDYING(2)和 TERMINATED(3),其关系如下图所示:
private final HashSet<Worker> workers = new HashSet<>();
通过 workers 参数可以发现线程池并未执行使用 Thread 去执行 Runable 任务,而是将二者封装成了一个 Worker 对象,方便对线程的复用以及状态控制:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 通过线程工厂创建的线程 final Thread thread; // 创建需要绑定一个任务,也是 worker 的第一个任务 Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); }
线程池有两个方法提交任务:
Runnable
入参Runnable
和 Callable
作为入参,同时支持返回值 Future
executor.execute(() -> {});
executor.submit(() -> {});
二者实现功能一致,我们以 execute()
方法实现为例,梳理线程池任务提交的流程:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
通过对 Worker 生命周期的详细梳理,理解线程池中 Thread 和 Queue 的关系,队列中的任务是如何被执行,以及线程在池中如何保活和销毁的。先来看一下三者的关系:
既然是线程池,那么总要先创建线程。为了了解线程池中线程的创建过程,我们不得不再次看一下 Worker 类的定义,先是继承了 AQS,再实现 Runnable 接口,即本身是一把锁,又是一个可执行任务。
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable
{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
执行线程池 execute()
方法提交任务时,如果需要创建一个新的工作线程,则会调用 addWorker()
方法:
private boolean addWorker(Runnable firstTask, boolean core) {
// ......
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
// ......
t.start();
// ......
}
}
start()
方法启动线程。run()
方法。前面我们知道线程池提交任务创建工作线程后执行到 Worker 的 run()
方法,最终的任务执行逻辑在 runWorker()
方法中:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; //释放锁,将state设置为0,允许中断任务的执行 w.unlock(); boolean completedAbruptly = true; try { //如果任务不为空,或者从任务队列中获取的任务不为空,则执行while循环 while (task != null || (task = getTask()) != null) { //如果任务不为空,则获取Worker工作线程的独占锁 w.lock(); //如果线程已经停止,或者中断线程后线程终止并且没有成功中断线程 //大家好好理解下这个逻辑 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //中断线程 wt.interrupt(); try { //执行任务前执行的逻辑 beforeExecute(wt, task); Throwable thrown = null; try { //调用Runable接口的run方法执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //执行任务后执行的逻辑 afterExecute(task, thrown); } } finally { //任务执行完成后,将其设置为空 task = null; //完成的任务数量加1 w.completedTasks++; //释放工作线程获得的锁 w.unlock(); } } completedAbruptly = false; } finally { //执行退出Worker线程的逻辑 processWorkerExit(w, completedAbruptly); } }
其核心逻辑非常简单,执行创建时添加的 firstTask
任务或不断调用 getTask()
方法从队列拉取任务执行,如果没有可执行的任务,则退出并销毁线程。
我们知道线程池中为了工作效率,非核心工作线程即使没有任务时也需要等待一段存活时间才会最终销毁,但前面的 runWorker()
方法显然没有这段逻辑,想必另有乾坤。
不卖关子,答案就藏在获取任务的 getTask()
方法中:
private Runnable getTask() { //轮询是否超时的标识 boolean timedOut = false; //自旋for循环 for (;;) { //获取ctl int c = ctl.get(); //获取线程池的状态 int rs = runStateOf(c); //检测任务队列是否在线程池停止或关闭的时候为空 //也就是说任务队列是否在线程池未正常运行时为空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //减少Worker线程的数量 decrementWorkerCount(); return null; } //获取线程池中线程的数量 int wc = workerCountOf(c); //检测当前线程池中的线程数量是否大于corePoolSize的值或者是否正在等待执行任务 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果线程池中的线程数量大于corePoolSize //获取大于corePoolSize或者是否正在等待执行任务并且轮询超时 //并且当前线程池中的线程数量大于1或者任务队列为空 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { //成功减少线程池中的工作线程数量 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //从任务队列中获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //任务不为空直接返回任务 if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
函数前面是一大堆标志位校验,以校验线程池和线程状态,之间看最后 try 代码块中获取任务的方法:当需要超时等待时调用了 BlockingQueue
的 poll()
方法,该方法支持获取队列元素时如果队列为空,则阻塞等待相应时间,如果还是没有数据则返回空。
而在阻塞队列中,通过 ReentrantLock 和 Condition 实现队列的阻塞。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
总结起来就是线程池的 Worker 工作线程在创建并启动后,会通过不断自旋和 BlockingQueue 的阻塞实现不停地作业和一段时间的存活。
并且还可以看出核心线程和非核心线程并无区分,只是维护一定数量的工作线程存活即可,并非先创建的就是核心线程。
通过对线程池 ThreadPoolExecutor 中成员变量的分析,我们了解到了 Runnable、Worker、Thread 以及 BlockingQueue 在线程池中的关系。
然后以任务提交 sumbit()
函数作为入口,探究了线程池中线程的创建、执行以及保活的实现,对线程池的工作流程和原理也有了很好的理解。
https://blog.csdn.net/JavaMonsterr/article/details/125992719
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。