当前位置:   article > 正文

Java并发编程AQS详解_aqs java

aqs java

本文内容及图片代码参考视频:https://www.bilibili.com/video/BV12K411G7Fg/?spm_id_from=333.788.recommend_more_video.-1

AQS,全称AbstractQuenedSynchronizer,可以理解为抽象的队列同步器。所谓抽象就代表着这个类要被继承和复用,实际上AQS这个类也确实是Java实现多线程并发编程的中必不可少的一个基础类,JUC中许多工具类的内部都继承了AQS,而AQS的底层则广泛的应用了CAS的实现。下面我们从源码的角度来深入了解一下AQS这个类。

1.AQS的内部属性

进入AQS类的内部我们先来了解其最重要的三个内部属性:

  1. private transient volatile Node head;
  2. private transient volatile Node tail;
  3. private volatile int state;

从AQS的名字中我们就可以看出AQS内部会维护一个队列,实际上 AQS内部也确实是将竞争资源对象的线程都排成了一个FIFO队列,而这里的head与tail就代表了这个队列的对头与队尾。值得一提的是head实际上并不会真的指向队列中的第一个线程,它的下一个结点才是队列中的第一个线程。而这里的state则是代表了当前资源对象的锁状态,用于判断当前资源对象是否正在被占用,使用volatile实现其线程之间的可见性。这三个内部属性构成了AQS的简单架构,如下图所示:

可以看出,AQS内部维护一个先进先出的队列,用以记录所有等待调用资源对象的线程,而对象资源的占用状态则用state表示,从之前的内部属性我们可以看出AQS是用Node来实现队列,那么接下来让我们看一下AQS内部是如何定义这个Node的:

  1. static final class Node {
  2. /** Marker to indicate a node is waiting in shared mode */
  3. static final Node SHARED = new Node();
  4. /** Marker to indicate a node is waiting in exclusive mode */
  5. static final Node EXCLUSIVE = null;
  6. /** waitStatus value to indicate thread has cancelled */
  7. static final int CANCELLED = 1;
  8. /** waitStatus value to indicate successor's thread needs unparking */
  9. static final int SIGNAL = -1;
  10. /** waitStatus value to indicate thread is waiting on condition */
  11. static final int CONDITION = -2;
  12. /**
  13. * waitStatus value to indicate the next acquireShared should
  14. * unconditionally propagate
  15. */
  16. static final int PROPAGATE = -3;
  17. volatile int waitStatus;
  18. volatile Node prev;
  19. volatile Node next;
  20. volatile Thread thread;
  21. Node nextWaiter;
  22. /**
  23. * Returns true if node is waiting in shared mode.
  24. */
  25. final boolean isShared() {
  26. return nextWaiter == SHARED;
  27. }
  28. /**
  29. * Returns previous node, or throws NullPointerException if null.
  30. * Use when predecessor cannot be null. The null check could
  31. * be elided, but is present to help the VM.
  32. *
  33. * @return the predecessor of this node
  34. */
  35. final Node predecessor() throws NullPointerException {
  36. Node p = prev;
  37. if (p == null)
  38. throw new NullPointerException();
  39. else
  40. return p;
  41. }
  42. Node() { // Used to establish initial head or SHARED marker
  43. }
  44. Node(Thread thread, Node mode) { // Used by addWaiter
  45. this.nextWaiter = mode;
  46. this.thread = thread;
  47. }
  48. Node(Thread thread, int waitStatus) { // Used by Condition
  49. this.waitStatus = waitStatus;
  50. this.thread = thread;
  51. }
  52. }

从Node的代码里面我们可以看出以下信息:

①Node有两种状态,共享与独占;

②对于每一个Node都会有两个指针,分别指向自己的前后结点,因此可以看出AQS维护的这个FIFO队列内部其实是用双向链表实现的;

③每个Node结点都有一个等待状态waitStatus用以表示当前结点的线程状态,这是一个枚举值,取值范围为: CANCELLED = 1;SIGNAL = -1;CONDITION = -2;PROPAGATE = -3,对于这几个枚举值我们对其解释如下:

SIGNAL:表示该节点的后继节点被阻塞(或即将被阻塞),因此当前节点在释放或取消时必须解除其后继节点的阻塞。

