当前位置:   article > 正文

【JAVA AQS解析】_java aqs详解

java aqs详解

一、AQS是什么

AQS是 AbstractQueuedSynchronizer (抽象同步队列)的缩写,AQS内部提供了同步队列和条件队列实现锁的功能,开发过程中我们不会用到这个了,一般都是用到它的实现类。

继承AQS的子类

  1. ReentrantLock

  2. ReentrantReadWriteLock

  3. Semaphore

  4. CountDownLatch

另外 CyclicBarrier 内部维护一个ReentrantLock 对象控制线程同步功能,使得线程在达到一个屏障点时往下执行

 阻塞队列ArrayBlockingQueue 内部也维护了一个ReentrantLock锁对象,通过条件队列来控制队列中数据的读取

二、关于ReentrantLock锁的例子

 下面是关于ReentrantLock锁的一个例子

 提交5个任务到线程池执行

  1. public class ReentrantLockDemo {
  2. public static void main(String[] args) {
  3. ExecutorService executorService = Executors.newCachedThreadPool();
  4. ReentrantLock reentrantLock = new ReentrantLock();
  5. for(int i= 0;i<5;i++){
  6. Lock lock = new Lock(reentrantLock ,"lock"+i);
  7. executorService.submit(lock);
  8. }
  9. }
  10. }
  11. class Lock implements Runnable{
  12. private ReentrantLock reentrantLock;
  13. private String name;
  14. public Lock(ReentrantLock reentrantLock ,String name) {
  15. this.reentrantLock = reentrantLock;
  16. this.name= name;
  17. }
  18. @Override
  19. public void run() {
  20. this.reentrantLock.lock(); // @1
  21. System.out.println("执行对象name是"+this.name+"的线程"+Thread.currentThread()+"获取到锁");
  22. }
  23. }

  运行下结果是

执行对象name是lock1的线程Thread[pool-1-thread-2,5,main]获取到锁

 可以看到只有一个任务运行到Sytem打印语句处

  1. public void run() {
  2. this.reentrantLock.lock();
  3. System.out.println("执行对象name是"+this.name+"的线程"+Thread.currentThread()+"获取到锁");
  4. this.reentrantLock.unlock(); // @2
  5. System.out.println("执行对象name是"+this.name+"的线程"+Thread.currentThread()+"释放锁");
  6. }

 我们再修改下run方法里面的代码看下结果是怎样

  1. 执行对象name是lock1的线程Thread[pool-1-thread-2,5,main]获取到锁
  2. 执行对象name是lock1的线程Thread[pool-1-thread-2,5,main]释放锁
  3. 执行对象name是lock4的线程Thread[pool-1-thread-5,5,main]获取到锁
  4. 执行对象name是lock4的线程Thread[pool-1-thread-5,5,main]释放锁
  5. 执行对象name是lock0的线程Thread[pool-1-thread-1,5,main]获取到锁
  6. 执行对象name是lock0的线程Thread[pool-1-thread-1,5,main]释放锁
  7. 执行对象name是lock2的线程Thread[pool-1-thread-3,5,main]获取到锁
  8. 执行对象name是lock2的线程Thread[pool-1-thread-3,5,main]释放锁
  9. 执行对象name是lock3的线程Thread[pool-1-thread-4,5,main]获取到锁
  10. 执行对象name是lock3的线程Thread[pool-1-thread-4,5,main]释放锁

 比较一下可以看到 当一个线程在@1处或得锁后不释放锁,其它线程卡在@1处,当获取到锁的线  程在@2处获得锁后,其它线程才会继续后面的流程。大胆地猜下其它线程是怎么卡在@1处的,

实际上没拿到锁的线程在执行循环逻辑,当获取到锁的线程释放锁后,其它线程满足退出循环的条件,然后下一个获取锁的线程就能继续往下走了,这也就是AQS的逻辑,AQS封装了上层抽象

三、AQS的阻塞队列与条件队列

接下来我们来看看AQS(AbstractQueuedSynchronizer)功能的实现

AQS有两个Node类型的对象,实现了链表的结构,

有一个 int类型的state字段,子类获取资源来判断tryAcquire是否成功

  1. Node节点

    

       AQS有个Node内部类 ,有下面这些属性

       两个节点标识 分别标记当前node节点是在共享模式还是独占模式下等待

      几个状态位常量

CANCELLED

由于超时或者中断,该 Node 中的线程处于取消状态

SIGNAL

当前 Node 的后继 Node 将被阻塞。当前 Node 释放锁或者被取消时,它要将后继 Node 唤醒,后面到的线程未抢到线程时,添加到队列中,将前驱节点状态标记为SIGNAL,等待被环境

