赞
踩
池化思想:线程池、数据连接池等,比如我们 Spark 的 Executor 就是典型的线程池,用户在启动 Spark 作业的同时启动线程池,这样 Spark 的 Task 就可以直接获取资源,而不用像 MR 程序那样等待容器上的进程开启了。
如果不使用线程池的话,我们需要:
优点:
我们先看看 ThreadPoolExecutor 的构造器的源码:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- if (corePoolSize < 0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime); // 统一转为亚秒格式
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
参数大概意思就是:核心线程数、最大线程数、存活时间、存活时间单位、阻塞队列(存放任务)、线程创建工厂、拒绝策略。我们也可以通过下面的例子理解一下:
线程池就好比我们的银行,它有很多个接待客人的柜台(一个柜台就是一个线程,用来执行任务)和用来供客人等待的座位(等待的执行的任务)。上面的图中,我们有 5 个柜台(对应第2个参数:maximumPoolSize) ;而其中绿色的三个柜台代表常驻柜台(也就是一直都有人,对应第1个参数:corePoolSize);红色的柜台代表预备柜台,也就是当业务繁忙的时候,最多还可以打电话摇两个人来(maximumPoolSize - corePoolSize);但是摇人来帮忙是有时效性的,如果帮完忙一段时间(取决于第3个和第4个参数)没有活干,这些线程就会被释放;摇人的方式取决于第6个参数(下面我们是通过默认的线程工厂来再创建线程的);蓝色的等候区区域代表座位(可允许等待的任务数量),当柜台前和座位都满了的时候,如果再有任务进来就会被拒绝,具体的拒绝方式取决于第7个参数,下面我们给出的拒绝策略是直接报错:
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
-
- public class TestThreadPool {
- public static void main(String[] args) {
- // 1.核心线程数 2.最大线程数 3.存活时间 4.时间单位 5.等待队列 6.线程工厂 7.拒绝策略(直接抛异常)
- ExecutorService executorService = new ThreadPoolExecutor(3, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
- for (int i = 0; i < 9; i++) {
- executorService.execute(()->{
- System.out.println(Thread.currentThread().getName()+"正在工作");
- });
- }
- // 关闭线程池
- executorService.shutdown();
- }
- }
上面的案例中,我们的线程池的最大接收的任务量是 8 (最大线程数:5 + 等待队列容量:3),但并不是说只能跑8个任务,如果有任务释放资源仍然可以继续执行任务。
所以,上面的案例同一时刻最多只能共存8个任务(其中最多只有五个任务会同时执行),如果,当我们的任务超过8时,会直接报错(因为我们设置了拒绝策略就是直接抛异常)。
创建一个包含 10 个核心线程、总线程数为 20、存活时间为 0 s、拒绝策略为直接抛异常的一个线程池对象:
- public static void main(String[] args) {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(
- 10, 20,
- 0L, TimeUnit.SECONDS,
- new LinkedBlockingDeque<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()
- );
-
- executor.execute(new Runnable() {
- @Override
- public void run() {
-
- }
- });
- }
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- /*
- * Proceed in 3 steps:
- *
- * 1. If fewer than corePoolSize threads are running, try to
- * start a new thread with the given command as its first
- * task. The call to addWorker atomically checks runState and
- * workerCount, and so prevents false alarms that would add
- * threads when it shouldn't, by returning false.
- *
- * 2. If a task can be successfully queued, then we still need
- * to double-check whether we should have added a thread
- * (because existing ones died since last checking) or that
- * the pool shut down since entry into this method. So we
- * recheck state and if necessary roll back the enqueuing if
- * stopped, or start a new thread if there are none.
- *
- * 3. If we cannot queue task, then we try to add a new
- * thread. If it fails, we know we are shut down or saturated
- * and so reject the task.
- */
- int c = ctl.get();
- if (workerCountOf(c) < corePoolSize) { // 如果总线程数小于核心线程数
- if (addWorker(command, true)) // 添加一个核心线程
- return;
- c = ctl.get();
- }
- if (isRunning(c) && workQueue.offer(command)) { // 如果当前全部线程都在工作就把任务添加到阻塞任务队列当中
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- else if (!addWorker(command, false)) // 如果上面没有成功把任务加到队列就新加一个线程
- reject(command); // 如果新加线程失败(比如超过最大线程数)就拒绝任务
- }
这里的注释已经说的很明白了:
如果任务能够成功入队,我们仍然需要再次检查是否应该添加一个线程(因为自上次检查以来已有线程死亡)或者线程池在进入此方法后已经关闭。所以我们重新检查状态,如果有必要回滚入队操作(如果已停止),或者如果没有线程,则启动一个新线程。
如果我们无法将任务入队,则尝试添加一个新的线程。如果添加失败,我们知道线程池已经关闭或饱和,因此拒绝该任务。
所以在上面我们创建线程池的代码中,并不是线程池创建好之后就会立马创建 10 个核心线程,而是真正有任务来的时候才会去新创建一个线程。
思考:如果线程的任务结束了,线程对象会怎么样呢?
从创建线程池的构造器就不难想到,构造器中有一个阻塞队列的参数,其实当线程没有任务的时候,线程并不会关闭,而是一直阻塞,也叫保活。
保活线程的关键在于阻塞队列,即LinkedBlockingDeque。当队列为空时,如果线程尝试从队列中取元素,线程会被阻塞,直到队列中有元素可供取出。这样,线程就会在等待任务的过程中保持活跃状态。
getTask 方法是 runWorker 方法里的调用的,而 runWoker 又是 Worker 对象的方法,这个 Worker 实例又是上面的 execute 方法中的 addWorker 方法中实例化出来的。
getTask 方法是所有线程池中的线程一直在不断调用检查的(在 runTask 方法中的 while 循环中被调用):
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
-
- for (;;) {
- int c = ctl.get();
-
- // Check if queue empty only if necessary.
- if (runStateAtLeast(c, SHUTDOWN)
- && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
-
- // 总线程数
- int wc = workerCountOf(c);
-
- // 判断是否需要超时回收线程
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
-
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
-
- try {
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 回收超时的线程
- workQueue.take(); // 阻塞
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
方法解释:
如果满足以下任一条件,该工作线程将返回null并退出:
返回值:
如果满足上述条件之一,返回null,表示工作线程必须退出,此时workerCount会减1。
否则,返回task,表示成功获取到任务。
从源码中可以看到,线程池中的工作线程会执行getTask()方法来获取任务。在这个方法中,线程会调用workQueue.poll()或workQueue.take()方法来尝试从LinkedBlockingDeque中获取任务。如果队列为空,这些方法会使线程阻塞,直到有新的任务添加到队列中。这就是线程在没有任务执行时仍然保持活跃(保活)的机制。
思考:现在的核心线程数是 10,如果此时正在工作的线程有 11 个(10个核心线程,一个其它线程),那如果所有线程的任务都完成了,那么线程池又会执行怎样的逻辑呢?
在上面的 getTask 方法中,线程池中的每个线程在调用这个方法的时候都会判断是否需要回收线程:
- // 判断是否需要超时回收线程
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
现在我们有 11 个线程,所以明显总线程数 wc(11) > corePoolSize(10),所以自然会执行下面的逻辑:
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
也就是 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):当前调用该方法的线程会阻塞一段时间(keppAliveTime 个单位),如果这段时间过后依然访问不到任务,那么下面的 timeOut = true ,getTask 方法返回 null。
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- while (task != null || (task = getTask()) != null) {
- w.lock();
- // If pool is stopping, ensure thread is interrupted;
- // if not, ensure thread is not interrupted. This
- // requires a recheck in second case to deal with
- // shutdownNow race while clearing interrupt
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- beforeExecute(wt, task);
- try {
- task.run();
- afterExecute(task, null);
- } catch (Throwable ex) {
- afterExecute(task, ex);
- throw ex;
- }
- } finally {
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }
因为 while 中的条件均为 false,所以 runWorker 会先执行下面的 completedAbruptly = false; 然后执行 finally 中的 processWorkerExit(w, completedAbruptly); 这里的 processWorkerExit 方法正是回收线程的关键所在,如果没有它,我们的所有空闲线程就都将被回收,所以这个方法中就定义了回收的逻辑:
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
- decrementWorkerCount();
-
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- completedTaskCount += w.completedTasks;
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
-
- tryTerminate();
-
- int c = ctl.get();
- if (runStateLessThan(c, STOP)) {
- if (!completedAbruptly) {
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- if (workerCountOf(c) >= min)
- return; // 线程直接结束
- }
- addWorker(null, false); // 本线程结束,再新建一个线程
- }
- }
比如现在我们一共有 11 个线程(10个核心线程),那么 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; (暂不考虑 allowCoreThreadTimeOut)的结果就是 10,所以这个 11 个线程在执行 if (workerCountOf(c) >= min) 的时候,第一个执行该语句的线程会直接 return,之后执行该判断语句的 10 个线程由于不满足条件,但是自己又存活不了,所以在自己结束之前,会再创建一个没有初始任务的新线程,这样就保证了线程池中的线程数总是不低于核心线程数的。
下面是 ThreadPoolExecutor 源码中对于常量 ctl 的解释:
主池控制状态ctl是一个原子整数,包含两个概念字段workerCount,表示线程的有效数量runState,表示是否正在运行、关闭等。
为了将它们打包为一个整数,我们将workerCounts限制为(2^29)-1(约5亿)个线程,而不是(2^31)-1(20亿)个其他可表示的线程。如果这在未来是一个问题,可以将变量更改为AtomicLong,并调整下面的移位/掩码常数。但是,在需要之前,使用int,此代码会更快、更简单。workerCount是允许启动但不允许停止的工人数量。该值可能与实际活动线程数暂时不同,例如,当ThreadFactory在被请求时未能创建线程,以及当退出的线程在终止前仍在执行记账时。用户可见的池大小报告为工作者集的当前大小。
runState提供了主要的生命周期控制,取值为:
- RUNNING:正常接受新任务和处理排队的任务
- SHUTDOWN:不接受新任务,但处理排队任务
- STOP:不接受新建任务,不处理排队任务,并中断正在进行的任务
- TIDING:所有任务都已终止,workerCount为零,转换到状态的线程TIDING将运行terminated()钩子方法terminated:terminated。
runState随时间单调增加,但不需要达到每个状态。转换为:RUNNING->SHUTDOWN On invocation of SHUTDOWN()(RUNNING或SHUTDOWN)->STOP On invocationof shutdownNow()SHUTDOWN->TIDING当队列和池都为空时STOP->TIDIING当池为空时TIDIING->TERMINATED当TERMINATED()钩子方法完成时等待awaitTermination()的线程将在状态达到TERMINATED时返回。检测从SHUTDOWN到TIDING的转换并不像您希望的那样简单,因为队列在非空之后可能会变空,反之亦然,但我们只能在看到它是空的之后,看到workerCount为0时终止(这有时需要重新检查——见下文)
线程池的五大状态从源码中可以看到:
这里的 COUNT_MASK 代表的是线程池的最大数量,也就是 2 的 29 次方 -1 个。
Integer.SIZE为32,所以COUNT_BITS为29,最终各个状态对应的二级制为:
所以,只需要使用一个Integer数字的最高三个bit,就可以表示5种线程池的状态,而剩下的29个bit就可以用来表示工作线程数,比如,假如 ctl 为:11100000 00000000 00000000 00001010,就表示线程池的状态为RUNNING,线程池中目前在工作的线程有10个,这里说的“在工作”意思是线程活着,要么在执行任务,要么在阻塞等待任务。
把线程状态放到高3位,把线程数量放到剩下的29位的好处就是方便之后频繁修改线程个数。
下面是线程状态的转换关系:
- public static void main(String[] args) throws InterruptedException {
- Thread t1 = new Thread(new Runnable() {
- @Override
- public void run() {
- while (true){
- System.out.println("线程 t1 在执行任务");
- }
- }
- });
-
- t1.start();
-
- Thread.sleep(3000);
-
- t1.interrupt();
-
- System.out.println("线程 t1 被阻塞了");
- }
事实上,上面的代码并不会真正中断线程 t1 ,因为 interrupt 只是一个信号,线程要不要停掉还是取决于它自己。所以要想真正打断线程,需要线程自己判断:
- Thread t1 = new Thread(new Runnable() {
- @Override
- public void run() {
- while (!Thread.interrupted()){ // 如果线程未被中断
- System.out.println("线程 t1 在执行任务");
- }
- }
- });
这样才能真正的关闭线程,否则 interrupt 方法没有意义。
线程池的 shutdown 方法会把线程池状态设置为 SHUTDOWN ,但是 SHUTDOWN 状态并不会立即关闭线程池中的所有线程,而是会等任务执行完毕再关闭线程,毕竟直接关闭线程可能会出现一些安全问题。
如果希望立即关闭线程,需要调用 shutdownNow方法,这个方法会把线程池的状态设置为 STOP。
这里的 onShutdown 方法是个空的(它是提供给子类用的),这里不需要关心。,而下面的 tryTerminate 才会对线程池的状态进行修改,这才是我们需要关心的:
同样,这里的 terminate 方法其实也只是一个空方法,它也是留给子类在需要时使用的,就相当于一个扩展机制,以后要是再 TIDYING 状态之后希望对线程池进行什么操作,就重写它就可以了。
上面代码中的 isRunning 方法也特别简单:
只要当前的线程池状态小于 SHUTDOWN 就是 RUNNING 状态。
首先了解乐观锁:
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (int c = ctl.get();;) {
- // Check if queue empty only if necessary.
- if (runStateAtLeast(c, SHUTDOWN) // 如果当前线程池的状态 >= SHUTDOWN
- && (runStateAtLeast(c, STOP) // 并且当前线程池的状态 >= STOP
- || firstTask != null // execute(command) 的command为null代表不携带任务
- || workQueue.isEmpty())) // 任务队列为空
- return false; // 返回false代表创建线程失败
-
- for (;;) {
- if (workerCountOf(c) // 线程池的总线程数
- // 在execute方法源码中,当当前线程数<核心线程数是addWorker的core=true
- >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) // 是否大于核心线程数或者总线程数
- return false;
- // 当有多个线程同时执行compareAndIncrementWorkerCount方法时,只会有一个返回true
- if (compareAndIncrementWorkerCount(c)) // 先把总线程数+1再去创建线程,ctl是原子类型的,所以这样不会出现线程安全的问题
- break retry; // 如果新建线程成功,就会跳出去取真正新建线程
- c = ctl.get(); // 新建线程失败的话刷新总线程数
- if (runStateAtLeast(c, SHUTDOWN)) // 判断线程池状态是否>=SHUTDOWN
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- // 否则继续执行该循环
- }
- }
-
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int c = ctl.get();
-
- if (isRunning(c) ||
- (runStateLessThan(c, STOP) && firstTask == null)) {
- if (t.getState() != Thread.State.NEW)
- throw new IllegalThreadStateException();
- workers.add(w);
- workerAdded = true;
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- }
- } finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
比如我们设置核心线程数为10,但是现在只创建了9个核心线程,如果此时有 2 个以上线程同时执行了 executor.execute(task) 方法,那么在 execute 中会先判断当前线程总数是否大于核心线程数,因为这些线程是同时执行的,所以都返回 9 ,那么 addWorker 的 core 参数就是 true,进入 addWorker 方法,这些线程会都先给总线程数 +1 再去创建线程,而不是先去创建线程再给总线程数 + 1,因为 addWorker 的参数 core = true(代表创建一个核心线程),那么如果都去创建核心线程就会使得核心线程数超过设置的值。
所以第一个抢先给 ctl + 1 的线程会真正创建一个核心线程(ctl 是原子性的),而其它线程只能被跳出循环重新执行,这样当第二次执行循环时,就会发现此时的核心线程数已经达到了,就会返回 false,代表创建核心线程失败。
所以我们需要知道,在 Java 的线程池中,它并不是上来就把所有核心线程都开启,而是需要的时候才会开启;此外,当核心线程都忙碌的时候,新来的任务并不会去创建非核心线程,而是优先放到一个任务队列当中去,只有当任务队列满了(这里用的 LinkedBlockingQueue 是链表结构,所以不会满)才会去创建一个新的非核心队列。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。