当前位置:   article > 正文

炼气期第一式:JUC进阶式学习_总线风暴

总线风暴

专栏预热

https://img-blog.csdnimg.cn/img_convert/6ffedc650149303a0391ea57d7fc8a4f.png#clientId=uf3fd7b9f-e9b9-4&crop=0&crop=0&crop=1&crop=1&errorMessage=unknown error&from=paste&height=740&id=u27f5955f&margin=[object Object]&name=image.png&originHeight=917&originWidth=1564&originalType=binary&ratio=1&rotation=0&showTitle=false&size=44519&status=error&style=none&taskId=u764f5af7-2c0e-4b60-8fdb-7cdc04486e8&title=&width=1261.714245268421
首先简单看一下JUC架构图,我们将会从CAS,Volatile基础知识讲起,而AQS等知识都会使用到这两部分,AQS作为并发基石,起到承上启下的作用,又为上层提供了基础。因此AQS是本专栏的重点知识。
本专栏会尽可能按照合理的顺序去安排知识顺序,给大家良好的阅读体验,构建起对于JUC的知识体系。(本来一直在语雀更新的,但是因为语雀现在会员才可以分享,无奈转移到CSDN。本来打算是在22年底更新完毕一次性发出来,现在还有一部分内容,只是建立了标题,但是没有更新,后面会慢慢完善的)

第零章 并发基础

第1节 并发基石CAS

CAS存在的问题
  1. cas空自旋
  2. 总线风暴
什么是总线风暴
    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
  • 1
  • 2
  • 3
  • 4
  • 5

这些compareAndSwap操作,在底层调用的是cmpxchg指令,如果程序在多核处理器上,则在cmpxchg指令前加上lock前缀(lock cmpxchg);在单核处理器上,则不需要加lock。

lock前缀指令的作用(与volatile相同):

  1. 将当前CPU缓存行的数据立刻写入系统内存
  2. 让其他CPU中缓存了该内存地址的数据无效
  3. 禁止指令重排序

在这里插入图片描述

如果Core1 和 Core2的高速缓存中有相同的数据,然后Core1在自己的高速缓存中修改了该数据,会通过总线让Core2高速缓存中对应的值失效,而Core2发现自己缓存中的数据失效了,会立刻从内存中读取最新的数据,保证缓存一致性。
CPU通过MESI协议保证变量的缓存一致性,为了保障缓存一致性,需要在总线上来回通信,如果该流量过大,总线将会成为瓶颈,这就是总线风暴

Java轻量级锁在争用激烈的情况下,会从轻量级锁升级为重量级锁,其一是为了减少CAS空自旋,其二是变量同一时间大量的CAS操作导致的总线风暴。

那么基于CAS实现的轻量级锁是如何避免总线风暴的呢?使用队列对抢锁线程进行排队,最大程度减少了CAS操作数量。(CLH自旋锁就是基于队列排队的自旋锁)

第2节 Volatile

有序性与内存屏障
JMM模型
Happens-Before

第3节 原子类

第4节 LongAdder

小结

1、线程阻塞和唤醒需要从用户态到内核态,而cas自旋不需要。但是呢cas自旋也存在问题,也就是cas空自旋和总线风暴问题,存在两种方式可以减少cas空自旋的问题,一个就是使用分散热点如LongAdder,一个是使用队列削峰如AQS,都是使用空间换时间的方式,后面会AQS章节会再继续展开

第一章 AQS

后文的总多知识使用到了AQS,如线程池中的Worker,独占锁ReentrantLock,共享锁CountDownLatch与Semaphore,因此先来学习这块知识,为第三章对于线程池的学习打好基础。
在这里插入图片描述

学习AQS之前,最好是先学习一下CLH自旋锁,因为AQS中的同步队列和CLH中的单向链表差不多,都是后继节点在前驱节点上自旋,等待释放锁。

第0节 悲观锁与乐观锁

悲观锁:如Java中的synchronize,可以确保哪个线程持有锁,就可以访问到临界资源
乐观锁:乐观锁是一种思想,有两部分组成:冲突检测和数据更新。具体的实现有cas,数据库带版本号更新,原子类

悲观锁存在的问题:

  1. 多线程池竞争下,加锁与释放锁会导致频繁的上下文切换和调度延时,引起性能问题
  2. 一个线程持有锁后,会导致其他线程被阻塞挂起
  3. 如果一个优先级高的线程等待一个优先级低的线程释放锁,就会导致线程的优先级导致,引发性能安全问题

乐观锁存在的问题:

  1. 在争用激烈的场景下,如果某个线程持有锁的时间过长,会导致其他的线程空自旋耗尽CPU资源
  2. 如果大量的线程空自旋,可能导致硬件层面的总线风暴

如何避免CAS总线风暴呢?使用队列对抢锁线程进行排队,最大程度减少CAS操作数量。

CLH自旋锁

CLH自旋锁就是一种基于队列排队的自旋锁(具体为单向链表)。

CLH自旋锁原理

CLH自旋锁原理:申请加锁的线程首先会在单向链表末尾CAS添加新的节点,之后在其前驱节点上自旋,检查前驱节点是否释放锁即可,如果发现前一个节点释放锁,则抢锁成功。这个过程只有入队的时候需要CAS,入队之后不需要CAS自旋,只需要普通自旋就可以了。在争用激烈的情况下,可以大大减少CAS操作的数量,避免总线风暴。

手写CLH自旋锁
/**
 * <p>
 * </p>
 *
 * @author LZH
 * @date 2022-10-03 21:16
 */
public class CLHLock implements Lock {
    static class Node{
        //true:表示当前线程正在抢占锁,或者已经抢占锁
        //false:当前线程已经释放锁,下一个线程可以占有锁了
        volatile boolean locked;

        Node prevNode;

        public Node(boolean locked, Node prevNode) {
            this.locked = locked;
            this.prevNode = prevNode;
        }


        public static final Node EMPTY = new Node(false,null);

        public boolean isLocked() {
            return locked;
        }

        public Node getPrevNode() {
            return prevNode;
        }

        public void setLocked(boolean locked) {
            this.locked = locked;
        }

        public void setPrevNode(Node prevNode) {
            this.prevNode = prevNode;
        }
    }

    /**
     * 当前节点的线程本地变量
     */
    private static ThreadLocal<Node> curNodeLocal = new ThreadLocal<>();

    /**
     * CLHLock队列末尾节点
     */
    private AtomicReference<Node> tail = new AtomicReference<>(null);

    public CLHLock() {
        //设置队尾节点
        tail.getAndSet(Node.EMPTY);
    }

    @Override
    public void lock() {
        Node curNode = new Node(true, null);
        Node preNode = tail.get();
        //cas自旋设置尾节点
        while(!tail.compareAndSet(preNode,curNode)){
            preNode = tail.get();
        }
        //设置前驱节点
        curNode.setPrevNode(preNode);

        //自旋:在前驱节点上普通自旋,监听前驱节点是否释放锁
        while(curNode.getPrevNode().isLocked()){
            //前驱节点还占用着锁,让CPU时间片给他干活
            Thread.yield();
        }

        //来到这里,说明前驱节点已经释放锁了
        //只有当前线程进入了临界区,其他线程都还在普通自旋等待前驱节点释放锁的过程中
        //将节点放到线程本地变量中,释放锁的时候会用到
        curNodeLocal.set(curNode);

    }

    @Override
    public void unlock() {
        Node curNode = curNodeLocal.get();
        curNode.setPrevNode(null);//Help GC
        curNode.setLocked(false);
        curNodeLocal.set(null);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87

第1节 等待唤醒机制(可再完善)

  1. Java内置锁的wait和notify,由于是基于内置锁的,所以需要是同步对象的owner,才能够调用这两个方法
  2. Condition的await和signal,由于是基于显式锁Lock的,所以也需要先上锁,才可以在调用这两个方法,调用完后需要释放锁。并且Lock是公平的那么Condition就是公平的,如果Lock是非公平的那么Condition就是非公平的。最后Condition没有数量限制
  3. LockSupport,JUC提供的线程阻塞与唤醒的工具类。该根据可以让线程在任意位置阻塞和唤醒,所有方法都是静态的。
package java.util.concurrent.locks;
import sun.misc.Unsafe;

public class LockSupport {
    private LockSupport() {} // Cannot be instantiated.

    private static void setBlocker(Thread t, Object arg) {
        // Even though volatile, hotspot doesn't need a write barrier here.
        UNSAFE.putObject(t, parkBlockerOffset, arg);
    }

    //唤醒某个被阻塞的线程
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

    //无限期阻塞当前线程,带block对象,用于给诊断工具确定线程阻塞的原因
    public static void park(Object blocker) {
        //获取当前线程
        Thread t = Thread.currentThread();
        //设置当前线程的blocker
        setBlocker(t, blocker);
        //无限期阻塞
        UNSAFE.park(false, 0L);
        //唤醒后,设置阻塞原因为null
        setBlocker(t, null);
    }

    //限时阻塞
    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            //获取当前线程
            Thread t = Thread.currentThread();
            //设置当前线程的阻塞原因
            setBlocker(t, blocker);
            //限时阻塞
            UNSAFE.park(false, nanos);
            //唤醒后阻塞原因设置为空
            setBlocker(t, null);
        }
    }

    //阻塞当前线程到某个时间点
    public static void parkUntil(Object blocker, long deadline) {
        //获取当前线程
        Thread t = Thread.currentThread();
        //设置当前线程阻塞原因
        setBlocker(t, blocker);
        //设置阻塞到某个时间点
        UNSAFE.park(true, deadline);
        //唤醒后设置阻塞原因为空
        setBlocker(t, null);
    }

    //获取被阻塞线程的block对象,用于分析阻塞的原因
    public static Object getBlocker(Thread t) {
        if (t == null)
            throw new NullPointerException();
        return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
    }

    //无限期阻塞当前线程
    public static void park() {
        UNSAFE.park(false, 0L);
    }

    //限时阻塞
    public static void parkNanos(long nanos) {
        if (nanos > 0)
            //限时阻塞
            UNSAFE.park(false, nanos);
    }