CONDITION

条件队列的标记,当前node节点在条件队列中

PROPAGATE

传播 共享锁时用到

     前驱和后继节点 (两个Node对象)

     nextWaiter 条件队列指向下一个等待节点

     Node节点对应的线程对象,线程作为节点的一个属性,Node节点在AQS中作为链表结构的一        个元素,这样就能实现锁的功能

2  条件队列

    条件队列也是有前面的Node节点,构成的链表结构,因为只有nextWaiter 所以是个单链表结构,条件队列中的节点waitStatus是 CONDITION

   可以创建多个条件队列,比如阻塞队列中,存取数据,受到队列满,空的条件限制,达到条件后需要阻塞等待条件满足后才能继续操作,这就是通过条件队列实现的

  条件队列提供 休眠,唤醒的方法

三、独占锁的实现

     1获取锁

      acquire方法实现了独占锁功能,整体流程如下图所示

   

下面具体来看下源码怎么实现的

 acqire方法 tryAcquire由子类实现,如果获取资源(这里的资源指的是state字段比如ReentrantLock就是将state从0设置成1就成功了)成功,就不走&&后面的逻辑(后面的逻辑阻塞线程),执行业务逻辑

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

 子类获取资源失败后先调用addWaiter将当前线程包装成节点添加到AQS的阻塞队列中

  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode);
  3. // Try the fast path of enq; backup to full enq on failure
  4. Node pred = tail;
  5. if (pred != null) {
  6. node.prev = pred;
  7. if (compareAndSetTail(pred, node)) {
  8. pred.next = node;//这里的pred是原来的tail节点
  9. return node;
  10. }
  11. }
  12. enq(node);
  13. return node;
  14. }

这里的逻辑如果tail节点不为null 当前节点前驱节点设置成原来tail节点,cas设置尾结点为当前节点,

tail节点设置成功后原来的tail节点的后继节点指向tail节点

如果tail节点为null的话,进入enq方法

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. if (t == null) { // Must initialize
  5. if (compareAndSetHead(new Node()))
  6. tail = head;
  7. } else {
  8. node.prev = t;
  9. if (compareAndSetTail(t, node)) {
  10. t.next = node;
  11. return t;
  12. }
  13. }
  14. }
  15. }

 整体逻辑还是获取tail节点成功将当前线程添加到AQS阻塞队列后返回

 enq为什么还要判断tail不为null的情况,这是因为addWaiter方法是并发的,多个线程同时往队列里面添加节点可能同时进入enq逻辑,其中有一个线程可能先执行完enq逻辑从而tail不为null

    现在假设有两个线程ThreadA ThreadB 两个线程没有获取到资源,在调用addWaiter方法后AQS中阻塞队列是怎样的呢

第一个线程添加到队列

第二个线程添加到队列


 添加进队列后接下来调用的是acquireQueued方法,这里尝试对入队里的线程再获取锁(如果是第一个等待的节点,其它节点将前驱节点标记为SIGNAL,然后调用LockSupport.park将节点对应的线程休眠,等待unparkSuccessor唤醒 

2释放锁

前面说到,未获取到锁的任务,会添加到队列里面阻塞,那什么时候可以执行呢,答案显然是获取到锁的线程执行完任务后,释放锁再唤醒队列里面阻塞的任务,这在AQS里面就是release方法

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }

释放锁的逻辑比较简单,成功释放资源后,调用unparkSuccessor方法,这个方法里面就实现了唤醒排在队列最前面的非取消状态节点

  1. private void unparkSuccessor(Node node) {
  2. /*
  3. * If status is negative (i.e., possibly needing signal) try
  4. * to clear in anticipation of signalling. It is OK if this
  5. * fails or if status is changed by waiting thread.
  6. */
  7. int ws = node.waitStatus;
  8. if (ws < 0)
  9. compareAndSetWaitStatus(node, ws, 0);
  10. /*
  11. * Thread to unpark is held in successor, which is normally
  12. * just the next node. But if cancelled or apparently null,
  13. * traverse backwards from tail to find the actual
  14. * non-cancelled successor.
  15. */
  16. Node s = node.next;
  17. if (s == null || s.waitStatus > 0) {
  18. s = null;
  19. for (Node t = tail; t != null && t != node; t = t.prev)//这里就从tail节点一直往前找
  20. if (t.waitStatus <= 0)
  21. s = t;
  22. }
  23. if (s != null)
  24. LockSupport.unpark(s.thread);
  25. }

如果头结点的next节点是null 或者是取消状态,则从tail节点一直往前找,找到离head节点最近的非取消状态的节点,调用unpark唤醒线程

