当前位置:   article > 正文

JAVA多线程基础篇-AQS(AbstractQueuedSynchronizer)_java aqs

java aqs

1.概述

AQS(AbstractQueuedSynchronizer)抽象队列同步器,是一种用来构建锁和同步器的框架。在JAVA,AQS是其它同步组件的基础框架(ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、SynchronusQueue等),它是JUC并发包中的核心组件。它不仅解决了在实现同步器时涉及的大量细节问题(自定义标准同步状态、FIFO同步队列等),还充分考虑了伸缩性,降低了上下文开销、提高了吞吐量。本文将从解析AQS设计原理及源码,帮助大家更好地来理解AQS。

2.AQS原理解析

2.1 CLH队列

在了解AQS的原理之前,首先需要了解一个基础概念CLH(Craig, Landin, and Hagersten locks)队列。 CLH是一种基于链表的可扩展、高性能、公平的自旋锁,它的队列中每个节点等待前驱节点释放锁,当前置节点执行完成,才会唤醒后置节点,这样最多只有后置节点和新进入的线程来抢占CPU资源,其余线程处于阻塞状态,极大地较少了CPU开销。CLH同步队列的结构图如下图所示:
在这里插入图片描述
在CLH队列中,每个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prve)、后置节点(next)。前驱节点指向当前节点的前一个节点,后置节点指向当前节点的后一个节点,形成双向链表。

2.1.1 CLH入队

由上述CLH队列的结构图可知,当新节点入队时,首先需要将tail指向新节点,同时将新节点的prve指向队列中旧的tail节点,还要将旧的tail节点的next指向新节点,以此来建立联系。具体如下图所示:
在这里插入图片描述

2.1.2 CLH出队

CLH队列出队遵循FIFO(First In First Out,先进先出)原则,首节点是获取到锁的节点,首节点的线程释放锁之后,会唤醒它的后继节点(next),而后继节点在获取到锁时会将自己设置为首节点。具体如下图所示:

在这里插入图片描述

2.2 AQS核心思想

AQS通过模板方法模式定义了同步组件语义,它的核心包括:同步队列、独占式锁的获取和释放,共享锁的获取和释放,以及可中断锁,超时等待锁获取等实现。AQS使用了一个int成员变量来表示同步状态(state),同时使用Node实现FIFO队列,用于构建锁和其它同步装置;AQS资源共享方式包括独占式锁(Exclusive)和共享锁(Share),独占式锁和共享式锁一般不会一起使用,ReentrantReadWriteLock也是通过两个内部类(读锁和写锁)分别实现了两套API。AQS中获取锁的方式分为独占式(Exclusive)和共享式(Share),独占式指的是只允许一个线程获取同步状态锁,当这个线程没有释放同步状态锁时,其它线程是不能获取锁的,只能加入到同步队列中去。共享式锁允许多个线程同时获取同步状态锁,很多读锁都是共享式锁。
AQS核心设计思想就是共享变量state和FIFO队列,state被关键字volatile修饰,当state>0表明当前对象锁已经被占有,其它线程再次加锁就会失败,会被放入FIFO队列中,线程会被底层UNSAFE.pake()阻塞挂起,等待其它获取锁的线程释放锁后线程才能被唤醒。具体原理如下图所示:
在这里插入图片描述

2.3 AQS源码探析

2.3.1 属性

AQS的核心属性主要如下:


//同步队列头节点
private transient volatile Node head;

//同步队列尾节点
private transient volatile Node tail;

//同步资源状态,当state值大于0时,表明已被线程持有
private volatile int state;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

CLH队列主要基于内部类Node实现,Node类的属性如下:

//独占锁标识
static final Node SHARED = new Node();
//独占锁标识   
static final Node EXCLUSIVE = null;
//常量,waitStatus为1时,表明失效
static final int CANCELLED =  1;
//常量,waitStatus为-1时,表明后续节点需要被唤醒    
static final int SIGNAL    = -1;
//常量,waitStatus为-2时,表示该线程在condition队列中阻塞(Condition有使用)    
static final int CONDITION = -2;
//常量,waitStatus为-3时,表示该线程以及后续线程进行无条件传播(CountDownLatch中有使用)共享模式下,PROPAGATE状态的线程处于可运行状态   
static final int PROPAGATE = -3;
//Node对象存储上述常量的地方,      
volatile int waitStatus;
//存储当前节点的前一个节点(前置节点)      
volatile Node prev;
//存储当前节点的后一个节点(后置节点)
volatile Node next;
//当前node绑定的线程
volatile Thread thread;
//等待队列中的下一个节点
Node nextWaiter;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

