当前位置:   article > 正文

java并发编程笔记03_AQS详解_aqs java

aqs java

Java并发编程笔记03_AQS详解

了解Java并发编程的人应该都知道AQS的重要性,它可以说是整个JUC的基石吧,当然了,还有一个与其并驾齐驱的CAS,本篇文章是在拥有了CAS的基础上进行学习的AQS,CAS后续整理完毕再说吧…

首先,开篇感谢周阳老师的大厂面试题第三季视频,学习完毕之后颇有所得,在此附上链接。有兴趣的同学可以观看一波,本篇博客是在此视频的基础上自己又加了一下公平锁与相应中断的逻辑,如果写的不好,还望大佬们多多指教。本篇文章是以ReentrantLock来进行切入AQS的,并且是独占模式。好,废话不多说,让我们一起来了解AQS。

AQS(AbstractQueuedSynchronizer):抽象的队列同步器,它相当于一个框架,ReentrantLock就是基于AQS来实现的,还有其他的一些辅助类如:CountDownLatch、CyclicBarrier、Semaphore等都是基于AQS来实现的,由此可见AQS的重要性,那么我们先来看看AQS中都有什么

static final class Node { //1
    static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
    static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;
    
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    volatile int waitStatus;
    volatile AbstractQueuedSynchronizer.Node prev;
    volatile AbstractQueuedSynchronizer.Node next;
    volatile Thread thread;
    AbstractQueuedSynchronizer.Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    
    final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {
        AbstractQueuedSynchronizer.Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, AbstractQueuedSynchronizer.Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}


private transient volatile Node head;//2
private transient volatile Node tail;//3
private volatile int state;//4

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

private static final boolean compareAndSetWaitStatus(Node node,
                                                     int expect,
                                                     int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                    expect, update);
}