三、共享锁的实现

      前面说的独占锁,在成功获取锁的线程完成业务释放锁后,只能是阻塞队列里面最前面的节点获取到锁,接着再释放锁,队列后面的节点才能获得锁

     共享锁有点不一样,在释放资源后,阻塞队列里面等待的线程都有机会获取资源

    整体上流程跟独占锁差不多,都是未成功获取资源添加到AQS阻塞队列中,等待唤醒继续尝试获取资源,占有资源成功的线程完成业务逻辑后释放资源唤醒锁

   共享锁获取,释放锁的逻辑分别是acquireShared releaseShared

  1 获取共享锁

  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0)
  3. doAcquireShared(arg);
  4. }

tryAcquireShared 尝试获取锁有子类实现,返回值小于0代表未获取到AQS中的资源,调用doAcquireShared方法

  1. private void doAcquireShared(int arg) {
  2. final Node node = addWaiter(Node.SHARED);
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false;
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head) {
  9. int r = tryAcquireShared(arg);
  10. if (r >= 0) {
  11. setHeadAndPropagate(node, r);
  12. p.next = null; // help GC
  13. if (interrupted)
  14. selfInterrupt();
  15. failed = false;
  16. return;
  17. }
  18. }
  19. if (shouldParkAfterFailedAcquire(p, node) &&
  20. parkAndCheckInterrupt())
  21. interrupted = true;
  22. }
  23. } finally {
  24. if (failed)
  25. cancelAcquire(node);
  26. }
  27. }

获取锁的流程,跟独占锁流程类似,同样是调用addWaiter将当前线程包装成节点添加到AQS阻塞队列,然后再次尝试获取锁,如果还是没获取成功则将前面节点状态标记成SIGNAL,这里看到在尝试获取锁成功后的处理逻辑跟独占锁的流程还是有不同的,这里我们后面来看下,我们先看下释放锁的流程

2 释放共享锁

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }

tryReleaseShared 尝试释放资源,这里由子类实现 ,doReleaseShared实现释放锁的功能

  1. private void doReleaseShared() {
  2. for (;;) {//这里for循环的原因是共享锁可以唤醒阻塞队列里面的多个线程
  3. Node h = head;//这里是头结点
  4. if (h != null && h != tail) {
  5. int ws = h.waitStatus;
  6. if (ws == Node.SIGNAL) {
  7. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  8. continue; // loop to recheck cases
  9. unparkSuccessor(h);//这里唤醒线程
  10. }
  11. else if (ws == 0 &&
  12. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  13. continue; // loop on failed CAS
  14. }
  15. if (h == head) // loop if head changed
  16. //头结点发生变化,有唤醒的线程重新获取到锁改变了头结点
  17. break;
  18. }
  19. }

这里面有一个for循环,跟释放独占锁的逻辑还是不一样的

这里明确两点共享锁释放后,阻塞队列里的等待线程都可以获取锁,所以这里有for循环,可以唤醒阻塞队列里面的多个线程,l另外要有个退出条件(不然不就一直循环下去了),这个条件就是head节点发生变化,也就是有线程重新获取了资源,可以执行业务逻辑,执行完成后又会重新释放锁,也就不用这个最开始获取锁的线程来执行唤醒任务了。

接下来我们再看看获取共享锁的逻辑

添加到阻塞队列后,如果发现自己是第一个等待的节点,会再尝试获取一下锁,获取锁的方法会返回一个整数,如果大于等于0代表获取成功

获取共享锁成功后调用setHeadAndPropagate方法

  1. private void setHeadAndPropagate(Node node, int propagate) {
  2. Node h = head; // Record old head for check below
  3. setHead(node);//头结点设置为活动到锁的当前节点
  4. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  5. (h = head) == null || h.waitStatus < 0) {
  6. Node s = node.next;
  7. if (s == null || s.isShared())
  8. doReleaseShared();
  9. }
  10. }

 这个方法除了重新设置head节点外,还有一段doReleaseShared的逻辑(释放锁,唤醒阻塞队列里面的线程)

这里propagate 是尝试获取共享锁返回的一个变量,如果大于0代表还有资源可以让阻塞的线程运行,就不等待当前线程释放锁再去通知同步队列中阻塞的线程了。

四、条件队列

AQS 内部类 ConditionObject 有firstWaiter nextWaiter两个Node节点,组成了一个单链表结构

条件队列的应用场景是这样,在任务执行所需的条件不够时,任务阻塞,当条件重新满足时解除阻塞重新运行。生产者、消费者问题就是这样一个场景当队列里面数据为空时不能消费数据,消费者线程阻塞,生产者生成数据后队列不为空唤醒消费者,这里的条件就是队列数据是否为空