    //阻塞当前线程到某一个时间
    public static void parkUntil(long deadline) {
        UNSAFE.park(true, deadline);
    }

    /**
     * Returns the pseudo-randomly initialized or updated secondary seed.
     * Copied from ThreadLocalRandom due to package access restrictions.
     */
    static final int nextSecondarySeed() {
        int r;
        Thread t = Thread.currentThread();
        if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
            r ^= r << 13;   // xorshift
            r ^= r >>> 17;
            r ^= r << 5;
        }
        else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
            r = 1; // avoid zero
        UNSAFE.putInt(t, SECONDARY, r);
        return r;
    }

    // Hotspot implementation via intrinsics API
    private static final sun.misc.Unsafe UNSAFE;
    private static final long parkBlockerOffset;
    private static final long SEED;
    private static final long PROBE;
    private static final long SECONDARY;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            parkBlockerOffset = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            SEED = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception ex) { throw new Error(ex); }
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120

第2节 锁与队列

使用基于CAS自旋实现的轻量级锁有两个大的问题:

  1. CAS恶性空自旋会浪费大量的CPU资源
  2. 在SMP架构的CPU上会导致总线风暴

解决CAS恶性空自旋的有效方式之一是空间换时间,常见的有分散热点和队列削峰。JUC中LondAdder使用了分散热点的方式解决该问题,但是更常见的是使用队列削峰的方式解决该问题,并且还提供了一个基于双向队列的削峰基类–抽象基础类(AbstractQueuedSynchronizer 抽象队列同步器,简称AQS)

CLH内部的队列

CLH自旋锁内部维护了一个FIFO的单向队列,队头的节点表示占有该锁的节点,新加入的节点需要等待,会插入到队列的尾部。

在这里插入图片描述

分布式锁的内部队列

Zookeeper的分布锁方案:
在这里插入图片描述

AQS的内部队列

JUC中有众多的类是基于AQS的,如ReentrantLock,CountDownLatch,Semaphore,ReentrantReadWriteLock,FutureTask等。AQS解决了实现同步容器的大量细节问题。

AQS是CLH队列的一个变种,内部是一个FIFO的双向链表,这种结构是每个节点都有前驱和后继节点,每一个节点都封装了线程,当线程竞争失败的时候会封装成节点进入AQS队列;当获取线程的线程释放锁后,会从队列中唤醒一个阻塞的节点(线程)。

在这里插入图片描述

第3节 核心成员

AQS是基于模板模式实现的,分离变与不变。AQS为锁获取,锁释放,入队,出队提供了一系列的模板方法,而具体的细节实现抽取出钩子方法,交给子类去实现。

Node内部类
static final class Node {
        //AQS为两种模式提供了不同的模板流程
        //枚举:共享模式
        static final Node SHARED = new Node();
    
        //枚举:独占模式
        static final Node EXCLUSIVE = null;

        //表示当前节点处于取消状态
        static final int CANCELLED =  1;
    
        //标示后继线程处在等待状态
        static final int SIGNAL    = -1;
    
        //标示当前线程正在进行条件等待
        static final int CONDITION = -2;

        //标示下一次共享锁的acquireShared操作需要无条件传播
        static final int PROPAGATE = -3;

        //node状态,可选值(0,SIGNAL[-1],CANCELLED[1],CONDITION,PROPAGATE)
        //普通的同步节点默认初始值为0,条件等待节点的初始值为-2
        //waitStatus > 0 取消状态
        //waitStatus == -1 表示当前node如果是head节点时,释放锁之后,需要唤醒它的后继节点
        volatile int waitStatus;

        //因为node需要构建成 fifo队列,索引prev 指向 当前 前继节点
        //当前节点会在前驱节点上自旋,循环检查前驱节点的waitStatus状态
        volatile Node prev;

        //指向后继节点
        volatile Node next;

        //当前node封装的线程本身
        volatile Thread thread;

        //若当前节点不是普通节点而是条件等待节点,则节点处于某个条件的等待队列上,
		//此属性指向下一个条件等待节点,即条件队列上的后继节点
        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        //返回当前节点的上一个节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
waitStatus

CANCELLED = 1:表示当前的线程节点已经释放(超时,中断),已取消的节点不会再阻塞,需要从同步队列中取消等待。该节点不会参与竞争,且会一直保持取消状态(那么抢锁的时候,自旋获取前驱节点状态的时候就要跳过这些CANCELLED的节点)

SIGNAL = -1:表示后继节点处于等待唤醒的状态啊,当前节点如果释放了锁,就需要通知后继节点,使后继节点的线程得以运行

CONDITION = -2:表示线程在条件队列中阻塞(Condition中使用了),表示节点在等待队列中。当持有锁的线程调用了CONDITION的signal()方法之后,节点会从该CONDITION的等待队列转移到该锁的同步队列上,去竞争锁。(同步队列指的是AQS中维护的FIFO队列,等待队列是每一个CONDITION关联的队列)

PROPAGATE = -3:表示下一个线程获取到

第4节 模板模式

第5节 AQS锁抢占

第6节 核心方法:入队出队

第7节 AQS锁释放

第7节 ReentrantLock抢锁流程

第8节 AQS条件队列

第二章 JUC容器(上)

阻塞队列

为了更好的学习线程池,需要提前学习一下阻塞队列

第三章 线程与线程池

关于线程的基础知识,本篇就不在赘述了,直接从使用Callable和FutureTask创建线程讲起,我们使用继承Thread或者是实现Runnable的方式去创建线程都没办法获得异步任务执行的结果,因此可以使用Callable和FutureTask来解决这个痛点。

线程状态

public enum State {
        //new Thread(...)已经创建线程,但是还没有start
        NEW,

        //Java把Ready和Running合并成一种状态:Runnable(可执行状态)
        //调用了start方法后,线程就处于就绪状态,得到CPU时间片执行run方法后,就进入了执行状态
        RUNNABLE,

        //处于blocke阻塞状态的线程不会占用CPU资源(等待获取锁或者IO阻塞)
        BLOCKED,

        //waiting无限期等待,不会被分配CPU时间片,需要其他线程显示唤醒,才可以进入就绪状态
        WAITING,

        //限时等待,指定时间内线程没被唤醒,就会自动唤醒,进入就绪状态
        TIMED_WAITING,


        //线程任务执行结束或者发送异常而没有被处理,线程进入死亡状态
        TERMINATED;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

FutureTask创建线程

)在这里插入图片描述

Callable接口相比Runnable接口,有返回值,但是Thread没办法直接使用Callable去创建异步任务

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

RunnableFuture,即是任务,也可以获取异步任务结果。适配器模式。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    //异步任务,留给子类重新
    void run();
}
  • 1
  • 2
  • 3
  • 4

Future可以获取异步任务的结果或者状态

public interface Future<V> {
    //取消异步任务
    boolean cancel(boolean mayInterruptIfRunning);
    
    //判断任务是否去下
    boolean isCancelled();

    //判断任务是否完成
    boolean isDone();

    //阻塞获取异步任务执行结果
    V get() throws InterruptedException, ExecutionException;