CANCELLED:表示由于超时或中断,此节点被取消,节点永远不会离开这种状态。取消了节点的线程不会再阻塞其它线程。

CONDITION:表示此节点当前位于条件队列中,可以理解为它关联了某个条件对象。在传输之前,它不会被用作同步队列节点,传输时状态将被设置为0。(此处使用该值与该字段的其他用途无关,只是简化了机制。)

PROPAGATE:表示共享模式的结点释放资源对象后应该将该信息传播给其它结点。

而在初始化waitStatus时,会默认将其赋值为0,表示普通的同步结点,对条件队列则使用CONDITION。它们的修改过程都是CAS的。

以上就是Node内部类中比较重要的几个信息,当然它还有一些其它方法,如获取每个node的前置结点,记录当前线程信息等方法,上述代码中已有展示,在此不再一一叙述。

2.AQS中的重要方法

在我们基本上了解了AQS内部主要的属性,数据结构和架构后,接下来我们就要探究一下AQS具体是怎么使用我们说到的state,Node和FIFO实现多线程的同步管理的。

首先我们要知道,实现多线程同步的方法是对资源对象进行加锁,而加锁的情况可以分为两种,一种是线程尝试对资源对象加锁,不管成功与否都直接返回结果;一种是强行加锁,如果加锁不成功,就一直等待直到加锁成功。而AQS中正好也是使用tryAcquire和acquire这两个方法实现这两种不同的加锁情况的,下面我们来看一下具体实现的源码;

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }
  1. protected boolean tryAcquire(int arg) {
  2. throw new UnsupportedOperationException();
  3. }

上面代码中我们可以看出AQS对尝试加锁tryAcquire方法的描写非常简单,里面只有一行简单的抛出异常,这是为了让开发人员去继承重写这个方法,在重写的方法里面我们可以加上自己的业务逻辑代码,而对于加锁acquire方法,AQS直接使用final修饰,意味着开发人员只需要去调用我这个方法就行,不需要去修改,其对自己的这个方法充满信心,那么让我们来仔细研究一下这个方法的具体实现吧。

首先在进入acquire方法后我们会进行判断,如果当前线程通过tryAcquire方法尝试获取锁成功,那么就不需要加锁,直接退出,如果尝试加锁失败就会进入addWaiter(Node.EXCLUSIVE)这个方法中,这个方法看名字应该是把当前结点加入等待队列的意思,那么让我们看一下它的具体实现吧:

  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode);
  3. // Try the fast path of enq; backup to full enq on failure
  4. Node pred = tail;
  5. if (pred != null) {
  6. node.prev = pred;
  7. if (compareAndSetTail(pred, node)) {
  8. pred.next = node;
  9. return node;
  10. }
  11. }
  12. enq(node);
  13. return node;
  14. }

进入addWaiter方法,我们首先看到它为当前线程创建了一个Node结点,并传入它是共享还是独占模式的,接下来又创建了一个Node结点pred,并让它指向了FIFO的尾结点,然后判断pred是否为空,如果不为空,那么就将代表当前线程的结点node的前指针指向pred,然后通过CAS操作将当前结点置为尾结点,即入队操作,如果CAS操作成功,则将pred结点尾指针指向node,完成完整的入队操作,返回node即可。但如果CAS操作失败,则代表当前的尾结点发生了变化,pred指向的已经不是尾结点,那么入队操作失败,执行完整的入队操作enq方法。

在入队操作中,如果CAS操作失败,或者一开始的队列为空都需要执行enq完整的入队方法,下面我们看一下完整入队方法的实现:

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. if (t == null) { // Must initialize
  5. if (compareAndSetHead(new Node()))
  6. tail = head;
  7. } else {
  8. node.prev = t;
  9. if (compareAndSetTail(t, node)) {
  10. t.next = node;
  11. return t;
  12. }
  13. }
  14. }
  15. }

可以看出,完整的入队方法enq里面是一个自旋操作,首先我们创建一个新结点t指向队尾,如果队尾为空,那我们就用CAS操作创建队头,然后让队尾指向队头结点,之后执行的入队方法就与addWaiter中的操作类似,使用CAS操作修改队尾,自旋直到成功后返回即可。

