赞
踩
今天来看看JDK下的线程池- ThreadPoolExecutor 的一些源码分析。
纯手打码字硬肝1天1w+字,有一些可能笔误或者理解有误的地方欢迎大家指出。谢谢。
看完本文你可以了解到:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, 200, 30, TimeUnit.SECONDS, new LinkedBlockDeque<>(500)
);
这段代码很简单,就是new了一个线程池出来,几个参数分别是:
corePoolSize
(10):线程池的核心线程数,即线程池中保持活动状态的线程数。即使线程池处于空闲状态,核心线程也不会被回收。maximumPoolSize
(200):线程池允许的最大线程数,包括核心线程和非核心线程。当工作队列已满并且活动线程数小于最大线程数时,线程池会创建新的线程来处理任务。keepAliveTime
(30):非核心线程的空闲时间超过该值时,线程会被回收销毁,直到线程数量减少到核心线程数为止。unit
(TimeUnit.SECONDS):空闲时间的单位,这里是秒。workQueue
(new LinkedBlockingQueue<>(500)
):用于保存等待执行的任务的阻塞队列。这里使用了一个容量为 500 的 LinkedBlockingQueue
,它是一个基于链表的无界队列。实际上我们打开源码可以看到,这个它要的是 BlockingQueue
也就是阻塞队列
Java里面的队列可以分为阻塞和非阻塞,要new线程池的话,最好传阻塞队列(不过其实你给他传个非阻塞队列也能创建出来)。后面会讲到这是为什么。
这里有个问题:这个线程池new出来了对象,这个线程池里面,现在有几个线程呢?
答案是0。
来看这个ThreadPoolExecutor
构造方法的源码:
我们可以看到,它并没有什么创建线程的操作,它就做了一些基本参数的判断和赋值。
这和 Tomcat还是有点不一样的。Tomcat也有个线程池嘛。
我们也去看看源码:在Tomcat的 AbstractEndpoint
的类下,有个 createExecutor()
方法,这里面有个方法。
new org.apache.tomcat.util.threads.ThreadPoolExecutor(基本参数);
这个线程池命名和JDK的一模一样
我们点进去看看,它的实现基本和JDK一样(甚至可以说是照抄,不过高一点的版本的Tomcat已经改了,但是也大差不差的)。有个很关键的区别在于:
Tomcat在创建完线程池之后,马上就会启动一定量的线程。
另一个区别是 Tomcat 的线程池可以在运行时动态调整线程的数量。它可以根据当前的请求负载情况,自动增加或减少线程数量,以适应不同的并发需求。这个特性在高并发场景下非常有用。
回到JDK这边。
那到底什么时候线程池才会去创建线程呢?
我们接着往下看:
线程池创建出来之后,我们可以往它里面去提交任务,去执行了。
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("abc...");
}
});
而 execute
方法就会去做很多事情了。
打开源码看看:
可以看到它会先获取现在线程池中线程个数 workerCountOf(c)
拿它跟 核心线程数来比较
如果小于,那么它直接调用 addWorker()
方法。
到这里也能回答一个问题:如果现在构造了任务2,任务1已经完成了,是先去拿线程1呢执行呢 还是去把线程2创建出来执行。
答案是创个新的。 也就是说,虽然现在是有空闲的线程,但是没到核心线程数,它就会直接去创新的。
另外 addWorker()
的两个参数蛮留个心眼:
true
我们进到addWorker()
方法看看:
前面半部分是做一些状态和参数的判断 那部分代码我们先不看,直接从new Worker()开始。
它会new 一个Worker对象出来,而这个构造方法中,就会通过线程工厂,去获取一个新的线程。
这三个参数分别是:
也就是说 每个 Worker
对象 都会有一个自己的线程。而线程池真正调度的 实际上就是这个 Worker
对象
我们往上拉,可以看到Worker继承了AQS
并实现了Runnable
接口
AQS
可能一下子有些朋友不知道是干嘛的。
实际上我们常见的 ReentrantLock
锁 实际上就是使用AQS
作为基础。在ReentrantLock中,锁的状态由AQS的state字段表示,0表示锁是可用的,非0表示锁被占用。(当然 这只是一个类似口头约定,不是硬性要求的。就是说 具体它 0表示什么 1表示什么 2表示什么 还是要看具体的代码实现的)
那回到addWorker()
方法中,它会比对一大堆参数,然后调用线程的start()
方法
而了解Runnable
接口的朋友都知道,实现了该接口,线程调用start()
方法时,是会去调用run()
方法的。而Worker
类是重写了该方法的:
昂看的出来它的关键其实是在这个 runWorker()
方法中,但是这个方法我们后面再看再说。
我们知道线程其实执行完闲着就会等待系统回收资源,那我们肯定不能让它被回收的。要是用完就给回收消失,线程池就没有存在的意义了。所以这些空闲的线程要怎么保活呢?
这下面是一段伪代码:
当线程执行完自己的任务之后,它就去线程池中的阻塞队列执行poll方法。
阻塞队列的特点是这样的:如果有任务,线程来了poll了就去干活,没有任务就等着。
所以 线程就通过这种 一边等任务,一边让自己活下来的方式存活。
那现在有好几个线程是闲着的,比如 t1 t2 t3 。来了第11个任务。会给谁呢?
这个可以理解为随机。。
这个具体实现 不是本文的内容了,因为这个任务的分配,具体是阻塞队列里面的poll方法干的。它里面会有一些cas的操作。
这也是为什么要阻塞队列的原因:从JDK开发者的角度,他们的两大难题:
都伴着阻塞队列的特性一起解决了。非常聪明。
感叹一下开发者们的智慧,也是我们分析源码的意义,学习前人的开发经验和模式,以便更好地理解和应用现有的软件框架和技术。
addWorder
方法 去创建一个Worker
对象,并赋予它线程。有的朋友可能就会问了:为什么不直接分配给线程去干呢。
因为实现起来麻烦,通过这种类似监听一样的方式,利用阻塞队列的机制,一边又能实现线程的保活,一边还能实现任务的直接分配,非常优雅。
好继续。
我们在创建的时候,会分配阻塞队列的容量。比如我们上面的代码是500
假如现在500满了,也就是队列放不下了,我们的10个线程也在忙。
这时候又来任务,它怎么处理呢?
还是配合源码来看:
workQueue.offer(command)
尝试放进队列,但是放不进了。那它就走进下面的else
!addWorker(command, false)
它又去addWorker了。
注意:一定是队列满了,它才会去创建核心线程数以外的线程。
从本质上来讲。这是不公平的实现。为什么?
你想想,队列是满的,也就是说它起码是第501个任务。
而第11个任务都还没开始,它第501个任务就已经在弄了。
确实是设计上的一些缺陷。但是没有办法。
这时候就要看我们设置的最大线程数了。我们上面的代码是给了 200
那除去核心线程数的10个,也就是说 最多最多我们还能再创建190个临时线程。
那要是现在创到了200个线程了,还接着来任务 线程池会怎么做。
接着看源码:
前面提到过addWorker
的参数,除了任务,还有一个Boolean值
它只用在了一个地方,就这里。
前面我只截了后半部分的代码,也就是new Worker()
还有 线程.start()
的过程
前半部分如上图所示。
wc
是现在线程池中的线程数。当它大于 maximumPoolSize
也就是最大线程数的时候,它直接
return false;
后面的 线程.start就不会执行了。
并且它会拒绝你提交的任务。
进到这个rejectedExecution
中
我们可以看到它是有4个实现,这就对应着线程池的4种拒绝策略。
我们还是看源码:
前面的所有方法的调用啥的,其实都是主线程在调用的。而这个r.run()
实际上就是让调用的那个线程去执行这个任务。说白了就是主线程去做这个任务。这也是一种线程池的拒绝策略,因为是由我们自己的主线程去干这个活了,没有让线程池里面的线程做。
从名字也能读出来 丢弃最旧的。
这种策略是把阻塞队列里面最早来的 还没有线程处理的,直接淘汰掉,然后新的放到队列的末尾。
直接丢弃 啥都不做
一般我们就是前两种用的比较多,要么抛异常,要么就让调用者线程去执行。
前面我们说到了,临时在任务比较多,核心线程不够用的情况,我们会去创建新的线程。其实这个问题不用想也能知道答案 耗费了一些资源创建的线程,怎么能让他说散了就散了。跟核心线程一样,它们也是回去阻塞队列那边盯着,有任务就接着干。
那没有任务的话嘞,或者说 任务已经做完的情况呢?
我们创建的核心线程数是10嘛。那现在有10多个线程了,在没任务的情况下,肯定是不能让它们继续在这边呆着了。那我们还淘汰掉哪几个线程呢?一定是后面来的那几个线程吗?
其实并不是的。反正它就是根据去掉两个。因为不管是临时线程还是核心线程,不都是线程。都是addWorder()
出来的线程。是一样的。
也就是说,假设 t1 - t10 是原来的核心线程,然后任务比较多的时候,又创建了 t11、t12、t13。那现在这13个线程要去掉三个,它们本质上是一样的,所以去哪三个无所谓,留下的10个,就是核心线程。
它又是怎么淘汰的呢?那么继续往下
前面我们说到Worker
类是实现了Runnable的接口。一个类实现了Runnable接口并创建了该类的实例后,需要将实例传递给Thread的构造方法,等调用Thread对象的start方法,它会去调用Runnable对象的run方法。
线程会调用实现了Runnable接口的类的run方法来执行任务的逻辑。可以将任务的逻辑代码放在run方法中,线程启动后会自动调用该方法,执行任务。不是直接调用run方法,而是通过start方法启动线程,让系统自动调度和执行run方法。
我们前面也提到了 Worder
类中的run()
方法 是直接调用了 runWorker()
方法。所以关键都在这个方法里面。
那么我们直接来看看它的源码:
代码第二行的这个firstTask
很眼熟吧,就是我们给它的第一个任务,我前面提到过。它一定是有任务 才会创建,这个第一个任务它执行完了,才会到后面。
再来梳理一下:创建这个worker
对象绝对是有任务,有任务,这个fistTask就绝对不为空 那么这个 while (task != null )
(第二个判断先不看)就一定会进去。 可以理解吧。
然后就是咔咔一顿操作。就到了这个task.run()
这是什么会知道吗?
这个就是我们 execute 的 run 方法了 也就是我们丢进去的任务。
可以看到 task.run() 的上面两行和后面几行 有个
beforeExecute(wt, task)
和 afterExecute(task, thrown)
这是切面的一些拓展。就是你想在这个任务前后做点什么 可以去写它们俩。
我们注意到 这个在最后的finally中,task它会变为null。
然后外层有个while循环,就要看第二个条件了。
while (task != null || (task = getTask()) != null)
就是又去重新获取任务,如果获取到了,那就接着干活。以此循环。
可以看到 判断条件为:
task = getTask() != null
那是不是就意味着,当getTask()
的返回值为null的时候,这个线程就循环结束 然后执行后面的一些 finally块的代码
这个线程也就消亡了对吧。
那我们是不是就可以通过控制这个null的返回时机,来控制线程的消亡。
比如:现在有15个线程,它们做完任务就在这边循环等待,我们想去掉多余的5个线程,那直接返5个null就完事了。
而我们点进来看这个getTask()
方法 我直接把源码贴进来:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
注意看第二行:for (; ; ) 它是被一个for循环包裹着的
现在先来分析一个情况,现在我有10多个线程,假如是15个。我们想象一个极端的情况,假如它们同时完成了各自的任务,然后一起过来执行getTask()
方法。
可以看到它会获取当前线程数 那这15个线程得到的结果就都为15。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
allowCoreThreadTimeOut
大家可以先当它一直为false ,我们只要看后面的
所以timed
= 当前线程数 是否大于 该线程池核心线程数 15 > 10 肯定是true现在。
看方法的第一行,timedOut
先是 false
maximumPoolSize
是最大线程数。也就是我们开始设定的200
那么 这个if 它就进不去了。(这图通俗易懂有没有)
它就直接执行后面的try块
阻塞队列的这两个方法的差别是
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
是一个具有超时特性的方法。它将等待指定的时间(keepAliveTime)来获取元素,如果超过指定时间而队列仍为空,则返回null。workQueue.take()
是一个阻塞方法,如果队列为空,它将一直等待,直到有元素可用为止。timed
是true
嘛现在 所以它会执行 poll
方法。
而我们这15个线程都在等着这个poll方法。假如现在一直是没任务,那么过了这个超时时间,它们都会返回一个null,然后就会执行
而这个timedOut改变后,它就进入下一次循环了。
而随着这个 timedOut改变 这个if 就会满足了。 timed && timedOut
那么就会执行它:
compareAndDecrementWorkerCount(c)
点进这个compareAndDecrementWorkerCount()
方法 可以看到 compareAndSet 都不用再看,从名字也能知道它是怎么一回事。CAS操作。
也就是说,这15个线程 都会一起来执行这个CAS操作。
CAS
有些朋友可能不知道 简单提一下:
就是它们会去竞争,是用的乐观锁的那种对比值的想法。让这个数量去-1。而且只会有一个线程能成功。
成功的返回null
。而这个 Return 到哪里还记得吗?
是 runWorker()
方法中调用的。
而现在返回了null
,那我不就等于退出循环,然后执行后面的代码,这个线程也就消失了。剩下的14个线程就continue
接着循环,然后接着取wc 也就是当前线程池中的线程数14,然后再判断,然后还是会进if 然后还是去CAS
。没成功的 接着循环,直到wc=10;就不再进if了。而
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
timed 由于 wc不再大于最大线程数。那它就会为 false
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
然后它就去执行 阻塞队列的workQueue的take
方法 无限等待。
当阻塞队列中没任务了,线程就一直循环,每次循环都是一次CAS操作,销毁掉一个线程,直到剩下10个 为核心线程。然后去阻塞队列中无限阻塞等待新任务。也就能解释线程是怎么保活的。
优雅 真的优雅。用这么短短不到30行的代码,就实现了这么巧妙的设计。
如果线程在执行过程中出异常了。比如报了个NPE
那请问这个线程是继续活着还是消亡呢。我们继续看源码
还是一样 我把runWorker
的部分源码直接贴过来(删掉了一些暂时无用的行,后面提到了相关的点 我会再附源码):
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { try { try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } finally { afterExecute(task, thrown); } } finally { task = null; } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }
可以看到刚进来的第四行
boolean completedAbruptly = true;
然后进入while循环。而上面我们刚讲的, getTask()
的结果 直接决定这个while是否结束。
所以它有两种情况会结束:
getTask()
返回null
正常结束大家看倒3行的 completedAbruptly = false;
它的位置很巧妙,如果是正常退出,它就会将completedAbruptly
改为 false
如果报错了,异常就会被catch捕获然后finally对吧。 completedAbruptly = false; 就不会执行到,直接执行倒二行的 finally
也就是说 最后一行的 finally块的代码可以理解为:
//如果报错,出异常
processWorkerExit(w, true);
//如果正常退出
processWorkerExit(w, false);
也就是这个线程在结束之前,做的最后一个事情。
那它具体做了什么事,我们接着看源码
我们主要看异常的情况,也就是说暂时就当它 completedAbruptly = ture;
那代入到代码中,它是不是就等于不会走 里面那层if块。
而在最后 你会看到,这段代码它执行了addWorder()
所以:当任务出异常了,它退出前,它会去再补一个线程回线程池。
大家仔细想想,其实除了正常退出和异常退出销毁掉线程。还有一种也会让线程关闭,就是线程池关闭的时候嘛。线程池都关了,里面的线程肯定也要关掉。
两个都是关闭线程池的方法
shutdown()
是比较慢点的,它会把阻塞队列中的任务执行完了,再关闭shutdownNow()
是不管阻塞队列有没有任务,直接关闭这里我先问个问题,线程要怎么关闭?所谓的kill掉线程嘛。
在Thread里面确实有个 stop() 方法。
但是不建议这么用。实际上Java已经不推荐使用了,标记了@Deprecated。主要是因为我们不知道这个线程执行到哪里的,如果直接杀掉,是很危险的。而且还有一点,stop是会释放线程占用的synchronized锁,但是不会自动释放ReentrantLock锁,这也是它危险的一个点。
现在我们比较优雅、比较常用的方式是 中断
这两个代码的是怎么实现完成任务或不管有没有任务的?
还有正在执行任务的线程是继续执行任务,还是直接关闭?
我们都可以在源码中找到答案(这次是没删减 纯源码):
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); } public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
可以看到两个方法基本一样。可以看到其中一个区别是 advanceRunState
状态的区别。 一个是 SHUTDOWN
一个是 STOP
而这两个对应的值分别为 0 和 1
而在设置完这个状态后,它们都会去执行 发中断信号操作 的方法
interruptIdleWorkers()
这个中断怎么理解,它其实只是发了个中断的信号。给你说 我们停工了。
并不是直接让这个线程消失了或者别干了。
我们回头看runWorker
的 while循环
它在这里 会判断你是不是执行了中断。
注意它的位置,它是在 task.run()
之前的。
前面我们说过我们执行任务其实就是在这个 task.run()
里面执行的嘛。也就是说 如果现在有线程在执行任务,你发了这个中断信号,其实并不会直接让已经在干活的线程停止。除非你这个任务执行的本身里面,有做中断信号的检查。比如你这个run任务里面,有个
if(是否中断){
return;
}
不然,它都是会执行完的。
只有等它这次正常执行完了,下次再进到下一次while循环,取下一个任务也就是getTask()
后,才会去关注一下是否状态是否中断
我们再进到 getTask()
方法中:
比如现在是 shutdown()
的,线程池的状态会为SHUTDOWN
也就是0,0比STOP的1小嘛,所以它会继续去判断你这个队列 是不是空的。
如果是空的,那就执行decrementWorkerCount();
并且return null;
decrementWorkerCount();
return null;
而如果你现在是 shutdownNow()
那它的状态会为STOP
那么这个if 它就直接成立了。因为 1>=1 嘛。 所以,后面的workQueue
是不是Empty()
它压根不管,直接return null
这也就回答了两个问题:shutdown和shutdownNow是如何实现它们的特性的? 同时还能明白 在执行的线程是停下手头的事直接断掉,还是等做完再断掉?
shutdown()
它会把阻塞队列中的任务执行完了,再关闭。
而 shutdownNow()
则会等现在手头上的事一做完,就直接结束。
一行代码就实现了两个状态的区分,优雅我已经说不过来了。
之前我们了解到,如果线程没任务做,那它就会无限阻塞。workQueue.take()
那现在阻塞了,压根不动,那谈什么循环去判断信息,怎么去销毁线程什么的。
实际上 当一个线程处于阻塞状态(如调用了 Thread.sleep()
、Object.wait()
、BlockingQueue.take()
等方法)或者被阻塞(如调用了 synchronized
块)时,如果另一个线程向其发送中断信号,即调用了 Thread.interrupt()
方法,被中断的线程将会被唤醒,并且抛出 InterruptedException
异常。
我们这些线程现在是阻塞状态对吧,然后你发了这个中断信号,它就会被catch块捕获,然后 timedOut 就 = false;
然后进入下一次循环。剩下的逻辑就和正常退出是一样的了。
而这个中断信号是在哪里发的,其实就是在showdown
和showdownNow
两个方法设置状态后的 interruptIdleWorkers()
方法中做的。
两个方法的实现是略有差别的,但是都大同小异。
无非就是遍历了所有Worker 每个worker里面都有个线程嘛 它就是每个线程都调用一下 t.interrupt()
就是发中断信号嘛。
总结一下:
就是线程只有两种状态嘛:一种是在执行任务,一种是在队列阻塞等待任务。
在执行任务的,执行完手头上的任务后,再去拿任务,会根据状态 看是做完别的任务走 还是直接走。
而在阻塞等待的,其实等于是被中断信号唤醒了,然后也是去走一边拿任务的路,然后走到那个位置做判断,然后退出销毁。
另外,有个很有意思的点不知道大家有没有注意到:
runWorker()
这个方法中,它上来就unlock()
,这是为什么。这个其实跟这个线程中断有点关系。
不知道大家对Worker()
这个构造方法还有没有印象:
前面我们提到过Worker
类是继承 AQS的。并且在这个构造方法中把State
设为了 -1
0表示目前无锁或者说是还没上锁,1表示有锁。-1就是最开始的状态,可以理解成 它线程都还没开始运行。
对于一个线程而言,如果都还没运行它,那还谈什么中断 对吧。
实际上 这个worker
从被new出来,构造方法里面去setState
、赋予线程,到真正的start线程
中间跨度是非常大的(看图)
所以是不是有个可能,比如现在worker对象new出来了,线程还没启动呢,线程池就要关闭了。
而它现在都没start,我们还有没有必要像刚刚那些别的一样,去发送一些中断的信号呢?
其实是没有这个必要的。
实际上我们看线程池的 shotdownNow()
方法,它在设置状态为 STOP 之后,会去调用中断线程,在这个方法中,它就会去判断worker
的state
是否大于等于0
我们跟着源码来一遍:
worker调用unlock 其实是调用 release(1)
方法
在 release()
方法中调用 tryRelease
方法
而这个tryRelease
方法 是有被重写的
也就是说 它调用的 release(1)
实际上调用到的是自己重写的 tryRelease
方法 所以说白了它unlock()
就是调用自己重写的这个方法。
而你看这个方法 它不论你传什么参数过来,state都为0。
所以它第一次调用unlock()
实际上 state 是从 -1 变成 0
要知道的一点是:runWorker方法如果运行了,就说明这个线程已经开始运行了。因为只有线程 start()
之后,它才会去调用实现类的run
方法 从而才调用到worker的runWorker
方法。这个得明白 不然这一块不好理解。
所以它只要值不是-1,就说明这个线程是启动了的。
而runWorker
方法中的while
里面的 lock 和 unlock 实际上就是 0改为1 1改为0的过程。也就是加锁解锁的过程了。
再说一遍:
线程start
之后,进来的第一次unlock,state是从-1到0 表示 我这个线程开始运行了。
后面的所有加锁解锁 就是正常的锁逻辑 0->1,1->0的过程。在这里面 可以保证并发的安全的。
所以去判断 >=0了 我们才去中断它。<0也就是 = -1的时候 说明它都没运行,那我们去中断它干嘛呢。
不过:
其实去掉-1这个状态,也可以。。就是说 不管它是 -1 0 还是1。
都给它发中断信号,其实是不会报错的,也就是没问题的。
但是按照逻辑来说没有启动的线程 就不用去中断它,虽然好像直接中断也没什么问题。只能说可能有什么性能的影响吧。
到这里也就差不多了,如果有什么不对的地方,欢迎评论指正,谢谢。
如果这篇文章能对你有些帮助的话,帮帮点点赞哈哈哈哈。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。