private static final boolean compareAndSetNext(Node node,
                                               Node expect,
                                               Node update) {
    return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

以上是AQS中的大致的一些属性以及方法,当然一些重要的方法,我们会在随后提到。我们先来解释上面的一些属性及方法。

首先,一上来就是一个很明显的静态内部类Node

PS:AQS里面维护了一个双向链表,当然这句话如果要想成立的,前提是必须有线程来对共享资源进行争抢,并且有线程争抢资源失败的情况下,如果没有发生线程争抢并且失败的话,那么它永远不会有这个双向链表的存在。

这个双向链表是一个先进先出的(First In First Output),应该是对CLH来进行了一些改变。

由这个内部类Node来形成一个双向链表。如果线程来进行争抢资源并且争抢失败的情况下,我们争抢资源失败的线程就会被封装成一个一个的Node(就是Node里面有一个Thread thread 这么个属性,没有多复杂),之后越来越多的Node就形成了一个双向的链表。

Node里面的属性我们介绍完AQS之后再回来说明。

第二:private transient volatile Node head;

这是双向链表的头结点。

第三:private transient volatile Node tail;

这是双向链表的尾节点。

第四:private volatile int state;

这是线程来发生争抢,争抢的就是这个共享变量,这个变量就是一个int类型的,并且是被volatile修饰的。

PS:volatile的特点大家如果要是不明确的话,可以自己找找视频,或者帖子来研究一下在这里我简单说说它的特点吧,防止的接下来讲解大家的思路总是停留在这里

1、保证内存的可见性

2、不保证原子性

3、禁止指令重排

可见性我们可以简单理解,算了我还是画个图吧。

简单说明:就是线程1将state=0修改为1之后,state要是再想对state进行操作(比如+1)那么线程2就会拿到最新的state值(也就是线程1对state操作完成之后的结果state = 1)。

第五:compareAndSetState(int expect, int update)方法

该方法里面就是调用了unsafe.compareAndSwapInt,将期望值与希望修改的值传递进去,该操作是原子性的。目的就是改变state的值

第六:tryAcquire(int arg):尝试争抢资源

当调用ReentRantLock的lock方法后会走该方法,但是我们会发现AQS中该方法直接跑出了一个异常:

throw new UnsupportedOperationException();

也就是说子类必须要自己实现这个尝试获取锁的方法,如果你不实现该方法,那么父类就会直接给你抛出了一个异常

所以说AQS是一个框架,在这里不可能什么都给你写好了,如果要是这个方法写好了,那么可能也就没有上面那些辅助类什么事儿了~~

第七:tryRelease(int arg):尝试释放资源

当调用ReentRantLock的unLock方法后会走该方法,剩下介绍与上面一样,同第六条,也是直接抛异常。

AQS中的内容我们大致介绍到这里,下面介绍它的内部类Node

第一:
static final Node SHARED = new Node();

static final Node EXCLUSIVE = null;
  • 1
  • 2
  • 3

我们可以将上面这两个当做一个枚举来看待。当为SHARED的时候,当前模式为共享模式,当为EXCLUSIVE时,当前模式为独占模式(也有人叫排他模式)

第二:表示当前Node的等待状态
volatile int waitStatus;
  • 1

具体它的等待状态有几种,请看下面

第三:waitStatus的取值
static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
  • 1
  • 2
  • 3
  • 4

当前waitStatus的几种状态

​ 当状态值为1时,说明当前Node的状态为取消排队的状态,后续Node要进行出队的操作。

​ 当状态值为 -1时,表示线程已经准备好了,就等资源释放了,如果刚才那段话不能使你明白的话,你可以这样想:当前的双向队列中有很多的Node,当有一个Node的waitStatus为-1时,表示当前这个节点需要唤醒这个节点的后继节点,其实表示的就是当前节点的后继节点的状态,只有当前节点的状态为-1 时他才可以唤醒排在他后面的Node。如果还不明白,那么请看下面的图:

​ 当Node节点的waitStatus状态为-1时,例如节点1的waitStatus状态,那么说明节点2要想被唤醒,节点1的waitStatus状态要为-1。

​ 当状态值为-2时,表示节点在等待队列中,节点线程等待唤醒

​ 当状态值为-3时,当前线程处在SHARED情况下,该字段才会使用

当然,说完以上几种状态,大家就可以知道当前waitStatus的状态,默认Node的状态值就是0。表示初始化状态值。

第四:表示当前节点的前一个节点
volatile Node prev;
  • 1

从上面的图中我们就可以看出来

第五:表示当前节点的后继节点
volatile Node next;
  • 1

从上面的图中我们也可以看出来

第六:表示当前节点中的线程
volatile Thread thread;
  • 1

线程是被封装到Node中进行排队与唤醒的

以上就是Node内部的大致属性梳理,其中waitStatus在我们演示过程中的状态为0,-1,1,其他两种状态我们没有演示到

了解完以上的属性后,我们开始介绍AQS,其中我们按照一条线来介绍:

第一个线程抢资源,一次就抢到,第二个线程再来抢占资源,抢占未成功,进入等待队列,当第一个线程调用了unPark()之后,第二个正在等待的线程被唤醒。

我们还是按照周阳老师(上面提到)的情景来介绍(模拟一个银行办理业务的场景)

package cn.cmysz.aqs;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockDemo {

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Thread threadA = new Thread(() -> {try {lock.lock(); System.out.println("我是一行字符串....");
                Thread.sleep(200000000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });
        threadA.start();

        Thread threadB = new Thread(() -> {
            try { lock.lock(); System.out.println("我是B线程的一行字符串....");}
            finally {lock.unlock();}
        });
        threadB.start();

        Thread threadC = new Thread(() -> {
            try { lock.lock(); System.out.println("我是C线程的一行字符串....");}
            finally {lock.unlock();}
        });
        threadC.start();


    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

对上图进行说明:

​ state为上面介绍的AQS的内部属性,也就是线程所要争抢的资源,默认初始值为0,线程A、B、C都想要将state的值变为 1

​ 初始的时候,银行办理业务的窗口没有人(Thread = null ),等待区(进入等待队列的Node,Node里面维护了线程,这之前我们提过)也没有人。

接下来我们正式进入该场景,银行开始办理业务(main方法开始执行),此时A线程办理业务(假设办理业务的时间为20分钟)

在A线程中我们可以看到调用了lock.lock()方法。现在我们点进去。

public void lock() {
    sync.lock();
}
  • 1
  • 2
  • 3

发现在里面没有任何逻辑,除了调用一个sync的lock方法,那么这个sync又是谁?

我们可以看到Sync是ReentrantLock的一个静态内部类,里面其实是调用了Sync的lock()方法,但是我们看到它里面的lock()方法是一个抽象的,未实现的,这是为什么?不知道大家还记不记得,ReentrantLock在我们创建对象的时候,在构造器里面可以传递true和false两个参数,ReentrantLock可以是公平锁也可以是非公平锁。

源码如下:

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

当我们创建对象,不向构造器传递参数时,默认就是非公平的,当我们传递参数的时候,它会进行判断,使用的是三元运算符(什么是三元运算符就不介绍了…如果不懂什么是三元运算符…),本篇文章以非公平锁进行讲解

最终会调用非公平锁中的lock()方法,当A线程进入if判断中,首先就是使用的CAS来进行设置state的值。那么我们这时就会想这样一个问题:为什么使用的是CAS来进行设置的state状态,为什么不直接使用setState()方法?想象一下,在此处的if判断中有没有并发?相信你已经有答案了。所以我们要保证只有一个线程可以修改这个共享变量state的值,将 0 设置为 1。

假设我们当前要办理业务的A线程现在坐到了办理业务的窗口的那把椅子。那么就会进入到 if 的里面,将线程设置为当前线程(也就是A线程)那么这个方法是哪里的呢?

好吧,那我们再来看一下他们的继承关系吧!

abstract static class Sync extends AbstractQueuedSynchronizer 
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
  • 1
  • 2

这回应该知道了吧!

所以,现在就结束了~~,线程A已经坐在了办理业务的窗口啦!那么现在的状态就应该是这样!

所以就这样结束了吗?当然不是!还有线程B和线程C在虎视眈眈的盯着A呢!A的屁股如果要从椅子上离开,他们就会立即去抢的!**(非公平锁!知识点啊各位!)**他们从哪个地方开始去抢呢?肯定是等待区中(队列)出发喽~~(相信各位都去过银行办理业务 ~~)

现在线程B来了,线程B也调用了lock()方法,也会走到非公平锁的lock()方法,当然也会走到 if 判断中,但是它真的可以CAS设置state成功吗?答案是肯定的,肯定不可能设置成功的。如果忘记了为什么,那么我们再看一遍源码。

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

期望值是 0 要修改为 1 但是现在期望值还是0吗?显然不是,那么他就会走到else中调用acquire(1);

PS:此时都是在ReentrantLock这个类中进行的操作哦~~~

那么我们接下来看acquire(1)方法。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • 1
  • 2
  • 3
  • 4
  • 5

此时就进入了AQS中,以下三个方法都是AQS中的方法,我们可以看到这里面的 if 判断就调用了三个方法,那么这三个方法分别都是什么意思?

1、tryAcquire(arg):尝试争抢资源,如果争抢到了资源,那么就会返回true

2、addWaiter(Node.EXCLUSIVE):将当前线程添加到等待队列中,返回的就是一个Node(那么我们就可以凭借自己的想象,我们进去一个线程,给我们返回了一个Node,肯定是这里面将我们的线程进行了封装)

3、acquireQueued(final Node node, int arg):挂起当前线程,唤醒之后的相关逻辑。返回true表示挂起过程中线程被中断唤醒过,false表示未被中断过

当if判断中的逻辑都为true时,这时候就会调用 selfInterrupt(); 方法:线程争抢失败,并且挂起过程中线程被中断过。那么我们此时线程还有没有被中断?答:不知道,因为上面三个方法还没跟进去看。(但是此时我们假设没有被中断,因为如果加上响应中断逻辑就会稍微复杂一些,所以我们先一切从简开始)

首先看 第一个方法:tryAcquire()
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
  • 1
  • 2
  • 3

我们点击的时候发现直接抛出了一个异常,至于为什么抛出了一个异常?上面我们提到过~不知道各位还记不记得。因为这个方法是要求子类必须实现的。所以我们看非公平锁(NonfairSync中的 tryAcquire(int arg) )

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
  • 1
  • 2
  • 3

在方法中我们发现什么逻辑都没有,只是调用了 nonfairTryAcquire(acquires) 方法,那么我们接着点进去这个方法看。

注意:一定要看好这个方法是哪个类中的哪个方法,不要跟丢。

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

在这个方法中尝试竞争获取锁

首先获取当前线程(当前线程是B线程哦~),获取statue的值,进入到 if 判断,那么它的判断会成立吗?当然不会,因为此时statue的值已经被线程A设置成了 1 ,那么我们走到else if 再次进行判断,当前线程是否等于 getExclusiveOwnerThread() 返回的线程?

显然也不等于(至于为什么不等于,因为线程A已经将它设置成了自己,在哪设置的?? 还记得 AbstractOwnableSynchronizer这个类吗?)。

所以当前方法会返回false

靠!什么都不做!就是返回了一个false,别急,一会我们还会回到这里的~~

我们先来解释一下吧为什么会有这两个判断:

if (c == 0) { //极端运气好的情况,B刚刚要进去,A刚刚要走了,status这个值就从1变成了0

else if (current == getExclusiveOwnerThread()) { //如果A刚抬起屁股要走,B刚要坐下,A说我还要办业务(重入锁)
  • 1
  • 2
  • 3

那么现在我们又有疑问了:为什么在if(c == 0)里面设置state值时使用的CAS来进行的设置,但是在else If 中却不是使用的CAS,而是直接使用的setStatue()?

答案是:在if(c == 0)中有并发!在else if 中没有并发!那么为什么 在else If中不存在并发呢?如果你要是这么问的话,那说明你没有好好看我上面的代码!因为他进行了判断!如果当前线程不为getExclusiveOwnerThread()返回的线程那么它就不会进入这个else If的内部~~,所以你品~这里面有没有并发。

接下来看第二个方法:addWaiter(Node.EXCLUSIVE)

再次提醒:这三个方法都是AQS中的方法哦!

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

首先我们再说一遍这个方法是干什么的吧(主要就是怕你们忘我是不会忘的(狗头保命)):

将当前线程添加到等待队列中,返回的就是一个Node(那么我们就可以凭借自己的想象,我们进去一个线程,给我们返回了一个Node,肯定是这里面将我们的线程进行了封装)

正式进入该方法:

为我们创建了一个Node对象,带了两个参数,一个是当前线程(当前线层是B线程哦)把当前线程封装进了Node,另一个是独占模式的标记(相当于枚举,之前说过的哦):

Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
}
  • 1
  • 2
  • 3
  • 4

将tail赋值给了pred变量,并且在 if 中判断pred是否为空(那么当前tail是否为空?答案是肯定的,为空),那么为什么有这个判断?因为有很多线程来进行争抢,第一次线程B进来的时候为空,但是线程C进来的时候就不为空了。

现在我们已经知道 if 判断返回的是false了,那么就会走到下面的enq()方法,但是!,想象一下,只有这一种情况才会走到下面的enq方法吗?答案是:不是只有这一种情况。

首先我们将眼界放开一点,因为我们现在的情况是:线程B就是第一个进来的,那么我们如果是第10个进来这个addWaiter方法呢?上面的 pred != null 是不是就会返回true,之后开始CAS设置tail,如果要是设置失败呢?他们改何去何从?所以走到enq方法就有了两种情况:

1、当前队列是空队列 tail == null

2、CAS竞争设置tail失败也会来到这里

接下来我们进入enq方法中

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

在这个方法中会保证传递进来的Node一定会入队成功~(传递进来的node就是刚刚我们创建的,此时ThreadB就在该node中)

我们看到的是一个死循环

tail赋值给 t 进入 if中,判断 t == null ,此时t为空,在下面的代码中使用CAS来设置一个队列的头结点,此时我们可以看到new 的Node对象就是一个空对象(Node中的Thread == null,waitStatus == 0),并且将head 赋值给了tail,这时 head 和 tail 就都已经初始化好了。

现在的情况就是:

此时循环已经结束,进入第二次循环。再次走到if判断,但是此时 在if判断中 tail 已经不为空,所以走到else判断

node.prev = t;
  • 1

将t赋值给了node.prev,此时就是将线程B的Node的prev指向 tail,也指向head,之后使用CAS将tail设置为node,如果设置成功,则将head的next指向node,并且将node返回

如果不明白怎么回事的请看下图:

这回应该明白了吧!

此时enq方法执行完毕,线程B已经成功入队。

总结:

​ 1、队列的初始化是在第一个获取锁失败的线程入队时进行初始化的。

​ 2、当前持锁的线程,它获取锁时,直接tryAcquire成功了,没有向 阻塞队列 中添加任何node,所以作为后继需要为它创建head节点,并且将head的next指向他自己。

​ 3、当线程B入队成功之后,自旋结束。返回头结点

​ 4、双向链表中,第一个节点为虚节点(也叫哨兵节点),其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是从第二个节点开始的。

此时addWaiter执行结束,但是此时线程B还没有被挂起(也就是说,B线程现在已经进入到了银行的等待区,但是他还是一直盯着A再看没有休息。

接下来看第三个方法:acquireQueued(final Node node, int arg)
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

再解释一遍该方法的作用:

1、挂起当前线程

2、唤醒之后的相关逻辑。返回true表示挂起过程中线程被中断唤醒过,false表示未被中断过

参数:

​ 1、node:就是addWaiter()方法返回来的,刚刚入队成功的Node(在这里是我们刚入队成功的B线程被封装的Node节点)

​ 2、当前线程抢占资源成功后,设置state值。

进入该方法时,我们看到两个标记:

​ 1、boolean failed = true; 当为true时,表示需要执行出队的逻辑,响应中断的逻辑,过后我们在讲解

​ 2、boolean interrupted = false; 表示线程是否被中断true为是,false…相反呗

此时又是自旋操作(就是死循环),获取当前Node的前一个节点,并且赋值给 p

final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

该方法是AQS中Node类的方法

获取完当前node的前一个节点之后,进入if判断

if (p == head && tryAcquire(arg))
    条件一: p == head 当前head为node.prev节点,所以返回true
    条件二: tryAcquire(arg): 此时又进入tryAcquire(arg)方法
  • 1
  • 2
  • 3

让我们再来回顾一下这个方法:

此时返回false,再一次获取失败,所以走下一个if判断

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
  • 1

此时调用了两个方法:

1、shouldParkAfterFailedAcquire()

2、parkAndCheckInterrupt()

**先说第一个方法:**shouldParkAfterFailedAcquire(Node pred, Node node)

此时传递进来的第一个参数:head节点,第二个参数:线程B被封装的Node节点

我们看到首先获取了head节点的waitStatus,此时我们是第一次进入这个方法,当前的waitStatus为0,下面进入判断

if (ws == Node.SIGNAL) 此时ws为0不等于 -1 ,继续向下执行

if (ws > 0) 此时ws为0,不大于0,继续向下执行

compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 使用CAS来将ws的状态值由0设置为-1返回,当前方法返回false。

此时在acquireQueued方法中进行第二次循环,这时还会尝试竞争,我们假设竞争还失败(线程A还在办理业务),当然如果竞争成功了,那么下面就 不走了,那我们就没法往下说了!

这时还会走到shouldParkAfterFailedAcquire方法中这时head的waitStatus为-1,走到第一个if判断

if (ws == Node.SIGNAL) 为true,直接返回true,不会向下执行。

接着看第二个方法parkAndCheckInterrupt(),当然此时第一个方法不够明细,我们先看完第二个方法之后,再回来补充第一个方法
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
  • 1
  • 2
  • 3
  • 4

此时我们看到直接将ThreadB线程给park掉了,线程B就直接在这睡了!!!

lock方法讲述完毕。线程直接在这不动。有些小伙伴会说return Thread.interrupted();不走吗?是的,不会走了,线程就卡在这了,之前我们说addWaiter方法返回的时候,那时线程B还在一直盯着线层A,现在他彻底睡着了。

ok!那我们接着补充一下方法一:

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

我们看到有两个if和一个else,第一个if和else没啥好说的,我们说第二个if

想象一下这种情况:

方法:shouldParkAfterFailedAcquire(Node pred, Node node),此时pred为ThreadD,node为ThreadE,那么第二个if就会为true,即:ws > 0

if (ws > 0) {
    do {
        node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这个if里面看着很复杂,其实总结起来就是一句话:当前线程封装的Node节点(也就是第二个参数)他的前一个节点的waitStatus小于等于0的时候这时当前线程封装的Node节点的prev就指向了那个waitStatus小于等于0的那个节点,并且将waitStatus小于等于0的节点的next指向当前线程封装的Node节点!,如果不是小于等于0,那么就会继续while循环,向前寻找。

以上就是lock.lock()方法,接下来我们看lock.unLock()。

public void unlock() {
    sync.release(1);
}
  • 1
  • 2
  • 3

我们可以看到,当我们在写代码调用unLock的时候,走的也是内部类Sync的方法release(1);,接下来我们追进这个方法。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

我们可以看到调用的是AQS的release(int arg)方法,在if判断中调用了tryRelease(arg)方法,我们先看这个tryRelease(arg)方法。点进之后发现也是直接抛出异常,至于为什么抛出异常,同tryAcqiure方法解释。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

我们先来说明tryRelease方法,之后返回去说明release方法

在tryRelease方法中:

获取AQS中的state值,并且减去1(因为传递过来的就是1),接下来判断当前线程是否拥有锁的线程,如果不是拥有锁的线程那么毫不留情,直接抛异常,这就相当于路人甲上的锁凭什么路人乙可以打开?钥匙就一个而且在路人甲身上

切记:如果为true的话,那么下面的代码也是没有并发的

接着出现一个完全释放的标记,默认为false,如果c == 0 那么此时该锁已经完全释放(想象一下重入锁),将当前拥有锁的线程设置为null ,返回true。如果没有完全释放,那么将结果设置进statue,并且返回false

此时我们假设释放完全,因为释放不完全还会来这个方法。。所以没必要在解释一遍~~
此时我们现在看release方法

再贴一遍代码

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

思考:这时我们的tryRelease(arg)已经返回true,那么想一下我们下一步要做什么?

当if代码块中为true时,将head赋值给 h 进入if块判断 h 不为空(我们现在的head当然不为空)并且h.waitStatus != 0(我们的h.waitStatus != 0现在是 -1 )

接下来调用unparkSuccessor(h);

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

我们可以看见unparkSuccessor(Node node)这个方法传递进来一个参数,这个参数node就是head节点

在方法里面首先拿到head的waitStatus并且赋值给ws变量,当前ws为-1,走到第一个if块中并且为true,使用CAS将head的waitStatus设置为0(在这里思考为什么要设置为0,因为当前节点head已经完成唤醒他后继节点head.next的任务了),接着向下获取head的next节点(也就是在我们这里线程B所被封装的Node),这时走到第二个if块中里面的两个条件s == null || s.waitStatus > 0都为false(第一个条件不用说肯定不为空,第二个条件:线程B的waitStatus现在为0所以也为false)所以接着又走到第三个if块中(第二个if块中的逻辑我们走完这个释放锁的流程之后回来再补充)在里面对线程B的Node进行判断空,此时显然不为空,调用的就是unPark方法,将当前线程B封装的Node的thread传递进去,此时线程B就被唤醒,线程B继续向下执行任务(思考:线程B被唤醒之后,从哪里开始执行?

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
  • 1
  • 2
  • 3
  • 4

线程B从此处被唤醒,返回一个中断标记(当线程被中断唤醒后Thread.interrupted();就会返回true,那么显然我们本次线程B被唤醒不是由中断唤醒的,所以此处返回false)。

这个parkAndCheckInterrupt()是在哪里被调用的不知道大家是否还记得?

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

就是acquireQueued(final Node node, int arg)这个方法里面被调用的,而且不要忘记,这是一个自旋操作~,现在,线程从第二个if块中被unPark唤醒,那么肯定会接着向下执行。在第二个if块中我们就可以清晰的知道条件不成立,那么就是下一次的自旋操作。

拿到线程B对应Node的前一个节点(前一个节点就是head节点),接着走到第一个if块中(p == head && tryAcquire(arg)),第一个判断不用说了,肯定是true,那么就又会走到tryAcquire方法中,让我们再来走一遍tryAcquire方法吧~

在走tryAcquire方法之前我们要先知道现在银行里面是什么情况

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

在该方法中首先获取当前线程,当前线程为ThreadB,接着获取getState()的值为0,走到第一个if块中,在该if块中判断当前state是否为0,很明显我们当前state确实为0,然后走进里面的if块使用CAS来设置state的值为参数传递的acquires的值(这个值为1),因为我们当前state的值为0,所以线程B使用CAS将state的值设置为了1(这时线程B抢占到了锁),并且将ExclusiveOwnerThread设置成为了ThreadB,将结果true返回。

这时我们的tryAcquire方法返回为true,那么接着向下执行,走到acquireQueued方法的setHead(node);方法里面(至于为什么走到这个方法,请动动手向上找一下吧,不再贴acquireQueued的源码了,上面有的),并且将线程B对应的Node传递进去,我们接下来接着看这个setHead()方法(思考:当前我们要做的事情有哪些?

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
  • 1
  • 2
  • 3
  • 4
  • 5

在该方法中,将node设置为当前线程的head,并且将当前node的Thread设置为null,将node的prev节点设置为null

如果不懂的话,让我们用张图来对比一下吧:

这是原来的:

这是执行完setHead方法之后的:

看到这之后相信有的小伙伴就在想,我们是不是应该把next的这个指向给去掉?我们接着向下执行

这时我们已经执行完setHead()方法

if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

下面就是p.next=null也就是将原来老的head的next指针指向null(源码中的注释也有写:help GC)

将failed标记设置为false返回线程的中断标记,此时肯定是返回false的,之后方法就会返回,还记得该方法执行完毕会返回到那里吗?

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • 1
  • 2
  • 3
  • 4
  • 5

acquireQueued返回的中断标记为false,那么什么事也不干,我们的释放锁操作就结束了,并且下一个线程也已经获取到了锁。整个逻辑也就结束了。

接下来该说我们在前面留下来的一个疑问:

在unparkSuccessor(Node node)这个方法的第二个if块中的逻辑

if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0)
            s = t;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

那么这里是干嘛的呢?

上面的循环中,会找到一个离当前线程最近的需要被唤醒的node节点,node节点有可能找不到,node节点也有可能是null。

但是要注意:当 s==null 为false的时候才会进行第二步的判断,当s.waitStatus > 0成立的时候,说明当前节点的状态为取消排队的状态,那么我们就需要一个合适的被唤醒的节点

if (s != null) LockSupport.unpark(s.thread);
如果找到合适的可以被唤醒的node,那么就唤醒,如果找不到就啥也不做。

以上全部就是加锁和释放锁的过程
接下来我们看公平锁与非公平锁他们两者之间的区别:

首先怎样构造一个公平锁与非公平锁其实非常简单:

ReentrantLock lock = new ReentrantLock(true);//这就是构造了一个公平锁

ReentrantLock lock = new ReentrantLock(false);
ReentrantLock lock = new ReentrantLock();
//上面两种方式就是构造了一个非公平锁
  • 1
  • 2
  • 3
  • 4
  • 5

首先看他们的lock方法:

公平锁:

非公平锁:

可以看出非公平锁在加锁的时候,一上来就是使用CAS将state的值设置为1,但是公平锁不是

其次就是他们的tryAcquire方法:

公平锁:

非公平锁:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
  • 1
  • 2
  • 3

非公平锁中的tryAcquire方法中又调用了nonfairTryAcquire(acquires);方法

我们可以看出在争抢资源的时候非公平锁在争抢资源的时候,总是会去判断在当前节点之前是否还有其他被等待唤醒的节点,也就是调用hasQueuedPredecessors()方法,这个方法返回为true时代表当前节点前面有其他正在等待被唤醒的线程,返回为false时,表示当前线程前面没有需要被等待唤醒的线程,自己就是head.next节点,那我们来看一下这个方法

public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

返回true的情况:

​ 当头结点不等于尾节点并且头结点的下一个节点不为空,头结点的下一个节点的Thread 不等于当前线程时说明在队列中当前线程的Node节点之前有正在等待的Node节点。

返回false的情况:

​ 1、当头结点不等于尾节点并且头结点的下一个节点不为空,但是头结点的下一个节点的Thread等于当前线程时,说明当前正在等待的Node就是下一个即将被唤醒的线程

​ 2、如果头结点等于尾节点时,也会返回false

​ 3、头结点不等于尾节点,但是头结点的下一个节点为空时,也会返回false

以上就是公平锁与非公平锁的区别,剩下的流程都一样。总之:公平锁在加锁之前会判断当前有没有线程在队列中进行等待,非公平锁不会判断。

最后,我们来看一下响应中断的情况:

响应中断本人找到了两种切入点

一:从ReentrantLock类的lockInterruptibly()方法开始
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}
  • 1
  • 2
  • 3
二:在acquireQueued方法中if (failed)为true的情况

但是最终他们都走到了一个方法:就是AQS中的cancelAcquire(Node node)

如果感兴趣的小伙伴可以从这两个地方开始进行源码的追溯,在这里我们主要说明他们两个最终调用的cancelAcquire(Node node)方法。

 private void cancelAcquire(Node node) {
     // Ignore if node doesn't exist
     if (node == null)
         return;
     node.thread = null;
     // Skip cancelled predecessors
     Node pred = node.prev;
     while (pred.waitStatus > 0)
         node.prev = pred = pred.prev;
     Node predNext = pred.next;
     node.waitStatus = Node.CANCELLED;

     // If we are the tail, remove ourselves.
     if (node == tail && compareAndSetTail(node, pred)) {
         compareAndSetNext(pred, predNext, null);
     } else {
         int ws;
         if (pred != head &&
             ((ws = pred.waitStatus) == Node.SIGNAL ||
              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
             pred.thread != null) {
             Node next = node.next;
             if (next != null && next.waitStatus <= 0)
                 compareAndSetNext(pred, predNext, next);
         } else {
             unparkSuccessor(node);
         }

         node.next = node; // help GC
     }
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

该方法的作用就是取消指定的node参与竞争。

牢记这张图:

首先进入方法一开始就判断当前想要取消排队的节点是否为null ,如果为null则直接返回。什么都不做。

如果当前想要取消排队的节点不为null,则继续向下执行,将Node里面的Thread设为null ,因为已经取消排队了,所以将node内部关联的线程直接置为空。

获取当前取消排队的node得前继节点Node pred = node.prev;

while (pred.waitStatus > 0)  node.prev = pred = pred.prev;
  • 1

当前继节点的waitStatus大于0的时候,就继续向前寻找,直到找到当前取消的前继节点的waitStatus小于等于0的时候。这时将当前取消排队的Node的prev指向waitStatus小于等于0的节点pred。

Node predNext = pred.next;
  • 1

拿到pred的next节点,这时会有两种情况:

​ 1、pred的next就是当前要取消排队的node

​ 2、pred的next可能也是waitStatus大于0的节点

node.waitStatus = Node.CANCELLED;
  • 1

设置当前node(响应中断要取消的node)的状态为取消状态,现在node(响应中断要取消的node)所处的位置按照下面的代码所示,就有三种情况:当然这三种情况不可能同时出现,只会出现一种

if (node == tail && compareAndSetTail(node, pred)) {
    compareAndSetNext(pred, predNext, null);
} else {
    // If successor needs signal, try to set pred's next-link
    // so it will get one. Otherwise wake it up to propagate.
    int ws;
    if (pred != head &&
        ((ws = pred.waitStatus) == Node.SIGNAL ||
         (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
        pred.thread != null) {
        Node next = node.next;
        if (next != null && next.waitStatus <= 0)
            compareAndSetNext(pred, predNext, next);
    } else {
        unparkSuccessor(node);
    }

    node.next = node; // help GC
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
第一种情况:当前取消排队的node所处的位置是队尾
if (node == tail && compareAndSetTail(node, pred)) {
    compareAndSetNext(pred, predNext, null);
  • 1
  • 2

那么就会使用CAS来将tail指向当前节点的前一个节点,如果CAS成功(我们现在假设它就成功)那么就会进入到下面的代码,使用CAS来设置next节点。将pred的next节点置空。

下面的两张图来对比:

出队前:

出队后:

第二种情况:当前出队的节点node既不是head.next也不是tail
if (node == tail && compareAndSetTail(node, pred)) {
    compareAndSetNext(pred, predNext, null);
} else {
    int ws;
    if (pred != head &&
        ((ws = pred.waitStatus) == Node.SIGNAL ||
         (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
        pred.thread != null) {
        Node next = node.next;
        if (next != null && next.waitStatus <= 0)
            compareAndSetNext(pred, predNext, next);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

使用ws来保存当前节点waitStatus的状态

if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && 
compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null)
  • 1
  • 2

1、pred != head 成立, 说明当前node 不是 head.next 节点,也不是 tail。

2、(ws = pred.waitStatus) == Node.SIGNAL

​ 成立:说明node的前驱状态是 -1

​ 不成立:前驱状态可能是0

3、(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)) 假设前驱状态是 <= 0, 则设置前驱状态为 Signal状态表示以后要用来唤醒后继节点。

4、pred.thread != null:pred的Thread肯定不为空

Node next = node.next;
if (next != null && next.waitStatus <= 0)
    compareAndSetNext(pred, predNext, next);
  • 1
  • 2
  • 3

获取要出队节点的next节点,判断next节点不为空并且next节点的waitStatus <= 0。为true的情况下

使用CAS设置要出队的节点的前一个节点的下一个节点指向要出队节点的下一个节点

当node.next节点 被唤醒后会调用 shouldParkAfterFailedAcquire让node.next 节点越过取消状态的节点完成真正出队。

出队前:

出队后:

第三种情况:当前node 是 head.next节点。
else {
    unparkSuccessor(node);
}
  • 1
  • 2
  • 3

出队前:

出队后:

当唤醒等待的线程时,后继节点被唤醒后,会调用 shouldParkAfterFailedAcquire 会让node.next 节点越过取消状态的节点,队列的第三个节点 会 直接 与 head 建立 双重指向的关系
head.next 指向 ThreadC的node

ThreadB是被出队的head.next

ThreadC的node.prev 指向 head

此时出队完成!

以上就是响应中断的全部过程。

总结:

AQS:
抢到资源的线程直接使用处理业务逻辑,抢不到资源的必然涉及一种【排队等候机制】。
抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去【候客区排队等待】),
但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。
既然说到了【排队等候机制】,那么就一定会有某种队列形成,这样的队列是一个先进先出的双向队列。
如果共享资源被占用,【就需要一定的阻塞等待唤醒机制来保证锁分配】这个机制主要用的是CLH队列的变体来实现的,
将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。他将请求共享资源的线程封装成队列的结点(Node),
通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的控制结果。

AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作
将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对State值的修改。

Node节点等于waitStatus(表示排队的他的每一个节点他的状态)+前后指针的指向(控制它的入队和出队)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/149211?site
推荐阅读
相关标签
  

闽ICP备14008679号