赞
踩
线程池,是指管理一组同构工作线程的资源池。
线程池在工作队列(Work Queue)中保存了所有等待执行的任务。工作者线程(Work Thread)会从工作队列中获取一个任务并执行,然后返回线程池并等待下一个任务。
线程池比执行任务再创建线程会有以下优势:
ThreadPoolExecutor 是Java中线程池的实现类。下图是继承关系:
AbstractExecutorService
«Interface»
Executor
«Interface»
ExecutorService
ThreadPoolExecutor
下图是ThreadPoolExecutor的大致结构
Executor 的实现通常会创建线程来执行任务。由于 Executor 以异步的方式来执行任务,因此之前提交任务的状态不是立即可见的,有些任务已经完成,有些还在运行,有些在等待执行。当关闭应用程序时,是完成当前任务并不接受新任务,还是直接关闭所有任务(不管是在执行还是没有执行)。
为了解决执行服务的生命周期问题,Executor 扩展了 ExecutorService 接口,该接口提供了对生命周期管理的方法。
public interface ExecutorService extends Executor {void shutdown();List<Runnable> shutdownNow();boolean awaitTermination(long timeout, TimeUnit unit);boolean isShutdown();boolean isTerminated();// 以及一些用于创建异步任务的方法<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);Future<?> submit(Runnable task);....}
在ThreadPoolExecutor中提供了五种状态:
下图是状态之间的转换关系:
如果任务队列被填满(在队列大小有限的情况下)或者某个任务被提交到一个已经被关闭的Executor中时应该怎么处理这些情况?JDK提供了一种策略来处理这些情况--饱和策略
在ThreadPoolExecutor中通过ctl字段来维护了线程池的运行状态和线程数量(工作者线程)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
具体的可以通过两个参数来说明:
private static final int COUNT_BITS = Integer.SIZE - 3;private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
每个状态的取值如下,每个取值都向左边移动了29位:
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
所以高三位用来表示线程池的状态
之后用来确定线程池中的线程数量都使用COUNT_MASK来计算,这样就能计算低29位
private static int workerCountOf(int c) { return c & COUNT_MASK; }
ThreadPoolExecutor 通过如下的方法来检测当前线程池的状态:
运行是要比终结状态小的
// 当前状态要比给定的状态小,如 running < terminatedprivate static boolean runStateLessThan(int c, int s) { return c < s; } // 当前状态要高于给定的状态,如 terminated >= runningprivate static boolean runStateAtLeast(int c, int s) { return c >= s; } // 检测当前线程是否在运行private static boolean isRunning(int c) { return c < SHUTDOWN; }
ThreadPoolExecutor的饱和策略可以通过调用
setRejectedtExecutionHandler 来修改。
JDK 提供了几种不同的 RejectedExecutionHandler 的实现:
如果请求速率超过了线程池的处理速率,那么新到来的请求将会累计起来,这些请求会在一个由 Executor 管理的 Runnable 队列(也就是任务队列 Work Queue)中等待。但这仍有可能超出缓存的数量。
基本的任务排队方法有三种:
public interface BlockingQueue<E> extends Queue<E> {offer();pool();put();take();....}
JDK 提供了以下几种实现类:
所有的任务都是由工作者线程来执行的,那么工作者线程是如何执行这些任务的,以及线程池是如何维护工作者线程的。下面是ThreadPoolExecutor中的工作者线程类Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {// 线程final Thread thread;// 第一个任务Runnable firstTask;// 执行任务的数量volatile long completedTasks;....}
工作者线程实现了Runnable接口,并且它包含了一个Thread ,说明工作者线程是一个特殊的任务也是一个线程,它可以去执行一些其他任务也可以自控制。
工作者线程继承自AQS,而不是使用的可重入锁ReentrantLock,目的是实现可不重入的特性(线程封闭)。每个工作者线程必须是线程封闭的。
在工作者线程构造器中有一段:
this.thread = getThreadFactory().newThread(this);
通过调用线程工厂创建线程并将this(也就是工作者线程)提交给Thread
也就是执行Thread就启动了自己,所以工作者线程可以自己管理自己
addWorkder中创建工作者线程之后执行了它,这里的t就是thread
t.start();
线程池中的工作者线程的状态由两个字段来控制:
// 生存时间private volatile long keepAliveTime;// 是否允许核心线程超时等待private volatile boolean allowCoreThreadTimeOut;
keepAliveTime 是当线程数量大于核心线程数数量时工作者线程没有任务时存活的时间
例如当前工作者线程数量是30,核心线程数量上限是20,最大线程数量是30。那么多出来10个线程在线程池比较空闲的时候是需要清除的,因为这是占用了多余的系统资源。
keepAliveTime是为了保证突然之间线程池繁忙的情况,这时候就没必要立马清除这些线程,可以"等等看"有没有突发情况。
allowCoreThreadTimeOut 则是使得核心线程也受keepAliveTime的影响
这些具体体现在从队列中获取任务的时候,下面会详细描述
每当线程池创建一个线程,都是通过线程的工厂方法创建的。
默认的线程工厂方法创建一个新的、非守护的线程,并且不包含特殊的配置信息。通过制定一个工厂方法,可以定制线程池的配置信息。每当线程池需要一个新的线程都会调用getThreadFactory()这个方法。
以下是ThreadPoolExecutor 创建一个工作者线程,是通过工厂方法创建的:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
ThreadFactory 是一个接口
public interface ThreadFactory { Thread newThread(Runnable r); }
在构造线程池ThreadPoolExecutor时,可以传入一个线程工厂,使得创建线程时通过该线程工厂创建。
ThreadPoolExecutor 是通过实现Executor来执行任务的
具体分为三个步骤:
copy from Java线程池实现原理及其在美团业务中的实践
通过源码来理解一下:
public void execute(Runnable command) { // 任务不能为空 if (command == null) throw new NullPointerException();// 任务数量int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
当执行一个任务会将一个任务放到工作队列中或者是直接创建一个工作者线程去执行该任务。
addWorker 是创建一个工作者线程并运行一个任务的(可以不运行一个任务)
这段代码主要做两个工作:
private boolean addWorker(Runnable firstTask, boolean core) { // 保证能添加工作者线程 retry: for (int c = ctl.get();;) { // Check if queue empty only if necessary. if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (;;) { // 如果超过限定数量,这个数量可以是最小的活跃线程数量可以是最大的活跃线程数量 if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; if (compareAndIncrementWorkerCount(c)) // 只有能增加工作者线程才退出 break retry; //重新获取并检查运行状态 c = ctl.get(); // Re-read ctl if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 从这里创建一个工作者线程并运行该线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 检测线程池运行状态 if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { // 检测线程状态,线程状态必须为 NEW if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException(); // 将该工作者线程添加到工作者线程组中 workers.add(w); workerAdded = true; int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { mainLock.unlock(); } // 如果工作者线程已经添加则运行该线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }折叠
工作者线程获取任务主要有两个途径:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 有两种情况 // 1. 执行firstTask // 2. 获取一个任务并执行 while (task != null || (task = getTask()) != null) { w.lock(); // 如果线程池已经停止,确保当前线程是中断的 // 如果线程池正在运行,确保线程没有中断 // 第二次获取控制状态是为了确保线程池在关闭的过程中能够正常中断当前线程if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())wt.interrupt();// 从这里开始执行任务 try { beforeExecute(wt, task); try { task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; // 增加任务执行数量,这里是线程封闭的,所以不需要考虑并发的情况 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 线程自回收 processWorkerExit(w, completedAbruptly); } }
工作者线程通过 getTask获取一个任务来执行
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); // Check if queue empty only if necessary. // 检测线程池状态,队列不能为空 if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // int wc = workerCountOf(c); // 这一段用来表示核心线程是否受影响 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 工作者线程太多或者任务队列为空 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 减少工作者线程数量 if (compareAndDecrementWorkerCount(c)) return null; continue; } // 从队列中获取一个任务 try { // 这一段体现了keepAliveTime的作用 // 超过keepAliveTime给定时间没有获取到任务,那么线程将会被清理/回收掉 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
processWorkerExit 负责这一工作,具体流程如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completed
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。