到此,我们就完全搞明白了acquire方法中执行入队操作的addWaiter方法的执行流程,那么回归acquire方法,在执行完入队操作后,我们就需要对这些入队的线程进行管理,接下来就会执行acquireQueued方法,下面我们来看一下acquireQueued方法的实现:

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)) {
  8. setHead(node);
  9. p.next = null; // help GC
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&
  14. parkAndCheckInterrupt())
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }

在这个方法中,我们看到AQS定义了默认为true的failed和默认为false的interrupted属性,接下来我们来看看AQS是怎么操作的,首先,程序会进入一个自旋,AQS定义一个Node结点p指向当前node的前置结点,如果node的前置结点是头结点,并且node结点尝试获取锁成功,那么我们将当前结点设置为头结点然后修改failed值为false,并返回值为false的interrupt额度,代表当前结点获取锁没有被中断。如果p确实为头结点,那么这里就会自旋等待当前结点获取锁。

若p结点不是头结点,这里我们就要进shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法中,并修改interrupted为true,代表等待锁过程中被中断过,而如果获取锁的过程中出现异常等操作进入finally中,则会取消当前结点加锁的行为。

下面让我们再来看一下shouldParkAfterFailedAcquire与parkAndCheckInterrupt方法都做了什么吧:

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. int ws = pred.waitStatus;
  3. if (ws == Node.SIGNAL)
  4. /*
  5. * This node has already set status asking a release
  6. * to signal it, so it can safely park.
  7. */
  8. return true;
  9. if (ws > 0) {
  10. /*
  11. * Predecessor was cancelled. Skip over predecessors and
  12. * indicate retry.
  13. */
  14. do {
  15. node.prev = pred = pred.prev;
  16. } while (pred.waitStatus > 0);
  17. pred.next = node;
  18. } else {
  19. /*
  20. * waitStatus must be 0 or PROPAGATE. Indicate that we
  21. * need a signal, but don't park yet. Caller will need to
  22. * retry to make sure it cannot acquire before parking.
  23. */
  24. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  25. }
  26. return false;
  27. }

首先是shouldParkAfterFailedAcquire方法,这个方法是判断当前结点是否需要挂起(park),进入这个方法后会先获取当前结点前置结点的waitStatus,记为ws。如果ws值为SIGNAL,代表其前置结点也在拿锁,且其拿到锁在准备释放时会唤醒它后面的结点,因此当前结点就是安全的,可以被挂起;但如果ws>0,代表前置结点处于CANCEL,即被取消状态,那么就需要不断的向前寻找,直到找到第一个ws不是CANCEL状态的结点,此时当前结点的前置结点已经发生了变化,因此对其返回false,不允许挂起当前结点,要求其按照新的前置结点,再执行以此外层的判断。

如果前置结点的状态不是SIGNAL或者CANCEL,那么我们就通过CAS的操作把前置结点的状态更改为SIGNAL,然后返回false在此执行外层逻辑。

通过shouldParkAfterFailedAcquire方法我们判断当前结点能不能被挂起,如果返回false则不能挂起,继续执行外层逻辑,如果能挂起,那我们就需要执行parkAndCheckInterrupt执行具体的挂起操作,其内部代码如下:

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this);
  3. return Thread.interrupted();
  4. }

这个方法很简单,它调用一个LockSupport.park(this)方法实现对当前结点的挂起操作,而这个方法是通过unsafe方法调用native方法使用操作系统原语实现挂起的。在挂起期间,当前结点会在此阻塞,直到其它结点释放锁唤醒它。在park期间,当前结点是不会响应中断的,所以在当前结点被唤醒后,需要返回 Thread.interrupted();去判断它挂起期间是否有被中断过。

在了解完这两个方法后,我们基本上就了解了整个AQS加锁的过程,但其中还有一个问题没有解决,那就是所得释放问题,与加锁对应,释放锁也有两个方法tryRelease和release方法,具体如下:

  1. protected boolean tryRelease(int arg) {
  2. throw new UnsupportedOperationException();
  3. }
  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }

从代码中我们可以看出,tryRelease与tryAcquire类似,都是用于让开发人员继承去实现具体的业务逻辑的,而release方法则是关键的释放代码,在release方法中,首先尝试释放锁,如果释放则直接返回false,若成功,则需要去唤醒后面的其它结点,具体方法为unparkSuccessor(h);其内部实现如下:

  1. private void unparkSuccessor(Node node) {
  2. /*
  3. * If status is negative (i.e., possibly needing signal) try
  4. * to clear in anticipation of signalling. It is OK if this
  5. * fails or if status is changed by waiting thread.
  6. */
  7. int ws = node.waitStatus;
  8. if (ws < 0)
  9. compareAndSetWaitStatus(node, ws, 0);
  10. /*
  11. * Thread to unpark is held in successor, which is normally
  12. * just the next node. But if cancelled or apparently null,
  13. * traverse backwards from tail to find the actual
  14. * non-cancelled successor.
  15. */
  16. Node s = node.next;
  17. if (s == null || s.waitStatus > 0) {
  18. s = null;
  19. for (Node t = tail; t != null && t != node; t = t.prev)
  20. if (t.waitStatus <= 0)
  21. s = t;
  22. }
  23. if (s != null)
  24. LockSupport.unpark(s.thread);
  25. }

在这个方法里面,它首先会获取头结点的waitStatus,记为ws。如果ws<0,则使用CAS操作将其置为0,表示锁已经释放。然后创建Node结点指向队中第一个结点,记为s,如果s不存在,代表当前队列没有结点了,或者队中第一个结点的状态>0,及为CANCEL,那么先将s置为null,然后从后往前扫描整个队列,找到距离head最近的状态不是CANCEL的结点,然后唤醒它就可以了。这里需要说的是,为什么从后向前扫描,这是因为加入队列是我们是先让当前结点前指针指向记录的尾结点,然后修改记录的尾结点指向当前结点这个顺序的,这个操作在多线程情况下是线程安全的,因为两个结点的相对位置是固定的,而在扫描时,从前向后就可能出现线程不安全的情况。

3.总结

在前面两节内容中我们介绍了AQS中主要的内部属性和加锁,释放锁的过程,现在,我们对其进行总结如下:

AQS的内部维护了一个Node结点组成的FIFO队列,用来控制多线程等待获取资源对象,使用state属性记录资源对象的锁占有状态。

当多个线程竞争一个资源对象时,AQS提供两种加锁方式,第一种tryAcquire方法是用来让开发人员继承实现业务逻辑的,而acquire方法则是实现加锁的核心操作。在acquire方法中,首先会调用tryAcquire方法判断能否直接加锁,若成功则直接返回,否则进入addWaiter方法,在addWaiter方法中,首先创建Node结点指向当前FIFO队列的队尾,创建Node结点获取当前线程及其独占共享状态,之后判断如果队尾不为空则使用CAS的操作将当前结点置为队尾,修改当前结点的前指针为之前记录的队尾,将记录的队尾的后指针指向当前结点,然后返回当前结点即可。如果当前记录的队尾为空或者CAS操作失败,进入完整的enq入队方法,完整的enq方法里面是一个自旋操作,它会判断如果队尾为空,则创建一个队头,让队尾指向队头,然后自旋执行与addWaiter方法中类似的CAS操作,将当前结点入队即可。

在完成入队操作后,AQS通过acquireQuened方法管理队列,acquireQuened方法中初始设置faild为true与interrupted为false,创建结点p执行当前结点的前指针,如果p为头指针且对当前结点尝试加锁成功,则将当前结点置为头指针,然后返回interrupted表示当前获取锁过程是否有中断;当分析p不是头指针或者尝试获取锁失败后,需要考虑当前结点是否需要挂起,使用shouldParkAfterFailedAcquire与parkAndCheckInterrupt方法判断并执行挂起操作。shouldParkAfterFailedAcquire方法内部通过判断结点的waitStatus决定当前结点能否被挂起,而parkAndCheckInterrupt方法使用lockSupport.park执行挂起操作,唤醒后返回Thread.isInterrupted方法判断挂起期间是否有被中断过。

在释放锁时AQS与加锁类似,release方法调用unparkSuccessor方法唤醒下一个非CANCEL状态的结点去尝试获取锁。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/149201
推荐阅读
相关标签
  

闽ICP备14008679号