赞
踩
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
//原子整型。1.声明当前线程池的状态;2.声明线程池中的线程数 //高3位是线程池的状态,低29位是线程池中的线程数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; //方便后面做位运算 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //线程池中线程的最大数量2^29-1 // 高3位是线程池的状态 //-1用二进制表示就是32个1,左移29位是高3位是1,低29位是0 //111,代表线程池为RUNNING,代表正常接收任务 private static final int RUNNING = -1 << COUNT_BITS; //000,代表线程池为SHUTDOWN状态,不接受新任务,但是内部还会处理阻塞队列中的任务,正在进行的任务也正常处理 private static final int SHUTDOWN = 0 << COUNT_BITS; //001 代表线程池为STOP状态,不接受新任务,也不去处理阻塞队列中的任务,同时会中断正在执行的任务 private static final int STOP = 1 << COUNT_BITS; //010 代表线程池为TIDYING状态,过渡的状态,代表线程池即将关闭 private static final int TIDYING = 2 << COUNT_BITS; //011 TERMINATED状态,代表线程池已经关闭 private static final int TERMINATED = 3 << COUNT_BITS; //~CAPACITY代表高3位是1,低29位是0,得到线程池的状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //获得线程池中工作线程的数量 private static int workerCountOf(int c) { return c & CAPACITY; } //rs为高3位,表示线程池状态;wc是低29位,ctl是合并的结果 private static int ctlOf(int rs, int wc) { return rs | wc; }
任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。这部分就是线程池的核心运行机制。
所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
从execute()方法的源码中可以看出在刚刚创建ThreadPoolExecutor的时候,线程并不会立即启动,而是要等到有任务提交时才会创建新线程来执行任务,除非调用了prestartCoreThread/prestartAllCoreThreads方法提前启动线程池中的核心线程。
线程池的execute方法执行
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //获取代表线程池状态和线程数量的整数 int c = ctl.get(); //工作线程数<核心线程数 if (workerCountOf(c) < corePoolSize) { //创建核心线程执行任务,成功就返回 if (addWorker(command, true)) return; //创建核心线程数失败,重新获取ctl c = ctl.get(); } //确认线程池是Running状态,将任务添加到阻塞队列中 if (isRunning(c) && workQueue.offer(command)) { //再次获取ctl int recheck = ctl.get(); //再次判断是否是RUNNING,如果不是RUNNING状态,移除任务 if (! isRunning(recheck) && remove(command)) //拒绝策略 reject(command); //如果线程池处在RUNNING状态,但是工作线程为0 //线程池中allowCoreThreadTimeOut参数允许核心线程和非核心线程超时时间一样 else if (workerCountOf(recheck) == 0) //阻塞队列有任务但是没有工作线程,添加一个任务为空的工作线程处理阻塞队列中的任务 addWorker(null, false); } //线程池是Running状态,阻塞队列已满,创建非核心线程处理任务 else if (!addWorker(command, false)) //if语句中的非核心线程创建失败,说明线程池已经达到最大线程数,拒绝策略 reject(command); }
任务的执行有两种可能:
线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现。
getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收。
整个getTask操作在自旋下完成:
1、workQueue.take:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
2、workQueue.poll:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;
所以,线程池中实现的线程可以一直执行由用户提交的任务。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
private final class Worker
extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}
增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。
addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) { //实现工作线程数+1的操作 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // rs >= SHUTDOWN,表明线程状态不是RUNNING //rs == SHUTDOWN,表明线程池是SHUTDOWN状态,外层取反,表明只能是STOP以上更高的状态,这是不需要添加线程处理任务。 //firstTask == null,表明任务为空。外层取反,表示任务不为空,SHUTDOWN状态以上不接收新任务,创建工作线程失败 //阻塞队列不为空,取反,为空,大于等于SHUDOWN状态,证明阻塞队列中没有任务处理,不会创建新线程处理任务 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) //创建工作线程失败 return false; for (;;) { //获取工作线程个数 int wc = workerCountOf(c); //当前线程数已经大于线程池最大容量,不去创建 //判断工作线程数是否超过核心线程数或者最大线程数 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //使用CAS方法将工作线程数+1 if (compareAndIncrementWorkerCount(c)) //跳出外层的for循环 break retry; c = ctl.get(); //判断状态是否变化 if (runStateOf(c) != rs) //继续执行外层的for循环 continue retry; // 线程池状态没变继续执行内侧循环 } } //如何创建工作线程 boolean workerStarted = false; boolean workerAdded = false; //Worker w就是工作线程 Worker w = null; try { //创建工作线程w,传入需要执行的任务 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //获取线程池的全局锁,避免该工作线程w添加任务时,其他线程干掉了线程池,干掉线程池前需要先获取这个锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); //RUNNING状态 //SHUTDOWN状态,创建空任务工作线程,处理阻塞队列中的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //将工作线程添加到集合中 workers.add(w); //获取工作线程个数 int s = workers.size(); //如果工作线程数大于之前记录的最大工作线程数,就替换 if (s > largestPoolSize) largestPoolSize = s; //添加工作线程成功 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //启动工作线程 t.start(); //启动工作线程成功 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } //返回工作是否启动 return workerStarted; }
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //1. 任务不为空,执行任务 //2. 如果任务为空,通过getTask方法从阻塞队列中获取任务 while (task != null || (task = getTask()) != null) { w.lock(); //获取当前状态,是否大于等于STOP. if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //执行任务前的操作 beforeExecute(wt, task); Throwable thrown = null; try { //开始执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //线程执行完毕的后续处理 processWorkerExit(w, completedAbruptly); } }
public class ThreadPoolExecutorDemo { private static final int CORE_POOL_SIZE = 5; private static final int MAX_POOL_SIZE = 10; private static final int QUEUE_CAPACITY = 100; private static final Long KEEP_ALIVE_TIME = 1L; public static void main(String[] args) { //使用阿里巴巴推荐的创建线程池的方式 //通过ThreadPoolExecutor构造函数自定义参数创建 ThreadPoolExecutor executor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 10; i++) { //创建WorkerThread对象(WorkerThread类实现了Runnable 接口) Runnable worker = new MyRunnable("" + i); //执行Runnable executor.execute(worker); } //终止线程池 executor.shutdown(); while (!executor.isTerminated()) { } System.out.println("Finished all threads"); } }
FixedThreadPool (核心和最大都为指定的值,能确保同时并发的数量)和 SingleThreadExecutor (核心和最大都为1,唯一的工作线程保证工作按照顺序执行):允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
CachedThreadPool(核心线程池数为0,最大为MAX,只要有请求进来,就会一直等待,等到运行为止。缓存线程的作用)和 ScheduledThreadPool(核心为指定的,最大为MAX,配合DelayedWorkQueue执行定时任务。) :允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。