赞
踩
AQS( AbstractQueuedSynchronizer ),抽象队列同步器,是 java.util.concurrent.locks 包下的一个抽象类。它提供了一个 int 型的同步状态 state 和一个 FIFO 的双向队列来实现线程间的同步。称之为抽象队列是因为这个队列并不是实际存在的对象,而仅仅是相邻结点间维护的一种关系,就像队列一样(其实是一个双向链表)。
AQS 本身是没有实现任何同步接口的,它仅仅是提供了同步状态的获取和释放的方法让自定义的同步组件使用。一般是在自定义的同步组件中定义一个静态的内部类来继承 AQS,同步组件配合静态内部类来实现锁的功能。比如 ReentrantLock 中定义了静态内部类 Sync 来继承 AQS。
AQS 其实就是一个框架,通过 AQS,可以方便的实现线程间的同步。
我们常说不要重复造轮子。上一节 java并发编程(七)——Lock锁 我们就提到过 synchronized 的缺点,在 JDK1.5 之前,synchronized 是一个非常耗费资源的操作,而每次需要实现线程同步时,如果都是用 synchronized 加锁,必然会浪费很多资源。那么性能使我们重复造轮子的理由吗?显然不是,因为 JDK1.6 之后 synchronized 做了很多的优化,比如引入了偏向锁和自旋锁,性能已经追了上来。性能是可以进行优化的,并不是我们重复造轮子的理由。那么为什么还需要使用 AQS 呢?我们知道,破坏死锁产生的条件可以避免死锁。但是 synchronized 是无法完成这个任务的,因为 synchronized 获取不到锁时会一直阻塞,而 Lock 是可以解决这些问题的,Lock 的实现类就是借助了 AQS,具体参照上一节。除了 ReentrantLock,很多的同步组件都需要借助 AQS 来完成,比如 ReentrantReadWriteLock,SynchronousQueue,FutureTask 等等皆是基于 AQS 的,可以看下图。当然,我们自己也能利用 AQS 非常轻松容易地构造出符合我们自己需求的同步器。
AQS 的主要功能
1、同步状态的原子性管理。
2、线程的阻塞和解除阻塞。
3、提供阻塞线程的存储队列。
基于这三大功能,衍生出下面的附加功能:
1、通过中断实现的任务取消,此功能基于线程中断实现。
2、可选的超时设置,也就是调用者可以选择放弃等待任务执行完毕直接返回。
3、定义了 Condition 接口,用于支持管程形式的 await/signal/signalAll 操作,代替了 Object 类基于JNI 提供的 wait/notify/notifyAll。
AQS 的核心思想
1、如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。
2、如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
AQS 对资源的共享方式
1、Exclusive(独占):只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁:
公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
2、Share(共享):多个线程可同时执行,如 Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。
ReentrantReadWriteLock 可以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器中共享资源的竞争方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在底层已经帮我们实现好了。
同步状态——state
AQS 使用一个 int 型成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
state 状态信息的修改是通过 procted final 类型的 getState(),setState(),compareAndSetState() 进行操作的。
//返回同步状态的当前值 protected final int getState() { return state; } // 设置同步状态的值 protected final void setState(int newState) { state = newState; } //如果当前同步状态的值等于expect(期望值),就进行原子操作(CAS操作)将同步状态的值设置为给定值update protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
这几个方法都是Final修饰的,说明子类中无法重写它们。我们可以通过修改 state 字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。
为什么要提供 setState 和 compareAndSetState 两种方式设置锁的状态?
这个问题的关键是修改状态时是否存在数据竞争,如果有则必须使用 compareAndSetState:
1、lock.lock() 获取锁时会发生数据竞争,必须使用 CAS 来保障线程安全,也就是 compareAndSetState 方法。
2、lock.unlock() 释放锁时,线程已经获取到锁,没有数据竞争,也就可以直接使用 setState 修改锁的状态。
AQS 数据结构
我们之前说过,AQS 是通过 CLH 队列的变种来管理同步状态 state 的,这个变种队列是一个 FIFO 队列。
CLH 基本原理(自己的理解):
1、使用 FIFO 队列保证公平性,用内部类的属性 locked 来表示结点的状态,为true,表示当前结点需要获取锁,为 false 表示当前结点已经释放了锁。
2、使用 tail 属性来记录尾结点,每次入队都在队尾进行。
3、有当前结点(myNode)和前置结点(myPred),当前结点不断自旋,查询(监听)前置结点的状态 locked。
4、一系列的前置结点和当前结点构成队列。
5、当前结点运行完成后,更改自己的状态,那监听当前结点状态的线程就会结束自旋。
可见,CLH 其实是以自旋的方式来维护结点的状态,看起来更像一个单向链表。每个结点维护了一个变量 locked,CLH 是一个抽象的队列,因为每个结点并没有显示的 next 指针,而是使用 myPred 来关联前置结点和当前结点。具体方式如下:
入队:1、新建一个结点,创建一个与该结点关联的 QNode 对象,记录当前结点的 locked 状态,将该结点的前置结点(myPred)设置为之前的尾结点 tail,然后把 tail 设置为当前结点(这样其他结点可以继续在后面添加)。
2、检测前置结点的 locked 是否为 false,如果是,则当前结点符合条件。
3、如果前置结点的 locked 为 true,则自旋判断 locked 状态,直到为 false 时停止自旋。
出队:将当前结点的 locked 域设置为 false,后继结点检测到 locked 状态停止自旋,回收当前结点。
AQS 中的队列是 CLH 队列的变种,添加了一些功能:
1、等待节点会保存前驱节点中线程的信息,内部也会维护一个控制线程阻塞的状态值。支持阻塞而不是一直自旋,竞争激烈时,阻塞性能更好。
2、每个节点都设计为一个持有单独的等待线程并且"带有具体的通知方式"的监视器,这里所谓通知方式就是自定义唤醒阻塞线程的方式而已。
3、一个线程是等待队列中的第一个等待节点的持有线程会尝试获取锁,但是并不意味着它一定能够获取锁成功(这里的意思是存在公平和非公平的实现),获取失败就要重新等待。
4、等待队列中的节点通过 prev 属性连接前驱节点,通过 next 属性连接后继节点,简单来说,就是双向链表的设计。
5、CLH 队列本应该需要一个虚拟的头节点,但是在 AQS 中没有直接提供虚拟的头节点,而是延迟到第一次竞争出现的时候懒创建虚拟的头节点(其实也会创建尾节点,初始化时头尾节点是同一个节点)。
6、Condition(条件)等待队列中的阻塞线程使用的是相同的 Node 结构,但是提供了另一个链表用来存放,Condition 等待队列的实现比非 Condition 等待队列复杂。
7、支持可重入,支持取消节点,支持中断,支持独占(互斥)和共享两种模式。
AQS 将获取锁和释放锁的过程交给子类重写,本身关注的只是队列的构建,也就是解决获取锁失败后的问题。AQS 将每条请求共享资源的线程封装成 Node 结点,各个 Node 结点构成了一个双向的队列。AQS 中共有两种等待队列,其中一种是普通的同步等待队列,这里命名为Sync Queue,另一种是基于Sync Queue实现的条件等待队列,这里命名为Condition Queue。其数据结构如下图,详细的内容会在下面讲解。
AQS 底层使用了模板方法模式
同步器的设计是基于模板方法模式的,如果需要自定义同步器,我们只需要继承 AbstractQueuedSynchronizer 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放) 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
使用 AQS,我们只需要重写以下方法
protected boolean isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。 protected boolean tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。 protected boolean tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。 protected int tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 protected boolean tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
一般来说,需要子类重写的方法会用 abstract 来修饰,但是 AQS 中需要我们重写的方法都没有使用 abstract 来修饰,这是为什么呢?原因很简单,因为一个同步器或者锁不可能既是独占式又是共享式,为了避免重写不必要的方法,就没有使用 abstract 来修饰,但是在每个方法中都需要抛出异常来告知不能直接使用该方法。
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
AQS 类中的其他方法都是 final,所以无法被其他类使用,只有以上五个方法可以被其他类使用。
下面这张图展示了 AQS 的内部结构
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {}
从 AQS 的继承关系来看,AQS 有一个父类 AbstractOwnableSynchronizer。当一个线程以独占的方式获取同步器的时候,我们可能需要记录这个线程的信息,而 AbstractOwnableSynchronizer 这个类就是用来获取和设置独占线程的信息。该类的源码很简单,提供 getter h和 setter 方法,都是用 final 修饰的,子类只能使用而不能重写。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
static final class Node { // 标记一个节点处于共享模式下的等待 static final Node SHARED = new Node(); // 标记一个节点处于独占模式下的等待 static final Node EXCLUSIVE = null; // 取消状态,表示当前的线程被取消 static final int CANCELLED = 1; // 唤醒状态,表示当前节点的后继节点包含的线程需要运行,也就是unpark static final int SIGNAL = -1; // 条件等待状态,表示当前节点在等待condition,也就是在condition队列中 static final int CONDITION = -2; // 传播状态,表示当前场景下后续的acquireShared能够得以执行 static final int PROPAGATE = -3; // 当前结点的状态,初始值为0,表示当前节点在sync队列中,等待着获取锁,其他可选值是上面的4个值 volatile int waitStatus; // 当前节点前驱节点的引用 volatile Node prev; // 当前节点后继节点的引用 volatile Node next; // 当前节点持有的线程,可能是阻塞中等待唤醒的线程 volatile Thread thread; // 下一个等待节点 Node nextWaiter; // 当前操作的节点是否处于共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 获取当前节点的前驱节点,若前驱结点为空,抛出异常 final Node predecessor() { Node p = prev; // 保存前驱结点 if (p == null) // 前驱结点为空,抛出异常 throw new NullPointerException(); else // 前驱结点不为空,返回 return p; } // 空节点,主要是首次创建队列的时候创建的头和尾节点使用 Node() {} // 设置下一个等待节点,设置持有线程为当前线程 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } // 设置waitStatus,设置持有线程为当前线程 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
队列在首次创建时没有线程实例,提供空的构造函数来初始化头结点和尾结点。
每个线程都会被封装成一个 Node 结点放入队列中。每个结点包含了一个 Thread 类型的引用,并且每个结点都存在一个状态,用 waitStatus 来表示 Node 实例的状态,共有五种状态:
1、默认值为 0,表示结点正在同步队列中处于等待状态。
2、CANCELLED 值为 1,表示当前结点实例因为超时或者线程中断而被取消。等待中的节点永远不会处于此状态。只有 CANCELLED 是正值,因此正值表示结点已被取消,而负值表示有效等待状态。
3、SIGNAL 值为 -1,表示当前结点的后继结点被阻塞了,当它释放或取消时,当前结点必须通过 unpark 唤醒它的后继结点。后继结点入队时,会将前继结点的状态更新为 SIGNAL。
4、CONDITION 值为 -2,表示当前结点处于条件队列中,等待条件符合后被唤醒。当它转换为同步队列中的结点后,状态会被重新设置为 0。
5、PROPAGATE 值为 -3,表示当前场景下后续的 acquireShared 能够得以执行。简单理解就是共享模式下,结点释放的传递信号,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
nextWaiter:指向下一个等待的结点。这里需要注意一下,虽然表面是指向下一个结点,但其实 nextWaiter 有三种取值情况:
1、取值为 Node.SHARED,表示当前的 Node 实例处于共享模式。
2、取值为 Node.EXCLUSIVE(也就是 null),表示当前的 Node 实例处于独占模式。
3、取值既不是 Node.SHARED,也不是 Node.EXCLUSIVE 的其它结点,指向条件队列中当前结点的下一个结点。
上面已经介绍了 AQS 中 Node 结点的结构,下面我们看下同步队列是如何工作的。首先需要明确的是同步队列是一个双向链表,AQS 中有两个 Node 型的变量 head 和 tail 分别存放头结点和尾结点。
//记录头结点 private transient volatile Node head; //记录尾结点 private transient volatile Node tail; //将节点设置为头结点 private void setHead(Node node) { head = node; node.thread = null; //当前结点被设置为头结点,该节点的线程一定已经解除了阻塞 node.prev = null; //头结点一定没有前置结点 } //CAS操作设置头结点 private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } //CAS操作设置尾结点 private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
添加结点
当一个结点需要入队时,会调用 addWaiter 方法,其源码如下
//将结点添加到同步等待队列,实际上初始化队列也是这个方法完成的 private Node addWaiter(Node mode) { //构造Node节点,包含当前线程信息以及节点模式【独占/共享】 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; //新建一个变量pred将指针指向tail结点,用于下面的判断的CAS操作 if (pred != null) { //如果尾结点不为空,表示队列已经初始化 node.prev = pred; //将当前结点的前置结点暂时设置为pred, (pred不一定是最新的尾结点) if (compareAndSetTail(pred, node)) { //如果同时有多个结点要修改尾结点,通过CAS保证设置的当前结点为最新的尾结点 pred.next = node; //如果CAS操作成功,就把尾结点的后继指向当前结点 return node; //直接返回新构建的结点,该节点就是当前队列的尾结点 } } enq(node); //如果队列为空,即没有被初始化,或者上面的CAS操作失败了就需要后续的入队操作 enq return node; }
private Node enq(final Node node) { for (;;) { // 无限循环,确保结点能够成功入队列 Node t = tail; // 保存尾结点 if (t == null) { // 尾结点为空,即还没被初始化 if (compareAndSetHead(new Node())) // 头结点为空,并设置头结点为新生成的结点 tail = head; // 头结点与尾结点都指向同一个新生结点 } else { // 尾结点不为空,即已经被初始化过,但是之前的CAS设置尾结点操作失败了 node.prev = t; // 将node结点的prev域连接到尾结点 if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node t.next = node; // 设置尾结点的next域为node return t; // 返回尾结点 } } } }
灵魂拷问:
1、为什么是要新建一个结点 pred 而不是直接使用 tail?
主要原因是在多线程的情况下,某个线程设置前置结点时使用的 tail 和设置后继结点时使用的 tail 可能并不是同一个,同时如果不使用局部变量记录 tail 结点,那么 CAS 操作也不能正常进行,因为每次使用的都是同一个tail。具体请看—— addWaiter问题
2、为什么只保证设置前置结点时的原子性?
从源码中可以看出来,每次 CAS 操作之前设置的是当前的结点的前置结点 node.prev,即保证了前置域设置的原子性。AQS 使用双向链表,双向链表目前没有无锁算法的实现。双向链表需要同时设置前驱和后继结点,这两次操作只能保证一个是原子性的。即不能同时保证前置域 prev 的原子性和后置域 next 的原子性。那么为什么选择保证前置接结点的原子性呢?从操作来看:
设置后置域需要两个 CAS 操作,即 tail.next = node,然后把 tail 指针设置为当前结点。这两个操作必须原子性进行。
设置前置域只需要一个 CAS 操作,即设置 tail 指针。你可能会问,不需要设置 node.prev = tail 吗,当然需要,但是此时 Node 结点是没有进入队列的,我们对 Node 结点操作不存在竞争问题,也就不需要保证 CAS 操作,只需要在后面设置 tail 指针时进行一次 CAS 操作,如果 CAS 操作失败表示 tail 已经被改变,重新设置 node.prev = tail,直至 CAS 操作成功从而保证了前置域的原子性。这块我没弄明白,请参照—— 锁原理 - AQS 源码分析 。弄懂了请在下面评论。
3、明知道 node.next 有可见性问题,为什么还要设计成双向链表?
唤醒同步线程时,如果有后继结点,那么时间复杂为 O(1)。否则只能只反向遍历,时间复杂度为 O(n)。通过上面的 CAS 入队操作我们知道 node.prev 一定是线程安全的。而 node.next 并不是通过 CAS 设置的,所以不一定是正确的。一般会认为以下两种情况中 node.next 是不可靠的,需要从 tail 反向遍历:
1、node.next = null:可能结点刚刚插入链表中,node.next 仍为空,此时有其它线程通过 unparkSuccessor 来唤醒该线程。
2、node.next.waitStatus > 0:结点已经取消,next 值可能已经改变。
4、队列初始化是在什么时候?
AQS 同步队列的初始化会延迟到第一次可能出现竞争的情况,这是为了避免无谓的资源浪费。在首次调用 addWaiter 方法时,如果队列为空,就调用 enq 方法进行死循环。第一次死循环初始化队列的头结点和尾结点,第二次进行入队操作,也就是说死循环至少进行两次。
下面的图演示了三个线程进入队列的过程
删除结点
结点的删除是在队首进行的,删除结点只需要设置头结点就可以了。注意,头结点只是一个虚结点,起哨兵的作用。
private void setHead(Node node) { head = node; //将头结点设置为当前获得锁的结点 node.thread = null; //方便 GC node.prev = null; //方便 GC }
删除头结点就是把之前的 head 指针后移,该方法中的操作都不是 CAS,因为头结点的更新是在获取锁以后进行的,所以不需要使用 CAS 就能保证原子性 。
setHead 方法是把当前节点置为虚节点,但并没有修改 waitStatus,因为它是一直需要用的数据。
作为一个队列来说,该操作已经完成了删除结点的任务,但是如果要配合线程来用,还需要其他操作,将在下面的讲解。
取消结点
取消结点,这里指的是逻辑删除,将结点的状态标记为 CANCELLED,同时修改 node.next 域。取消结点操作比较复杂,因为要考虑取消的结点可能为尾结点、中间结点、头结点三种情况。
private void cancelAcquire(Node node) { if (node == null) //如果结点为空,直接返回 return; node.thread = null; //将关联的线程信息清空 Node pred = node.prev; //获取前置结点 while (pred.waitStatus > 0) //跳过所有同样是取消状态的前置结点 node.prev = pred = pred.prev; //while循环结束后拿到的是有效的前置结点 Node predNext = pred.next; //获取有效的前置结点的后继结点 node.waitStatus = Node.CANCELLED; //将当前结点的状态设置为 CANCELLED //如果当前结点为尾结点,则直接删除该节点,将前置结点设置为尾结点 if (node == tail && compareAndSetTail(node, pred)) { pred.compareAndSetNext(predNext, null); //前置结点为尾结点,没有后继结点,设置为null } else { //如果当前结点不为尾结点,或者CAS操作失败 int ws; //1、如果当前结点不是头结点的后继结点 if (pred != head && //2、判断当前节点有效前驱节点的状态是否为 SIGNAL ((ws = pred.waitStatus) == Node.SIGNAL || //3、如果不是,就尝试将前驱结点设置为 SIGNAL (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && //如果1为true,2中只要有一个为true,就判断前置结点的线程是否为null pred.thread != null) { //如果上述条件满足,就把前置结点的后继结点设置为当前结点的后继结点,即当前结点出队 Node next = node.next; if (next != null && next.waitStatus <= 0) pred.compareAndSetNext(predNext, next); } else { // 如果当前节点的前驱节点是头节点,或者上述其他条件不满足,就唤醒当前节点的后继节点 unparkSuccessor(node); } node.next = node; // help GC } }
整体流程如下:
1、获取当前节点的前驱节点,如果前驱节点的状态是 CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的 Pred 节点和当前 Node 关联,将当前 Node设置为 CANCELLED。
2、根据当前节点的位置,考虑以下三种情况:
(1)当前节点是尾节点。
(2)当前节点是 Head 的后继节点。
(3)当前节点不是Head的后继节点,也不是尾节点。
上面三种情况对应下面的图:
当前节点是尾节点。
当前节点是Head的后继节点。
当前节点不是Head的后继节点,也不是尾节点。
为什么所有的变化都是对 Next 指针进行了操作,而没有对 Prev 指针进行操作呢?什么情况下会对 Prev 指针进行操作?
从源码可以看出,cancelAcquire 唯一可以确定的是将 node.waitStatus 修改成 CANCELLED,如果被取消的是头结点时,需要唤醒后继结点。至于取消的结点是尾结点或中间结点,并不能保证操作成功与否。因为用的只是 if 判断,而没有使用 while 循环。在AQS中所有以 compareAndSe t开头的方法都是尝试更新,并不保证成功。
执行 cancelAcquire 的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过 shouldParkAfterFailedAcquire 方法了),如果此时修改后继结点的 Prev 指针,有可能会导致 Prev 指向另一个已经移除队列的 Node,因此这块变化 Prev 指针不安全。 shouldParkAfterFailedAcquire 方法中,会执行下面的代码,其实就是在处理 Prev 指针。shouldParkAfterFailedAcquire 是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更 Prev 指针比较安全。
do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0);
cancelAcquire 方法只是逻辑删除,将结点状态标记为 CANCELLED,同时可以修改 node.next 域。从这我们也可以看到为什么 unparkSuccessor 方法唤醒后继结点时,如果后继结点已经 CANCELLED,就需要从 tail 反向遍历结点,因为 next 域可能已经被修改。
唤醒结点
在取消结点时,如果当前结点的前置结点时头结点,那么取消该节点时需要唤醒该结点的后继结点,这个唤醒操作就是 unparkSuccessor。
private void unparkSuccessor(Node node) { //获取当前结点的状态 int ws = node.waitStatus; if (ws < 0) //将当前结点的状态设置为默认值 0 node.compareAndSetWaitStatus(ws, 0); Node s = node.next; //获取当前结点的后继结点 //如果下一个结点为空,或者下一个结点的状态为 CANCELLED if (s == null || s.waitStatus > 0) { s = null; //从尾结点开始从后往前开始遍历找到队列第一个waitStatus<=0的节点。(重点) for (Node p = tail; p != node && p != null; p = p.prev) // 如果是独占式,这里小于0,其实就是 SIGNAL if (p.waitStatus <= 0) s = p; } //如果当前节点的下个节点不为空,而且状态<=0,就把唤醒下个结点的线程 if (s != null) LockSupport.unpark(s.thread); }
唤醒结点时,需要找到后继的有效结点,为什么要从后往前找第一个非 Cancelled 的节点呢?
1、在上面 addWaiter 时我们就说过,结点入队只保证了 prev 域的原子性,而 next 域的原子性并没有得到保证,也就是说,node.prev = pred; compareAndSetTail(pred, node) 这两个地方可以看作Tail入队的原子操作,但是此时 pred.next = node;还没执行,如果这个时候执行了 unparkSuccessor 方法,就没办法从前往后找了,所以需要从后往前找。
2、在产生 CANCELLED 状态节点的时候,先断开的是 Next 指针,Prev 指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的 Node。
综上所述,如果是从前往后找,由于极端情况下入队的非原子操作和 CANCELLED 节点产生过程中断开 Next 指针的操作,可能会导致无法遍历所有的节点。node.next 是辅助结点,存在可见性问题,但 node.prev 一定可以遍历所有的结点。
上面我们已经了解了 AQS 中同步队列的相关操作,AQS 对资源的获取有两种模式,独占式和共享式,接下来我们就通过这两种模式学习 AQS 中其它的方法。
上一节我们已经介绍过 Lock 接口中的 Condition 对象。它就像 sychronized 中的 Monitor 对象一样,起着监视器的作用,称之为管程。传送门 java并发编程(七)——Lock锁
Condition必须固有地绑定在一个Lock的实现类上,也就是要通过Lock的实例建立Condition实例,而且Condition的方法调用使用必须在Lock的"锁定代码块"中,这一点和synchronized关键字以及Object的相关JNI方法使用的情况十分相似。
条件等待队列需要结合同步等待队列使用,这也刚好对应于前面提到的 Condition 的方法调用使用必须在 Lock 的锁定代码块中
在 AQS 中存在另一种较为复杂的等待队列——条件等待队列。Condition 的实现类是 ConditionObject,这是一个公有的内部类,其源码如下:
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L ; // 该类中只有两个成员变量 private transient Node firstWaiter; // 条件队列的第一个结点 private transient Node lastWaiter; // 条件队列的最后一个结点 // 公有的构造函数,用于创建一个新结点 public ConditionObject() { } // 私有方法,添加一个条件等待结点 private Node addConditionWaiter() { // 判断线程是否正在独占资源,需要子类重写 if (!isHeldExclusively()) // 如果是独占资源就抛出异常 throw new IllegalMonitorStateException(); Node t = lastWaiter; // 记录下最后一个等待结点 // 如果t不为空并且不是CONDITION状态,那么肯定是CANCELL状态 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); // 清除掉所有的已取消的结点 t = lastWaiter; // 此时t如果不为空,一定未被取消 } // 新建一个CONDITION状态的结点用于封装当前线程 Node node = new Node(Node.CONDITION); // 如果t为空,表示等待队列还未建立,就将node设为头结点,即第一个结点 if (t == null ) firstWaiter = node; else // 如果队列已经存在其它结点,就将node结点加到队列最后一个结点后面 t.nextWaiter = node; lastWaiter = node; // 将最后一个结点设置为node,即使node是第一个结点 return node; } // 内部方法,唤醒队列的第一个结点 private void doSignal(Node first) { do { // 此处判断结点的后继结点是否为空,同时已经移动了firstWaiter指针 if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null; // 如果后继结点为空,那么将条件队列置空 // 上面的if中firstWaiter指针已经移动到first的下一个结点,此处将first出队 first.nextWaiter = null ; // 不断将结点加入到 syn 队列中,直至该结点为空或者已被取消 } while (!transferForSignal(first) && (first = firstWaiter) != null ); } // 该方法与doSignal类似,不过不论当前结点是否已被取消都会加入到 syn 队列中 private void doSignalAll(Node first) { lastWaiter = firstWaiter = null ; do { Node next = first.nextWaiter; first.nextWaiter = null ; transferForSignal(first); first = next; } while (first != null ); } // 移除所有的已取消的结点 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null ; while (t != null ) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null ; // 第一次进来trail才会为null,也就是头结点已被取消 if (trail == null ) firstWaiter = next; else // trail记录的是前置结点,此处清除结点 t trail.nextWaiter = next; // 不断循环lastWaiter始终指向当前循环的最以一个未取消的结点 if (next == null ) lastWaiter = trail; } else trail = t; t = next; } } // 公有方法,类外可以调用,用于将等待队列中的结点加入到同步队列 public final void signal() { // 如果是独占模式就抛出异常 if (! isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignal(first); // 注意,该方法在遇到已取消的结点时会停止 } // 公有方法,唤醒等待队列中所有的结点 public final void signalAll() { if (! isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignalAll(first); // 无论是否以被取消,都会加入到同步队列 } // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断 public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false ; while (! isOnSyncQueue(node)) { LockSupport.park( this ); if (Thread.interrupted()) interrupted = true ; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } /* * For interruptible waits, we need to track whether to throw * InterruptedException, if interrupted while blocked on * condition, versus reinterrupt current thread, if * interrupted while blocked waiting to re-acquire. */ /** Mode meaning to reinterrupt on exit from wait */ private static final int REINTERRUPT = 1 ; /** Mode meaning to throw InterruptedException on exit from wait */ private static final int THROW_IE = -1 ; /** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ // 这是一个很复杂的判断,用了两个三目表达式,作用是如果新建的等待节点所在线程中断, // 则把节点的状态由CONDITION更新为0,并且加入到同步等待队列中,返回THROW_IE中断状态,如果加入同步队列失败,返回REINTERRUPT // 如果新建的等待节点所在线程没有中断,返回0,也就是初始状态的interruptMode private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0 ; } /** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ // 等待完毕后报告中断处理,前边的逻辑得到的interruptMode如果为THROW_IE则抛出InterruptedException,如果为REINTERRUPT则中断当前线程 private void reportInterruptAfterWait( int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } /** * Implements interruptible condition wait. * <ol> * <li>If current thread is interrupted, throw InterruptedException. * <li>Save lock state returned by { @link #getState}. * <li>Invoke { @link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li>Block until signalled or interrupted. * <li>Reacquire by invoking specialized version of * { @link #acquire} with saved state as argument. * <li>If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ // 把线程加入到等待队列,当前线程在接到信号或被中断之前一直处于等待状态 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); // 添加一个结点到等待队列 /* fullyRelease释放当前AQS中的所有资源 其实也就是基于status的值调用release(status) 这一步对于锁实现来说,就是一个解锁操作 释放失败则标记等待状态为取消 */ int savedState = fullyRelease(node); // 获取释放的状态 int interruptMode = 0; // 初始化中断 // 如果节点新建的节点不位于同步队列中(理论上应该是一定不存在), // 则对节点所在线程进行阻塞,第二轮循环理论上节点一定在同步等待队列中 while (! isOnSyncQueue(node)) { LockSupport.park( this); // 阻塞线程 // 如果线程被中断就结束循环,并且设置中断标志 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } // 节点所在线程被唤醒后,如果节点所在线程没有处于中断状态,则以独占模式进行头节点竞争 // 注意这里使用的state是前面释放资源时候返回的保存下来的state if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 下一个等待节点不空,则从等待队列中移除所有取消的等待节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // interruptMode不为0则按照中断模式进行不同的处理 if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } /** * Implements timed condition wait. * <ol> * <li>If current thread is interrupted, throw InterruptedException. * <li>Save lock state returned by { @link #getState}. * <li>Invoke { @link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li>Block until signalled, interrupted, or timed out. * <li>Reacquire by invoking specialized version of * { @link #acquire} with saved state as argument. * <li>If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 public final long awaitNanos( long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // We don't check for nanosTimeout <= 0L here, to allow // awaitNanos(0) as a way to "yield the lock". final long deadline = System.nanoTime() + nanosTimeout; long initialNanos = nanosTimeout; Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (! isOnSyncQueue(node)) { if (nanosTimeout <= 0L ) { transferAfterCancelledWait(node); break ; } if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos( this , nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); long remaining = deadline - System.nanoTime(); // avoid overflow return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE; } /** * Implements absolute timed condition wait. * <ol> * <li>If current thread is interrupted, throw InterruptedException. * <li>Save lock state returned by { @link #getState}. * <li>Invoke { @link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li>Block until signalled, interrupted, or timed out. * <li>Reacquire by invoking specialized version of * { @link #acquire} with saved state as argument. * <li>If interrupted while blocked in step 4, throw InterruptedException. * <li>If timed out while blocked in step 4, return false, else true. * </ol> */ // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态 public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false ; int interruptMode = 0 ; while (! isOnSyncQueue(node)) { if (System.currentTimeMillis() >= abstime) { timedout = transferAfterCancelledWait(node); break ; } LockSupport.parkUntil( this , abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return ! timedout; } /** * Implements timed condition wait. * <ol> * <li>If current thread is interrupted, throw InterruptedException. * <li>Save lock state returned by { @link #getState}. * <li>Invoke { @link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li>Block until signalled, interrupted, or timed out. * <li>Reacquire by invoking specialized version of * { @link #acquire} with saved state as argument. * <li>If interrupted while blocked in step 4, throw InterruptedException. * <li>If timed out while blocked in step 4, return false, else true. * </ol> */ // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 // 此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0 public final boolean await( long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); // We don't check for nanosTimeout <= 0L here, to allow // await(0, unit) as a way to "yield the lock". final long deadline = System.nanoTime() + nanosTimeout; Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false ; int interruptMode = 0 ; while (! isOnSyncQueue(node)) { if (nanosTimeout <= 0L ) { timedout = transferAfterCancelledWait(node); break ; } if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos( this , nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return ! timedout; } // support for instrumentation /** * Returns true if this condition was created by the given * synchronization object. * * @return { @code true} if owned */ // 传入的节点是否在同步队列中 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer. this ; } /** * Queries whether any threads are waiting on this condition. * Implements { @link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}. * * @return { @code true} if there are any waiting threads * @throws IllegalMonitorStateException if { @link #isHeldExclusively} * returns { @code false} */ // 查询是否有正在等待此条件的任何线程 protected final boolean hasWaiters() { if (! isHeldExclusively()) throw new IllegalMonitorStateException(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true ; } return false ; } /** * Returns an estimate of the number of threads waiting on * this condition. * Implements { @link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}. * * @return the estimated number of waiting threads * @throws IllegalMonitorStateException if { @link #isHeldExclusively} * returns { @code false} */ // 返回正在等待此条件的线程数估计值 protected final int getWaitQueueLength() { if (! isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0 ; for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++ n; } return n; } /** * Returns a collection containing those threads that may be * waiting on this Condition. * Implements { @link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}. * * @return the collection of threads * @throws IllegalMonitorStateException if { @link #isHeldExclusively} * returns { @code false} */ // 返回包含那些可能正在等待此条件的线程集合 protected final Collection<Thread> getWaitingThreads() { if (! isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<> (); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null ) list.add(t); } } return list; } }
添加结点到条件队列的过程
从条件队列中唤醒结点
AQS同步器如果使用独占(EXCLUSIVE)模式,那么意味着同一个时刻,只有唯一的一个结点所在线程获取(acuqire)原子状态 status 成功,此时该线程可以从阻塞状态解除继续运行,而同步队列中的其他节点持有的线程依然处于阻塞状态。独占模式同步器的功能主要由下面的四个方法提供:
1、acquire(int arg):申请获取 arg 个原子状态 status(申请成功可以简单理解为 status = status - arg)。
2、acquireInterruptibly(int arg):申请获取 arg 个原子状态 status,响应线程中断。
3、tryAcquireNanos(int arg, long nanosTimeout):申请获取 arg 个原子状态 status,带超时的版本。
4、release(int arg):释放 arg 个原子状态 status(释放成功可以简单理解为 status = status + arg)。
独占模式下,AQS同步器实例初始化时候传入的 status 值,可以简单理解为"允许申请的资源数量的上限值",独占模式下 arg 为 1。
独占锁的获取—— acquire
public final void acquire(int arg) { //如果申请资源失败并且新增一个独占类型的结点到同步队列成功,就中断该线程,否则直接返回 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
这里面涉及到四个方法:
1、tryAcquire 方法需要子类重写,用于判断获取资源是否成功。
2、acquireQueued 方法会使放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。
3、addWaiter 方法将 Node 结点加入到同步队列中,上面已经讲过。
4、selfInterrupt 方法会中断当前线程。
//此方法需子类重写,用于判断获取资源是否成功 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
// 中断当前线程 static void selfInterrupt() { Thread.currentThread().interrupt(); }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; //标记是否成功获取到资源 try { boolean interrupted = false; //标记等待过程中是否中断过 for (;;) { //开始自旋,要么获取锁,要么中断 final Node p = node.predecessor(); //获得当前结点的前置结点 //如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点) if (p == head && tryAcquire(arg)) { setHead(node); //获取成功,头指针移动到当前结点 p.next = null; // help GC failed = false; //修改资源获取标志 return interrupted; //返回中断标志,此处是跳出死循环唯一位置 } //如果当前结点不是头结点,或者当前结点是头结点但获取获取锁失败就进行下面操作 //判断获取锁失败的当前结点中的线程是否需要被阻塞(阻塞条件:前置结点的waitStatus为SIGNAL) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //挂起当前线程,程序不会继续执行 interrupted = true; //当前挂起的线程被中断后,修改中断标志为true } } finally { if (failed) //如果上面操作出现了异常,就取消当前结点 cancelAcquire(node); } }
如果获取锁成功那么就会返回,但是如果失败就会一直陷入到“死循环”中浪费资源吗?很显然不是,对于没有获取到锁的线程,会将其挂起。调用 shouldParkAfterFailedAcquire(p, node) 方法将有效前置结点的 waitStatus 设置为 SIGNAL,如果 shouldParkAfterFailedAcquire 设置成功后会调用 parkAndCheckInterrupt 方法来挂起当前线程。
//通过前置结点判断当前线程是否应该被阻塞 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //获取前置结点的状态 if (ws == Node.SIGNAL) //前置结点处于SIGNAL转态 return true; if (ws > 0) { //如果前置结点处于CANCELLED状态,就说明已经取消获取资源 do { node.prev = pred = pred.prev; //循环找到第一个非CANCELLED状态的结点 } while (pred.waitStatus > 0); pred.next = node; //忽略中间的所有CANCELLED状态的结点 } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //设置前置结点的状态为SIGNAL } return false; //前置结点状态不为SIGNAL }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //线程挂起,程序不会继续向下执行 // 根据 park 方法 API描述,程序在下述三种情况会继续向下执行 // 1. 被 unpark // 2. 被中断(interrupt) // 3. 其他不合逻辑的返回才会继续向下执行 //当线程满足上面三个情况之一时,会继续往下运行。 //如果线程在挂起的时候被中断,该方法会返回 true return Thread.interrupted(); }
当 acquireQueued 方法返回后,会继续执行 acquire 方法,可以看到,根据 acquireQueued 方法的返回值还会做一次自我中断,为什么已经获取到锁了还要这个中断操作呢?
public final void acquire(int arg) { //如果申请资源失败并且新增一个独占类型的结点到同步队列成功,就中断该线程,否则直接返回 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
首先要明确的是 java 中的中断操作是一种协作式中断,也就是说是否要中断一个线程不是别的线程说了算,而是本线程决定的,调用 interrupt 方法只是把中断标记设置成了 true,但线程不一定被中断了。在这里有如下原因:
1、当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过 Thread.interrupted() 方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为 false),并记录下来,如果发现该线程被中断过,就再中断一次。
2、线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。
独占式获取锁的整个过程如下:
独占锁的释放—— release
public final boolean release(int arg) { //调用子类重写的tryRelease方法,尝试释放资源 if (tryRelease(arg)) { Node h = head; //记录头结点,需要释放资源的结点 //如果头结点不为空并且头结点不是初始化时的状态,其实就是SIGNAL if (h != null && h.waitStatus != 0) unparkSuccessor(h); //唤醒后继结点 return true; } return false; }
独占锁的释放过程比较简单,具体过程如下:
1、调用 tryRelease 方法尝试释放资源,具体方法由子类重写。如果成功就继续第二步,否则直接返回 false 表示释放资源失败。
2、如果头结点不为空,并且头结点的状态为 SIGNAL就唤醒后继结点,返回 true。
为什么头结点状态为 SIGNAL?
从上面的源码看,在独占式获取资源时,如果一个结点被挂起,那么必须满足两个条件:
1、该结点不是头结点或者获取锁失败。
2、shouldParkAfterFailedAcquire 方法返回值为 true。
而在 shouldParkAfterFailedAcquire 方法的源码中可以看出,如果返回的是 true,那么该结点的前置结点的状态一定是 SIGNAL。
独占式响应中断获取锁—— acquireInterruptibly
我们在上面独占式获取锁时就使用过 interrupt 方法来中断线程,但其实并没有起到作用,究其原因是 java 中的中断是协作式的,线程即使被告知需要中断,也会不断尝试获取锁。但 acquireInterruptibly 方法是确实可以响应中断的。
public final void acquireInterruptibly(int arg) throws InterruptedException { //如果线程已被中断,就抛出异常 if (Thread.interrupted()) throw new InterruptedException(); //尝试非阻塞式获取锁失败,就执行doAcquireInterruptibly if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
//独占式响应中断获取锁 private void doAcquireInterruptibly(int arg) throws InterruptedException { //基于当前线程新增一个独占的Node节点进入同步等待队列中 final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { //获取当前结点的前置结点 final Node p = node.predecessor(); //如果前置结点时头结点,且获取资源成功 if (p == head && tryAcquire(arg)) { setHead(node); //设置头结点,即该节点出队 p.next = null; // help GC return; } //当前结点被挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //当挂起的结点被唤醒时,不再设置中断状态 //如果线程是因为中断而被唤醒,则直接抛出异常 throw new InterruptedException(); } } catch (Throwable t) { //捕获异常,取消结点 cancelAcquire(node); throw t; } }
可以看到,doAcquireInterruptibly 方法 acquireQueued 方法非常像,区别在于:
acquireQueued 方法中,线程在结束挂起状态时,无论是被唤醒还是被中断,都会继续循环获取资源,只不过如果是由于中断而结束挂起状态时,会将中断标志设置为 true。
doAcquireInterruptibly 方法中,如果线程被中断,那么不再设置中断标志,而是直接抛出异常并且在异常捕获时取消结点。
独占式超时限制获取锁—— tryAcquireNanos
如果线程在指定时间内获取到锁,那么就返回 true,否则返回 false。该模式主要问题是时间间隔的计算,在超时时间内,线程会尝试获取锁的,每次获取锁肯定有时间消耗,所以每次循环都需要重新计算超时的时限。当然,deadline 在第一次尝试获取锁时就确定了,是不变的。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //如果获取锁成功就直接返回true return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); //否则执行该方法 }
//doAcquireNanos方法用到的常量 static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { //超时时间内没有获取到锁,直接返回false if (nanosTimeout <= 0L) return false; //计算超时截止的最终时间,为系统纳秒 + 超时时限 final long deadline = System.nanoTime() + nanosTimeout; //以独占的方式加入到同步队列 final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return true; } //计算新的超时时间 nanosTimeout = deadline - System.nanoTime(); //如果超时,取消结点并返回false if (nanosTimeout <= 0L) { cancelAcquire(node); return false; } if (shouldParkAfterFailedAcquire(p, node) && //判断最新的超时时限是否大于1000 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) //将线程挂起 nanosTimeout 长时间,时间到,自动返回 //在下一次循环时会返回false LockSupport.parkNanos(this, nanosTimeout); //如果线程在这期间被中断过,就抛出异常 if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { //线程被中断,取消结点 cancelAcquire(node); throw t; } }
计算好 deadline,然后每次循环重新计算 nanosTimeout = deadline - System.nanoTime() 从而可以判断是否超时。至于为什么要判断是否大于1000纳秒,可以看 JDK 的注释。
/** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices * to improve responsiveness with very short timeouts. */
就是说,1000 纳秒的时间限已经非常非常短暂了,没必要再执行挂起和唤醒操作了,不如让当前线程直接进入下一次循环。
独占模式的同步器的一个显著特点就是:头结点的第一个有效(非取消)的后继节点,总是尝试获取资源,一旦获取资源成功就会解除阻塞并且晋升为头结点,原来所在结点会移除出同步等待队列,原来的队列长度就会减少1,然后头结点的第一个有效的后继结点继续开始竞争资源。
使用独占模式同步器的主要类库有:
1、可重入锁ReentrantLock
。
2、读写锁ReentrantReadWriteLock
中的写锁WriteLock
到此,独占模式获取锁已经全部学习完了。2020-08-03 23:29:29
共享(SHARED)模式中的"共享"的含义是:同一个时刻,如果有一个节点所在线程获取(acuqire)资源成功,那么它会解除阻塞被唤醒,并且会把唤醒状态传播到所有有效的后继节点(换言之就是唤醒整个同步等待队列中的所有有效的节点)。共享模式同步器的功能主要由下面的四个方法提供:
1、acquireShared(int arg):申请获取 arg 个资源(假设资源上限设置为 max,那么申请成功可以简单理解为 max = max - arg,当 max < 0 时申请会失败)。
2、acquireSharedInterruptibly(int arg):申请获取 arg 个资源,响应线程中断。
3、tryAcquireSharedNanos(int arg, long nanosTimeout):申请获取 arg 个资源,带超时的版本。
4、releaseShared(int arg):释放arg个资源(释放成功可以简单理解为 max = max + arg)。
//共享模式下获取资源 public final void acquireShared(int arg) { //获取资源失败,则加入同步队列 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
//共享模式下获取资源或者通不处理 private void doAcquireShared(int arg) { //新建一个共享结点封装当前线程 final Node node = addWaiter(Node.SHARED); boolean interrupted = false; //设置打断标记 try { for (;;) { final Node p = node.predecessor(); //如果前置结点为头结点 if (p == head) { //每一轮循环都会判断是否能成功获取资源 int r = tryAcquireShared(arg); // 设置头结点,并且传播获取资源成功的状态, // 然后任意一个节点晋升为头节点都会唤醒其第一个有效的后继节点, // 起到一个链式释放和解除阻塞的动作 if (r >= 0) { setHeadAndPropagate(node, r);//这个方法的作用是确保唤醒状态传播到所有的后继节点 p.next = null; // help GC return; } } // 判断获取资源失败是否需要阻塞,这里会把前驱节点的等待状态CAS更新为Node.SIGNAL if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); throw t; } finally { if (interrupted) selfInterrupt(); } }
// 设置同步等待队列的头节点,判断当前处理的节点的后继节点是否共享模式的节点,如果共享模式的节点, // 即 propagate大于0或者节点的waitStatus为PROPAGATE则进行共享模式下的释放资源 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); //重复进行两次判断 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 当前节点(其实已经成为头节点)的第一个后继节点为null或者是共享模式的节点 if (s == null || s.isShared()) doReleaseShared(); //释放资源 } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。