赞
踩
java.util.concurrent.ThreadPoolExecutor extends AbstractExecutorService
ThreadPoolExecutor类称为线程池。AbstractExecutorService是一个抽象类,它实现了ExecutorService接口
1.类变量&常量
//ctl是一个AtomicInteger类型的原子对象。ctl记录了"线程池中的任务数量"和"线程池状态"2个信息。
//ctl共包括32位。其中,高3位表示"线程池状态",低29位表示"线程池中的任务数量"。
//RUNNING -- 对应的高3位值是111。
//SHUTDOWN -- 对应的高3位值是000。
//STOP -- 对应的高3位值是001。
//TIDYING -- 对应的高3位值是010。
//TERMINATED -- 对应的高3位值是011。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private final BlockingQueue<Runnable> workQueue;//阻塞队列,当线程池中的线程数大约线程池大小事,新的任务会进入阻塞队列。
private final ReentrantLock mainLock = new ReentrantLock(); //互斥锁
/*Worker表示工作线程,workers表示多个线程的集合。当对应的线程池启动时,
会执行workers中的任务;当执行完一个任务后,它会从线程池的阻塞队列中取出一
个阻塞的任务来放入workers中继续运行。workers可以使多个线程同时运行。*/
private final HashSet<Worker> workers = new HashSet<Worker>();
private final Condition termination = mainLock.newCondition(); //终止条件
private int largestPoolSize; //最大的线程池长度
private long completedTaskCount; //完成的任务数
private volatile ThreadFactory threadFactory; //线程工厂,用于创建线程
private volatile RejectedExecutionHandler handler; //拒绝策略
private volatile long keepAliveTime; //线程生存时间
private volatile boolean allowCoreThreadTimeOut; //允许的核心线程的超时时间
/* 当新任务提交给线程池时(通过execute方法)。
创建流程为:
<corePoolSize --> 创建新线程,放入线程池,处理请求
>corePoolSize && < maximumPoolSize --> 放入阻塞队列中,若阻塞队列满,
则创建新线程,并将任务交给该线程直接处理,加入线程池中
> maximumPoolSize --> 执行相应的拒绝策略
*/
private volatile int corePoolSize; //核心池大小,也即Worker的最小数量,保证不超时
private volatile int maximumPoolSize; //线程池的最大长度
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy(); //默认的拒绝策略
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
2.构造方法
//采用默认的拒绝策略和使用默认的线程工程
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
//增加了拒绝策略的参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
//增加了线程工厂的参数,但无拒绝策略参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
//核心构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || //线程核心池大小不能小于0
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
3.内部类
1.private final class Worker extends AbstractQueuedSynchronizer implements Runnable
执行任务的线程类,实现了Runnable接口,继承AQS类
其内部的run方法调用ThreadPoolExecutor的runWorker方法,初始构造方法设置标志位为-1
构造方法为
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
2.拒绝策略类
AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy四个类均实现了RejectedExecutionHandler方法
任务被线程池拒绝的主要原因有:任务数量超过了maximumPoolSize;线程池异常关闭
2.1 AbortPolicy类
该类的拒绝策略为直接抛出RejectedExecutionException异常
2.2 CallerRunsPolicy类
该类的拒绝策略为,若线程池未关闭,则直接在调用该task的线程中执行被拒绝的任务。若线程池关闭,则抛弃该task
2.3 DiscardOldestPolicy类
该类的拒绝策略为若线程池为关闭,则将阻塞队列的头部抛出,也即抛出最“老”的线程,并且将被拒绝的task加入到等待队列中
2.4 DiscardPolicy类
该类的拒绝策略为丢弃被拒绝的任务
4.重要方法
1.execute方法,执行任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息
int c = ctl.get();
//线程数小于corePoolSize,新建线程,并将任务command加入线程中
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//大于corePoolSize,首先尝试加入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//判断线程池状态,若异常终止,则删除该任务,并执行相应的拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 否则,如果"线程池中任务数量"为0,则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//阻塞队列已满,则直接新建一个线程并将command任务加入到线程池中。若创建失败,则执行相应拒绝策略
else if (!addWorker(command, false))
reject(command);
}
2. addWorker方法,将任务添加进线程池中。参数core为true,比较corePoolSize;core为false,则比较maxPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 更新"线程池状态和计数"标记,即更新ctl。
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); //获取线程池状态
// 线程池是否有效
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); //线程池中Worker线程数量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false; //不能超出最大数量
// 通过CAS函数将c的值+1。操作失败的话,则退出循环。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); //重新得到ctl的值
//两次结果需一致,若不同则从retry重新开始循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //添加任务到Worker线程中,作为firsttask
final Thread t = w.thread; //获取Worker对应的线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //获取锁
try {
//再次确认ctl的状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); //将新建的线程加入到workers容器中
int s = workers.size();
if (s > largestPoolSize) //更新largestPoolSize
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) { //若添加成功,则执行Worker赌赢的线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted) //为添加成功,则执行相应的addWorkerFailed方法
addWorkerFailed(w);
}
return workerStarted;
}
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w); //将该线程从workers中移除
decrementWorkerCount(); //将工作线程数减1
tryTerminate();
} finally {
mainLock.unlock();
}
}
3.submit方法,父类AbstractExecutorService实现
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
4.shutdown方法,不再接受新的任务,直到所有任务执行完后,关闭线程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检查终止线程池的“线程”是否有权限。
advanceRunState(SHUTDOWN); // 设置线程池的状态为关闭状态
interruptIdleWorkers(); //中断线程池中空闲的线程。
onShutdown(); //在ThreadPoolExecutor中没有任何动作,钩子方法
} finally {
mainLock.unlock();
}
tryTerminate();
}
//尝试关闭所有正在执行的任务,暂停等待任务的处理,并返回等待执行的任务列表。
该方法并不能一定保证线程池一定能够关闭,因为其调用Thread.interrupt方法,有可能有中断响应的线程存在,故无法立刻关闭
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
5.线程池有五种状态
RUNNING -- 线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进
-- 行处理
SHUTDOWN -- 线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务
STOP -- 线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会
-- 中断正在处理的任务
TIDYING -- 当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING
-- 状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。
TERMINATED-- 线程池彻底终止,就变成TERMINATED状态
RUNNING: 线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RU
NNING状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
可知,线程池一开始ctl处于RUNNING状态
SHUTDOWN:调用shutdown方法,从RUNNING-->SHUTDOWN
STOP: 调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -->
STOP。
TIDYING: 当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空
时,就会由 SHUTDOWN -> TIDYING。
当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
TERMINATED :处在TIDYING状态,执行完terminated()后,会由TIDYING --> TERMINATED
5.Executors静态工厂类中创建的几种不同的线程池
1. newCachedThreadPool方法,创建一个corePoolSize=0,maxPoolSize=Integer.MAX_VALUE的可扩充线程池。
其超时时间为60s,采用SynchronousQueue队列(其为一个sync队列,默认为非公平锁,且没有长度界限)和默认的拒绝策略(DiscardPolicy类)
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2. newFixedThreadPool方法,创建一个corePoolSize = maxPoolSize的固定大小线程池,也即当阻塞队列满时,新加任务会被抛弃。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
3. newScheduledThreadPool方法,新建一个ScheduledThreadPoolExecutor对象,其corePoolSize为传入参数的大小。
该线程池为延迟线程池,支持定时及周期性的任务执行的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
4. newSingleThreadExecutor方法,创建一个一次只能有一个线程执行的线程池,可以用来进行线程调度
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。