    //限定时间内获取异步任务执行结果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

RunnableFuture只是接口,具体的实现由FutureTask来实现,FutureTask是一个异步任务,可以获取执行结果或状态

public class FutureTask<V> implements RunnableFuture<V> {
    /**
     * 可能的转变状态
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    //表示当前task状态
    private volatile int state;
    //当前任务尚未执行
    private static final int NEW          = 0;
    //当前任务正在结束,尚未完全结束
    private static final int COMPLETING   = 1;
    //当前任务正常结束
    private static final int NORMAL       = 2;
    //当前任务执行过程中发生了异常,内部封装的Callable.run()向上抛出了异常
    private static final int EXCEPTIONAL  = 3;
    //当前任务被取消
    private static final int CANCELLED    = 4;
    //当前任务中断中
    private static final int INTERRUPTING = 5;
    //当前任务已经中断
    private static final int INTERRUPTED  = 6;

    //装饰器模式,submit(Runnable/Callable)
    //这个属性就是FutureTask能够获取异步任务执行结果的关键,原本没办法直接传送Callable
    //给Thread,现在经过FutureTask的包装,FutureTask又实现了RunnableFuture接口,
    //因此便可以传送给Thread了
    private Callable<V> callable;

    //正常情况:任务执行结束,outcome返回保存的结果
    //非正常情况:callable向上抛异常,outcome返回异常
    private Object outcome; 
    
    //当前任务执行期间,保存当前任务执行的线程对象引用
    private volatile Thread runner;

    //get可能有多个线程当前任务的结果,所以这里使用一种头插头取的队列
    private volatile WaitNode waiters;

    /**
     * 返回结果给调用线程
     * @param s completed state value ,get得到的状态
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        //返回结果给调用线程
        Object x = outcome;
        //任务正常结束
        if (s == NORMAL)
            return (V)x;
        //任务取消
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }


    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        //callable就是程序员自己业务对象
        this.callable = callable;
        //状态为NEW
        this.state = NEW;       // ensure visibility of callable
    }
    
    public FutureTask(Runnable runnable, V result) {
        //装饰器模式,将Runnable转为Callable
        //call()的结果可能是null或者插进来的result
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
核心方法run

FutureTask#run()

    //线程池submit(runnable/callable) -》
    //任务执行的入口
    public void run() {
        //条件1:state != NEW成立,当前task已经被执行过,或者被取消掉
        //条件2:CAS给runner设置为当下线程,只有一个线程能够cas runner成功
        //两个条件是为了保证当前的任务只能有一个线程在执行
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        //执行到这里,任务一定是NEW状态,当前线程抢占任务成功
        try {
            //Callable 就是程序员自定义封装逻辑对象或者被装饰的对象。
            //可以自定义传入一个Callable,里面的call方法调用了runnable的run方法,可以对该方法进行扩展
            Callable<V> c = callable;
            //条件1:防止传的是一个null对象
            //条件2:state == NEW 防止外部线程取消任务
            if (c != null && state == NEW) {
                //结果引用
                V result;
                //true 执行成功 没有异常
                //false 执行失败 存在异常
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    //任务执行成功,设置任务结束标志 并且 唤醒所有的订单线程
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            //任务执行结束,释放runner线程
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            //获取任务结束时的状态
            int s = state;
            //任务执行结束线程中断状态,处理中断
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

setException和set保存异步任务执行的结果

    protected void setException(Throwable t) {
        //设置状态为异常
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            //设置状态为异常
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            //唤醒所有阻塞线程
            finishCompletion();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
    protected void set(V v) {
        //CAS设置为完成中
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            //修改任务为已经完成
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            //清空WaitNode链,把所有的等待线程唤醒
            finishCompletion();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

按照继承Thread或者是实现Runnable接口的方式,在start后直接去调用其run方法,该方法就是异步任务的逻辑。FutureTask也实现了Runnable接口,也可以作为Runnable实例作为异步任务,但是它强就强在了并没有在run方法内直接编写异步任务的逻辑,而是将异步任务的逻辑交给了Callable接口去实现,并且Callable返回值可以保存在outcome属性中,最终可以使用FutureTask#get()方法去获取任务执行的结果。

代码Demo
public class FutureTaskDemo {
    public static final int COMPUTE_TIMES = 1000000000;


    static class CallableTask implements Callable<Long>{
        //异步任务
        @Override
        public Long call() throws Exception {
            long start = System.currentTimeMillis();
            System.out.println(Thread.currentThread().getName() + "线程开始运行");
            for (int i = 0; i < COMPUTE_TIMES; i++){
                int j = i * 100;
            }
            long end = System.currentTimeMillis();
            long used = end - start;
            System.out.println(Thread.currentThread().getName() + "线程运行结束");
            return used;
        }
    }

    public static void main(String[] args) {
        //实现一个FutureTask实例,并且传递Callable给其构造函数作为异步任务
        CallableTask callableTask = new CallableTask();
        FutureTask<Long> futureTask = new FutureTask<Long>(callableTask);
        Thread thread = new Thread(futureTask,"returnThread");
        thread.start();
        System.out.println(Thread.currentThread().getName() + "做点自己的事情");
        for (int i = 0; i < COMPUTE_TIMES; i++){
            int j = i * 100;
        }
        System.out.println(Thread.currentThread().getName() + "获取异步任务执行结果");
        try {
            System.out.println("异步任务执行耗时:" + futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

通过FutureTask获取异步任务的时候,有两种情况:

  1. 任务执行完了,outcome不为空,直接拿到结果
  2. 任务没有执行完,outcome为空,main线程阻塞订单异步任务完成,异步任务完成唤醒main线程获取结果

线程池创建线程

提醒:学习线程池之前首先需要有FutureTask和BlockingQueue的基础,没有学过的同学可以回过头看一下我的专栏

线程池架构图

在这里插入图片描述

线程池内部架构

在这里插入图片描述

任务执行流程

在这里插入图片描述

线程池状态
    //丛上往下,值越来越小
    //-1补码表示:111 1*29位;-1 左移 29位 是[111] 0*29位;转为整数是一个负数。
    private static final int RUNNING    = -1 << COUNT_BITS;
    //0补码表示是全0;0 左移 29位还是0。[000]
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //1补码表示高位全是0,最低位是1;1 左移 29位 是[001] 0*29
    private static final int STOP       =  1 << COUNT_BITS;
    //2补码表示高位全是0,最低位是10;2 左移 29位 是 [010] 0*29
    private static final int TIDYING    =  2 << COUNT_BITS;
    //3补码表示高位全是0,最低位是11;3 左移 29位 是 [011] 0*29
    private static final int TERMINATED =  3 << COUNT_BITS;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

RUNNING:线程池创建之后的初始状态,这种状态下可以执行任务
SHUTDOWN:线程池执行shutdown方法,这种状态下不再接受新的任务,但是会等待阻塞队列中的任务执行完毕
STOP:线程池执行shutdownNow方法,这种状态下不再接受新的任务,并且会中断现有任务,清空任务队列
TIDYING:这种状态下线程池所有任务已经终止或者处理完成,将会调用terminated钩子方法
TERMINATED:调用完钩子terminated之后的状态

基本属性
    //高三位表示线程池运行状态
    //除去高三位的剩余位数表示线程池中拥有的线程数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    //计算ctl低位用来存放线程池线程数量
    //COUNT_BITS = 29
    private static final int COUNT_BITS = Integer.SIZE - 3;

    //低COUNT_BITS位所能表示的最大数值。即 000 1*COUNT_BITS个1
    //1 << COUNT_BITS == 1 << 29 = 001 00000000000000000000000000000
    //(1 << 29) - 1 == 000 11111111111111111111111111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    //从上往下一次变大,RUNNING负数最小

    //-1补码表示:111 1*29位;-1 左移 29位 是[111] 0*29位;转为整数是一个负数。
    private static final int RUNNING    = -1 << COUNT_BITS;
    //0补码表示是全0;0 左移 29位还是0。[000]
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //1补码表示高位全是0,最低位是1;1 左移 29位 是[001] 0*29
    private static final int STOP       =  1 << COUNT_BITS;
    //2补码表示高位全是0,最低位是10;2 左移 29位 是 [010] 0*29
    private static final int TIDYING    =  2 << COUNT_BITS;
    //3补码表示高位全是0,最低位是11;3 左移 29位 是 [011] 0*29
    private static final int TERMINATED =  3 << COUNT_BITS;

    //作用:通过ctl获取当前线程池运行状态
    private static int runStateOf(int c)     {
        //CAPACITY:000 1*COUNT_BITS个1,即000 11111111111111111111111111111
        //~CAPACITY:111 00000000000000000000000000000

        //c = ctl
        //假设拿到ctl为:111 000000000000000111
        //那么 111 000000000000000111 & 111 000000000000000000 = 111 000000000000000000,即RUNNING状态
        return c & ~CAPACITY;
    }

    //作用:通过ctl获取线程池线程数量
    //假设拿到ctl为:111 000000000000000111,
    //假设CAPACITY为:000 111111111111111
    //那么 111 000000000000000111 & 000 111111111111111 = 000 000000000000000111,即线程池有7个线程
    private static int workerCountOf(int c)  {
        return c & CAPACITY;
    }

    //作用:根据线程池的状态和线程池中的线程数量来计算ctl
    //rs:线程池状态
    //wc:线程池中的worker(线程)数量
    //假设rs:111 000000000000000000,wc:000 000000000000000111
    //那么:111 000000000000000000 | 000 000000000000000111 = 111 000000000000000111,得到新的ctl值
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    //比较当前线程池ctl值所表示的线程池状态是否小于某个状态s
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    //当前线程池的状态是否大于等于某个状态s
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    //小于shutdown的一定是running
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    //作用:修改ctl,标示线程数量加1
    //使用cas操作使ctl值加1,成功返回true,失败返回false。加1代表的是线程数量加1
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    //作用:修改ctl,标示ctl线程数量减1
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    //将ctl值减1,这个方法一定成功。失败就cas自旋重试,直到cas操作成功
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
    
    //任务队列。
    //作用:存放任务的地方。当线程池中的线程达到核心线程数量时,再提交任务就会直接提交到workQueue,就是阻塞队列中。
    private final BlockingQueue<Runnable> workQueue;

    //线程池全局锁。
    //作用:增加work,减少work 时,需要持有mainLock,修改线程池运行状态时,也需要持有mainLock
    private final ReentrantLock mainLock = new ReentrantLock();

    //线程池中存放work(thread)线程的地方。
    private final HashSet<Worker> workers = new HashSet<Worker>();

    //当外部线程调用awaitTermination()方法时,外部线程会等待当前线程状态为termination为止
    //等待的实现:将外部线程封装成waitNode放到了Condition队列中了,waitNode.Thread就是外部线程,会被park掉,处waiting状态
    //当线程池 状态 变为 Termination 时,会去唤醒这些线程。通过termination.signalAll()去唤醒,唤醒之后,这些线程会进入阻塞队列,头结点会去抢占mainLockc
    //抢占到的线程会去继续执行awaitTermination()后面程序。这些线程都会正常执行。
    //简单理解:termination.await()会将线程阻塞在这里
    //termination.signalAll()会将阻塞在这的线程依次唤醒
    private final Condition termination = mainLock.newCondition();


    //作用:创建线程时会使用到线程工厂。
    //使用Executors.xxx(),使用的是DefaultThreadFactory,不建议使用,建议自己实现ThreadFactory指定线程池名字
    private volatile ThreadFactory threadFactory;

    //拒绝策略,juc提供了4种。一般自己业务实现
    private volatile RejectedExecutionHandler handler;

    //核心线程数量
    private volatile int corePoolSize;

    //线程池最大线程数量限制
    private volatile int maximumPoolSize;

    //记录线程池生命周期内线程池线程数最大数量
    private int largestPoolSize;

    //记录线程池所完成的任务总数 -> 当worker退出时,会将worker完成的任务累计到completedTaskCount
    private long completedTaskCount;


    //控制核心线程数量内的线程是否可用被回收,true可以,false不可以
    //false:核心线程即使是空闲,也能够存活(默认)
    //true:超时回收
    private volatile boolean allowCoreThreadTimeOut;

    //空闲线程存活时间。当allowCoreThreadTimeOut == false时,会维护线程数量内的线程存活,超出部分会被超时。
    //allowCoreThreadTimeOut == true的话,核心数量内的线程空闲也会被回收
    private volatile long keepAliveTime;



    //默认的拒绝策略AbortPolicy(抛异常)
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    private final AccessControlContext acc;

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
Worker内部类

Worker继承了AQS,所以需要实现其钩子方法tryAcquire和tryRelease,提供给模板方法使用。模板方法指的是AQS#release和AQS#acquire两个方法。
如果对于AQS不熟悉,最好是先去学习AQS的相关知识。

/**
     * Worker 基础了AQS
     * 采用了AQS的独占模式
     * AQS的两个重要属性:state 和 ExclusiveOwnerThread
     * state:0 表示未被占用(只有等于0才可以尝试抢锁),大于0时表示被占用,小于0时表示初始状态(这种情况不能被抢锁)
     * ExclusiveOwnerThread:表示独占锁的线程。
     */    
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        //作用:Worker内部封装的工作线程
        final Thread thread;

