当前位置:   article > 正文

Java 多线程 —— AQS 详解_多线程aqs

多线程aqs

引言

AQS 是AbstractQuenedSynchronizer 的缩写,抽象的队列式同步器,它是除了java自带的synchronized关键字之外的锁机制。是 JUC 下的重要组件。

相关产物有:ReentrantLock、CountDownLatch、Semaphore、ReadWriteLock等。

一、AQS的设计思想

AbstractQuenedSynchronizer 维护了一个 volatile int state 变量,代表共享资源。

若state 是0,代表资源空闲,当前线程将 0 改为 1,表示上锁,当前线程置为工作线程;

若state不为0,代表资源占用,当前线程依然会 acquire() 一个资源,如果恰好是当前的工作线程,那么state 累加,以此描述“重入性”;如果当前线程并不是工作线程,则会被安置在一个由AQS维护的资源等待队列。

AQS队列会让第一个线程Node自旋获取资源,而后面的线程,则通过 LockSupport.park(this) 方法将线程置为 WAITING 状态等待被唤醒。

如果第一个线程获取到了资源,那么就将它设置为队列的 head 节点,原 head 就会被移出队列。

AQS的设计中用到了模板方法模式,不同的资源共享机制如互斥或共享可以由子类自定义实现:

Exclusive:如ReentrantLock、
Share:如信号量、闭锁、读写锁等。

AQS 的另一个特点是自旋+CAS。

在请求资源和入列等操作中,经常会看到 for(;;) 、compareAndSetState、compareAndSetTail等操作,这与synchronized的实现有很大区别。

通过比较并设置的方式,可以有效提高资源获取的效率,但同时也会消耗额外的CPU资源。

二、两种资源访问策略的代表

在AQS中维护了一个 Node 节点,它有两种等待模式,同时也表示资源的两种不同的访问策略:

  1. /** Marker to indicate a node is waiting in shared mode */
  2. static final Node SHARED = new Node();
  3. /** Marker to indicate a node is waiting in exclusive mode */
  4. static final Node EXCLUSIVE = null;

由此,衍生出两类不同的子类实现,第一类是以ReentrantLock为代表的互斥锁,它在语义上与synchronized实现了相同的互斥性和可重入性;另一类是以闭锁CountDownLatch 为代表的线程同步工具。

2.1 ReentrantLock

首先 AQS 中的 state 为 0.

A 线程在执行 lock() 方法后,以独占方式 CAS state 为 1。AQS 会记录 A 线程为当前的独占线程,其他线程如果再尝试获取资源,就会进入等待队列,直到 A 线程调用 unlock() 方法,释放了资源,即 state 回归 0 状态。

在“重入性”方面,如果A线程第二次尝试取锁,state 会累加。也就是说,上锁的次数一定等于释放锁的次数。

2.2 CountDownLatch

CountDownLatch 翻译为 “闭锁”或“门闩”,这是一种非常好用的同步工具,可以延迟线程的进度直到终止状态。

与 ReentrantLock 不同的是,在构造 CountDownLatch 对象的时候,会先设定一个 state 大小:

CountDownLatch latch = new CountDownLatch(3);

这个 3 就是 state 变量的初始值,然后线程使用 countDown() 方法递减这个计数,直到 state = 0,放行所有 waiting 中的线程。

CountDownLatch 维护的 state 表示的是事件数量,当指定数量的事件执行完毕后,就会 unpark() 主调线程,继续后续动作。

在使用CountDownLatch时有一个误区是,state 的值就代表了线程的数量,认为我 state = 3 ,就需要 3 个线程去执行任务,其实,state = 10 也依然可以使用 一个线程去执行,关键要区分事件与并行任务的概念。

三、ReentrantLock 源码

作为补充 synchronized 的锁机制,ReentrantLock 显示锁的功能非常强大,但这里不打算全面分析ReentrantLock的奇技淫巧,而是从 lock() 方法出发,分析一下 AQS 是如何实现资源的锁定和等待队列的维护的。

3.1 acquire

acquire 是 AQS 的顶层入口,他表示获取锁资源。

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

Doug Lea 的代码非常简洁,根本没有一句废话,就连代码结构也非常精简,如果是分析源码的话,我们可以尝试去改写一下这个方法,让其可读性更强一些:

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg)) {
  3. Node newWaiter = addWaiter(Node.EXCLUSIVE);
  4. boolean needInterrupt = acquireQueued(newWaiter, arg);
  5. if (needInterrupt) {
  6. selfInterrupt();
  7. }
  8. }
  9. }

从方法中的一系列方法名和判断逻辑来看。

