赞
踩
本文内容及图片代码参考视频:https://www.bilibili.com/video/BV12K411G7Fg/?spm_id_from=333.788.recommend_more_video.-1
AQS,全称AbstractQuenedSynchronizer,可以理解为抽象的队列同步器。所谓抽象就代表着这个类要被继承和复用,实际上AQS这个类也确实是Java实现多线程并发编程的中必不可少的一个基础类,JUC中许多工具类的内部都继承了AQS,而AQS的底层则广泛的应用了CAS的实现。下面我们从源码的角度来深入了解一下AQS这个类。
进入AQS类的内部我们先来了解其最重要的三个内部属性:
-
-
- private transient volatile Node head;
-
- private transient volatile Node tail;
-
- private volatile int state;
从AQS的名字中我们就可以看出AQS内部会维护一个队列,实际上 AQS内部也确实是将竞争资源对象的线程都排成了一个FIFO队列,而这里的head与tail就代表了这个队列的对头与队尾。值得一提的是head实际上并不会真的指向队列中的第一个线程,它的下一个结点才是队列中的第一个线程。而这里的state则是代表了当前资源对象的锁状态,用于判断当前资源对象是否正在被占用,使用volatile实现其线程之间的可见性。这三个内部属性构成了AQS的简单架构,如下图所示:
可以看出,AQS内部维护一个先进先出的队列,用以记录所有等待调用资源对象的线程,而对象资源的占用状态则用state表示,从之前的内部属性我们可以看出AQS是用Node来实现队列,那么接下来让我们看一下AQS内部是如何定义这个Node的:
- static final class Node {
- /** Marker to indicate a node is waiting in shared mode */
- static final Node SHARED = new Node();
- /** Marker to indicate a node is waiting in exclusive mode */
- static final Node EXCLUSIVE = null;
-
- /** waitStatus value to indicate thread has cancelled */
- static final int CANCELLED = 1;
- /** waitStatus value to indicate successor's thread needs unparking */
- static final int SIGNAL = -1;
- /** waitStatus value to indicate thread is waiting on condition */
- static final int CONDITION = -2;
- /**
- * waitStatus value to indicate the next acquireShared should
- * unconditionally propagate
- */
- static final int PROPAGATE = -3;
-
-
- volatile int waitStatus;
-
-
- volatile Node prev;
-
- volatile Node next;
-
- volatile Thread thread;
-
- Node nextWaiter;
-
- /**
- * Returns true if node is waiting in shared mode.
- */
- final boolean isShared() {
- return nextWaiter == SHARED;
- }
-
- /**
- * Returns previous node, or throws NullPointerException if null.
- * Use when predecessor cannot be null. The null check could
- * be elided, but is present to help the VM.
- *
- * @return the predecessor of this node
- */
- final Node predecessor() throws NullPointerException {
- Node p = prev;
- if (p == null)
- throw new NullPointerException();
- else
- return p;
- }
-
- Node() { // Used to establish initial head or SHARED marker
- }
-
- Node(Thread thread, Node mode) { // Used by addWaiter
- this.nextWaiter = mode;
- this.thread = thread;
- }
-
- Node(Thread thread, int waitStatus) { // Used by Condition
- this.waitStatus = waitStatus;
- this.thread = thread;
- }
- }
从Node的代码里面我们可以看出以下信息:
①Node有两种状态,共享与独占;
②对于每一个Node都会有两个指针,分别指向自己的前后结点,因此可以看出AQS维护的这个FIFO队列内部其实是用双向链表实现的;
③每个Node结点都有一个等待状态waitStatus用以表示当前结点的线程状态,这是一个枚举值,取值范围为: CANCELLED = 1;SIGNAL = -1;CONDITION = -2;PROPAGATE = -3,对于这几个枚举值我们对其解释如下:
SIGNAL:表示该节点的后继节点被阻塞(或即将被阻塞),因此当前节点在释放或取消时必须解除其后继节点的阻塞。
CANCELLED:表示由于超时或中断,此节点被取消,节点永远不会离开这种状态。取消了节点的线程不会再阻塞其它线程。
CONDITION:表示此节点当前位于条件队列中,可以理解为它关联了某个条件对象。在传输之前,它不会被用作同步队列节点,传输时状态将被设置为0。(此处使用该值与该字段的其他用途无关,只是简化了机制。)
PROPAGATE:表示共享模式的结点释放资源对象后应该将该信息传播给其它结点。
而在初始化waitStatus时,会默认将其赋值为0,表示普通的同步结点,对条件队列则使用CONDITION。它们的修改过程都是CAS的。
以上就是Node内部类中比较重要的几个信息,当然它还有一些其它方法,如获取每个node的前置结点,记录当前线程信息等方法,上述代码中已有展示,在此不再一一叙述。
在我们基本上了解了AQS内部主要的属性,数据结构和架构后,接下来我们就要探究一下AQS具体是怎么使用我们说到的state,Node和FIFO实现多线程的同步管理的。
首先我们要知道,实现多线程同步的方法是对资源对象进行加锁,而加锁的情况可以分为两种,一种是线程尝试对资源对象加锁,不管成功与否都直接返回结果;一种是强行加锁,如果加锁不成功,就一直等待直到加锁成功。而AQS中正好也是使用tryAcquire和acquire这两个方法实现这两种不同的加锁情况的,下面我们来看一下具体实现的源码;
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
- protected boolean tryAcquire(int arg) {
- throw new UnsupportedOperationException();
- }
上面代码中我们可以看出AQS对尝试加锁tryAcquire方法的描写非常简单,里面只有一行简单的抛出异常,这是为了让开发人员去继承重写这个方法,在重写的方法里面我们可以加上自己的业务逻辑代码,而对于加锁acquire方法,AQS直接使用final修饰,意味着开发人员只需要去调用我这个方法就行,不需要去修改,其对自己的这个方法充满信心,那么让我们来仔细研究一下这个方法的具体实现吧。
首先在进入acquire方法后我们会进行判断,如果当前线程通过tryAcquire方法尝试获取锁成功,那么就不需要加锁,直接退出,如果尝试加锁失败就会进入addWaiter(Node.EXCLUSIVE)这个方法中,这个方法看名字应该是把当前结点加入等待队列的意思,那么让我们看一下它的具体实现吧:
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
进入addWaiter方法,我们首先看到它为当前线程创建了一个Node结点,并传入它是共享还是独占模式的,接下来又创建了一个Node结点pred,并让它指向了FIFO的尾结点,然后判断pred是否为空,如果不为空,那么就将代表当前线程的结点node的前指针指向pred,然后通过CAS操作将当前结点置为尾结点,即入队操作,如果CAS操作成功,则将pred结点尾指针指向node,完成完整的入队操作,返回node即可。但如果CAS操作失败,则代表当前的尾结点发生了变化,pred指向的已经不是尾结点,那么入队操作失败,执行完整的入队操作enq方法。
在入队操作中,如果CAS操作失败,或者一开始的队列为空都需要执行enq完整的入队方法,下面我们看一下完整入队方法的实现:
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
可以看出,完整的入队方法enq里面是一个自旋操作,首先我们创建一个新结点t指向队尾,如果队尾为空,那我们就用CAS操作创建队头,然后让队尾指向队头结点,之后执行的入队方法就与addWaiter中的操作类似,使用CAS操作修改队尾,自旋直到成功后返回即可。
到此,我们就完全搞明白了acquire方法中执行入队操作的addWaiter方法的执行流程,那么回归acquire方法,在执行完入队操作后,我们就需要对这些入队的线程进行管理,接下来就会执行acquireQueued方法,下面我们来看一下acquireQueued方法的实现:
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
在这个方法中,我们看到AQS定义了默认为true的failed和默认为false的interrupted属性,接下来我们来看看AQS是怎么操作的,首先,程序会进入一个自旋,AQS定义一个Node结点p指向当前node的前置结点,如果node的前置结点是头结点,并且node结点尝试获取锁成功,那么我们将当前结点设置为头结点然后修改failed值为false,并返回值为false的interrupt额度,代表当前结点获取锁没有被中断。如果p确实为头结点,那么这里就会自旋等待当前结点获取锁。
若p结点不是头结点,这里我们就要进shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法中,并修改interrupted为true,代表等待锁过程中被中断过,而如果获取锁的过程中出现异常等操作进入finally中,则会取消当前结点加锁的行为。
下面让我们再来看一下shouldParkAfterFailedAcquire与parkAndCheckInterrupt方法都做了什么吧:
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- /*
- * This node has already set status asking a release
- * to signal it, so it can safely park.
- */
- return true;
- if (ws > 0) {
- /*
- * Predecessor was cancelled. Skip over predecessors and
- * indicate retry.
- */
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- /*
- * waitStatus must be 0 or PROPAGATE. Indicate that we
- * need a signal, but don't park yet. Caller will need to
- * retry to make sure it cannot acquire before parking.
- */
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
首先是shouldParkAfterFailedAcquire方法,这个方法是判断当前结点是否需要挂起(park),进入这个方法后会先获取当前结点前置结点的waitStatus,记为ws。如果ws值为SIGNAL,代表其前置结点也在拿锁,且其拿到锁在准备释放时会唤醒它后面的结点,因此当前结点就是安全的,可以被挂起;但如果ws>0,代表前置结点处于CANCEL,即被取消状态,那么就需要不断的向前寻找,直到找到第一个ws不是CANCEL状态的结点,此时当前结点的前置结点已经发生了变化,因此对其返回false,不允许挂起当前结点,要求其按照新的前置结点,再执行以此外层的判断。
如果前置结点的状态不是SIGNAL或者CANCEL,那么我们就通过CAS的操作把前置结点的状态更改为SIGNAL,然后返回false在此执行外层逻辑。
通过shouldParkAfterFailedAcquire方法我们判断当前结点能不能被挂起,如果返回false则不能挂起,继续执行外层逻辑,如果能挂起,那我们就需要执行parkAndCheckInterrupt执行具体的挂起操作,其内部代码如下:
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
这个方法很简单,它调用一个LockSupport.park(this)方法实现对当前结点的挂起操作,而这个方法是通过unsafe方法调用native方法使用操作系统原语实现挂起的。在挂起期间,当前结点会在此阻塞,直到其它结点释放锁唤醒它。在park期间,当前结点是不会响应中断的,所以在当前结点被唤醒后,需要返回 Thread.interrupted();去判断它挂起期间是否有被中断过。
在了解完这两个方法后,我们基本上就了解了整个AQS加锁的过程,但其中还有一个问题没有解决,那就是所得释放问题,与加锁对应,释放锁也有两个方法tryRelease和release方法,具体如下:
- protected boolean tryRelease(int arg) {
- throw new UnsupportedOperationException();
- }
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
从代码中我们可以看出,tryRelease与tryAcquire类似,都是用于让开发人员继承去实现具体的业务逻辑的,而release方法则是关键的释放代码,在release方法中,首先尝试释放锁,如果释放则直接返回false,若成功,则需要去唤醒后面的其它结点,具体方法为unparkSuccessor(h);其内部实现如下:
- private void unparkSuccessor(Node node) {
- /*
- * If status is negative (i.e., possibly needing signal) try
- * to clear in anticipation of signalling. It is OK if this
- * fails or if status is changed by waiting thread.
- */
- int ws = node.waitStatus;
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
-
- /*
- * Thread to unpark is held in successor, which is normally
- * just the next node. But if cancelled or apparently null,
- * traverse backwards from tail to find the actual
- * non-cancelled successor.
- */
- Node s = node.next;
- if (s == null || s.waitStatus > 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
在这个方法里面,它首先会获取头结点的waitStatus,记为ws。如果ws<0,则使用CAS操作将其置为0,表示锁已经释放。然后创建Node结点指向队中第一个结点,记为s,如果s不存在,代表当前队列没有结点了,或者队中第一个结点的状态>0,及为CANCEL,那么先将s置为null,然后从后往前扫描整个队列,找到距离head最近的状态不是CANCEL的结点,然后唤醒它就可以了。这里需要说的是,为什么从后向前扫描,这是因为加入队列是我们是先让当前结点前指针指向记录的尾结点,然后修改记录的尾结点指向当前结点这个顺序的,这个操作在多线程情况下是线程安全的,因为两个结点的相对位置是固定的,而在扫描时,从前向后就可能出现线程不安全的情况。
在前面两节内容中我们介绍了AQS中主要的内部属性和加锁,释放锁的过程,现在,我们对其进行总结如下:
AQS的内部维护了一个Node结点组成的FIFO队列,用来控制多线程等待获取资源对象,使用state属性记录资源对象的锁占有状态。
当多个线程竞争一个资源对象时,AQS提供两种加锁方式,第一种tryAcquire方法是用来让开发人员继承实现业务逻辑的,而acquire方法则是实现加锁的核心操作。在acquire方法中,首先会调用tryAcquire方法判断能否直接加锁,若成功则直接返回,否则进入addWaiter方法,在addWaiter方法中,首先创建Node结点指向当前FIFO队列的队尾,创建Node结点获取当前线程及其独占共享状态,之后判断如果队尾不为空则使用CAS的操作将当前结点置为队尾,修改当前结点的前指针为之前记录的队尾,将记录的队尾的后指针指向当前结点,然后返回当前结点即可。如果当前记录的队尾为空或者CAS操作失败,进入完整的enq入队方法,完整的enq方法里面是一个自旋操作,它会判断如果队尾为空,则创建一个队头,让队尾指向队头,然后自旋执行与addWaiter方法中类似的CAS操作,将当前结点入队即可。
在完成入队操作后,AQS通过acquireQuened方法管理队列,acquireQuened方法中初始设置faild为true与interrupted为false,创建结点p执行当前结点的前指针,如果p为头指针且对当前结点尝试加锁成功,则将当前结点置为头指针,然后返回interrupted表示当前获取锁过程是否有中断;当分析p不是头指针或者尝试获取锁失败后,需要考虑当前结点是否需要挂起,使用shouldParkAfterFailedAcquire与parkAndCheckInterrupt方法判断并执行挂起操作。shouldParkAfterFailedAcquire方法内部通过判断结点的waitStatus决定当前结点能否被挂起,而parkAndCheckInterrupt方法使用lockSupport.park执行挂起操作,唤醒后返回Thread.isInterrupted方法判断挂起期间是否有被中断过。
在释放锁时AQS与加锁类似,release方法调用unparkSuccessor方法唤醒下一个非CANCEL状态的结点去尝试获取锁。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。