        /** Initial task to run.  Possibly null. */
        //作用:假设firstTask不为空,那么当worker启动后(worker内的Thread启动后)会优先执行firstTask,当执行完firstTask后,会到queue中获取下一个任务
        Runnable firstTask;

        /** Per-thread task counter */
        //记录当前worker完成的任务数量
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        //first可以为空,为空启动后会到queue中获取。
        Worker(Runnable firstTask) {
            //设置AQS独占模式为初始化状态,这个时候不能被抢占锁
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //使用线程工厂创建了一个线程,并且将worker指定为runnable,也就是当thread启动的时候,会以worker.run()为入口
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        //当worker启动时会执行run方法
        public void run()
        {
            //runWorker:ThreadPoolExecutor的核心方法
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        //判断当前worker的独占锁是否被独占,0表示未被占用,1表示已占用
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }


        //重写钩子函数
        //尝试去占用worker的独占锁
        protected boolean tryAcquire(int unused) {
            //使用CAS修改AQS中的state,期望值为0,0表示未被占用。
            //修改成功表示当前线程抢占成功,那么则设置setExclusiveOwnerThread为当前线程
            //返回值表示是否抢占成功
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        //重写钩子函数
        //尝试释放锁(外部不会直接调用这个方法),这个方法是AQS内调用的,外部调用unlock时,unlock -> AQS.release() -> tryRelease()
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        //加锁,加锁失败时会阻塞线程,知道获取到锁为止
        public void lock()        { acquire(1); }

        //尝试加锁,如果当前锁未被持有状态,加锁成功后会返回true,否则不会阻塞当前线程,直接返回false
        public boolean tryLock()  { return tryAcquire(1); }

        //一般情况下,调用unlock要保证当前线程是持有锁的
        //特殊情况:worker的state为-1时,调用unlock表示初始化state,设置state = 0
        //启动worker之前,先去调用unlock方法。会强制刷新exclusiveOwnerThread = null,state = 0
        public void unlock()      { release(1); }

        //就是返回worker的lock是否被占用
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            //线程是启动状态,并且没有设置过中断,那么就设置中断
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
lock方法
//加锁,加锁失败时会阻塞线程,知道获取到锁为止
public void lock()        { acquire(1); }

//一般情况下,调用unlock要保证当前线程是持有锁的
//特殊情况:worker的state为-1时,调用unlock表示初始化state,设置state = 0
//启动worker之前,先去调用unlock方法。会强制刷新exclusiveOwnerThread = null,state = 0
public void unlock()      { release(1); }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

lock调用了AQS中的模板方法,我们来看看AQS中的这个模板方法。

//独占模式 获取资源模板方法:调用tryAcquire独占模式获取资源钩子方法
    public final void acquire(int arg) {
        //条件1:!tryAcquire 尝试获取锁,没有获取成功返回true

        //acquireQueued需要做什么?
        //1、当前节点有没有被park?挂起?没有 =》所以需要挂起的操作
        //2、唤醒之后的逻辑在哪呢? =》 唤醒之后的逻辑。

        //条件2.1:首先 addWaiter 将当前线程封装成node入队(两种情况,快速入队,完整入队)
        //条件2.2:acquireQueued挂起当前线程,唤醒后相关的逻辑。
        //acquireQueued返回true,表示挂起过程中线程被中断唤醒过...false表示未被中断过


        //为了不浪费资源,acquireQueue过程中,tryAcquire成功,返回,没有tryAcquire成功,会阻塞该线程,等待前驱节点唤醒后才启动循环
        //如果头结点如果退出了acquireQueue自旋,获取了锁,就会转去执行临界区代码,其他线程继续自旋
        if (!tryAcquire(arg) //子类钩子方法的实现
            &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //再次设置中断标记为true
            selfInterrupt();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
        //重写钩子函数
        //尝试去占用worker的独占锁
        protected boolean tryAcquire(int unused) {
            //使用CAS修改AQS中的state,期望值为0,0表示未被占用。
            //修改成功表示当前线程抢占成功,那么则设置setExclusiveOwnerThread为当前线程
            //返回值表示是否抢占成功
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

如果tryAcquire失败,那么久将当前抢锁线程封装为Node节点,放到AQS同步队列末尾。

    //节点指定模式入队
    //最终返回值是当前线程包装的节点
    private Node addWaiter(Node mode) {
        //Node.EXCLUSIVE
        //构建mode,把当前线程封装到对象node中
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //快速入队
        Node pred = tail;
        //pred != null:条件成立,当前节点不是第一个节点
        if (pred != null) {
            node.prev = pred;//这里需要注意,可能这步执行完后,可能unparkSuccessor执行了,此时通过next拿不到节点,只能从后往前用prev
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //完整入队
        //前提:1、当前节点是第一个节点 2、cas操作修改尾节点失败,需要进入自旋设置
        enq(node);
        return node;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
//节点入队
    private Node enq(final Node node) {
        //只有入队成功才会跳出循环
        for (;;) {
            Node t = tail;
            //条件成立:当前节点是第一个节点
            //说明当前 锁被占用,且目前线程 可能是第一个获取锁失败的线程(当时可能存在一批获取锁失败的线程)
            if (t == null) { // Must initialize
                //作为当前持锁线程的第一个后继线程,需要做什么?
                //1、因为当前持锁的线程,它获取锁时,直接tryAcquire成功了,没有向 阻塞队列 中添加任何node,所以作为后继需要为它擦屁股
                //2、为自己追加自己的node

                //设置一个无参的new Node()作为头结点
                if (compareAndSetHead(new Node()))
                    //头尾节点设置成相同的
                    tail = head;
                //特别注意:这里没有reture,继续循环设置自己的node
            } else {
                //普通入队方式
                //当前节点不是第一个,防止cas设置尾节点失败,自旋
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    //返回当前节点的前置节点
                    return t; 
                }
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
unlock方法
//一般情况下,调用unlock要保证当前线程是持有锁的
//特殊情况:worker的state为-1时,调用unlock表示初始化state,设置state = 0
//启动worker之前,先去调用unlock方法。会强制刷新exclusiveOwnerThread = null,state = 0
public void unlock()      { release(1); }
  • 1
  • 2
  • 3
  • 4

unlock调用了AQS中的模板方法release,release调用了Worker的子类实现的tryRelease钩子方法

//独占模式 释放资源模板方法:调用tryRelease独占模式释放资源钩子方法
    public final boolean release(int arg) {
        //尝试释放锁,tryRelease(会设置state[和ownerThread],不会设置head)
        //返回 true 表示当前线程已经完全释放锁
        //返回false,说明当前线程尚未完全释放锁

        //独占模式 钩子函数 释放资源,成功返回true,失败返回false
        if (tryRelease(arg)) {
            //不存在重入,完全释放锁了,可以唤醒下一个线程了

            //head什么情况会被创建出来?
            //当持锁线程未释放线程时,且持锁期间 有其他线程想要获取锁时,
            //其他线程发现获取不了锁,而且队列是空队列,此时后续线程会为当前持锁线程构建一个head节点,
            //然后后续线程节点会追加到head节点后面
            Node h = head;



            //node状态,可选值(0,SIGNAL[-1],CANCELLED[1],CONDITION,PROPAGATE)
            //waitStatus == 0 默认状态
            //waitStatus > 0 取消状态
            //waitStatus == -1 表示当前node如果是head节点时,释放锁之后,需要唤醒它的后继节点

            //条件1成立,说明队列中的head节点以及初始化过了,ReentrantLock在使用期间发生过多线程竞争了
            //条件2成立,说明head后面一定插入过node节点【队列中不只有一个节点】
            if (h != null && h.waitStatus != 0)
                //唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

        //重写钩子函数
        //尝试释放锁(外部不会直接调用这个方法),这个方法是AQS内调用的,外部调用unlock时,unlock -> AQS.release() -> tryRelease()
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */

    //waitStatus == 0 默认状态
    //waitStatus > 0 取消状态
    //waitStatus == -1 表示当前node如果是head节点时,释放锁之后,需要唤醒它的后继节点

    //作用:唤醒当前节点的下一个节点
    //release()进来,则参数Node为head节点,且node.waitStatus != 0
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        //获取当前节点的状态
        int ws = node.waitStatus;
        //若头结点状态小于0,设置为0,表示初始状态,移除head节点交给唤醒的节点在acquireQueue中tryAcquire成功移除掉
        if (ws < 0)//-1 SIGNAL状态 改成零的原因:因为当前节点以及完成后继节点的任务
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //node节点的第一个后继节点
        Node s = node.next;

        //条件1:
        //s什么时候等于null?
        //1、当前节点就是tail节点时,s == null
        //2、addWriter操作中,还没有设置好pred.next的指向的时候,就会产生s == null的情况
        //需要找到可以被唤醒的节点

        //条件2:
        //成立:说明 当前node节点的后继节点是取消状态

        //s为null 或者 s是CANCELLED状态(s 为 null 可能是在shouldParkFaildAcquire中是cancelled状态被移除了)
        if (s == null || s.waitStatus > 0) {
            //查找可以被唤醒的节点
            s = null;
            //从尾向头遍历,找到一个离当前node最近的可以被唤醒的node。node可能找不到
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //如果找到合适的可以被唤醒的node,则唤醒,找不到啥也不做
        if (s != null)
            //唤醒这个节点的线程,唤醒的节点会重新设置head
            LockSupport.unpark(s.thread);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

为什么要从后往前找呢?
因为在lock过程中,addWaiter和enq,会先设置新节点的prev,但是next是后面才设置的,可能会导致设置完prev后,调用了unlock -> release -> tryRelease,unparkSuccessor。
此时unparkSuccessor拿不到释放资源的节点的后继节点,所以需要通过从后往前去找到该节点,并且unpark其线程

    //节点指定模式入队
    //最终返回值是当前线程包装的节点
    private Node addWaiter(Node mode) {
        //Node.EXCLUSIVE
        //构建mode,把当前线程封装到对象node中
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //快速入队
        Node pred = tail;
        //pred != null:条件成立,当前节点不是第一个节点
        if (pred != null) {
            node.prev = pred;//这里需要注意,可能这步执行完后,可能unparkSuccessor执行了,此时通过next拿不到节点,只能从后往前用prev
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //完整入队
        //前提:1、当前节点是第一个节点 2、cas操作修改尾节点失败,需要进入自旋设置
        enq(node);
        return node;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

unpark成功后,那么该线程将从acquireQueue中重新开始自旋

   //独占模式 获取资源模板方法:调用tryAcquire独占模式获取资源钩子方法
    public final void acquire(int arg) {
        //条件1:!tryAcquire 尝试获取锁,没有获取成功返回true

        //acquireQueued需要做什么?
        //1、当前节点有没有被park?挂起?没有 =》所以需要挂起的操作
        //2、唤醒之后的逻辑在哪呢? =》 唤醒之后的逻辑。

        //条件2.1:首先 addWaiter 将当前线程封装成node入队(两种情况,快速入队,完整入队)
        //条件2.2:acquireQueued挂起当前线程,唤醒后相关的逻辑。
        //acquireQueued返回true,表示挂起过程中线程被中断唤醒过...false表示未被中断过


        //为了不浪费资源,acquireQueue过程中,tryAcquire成功,返回,没有tryAcquire成功,会阻塞该线程,等待前驱节点唤醒后才启动循环
        //如果头结点如果退出了acquireQueue自旋,获取了锁,就会转去执行临界区代码,其他线程继续自旋
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //再次设置中断标记为true
            selfInterrupt();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
//自旋抢占,节点入队后,启动自旋抢占锁的流程
    //当前节点会在死循环中不断获取同步状态,并且不断在前驱节点上自旋,前驱节点是头结点的时候才可以获取锁。
    //原因:
    //1、头结点是成功获取同步状态(锁)的节点,头结点在释放同步状态后,会唤醒后继节点,后继节点唤醒后检查自己的前驱节点是否是头结点
    //2、维护同步队列的FIFO原则,节点进入同步队列后,就进行了自旋的过程,每个节点都在不断的for死循环
    final boolean acquireQueued(final Node node, int arg) {
        //true:表示当前线程抢占锁成功。普通情况下【lock】当前线程早晚会拿到锁
        //false:表示失败,需要执行出队逻辑..(响应中断的lock的方法时再讲)
        boolean failed = true;
        try {
            //当前线程是否被中断
            boolean interrupted = false;
            //自旋:当前节点会在前驱节点上不断自旋,直到tryAcquire成功,头结点出队,自己成为头结点
            for (;;) {
                //什么时候会执行这里?
                //1、进入for循环,在线程尚未park前会执行
                //2、线程park之后,被唤醒后,也会执行这里

                //获取当前节点的前置节点
                final Node p = node.predecessor();

                //条件1:前置节点是否是head节点,head.next节点在任何时候都有权利去争夺锁
                //条件2:tryAcquire(arg)
                //条件2成立:说明head对应的线程已经释放锁了,head.next节点对应的线程,正好获取到锁了
                //条件2不成立:说明head对应的线程,还没有释放锁,head.next任然需要被park

                //前驱节点是head,继续调用子类实现的独占模式获取资源钩子方法tryAcquire
                if (p == head && tryAcquire(arg)) {
                    //拿到锁之后需要做什么
                    //1、设置自己为head节点
                    setHead(node);
                    //将上个线程对应的node的引用置为null,协助老的head出队..
                    p.next = null; // help GC
                    //当前线程获取锁过程没有异常
                    failed = false;
                    //返回当前线程的中断标记
                    return interrupted;
                }


                //条件1:shouldParkAfterFailedAcquire:当前线程获取锁资源失败后,是否需要挂起。
                //true:当前线程需要挂起     false:当前线程不需要挂起
                //条件2:parkAndCheckInterrupt:挂起线程,返回中断标记(唤醒:1.正常唤醒 其他线程unpark 2.其他线程给当前挂起的线程一个中断信号)

                //判断当前线程是否需要挂起,如果需要挂起的调用parkAndCheckInterrupt,直到被唤醒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //interrupted = true,表示当前node对应的线程是被中断信号唤醒的
                    interrupted = true;
            }
        } finally {
            //如果没有成功获取资源(timeout 或者 可中断情况下被中断了)
            if (failed)
                //取消请求,将当前节点从队列移除
                cancelAcquire(node);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
设置state的方法
/*
     * Methods for setting control state
     */

    /**
     * Transitions runState to given target, or leaves it alone if
     * already at least the given target.
     *
     * @param targetState the desired state, either SHUTDOWN or STOP
     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
     */
    //小于targetState状态,则变为该状态,大于等于该状态,则不变
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            //条件成立,假设targetState == SHUTDOWN,说明当前线程状态 >= SHUTDOWN
            //条件不成立,假设targetState == SHUTDOWN,说明当前线程状态 == RUNNING
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty).  If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
提交任务

在这里插入图片描述

线程池任务提交脑子里需要有这张图片
1、新任务提交的时候,如果核心线程数还未达到,直接开线程处理
2、如果核心线程数已经达到了,如果阻塞队列未满,放入阻塞队列,等待处理
3、如果阻塞队列也满了,最大线程数量未满,开一个线程来处理
4、如果最大线程数也满了,执行拒绝策略

execute方法

    //任务队列。
    //作用:存放任务的地方。当线程池中的线程达到核心线程数量时,再提交任务就会直接提交到workQueue,就是阻塞队列中。
    private final BlockingQueue<Runnable> workQueue;

	//Runnable可以是普通你的Runnable实现类,也可以是FutureTask
    public void execute(Runnable command) {
        //非空判断
        if (command == null)
            throw new NullPointerException();


        //获取ctl最新值。ctl分为两部分,高3位(线程池状态)和其他位(当前线程池线程数量)
        int c = ctl.get();

        //workerCountOf:取出当前线程数量
        //条件成立:当前线程数量小于核心线程数,此次提交任务直接创建一个新的worker,对应线程池中多了一个新的线程
        if (workerCountOf(c) < corePoolSize) {
            //addWorker即为创建线程的过程,会创建worker对象,并将command作为firstTask
            //core == true表示采用核心线程数数量限制
            //core == false表示采用maxmumPoolSize线程数限制
            if (addWorker(command, true))
                //创建成功后直接返回,addWorker中会启动新创建的worker,将firstTask执行
                return;

            //执行到这里,说明addWorker失败
            //失败原因:
            // ①、存在并发现象:execute方法可能是有多个线程调用的,addWorker方法成立后,其他线程可能也成立了,并且向线程池中创建了worker,
            // 这个时候线程池中的核心线程数已经达到,所以失败了
            //②、当前线程池状态发生改变了。RUNNING SHUTDOWN STOP TIDYING TERMINATION
            //当线程池状态非RUNNING状态时,addWorker(firstTask != null, true | false)一定会失败
            //当线程池处于SHUTDOWN状态下也有可能提交成功,但是前提firstTask == null 而且当前 queue 不为空(特殊情况)
            //线程池状态只有在RUNNING的时候,addWorker才有可能成功
            c = ctl.get();
        }

        //执行到这里有几种情况
        //1、当前线程数已经达到了corePoolSize
        //2、addWorker失败
        if (isRunning(c) //条件成立:说明当前线程池处于RUNNING状态,则
                &&
                workQueue.offer(command)//尝试将task放到workQueue中
        ) {
            //执行到这里,说明offer提交任务成功

            //再次获取ctl保存到recheck中
            int recheck = ctl.get();


            if (! isRunning(recheck)//成立说明提交到队列之后线程池状态被外部线程修改了,比如shutdown()或者shutdownNow()。这种情况需要把刚刚提交的任务删除掉
                    &&
                    remove(command)//有可能成功,有可能失败。成功说明提交后线程池中的线程还未处理,失败,在shutdown()或者shutdownNow()之前就被线程池中的线程给处理了。
            )
                //提交之后,线程池状态为非RUNNING,且任务出队成功,走拒绝策略
                reject(command);

            //有几种情况来这?
            //1、当前线程池是RUNNING(这个大概率)
            //2、线程池状态时非RUNNING,但是remove提交的任务失败(这个小概率)

            //担心 当前线程池是RUNNING状态,但是线程池中的存活线程数量是0
            //这是一个担保机制,保证线程池在RUNNING下有一个线程在工作
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //执行到这里,有几种情况
        //1、线程不是处于RUNNING
        //2、offer失败
        //①、offer失败,需要做什么?说明当前queue满了!这个时候如果当前线程数量尚未达到maxmumPoolSize的话,尝试创建新线程,直接执行task的command
        //假设当前线程池的线程数达到maxmumPoolSize,这里也会失败,走拒绝策略
        //
        //②、线程池状态为非RUNNING状态,这个时候因为command != null addWorker 一定是返回false
        else if (!addWorker(command, false)/**/)
            reject(command);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
addWorker方法

在这里插入图片描述

	//线程池中存放work(thread)线程的地方。
    private final HashSet<Worker> workers = new HashSet<Worker>();


	//返回值总结:
    //true:表示创建worker成功,且启动成功
    //false:表示创建失败
    //1、线程池状态rs > SHUTDOWN(STOP/TIDYING/TERMINATION)
    //2、线程池状态rs = SHUTDOWN,但是队列中已经没有任务了,或者当前状态是SHUTDOWN且队列未空,但是firstTask不为null
    //3、当前线程池已经达到指定指标(corePoolSize或者maxmumPoolSize),
    //4、threadFactory创建的线程是null

    //firstTask:可以为null,表示启动worker后,worker自动到queue中获取任务,如果不是null,则worker优先执行firstTask
    //core:表示采用的线程数线程限制,如果为true 采用 核心线程数限制,false采用maxmumPoolSize线程数限制
    private boolean addWorker(Runnable firstTask, boolean core) {
        //自旋
        retry:
        //外层自旋作用:负责判断当前线程池状态是否允许创建线程,不允许则直接退出addWorker方法,可以则进入内部自旋(内部自旋的作用为了让ctl加1)。
        for (;;) {
            //获取当前ctl
            int c = ctl.get();
            //获取线程池的状态
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.

            //条件1:rs >= SHUTDOWN,条件成立,说明当前线程池状态不是RUNNING

            //条件2:! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
            //条件2前置条件:当前的线程池状态不是RUNNING
            //rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
            //表示:当前线程池状态是SHUTDOWN状态 & 提交的任务是空,addWorker这个方法可能不是execute调用的 & 当前任务队列不是空
            //排除掉上面的情况,当前线程池是SHUTDOWN状态但是队列里面还有任务尚未处理完,这个时候允许添加worker,但是不允许再次提交task
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                //当线程池状态rs > SHUTDOWN
                //rs == SHUTDOWN 但是队列中已经没有任务了 或者 rs == SHUTDOWN 且 firstTask == null
                return false;


            //内部自旋作用:获取创建线程池的令牌,创建成功则ctl+1
            for (;;) {
                //获取当前线程池中,线程数量
                int wc = workerCountOf(c);

                //条件1:wc >= CAPACITY ,永远不可能处理,CAPACITY5亿多的
                //条件2:wc >= (core ? corePoolSize : maximumPoolSize)
                //core == true,判断当前线程数是否 大于 核心线程数
                //core == false,判断当前线程数是否 大于 maxmumPoolSize线程数限制
                if (wc >= CAPACITY ||
                    //core:表示采用的线程数线程限制,如果为true 采用 核心线程数限制,false采用maxmumPoolSize线程数限制
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    //别放了,线程池放不下你了
                    return false;
                //前提条件:当前线程数量没有超过线程池线程数限制
                //成功:ctl加1(线程数+1)
                //失败;可能存在其他线程修改过了ctl
                //可能发生什么事
                //1.其他线程execute()申请过令牌了,在这之前,导致cas失败
                //2.其他外部线程可能调用过shutdown()或者shutdownNow()导致线程池发生变化了,状态发送变化则ctl发生变化,期望值不正确,cas必然失败
                if (compareAndIncrementWorkerCount(c))
                    //线程数加1,cas成功,退出外层自旋(退出自旋的唯一方式)
                    break retry;

                //前提条件:线程数加1失败
                //原因:可能存在其他线程修改了ctl
                //重新获取ctl
                c = ctl.get();  // Re-read ctl

                //判断线程池ctl是否被其他线程修改
                //成立:被修改了状态,进入外层循环判断状态
                //不成立:cas修改ctl发生看竞争,再次自旋内层循环,重新尝试cas
                if (runStateOf(c) != rs)
                    //被其他线程修改,退出本次外层自旋,重新进入外层循环,获取ctl
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }



        //前提条件:break retry,线程池线程数ctl加1成功
        //workerStarted表示创建的worker是否已经启动,false未启动,true启动
        boolean workerStarted = false;
        //workerAdded表示创建的worker是否已经添加到线程池(HashSet)
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建Worker,执行完后,线程应该是已经创建完成的
            w = new Worker(firstTask);
            //将创建的Worker节点的线程赋值给t
            final Thread t = w.thread;

            //为什么要做t != null 的判断?
            //防止ThreadFactory实现类有bug,因为ThreadFactory是一个接口,谁都可以实现
            //万一直接给个thread是null,这里就有问题,所以要判断是否为null
            if (t != null) {
                //拿到全局锁保存引用到mainLock中
                final ReentrantLock mainLock = this.mainLock;
                //持有全局锁,可能会阻塞,知道获取成功为止,同一时刻 操纵 线程池内部相关的操作,都必须持有锁
                mainLock.lock();
                //从这里加锁之后,其他线程是无法修改线程池状态的
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //获取当前的线程池状态
                    int rs = runStateOf(ctl.get());


                    if (rs < SHUTDOWN ||//线程池处于RUNNING状态
                        (rs == SHUTDOWN && firstTask == null)//线程池处于SHUTDOWN状态,但是firstTask为null
                    ) {
                        //判断线程是否已经start
                        //防止程序员在ThreadFactory创建线程返回给外部之前将线程start了
                        if (t.isAlive()) // precheck that t is startable
                            //失败的来这里了!
                            throw new IllegalThreadStateException();

                        //将创建的worker添加到线程池中
                        //线程还没start,将worker线程放到HashSet中
                        workers.add(w);
                        //获取线程池的线程数量
                        int s = workers.size();
                        //largestPoolSize:记录线程池生命周期内线程数最大值
                        if (s > largestPoolSize)
                            //修改线程池生命周期内的最大线程数量
                            largestPoolSize = s;
                        //线程池添加线程成功
                        workerAdded = true;
                    }
                } finally {
                    //释放锁
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //线程池添加线程成功,启动线程
                    //执行的runnable实际上就是worker重写的run方法,会不断执行阻塞队列中的任务
                    t.start();
                    //修改线程启动状态
                    workerStarted = true;
                }
            }
        } finally {
            //判断线程是否启动成功
            if (! workerStarted)
                //1、添加线程失败,释放令牌
                //2、将当前worker清理出workers集合
                addWorkerFailed(w);
        }
        //返回新创建线程启动状态
        return workerStarted;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
关闭线程池

shutdown,shutdownNow:这两个方法都不会等待线程池完全关闭
awaitTermination:等待线程池完全关闭

因为shutdown和shutdownNow使用interrupt只是设置了中断标记位为true,线程关闭的处理需要交给用户来处理,但是线程是在sleep,wait中,则会抛出 InterruptedException ,这个时候会设置中断标记位为false,用户没办法捕获状态关闭线程,因此需要重新设置状态标记位,所以我们就可以借鉴dubbo关闭线程池的源码。
dubbo关闭线程池首先会先shutdown,再通过awaitTermination来检查是否线程池已经完全关闭,如果是true则已经关闭,如果是false则等待超时,没有完全关闭,可能是因为中断标记位被sleep或者wait抛出的异常重新设置了false,因此我们可以重新shutdown来给他设置上中断标记位。

public static void gracefulShutdown(Executor executor, int timeout) {
        if (!(executor instanceof ExecutorService) || isShutdown(executor)) {
            return;
        }
        final ExecutorService es = (ExecutorService) executor;
        try {
            es.shutdown(); // Disable new tasks from being submitted
        } catch (SecurityException ex2) {
            return;
        } catch (NullPointerException ex2) {
            return;
        }
        try {
            if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
                es.shutdownNow();
            }
        } catch (InterruptedException ex) {
            es.shutdownNow();
            Thread.currentThread().interrupt();
        }
        if (!isShutdown(es)) {
            newThreadToCloseExecutor(es);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

dubbo如何关闭线程池:https://www.cnblogs.com/notlate/p/10204834.html

关闭线程池源码解析
runWorker方法
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        //作用:Worker内部封装的工作线程
        final Thread thread;

        //作用:假设firstTask不为空,那么当worker启动后(worker内的Thread启动后)会优先执行firstTask,当执行完firstTask后,会到queue中获取下一个任务
        Runnable firstTask;

        //记录当前worker完成的任务数量
        volatile long completedTasks;

        //first可以为空,为空启动后会到queue中获取。
        Worker(Runnable firstTask) {
            //设置AQS独占模式为初始化状态,这个时候不能被抢占锁
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //使用线程工厂创建了一个线程,并且将worker指定为runnable,也就是当thread启动的时候,会以worker.run()为入口
            this.thread = getThreadFactory().newThread(this);
        }

        //当worker启动时会执行run方法
        public void run()
        {
            //runWorker:ThreadPoolExecutor的核心方法
            runWorker(this);
        }

        //重写钩子函数
        //尝试释放锁(外部不会直接调用这个方法),这个方法是AQS内调用的,外部调用unlock时,unlock -> AQS.release() -> tryRelease()
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
    //w就是启动worker
    final void runWorker(Worker w) {
        //wt == w.thread
        Thread wt = Thread.currentThread();
        //将初始执行task赋值给task
        Runnable task = w.firstTask;
        //清空w.firstTask的引用
        w.firstTask = null;
        //释放锁,因为创建的时候就加锁了(state在构造函数中设置为了-1)
        //为了初始化 worker state == 0 和 exclusiveOwnerThread == null
        w.unlock(); // allow interrupts

        //是否是突然退出
        //true -> 发生异常了,表示线程是突然退出的,需要回头做一些处理
        //false -> 正常退出
        boolean completedAbruptly = true;


        try {

            //有任务就做,这就是线程池提交任务执行的核心了

            //条件1:task != null,firstTask是不是 null
            //条件2:(task = getTask()) != null 条件成立:说明当前线程在queue中获取任务成功,该方法会阻塞
            //getTask如果返回null,当前线程需要执行结束逻辑。
            while (task != null || (task = getTask()) != null) {
                //worker设置独占锁为当前线程
                //为什么要设置独占锁?shutdown时会判断当前worker状态,根据独占锁是否空闲来判断worker是否正在工作
                w.lock();

                //条件1:runStateAtLeast(ctl.get(), STOP),说明线程池目前处于STOP或TYDING或TERMINATION,此时线程一定要给他一个中断信号
                //条件1成立:runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()
                //上面条件成立,说明当前线程池状态时 >= STOP 且当前线程是未设置中断状态的,此时需要接入到if里面,给当前线程一个中断。

                //假设:runStateAtLeast(ctl.get(), STOP) == false
                //Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) 在干吗
                //Thread.interrupted()获取当前中断状态,且设置中断为为false。连续调用两次,第二次一定是返回false
                //runStateAtLeast(ctl.get(), STOP)大概率这里还是false
                //其实它在强制刷新线程的中断标记为false,因为有可能上一次执行task
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                        && !wt.isInterrupted())
                    //这个if的作用,只要是线程池状态为STOP TYDING TERMINATION 无论当前线程是否是中断,都设置为中断,接触其他地方的阻塞状态
                    wt.interrupt();
                try {
                    //钩子方法,留给子类实现
                    beforeExecute(wt, task);
                    //表示异常情况,如果thrown不为空,表示task运行过程中,向上抛出异常了。
                    Throwable thrown = null;
                    try {
                        //task可能是FutureTask也可能是普通的Runnable接口实现类
                        //如果前面是通过submit提交的runnable/callable会被封装成FutureTask
                        task.run();
                    //封装futureTask抛出的异常
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //钩子方法,留给子类实现
                        afterExecute(task, thrown);
                    }
                } finally {
                    //将局部变量task置为空
                    task = null;
                    //更新worker完成任务数量
                    w.completedTasks++;
                    //worker处理完一个任务后,会释放掉独占锁,然后再次到queue中获取任务
                    //1、正常情况下,会再次回到getTask()那里获取任务。while(getTask)
                    //2、task.run()时内部抛出异常了
                    w.unlock();
                }
            }

            //什么情况下,会来到这里?
            //1、getTask()方法返回null,说明当前线程应该执行退出逻辑了
            completedAbruptly = false;
        } finally {
            //task.run()内部抛出异常时,直接从w.unlock();调到这里
            //正常退出completedAbruptly == false。异常退出completedAbruptly == true
            processWorkerExit(w, completedAbruptly);
        }
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85

worker处理结束执行processWorkerExit逻辑

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //记录线程池完成的任务数量
            completedTaskCount += w.completedTasks;
            //当前线程和队伍任务完成,移除
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        //考虑移除之后,线程池线程线程够不够用?

        int c = ctl.get();
        //条件成立,说明当前线程池的状态时RUNNING
        if (runStateLessThan(c, STOP)) {
            //条件成立,说明当前线程是正常退出
            if (!completedAbruptly) {
                //allowCoreThreadTimeOut:控制核心线程数量内的线程是否可用被回收,true可以,false不可以
                //false:核心线程即使是空闲,也能够存活(默认)
                //true:超时回收

                //min 表示线程池最低持有的线程数量
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;


                //前提条件:线程池状态为RUNNING
                //条件成立:当前线程池最低线程数为0,并且阻塞队列还有任务,此时线程池必须要有线程来处理这些任务
                //如果min设置为1
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //条件成立:当前线程池线程数量已经超过了保证线程池运行的最低线程数量,直接退出
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //没达到最低线程,添加一个线程来处理
            addWorker(null, false);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

通过submit提交的runnable或者callable方法,会包装成futureTask,再传入execute调用执行任务方法

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

看到这里,是不是觉得很奇怪,Worker#run()方法究竟是在什么时候调用的?
这个我们需要看看Worker的构造方法

 Worker(Runnable firstTask) {
            //设置AQS独占模式为初始化状态,这个时候不能被抢占锁
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //使用线程工厂创建了一个线程,并且将worker指定为runnable,也就是当thread启动的时候,会以worker.run()为入口
            this.thread = getThreadFactory().newThread(this);
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

这个构造方法实际上传入了该worker,并且该worker实现了Runnable,重写了run方法

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        //r:Worker实现了Runnable,因此可以直接传入Worker,以后调用worker.start
        //或者是worker.thread.start()实际上都是调用了worker里面的run方法
        //这个start的调用,实际上是在addWorker添加worker成功后调用的,run会调用其runWorker方法
        //不断执行getTask去获取阻塞队列中的任务去执行
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
getTask方法

在这里插入图片描述

private Runnable getTask() {
        //表示当前线程获取任务是否超时,默认是false,true表示已超时
        boolean timedOut = false; // Did the last poll() time out?

        //自旋
        for (;;) {
            //获取最新ctl值保存到c
            int c = ctl.get();
            //获取线程池当前运行状态
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //条件1:rs >= SHUTDOWN
            //条件成立:当前线程池是非RUNNING状态,可能是SHUTDOWN,STOP,...

            //条件2:rs >= STOP || workQueue.isEmpty()
            //条件2.1:rs >= STOP,成立说明当前的状态最低也是STOP状态,一定要返回null(STOP不接受新任务,不执行任务队列任务,直接中断执行中的任务)
            //条件2.2:前置条件 状态时SHUTDOWN,workQueue.isEmpty()条件成立说明当前线程池状态为SHUTDOWN状态,且任务队列为空,此时一定返回null
            //返回null,runWorker方法将会返回null的线程执行退出线程池的逻辑
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                //使用cas + 死循环的方式让ctl减1
                decrementWorkerCount();
                return null;
            }

            //执行到这里有几种情况:
            //1、线程池是RUNNING状态
            //2、线程池是SHUTDOWN状态,但是队列未空,此时可以创建线程(线程池状态为SHUTDOWN,不接受新任务但是药执行完任务队列的任务)

            //获取线程池中的线程数量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //timed == true 表示 当前线程获取task时是支持超时机制的,使用queue.poll(xxx,xxx);
            //当获取task超时的情况下,下一次自旋就可能返回null了

            //timed == false 表示当前的线程是不支持超时机制的,当前线程会使用 queue.take()

            //情况1:allowCoreThreadTimeOut == true,表示核心线程数内的线程也可以被回收。所有线程都是支持超时机制的。都是使用queue.poll获取task
            //情况2:allowCoreThreadTimeOut == false,表示当前线程池会维护核心数量内的线程
            //wc > corePoolSize
            //条件成立:当前线程池中的线程数量都是大于核心线程数的,此时让所有路过的这里的线程,都是用poll支持超时的方式去获取任务
            //这样,就会可能有一部分线程获取不到任务,获取不到任务,返回null,然后runWorker会执行线程退出逻辑
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;


            //条件1:(wc > maximumPoolSize || (timed && timedOut))
            //条件1.1:wc > maximumPoolSize为什么会成立?setMaxmumPoolSize()方法,可能存在外部线程将线程池最大线程数量设置比初始时小
            //条件1.2:timed && timedOut条件成立:前提条件:当前线程使用poll方式获取task。上一次循环时,使用poll方式获取任务时超时了
            //条件1为true,表示线程可以被回收,达到回收标准了,当确实需要回收时回收

            //条件2:wc > 1 || workQueue.isEmpty()
            //条件2.1:wc > 1 条件成立:说明当前线程池中还有其他线程,当前线程可以直接回收,返回null
            //条件2.2:workQueue.isEmpty(),前置条件wc==1,条件成立:说明当前任务队列已经空了,最后一个线程也可以放心退出
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //使用CAS机制,将ctl -1,减1成功的线程返回null
                //cas成功的返回null
                //cas失败?为什么会cas失败
                //1、其他线程先你一步退出了
                //2、线程池状态发生了变化。
                if (compareAndDecrementWorkerCount(c))
                    return null;
                //再次自旋时,timed有可能时false了,因为当前线程cas失败,很有可能是因为其他线程退出导致的,再
                continue;
            }

            //这里才是正式开始获取任务
            try {
                //take:会响应中断,会一直阻塞到得到任务或者当前线程中断
                //poll:会响应中断,会一直阻塞到超时返回null或者拿到任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //前提条件:获取不到元素
                //获取任务超时,可以尝试减少worker了
                timedOut = true;
            } catch (InterruptedException retry) {
                //中断了,重试
                timedOut = false;
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85

getTask方法需要结合调用他的方法runWorker来看,如果getTask方法返回为null,那么runWorker将会直接调用processWorkerExit(w, completedAbruptly)方法执行减少worker的退出操作,该操作首先是累加完成的任务数量,减少线程池线程数量(worker),最后如果当前线程池还是RUNNING状态,那么要保持其正常运转需要的最低线程数,没有达到就给他新增线程。

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //记录线程池完成的任务数量
            completedTaskCount += w.completedTasks;
            //当前线程和队伍任务完成,移除
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        //考虑移除之后,线程池线程线程够不够用?

        int c = ctl.get();
        //条件成立,说明当前线程池的状态时RUNNING
        if (runStateLessThan(c, STOP)) {
            //条件成立,说明当前线程是正常退出
            if (!completedAbruptly) {
                //allowCoreThreadTimeOut:控制核心线程数量内的线程是否可用被回收,true可以,false不可以
                //false:核心线程即使是空闲,也能够存活(默认)
                //true:超时回收

                //min 表示线程池最低持有的线程数量
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;


                //前提条件:线程池状态为RUNNING
                //条件成立:当前线程池最低线程数为0,并且阻塞队列还有任务,此时线程池必须要有线程来处理这些任务
                //如果min设置为1
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //条件成立:当前线程池线程数量已经超过了保证线程池运行的最低线程数量,直接退出
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //没达到最低线程,添加一个线程来处理
            addWorker(null, false);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
调度器钩子方法

钩子方法beforeExecute:重新初始化ThreadLocal线程本地变量,更新日志,计时统计,更新上下文变量
钩子方法afterExecute:清除ThreadLocal线程本地变量,更新日志记录,收集统计信息,更新上下文变量

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
  • 1
  • 2
public class ThreadPoolExecutorGouzi {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,4,100, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(6)){
            private final ThreadLocal<Long> threadLocal = new ThreadLocal<>();

            public ThreadLocal<Long> getThreadLocal() {
                return threadLocal;
            }

            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                //开始计时
                this.threadLocal.set(System.currentTimeMillis());
                super.beforeExecute(t, r);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                //计时结束
                Long used = System.currentTimeMillis() - this.threadLocal.get();
                this.threadLocal.set(used);
                System.out.println(Thread.currentThread().getName() + "--用时:" + used);
                this.threadLocal.remove();
            }
        };
        for (int i = 0; i < 10; i++){
            poolExecutor.submit(() -> {
                int sum = 0;
                for(int j = 0; j < 1000000000; j++){
                    sum += j;
                }
                System.out.println(Thread.currentThread().getName() + "--sum:" + sum);
            });
        }
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        poolExecutor.shutdown();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
"D:\Program Files\Java\jdk1.8.0_251\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA 2020.1.4\lib\idea_rt.jar=64385:D:\Program Files\JetBrains\IntelliJ IDEA 2020.1.4\bin" -Dfile.encoding=UTF-8 -classpath "D:\Program Files\Java\jdk1.8.0_251\jre\lib\charsets.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\deploy.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\access-bridge-64.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\cldrdata.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\dnsns.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\jaccess.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\jfxrt.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\localedata.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\nashorn.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\sunec.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\sunjce_provider.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\sunmscapi.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\sunpkcs11.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\ext\zipfs.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\javaws.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\jce.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\jfr.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\jfxswt.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\jsse.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\management-agent.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\plugin.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\resources.jar;D:\Program Files\Java\jdk1.8.0_251\jre\lib\rt.jar;D:\project\learning\juc-learning\target\classes" book2.ThreadPoolExecutorGouzi
pool-1-thread-2--sum:-1243309312
pool-1-thread-2--用时:398
pool-1-thread-3--sum:-1243309312
pool-1-thread-3--用时:401
pool-1-thread-4--sum:-1243309312
pool-1-thread-4--用时:407
pool-1-thread-1--sum:-1243309312
pool-1-thread-1--用时:417
pool-1-thread-3--sum:-1243309312
pool-1-thread-3--用时:406
pool-1-thread-1--sum:-1243309312
pool-1-thread-3--sum:-1243309312
pool-1-thread-3--用时:355
pool-1-thread-4--sum:-1243309312
pool-1-thread-4--用时:755
pool-1-thread-2--sum:-1243309312
pool-1-thread-1--用时:745
pool-1-thread-2--用时:764
pool-1-thread-3--sum:-1243309312
pool-1-thread-3--用时:380

Process finished with exit code 0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
拒绝策略

在这里插入图片描述

public interface RejectedExecutionHandler {w4
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
  • 1
  • 2
  • 3
//调用者线程执行策略
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //e.isShutdown():false表示线程池处在RUNNING状态 true表示线程池处在其他状态
            //条件1:成立表示线程池处在RUNNING状态
            //线程池只有在RUNNING状态下才可以继续处理新的任务
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
//抛出异常策略,超过阻塞队列和maximumPoolSize,抛出异常
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        //不做任何操作,直接抛出异常
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
    //直接抛弃策略,不抛出异常
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        //方法内没有代码逻辑,表示不对任务作出处理,也不抛异常,直接抛弃
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
//抛出最旧任务,再次尝试提交任务到队列,
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //e.isShutdown():false表示线程池处在RUNNING状态 true表示线程池处在其他状态
            //条件1:成立表示线程池处在RUNNING状态
            //线程池只有在RUNNING状态下才可以继续处理新的任务
            if (!e.isShutdown()) {
                //抛出最旧的任务
                e.getQueue().poll();
                //再次尝试将任务提交
                e.execute(r);
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
shutdown方法
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //判断权限
            checkShutdownAccess();
            //设置当前线程池状态为SHUTDOWN
            advanceRunState(SHUTDOWN);
            //中断空闲线程
            //中断线程池中所有可以中断的线程(空闲线程,没有设置为中断状态的线程,为什么会会有中断状态,可能是阻塞被外界唤醒的中断)
            interruptIdleWorkers();
            //回调方法
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
  • 1
  • 2
  • 3
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        //修改workers,先上个全局锁
        mainLock.lock();
        try {
            //获取所有的worker
            for (Worker w : workers) {
                Thread t = w.thread;
                //条件成立:获取的worker线程没有被中断,尝试上锁改worker成功
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        //设置worker线程的中断标记位
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        //解锁worker
                        w.unlock();
                    }
                }
                //条件成立;说明只中断一个,可以退出了
                if (onlyOne)
                    break;
            }
        } finally {
            //解开全局锁
            mainLock.unlock();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

onShutdown是一个钩子方法,需要子类实现

    void onShutdown() {
    }
  • 1
  • 2
shutdownNow方法
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //设置线程池状态为STOP
            advanceRunState(STOP);
            //中断所有的线程(已经启动的线程)
            interruptWorkers();
            //删除阻塞队列中的所有没有被处理的任务,并且返回
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
        //中断已经启动的线程
        void interruptIfStarted() {
            Thread t;
            //线程是启动状态,并且没有设置过中断,那么就设置中断
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
    private List<Runnable> drainQueue() {
        //获取阻塞队列引用
        BlockingQueue<Runnable> q = workQueue;
        //存放所有没有被处理的任务
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        //将所有任务转移到taskList
        q.drainTo(taskList);
        //条件成立:阻塞队列还有任务没有转移完
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                //删除阻塞队列中的任务
                if (q.remove(r))
                    //添加任务到队列
                    taskList.add(r);
            }
        }
        return taskList;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
tryTerminate方法
    final void tryTerminate() {
        for (;;) {
            //获取线程池状态
            int c = ctl.get();


            //不允许执行tryTerminate的情况
            //条件1:成立线程池处于RUNNING状态,还在处理任务,退出
            //条件2:成立线程池处于TIDYING,TERMINATE状态,已经走过这些逻辑,直接退出
            //条件3:成立线程池处于SHUTDOWN状态并且阻塞队列还有任务,任务没有处理完,退出
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;

            //前提条件:线程池状态为STOP,SHUTDOWN并且任务队列为空

            //条件成立:线程池线程数量大于0
            if (workerCountOf(c) != 0) { // Eligible to terminate
                //中断一个空闲线程
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            //上全局锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //设置线程池状态为TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        //设置线程池状态为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        //termination条件已经满足了,可以唤醒所有等待termination的线程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                //接触全局锁
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
Executors源码
//一池固定线程
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

//一池一线程
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

//可扩容的线程池
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

//可调度线程池
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue()//任务的排队队列,按到期时间升序排序的阻塞队列
             );
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • FixedThreadPool和SingleThreadExecutor:阻塞队列长度都是Integer.MAX_VALUE,可能会堆积大量的任务,从而导致OOM
  • CachedThreadPool和ScheduledThreadPoolExecutor:允许创建的数量为Integer.MAX_VALUE,可能会导致创建大量的线程,从而导致OOM
确定线程池线程数

IO密集型:线程数为CPU核心数 * 2
CPU密集型:线程数为CPU核心数
混合型:( IO耗时与CPU耗时之比+ 1)* CPU核心数

ThreadLocal

引用类型

学习ThreadLocal之前,需要补充一下Java中的引用类型。

1.强引用

强引用是默认的引用类型,即声明一个对象直接赋值给变量,该变量在不赋值为null的情况下,无论在什么情况,甚至在OOM的情况下依然不回收。只有在引用为null的情况才会被回收。

    public static void strongReferenceTest(){
        //强引用
        Test2 test2 = new Test2();
        System.out.println(test2);
        test2 = null;
        System.gc();
        System.out.println(test2);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
2.软引用

软引用在系统内存空间充足的情况下不会被回收,只有在系统内存空间不足的时候才会被回收。通常使用在内存敏感的地方,如高速缓存。软引用使用java.lang.ref.SoftReference类来实现,可以豁免一些垃圾回收。

    //-Xms9M -Xmx9m
    public static void softReferenceTest(){
        SoftReference<MyObject> softReference = new SoftReference<>(new MyObject());
        System.gc();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("---gc after 内存够用:" + softReference.get());
        byte[] bytes = new byte[1024 * 1024 * 9];
        System.out.println("----- gc after 内存不够:" + softReference.get());
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
3.弱引用

弱引用生命周期更短,对于只有弱引用的对象,只要垃圾回收一启动就会回收该对象占用的内存。通过java.lang.ref.WeakReference类来实现。
弱引用和软引用可以使用在需要加载大量图片的情况下,此时可能会造成OOM,gc的时候就可以释放了,

Map<String,SoftReference<Bitmap>> imageCache = new HashMap<String,SoftReference<Bitmap>>;
  • 1
    public static void weakReferenceTest(){
        WeakReference<MyObject> weakReference = new WeakReference<>(new MyObject());
        System.out.println("---gc before内存够用:" + weakReference.get());
        System.gc();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("----gc after内存够用:" + weakReference.get());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
4.虚引用

虚引用就是形同虚设,如果一个对象仅持有虚引用,那么就和没有任何引用一样,在任何时候都可能会被垃圾回收期回收,它不能订单使用,也不能通过它访问对象,虚引用必须和引用队列(ReferenceQueue)联合使用

public static void phantomReferenceTest(){
        ReferenceQueue<MyObject> myObjectReferenceQueue = new ReferenceQueue<>();
        PhantomReference<MyObject> phantomReference = new PhantomReference<>(new MyObject(), myObjectReferenceQueue);
        List<byte[]> bytes = new ArrayList<>();
        new Thread(() -> {
            while(true){
                list.add(new byte[1 * 1024 * 1024]);
                try {
                    TimeUnit.MILLISECONDS.sleep(600);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(phantomReference.get());
            }
        },"t1").start();

        new Thread(() -> {
            while(true){
                Reference<? extends MyObject> reference = myObjectReferenceQueue.poll();
                if (reference != null){
                    System.out.println("有虚对象加入了队列");
                }
            }
        },"t2").start();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
小总结

在这里插入图片描述

ThreadLocal的简介
ThreadLocal的使用
手撕ThreadLocal源码

第四章 显式锁

第1节 什么是显式锁

第2节 悲观锁与乐观锁

CLH自旋锁

CLH自旋锁是后面AQS抽象队列同步器的基础,需要掌握。

第3节 公平锁与非公平锁

第4节 可中断锁与不可中断锁

第5节 可中断锁与不可中断锁

第6节 共享锁与独占锁

第7节 读写锁

第五章 JUC容器(下)

第六章 高并发设计模式

第七章 高并发异步回调模式

第八章 CompletableFuture 异步回调

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/363180?site
推荐阅读
相关标签
  

闽ICP备14008679号