2.3.2 独占锁

1. acquire(int arg)

  public final void acquire(int arg) {
  //tryAcquire再次尝试获取锁,如果尝试成功,返回true,
        if (!tryAcquire(arg) &&
        //获取锁资源失败后,需要将当前线程封装成一个Node节点,追加到CLH队列中
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //线程中断
            selfInterrupt();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2.tryAcquire(int arg)
模板方法,获取锁,具体实现以ReentrantLock中公平锁和非公平锁为例。

  protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
  • 1
  • 2
  • 3

ReentrantLock关于tryAcquire(int arg)方法的实现如下:

//公平锁
  protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
            //公平锁相对于下面非公平锁,此处多了一个hasQueuedPredecessors()方法判断,该方法判断队列中是否有等待线程,若无,才会进行CAS加锁,若队列中有元素,返回false,表示获取锁失败
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }


//非公平锁
final boolean nonfairTryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
            //获取AQS的state状态,
            int c = getState();
            //如果state为0,表示锁已经被释放,可以尝试再次获取锁资源
            if (c == 0) {
            	//CAS尝试修改state
                if (compareAndSetState(0, acquires)) {
                   //修改state成功,获取到锁,修改ExclusiveOwnerThread属性为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //判断占有锁资源的线程是否是当前线程,如果是,执行锁重入操作
            else if (current == getExclusiveOwnerThread()) {
            	//将state+1
                int nextc = c + acquires;
                //如果state+1<0,可能state的值为Integer.MAX_VALUE,+1就变为负数,超出锁可重入的最大值,抛出Error
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //重新设置state的值
                setState(nextc);
                //表示锁冲入成功
                return true;
            }
            return false;
        }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

3.acquireQueued(final Node node, int arg)

//已经将node加入到双向队列中,然后执行当前方法
   final boolean acquireQueued(final Node node, int arg) {
        //failed属性唯一可能变为true,是JVM出现异常
        boolean failed = true;
        try {
            // 
            boolean interrupted = false;
            //死循环
            for (;;) {
            //获取当前节点的前置节点
                final Node p = node.predecessor();
                //如果当前节点的前置节点是head,再次尝试获取锁资源,成功返回true,失败返回false
                if (p == head && tryAcquire(arg)) {
                //设置head节点为当前节点,将thread的prev设置为null,已经拿到了锁资源,就与队列无关了
                    setHead(node);
                    p.next = null; // help GC
                    failed = false; //将标识修改为false
                    return interrupted; //返回标识
                }
                //保证上一个节点的状态是-1(是否是SIGNAL),才会返回true,
                if (shouldParkAfterFailedAcquire(p, node) &&
                //阻塞线程,获取锁资源(基于Unsafe类的park方法)
                    parkAndCheckInterrupt()) //此处是failed属性可能变为true的地方,JVM内部出现异常导致,所以finally中代码执行的概率很小,约等于0
                    interrupted = true;
            }
        } finally {
            if (failed)
            // 取消当前节点锁竞争,并寻找当前节点的有效节点
                cancelAcquire(node);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

4.selfInterrupt()
模板中的线程中断方法。

 static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
  • 1
  • 2
  • 3

5. addWaiter(Node mode)
此方法表明前面获取锁资源失败,放入队列中等待。

private Node addWaiter(Node mode) {
    //创建Node节点,并且设置thread为当前线程,设置为排他锁或共享锁
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    //获取队列尾部节点(node)
    Node pred = tail;
    //如果tail不为null,表明队列中元素,
    if (pred != null) {
    //将当前节点的prev,设置为刚才的尾部节点
        node.prev = pred;
        //基于CAS将当前节点设置为新的尾节点
        if (compareAndSetTail(pred, node)) {
        //将旧的尾节点的next属性,设置为当前节点
            pred.next = node;
            return node;
        }
    }
    //进入此方法,表明队列中现在无元素,需要构建队列(头节点和队列元素)
    enq(node);
    return node;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

6.enq(final Node node)

private Node enq(final Node node) {
//死循环
        for (;;) {
        //重新获取尾节点
            Node t = tail;
            if (t == null) { // Must initialize
            //尾节点不存在,当前元素是队列中第一个元素,构建一个新的Node节点作为头节点(此node节点无意义)
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            //队列中有元素,将当前节点设置为新的尾节点,
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                //将旧的尾节点的next属性,设置为当前节点
                    t.next = node;
                    return t;
                }
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

7.tryRelease(int arg)
释放锁节点:

  protected boolean tryRelease(int arg) {
   throw new UnsupportedOperationException();
}
  • 1
  • 2
  • 3

8.isHeldExclusively()
判断是否是独占锁获取:

protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
  • 1
  • 2
  • 3

9.release(int args)

//独占式释放锁资源
  public final boolean release(int arg) {
  //调用tryRelease方法释放锁资源
        if (tryRelease(arg)) {
        //获取头节点
            Node h = head;
            if (h != null && h.waitStatus != 0)
            //独占模式释放锁资源
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

//该方法先获取头节点的后继节点,后继节点不为空时调用unpark()方法进行唤醒
//每一次唤醒节点都是唤醒队列中该节点的后继节点所引用的线程,从而进一步可以佐证获得锁的过程是一个FIFO(先进先出)的过程
private void unparkSuccessor(Node node) {
  
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
            
        Node s = node.next;
        //此处表明头节点的下一个节点已经失效或者状态已经失效,向前寻找一个未失效的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            //向前寻找一个未失效的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

2.3.3 共享锁

共享式与独占式的最主要区别在于同一时刻独占式只能有一个线程获取同步状态,而共享式在同一时刻可以有多个线程获取同步状态。

1.acquireShared(int arg)
共享式获取同步状态:

  public final void acquireShared(int arg) {
  //调用tryAcquireShared(int arg)方法尝试获取同步状态,成功则直接返回
        if (tryAcquireShared(arg) < 0)
        //失败则通过doAcquireShared()中的park()进入等待队列,直到被unpark()/interrupt()并成功获取到资源才返回(整个等待过程也是忽略中断响应)
            doAcquireShared(arg);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2.doAcquireShared(int arg)

//
 private void doAcquireShared(int arg) {
        //新增一个共享式节点并设置为尾节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            //死循环
            for (;;) {
            //获取当前节点的前置节点(旧的尾节点)
                final Node p = node.predecessor();
                //判断旧的尾节点是否是头节点,若是头节点,表明当前队列中只有一个元素,尝试获取锁
                if (p == head) {
                //共享式获取锁
                    int r = tryAcquireShared(arg);
                    //获取锁成功
                    if (r >= 0) {
                    //将head指向当前节点,并唤醒head节点之后的节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //与上述acquireQueued意义相同
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

3.tryAcquireShared(int arg)

  protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
  • 1
  • 2
  • 3

4.tryReleaseShared(int arg)

 protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
  • 1
  • 2
  • 3

5.releaseShared(int arg)
releaseShared()用于共享模式下线程释放共享资源,释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。独占式下tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程;共享模式下,拥有资源的线程在释放资源后就可以唤醒后继节点。

  public final boolean releaseShared(long arg) {
  //尝试释放资源
        if (tryReleaseShared(arg)) {
        //唤醒后继节点
            doReleaseShared();
            return true;
        }
        return false;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

6.doReleaseShared()
该方法主要唤醒后继节点,当state为正数,去获取剩余共享资源;当state=0时去获取共享资源。

 private void doReleaseShared() {
        for (;;) {
         //获取头节点
            Node h = head;
            //判断队列中是否有元素
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                        //唤醒后继
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

3.小结

1.AQS和CAS是JAVA并发编程的基石,是JUC构建锁和同步器的框架;
2.AQS的核心是共享变量state和CLH队列(FIFO队列),CLH队列保证资源竞争公平性;
3.AQS资源共享方式包括独占式锁(Exclusive)和共享锁(Share),用于处理不同情况下的资源访问。

4.参考文献

1.https://www.bilibili.com/video/BV1oF41147kR
2.https://juejin.cn/post/6844903601538596877
3.https://www.jianshu.com/p/c89a3e76c9c0

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/149145?site
推荐阅读
相关标签
  

闽ICP备14008679号