赞
踩
是什么?
AQS,通常指的是 java.util.concurrent 下locks包内的 AbstractQueuedSynchronizer
类。类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch…。
谁和它有关?
AQS 就是将 用来实现锁或者其他同步器组件的公共基础部分的抽象实现,是重要级基础框架及整个JUC体系的基石,主要用于解决锁分配给 “谁”的问题。
Java并发大佬DougLee,提出同意规范并简化了锁的实现,将其抽象出来屏蔽了同步状态管理、同步队列的管理和维护、阻塞线程排队和通知、唤醒机制等,是一切锁和同步组件实现的——公共基础部分。
AQS原理概述
AQS使用一个volatile的int类型的成员变量 State 来表示同步状态,通过内置的FIFO队列(CLH)来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对State值的修改。
AQS同步队列的基本结构如下:
state变量 + CLH双端队列
AQS同步队列的基本结构详细如下:
Node = waitStatus + Thread
内部类Node(Node类在AQS类内部)
属性说明:
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS同步队列的抽象表现。它将要请求共享资源的线程及自身的等待状态封装成队列的节点对象(Node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果。
AQS 对资源的共享方式
AQS定义两种资源共享方式
ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。
AQS底层使用了模板方法模式
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。
AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
默认情况下,每个方法都抛出 UnsupportedOperationException
。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS(Compare and Swap)减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease
、tryAcquireShared-tryReleaseShared
中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock
。
我们知道Lock接口的实现类,基本都是通过聚合了一个队列同步器的子类完成线程访问控制的,接下来我们以ReentrantLock为突破口来进行AQS源码的探究~
我们已知ReentrantLock是公平锁也可以是非公平锁,那么它所对应的调用是否一致呢?
那么公平锁和非公平锁的不同点在哪呢?是如何实现公平如和实现不公平呢?
我们发现 在非公平锁中首先通过CAS的方式抢占锁,抢不到再进行排队调用acquire()
方法。而公平锁是直接调用了acquire()
方法。点开两者都会调用的acquire()
方法一探究竟~
接下来我们逐一分析一下三个方法:
!tryAcquire(arg)
addWaiter(Node.EXCLUSIVE)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
tryAcquire()
我们发现 tryAcquire()
方法中只是抛出了UnsupportedOperationException()
异常,并用 protected
关键字修饰,其意就是让我们的子类去重写~
FairSync实现类方法:
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 判断资源是否被占用 if (c == 0) { // 资源未被占用则占用,并返回true if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 资源被占用是否是当前线程 else if (current == getExclusiveOwnerThread()) { // 当前线程占用重入加锁 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 返回false return false; }
NonfairSync 非公平锁实现类:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
我们发现其调用了 nonfairTryAcquire()
方法~
/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 判断资源是否被占用 if (c == 0) { // 资源未被占用则占用,并返回true if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 资源被占用是否是当前线程 else if (current == getExclusiveOwnerThread()) { // 当前线程占用重入加锁 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 返回false return false; }
addWaiter()
如果调用tryAcquire()失败,即抢夺资源未成功。则需要将当前线程加入队列排队。
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { // 封装Node节点:线程为key Node node = new Node(Thread.currentThread(), mode); // 判断为指针是否为null Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 加入队列 enq(node); return node; }
双向链表中,第一个和节点为虚节点(也叫哨兵节点),其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是从第二个节点开始的。
private Node enq(final Node node) { // 自旋 for (;;) { Node t = tail; if (t == null) { // 必须初始化,创建个节点(虚拟)做为头节点 if (compareAndSetHead(new Node())) tail = head; } else { // 尾节点不为null,将当前节点的前指针指向队列尾节点 node.prev = t; if (compareAndSetTail(t, node)) { // 将t设置为当前节点,即当前节点为尾节点 // 将之前的尾节点的后指针指向当前节点 t.next = node; return t; } } } }
acquireQueued()
在加入队列排队的同时尝试抢夺资源。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 自旋 for (;;) { // 获得当前节点的前置节点,判断当前节点是否是头节点。若是则进行抢占锁 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 若抢占成功,则设置当前节点为头节点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 修改Node节点状态,使其线程等待。等候唤醒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 线程被唤醒,进行自旋判断抢占锁 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire 方法解读~如果前置节点的 waitStatus 是 SIGNAL 状态,即 shouldParkAfterFailedAcquire
程序会继续向下执行 parkAndCheckInterrupt
方法,用于将当前线程挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前置节点的状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 如果是 SIGNAL 状态,即等待被占用的资源释放,直接返回true // 准备继续调用 parkAndCheckInterrupt 方法 return true; if (ws > 0) { // 如果 ws 大于0说明是 CANCELLED 状态 do { // 循环判断前置节点的前置节点是否也为 CANCELLED 状态,忽略该状态的节点,重新连接队列 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 将当前节点的前置节点设置为 SIGNAL 状态,用于后续唤醒操作 // 程序第一次执行到这返回为false,还会进行外层第二次循环,最终从代码第7行返回 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
挂起线程,等候唤醒
private final boolean parkAndCheckInterrupt() {
// 线程挂起,程序不会继续向下执行
LockSupport.park(this);
//因以下述三种情况程序执行至此,返回当前线程的中断状态,并清空中断状态。如果由于被中断,该方法会返回true。
return Thread.interrupted();
}
我们知道唤醒 park
方法的三种情况:
unpark
interrupt
根据以上分析加入队列等待的线程都已被 park(),那么合适唤醒他们呢?接下来我们来解读 unlock
方法
尝试释放锁,发现 其仍是抛出了UnsupportedOperationException()
异常,并用 protected
关键字修饰,其意就是让子类去重写~
我们来看看 ReentrantLock 实现类的 tryRelease
方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
当释放掉当前锁之后,接下来呢?
public final boolean release(int arg) {
// 尝试释放锁,释放锁之后 唤醒节点的后续节点(如果存在)。
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) { // 如果状态为负(即可能需要信号),尝试清除,以预期信号。如果此操作失败或等待线程更改了状态,则可以。 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // Unparak的线程保存在后续节点中,通常只是下一个节点。但如果取消或明显为空,则从尾部向后遍历以找到实际的未取消的后续项。 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 若找到实际的未取消的后续项并唤醒,则回到之前线程等待的代码-> if (s != null) LockSupport.unpark(s.thread); }
以上分析都是基于正常情况下分析,若在队列当中出现了异常该怎么处理呢?我们看到 finally 代码快中使用 cancelAcquire
方法取消正在执行的获取尝试~
// 取消正在进行的获取尝试。 private void cancelAcquire(Node node) { // 如果节点不存在,则忽略 if (node == null) return; node.thread = null; // 跳过已取消的前置任务,使当前节点的前置指针指向前面最后一个未取消的节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext是要取消切片的明显节点。如果没有,下面的案例将失败,在这种情况下,我们输给了另一个取消或信号,所以没有进一步的行动是必要的。 Node predNext = pred.next; // 通过CAS改变节点的waitStatus 为取消 node.waitStatus = Node.CANCELLED; // 如果当前节点是尾节点,就把自己移开。 if (node == tail && compareAndSetTail(node, pred)) { // 通过 头指针+偏移量 计算出当前节点对应的内存位置并清空 compareAndSetNext(pred, predNext, null); } else { // 若后置节点需要信号,试着设置pred的下一个链接,这样它就会得到一个。否则唤醒它进行传播。 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。