赞
踩
AQS 最核心的三大部分就是状态、队列和期望协作工具类去实现的获取/释放等重要方法。
如果 AQS 想要去管理或者想作为协作工具类的一个基础框架,那么它必然要管理一些状态,而这个状态在 AQS 内部就是用 state 变量去表示的。
它的定义如下:
/**
* The synchronization state.
*/
private volatile int state;
而 state 的含义并不是一成不变的,它会根据具体实现类的作用不同而表示不同的含义,下面举几个例子。
在 Semaphore 信号量里面,state 表示的是剩余许可证的数量。
如果最开始把 state 设置为 10,这就代表许可证初始一共有 10 个,
然后当某一个线程取走一个许可证之后,这个 state 就会变为 9,所以信号量的 state 相当于是一个内部计数器。
在 CountDownLatch 工具类里面,state 表示的是需要“倒数”的数量。
一开始假设把它设置为 5,当每次调用 countDown() 方法时,state 就会减 1,一直减到 0 的时候就代表这个门闩被放开。
在 ReentrantLock 中它表示的是锁的占有情况。
最开始是 0,表示没有任何线程占有锁;如果 state 变成 1,则就代表这个锁已经被某一个线程所持有了。
但是因为 ReentrantLock 允许重入,所以同一个线程多次获得同步锁的时候,state会递增。
比如重入5次,那么state=5。
而在释放锁的时候,同样需要释放5次直到state=0其他线程才有资格获得锁。
假设很多线程都想要同时抢锁,那么大部分的线程是抢不到的,那怎么去处理这些抢不到锁的线程呢?
就得需要有一个队列来存放、管理它们。
所以 AQS 的一大功能就是充当线程的“排队管理器”。
当多个线程去竞争同一把锁的时候,就需要用排队机制把那些没能拿到锁的线程串在一起;
而当前面的线程释放锁之后,这个管理器就会挑选一个合适的线程来尝试抢刚刚释放的那把锁。
所以 AQS 就一直在维护这个队列,并把等待的线程都放到队列里面。
也有的把这个队列称之为 CLH ,其实就是想表达这个队列有三个人搞的:Craig, Landin, and Hagersten locks 。
AQS队列内部维护的是一个FIFO的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。
所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。
每个Node其实是由线程封装,当线程争抢锁失败后会封装成Node加入到ASQ队列中去。
AbstractQueuedSynchronizer内部维护了一个Node节点类和一个ConditionObject内部类。
Node内部类是一个双向的FIFO队列,用来保存阻塞中的线程以及获取同步状态的线程。
而ConditionObject是等待和通知机制。
除了Node节点的这个FIFO队列,还有一个重要的概念就是waitStatus一个volatile关键字修饰的节点等待状态。
在AQS中waitstatus有五种值:
1、INITAL 值为0,表示当前没有线程获取锁(初始状态)。
2、SIGNAL 值为-1,后继节点的线程处于等待的状态,当前节点的线程如果释放了同步状态或者被取消会通知后继节点,后继节点会获取锁并执行。
(当一个节点的状态为SIGNAL时就意味着在等待获取同步状态,前节点是头节点也就是获取同步状态的节点)
3、CANCELLED 值为1,因为超时或者中断,结点会被设置为取消状态,被取消状态的结点不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态。
处于这种状态的结点会被踢出队列,被GC回收。
(一旦节点状态值为1说明被取消,那么这个节点会从同步队列中删除)
4、CONDITION 值为-2,节点在等待队列中,节点线程等待在Condition,当其它线程对Condition调用了singal()方法该节点会从等待队列中移到同步队列中。
5、PROPAGATE 值为-3,表示下一次共享式同步状态获取将会被无条件的被传播下去。(读写锁中存在的状态,代表后续还有资源,可以多个线程同时拥有同步状态)
AQS通过Node内部类构成的一个双向链表结构的同步队列,来完成线程获取锁的排队工作。
当有线程获取锁失败后,就被添加到队列未尾。
在AQS中有两个和链表息息相关的变量:
在队列中,分别用 head 和 tail 来表示头节点和尾节点,两者在初始化的时候都指向了一个空节点。
头节点可以理解为“当前持有锁的线程”,而在头节点之后的线程就被阻塞了,它们会等待被唤醒,唤醒也是由 AQS 负责操作的。
// 同步队列的头结点,是懒加载的,即不会立即创建一个同步队列,只有当某个线程获取不到锁,需要排队的时候,才会初始化头结点
private transient volatile Node head;
// 同步队列的尾结点,同样是懒加载。
private transient volatile Node tail;
static final class Node {
// 标识节点在抢占共享锁
static final Node SHARED = new Node();
// 标识节点在抢占独占锁
static final Node EXCLUSIVE = null;
// 节点等待状态值 1:取消状态
static final int CANCELLED = 1;
// 节点等待状态值-1:标识后继线程处于等待状态
static final int SIGNAL = -1;
// 节点等待状态值-2:标识当前线程正在进行条件等待
static final int CONDITION = -2;
// 节点等待状态值-3:标识下一次共享锁的 acquireShared 操作需要无条件传播
static final int PROPAGATE = -3;
/**
* SIGNAL: 当前节点的线程完成或者取消后, 需要设置为此状态以便唤醒后节点的线程
* CANCELLED: 节点会被取消如果获取锁超时或者线程被打断, 设置为此状态的节点不会再改变其状态
* CONDITION: 表示节点处于 Condtion 队列中, 不会被用作同步队列节点
* PROPAGATE: 用于共享模式下的传播释放等待线程
* 0: 初始值
*
* 通过 CAS 来改变属性值
*/
volatile int waitStatus;
/**
* 指向前节点
* 当前节点被取消时, 需要指向新的未被取消的节点作为前节点,
* 头节点不会被取消:因为节点成为头节点就意味着成功获取到了锁, 一个被取消的节点永远不会获取到锁
*/
volatile Node prev;
/**
* 指向后节点, 当后节点被取消时需要指向新的节点或者空
*/
volatile Node next
/**
* 入队节点的线程, 结束之后会被置空
*/
volatile Thread thread;
/**
*Condition 队列中的下一个等待节点
*/
Node nextWaiter;
// 是否是共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
//返回前驱节点,如果为空抛出空指针
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS通过内部类ConditionObject构建等待队列(可有多个),
当Condition调用wait()方法后,线程将会加入等待队列中,
而当Condition调用signal()方法后,线程将从等待队列转移动同步队列中进行锁竞争。
同步器队列基本构造(一个同步队列 + 多个等待队列)
问题:
多个等待队列的话,节点怎么选择其中一个等待队列加入呢?
解答:
任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、notify()以及notifyAll()方法。
这些方法与synchronized同步关键字配合,可以实现等待/通知模式。
JUC包提供了Condition来对锁进行精准控制,Condition是一个多线程协调通信的工具类。
可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。
Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的。
condition中两个最重要的方法:
await,把当前线程阻塞挂起;
signal,唤醒阻塞的线程
关于释放和获取,在 AQS 中其实只是定义一个模板,让子类自己去实现。
由此可知, AQS 用到了模板方法模式。
首先来看一下获取方法。
获取操作通常会依赖 state 变量的值,根据 state 值不同,协作工具类也会有不同的逻辑,并且在获取的时候也经常会阻塞。
下面来看几个具体的例子。
比如 ReentrantLock 中的 lock 方法就是其中一个“获取方法”。
执行时,如果发现 state 不等于 0 且当前线程不是持有锁的线程,那么就代表这个锁已经被其他线程所持有了。
这个时候,当然就获取不到锁,于是就让该线程进入阻塞状态。
// /ReentrantLock.NonfairSync中
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// AQS中
protected final boolean compareAndSetState(int expect, int update) {
// 比较state是不是==0,是就把state设置为1
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// AQS中
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// ReentrantLock.NonfairSync中
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state==0
if (c == 0) {
// 设置state==1
if (compareAndSetState(0, acquires)) {
// 拥有线程设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 重入,判断拥有线程是不是当前线程
else if (current == getExclusiveOwnerThread()) {
// state = state+1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
再比如,Semaphore 中的 acquire 方法就是其中一个“获取方法”,作用是获取许可证,此时能不能获取到这个许可证也取决于 state 的值。
如果 state 值是正数,那么代表还有剩余的许可证,数量足够的话,就可以成功获取;
但如果 state 是 0,则代表已经没有更多的空余许可证了,此时这个线程就获取不到许可证,会进入阻塞状态,所以这里同样也是和 state 的值相关的。
// Semaphore 中
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS中
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 返回值大于等于0,表示获取成功
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// Semaphore.NonfairSync 中
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// Semaphore.Sync 中
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
// state=state -1
// 通过compareAndSetState()设置state值
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
再举个例子, CountDownLatch 获取方法就是 await () 方法(包含重载方法),作用是“等待,直到倒数结束”。
执行 await 的时候会判断 state 的值,如果 state 不等于 0,线程就陷入阻塞状态,
直到其他线程执行倒数方法把 state 减为 0,此时就代表现在这个门闩放开了,所以之前阻塞的线程就会被唤醒。
// CountDownLatch 中
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS中
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
//判断state是否为0
return (getState() == 0) ? 1 : -1;
}
总结一下,“获取方法”在不同的类中代表不同的含义,但往往和 state 值相关,
也经常会让线程进入阻塞状态,这也同样证明了 state 状态在 AQS 类中的重要地位。
释放方法是站在获取方法的对立面的,通常和刚才的获取方法配合使用。
刚才讲的获取方法可能会让线程阻塞,比如说获取不到锁就会让线程进入阻塞状态,但是释放方法通常是不会阻塞线程的。
比如在 Semaphore 信号量里面,释放就是 release() 方法(包含重载方法), release() 方法的作用是去释放一个许可证,会让 state 加 1;
// Semaphore 中
public void release() {
sync.releaseShared(1);
}
// AQS中
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// Semaphore.Sync中
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
// releases==1
// 加1
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
而在 CountDownLatch 里面,释放就是 countDown () 方法,作用是倒数一个数,让 state 减 1。
所以也可以看出,在不同的实现类里面,他们对于 state 的操作是截然不同的,需要由每一个协作类根据自己的逻辑去具体实现。
// CountDownLatch中
public void countDown() {
sync.releaseShared(1);
}
// AQS中
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// CountDownLatch.Sync中
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
// 减1
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
总结一下,到这里,相信你对 AQS 中的state这个变量有更清晰的认识的吧。
总之,就是拿着这个state来搞事情,要么是判断是否等于0,要么是加1,要么是减1。
通过这些条件进行处理相应的逻辑。
在 AQS 中,给子类预留了几个方法,需要子类自己去实现:
// 尝试获取独占锁,需要由子类自己实现,体现了 Abstract
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 尝试释放独占锁,需要由子类实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 尝试获取共享锁,需要由子类自己实现
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 尝试释放共享锁,需要由子类实现
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
获取和释放独占锁、共享锁都是空方法,由子类自己去实现。
如果想使用 AQS 来写一个自己的线程协作工具类,通常而言是分为以下三步,这也是 JDK 里利用 AQS 类的主要步骤:
第一步,新建一个自己的线程协作工具类,在内部写一个 Sync 类,该 Sync 类继承AbstractQueuedSynchronizer,即 AQS;
第二步,想好设计的线程协作工具类的协作逻辑,在 Sync 类里,根据是否是独占,来重写对应的方法。
如果是独占,则重写 tryAcquire 和 tryRelease 等方法;
如果是非独占,则重写tryAcquireShared 和 tryReleaseShared 等方法;
第三步,在自己的线程协作工具类中,实现获取/释放的相关方法,并在里面调用 AQS 对应的方法,
如果是独占则调用 acquire 或 release 等方法,非独占则调用 acquireShared 或releaseShared 或 acquireSharedInterruptibly 等方法。
在 CountDownLatch 里面有一个子类,该类的类名叫 Sync,这个类正是继承自 AQS。
可以很明显看到最开始一个 Sync 类继承了 AQS,
这正是上一节所讲的“第一步,新建一个自己的线程协作工具类,在内部写一个 Sync 类,该 Sync 类继承 AbstractQueuedSynchronizer,即AQS”。
而在 CountDownLatch 里面还有一个 sync 的变量,正是 Sync 类的一个对象。
同时,我们看到,Sync 不但继承了 AQS 类,而且还重写了 tryAcquireShared 和tryReleaseShared 方法,
这正对应了“第二步,想好设计的线程协作工具类的协作逻辑,在 Sync类里,根据是否是独占,来重写对应的方法。
如果是独占,则重写 tryAcquire 或 tryRelease 等方法;如果是非独占,则重写 tryAcquireShared 和 tryReleaseShared 等方法”。
这里的 CountDownLatch 属于非独占的类型,因此它重写了 tryAcquireShared 和 tryReleaseShared 方法。
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
//省略其他代码...
}
首先来看看构造函数。
CountDownLatch 只有一个构造方法,传入的参数是需要“倒数”的次数,每次调用 countDown 方法就会倒数 1,
直到达到了最开始设定的次数之后,相当于是“打开了门闩”,所以之前在等待的线程可以继续工作了。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
从代码中可以看到,当 count < 0 时会抛出异常,
当 count > = 0 ,即代码 this.sync = newSync( count ) ,往 Sync 中传入了 count,这个里的 Sync 的构造方法如下:
Sync(int count) {
setState(count);
}
该构造函数调用了 AQS 的 setState() 方法,并且把 count 传进去了,而 setState() 正是给 AQS 中的 state 变量赋值的,代码如下:
protected final void setState(int newState) {
state = newState;
}
所以通过 CountDownLatch 构造函数将传入的 count 最终传递到 AQS 内部的 state 变量,给 state 赋值,state 就代表还需要倒数的次数。
接下来介绍 getCount() 方法,该方法的作用是获取当前剩余的还需要“倒数”的数量,getCount() 方法的源码如下:
public long getCount() {
return sync.getCount();
}
该方法 return 的是 sync 的 getCount:
int getCount() {
return getState();
}
们一步步把源码追踪下去, getCount () 方法调用的是 AQS 的 getState() :
protected final int getState() {
return state;
}
如代码所示, protected final int getState() 方法直接 return 的就是 state 的值,所以最终它获取到的就在 AQS 中 state 变量的值。
再来看看 countDown() 方法,该方法其实就是 CountDownLatch 的“释放”方法,下面来看下源码:
public void countDown() {
sync.releaseShared(1);
}
在 countDown 方法中调用的是 sync 的 releaseShared 方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
可以看出, releaseShared() 先进行 if 判断,判断 tryReleaseShared() 方法的返回结果,
因此先把目光聚焦到 tryReleaseShared() 方法中, tryReleaseShared() 源码如下所示 :
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
方法内是一个 for 的死循环,在循环体中,最开始是通过 getState() 拿到当前 state 的值并赋值给变量 c,这个 c 可以理解为是 count 的缩写,
如果此时 c = 0,则意味着已经倒数为零了,会直接会执行下面的 return false 语句,一旦 tryReleaseShared () 方法返回 false,
再往上看上一层的 releaseShared () 方法,就会直接跳过整个 if (tryReleaseShared(arg)) 代码块,直接返回false,
相当于 releaseShared() 方法不产生效果,也就意味着 countDown() 方法不产生效果。
再回到 tryReleaseShared() 方法中往下看 return false 下面的语句,
如果 c 不等于 0,在这里会先把 c-1 的值赋给 nextc,然后再利用 CAS 尝试把 nextc 赋值到 state 上。
如果赋值成功就代表本次 countDown() 方法操作成功,也就意味着把 AQS 内部的 state 值减了 1。
最后,是 return nextc == 0 ,如果 nextc 为 0,意味着本次倒数后恰好达到了规定的倒数次数,门闩应当在此时打开,所以 tryReleaseShared() 方法会返回 true,
那么再回到之前的 releaseShared () 方法中,可以看到,接下来会调用 doReleaseShared() 方法,效果是对之前阻塞的线程进行唤醒,让它们继续执行。
如果结合具体的数来分析,可能会更清晰。
假设 c = 2,则代表需要倒数的值是 2, nextc = c-1 ,所以 nextc 就是 1,然后利用 CAS 尝试把 state 设置为 1,
假设设置成功,最后会 return nextc == 0 ,此时 nextc 等于 1,不等于 0,所以返回 false,
也就意味着 countDown() 之后成功修改了 state 的值,把它减 1 了,但并没有唤醒线程。
下一次执行 countDown() 时,c 的值就是 1,而 nextc = c - 1 ,所以 nextc 等于 0,
若这时 CAS操作成功,最后 return nextc == 0 ,所以方法返回 true,
一旦 tryReleaseShared () 方法 return true ,则 releaseShared() 方法会调用 doReleaseShared() 方法,把所有之前阻塞的线程都唤醒。
最后来看看 await () 方法,该方法是 CountDownLatch 的“获取”方法,调用 await() 方法会把线程阻塞,直到倒数为 0 才能继续执行。
await() 方法和 countDown() 是配对的,追踪源码可以看到 await () 方法的实现:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
它会调用 sync 的 acquireSharedInterruptibly() ,并且传入 1。acquireSharedInterruptibly() 方法源码如下所示:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
可以看到,它除了对于中断的处理之外,比较重要的就是 tryAcquireShared() 方法。
这个方法很简单,它会直接判断 getState() 的值是不是等于 0,如果等于 0 就返回 1,不等于 0 则返回 -1。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
getState() 方法获取到的值是剩余需要倒数的次数,如果此时剩余倒数的次数大于 0,那么 getState() 的返回值自然不等于 0,
因此 tryAcquireShared() 方法会返回 -1,一旦返回 -1,再看到 if (tryAcquireShared(arg) < 0) 语句中,就会符合 if 的判断条件,
并且去执行 doAcquireSharedInterruptibly() 方法,然后会让线程进入阻塞状态。
再来看下另一种情况,当 state 如果此时已经等于 0 了,那就意味着倒数其实结束了,不需要再去等待了,
就是说门闩是打开状态,所以说此时 getState() 返回 0, tryAcquireShared() 方法返回 1 ,
一旦返回 1,对于 acquireSharedInterruptibly () 方法而言相当于立刻返回,
也就意味着 await () 方法会立刻返回,那么此时线程就不会进入阻塞状态了,相当于倒数已经结束,立刻放行了。
这里的 await () 和 countDown() 方法,正对应了本讲一开始所介绍的
“第三步,在自己的线程协作工具类中,实现获取/释放的相关方法,并在里面调用 AQS 对应的方法,
如果是独占则调用 acquire() 或 release() 等方法,非独占则调用 acquireShared() 或 releaseShared() 或 acquireSharedInterruptibly() 等方法。”
最后对 AQS 在 CountDownLatch 的应用进行总结。
当线程调用 CountDownLatch 的 await () 方法时,便会尝试获取“共享锁”,不过一开始通常获取不到锁,于是线程被阻塞。
“共享锁”可获取到的条件是“锁计数器”的值为 0,而“锁计数器”的初始值为 count,
当每次调用 CountDownLatch 对象的 countDown() 方法时,也可以把“锁计数器” -1。
通过这种方式,调用 count 次 countDown() 方法之后,“锁计数器”就为 0 了,于是之前等待的线程就会继续运行了,
并且此时如果再有线程想调用 await 方法时也会被立刻放行,不会再去做任何阻塞操作了。
们在前面对 ReentrantLock 使用做了演示,下面展示一个获取锁的过程:
这是没有竞争的情况下,所以不涉及到队列,接下来,来看看如果存在多线程抢锁的场景。
如果线程A获取到了这个lock锁,此时,线程B又来获取锁了,那该怎么办?
等呗,如何等?
继续回到前面的代码:
// 第一步:ReentrantLock
public void lock() {
sync.lock();
}
// 第二步:ReentrantLock默认非公平锁
// 所以,来到ReentrantLock.NonfairSync中
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 这次的重点在这里,这个方法在AQS中
acquire(1);
}
// 第三步:AQS中
public final void acquire(int arg) {
// 1.先尝试获取,如果获取成功,则直接返回,代表加锁成功
if (!tryAcquire(arg) &&
// 2.如果获取失败,则调用addWaiter在等待队列中增加一个节点
// 3.调用acquireQueued告诉前一个节点,在解锁之后唤醒自己,然后线程进入等待状态
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果在等待过程中被中断,则当前线程中断
selfInterrupt();
}
第三步中,由于有竞争,所以就应该先再试试能不能获取到锁,即调用 tryAcquire() 方法:
这个 tryAcquire() 方法在 NonfairSync 中(咱们这里分析的是非公平锁)。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取state值
int c = getState();
// 再次对比state是否等于0
if (c == 0) {
// 再通过CAS去给state赋值acquires
if (compareAndSetState(0, acquires)) {
// 获取锁成功,把持有锁线程设置为当前线程
setExclusiveOwnerThread(current);
// 返回true表示获取到了锁
return true;
}
}
// 比较持有锁线程是不是当前线程
else if (current == getExclusiveOwnerThread()) {
// 重入,state=state+1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置state值
setState(nextc);
// 返回true表示获取到了锁
return true;
}
// 返回false表示没有获取到锁,那就去排队吧
return false;
}
整个流程如下:
上述流程如果没有获取到锁,那就会执行:
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
这里了(如果获取锁失败之后,就会调用 addWaiter () 方法,把当前线程加入同步队列)。
这里先看 addWaiter() 方法:
// AQS中
private Node addWaiter(Node mode) {
// 把当前线程封装成 Node ,并且是独占模式
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速入队,如果失败,则会调用 enq 入队方法。enq 会初始化队列。
Node pred = tail;
// 如果 tail 不为空,说明当前队列中已经有节点
// 也就是说,队列中有线程在排队等待获取锁
if (pred != null) {
// 把当前 node 的 prev 指针指向 tail
node.prev = pred;
// 通过 CAS 把 node 设置为 tail,即添加到队尾
if (compareAndSetTail(pred, node)) {
// 把旧的 tail 节点的 next 指针指向当前 node
pred.next = node;
return node;
}
}
// 当 tail 为空时,把 node 添加到队列,如果需要的话,先进行队列初始化
// 第一个排队等待的线程
enq(node);
// 入队成功之后,返回当前 node
return node;
}
这个方法里,把重心放在 enq() 方法上,因为其他相对来说比较好理解。
//通过自旋,把当前节点加入到队列中
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果 tail为空,说明队列未初始化
if (t == null) {
// 创建一个空节点,通过 CAS把它设置为头结点
if (compareAndSetHead(new Node()))
// 此时只有一个 head头节点,因此把 tail也指向它
tail = head;
} else {
// 第二次自旋时,tail不为空,于是把当前节点的 prev指向 tail节点
node.prev = t;
// 通过 CAS把 tail节点设置为当前 node节点
if (compareAndSetTail(t, node)) {
// 把旧的 tail节点的 next指向当前 node
t.next = node;
return t;
}
}
}
}
这里再说说这个队列存节点的问题。
业务场景,有三个线程:线程1、线程2、线程3,都来获取锁。
假设线程A先获取到锁,那线程2和线程3只能等待线程1释放了锁,才有机会获取到锁。
1、线程1获取锁了,正在执行任务,此时不会存在队列。
2、线程2来获取锁,一看state不等于0,并且持有锁线程是线程1,不是线程2,所以,把线程2封装一个Node节点,再初始化队列,最后加入队列。
3、此时,线程3又来获取锁,发现不行,也只能加入队列,加到队尾
4、线程1执行结束,释放锁,唤醒线程2,整个队列就变成。
注意:队列的head一定是null,用于表示正在执行的线程对象,且用于唤醒后续线程。
这里分析完了,再回到:acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 代码中,这里就该执行 acquireQueued() 方法了。
// 入队成功之后,就会调用 acquireQueued 方法自旋抢锁。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点就是 head 节点,就调用 tryAcquire 方法抢锁
if (p == head && tryAcquire(arg)) {
// 如果抢锁成功,就把当前 node 设置为头结点
setHead(node);
p.next = null; // help GC
failed = false;
// 抢锁成功后,会把线程中断标志返回出去,终止for循环
return interrupted;
}
// 如果抢锁失败,就根据前驱节点的 waitStatus 状态判断是否需要把当前线程挂起
if (shouldParkAfterFailedAcquire(p, node) &&
// 线程被挂起时,判断是否被中断过
parkAndCheckInterrupt())
// 注意此处,如果被线程被中断过,需要把中断标志重新设置一下
interrupted = true;
}
} finally {
if (failed)
// 如果抛出异常,则取消锁的获取,进行出队操作
cancelAcquire(node);
}
}
通过代码,可以看到,当前的同步队列中,只有第二个节点才有资格抢锁。
如果抢锁成功,则会把它设置为头结点。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
需要注意的是,这个方法,会把头结点的线程设置为 null 。
想一下,为什么?
因为,此时头结点的线程已经抢锁成功,需要出队了。
自然的,队列中也就不应该存在这个线程了。
PS:由 enq() 方法,还有 setHead() 方法,可以发现,头结点的线程总是为 null。
这是因为,头结点要么是刚初始化的空节点,要么是抢到锁的线程出队了。
因此,也常常把头结点叫做虚拟节点(不存储任何线程)。
在上面的代码中,提到了抢锁失败就执行: shouldParkAfterFailedAcquire() 方法。
这时,需要判断是否应该把当前线程挂起。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取当前节点的前驱节点的 waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果 ws = -1 ,说明当前线程可以被前驱节点正常唤醒,于是就可以安全的 park了
return true;
if (ws > 0) {
// 如果 ws > 0,说明前驱节点被取消,则会从当前节点依次向前查找,
// 直到找到第一个没有被取消的节点,把那个节点的 next 指向当前 node
// 这一步,是为了找到一个可以把当前线程唤起的前驱节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果 ws 为 0,或者 -3(共享锁状态),则把它设置为 -1
// 返回 false,下次自旋时,就会判断等于 -1,返回 true了
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
如果 shouldParkAfterFailedAcquire() 返回 true,说明当前线程需要被挂起。
因此,就执行此方法,同时检查线程是否被中断。
此时,就该进入 parkAndCheckInterrupt() 方法:
private final boolean parkAndCheckInterrupt() {
// 把当前线程挂起,则 acquireQueued 方法的自旋就会暂停,等待前驱节点 unpark
LockSupport.park(this);
// 返回当前节点是否被中断的标志,注意此方法会把线程的中断标志清除。
// 因此,返回上一层方法时,需要设置 interrupted = true 把中断标志重新设置,以便上层代码可以处理中断
return Thread.interrupted();
}
想一下,为什么抢锁失败后,需要判断是否把线程挂起?
因为,如果抢不到锁,并且还不把线程挂起, acquireQueued() 方法就会一直自旋下去。
这样你觉得CPU能受得了吗。
当不停的自旋抢锁时,若发生了异常,就会调用此方法,取消正在尝试获取锁的线程。
node 的位置分为三种情况,将会执行 cancelAcquire() 方法。
private void cancelAcquire(Node node) {
if (node == null){
return;
}
// node 不再指向任何线程
node.thread = null;
Node pred = node.prev;
// 从当前节点不断的向前查找,直到找到一个有效的前驱节点
while (pred.waitStatus > 0){
node.prev = pred = pred.prev;
}
Node predNext = pred.next;
// 把 node 的 ws 设置为 1
node.waitStatus = Node.CANCELLED;
// 第一种:如果 node 是 tail,则把 tail 更新为 pred,并把 pred.next 指向 null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 第二种:如果 node 既不是 tail,也不是 head 的后继节点,就把 node的前驱节点的 ws 设置为 -1
// 最后把 node 的前驱节点的 next 指向 node 的后继节点
if (pred != head
&& ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
&& pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0){
compareAndSetNext(pred, predNext, next);
}
} else {
// 第三种:如果 node是 head 的后继节点,则直接唤醒 node 的后继节点。
// 这个也很好理解,因为 node 是队列中唯一有资格尝试获取锁的节点,
// 它放弃了资格,当然有义务把后继节点唤醒,以让后继节点尝试抢锁。
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
在第三种的时候,会把后继节点唤醒:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0){
compareAndSetWaitStatus(node, ws, 0);
}
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾结点向前依次遍历,直到找到距离当前 node 最近的一个有效节点
for (Node t = tail; t != null && t != node; t = t.prev){
if (t.waitStatus <= 0){
s = t;
}
}
}
if (s != null){
// 把这个有效节点的线程唤醒,
// 唤醒之后,当前线程就可以继续自旋抢锁了,(回到 park 的地方)
LockSupport.unpark(s.thread);
}
}
上面线程唤醒用到了 LockSupport ,这里也顺带着给大家聊聊。
下面画一个流程图更直观的查看整个获取锁的过程(非公平锁)。
公平锁和非公平锁的整体流程大致相同,只是在抢锁之前先判断一下是否已经有人排在前面,如果有的话,就不执行抢锁。
先来看一个整体流程:
最后面两个打钩的方法才是重点。
// ReentrantLock中
public void unlock() {
sync.release(1);
}
// AQS中
public final boolean release(int arg) {
// 第一步释放锁
if (tryRelease(arg)) {
Node h = head;
// head==null的情况只有一个线程进入,没有初始化队列,
// !=null至少说明队列被初始化过,但是是否有后续节点未知,waitStatus!=0说明下个节点是等待的
if (h != null && h.waitStatus != 0){
// 第二步:唤醒下一个节点
unparkSuccessor(h);
}
return true;
}
return false;
}
//ReentrantLock.Sync中
protected final boolean tryRelease(int releases) {
// 获取state值,然后减1
int c = getState() - releases;
// 判断当前持有锁线程是否为当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 释放标识为false,表示为释放
boolean free = false;
// 判断上面计算出来的state值是否为0
// 调用了多少次的lock()方法自然必须调用同样次数的unlock()方法才行,这样才把一个锁给全部解开,
// 才能让state能变成0
if (c == 0) {
// 释放标识
free = true;
// 把持有锁线程设置为null
setExclusiveOwnerThread(null);
}
// 把state值改成先计算出来的值
setState(c);
//返回释放标志
return free;
}
// 唤醒线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// ws小于0表示正常排队线程,先设置为0
if (ws < 0){
compareAndSetWaitStatus(node, ws, 0);
}
Node s = node.next;
// 这里着实没想出什么时候s会null,除了手动去修改队列,我理解只是作者的保护
// waitStatus>0 表示下个线程是被cancel状态
// 进这个是从队尾开始找,找最近的正常排队的线程
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null){
// 唤醒下一个等待队列中的线程
LockSupport.unpark(s.thread);
}
}
释放流程分成两个部分:
锁释放
节点线程唤醒
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。