尝试获取资源,如果成功,则直接返回。如果不成功,addWaiter 添加一个独占模式的等待者,acquireQueued 以排队的方式去获取资源。

3.2 tryAcquire

tryAcquire 在 ReentrantLock 中有两种实现,分别是:

FairSync 中的公平锁实现;

NonfairSync 中的非公平锁实现

当然,公平与非公平并不是重点,就以非公平的实现来看一下,

  1. final boolean nonfairTryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. if (compareAndSetState(0, acquires)) {
  6. setExclusiveOwnerThread(current);
  7. return true;
  8. }
  9. }
  10. else if (current == getExclusiveOwnerThread()) {
  11. int nextc = c + acquires;
  12. if (nextc < 0) // overflow
  13. throw new Error("Maximum lock count exceeded");
  14. setState(nextc);
  15. return true;
  16. }
  17. return false;
  18. }

当前线程会 CAS state 0->1,或累加重入,成功返回true,失败返回false。

3.3 addWaiter

在acquire上锁操作失败后,会执行这个方法:

  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode);
  3. // Try the fast path of enq; backup to full enq on failure
  4. Node pred = tail;
  5. if (pred != null) {
  6. node.prev = pred;
  7. if (compareAndSetTail(pred, node)) {
  8. pred.next = node;
  9. return node;
  10. }
  11. }
  12. enq(node);
  13. return node;
  14. }

addWaiter ,添加一个等待者,它只完成一项工作,就是向等待队列中添加一个 Node:

1、将当前线程封装为一个队列 Node;

2、取得队列的尾节点 tail,并CAS 新的节点设置为新的 tail

3、设置新 tail 成功,直接返回

4、若设置新 tail 不成功,或者干脆,原tail 就不存在,执行 enq 方法,自旋操作以上步骤,直到成功。

enq方法是 enqueue 的缩写,意思是“使队列化”,它就是一个 while-true ,如果队列不存在,就创建一个队列,如果队列已经存在,就把 node 放到最后一个:

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. if (t == null) { // Must initialize
  5. if (compareAndSetHead(new Node()))
  6. tail = head;
  7. } else {
  8. node.prev = t;
  9. if (compareAndSetTail(t, node)) {
  10. t.next = node;
  11. return t;
  12. }
  13. }
  14. }
  15. }

这里就用到了自旋操作,每次自旋都会获取当前的 tail 节点,避免在设置的过程中间被其他线程加塞,却又不知道。

刚进入方法的时候,肯定需要走初始化的逻辑,这会创建一个 空的 Node 节点作为 head,所以由此我们也知道,AQS 队列中的头结点实际上就是一个没有实际意义的功能型节点,里边是没有线程的,真正封装了线程的节点是从第二个节点开始。

总体来看,addWaiter 完全就是一个 do-while 循环,先执行一次 CASTail,失败后循环执行CASTail,直到成功后返回该 node,同时也是新的 tail 节点。

3.4 acquireQueued

在 addWaiter 添加了新的 tail 后,需要做哪些事情呢?acquireQueued!

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)) {
  8. setHead(node);
  9. p.next = null; // help GC
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&
  14. parkAndCheckInterrupt())
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }

这里需要说明一下,该方法的逻辑兼顾了中断的操作,如果对中断机制不太了解,可以暂时不去理会。

该方法同样是一个 while-true 循环,当且仅当,当前节点是队列中第二个节点(addWaiter中已经很明确,AQS队列中的head 节点就是一个空的 Node),并且 tryAcquire 成功,才会返回。在返回之前,仅做了一些队列的维护工作:设置新的head 节点。

如果没有“当且仅当”,那么执行 park:

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this);
  3. return Thread.interrupted();
  4. }

也就是说,除了第二个节点以后的节点,都要进行 park,即线程切换为 WAITING 状态。

四、AQS acquire 流程

经过了上一节的源码分析,我们已经大概清楚了 lock() 方法调用之后发生的事情,接下来就需要总结一下 acquire 流程步骤,提炼一下 AQS 队列的工作原理:

总结

AQS 使用了大量的 CAS 操作,避免上锁,你在ReentrantLock中看不到一句 synchronized 。

通过CAS 和自旋的配合可以一定程度上提高同步代码的性能。

state 以 volatile 类型修饰,可以在多线程之间提供可见性。

ReentrantLock 和 CountDownLatch 对 state 的访问方式分为独占和共享两种,本文虽然没有解析 CountDownLatch 的源码,但通过上面源码的分析,可以想到其大致实现流程。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/149116
推荐阅读
相关标签
  

闽ICP备14008679号