AQS条件队列提供了这样的实现,await方法阻塞线程,signal唤醒线程,要特别注意一定在调用await signal方法前必须先调用AQS的获取独占锁方法

1 await方法

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. Node node = addConditionWaiter();
  5. int savedState = fullyRelease(node);
  6. int interruptMode = 0;
  7. while (!isOnSyncQueue(node)) {
  8. LockSupport.park(this);
  9. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  10. break;
  11. }
  12. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  13. interruptMode = REINTERRUPT;
  14. if (node.nextWaiter != null) // clean up if cancelled
  15. unlinkCancelledWaiters();
  16. if (interruptMode != 0)
  17. reportInterruptAfterWait(interruptMode);
  18. }

await方法的整体流程

我们在具体细看下每一步的逻辑

     1  添加到条件节点

       当前线程包装成节点添加到条件列表尾部,调用的是addCondition 

  1. private Node addConditionWaiter() {
  2. Node t = lastWaiter;
  3. // If lastWaiter is cancelled, clean out.
  4. if (t != null && t.waitStatus != Node.CONDITION) {
  5. unlinkCancelledWaiters();
  6. t = lastWaiter;
  7. }
  8. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  9. if (t == null)
  10. firstWaiter = node;
  11. else
  12. t.nextWaiter = node;
  13. lastWaiter = node;
  14. return node;
  15. }

2 释放锁

int savedState = fullyRelease(node);

这里释放锁,让其它线程有机会获得锁来执行await 或者signal方法

3 刚添加进条件队列里面的节点判断是否在同步队列里面

  1. while (!isOnSyncQueue(node)) {
  2. LockSupport.park(this);
  3. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  4. break;
  5. }
isOnSyncQueue 返回fasle则阻塞,这就达到了阻塞的目的
  1. final boolean isOnSyncQueue(Node node) {
  2. if (node.waitStatus == Node.CONDITION || node.prev == null)
  3. // 刚添加到条件队列里面肯定是这个状态,条件队列里面没对prev这个属性设置
  4. // 刚执行await方法添加到条件队列的节点一般都是满足这个条件的返回false
  5. return false;
  6. if (node.next != null) // If has successor, it must be on queue
  7. return true;
  8. return findNodeFromTail(node);
  9. }

刚添加到条件队列里面的节点一般是返回fase,也就是会阻塞,那什么时候会解除阻塞呢,也就是什么时候调用unpark方法,我们想想前面说到的独占锁,获取到锁的线程会唤醒阻塞队列里面的等待的线程。

这里再说明一下,在调用await signal方法前必须先调用AQS的获取独占锁方法,先获取独占锁可以保证在调用signal方法时,线程在阻塞队列中等待唤醒

 4 唤醒任务重新获取独占锁

acquireQueued(node, savedState) 

这就是前面的逻辑了

2 signal signalAll方法

 signal方法主要逻辑是将条件队列中的节点转移到AQS同步队列中,然后释放锁,同步队列中等待的线程拿到锁重新运行,发现原来在条件队列的节点转移到同步队列中了,就退出await逻辑里面的while循环,重新尝试获取锁执行业务逻辑

signalAll是唤醒所有等待任务 具体实现是  doSignal  doSignalAll

  1. private void doSignal(Node first) {
  2. do {
  3. if ( (firstWaiter = first.nextWaiter) == null)
  4. lastWaiter = null;
  5. first.nextWaiter = null;
  6. } while (!transferForSignal(first) &&
  7. (first = firstWaiter) != null);
  8. }

  1. private void doSignalAll(Node first) {
  2. lastWaiter = firstWaiter = null;
  3. do {
  4. Node next = first.nextWaiter;
  5. first.nextWaiter = null;
  6. transferForSignal(first);
  7. first = next;
  8. } while (first != null);
  9. }

都调用到了transferForSignal方法

  1. final boolean transferForSignal(Node node) {
  2. /*
  3. * If cannot change waitStatus, the node has been cancelled.
  4. */
  5. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  6. return false;
  7. Node p = enq(node);
  8. int ws = p.waitStatus;
  9. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  10. LockSupport.unpark(node.thread);
  11. return true;
  12. }

通过enq方法将节点添加到同步队列中

signal只唤醒一个是因为调用transferForSignal将节点从条件队列转移到同步队列中后返回true,while循环条件为false,所以只执行一次

signalAll能唤醒所有线程是因为不断将条件队列里面的节点转移到同步队列中,直到条件队列里面没有数据

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/149195
推荐阅读
相关标签
  

闽ICP备14008679号