赞
踩
线程池中的线程是如何复用的?
其实我们调用线程池的execute方法(ExecutorService的submit方法最终也是调用execute)传进去的Runnable,并不会直接以new Thread(runnable).start()的方式来执行,而是通过一个正在运行的线程来调用我们传进去的Runnable的run方法的。
那么,这个正在运行的线程,在执行完传进去的Runnable的run方法后会销毁吗?
看情况。
大部分场景下,我们都是通过Executors的newXXX方法来创建线程池的,就拿newCachedThreadPool来说:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
看第三个参数(keepAliveTime):60L,后面的单位是秒,也就是说,newCachedThreadPool方法返回的线程池,它的工作线程(也就是用来调用Runnable的run方法的线程)的空闲等待时长为60秒,如果超过了60秒没有获取到新的任务,那么这个工作线程就会结束。如果在60秒内接到了新的任务,那么它会在新任务结束后重新等待。
还有另一种常用的线程池,通过newFixedThreadPool方法创建的:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
它跟上面的newCachedThreadPool方法一样,创建的都是ThreadPoolExecutor的对象,只是参数不同而已。
可以看到第三个参数设置成了0,这就说明,如果当前工作线程数 > corePoolSize时,并且有工作线程在执行完上一个任务后没拿到新的任务,那么这个工作线程就会立即结束。
再看第二个参数(maximumPoolSize),它设置成了跟corePoolSize一样大,也就是说当前工作线程数 永远不会大于 corePoolSize了,这样的话,即使有工作线程是空闲的,也不会主动结束,会一直等待下一个任务的到来。
源码分析:
我们来探究一下ThreadPoolExecutor是如何管理线程的。
先来看精简后的execute方法:
private final BlockingQueue workQueue;
public void execute(Runnable command) {
int c = ctl.get();
//当前工作线程还没满
if (workerCountOf(c) < corePoolSize) {
//可以创建新的工作线程来执行这个任务
if (addWorker(command, true)){
//添加成功直接返回
return;
}
}
//如果工作线程满了的话,会加入到阻塞队列中
if (workQueue.offer(command)) {
int recheck = ctl.get();
//加入到队列之后,如果当前没有工作线程,那么就会创建一个工作线程
if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
}
逻辑很清晰:当execute方法被调用时,如果当前工作线程 < corePoolSize(上面ThreadPoolExecutor构造方法的第一个参数)的话,就会创建新的线程,否则加入队列。加入队列后如果没有工作线程在运行,也会创建一个。
好,接着看它是怎么创建新线程的:
private final HashSet workers = new HashSet<>();
private boolean addWorker(Runnable firstTask, boolean core) {
//再次检查
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= corePoolSize)
return false;
boolean workerStarted = false;
Worker w = null;
//创建Worker对象
w = new Worker(firstTask);
//添加到集合中
workers.add(w);
final Thread t = w.thread;
//启动工作线程
t.start();
workerStarted = true;
return workerStarted;
}
主要是创建Worker对象并启动它里面的线程,来看看Worker里面是怎么样的:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
可以看到,这个Worker也是一个Runnable。构造方法里面还创建了一个Thread,这个Thread对象,对应了上面addWorker方法启动的那个thread。
再看run方法,它调用了runWorker,并把自己传了进去:
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null) {
try {
task.run();
} finally {
task = null;
w.completedTasks++;
}
}
}
Worker里面的firstTask,就是我们通过execute方法传进去的Runnable,可以看到它会在这个方法里面被执行。
执行完成之后,接着就会通过getTask方法尝试从等待队列中(上面的workQueue)获取下一个任务,如果getTask方法返回null的话,那么这个工作线程就会结束。
最后来看看getTask方法:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (; ; ) {
int c = ctl.get();
int wc = workerCountOf(c);
//如果当前工作线程数大于指定的corePoolSize的话,就要视情况结束工作线程
boolean timed = wc > corePoolSize;
//(当前工作线程数 > 指定的最大线程数 || (工作线程数 > 指定的核心线程数 && 上一次被标记超时了)) && (当前工作线程数有2个以上 || 等待队列现在是空的)
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
return null;
}
//如果当前工作线程数大于指定的corePoolSize,就看能不能在keepAliveTime时间内获取到新任务
//如果线程数没有 > corePoolSize的话,就会一直等待
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
//没能在keepAliveTime时间内获取到新任务,标记已超时
timedOut = true;
}
}
emmm,就像刚刚说的那样,如果是newCachedThreadPool的话:
核心线程数 0;
最大线程数 Integer.MAX_VALUE;
空闲线程存活时间 60秒;
那么当执行到getTask方法时,里面的timed肯定每次都是true的,也就是每次获取任务的时候,最多只能等60秒,如果60秒内没有获取到新的任务,那么getTask就会返回null(工作线程会结束)。
像newFixedThreadPool,如果我们传的是5:
Executors.newFixedThreadPool(5)
那么,它的参数是:
核心线程数 5;
最大线程数 5;
空闲线程存活时间 0秒;
在判断当前工作线程数是否大于核心线程数的时候,肯定就是false了,因为在前面提交任务的时候,就已经有判断:小于核心线程数才创建新的工作线程。
timed是false的话,从workQueue中取任务的时候,调用的就不是poll方法,而是take方法,这个take方法会一直阻塞,直到拿到元素为止。
吃饭去。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。