赞
踩
首先简单看一下JUC架构图,我们将会从CAS,Volatile基础知识讲起,而AQS等知识都会使用到这两部分,AQS作为并发基石,起到承上启下的作用,又为上层提供了基础。因此AQS是本专栏的重点知识。
本专栏会尽可能按照合理的顺序去安排知识顺序,给大家良好的阅读体验,构建起对于JUC的知识体系。(本来一直在语雀更新的,但是因为语雀现在会员才可以分享,无奈转移到CSDN。本来打算是在22年底更新完毕一次性发出来,现在还有一部分内容,只是建立了标题,但是没有更新,后面会慢慢完善的)
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
这些compareAndSwap操作,在底层调用的是cmpxchg指令,如果程序在多核处理器上,则在cmpxchg指令前加上lock前缀(lock cmpxchg);在单核处理器上,则不需要加lock。
lock前缀指令的作用(与volatile相同):
如果Core1 和 Core2的高速缓存中有相同的数据,然后Core1在自己的高速缓存中修改了该数据,会通过总线让Core2高速缓存中对应的值失效,而Core2发现自己缓存中的数据失效了,会立刻从内存中读取最新的数据,保证缓存一致性。
CPU通过MESI协议保证变量的缓存一致性,为了保障缓存一致性,需要在总线上来回通信,如果该流量过大,总线将会成为瓶颈,这就是总线风暴
Java轻量级锁在争用激烈的情况下,会从轻量级锁升级为重量级锁,其一是为了减少CAS空自旋,其二是变量同一时间大量的CAS操作导致的总线风暴。
那么基于CAS实现的轻量级锁是如何避免总线风暴的呢?使用队列对抢锁线程进行排队,最大程度减少了CAS操作数量。(CLH自旋锁就是基于队列排队的自旋锁)
1、线程阻塞和唤醒需要从用户态到内核态,而cas自旋不需要。但是呢cas自旋也存在问题,也就是cas空自旋和总线风暴问题,存在两种方式可以减少cas空自旋的问题,一个就是使用分散热点如LongAdder,一个是使用队列削峰如AQS,都是使用空间换时间的方式,后面会AQS章节会再继续展开
后文的总多知识使用到了AQS,如线程池中的Worker,独占锁ReentrantLock,共享锁CountDownLatch与Semaphore,因此先来学习这块知识,为第三章对于线程池的学习打好基础。
学习AQS之前,最好是先学习一下CLH自旋锁,因为AQS中的同步队列和CLH中的单向链表差不多,都是后继节点在前驱节点上自旋,等待释放锁。
悲观锁:如Java中的synchronize,可以确保哪个线程持有锁,就可以访问到临界资源
乐观锁:乐观锁是一种思想,有两部分组成:冲突检测和数据更新。具体的实现有cas,数据库带版本号更新,原子类
悲观锁存在的问题:
乐观锁存在的问题:
如何避免CAS总线风暴呢?使用队列对抢锁线程进行排队,最大程度减少CAS操作数量。
CLH自旋锁就是一种基于队列排队的自旋锁(具体为单向链表)。
CLH自旋锁原理:申请加锁的线程首先会在单向链表末尾CAS添加新的节点,之后在其前驱节点上自旋,检查前驱节点是否释放锁即可,如果发现前一个节点释放锁,则抢锁成功。这个过程只有入队的时候需要CAS,入队之后不需要CAS自旋,只需要普通自旋就可以了。在争用激烈的情况下,可以大大减少CAS操作的数量,避免总线风暴。
/** * <p> * </p> * * @author LZH * @date 2022-10-03 21:16 */ public class CLHLock implements Lock { static class Node{ //true:表示当前线程正在抢占锁,或者已经抢占锁 //false:当前线程已经释放锁,下一个线程可以占有锁了 volatile boolean locked; Node prevNode; public Node(boolean locked, Node prevNode) { this.locked = locked; this.prevNode = prevNode; } public static final Node EMPTY = new Node(false,null); public boolean isLocked() { return locked; } public Node getPrevNode() { return prevNode; } public void setLocked(boolean locked) { this.locked = locked; } public void setPrevNode(Node prevNode) { this.prevNode = prevNode; } } /** * 当前节点的线程本地变量 */ private static ThreadLocal<Node> curNodeLocal = new ThreadLocal<>(); /** * CLHLock队列末尾节点 */ private AtomicReference<Node> tail = new AtomicReference<>(null); public CLHLock() { //设置队尾节点 tail.getAndSet(Node.EMPTY); } @Override public void lock() { Node curNode = new Node(true, null); Node preNode = tail.get(); //cas自旋设置尾节点 while(!tail.compareAndSet(preNode,curNode)){ preNode = tail.get(); } //设置前驱节点 curNode.setPrevNode(preNode); //自旋:在前驱节点上普通自旋,监听前驱节点是否释放锁 while(curNode.getPrevNode().isLocked()){ //前驱节点还占用着锁,让CPU时间片给他干活 Thread.yield(); } //来到这里,说明前驱节点已经释放锁了 //只有当前线程进入了临界区,其他线程都还在普通自旋等待前驱节点释放锁的过程中 //将节点放到线程本地变量中,释放锁的时候会用到 curNodeLocal.set(curNode); } @Override public void unlock() { Node curNode = curNodeLocal.get(); curNode.setPrevNode(null);//Help GC curNode.setLocked(false); curNodeLocal.set(null); } }
package java.util.concurrent.locks; import sun.misc.Unsafe; public class LockSupport { private LockSupport() {} // Cannot be instantiated. private static void setBlocker(Thread t, Object arg) { // Even though volatile, hotspot doesn't need a write barrier here. UNSAFE.putObject(t, parkBlockerOffset, arg); } //唤醒某个被阻塞的线程 public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } //无限期阻塞当前线程,带block对象,用于给诊断工具确定线程阻塞的原因 public static void park(Object blocker) { //获取当前线程 Thread t = Thread.currentThread(); //设置当前线程的blocker setBlocker(t, blocker); //无限期阻塞 UNSAFE.park(false, 0L); //唤醒后,设置阻塞原因为null setBlocker(t, null); } //限时阻塞 public static void parkNanos(Object blocker, long nanos) { if (nanos > 0) { //获取当前线程 Thread t = Thread.currentThread(); //设置当前线程的阻塞原因 setBlocker(t, blocker); //限时阻塞 UNSAFE.park(false, nanos); //唤醒后阻塞原因设置为空 setBlocker(t, null); } } //阻塞当前线程到某个时间点 public static void parkUntil(Object blocker, long deadline) { //获取当前线程 Thread t = Thread.currentThread(); //设置当前线程阻塞原因 setBlocker(t, blocker); //设置阻塞到某个时间点 UNSAFE.park(true, deadline); //唤醒后设置阻塞原因为空 setBlocker(t, null); } //获取被阻塞线程的block对象,用于分析阻塞的原因 public static Object getBlocker(Thread t) { if (t == null) throw new NullPointerException(); return UNSAFE.getObjectVolatile(t, parkBlockerOffset); } //无限期阻塞当前线程 public static void park() { UNSAFE.park(false, 0L); } //限时阻塞 public static void parkNanos(long nanos) { if (nanos > 0) //限时阻塞 UNSAFE.park(false, nanos); } //阻塞当前线程到某一个时间 public static void parkUntil(long deadline) { UNSAFE.park(true, deadline); } /** * Returns the pseudo-randomly initialized or updated secondary seed. * Copied from ThreadLocalRandom due to package access restrictions. */ static final int nextSecondarySeed() { int r; Thread t = Thread.currentThread(); if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) { r ^= r << 13; // xorshift r ^= r >>> 17; r ^= r << 5; } else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0) r = 1; // avoid zero UNSAFE.putInt(t, SECONDARY, r); return r; } // Hotspot implementation via intrinsics API private static final sun.misc.Unsafe UNSAFE; private static final long parkBlockerOffset; private static final long SEED; private static final long PROBE; private static final long SECONDARY; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> tk = Thread.class; parkBlockerOffset = UNSAFE.objectFieldOffset (tk.getDeclaredField("parkBlocker")); SEED = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomSeed")); PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe")); SECONDARY = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomSecondarySeed")); } catch (Exception ex) { throw new Error(ex); } } }
使用基于CAS自旋实现的轻量级锁有两个大的问题:
解决CAS恶性空自旋的有效方式之一是空间换时间,常见的有分散热点和队列削峰。JUC中LondAdder使用了分散热点的方式解决该问题,但是更常见的是使用队列削峰的方式解决该问题,并且还提供了一个基于双向队列的削峰基类–抽象基础类(AbstractQueuedSynchronizer 抽象队列同步器,简称AQS)
CLH自旋锁内部维护了一个FIFO的单向队列,队头的节点表示占有该锁的节点,新加入的节点需要等待,会插入到队列的尾部。
Zookeeper的分布锁方案:
JUC中有众多的类是基于AQS的,如ReentrantLock,CountDownLatch,Semaphore,ReentrantReadWriteLock,FutureTask等。AQS解决了实现同步容器的大量细节问题。
AQS是CLH队列的一个变种,内部是一个FIFO的双向链表,这种结构是每个节点都有前驱和后继节点,每一个节点都封装了线程,当线程竞争失败的时候会封装成节点进入AQS队列;当获取线程的线程释放锁后,会从队列中唤醒一个阻塞的节点(线程)。
AQS是基于模板模式实现的,分离变与不变。AQS为锁获取,锁释放,入队,出队提供了一系列的模板方法,而具体的细节实现抽取出钩子方法,交给子类去实现。
static final class Node { //AQS为两种模式提供了不同的模板流程 //枚举:共享模式 static final Node SHARED = new Node(); //枚举:独占模式 static final Node EXCLUSIVE = null; //表示当前节点处于取消状态 static final int CANCELLED = 1; //标示后继线程处在等待状态 static final int SIGNAL = -1; //标示当前线程正在进行条件等待 static final int CONDITION = -2; //标示下一次共享锁的acquireShared操作需要无条件传播 static final int PROPAGATE = -3; //node状态,可选值(0,SIGNAL[-1],CANCELLED[1],CONDITION,PROPAGATE) //普通的同步节点默认初始值为0,条件等待节点的初始值为-2 //waitStatus > 0 取消状态 //waitStatus == -1 表示当前node如果是head节点时,释放锁之后,需要唤醒它的后继节点 volatile int waitStatus; //因为node需要构建成 fifo队列,索引prev 指向 当前 前继节点 //当前节点会在前驱节点上自旋,循环检查前驱节点的waitStatus状态 volatile Node prev; //指向后继节点 volatile Node next; //当前node封装的线程本身 volatile Thread thread; //若当前节点不是普通节点而是条件等待节点,则节点处于某个条件的等待队列上, //此属性指向下一个条件等待节点,即条件队列上的后继节点 Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } //返回当前节点的上一个节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
CANCELLED = 1:表示当前的线程节点已经释放(超时,中断),已取消的节点不会再阻塞,需要从同步队列中取消等待。该节点不会参与竞争,且会一直保持取消状态(那么抢锁的时候,自旋获取前驱节点状态的时候就要跳过这些CANCELLED的节点)
SIGNAL = -1:表示后继节点处于等待唤醒的状态啊,当前节点如果释放了锁,就需要通知后继节点,使后继节点的线程得以运行
CONDITION = -2:表示线程在条件队列中阻塞(Condition中使用了),表示节点在等待队列中。当持有锁的线程调用了CONDITION的signal()方法之后,节点会从该CONDITION的等待队列转移到该锁的同步队列上,去竞争锁。(同步队列指的是AQS中维护的FIFO队列,等待队列是每一个CONDITION关联的队列)
PROPAGATE = -3:表示下一个线程获取到
为了更好的学习线程池,需要提前学习一下阻塞队列
关于线程的基础知识,本篇就不在赘述了,直接从使用Callable和FutureTask创建线程讲起,我们使用继承Thread或者是实现Runnable的方式去创建线程都没办法获得异步任务执行的结果,因此可以使用Callable和FutureTask来解决这个痛点。
public enum State { //new Thread(...)已经创建线程,但是还没有start NEW, //Java把Ready和Running合并成一种状态:Runnable(可执行状态) //调用了start方法后,线程就处于就绪状态,得到CPU时间片执行run方法后,就进入了执行状态 RUNNABLE, //处于blocke阻塞状态的线程不会占用CPU资源(等待获取锁或者IO阻塞) BLOCKED, //waiting无限期等待,不会被分配CPU时间片,需要其他线程显示唤醒,才可以进入就绪状态 WAITING, //限时等待,指定时间内线程没被唤醒,就会自动唤醒,进入就绪状态 TIMED_WAITING, //线程任务执行结束或者发送异常而没有被处理,线程进入死亡状态 TERMINATED; }
)
Callable接口相比Runnable接口,有返回值,但是Thread没办法直接使用Callable去创建异步任务
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
RunnableFuture,即是任务,也可以获取异步任务结果。适配器模式。
public interface RunnableFuture<V> extends Runnable, Future<V> {
//异步任务,留给子类重新
void run();
}
Future可以获取异步任务的结果或者状态
public interface Future<V> { //取消异步任务 boolean cancel(boolean mayInterruptIfRunning); //判断任务是否去下 boolean isCancelled(); //判断任务是否完成 boolean isDone(); //阻塞获取异步任务执行结果 V get() throws InterruptedException, ExecutionException; //限定时间内获取异步任务执行结果 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
RunnableFuture只是接口,具体的实现由FutureTask来实现,FutureTask是一个异步任务,可以获取执行结果或状态
public class FutureTask<V> implements RunnableFuture<V> { /** * 可能的转变状态 * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ //表示当前task状态 private volatile int state; //当前任务尚未执行 private static final int NEW = 0; //当前任务正在结束,尚未完全结束 private static final int COMPLETING = 1; //当前任务正常结束 private static final int NORMAL = 2; //当前任务执行过程中发生了异常,内部封装的Callable.run()向上抛出了异常 private static final int EXCEPTIONAL = 3; //当前任务被取消 private static final int CANCELLED = 4; //当前任务中断中 private static final int INTERRUPTING = 5; //当前任务已经中断 private static final int INTERRUPTED = 6; //装饰器模式,submit(Runnable/Callable) //这个属性就是FutureTask能够获取异步任务执行结果的关键,原本没办法直接传送Callable //给Thread,现在经过FutureTask的包装,FutureTask又实现了RunnableFuture接口, //因此便可以传送给Thread了 private Callable<V> callable; //正常情况:任务执行结束,outcome返回保存的结果 //非正常情况:callable向上抛异常,outcome返回异常 private Object outcome; //当前任务执行期间,保存当前任务执行的线程对象引用 private volatile Thread runner; //get可能有多个线程当前任务的结果,所以这里使用一种头插头取的队列 private volatile WaitNode waiters; /** * 返回结果给调用线程 * @param s completed state value ,get得到的状态 */ @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { //返回结果给调用线程 Object x = outcome; //任务正常结束 if (s == NORMAL) return (V)x; //任务取消 if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); //callable就是程序员自己业务对象 this.callable = callable; //状态为NEW this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { //装饰器模式,将Runnable转为Callable //call()的结果可能是null或者插进来的result this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } }
FutureTask#run()
//线程池submit(runnable/callable) -》 //任务执行的入口 public void run() { //条件1:state != NEW成立,当前task已经被执行过,或者被取消掉 //条件2:CAS给runner设置为当下线程,只有一个线程能够cas runner成功 //两个条件是为了保证当前的任务只能有一个线程在执行 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; //执行到这里,任务一定是NEW状态,当前线程抢占任务成功 try { //Callable 就是程序员自定义封装逻辑对象或者被装饰的对象。 //可以自定义传入一个Callable,里面的call方法调用了runnable的run方法,可以对该方法进行扩展 Callable<V> c = callable; //条件1:防止传的是一个null对象 //条件2:state == NEW 防止外部线程取消任务 if (c != null && state == NEW) { //结果引用 V result; //true 执行成功 没有异常 //false 执行失败 存在异常 boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) //任务执行成功,设置任务结束标志 并且 唤醒所有的订单线程 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() //任务执行结束,释放runner线程 runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts //获取任务结束时的状态 int s = state; //任务执行结束线程中断状态,处理中断 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
setException和set保存异步任务执行的结果
protected void setException(Throwable t) {
//设置状态为异常
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
//设置状态为异常
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//唤醒所有阻塞线程
finishCompletion();
}
}
protected void set(V v) {
//CAS设置为完成中
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
//修改任务为已经完成
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//清空WaitNode链,把所有的等待线程唤醒
finishCompletion();
}
}
按照继承Thread或者是实现Runnable接口的方式,在start后直接去调用其run方法,该方法就是异步任务的逻辑。FutureTask也实现了Runnable接口,也可以作为Runnable实例作为异步任务,但是它强就强在了并没有在run方法内直接编写异步任务的逻辑,而是将异步任务的逻辑交给了Callable接口去实现,并且Callable返回值可以保存在outcome属性中,最终可以使用FutureTask#get()方法去获取任务执行的结果。
public class FutureTaskDemo { public static final int COMPUTE_TIMES = 1000000000; static class CallableTask implements Callable<Long>{ //异步任务 @Override public Long call() throws Exception { long start = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName() + "线程开始运行"); for (int i = 0; i < COMPUTE_TIMES; i++){ int j = i * 100; } long end = System.currentTimeMillis(); long used = end - start; System.out.println(Thread.currentThread().getName() + "线程运行结束"); return used; } } public static void main(String[] args) { //实现一个FutureTask实例,并且传递Callable给其构造函数作为异步任务 CallableTask callableTask = new CallableTask(); FutureTask<Long> futureTask = new FutureTask<Long>(callableTask); Thread thread = new Thread(futureTask,"returnThread"); thread.start(); System.out.println(Thread.currentThread().getName() + "做点自己的事情"); for (int i = 0; i < COMPUTE_TIMES; i++){ int j = i * 100; } System.out.println(Thread.currentThread().getName() + "获取异步任务执行结果"); try { System.out.println("异步任务执行耗时:" + futureTask.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
通过FutureTask获取异步任务的时候,有两种情况:
提醒:学习线程池之前首先需要有FutureTask和BlockingQueue的基础,没有学过的同学可以回过头看一下我的专栏
//丛上往下,值越来越小
//-1补码表示:111 1*29位;-1 左移 29位 是[111] 0*29位;转为整数是一个负数。
private static final int RUNNING = -1 << COUNT_BITS;
//0补码表示是全0;0 左移 29位还是0。[000]
private static final int SHUTDOWN = 0 << COUNT_BITS;
//1补码表示高位全是0,最低位是1;1 左移 29位 是[001] 0*29
private static final int STOP = 1 << COUNT_BITS;
//2补码表示高位全是0,最低位是10;2 左移 29位 是 [010] 0*29
private static final int TIDYING = 2 << COUNT_BITS;
//3补码表示高位全是0,最低位是11;3 左移 29位 是 [011] 0*29
private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING:线程池创建之后的初始状态,这种状态下可以执行任务
SHUTDOWN:线程池执行shutdown方法,这种状态下不再接受新的任务,但是会等待阻塞队列中的任务执行完毕
STOP:线程池执行shutdownNow方法,这种状态下不再接受新的任务,并且会中断现有任务,清空任务队列
TIDYING:这种状态下线程池所有任务已经终止或者处理完成,将会调用terminated钩子方法
TERMINATED:调用完钩子terminated之后的状态
//高三位表示线程池运行状态 //除去高三位的剩余位数表示线程池中拥有的线程数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //计算ctl低位用来存放线程池线程数量 //COUNT_BITS = 29 private static final int COUNT_BITS = Integer.SIZE - 3; //低COUNT_BITS位所能表示的最大数值。即 000 1*COUNT_BITS个1 //1 << COUNT_BITS == 1 << 29 = 001 00000000000000000000000000000 //(1 << 29) - 1 == 000 11111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits //从上往下一次变大,RUNNING负数最小 //-1补码表示:111 1*29位;-1 左移 29位 是[111] 0*29位;转为整数是一个负数。 private static final int RUNNING = -1 << COUNT_BITS; //0补码表示是全0;0 左移 29位还是0。[000] private static final int SHUTDOWN = 0 << COUNT_BITS; //1补码表示高位全是0,最低位是1;1 左移 29位 是[001] 0*29 private static final int STOP = 1 << COUNT_BITS; //2补码表示高位全是0,最低位是10;2 左移 29位 是 [010] 0*29 private static final int TIDYING = 2 << COUNT_BITS; //3补码表示高位全是0,最低位是11;3 左移 29位 是 [011] 0*29 private static final int TERMINATED = 3 << COUNT_BITS; //作用:通过ctl获取当前线程池运行状态 private static int runStateOf(int c) { //CAPACITY:000 1*COUNT_BITS个1,即000 11111111111111111111111111111 //~CAPACITY:111 00000000000000000000000000000 //c = ctl //假设拿到ctl为:111 000000000000000111 //那么 111 000000000000000111 & 111 000000000000000000 = 111 000000000000000000,即RUNNING状态 return c & ~CAPACITY; } //作用:通过ctl获取线程池线程数量 //假设拿到ctl为:111 000000000000000111, //假设CAPACITY为:000 111111111111111 //那么 111 000000000000000111 & 000 111111111111111 = 000 000000000000000111,即线程池有7个线程 private static int workerCountOf(int c) { return c & CAPACITY; } //作用:根据线程池的状态和线程池中的线程数量来计算ctl //rs:线程池状态 //wc:线程池中的worker(线程)数量 //假设rs:111 000000000000000000,wc:000 000000000000000111 //那么:111 000000000000000000 | 000 000000000000000111 = 111 000000000000000111,得到新的ctl值 private static int ctlOf(int rs, int wc) { return rs | wc; } //比较当前线程池ctl值所表示的线程池状态是否小于某个状态s private static boolean runStateLessThan(int c, int s) { return c < s; } //当前线程池的状态是否大于等于某个状态s private static boolean runStateAtLeast(int c, int s) { return c >= s; } //小于shutdown的一定是running private static boolean isRunning(int c) { return c < SHUTDOWN; } //作用:修改ctl,标示线程数量加1 //使用cas操作使ctl值加1,成功返回true,失败返回false。加1代表的是线程数量加1 private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } //作用:修改ctl,标示ctl线程数量减1 private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } //将ctl值减1,这个方法一定成功。失败就cas自旋重试,直到cas操作成功 private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get())); } //任务队列。 //作用:存放任务的地方。当线程池中的线程达到核心线程数量时,再提交任务就会直接提交到workQueue,就是阻塞队列中。 private final BlockingQueue<Runnable> workQueue; //线程池全局锁。 //作用:增加work,减少work 时,需要持有mainLock,修改线程池运行状态时,也需要持有mainLock private final ReentrantLock mainLock = new ReentrantLock(); //线程池中存放work(thread)线程的地方。 private final HashSet<Worker> workers = new HashSet<Worker>(); //当外部线程调用awaitTermination()方法时,外部线程会等待当前线程状态为termination为止 //等待的实现:将外部线程封装成waitNode放到了Condition队列中了,waitNode.Thread就是外部线程,会被park掉,处waiting状态 //当线程池 状态 变为 Termination 时,会去唤醒这些线程。通过termination.signalAll()去唤醒,唤醒之后,这些线程会进入阻塞队列,头结点会去抢占mainLockc //抢占到的线程会去继续执行awaitTermination()后面程序。这些线程都会正常执行。 //简单理解:termination.await()会将线程阻塞在这里 //termination.signalAll()会将阻塞在这的线程依次唤醒 private final Condition termination = mainLock.newCondition(); //作用:创建线程时会使用到线程工厂。 //使用Executors.xxx(),使用的是DefaultThreadFactory,不建议使用,建议自己实现ThreadFactory指定线程池名字 private volatile ThreadFactory threadFactory; //拒绝策略,juc提供了4种。一般自己业务实现 private volatile RejectedExecutionHandler handler; //核心线程数量 private volatile int corePoolSize; //线程池最大线程数量限制 private volatile int maximumPoolSize; //记录线程池生命周期内线程池线程数最大数量 private int largestPoolSize; //记录线程池所完成的任务总数 -> 当worker退出时,会将worker完成的任务累计到completedTaskCount private long completedTaskCount; //控制核心线程数量内的线程是否可用被回收,true可以,false不可以 //false:核心线程即使是空闲,也能够存活(默认) //true:超时回收 private volatile boolean allowCoreThreadTimeOut; //空闲线程存活时间。当allowCoreThreadTimeOut == false时,会维护线程数量内的线程存活,超出部分会被超时。 //allowCoreThreadTimeOut == true的话,核心数量内的线程空闲也会被回收 private volatile long keepAliveTime; //默认的拒绝策略AbortPolicy(抛异常) private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); private final AccessControlContext acc;
Worker继承了AQS,所以需要实现其钩子方法tryAcquire和tryRelease,提供给模板方法使用。模板方法指的是AQS#release和AQS#acquire两个方法。
如果对于AQS不熟悉,最好是先去学习AQS的相关知识。
/** * Worker 基础了AQS * 采用了AQS的独占模式 * AQS的两个重要属性:state 和 ExclusiveOwnerThread * state:0 表示未被占用(只有等于0才可以尝试抢锁),大于0时表示被占用,小于0时表示初始状态(这种情况不能被抢锁) * ExclusiveOwnerThread:表示独占锁的线程。 */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ //作用:Worker内部封装的工作线程 final Thread thread; /** Initial task to run. Possibly null. */ //作用:假设firstTask不为空,那么当worker启动后(worker内的Thread启动后)会优先执行firstTask,当执行完firstTask后,会到queue中获取下一个任务 Runnable firstTask; /** Per-thread task counter */ //记录当前worker完成的任务数量 volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ //first可以为空,为空启动后会到queue中获取。 Worker(Runnable firstTask) { //设置AQS独占模式为初始化状态,这个时候不能被抢占锁 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //使用线程工厂创建了一个线程,并且将worker指定为runnable,也就是当thread启动的时候,会以worker.run()为入口 this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ //当worker启动时会执行run方法 public void run() { //runWorker:ThreadPoolExecutor的核心方法 runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. //判断当前worker的独占锁是否被独占,0表示未被占用,1表示已占用 protected boolean isHeldExclusively() { return getState() != 0; } //重写钩子函数 //尝试去占用worker的独占锁 protected boolean tryAcquire(int unused) { //使用CAS修改AQS中的state,期望值为0,0表示未被占用。 //修改成功表示当前线程抢占成功,那么则设置setExclusiveOwnerThread为当前线程 //返回值表示是否抢占成功 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } //重写钩子函数 //尝试释放锁(外部不会直接调用这个方法),这个方法是AQS内调用的,外部调用unlock时,unlock -> AQS.release() -> tryRelease() protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } //加锁,加锁失败时会阻塞线程,知道获取到锁为止 public void lock() { acquire(1); } //尝试加锁,如果当前锁未被持有状态,加锁成功后会返回true,否则不会阻塞当前线程,直接返回false public boolean tryLock() { return tryAcquire(1); } //一般情况下,调用unlock要保证当前线程是持有锁的 //特殊情况:worker的state为-1时,调用unlock表示初始化state,设置state = 0 //启动worker之前,先去调用unlock方法。会强制刷新exclusiveOwnerThread = null,state = 0 public void unlock() { release(1); } //就是返回worker的lock是否被占用 public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; //线程是启动状态,并且没有设置过中断,那么就设置中断 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
//加锁,加锁失败时会阻塞线程,知道获取到锁为止
public void lock() { acquire(1); }
//一般情况下,调用unlock要保证当前线程是持有锁的
//特殊情况:worker的state为-1时,调用unlock表示初始化state,设置state = 0
//启动worker之前,先去调用unlock方法。会强制刷新exclusiveOwnerThread = null,state = 0
public void unlock() { release(1); }
lock调用了AQS中的模板方法,我们来看看AQS中的这个模板方法。
//独占模式 获取资源模板方法:调用tryAcquire独占模式获取资源钩子方法 public final void acquire(int arg) { //条件1:!tryAcquire 尝试获取锁,没有获取成功返回true //acquireQueued需要做什么? //1、当前节点有没有被park?挂起?没有 =》所以需要挂起的操作 //2、唤醒之后的逻辑在哪呢? =》 唤醒之后的逻辑。 //条件2.1:首先 addWaiter 将当前线程封装成node入队(两种情况,快速入队,完整入队) //条件2.2:acquireQueued挂起当前线程,唤醒后相关的逻辑。 //acquireQueued返回true,表示挂起过程中线程被中断唤醒过...false表示未被中断过 //为了不浪费资源,acquireQueue过程中,tryAcquire成功,返回,没有tryAcquire成功,会阻塞该线程,等待前驱节点唤醒后才启动循环 //如果头结点如果退出了acquireQueue自旋,获取了锁,就会转去执行临界区代码,其他线程继续自旋 if (!tryAcquire(arg) //子类钩子方法的实现 && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //再次设置中断标记为true selfInterrupt(); }
//重写钩子函数
//尝试去占用worker的独占锁
protected boolean tryAcquire(int unused) {
//使用CAS修改AQS中的state,期望值为0,0表示未被占用。
//修改成功表示当前线程抢占成功,那么则设置setExclusiveOwnerThread为当前线程
//返回值表示是否抢占成功
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
如果tryAcquire失败,那么久将当前抢锁线程封装为Node节点,放到AQS同步队列末尾。
//节点指定模式入队 //最终返回值是当前线程包装的节点 private Node addWaiter(Node mode) { //Node.EXCLUSIVE //构建mode,把当前线程封装到对象node中 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //快速入队 Node pred = tail; //pred != null:条件成立,当前节点不是第一个节点 if (pred != null) { node.prev = pred;//这里需要注意,可能这步执行完后,可能unparkSuccessor执行了,此时通过next拿不到节点,只能从后往前用prev if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //完整入队 //前提:1、当前节点是第一个节点 2、cas操作修改尾节点失败,需要进入自旋设置 enq(node); return node; }
//节点入队 private Node enq(final Node node) { //只有入队成功才会跳出循环 for (;;) { Node t = tail; //条件成立:当前节点是第一个节点 //说明当前 锁被占用,且目前线程 可能是第一个获取锁失败的线程(当时可能存在一批获取锁失败的线程) if (t == null) { // Must initialize //作为当前持锁线程的第一个后继线程,需要做什么? //1、因为当前持锁的线程,它获取锁时,直接tryAcquire成功了,没有向 阻塞队列 中添加任何node,所以作为后继需要为它擦屁股 //2、为自己追加自己的node //设置一个无参的new Node()作为头结点 if (compareAndSetHead(new Node())) //头尾节点设置成相同的 tail = head; //特别注意:这里没有reture,继续循环设置自己的node } else { //普通入队方式 //当前节点不是第一个,防止cas设置尾节点失败,自旋 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; //返回当前节点的前置节点 return t; } } } }
//一般情况下,调用unlock要保证当前线程是持有锁的
//特殊情况:worker的state为-1时,调用unlock表示初始化state,设置state = 0
//启动worker之前,先去调用unlock方法。会强制刷新exclusiveOwnerThread = null,state = 0
public void unlock() { release(1); }
unlock调用了AQS中的模板方法release,release调用了Worker的子类实现的tryRelease钩子方法
//独占模式 释放资源模板方法:调用tryRelease独占模式释放资源钩子方法 public final boolean release(int arg) { //尝试释放锁,tryRelease(会设置state[和ownerThread],不会设置head) //返回 true 表示当前线程已经完全释放锁 //返回false,说明当前线程尚未完全释放锁 //独占模式 钩子函数 释放资源,成功返回true,失败返回false if (tryRelease(arg)) { //不存在重入,完全释放锁了,可以唤醒下一个线程了 //head什么情况会被创建出来? //当持锁线程未释放线程时,且持锁期间 有其他线程想要获取锁时, //其他线程发现获取不了锁,而且队列是空队列,此时后续线程会为当前持锁线程构建一个head节点, //然后后续线程节点会追加到head节点后面 Node h = head; //node状态,可选值(0,SIGNAL[-1],CANCELLED[1],CONDITION,PROPAGATE) //waitStatus == 0 默认状态 //waitStatus > 0 取消状态 //waitStatus == -1 表示当前node如果是head节点时,释放锁之后,需要唤醒它的后继节点 //条件1成立,说明队列中的head节点以及初始化过了,ReentrantLock在使用期间发生过多线程竞争了 //条件2成立,说明head后面一定插入过node节点【队列中不只有一个节点】 if (h != null && h.waitStatus != 0) //唤醒后继节点 unparkSuccessor(h); return true; } return false; }
//重写钩子函数
//尝试释放锁(外部不会直接调用这个方法),这个方法是AQS内调用的,外部调用unlock时,unlock -> AQS.release() -> tryRelease()
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/** * Wakes up node's successor, if one exists. * * @param node the node */ //waitStatus == 0 默认状态 //waitStatus > 0 取消状态 //waitStatus == -1 表示当前node如果是head节点时,释放锁之后,需要唤醒它的后继节点 //作用:唤醒当前节点的下一个节点 //release()进来,则参数Node为head节点,且node.waitStatus != 0 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ //获取当前节点的状态 int ws = node.waitStatus; //若头结点状态小于0,设置为0,表示初始状态,移除head节点交给唤醒的节点在acquireQueue中tryAcquire成功移除掉 if (ws < 0)//-1 SIGNAL状态 改成零的原因:因为当前节点以及完成后继节点的任务 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //node节点的第一个后继节点 Node s = node.next; //条件1: //s什么时候等于null? //1、当前节点就是tail节点时,s == null //2、addWriter操作中,还没有设置好pred.next的指向的时候,就会产生s == null的情况 //需要找到可以被唤醒的节点 //条件2: //成立:说明 当前node节点的后继节点是取消状态 //s为null 或者 s是CANCELLED状态(s 为 null 可能是在shouldParkFaildAcquire中是cancelled状态被移除了) if (s == null || s.waitStatus > 0) { //查找可以被唤醒的节点 s = null; //从尾向头遍历,找到一个离当前node最近的可以被唤醒的node。node可能找不到 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //如果找到合适的可以被唤醒的node,则唤醒,找不到啥也不做 if (s != null) //唤醒这个节点的线程,唤醒的节点会重新设置head LockSupport.unpark(s.thread); }
为什么要从后往前找呢?
因为在lock过程中,addWaiter和enq,会先设置新节点的prev,但是next是后面才设置的,可能会导致设置完prev后,调用了unlock -> release -> tryRelease,unparkSuccessor。
此时unparkSuccessor拿不到释放资源的节点的后继节点,所以需要通过从后往前去找到该节点,并且unpark其线程
//节点指定模式入队 //最终返回值是当前线程包装的节点 private Node addWaiter(Node mode) { //Node.EXCLUSIVE //构建mode,把当前线程封装到对象node中 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //快速入队 Node pred = tail; //pred != null:条件成立,当前节点不是第一个节点 if (pred != null) { node.prev = pred;//这里需要注意,可能这步执行完后,可能unparkSuccessor执行了,此时通过next拿不到节点,只能从后往前用prev if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //完整入队 //前提:1、当前节点是第一个节点 2、cas操作修改尾节点失败,需要进入自旋设置 enq(node); return node; }
unpark成功后,那么该线程将从acquireQueue中重新开始自旋
//独占模式 获取资源模板方法:调用tryAcquire独占模式获取资源钩子方法 public final void acquire(int arg) { //条件1:!tryAcquire 尝试获取锁,没有获取成功返回true //acquireQueued需要做什么? //1、当前节点有没有被park?挂起?没有 =》所以需要挂起的操作 //2、唤醒之后的逻辑在哪呢? =》 唤醒之后的逻辑。 //条件2.1:首先 addWaiter 将当前线程封装成node入队(两种情况,快速入队,完整入队) //条件2.2:acquireQueued挂起当前线程,唤醒后相关的逻辑。 //acquireQueued返回true,表示挂起过程中线程被中断唤醒过...false表示未被中断过 //为了不浪费资源,acquireQueue过程中,tryAcquire成功,返回,没有tryAcquire成功,会阻塞该线程,等待前驱节点唤醒后才启动循环 //如果头结点如果退出了acquireQueue自旋,获取了锁,就会转去执行临界区代码,其他线程继续自旋 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //再次设置中断标记为true selfInterrupt(); }
//自旋抢占,节点入队后,启动自旋抢占锁的流程 //当前节点会在死循环中不断获取同步状态,并且不断在前驱节点上自旋,前驱节点是头结点的时候才可以获取锁。 //原因: //1、头结点是成功获取同步状态(锁)的节点,头结点在释放同步状态后,会唤醒后继节点,后继节点唤醒后检查自己的前驱节点是否是头结点 //2、维护同步队列的FIFO原则,节点进入同步队列后,就进行了自旋的过程,每个节点都在不断的for死循环 final boolean acquireQueued(final Node node, int arg) { //true:表示当前线程抢占锁成功。普通情况下【lock】当前线程早晚会拿到锁 //false:表示失败,需要执行出队逻辑..(响应中断的lock的方法时再讲) boolean failed = true; try { //当前线程是否被中断 boolean interrupted = false; //自旋:当前节点会在前驱节点上不断自旋,直到tryAcquire成功,头结点出队,自己成为头结点 for (;;) { //什么时候会执行这里? //1、进入for循环,在线程尚未park前会执行 //2、线程park之后,被唤醒后,也会执行这里 //获取当前节点的前置节点 final Node p = node.predecessor(); //条件1:前置节点是否是head节点,head.next节点在任何时候都有权利去争夺锁 //条件2:tryAcquire(arg) //条件2成立:说明head对应的线程已经释放锁了,head.next节点对应的线程,正好获取到锁了 //条件2不成立:说明head对应的线程,还没有释放锁,head.next任然需要被park //前驱节点是head,继续调用子类实现的独占模式获取资源钩子方法tryAcquire if (p == head && tryAcquire(arg)) { //拿到锁之后需要做什么 //1、设置自己为head节点 setHead(node); //将上个线程对应的node的引用置为null,协助老的head出队.. p.next = null; // help GC //当前线程获取锁过程没有异常 failed = false; //返回当前线程的中断标记 return interrupted; } //条件1:shouldParkAfterFailedAcquire:当前线程获取锁资源失败后,是否需要挂起。 //true:当前线程需要挂起 false:当前线程不需要挂起 //条件2:parkAndCheckInterrupt:挂起线程,返回中断标记(唤醒:1.正常唤醒 其他线程unpark 2.其他线程给当前挂起的线程一个中断信号) //判断当前线程是否需要挂起,如果需要挂起的调用parkAndCheckInterrupt,直到被唤醒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //interrupted = true,表示当前node对应的线程是被中断信号唤醒的 interrupted = true; } } finally { //如果没有成功获取资源(timeout 或者 可中断情况下被中断了) if (failed) //取消请求,将当前节点从队列移除 cancelAcquire(node); } }
/* * Methods for setting control state */ /** * Transitions runState to given target, or leaves it alone if * already at least the given target. * * @param targetState the desired state, either SHUTDOWN or STOP * (but not TIDYING or TERMINATED -- use tryTerminate for that) */ //小于targetState状态,则变为该状态,大于等于该状态,则不变 private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); //条件成立,假设targetState == SHUTDOWN,说明当前线程状态 >= SHUTDOWN //条件不成立,假设targetState == SHUTDOWN,说明当前线程状态 == RUNNING if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } /** * Transitions to TERMINATED state if either (SHUTDOWN and pool * and queue empty) or (STOP and pool empty). If otherwise * eligible to terminate but workerCount is nonzero, interrupts an * idle worker to ensure that shutdown signals propagate. This * method must be called following any action that might make * termination possible -- reducing worker count or removing tasks * from the queue during shutdown. The method is non-private to * allow access from ScheduledThreadPoolExecutor. */ final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
线程池任务提交脑子里需要有这张图片
1、新任务提交的时候,如果核心线程数还未达到,直接开线程处理
2、如果核心线程数已经达到了,如果阻塞队列未满,放入阻塞队列,等待处理
3、如果阻塞队列也满了,最大线程数量未满,开一个线程来处理
4、如果最大线程数也满了,执行拒绝策略
//任务队列。 //作用:存放任务的地方。当线程池中的线程达到核心线程数量时,再提交任务就会直接提交到workQueue,就是阻塞队列中。 private final BlockingQueue<Runnable> workQueue; //Runnable可以是普通你的Runnable实现类,也可以是FutureTask public void execute(Runnable command) { //非空判断 if (command == null) throw new NullPointerException(); //获取ctl最新值。ctl分为两部分,高3位(线程池状态)和其他位(当前线程池线程数量) int c = ctl.get(); //workerCountOf:取出当前线程数量 //条件成立:当前线程数量小于核心线程数,此次提交任务直接创建一个新的worker,对应线程池中多了一个新的线程 if (workerCountOf(c) < corePoolSize) { //addWorker即为创建线程的过程,会创建worker对象,并将command作为firstTask //core == true表示采用核心线程数数量限制 //core == false表示采用maxmumPoolSize线程数限制 if (addWorker(command, true)) //创建成功后直接返回,addWorker中会启动新创建的worker,将firstTask执行 return; //执行到这里,说明addWorker失败 //失败原因: // ①、存在并发现象:execute方法可能是有多个线程调用的,addWorker方法成立后,其他线程可能也成立了,并且向线程池中创建了worker, // 这个时候线程池中的核心线程数已经达到,所以失败了 //②、当前线程池状态发生改变了。RUNNING SHUTDOWN STOP TIDYING TERMINATION //当线程池状态非RUNNING状态时,addWorker(firstTask != null, true | false)一定会失败 //当线程池处于SHUTDOWN状态下也有可能提交成功,但是前提firstTask == null 而且当前 queue 不为空(特殊情况) //线程池状态只有在RUNNING的时候,addWorker才有可能成功 c = ctl.get(); } //执行到这里有几种情况 //1、当前线程数已经达到了corePoolSize //2、addWorker失败 if (isRunning(c) //条件成立:说明当前线程池处于RUNNING状态,则 && workQueue.offer(command)//尝试将task放到workQueue中 ) { //执行到这里,说明offer提交任务成功 //再次获取ctl保存到recheck中 int recheck = ctl.get(); if (! isRunning(recheck)//成立说明提交到队列之后线程池状态被外部线程修改了,比如shutdown()或者shutdownNow()。这种情况需要把刚刚提交的任务删除掉 && remove(command)//有可能成功,有可能失败。成功说明提交后线程池中的线程还未处理,失败,在shutdown()或者shutdownNow()之前就被线程池中的线程给处理了。 ) //提交之后,线程池状态为非RUNNING,且任务出队成功,走拒绝策略 reject(command); //有几种情况来这? //1、当前线程池是RUNNING(这个大概率) //2、线程池状态时非RUNNING,但是remove提交的任务失败(这个小概率) //担心 当前线程池是RUNNING状态,但是线程池中的存活线程数量是0 //这是一个担保机制,保证线程池在RUNNING下有一个线程在工作 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //执行到这里,有几种情况 //1、线程不是处于RUNNING //2、offer失败 //①、offer失败,需要做什么?说明当前queue满了!这个时候如果当前线程数量尚未达到maxmumPoolSize的话,尝试创建新线程,直接执行task的command //假设当前线程池的线程数达到maxmumPoolSize,这里也会失败,走拒绝策略 // //②、线程池状态为非RUNNING状态,这个时候因为command != null addWorker 一定是返回false else if (!addWorker(command, false)/**/) reject(command); }
//线程池中存放work(thread)线程的地方。 private final HashSet<Worker> workers = new HashSet<Worker>(); //返回值总结: //true:表示创建worker成功,且启动成功 //false:表示创建失败 //1、线程池状态rs > SHUTDOWN(STOP/TIDYING/TERMINATION) //2、线程池状态rs = SHUTDOWN,但是队列中已经没有任务了,或者当前状态是SHUTDOWN且队列未空,但是firstTask不为null //3、当前线程池已经达到指定指标(corePoolSize或者maxmumPoolSize), //4、threadFactory创建的线程是null //firstTask:可以为null,表示启动worker后,worker自动到queue中获取任务,如果不是null,则worker优先执行firstTask //core:表示采用的线程数线程限制,如果为true 采用 核心线程数限制,false采用maxmumPoolSize线程数限制 private boolean addWorker(Runnable firstTask, boolean core) { //自旋 retry: //外层自旋作用:负责判断当前线程池状态是否允许创建线程,不允许则直接退出addWorker方法,可以则进入内部自旋(内部自旋的作用为了让ctl加1)。 for (;;) { //获取当前ctl int c = ctl.get(); //获取线程池的状态 int rs = runStateOf(c); // Check if queue empty only if necessary. //条件1:rs >= SHUTDOWN,条件成立,说明当前线程池状态不是RUNNING //条件2:! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) //条件2前置条件:当前的线程池状态不是RUNNING //rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty() //表示:当前线程池状态是SHUTDOWN状态 & 提交的任务是空,addWorker这个方法可能不是execute调用的 & 当前任务队列不是空 //排除掉上面的情况,当前线程池是SHUTDOWN状态但是队列里面还有任务尚未处理完,这个时候允许添加worker,但是不允许再次提交task if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) //当线程池状态rs > SHUTDOWN //rs == SHUTDOWN 但是队列中已经没有任务了 或者 rs == SHUTDOWN 且 firstTask == null return false; //内部自旋作用:获取创建线程池的令牌,创建成功则ctl+1 for (;;) { //获取当前线程池中,线程数量 int wc = workerCountOf(c); //条件1:wc >= CAPACITY ,永远不可能处理,CAPACITY5亿多的 //条件2:wc >= (core ? corePoolSize : maximumPoolSize) //core == true,判断当前线程数是否 大于 核心线程数 //core == false,判断当前线程数是否 大于 maxmumPoolSize线程数限制 if (wc >= CAPACITY || //core:表示采用的线程数线程限制,如果为true 采用 核心线程数限制,false采用maxmumPoolSize线程数限制 wc >= (core ? corePoolSize : maximumPoolSize)) //别放了,线程池放不下你了 return false; //前提条件:当前线程数量没有超过线程池线程数限制 //成功:ctl加1(线程数+1) //失败;可能存在其他线程修改过了ctl //可能发生什么事 //1.其他线程execute()申请过令牌了,在这之前,导致cas失败 //2.其他外部线程可能调用过shutdown()或者shutdownNow()导致线程池发生变化了,状态发送变化则ctl发生变化,期望值不正确,cas必然失败 if (compareAndIncrementWorkerCount(c)) //线程数加1,cas成功,退出外层自旋(退出自旋的唯一方式) break retry; //前提条件:线程数加1失败 //原因:可能存在其他线程修改了ctl //重新获取ctl c = ctl.get(); // Re-read ctl //判断线程池ctl是否被其他线程修改 //成立:被修改了状态,进入外层循环判断状态 //不成立:cas修改ctl发生看竞争,再次自旋内层循环,重新尝试cas if (runStateOf(c) != rs) //被其他线程修改,退出本次外层自旋,重新进入外层循环,获取ctl continue retry; // else CAS failed due to workerCount change; retry inner loop } } //前提条件:break retry,线程池线程数ctl加1成功 //workerStarted表示创建的worker是否已经启动,false未启动,true启动 boolean workerStarted = false; //workerAdded表示创建的worker是否已经添加到线程池(HashSet) boolean workerAdded = false; Worker w = null; try { //创建Worker,执行完后,线程应该是已经创建完成的 w = new Worker(firstTask); //将创建的Worker节点的线程赋值给t final Thread t = w.thread; //为什么要做t != null 的判断? //防止ThreadFactory实现类有bug,因为ThreadFactory是一个接口,谁都可以实现 //万一直接给个thread是null,这里就有问题,所以要判断是否为null if (t != null) { //拿到全局锁保存引用到mainLock中 final ReentrantLock mainLock = this.mainLock; //持有全局锁,可能会阻塞,知道获取成功为止,同一时刻 操纵 线程池内部相关的操作,都必须持有锁 mainLock.lock(); //从这里加锁之后,其他线程是无法修改线程池状态的 try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. //获取当前的线程池状态 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN ||//线程池处于RUNNING状态 (rs == SHUTDOWN && firstTask == null)//线程池处于SHUTDOWN状态,但是firstTask为null ) { //判断线程是否已经start //防止程序员在ThreadFactory创建线程返回给外部之前将线程start了 if (t.isAlive()) // precheck that t is startable //失败的来这里了! throw new IllegalThreadStateException(); //将创建的worker添加到线程池中 //线程还没start,将worker线程放到HashSet中 workers.add(w); //获取线程池的线程数量 int s = workers.size(); //largestPoolSize:记录线程池生命周期内线程数最大值 if (s > largestPoolSize) //修改线程池生命周期内的最大线程数量 largestPoolSize = s; //线程池添加线程成功 workerAdded = true; } } finally { //释放锁 mainLock.unlock(); } if (workerAdded) { //线程池添加线程成功,启动线程 //执行的runnable实际上就是worker重写的run方法,会不断执行阻塞队列中的任务 t.start(); //修改线程启动状态 workerStarted = true; } } } finally { //判断线程是否启动成功 if (! workerStarted) //1、添加线程失败,释放令牌 //2、将当前worker清理出workers集合 addWorkerFailed(w); } //返回新创建线程启动状态 return workerStarted; }
shutdown,shutdownNow:这两个方法都不会等待线程池完全关闭
awaitTermination:等待线程池完全关闭
因为shutdown和shutdownNow使用interrupt只是设置了中断标记位为true,线程关闭的处理需要交给用户来处理,但是线程是在sleep,wait中,则会抛出 InterruptedException ,这个时候会设置中断标记位为false,用户没办法捕获状态关闭线程,因此需要重新设置状态标记位,所以我们就可以借鉴dubbo关闭线程池的源码。
dubbo关闭线程池首先会先shutdown,再通过awaitTermination来检查是否线程池已经完全关闭,如果是true则已经关闭,如果是false则等待超时,没有完全关闭,可能是因为中断标记位被sleep或者wait抛出的异常重新设置了false,因此我们可以重新shutdown来给他设置上中断标记位。
public static void gracefulShutdown(Executor executor, int timeout) { if (!(executor instanceof ExecutorService) || isShutdown(executor)) { return; } final ExecutorService es = (ExecutorService) executor; try { es.shutdown(); // Disable new tasks from being submitted } catch (SecurityException ex2) { return; } catch (NullPointerException ex2) { return; } try { if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { es.shutdownNow(); } } catch (InterruptedException ex) { es.shutdownNow(); Thread.currentThread().interrupt(); } if (!isShutdown(es)) { newThreadToCloseExecutor(es); } }
dubbo如何关闭线程池:https://www.cnblogs.com/notlate/p/10204834.html
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; //作用:Worker内部封装的工作线程 final Thread thread; //作用:假设firstTask不为空,那么当worker启动后(worker内的Thread启动后)会优先执行firstTask,当执行完firstTask后,会到queue中获取下一个任务 Runnable firstTask; //记录当前worker完成的任务数量 volatile long completedTasks; //first可以为空,为空启动后会到queue中获取。 Worker(Runnable firstTask) { //设置AQS独占模式为初始化状态,这个时候不能被抢占锁 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //使用线程工厂创建了一个线程,并且将worker指定为runnable,也就是当thread启动的时候,会以worker.run()为入口 this.thread = getThreadFactory().newThread(this); } //当worker启动时会执行run方法 public void run() { //runWorker:ThreadPoolExecutor的核心方法 runWorker(this); } //重写钩子函数 //尝试释放锁(外部不会直接调用这个方法),这个方法是AQS内调用的,外部调用unlock时,unlock -> AQS.release() -> tryRelease() protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } }
//w就是启动worker final void runWorker(Worker w) { //wt == w.thread Thread wt = Thread.currentThread(); //将初始执行task赋值给task Runnable task = w.firstTask; //清空w.firstTask的引用 w.firstTask = null; //释放锁,因为创建的时候就加锁了(state在构造函数中设置为了-1) //为了初始化 worker state == 0 和 exclusiveOwnerThread == null w.unlock(); // allow interrupts //是否是突然退出 //true -> 发生异常了,表示线程是突然退出的,需要回头做一些处理 //false -> 正常退出 boolean completedAbruptly = true; try { //有任务就做,这就是线程池提交任务执行的核心了 //条件1:task != null,firstTask是不是 null //条件2:(task = getTask()) != null 条件成立:说明当前线程在queue中获取任务成功,该方法会阻塞 //getTask如果返回null,当前线程需要执行结束逻辑。 while (task != null || (task = getTask()) != null) { //worker设置独占锁为当前线程 //为什么要设置独占锁?shutdown时会判断当前worker状态,根据独占锁是否空闲来判断worker是否正在工作 w.lock(); //条件1:runStateAtLeast(ctl.get(), STOP),说明线程池目前处于STOP或TYDING或TERMINATION,此时线程一定要给他一个中断信号 //条件1成立:runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted() //上面条件成立,说明当前线程池状态时 >= STOP 且当前线程是未设置中断状态的,此时需要接入到if里面,给当前线程一个中断。 //假设:runStateAtLeast(ctl.get(), STOP) == false //Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) 在干吗 //Thread.interrupted()获取当前中断状态,且设置中断为为false。连续调用两次,第二次一定是返回false //runStateAtLeast(ctl.get(), STOP)大概率这里还是false //其实它在强制刷新线程的中断标记为false,因为有可能上一次执行task if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //这个if的作用,只要是线程池状态为STOP TYDING TERMINATION 无论当前线程是否是中断,都设置为中断,接触其他地方的阻塞状态 wt.interrupt(); try { //钩子方法,留给子类实现 beforeExecute(wt, task); //表示异常情况,如果thrown不为空,表示task运行过程中,向上抛出异常了。 Throwable thrown = null; try { //task可能是FutureTask也可能是普通的Runnable接口实现类 //如果前面是通过submit提交的runnable/callable会被封装成FutureTask task.run(); //封装futureTask抛出的异常 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //钩子方法,留给子类实现 afterExecute(task, thrown); } } finally { //将局部变量task置为空 task = null; //更新worker完成任务数量 w.completedTasks++; //worker处理完一个任务后,会释放掉独占锁,然后再次到queue中获取任务 //1、正常情况下,会再次回到getTask()那里获取任务。while(getTask) //2、task.run()时内部抛出异常了 w.unlock(); } } //什么情况下,会来到这里? //1、getTask()方法返回null,说明当前线程应该执行退出逻辑了 completedAbruptly = false; } finally { //task.run()内部抛出异常时,直接从w.unlock();调到这里 //正常退出completedAbruptly == false。异常退出completedAbruptly == true processWorkerExit(w, completedAbruptly); } }
worker处理结束执行processWorkerExit逻辑
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //记录线程池完成的任务数量 completedTaskCount += w.completedTasks; //当前线程和队伍任务完成,移除 workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); //考虑移除之后,线程池线程线程够不够用? int c = ctl.get(); //条件成立,说明当前线程池的状态时RUNNING if (runStateLessThan(c, STOP)) { //条件成立,说明当前线程是正常退出 if (!completedAbruptly) { //allowCoreThreadTimeOut:控制核心线程数量内的线程是否可用被回收,true可以,false不可以 //false:核心线程即使是空闲,也能够存活(默认) //true:超时回收 //min 表示线程池最低持有的线程数量 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //前提条件:线程池状态为RUNNING //条件成立:当前线程池最低线程数为0,并且阻塞队列还有任务,此时线程池必须要有线程来处理这些任务 //如果min设置为1 if (min == 0 && ! workQueue.isEmpty()) min = 1; //条件成立:当前线程池线程数量已经超过了保证线程池运行的最低线程数量,直接退出 if (workerCountOf(c) >= min) return; // replacement not needed } //没达到最低线程,添加一个线程来处理 addWorker(null, false); } }
通过submit提交的runnable或者callable方法,会包装成futureTask,再传入execute调用执行任务方法
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
看到这里,是不是觉得很奇怪,Worker#run()方法究竟是在什么时候调用的?
这个我们需要看看Worker的构造方法
Worker(Runnable firstTask) {
//设置AQS独占模式为初始化状态,这个时候不能被抢占锁
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//使用线程工厂创建了一个线程,并且将worker指定为runnable,也就是当thread启动的时候,会以worker.run()为入口
this.thread = getThreadFactory().newThread(this);
}
这个构造方法实际上传入了该worker,并且该worker实现了Runnable,重写了run方法
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } //r:Worker实现了Runnable,因此可以直接传入Worker,以后调用worker.start //或者是worker.thread.start()实际上都是调用了worker里面的run方法 //这个start的调用,实际上是在addWorker添加worker成功后调用的,run会调用其runWorker方法 //不断执行getTask去获取阻塞队列中的任务去执行 public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
private Runnable getTask() { //表示当前线程获取任务是否超时,默认是false,true表示已超时 boolean timedOut = false; // Did the last poll() time out? //自旋 for (;;) { //获取最新ctl值保存到c int c = ctl.get(); //获取线程池当前运行状态 int rs = runStateOf(c); // Check if queue empty only if necessary. //条件1:rs >= SHUTDOWN //条件成立:当前线程池是非RUNNING状态,可能是SHUTDOWN,STOP,... //条件2:rs >= STOP || workQueue.isEmpty() //条件2.1:rs >= STOP,成立说明当前的状态最低也是STOP状态,一定要返回null(STOP不接受新任务,不执行任务队列任务,直接中断执行中的任务) //条件2.2:前置条件 状态时SHUTDOWN,workQueue.isEmpty()条件成立说明当前线程池状态为SHUTDOWN状态,且任务队列为空,此时一定返回null //返回null,runWorker方法将会返回null的线程执行退出线程池的逻辑 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //使用cas + 死循环的方式让ctl减1 decrementWorkerCount(); return null; } //执行到这里有几种情况: //1、线程池是RUNNING状态 //2、线程池是SHUTDOWN状态,但是队列未空,此时可以创建线程(线程池状态为SHUTDOWN,不接受新任务但是药执行完任务队列的任务) //获取线程池中的线程数量 int wc = workerCountOf(c); // Are workers subject to culling? //timed == true 表示 当前线程获取task时是支持超时机制的,使用queue.poll(xxx,xxx); //当获取task超时的情况下,下一次自旋就可能返回null了 //timed == false 表示当前的线程是不支持超时机制的,当前线程会使用 queue.take() //情况1:allowCoreThreadTimeOut == true,表示核心线程数内的线程也可以被回收。所有线程都是支持超时机制的。都是使用queue.poll获取task //情况2:allowCoreThreadTimeOut == false,表示当前线程池会维护核心数量内的线程 //wc > corePoolSize //条件成立:当前线程池中的线程数量都是大于核心线程数的,此时让所有路过的这里的线程,都是用poll支持超时的方式去获取任务 //这样,就会可能有一部分线程获取不到任务,获取不到任务,返回null,然后runWorker会执行线程退出逻辑 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //条件1:(wc > maximumPoolSize || (timed && timedOut)) //条件1.1:wc > maximumPoolSize为什么会成立?setMaxmumPoolSize()方法,可能存在外部线程将线程池最大线程数量设置比初始时小 //条件1.2:timed && timedOut条件成立:前提条件:当前线程使用poll方式获取task。上一次循环时,使用poll方式获取任务时超时了 //条件1为true,表示线程可以被回收,达到回收标准了,当确实需要回收时回收 //条件2:wc > 1 || workQueue.isEmpty() //条件2.1:wc > 1 条件成立:说明当前线程池中还有其他线程,当前线程可以直接回收,返回null //条件2.2:workQueue.isEmpty(),前置条件wc==1,条件成立:说明当前任务队列已经空了,最后一个线程也可以放心退出 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { //使用CAS机制,将ctl -1,减1成功的线程返回null //cas成功的返回null //cas失败?为什么会cas失败 //1、其他线程先你一步退出了 //2、线程池状态发生了变化。 if (compareAndDecrementWorkerCount(c)) return null; //再次自旋时,timed有可能时false了,因为当前线程cas失败,很有可能是因为其他线程退出导致的,再 continue; } //这里才是正式开始获取任务 try { //take:会响应中断,会一直阻塞到得到任务或者当前线程中断 //poll:会响应中断,会一直阻塞到超时返回null或者拿到任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //前提条件:获取不到元素 //获取任务超时,可以尝试减少worker了 timedOut = true; } catch (InterruptedException retry) { //中断了,重试 timedOut = false; } } }
getTask方法需要结合调用他的方法runWorker来看,如果getTask方法返回为null,那么runWorker将会直接调用processWorkerExit(w, completedAbruptly)方法执行减少worker的退出操作,该操作首先是累加完成的任务数量,减少线程池线程数量(worker),最后如果当前线程池还是RUNNING状态,那么要保持其正常运转需要的最低线程数,没有达到就给他新增线程。
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //记录线程池完成的任务数量 completedTaskCount += w.completedTasks; //当前线程和队伍任务完成,移除 workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); //考虑移除之后,线程池线程线程够不够用? int c = ctl.get(); //条件成立,说明当前线程池的状态时RUNNING if (runStateLessThan(c, STOP)) { //条件成立,说明当前线程是正常退出 if (!completedAbruptly) { //allowCoreThreadTimeOut:控制核心线程数量内的线程是否可用被回收,true可以,false不可以 //false:核心线程即使是空闲,也能够存活(默认) //true:超时回收 //min 表示线程池最低持有的线程数量 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //前提条件:线程池状态为RUNNING //条件成立:当前线程池最低线程数为0,并且阻塞队列还有任务,此时线程池必须要有线程来处理这些任务 //如果min设置为1 if (min == 0 && ! workQueue.isEmpty()) min = 1; //条件成立:当前线程池线程数量已经超过了保证线程池运行的最低线程数量,直接退出 if (workerCountOf(c) >= min) return; // replacement not needed } //没达到最低线程,添加一个线程来处理 addWorker(null, false); } }
钩子方法beforeExecute:重新初始化ThreadLocal线程本地变量,更新日志,计时统计,更新上下文变量
钩子方法afterExecute:清除ThreadLocal线程本地变量,更新日志记录,收集统计信息,更新上下文变量
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
public class ThreadPoolExecutorGouzi { public static void main(String[] args) { ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,4,100, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(6)){ private final ThreadLocal<Long> threadLocal = new ThreadLocal<>(); public ThreadLocal<Long> getThreadLocal() { return threadLocal; } @Override protected void beforeExecute(Thread t, Runnable r) { //开始计时 this.threadLocal.set(System.currentTimeMillis()); super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); //计时结束 Long used = System.currentTimeMillis() - this.threadLocal.get(); this.threadLocal.set(used); System.out.println(Thread.currentThread().getName() + "--用时:" + used); this.threadLocal.remove(); } }; for (int i = 0; i < 10; i++){ poolExecutor.submit(() -> { int sum = 0; for(int j = 0; j < 1000000000; j++){ sum += j; } System.out.println(Thread.currentThread().getName() + "--sum:" + sum); }); } try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } poolExecutor.shutdown(); } }
"D:\Program Files\Java\jdk1.8.0_251\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA 2020.1.4\lib\idea_rt.jar=64385:D:\Program Files\JetBrains\IntelliJ IDEA 2020.1.4\bin" -Dfile.encoding=UTF-8 -classpath "D:\Program Files\Java\jdk1.8.0_251\jre\lib\charsets.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\deploy.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\access-bridge-64.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\cldrdata.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\dnsns.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\jaccess.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\jfxrt.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\localedata.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\nashorn.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\sunec.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\sunjce_provider.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\sunmscapi.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\sunpkcs11.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\zipfs.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\javaws.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\jce.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\jfr.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\jfxswt.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\jsse.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\management-agent.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\plugin.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\resources.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\rt.jar;D:\project\learning\juc-learning\target\classes" book2.ThreadPoolExecutorGouzi pool-1-thread-2--sum:-1243309312 pool-1-thread-2--用时:398 pool-1-thread-3--sum:-1243309312 pool-1-thread-3--用时:401 pool-1-thread-4--sum:-1243309312 pool-1-thread-4--用时:407 pool-1-thread-1--sum:-1243309312 pool-1-thread-1--用时:417 pool-1-thread-3--sum:-1243309312 pool-1-thread-3--用时:406 pool-1-thread-1--sum:-1243309312 pool-1-thread-3--sum:-1243309312 pool-1-thread-3--用时:355 pool-1-thread-4--sum:-1243309312 pool-1-thread-4--用时:755 pool-1-thread-2--sum:-1243309312 pool-1-thread-1--用时:745 pool-1-thread-2--用时:764 pool-1-thread-3--sum:-1243309312 pool-1-thread-3--用时:380 Process finished with exit code 0
public interface RejectedExecutionHandler {w4
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
//调用者线程执行策略 public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { //e.isShutdown():false表示线程池处在RUNNING状态 true表示线程池处在其他状态 //条件1:成立表示线程池处在RUNNING状态 //线程池只有在RUNNING状态下才可以继续处理新的任务 if (!e.isShutdown()) { r.run(); } } }
//抛出异常策略,超过阻塞队列和maximumPoolSize,抛出异常 public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ //不做任何操作,直接抛出异常 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
//直接抛弃策略,不抛出异常 public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ //方法内没有代码逻辑,表示不对任务作出处理,也不抛异常,直接抛弃 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
//抛出最旧任务,再次尝试提交任务到队列, public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { //e.isShutdown():false表示线程池处在RUNNING状态 true表示线程池处在其他状态 //条件1:成立表示线程池处在RUNNING状态 //线程池只有在RUNNING状态下才可以继续处理新的任务 if (!e.isShutdown()) { //抛出最旧的任务 e.getQueue().poll(); //再次尝试将任务提交 e.execute(r); } } }
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //判断权限 checkShutdownAccess(); //设置当前线程池状态为SHUTDOWN advanceRunState(SHUTDOWN); //中断空闲线程 //中断线程池中所有可以中断的线程(空闲线程,没有设置为中断状态的线程,为什么会会有中断状态,可能是阻塞被外界唤醒的中断) interruptIdleWorkers(); //回调方法 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; //修改workers,先上个全局锁 mainLock.lock(); try { //获取所有的worker for (Worker w : workers) { Thread t = w.thread; //条件成立:获取的worker线程没有被中断,尝试上锁改worker成功 if (!t.isInterrupted() && w.tryLock()) { try { //设置worker线程的中断标记位 t.interrupt(); } catch (SecurityException ignore) { } finally { //解锁worker w.unlock(); } } //条件成立;说明只中断一个,可以退出了 if (onlyOne) break; } } finally { //解开全局锁 mainLock.unlock(); } }
onShutdown是一个钩子方法,需要子类实现
void onShutdown() {
}
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //设置线程池状态为STOP advanceRunState(STOP); //中断所有的线程(已经启动的线程) interruptWorkers(); //删除阻塞队列中的所有没有被处理的任务,并且返回 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
//中断已经启动的线程
void interruptIfStarted() {
Thread t;
//线程是启动状态,并且没有设置过中断,那么就设置中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
private List<Runnable> drainQueue() { //获取阻塞队列引用 BlockingQueue<Runnable> q = workQueue; //存放所有没有被处理的任务 ArrayList<Runnable> taskList = new ArrayList<Runnable>(); //将所有任务转移到taskList q.drainTo(taskList); //条件成立:阻塞队列还有任务没有转移完 if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { //删除阻塞队列中的任务 if (q.remove(r)) //添加任务到队列 taskList.add(r); } } return taskList; }
final void tryTerminate() { for (;;) { //获取线程池状态 int c = ctl.get(); //不允许执行tryTerminate的情况 //条件1:成立线程池处于RUNNING状态,还在处理任务,退出 //条件2:成立线程池处于TIDYING,TERMINATE状态,已经走过这些逻辑,直接退出 //条件3:成立线程池处于SHUTDOWN状态并且阻塞队列还有任务,任务没有处理完,退出 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //前提条件:线程池状态为STOP,SHUTDOWN并且任务队列为空 //条件成立:线程池线程数量大于0 if (workerCountOf(c) != 0) { // Eligible to terminate //中断一个空闲线程 interruptIdleWorkers(ONLY_ONE); return; } //上全局锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //设置线程池状态为TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { //设置线程池状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //termination条件已经满足了,可以唤醒所有等待termination的线程 termination.signalAll(); } return; } } finally { //接触全局锁 mainLock.unlock(); } // else retry on failed CAS } }
//一池固定线程 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //一池一线程 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } //可扩容的线程池 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //可调度线程池 public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()//任务的排队队列,按到期时间升序排序的阻塞队列 ); }
IO密集型:线程数为CPU核心数 * 2
CPU密集型:线程数为CPU核心数
混合型:( IO耗时与CPU耗时之比+ 1)* CPU核心数
学习ThreadLocal之前,需要补充一下Java中的引用类型。
强引用是默认的引用类型,即声明一个对象直接赋值给变量,该变量在不赋值为null的情况下,无论在什么情况,甚至在OOM的情况下依然不回收。只有在引用为null的情况才会被回收。
public static void strongReferenceTest(){
//强引用
Test2 test2 = new Test2();
System.out.println(test2);
test2 = null;
System.gc();
System.out.println(test2);
}
软引用在系统内存空间充足的情况下不会被回收,只有在系统内存空间不足的时候才会被回收。通常使用在内存敏感的地方,如高速缓存。软引用使用java.lang.ref.SoftReference类来实现,可以豁免一些垃圾回收。
//-Xms9M -Xmx9m
public static void softReferenceTest(){
SoftReference<MyObject> softReference = new SoftReference<>(new MyObject());
System.gc();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("---gc after 内存够用:" + softReference.get());
byte[] bytes = new byte[1024 * 1024 * 9];
System.out.println("----- gc after 内存不够:" + softReference.get());
}
弱引用生命周期更短,对于只有弱引用的对象,只要垃圾回收一启动就会回收该对象占用的内存。通过java.lang.ref.WeakReference类来实现。
弱引用和软引用可以使用在需要加载大量图片的情况下,此时可能会造成OOM,gc的时候就可以释放了,
Map<String,SoftReference<Bitmap>> imageCache = new HashMap<String,SoftReference<Bitmap>>;
public static void weakReferenceTest(){
WeakReference<MyObject> weakReference = new WeakReference<>(new MyObject());
System.out.println("---gc before内存够用:" + weakReference.get());
System.gc();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("----gc after内存够用:" + weakReference.get());
}
虚引用就是形同虚设,如果一个对象仅持有虚引用,那么就和没有任何引用一样,在任何时候都可能会被垃圾回收期回收,它不能订单使用,也不能通过它访问对象,虚引用必须和引用队列(ReferenceQueue)联合使用
public static void phantomReferenceTest(){ ReferenceQueue<MyObject> myObjectReferenceQueue = new ReferenceQueue<>(); PhantomReference<MyObject> phantomReference = new PhantomReference<>(new MyObject(), myObjectReferenceQueue); List<byte[]> bytes = new ArrayList<>(); new Thread(() -> { while(true){ list.add(new byte[1 * 1024 * 1024]); try { TimeUnit.MILLISECONDS.sleep(600); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(phantomReference.get()); } },"t1").start(); new Thread(() -> { while(true){ Reference<? extends MyObject> reference = myObjectReferenceQueue.poll(); if (reference != null){ System.out.println("有虚对象加入了队列"); } } },"t2").start(); }
CLH自旋锁是后面AQS抽象队列同步器的基础,需要掌握。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。