赞
踩
Java中的线程池顶层接口是 Executor
接口,ThreadPoolExecutor
是此接口的实现类。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数含义如下:
corePoolSize
: 核心线程数最大值
线程池中有两类线程,核心线程和非核心线程,核心线程默认情况下会一直存在于线程池中,而非核心线程如果长时间的闲置,就会被销毁
maximumPoolSize
: 最大线程数量,即线程总数最大值
核心线程数量+非核心线程数量
keepAliveTime
: 非核心线程闲置超时时间
当线程池中的线程数量大于
corePoolSize
数量时,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
才会被回收销毁;
unit
: keepAliveTime
参数的时间单位
TimeUnit
为枚举类型,含以下下属性:
·NANOSECONDS
: 1微毫秒 = 1微秒 / 1000
·MICROSECONDS
: 1微秒 = 1毫秒 / 1000
·MILLISECONDS
: 1毫秒 = 1秒 /1000
·SECONDS
: 秒
·MINUTES
: 分
·HOURS
: 小时
·DAYS
: 天
workQueue
: 任务阻塞队列,当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
常用几个阻塞队列:
1、LinkedBlockingQueue
: 链式阻塞队列,底层数据结构是链表,默认大小Integer.MAX_VALUE
,也可以指定大小。
· ① 吞吐量通常要高于 ArrayBlockingQueue
· ② Executors.newFixedThreadPool() 使用了此队列
2、ArrayBlockingQueue
: 数组阻塞队列,底层数据结构是数组,需要指定队列的大小。
3、SynchronousQueue
:不存储元素的阻塞队列;同步队列,内部容量为0,每个put操作必须等待一个take操作,反之亦然。
· ① 吞吐量通常要高于 LinkedBlockingQueue
· ② 每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
· ③ Executors.newCachedThreadPool使用了此队列
4、DelayQueue
:延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。
5、PriorityBlockingQueue
:具有优先级的、无限阻塞队列
threadFactory
:线程工厂,用于创建线程,一般用默认即可。
handler
:拒绝策略。线程数量大于最大线程数就会采用拒绝处理策略
如下四种拒绝策略:
1、ThreadPoolExecutor.AbortPolicy
:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException
异常。
2、ThreadPoolExecutor.DiscardPolicy
:丢弃新来的任务,但是不抛出异常。
3、ThreadPoolExecutor.DiscardOldestPolicy
:丢弃队列头部(最旧的)的任务,然后重新调用execute()
尝试执行程序(如果再次失败,重复此过程)。
4、ThreadPoolExecutor.CallerRunsPolicy
:由调用线程处理该任务。(只要线程池没关闭,就直接用调用者所在线程来运行任务)
线程池本身有一个调度线程,主要用于管理线程池里的各种任务和事务。例如创建线程、销毁线程、任务队列管理、线程队列管理等。。。
public class ThreadPoolExecutor extends AbstractExecutorService { //主要用来存放 当前运行的worker数量以及线程池状态的:高3位标识线程池状态标识,低29位标识当前线程工作数量 private final AtomicInteger ctl; //用于计算线程池的状态值、容量 private static final int COUNT_BITS = 29; private static final int CAPACITY = 536870911; private static final int RUNNING = -536870912; private static final int SHUTDOWN = 0; private static final int STOP = 536870912; private static final int TIDYING = 1073741824; private static final int TERMINATED = 1610612736; //用于存放任务的阻塞队列 private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock; // worker 存放的集合 private final HashSet<ThreadPoolExecutor.Worker> workers; private final Condition termination; //历史达到线程数数最大值 private int largestPoolSize; private long completedTaskCount; private volatile ThreadFactory threadFactory; //当队列满了并且 worker的数量达到maxSize时,执行具体的拒绝策略 private volatile RejectedExecutionHandler handler; //超出 corePoolSize 的 worker 的生存时间 private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; // 核心 worker的数量 private volatile int corePoolSize; //最大worker的数量,一般当workQueue满了才会用到这个参数 private volatile int maximumPoolSize; private static final RejectedExecutionHandler defaultHandler = new ThreadPoolExecutor.AbortPolicy(); private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); private static final boolean ONLY_ONE = true; .... }
线程池创建后处于 RUNNING
状态。
调用 shutdown()
方法后,线程池由 RUNNING
状态转变为 SHUTDOWN
状态,线程池不再接受新的任务,但能处理已添加的任务,会等待所有任务执行完毕。
调用 shutdownNow()
方法后,线程池处于 STOP
状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。
当所有的任务已终止,ctl
记录的 “任务数量” 为 0
,线程池会变为 TIDYING
状态。接着会调用执行 terminated()
函数,线程池由 TIDYING
状态变为 TERMINATED
状态。
ThreadPoolExecutor
中有一个控制状态的属性叫ctl
,是一个AtomicInteger
类型的变量。线程池状态就是通过AtomicInteger
类型的成员变量ctl
来获取的。
获取的
ctl
值传入runStateOf()
方法,与~CAPACITY
位与运算 (CAPACITY
是低29位全1的int
变量)。
~CAPACITY
在这里相当于掩码,用来获取ctl
的值:
高3位
:表示线程池状态;低29位
:表示线程池的线程数
execute()
线程池处理任务提交的核心方法为 execute()
,以下为 ThreadPoolExecutor
的 execute()
方法解析:
public void execute(Runnable var1) { if (var1 == null) { throw new NullPointerException(); } else { int var2 = this.ctl.get(); // 1.通过位运算,获取当前线程数;如果线程数小于 corePoolSize ,则调用 addWorker(),创建核心线程执行任务 if (workerCountOf(var2) < this.corePoolSize) { if (this.addWorker(var1, true)) { return; } //1.1 若添加的调度任务执行不成功时,尝试添加到阻塞队列中 var2 = this.ctl.get(); } //2.如果不小于corePoolSize,则将任务添加到workQueue队列。 if (isRunning(var2) && this.workQueue.offer(var1)) { int var3 = this.ctl.get(); // 2.1 若添加到阻塞队列,但线程池状态不是运行中(isRunning),则从队列中取出(remove),然后执行拒绝策略 if (!isRunning(var3) && this.remove(var1)) { //2.1.1并且执行拒绝策略 this.reject(var1); } // 2.2 线程池处于running状态,但是没有线程,则创建线程 else if (workerCountOf(var3) == 0) { this.addWorker((Runnable)null, false); } } // 3.如果放入阻塞队列失败(workQueue已满),则若当前工作线程总数小于最大线程数(maximumPoolSize)时,则创建非核心线程执行任务 else if (!this.addWorker(var1, false)) { //3.1若线程数已达最大线程数,则执行拒绝策略 this.reject(var1); } } }
思考?
ctl.get()
是获取线程池状态, 第2
步中,入队前进行了一次isRunning
判断,入队之后,又进行了一次isRunning
判断,那为什么要二次检查线程池的状态呢?
答: 在多线程的环境下,线程池的状态是时刻发生变化的。很有可能刚获取线程池状态后线程池状态就改变了。判断是否将
var1
(线程) 加入workqueue
是线程池之前的状态。倘若没有进行二次检查,万一线程池处于非RUNNING
状态(在多线程环境下很有可能发生),那么var1
永远不会执行。
总结执行流程:
corePoolSize
,无论线程是否空闲,都会新建一个核心线程执行任务(让核心线程数量快速达到 corePoolSize
,在核心线程数量 < corePoolSize
时),注:此步骤需要获得全局锁)corePoolSize
时,新来的线程任务存入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行(体现出:线程复用)maximumPoolSize
,则执行上面提到的拒绝策略进行处理。addWorker()
创建工作线程并执行 addWorker()
,源码分析如下:
private boolean addWorker(Runnable var1, boolean var2) { while(true) { int var3 = this.ctl.get(); //1.获取线程池状态 int var4 = runStateOf(var3); //2.如果线程池已关闭,则返回false if (var4 >= 0 && (var4 != 0 || var1 != null || this.workQueue.isEmpty())) { return false; } while(true) { //3.通过位运算,获取当前线程数 int var5 = workerCountOf(var3); //4.判断线程是否已达上限,根据参数var2的不同,和 corePoolSize 或maximumPoolSize 进行比较 if (var5 >= 536870911 || var5 >= (var2 ? this.corePoolSize : this.maximumPoolSize)) { //4.1 线程数已达上限,返回false return false; } //5. 执行原子操作,对线程数进行 +1 操作 if (this.compareAndIncrementWorkerCount(var3)) { boolean var18 = false; boolean var19 = false; ThreadPoolExecutor.Worker var20 = null; try { //6. 创建一个新的 worker ,存入待执行的任务 var20 = new ThreadPoolExecutor.Worker(var1); Thread var6 = var20.thread; if (var6 != null) { //7. 加锁 (线程池全局锁) ReentrantLock var7 = this.mainLock; var7.lock(); try { //8. 加锁后再次获取线程池的状态 int var8 = runStateOf(this.ctl.get()); if (var8 < 0 || var8 == 0 && var1 == null) { // 9. 预先检查线程是否可执行 if (var6.isAlive()) { throw new IllegalThreadStateException(); } // 9. 将 待执行的worker 存入 workers 中 this.workers.add(var20); int var9 = this.workers.size(); if (var9 > this.largestPoolSize) { //10. 如果此时线程数大于历史线程数最大值,则将现在的线程总数赋值给历史线程数 this.largestPoolSize = var9; } var19 = true; } } finally { // 11.释放锁 var7.unlock(); } //12. 如果创建了新的 worker,则调用 start()方法,立即执行 if (var19) { //12. 开始执行线程 var6.start(); var18 = true; } } } finally { //如果创建的 worker ,没有执行成功,则调用 addWorkerFailed(),删除此 worker if (!var18) { this.addWorkerFailed(var20); } } return var18; } var3 = this.ctl.get(); if (runStateOf(var3) != var4) { break; } } } }
addWorker()
言简意赅总结:
var2
来判断是和 核心线程数还是和最大线程数进行比较,若超出线程数,则返回falsecompareAndIncrementWorkerCount
(CAS
) 尝试将线程数进行 +1 ,若成功则创建一个新的 worker
并立即调用 start()
方法 执行此任务addWork()
中的 创建线程 Worker
ThreadPoolExecutor.Worker
为创建一个线程 源码分析:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; //当前 Worker 所处的线程 final Thread thread; // 待执行的工作任务 Runnable firstTask; //任务计数器 volatile long completedTasks; Worker(Runnable var2) { // 禁止中断,直到运行Worker this.setState(-1); this.firstTask = var2; // 创建一个新的线程 this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this); } public void run() { ThreadPoolExecutor.this.runWorker(this); } protected boolean isHeldExclusively() { return this.getState() != 0; } protected boolean tryAcquire(int var1) { if (this.compareAndSetState(0, 1)) { this.setExclusiveOwnerThread(Thread.currentThread()); return true; } else { return false; } } protected boolean tryRelease(int var1) { this.setExclusiveOwnerThread((Thread)null); this.setState(0); return true; } public void lock() { this.acquire(1); } public boolean tryLock() { return this.tryAcquire(1); } public void unlock() { this.release(1); } public boolean isLocked() { return this.isHeldExclusively(); } void interruptIfStarted() { Thread var1; if (this.getState() >= 0 && (var1 = this.thread) != null && !var1.isInterrupted()) { try { var1.interrupt(); } catch (SecurityException var3) { } } } }
Worker
实现了 Runnable
并进行了封装,所以 Worker
也是一个线程任务,在构造方法中,创建了一个线程,线程的任务就是自己,故 addWorker()
方法中调用 第12步 var6.start();
方法中的,会触发 Worker
类的 run
方法被 JVM
调用,其主要是对 待执行的任务
进行中断处理和状态监控 。
Worker
也继承了 AQS
,在每个任务执行时进行了加锁的处理。
runWorker(this)
再看 Worker
类中的 run()
方法调用的 runWorker(this)
, 源码解析:
//worker.run 方法内的 ThreadPoolExecutor.this.runWorker(this);方法源代码 final void runWorker(ThreadPoolExecutor.Worker var1) { Thread var2 = Thread.currentThread(); Runnable var3 = var1.firstTask; var1.firstTask = null; //1.线程启动之后,通过 unlock方法释放锁 var1.unlock(); boolean var4 = true; try { //2.worker执行firstTask 或从 workQueue 中获取任务,如果 getTask()方法不返回 null,则循环一直不退出 //这里达到线程复用的效果,实现线程处理多个任务 while(var3 != null || (var3 = this.getTask()) != null) { //2.1 进行加锁操作,保证thread不被其他线程中断(除非线程池被中断) var1.lock(); //2.2检查线程池状态,若线程池处于中断状态,则当前线程将中断 if ((runStateAtLeast(this.ctl.get(), 536870912) || Thread.interrupted() && runStateAtLeast(this.ctl.get(), 536870912)) && !var2.isInterrupted()) { var2.interrupt(); } try { //2.3 执行 beforeExecute,记录正在运行的任务 this.beforeExecute(var2, var3); Object var5 = null; try { //2.4 执行实际任务 var3.run(); } catch (RuntimeException var28) { var5 = var28; throw var28; } catch (Error var29) { var5 = var29; throw var29; } catch (Throwable var30) { var5 = var30; throw new Error(var30); } finally { //2.5 执行afterExecute方法;任务执行完成后,清楚记录信息 this.afterExecute(var3, (Throwable)var5); } } finally { var3 = null; //当前worker计数器+1,统计worker执行了多少个任务,最后累加进 completedTasks 变量,可调用相应方法返回一些统计信息 ++var1.completedTasks; //2.6 最后执行解锁操作 var1.unlock(); } } // 表示worker是否异常终止,执行到这里代表执行正常,后续的方法需要这个变量 var4 = false; } finally { // completedTasks累加到completedTaskCount变量中 this.processWorkerExit(var1, var4); } }
runWorker()
的主要逻辑就是进行线程池的关闭检查,然后执行任务,并将计数器+1。
注意: 当执行完此时提交的任务后,worker
的生命周期并没有结束,在 while
循环中,worker 会不断地调用 getTask() 方法从 阻塞队列 中获取任务,然后调用 task.run()
执行任务,从而达到线程复用的效果,只要 getTask()
方法不返回 null
,此线程就不会退出。
当然,核心线程池中创建的线程想要拿到阻塞队列中的任务,先要判断线程池的状态,如果 STOP
或者 TERMINATED
,返回null。
getTask()
再在阻塞队列中获取任务 getTask()
方法,源码解析:
private Runnable getTask() { boolean var1 = false; while(true) { int var2 = this.ctl.get(); int var3 = runStateOf(var2); if (var3 >= 0 && (var3 >= 536870912 || this.workQueue.isEmpty())) { this.decrementWorkerCount(); return null; } int var4 = workerCountOf(var2); //1. allowCoreThreadTimeOut变量默认是false,核心线程即使空闲也不会被销毁,如果为true,核心线程在keepAliveTime内仍空闲则会被销毁 boolean var5 = this.allowCoreThreadTimeOut || var4 > this.corePoolSize; //2.如果运行线程数 小于等于 最大线程数 或 阻塞队列不为空,并且运行线程数 小于等于 1 if (var4 <= this.maximumPoolSize && (!var5 || !var1) || var4 <= 1 && !this.workQueue.isEmpty()) { try { //3. 如果 var5 为 true(可以思考一下哪些情况下var5为true),则会调用 workQueue的poll方法获取任务 // 超时时间是keepAliveTime。如果超过keepAliveTime时长,poll返回了null,上边 3.3.4 中提到的while循序就会退出,线程也就执行完了 //如果var5为false(allowCoreThreadTimeOut为false, 且 var4 > corePoolSize为false),则会调用workQueue的take方法阻塞在当前 //队列中有任务加入时,线程被唤醒,take方法返回任务,并执行。 Runnable var6 = var5 ? (Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS) : (Runnable)this.workQueue.take(); if (var6 != null) { return var6; } var1 = true; } catch (InterruptedException var7) { var1 = false; } //3.如果 worker数量为空,则直接返回null } else if (this.compareAndDecrementWorkerCount(var2)) { return null; } } }
核心线程的会一直卡在 workQueue.take
方法,被阻塞并挂起,不会占用 CPU
资源,直到拿到 Runnable
然后返回(如果allowCoreThreadTimeOut
设置为 true
,那么核心线程就会去调用 poll
方法,因为 poll
可能会返回 null
,所以此时核心线程满足超时条件也会被销毁)
非核心线程会 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
,如果超时还没有拿到,下一次循环判断compareAndDecrementWorkerCount
就会返回 null
, Worker
对象的 run()
方法循环体的判断为 null
,任务结束,然后线程被系统回收 。
processWorkerExit(var1, var4)
在 3.3.4 的 runwork()
方法中的 getTask()
方法获取待执行任务,若 getTask()
返回 null
,说明此时无任务需要执行,则执行 finally
内的 this.processWorkerExit(var1, var4);
方法,源码解析:
private void processWorkerExit(ThreadPoolExecutor.Worker var1, boolean var2) { // 1.上一步传过来的值,,如果为异常结束,则工作线程数减1 if (var2) { this.decrementWorkerCount(); } //2.获取全局锁 ReentrantLock var3 = this.mainLock; var3.lock(); try { //3. 累加 this.completedTaskCount += var1.completedTasks; this.workers.remove(var1); } finally { //4.释放锁 var3.unlock(); } //5.尝试终止线程池 this.tryTerminate(); int var4 = this.ctl.get(); //6. 判断状态是否小于 STOP if (runStateLessThan(var4, 536870912)) { if (!var2) { // 6.1 检查 allowCoreThreadTimeOut 设置的值: int var5 = this.allowCoreThreadTimeOut ? 0 : this.corePoolSize; // 6.1 -> allowCoreThreadTimeOut设置为true, 1)若队列不为空,至少保留一个worker,2)若队列为为空,则直接退出。线程池worker数量减少,最终可能会为 0 if (var5 == 0 && !this.workQueue.isEmpty()) { var5 = 1; } // 6.2 -> allowCoreThreadTimeOut设置为false,则worker数量不少于corePoolSize if (workerCountOf(var4) >= var5) { return; } } this.addWorker((Runnable)null, false); } }
总结:
第6步:判断状态是否小于 STOP
llowCoreThreadTimeOut
设置为 true
:
1)若队列不为空:设置
worker
数量为1
2)若队列为空:直接退出,线程池的worker
数减少,最终可能为0
allowCoreThreadTimeOut
设置为 false
:
worker
的数量不少于corePoolSize
(若线程数小于corePoolSize,则添加null
任务创建worker
进行补充)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。