赞
踩
Collections.synchronizedList(new ArrayList<>());
内部实现,增加一个锁对象【Object mutex】,方法的使用对这个对象加锁Synchronized。
Executors创建线程池
简单使用
public class ExecutorTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService e=Executors.newFixedThreadPool(3);
//执行Runnable任务
Future<String > f=e.submit(new Runnable() {
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"Runnable end");
System.out.println(f.get());
//执行Callable任务
FutureTask<String> task=(FutureTask<String>) e.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
return "Callable end";
}
});
System.out.println(task.get());
//执行FutureTask任务,FutureTask既是任务也是结果
FutureTask<String> task1=new FutureTask<>(()->{
Thread.sleep(1000);
return "FutureTask end";
});
new Thread(task1).start();
System.out.println(task1.get());
e.shutdown();
}
}
/**
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL 正常
* NEW -> COMPLETING -> EXCEPTIONAL 异常
* NEW -> CANCELLED 取消
* NEW -> INTERRUPTING -> INTERRUPTED 中断
*/
private volatile int state; 线程状态
private static final int NEW = 0; 新建
private static final int COMPLETING = 1; 执行中
private static final int NORMAL = 2; 正常
private static final int EXCEPTIONAL = 3; 异常
private static final int CANCELLED = 4; 取消
private static final int INTERRUPTING = 5; 中断
private static final int INTERRUPTED = 6; 被中断
//设置Callable有返回值的任务
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // 新建状态
}
//设置Runnable任务,通过Executors构建成Callable有返回值的任务,成功后返回result
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // 新建状态
}
public void run() {
// 如果状态不是NEW ,返回
//如果状态为new,则尝试把当前线程赋值过去,如果赋值失败,返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); //调用callable.call(),执行任务
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); //抛出异常则设置异常
}
if (ran)
set(result); //正常完成则设置对应的完成结果
}
} finally {
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING) //任务处于中断中的状态,则进行中断操作
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {//正常返回值设置
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//将状态位设置成中间状态COMPLETING
outcome = v;//设置输出为正常返回结果
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 将状态更为最终状态NORMAL
finishCompletion();
}
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) //执行中状态
s = awaitDone(false, 0L);//等待任务结束
return report(s); //完成返回
}
//等待完成,可能是是中断、异常、正常完成,timed:true,考虑等待时长,false:不考虑等待时长
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) { //如果线程已中断,则直接将当前节点q从waiters中移出,并抛出异常
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) { //如果state已经是最终状态了,则直接返回state
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // 如果state是中间状态(COMPLETING),意味很快将变更过成最终状态,让出cpu时间片即可
Thread.yield();
else if (q == null) //如果发现尚未有节点,则创建节点
q = new WaitNode();
else if (!queued) //如果当前等待线程还没有进入等待队列,入列
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) { //线程被阻塞指定时间后再唤醒
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos); // 如果不是超时等待,就超时阻塞
}
else //线程一直被阻塞直到被其他线程唤醒
LockSupport.park(this);
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
//如果正处于NEW状态,mayInterruptIfRunning是true就设置为INTERRUTPTING,否则直接设置CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) { // 允许运行中进行中断操作
try {
Thread t = runner;
if (t != null)
t.interrupt(); // 并不是实时取消!
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); //中断成功,则置为最终状态
}
}
} finally {
finishCompletion(); //执行唤醒所有等待线程操作
}
return true;
}
ThreadPoolExecutor中有两个集合 一个是HashSet线程集合workers,另一个是BlockingQueue任务集合workQeuue。
参数定义
corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;
maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;
keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;
unit:keepAliveTime的单位
workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
threadFactory:线程工厂,用于创建线程,一般用默认(DefaultThreadFactory)即可;
handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务;
Integer 类型是 32 位二进制表示,则其中高 3 位用来表示线程池状态,后面 29 位用来记录线程池线程 个数 。
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
常用变量的解释
// 1. `ctl`,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 2. `COUNT_BITS`,`Integer.SIZE`为32,所以`COUNT_BITS`为29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 3. `CAPACITY`,线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
execute 方法
核心线程执行任务,加入任务(BlockingQueue)队列,非核心线程执行任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//worker数量比核心线程数小,直接创建worker执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//worker数量超过核心线程数,直接加入任务队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//线程池状态不是RUNNING,说明执行过shudown,需要对新加入的任务拒绝(reject)操作
if (! isRunning(recheck) && remove(command))
reject(command);
//如果核心线程等于0,添加非核心线程执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果线程池不是RUNNING状态,或者进入队列失败,创建worker执行任务,如果返回值时候false,执行拒绝操作
else if (!addWorker(command, false))
reject(command);
}
addWorker方法
worker线程数量加1,workers容器中添加线程,启动执行任务
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//线程数量加1
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 线程池状态大于SHUTDOWN时,直接返回false
// 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
// 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//满足添加线程状态的条件
for (;;) {
int wc = workerCountOf(c);
//worker超过容量直接返回false;
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS方式添加worker数量,成功,直接跳出外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//线程池转态发生变化,外层循环自旋,其他情况内层循环自旋
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//worker线程容器添加信息,必须加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//检查线程池状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//worker已经调用了start方法,则不再创建worker
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加worker到workers容器
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//添加成功,启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker->runWorker执行方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 调用unlock()是为了让外部可以中断
w.unlock(); // allow interrupts
// 这个变量用于判断是否进入过自旋(while循环)
boolean completedAbruptly = true;
try {
// 这儿是自旋
// 1. 如果firstTask不为null,则执行firstTask;
// 2. 如果firstTask为null,则调用getTask()从队列获取任务。
// 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
while (task != null || (task = getTask()) != null) {
// 这儿对worker进行加锁,是为了达到下面的目的
// 1. 降低锁范围,提升性能
// 2. 保证每个worker执行的任务是串行的
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池正在停止,则对当前线程进行中断操作
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
// 这两个方法在当前类里面为空实现。
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
// 已完成任务数加一
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
jvm提供的一个用于并行执行的任务框架。其主旨是将大任务分成若干小任务,之后再并行对这些小任务进行计算,最终汇总这些任务的结果,得到最终的结果。
public class RecursiveTaskTest extends RecursiveTask<Integer>{
private static final int THRESHOLD = 10;
private int start;
private int end;
public RecursiveTaskTest(int start,int end) {
this.start = start;
this.end = end;
}
protected Integer compute() {
if(end - start < THRESHOLD) {
Integer result=0;
for(int i=start;i<=end;i++) {
result+=i;
}
return result;
}else {
int middle = (start + end) / 2;
RecursiveTaskTest firstTask = new RecursiveTaskTest(start,middle);
RecursiveTaskTest secondTask = new RecursiveTaskTest(middle+1,end);
invokeAll(firstTask,secondTask);
return firstTask.join()+secondTask.join();
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool fjp=new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask=fjp.submit(new RecursiveTaskTest(0, 50));
Integer result=forkJoinTask.get();
System.out.println(result);
fjp.awaitTermination(3, TimeUnit.SECONDS);
fjp.shutdown();
}
}
public class RecursiveActionTest extends RecursiveAction{
private static final int THRESHOLD = 10;
private int start;
private int end;
public RecursiveActionTest(int start,int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if(end - start < THRESHOLD) {
for(int i=start;i<=end;i++) {
System.out.println(Thread.currentThread().getName()+",i="+i);
}
}else {
int middle = (start + end) / 2;
RecursiveActionTest firstTask = new RecursiveActionTest(start,middle);
RecursiveActionTest secondTask = new RecursiveActionTest(middle+1,end);
invokeAll(firstTask,secondTask);
}
}
public static void main(String[] args) throws InterruptedException {
ForkJoinPool fjp=new ForkJoinPool();
fjp.submit(new RecursiveActionTest(0, 50));
fjp.awaitTermination(3, TimeUnit.SECONDS);
fjp.shutdown();
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。