赞
踩
前面分析了AQS类的源码,但真正实现AQS的实现类都在JUC中,当然AQS也是JUC的一部分,只是它不面向应用,除非自己去继承实现一套逻辑。
在java的java.util.concurrent包,简称JUC,其内包含的类都与多线程有关,是非常重要的一个包。接下来准备针对JUC下常用类进行分析,剖析它们的原理及使用特点。而本文将针对比较常用的ReentrantLock源码来分析。
ReentrantLock锁是AQS的一种实现,它做到了可重入、可中断,分为公平和非公平两类实现。在使用时,需要手动调用o.lock()和o.unlock()来加锁和解锁,并且解锁是必须的。这种形式,将加锁、解锁的时机开放给了开发者,因此更加灵活。
看一下ReentrantLock类的结构:
public class ReentrantLock implements Lock, java.io.Serializable
并没继承AQS,反而实现了Lock;因为ReentrantLock是锁,实现Lock很自然。看一下Lock接口的定义:
- public interface Lock {
-
- void lock();
-
- void lockInterruptibly() throws InterruptedException;
-
- boolean tryLock();
-
- boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
-
- void unlock();
-
- Condition newCondition();
- }
加锁、解锁,还有一个方法用于创建Condition对象,这个在《xxx》有介绍过,可以移步加深一下印象。Condition与本文内容关系不大,回到ReentrantLock,看一下它内部有哪些成员。
成员变量
只有一个Sync类型的sync实例:
private final Sync sync;
内部类
Sync是内部类,它实现了AQS类,既然AQS成为抽象队列同步器,那么我们可以称Sync为队列同步器、同步器。
abstract static class Sync extends AbstractQueuedSynchronizer
另外还有两个内部类,都是继承自Sync:
- static final class NonfairSync extends Sync
- static final class FairSync extends Sync
从结构及名称看,ReentrantLock实现了两种锁:公平同步器FairSync和非公平同步器NonfairSync,都源自AQS。
构造函数
两个构造函数,默认无参的创建的是非公平同步器,还有一个根据入参来决定同步器类型:
- public ReentrantLock() {
- sync = new NonfairSync();
- }
- public ReentrantLock(boolean fair) {
- sync = fair ? new FairSync() : new NonfairSync();
- }
实例方法
内部定义很多,重点看一下Lock接口的实现方法:
- public void lock() {
- sync.lock();
- }
- public void lockInterruptibly() throws InterruptedException {
- sync.acquireInterruptibly(1);
- }
-
- public boolean tryLock() {
- return sync.nonfairTryAcquire(1);
- }
-
- public boolean tryLock(long timeout, TimeUnit unit)
- throws InterruptedException {
- return sync.tryAcquireNanos(1, unit.toNanos(timeout));
- }
-
- public void unlock() {
- sync.release(1);
- }
- public Condition newCondition() {
- return sync.newCondition();
- }
发现一个现象,都是调用的sync中的实现。那么后面就对Sync的实现做重点分析。由于ReentrantLock默认实现非公平同步器,那么就逐个分析NonfairSync类的实现。
Lock接口的Lock()实现
实现的内容真少,看来主要内容还是在Sync里。
- static final class NonfairSync extends Sync {
- private static final long serialVersionUID = 7316153563782823691L;
-
- /**
- * Performs lock. Try immediate barge, backing up to normal
- * acquire on failure.
- */
- final void lock() {
- if (compareAndSetState(0, 1))
- setExclusiveOwnerThread(Thread.currentThread());
- else
- acquire(1);
- }
-
- protected final boolean tryAcquire(int acquires) {
- return nonfairTryAcquire(acquires);
- }
- }
从加锁lock()方法看到,如果AQS.state=0,并且可以设置为1,则设置线程持有者为自己。
逐个逻辑第一次看可能有疑问,为什么state设置为1就加锁成功了? 因为ReentrantLock是排它锁的实现,而“非公平”的特点就是不管AQS排队那一套,只要现在state=0,我就先去改一下值,改成功了锁就是抢到了,否则再去走AQS.acquire(1)的流程。
看一看AQS的acquire():
- public final void acquire(int arg) {
- if (!tryAcquire(arg) && // t1
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
在t1位置,AQS并没有实现tryAcquire()方法,而是由Sync实现的,这在上面NonfairSync有,而且它调用的是Sync的nonfairTryAcquire()方法,调用链是:
lock()-->acquire()-->tryAcquire()-->nonfairTryAcquire()-->acquire()
看一下nonfairTryAcquire()方法的实现
- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- if (compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0) // overflow
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
内容并不复杂,根据AQS中state的值,以及排它锁持有者,来决定不同结果。
- 1、state==0,无锁,直接CAS设置state的值,成功了就说明拿到锁,否则肯定有其他线程申请到了。
- 2、state!=0,有锁,如果持有者是自己,则对state的值累加,并且返回成功true
在2中的累加,含义是ReentrantLock是一个可重入锁,持有锁的线程可以多次申请锁,但释放锁测次数要与申请次数相等,才能真正释放锁。
以上就是尝试获取非公平锁tryAcquire的过程,再结合AQS中acquire()的实现,梳理下整个申请锁过程。
介绍下AQS的大致结构:
- 1、包含头尾指针的同步队列head、tail
- 2、同步队列的节点类Node,内部包含Thread线程对象、waitState节点状态、同步队列的前后指针prev、next
- 3、资源值state,将其成功改变的线程将会持有锁
在AQS中,参与申请锁流程的逻辑是, 申请锁的线程会被封装为node对象,加入同步队列中,之后会被挂起,当线程被唤醒后,会CAS方式修改state的值,也就是3中的流程;否则会再次挂起,这种自旋会一直持续,直到申请锁成功、取消申请、线程中断。
而申请流程的源码如下,注意看注释信息:
- public final void acquire(int arg) {
- if (!tryAcquire(arg) && // 如果前面返回false未获得锁,则进acquireQueued()
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // addWaiter加入同步队列,再申请锁
- selfInterrupt(); // 中断线程;当申请完成后,会返回线程状态,true==线程中断了,false==线程未中断
- }
- // 加入同步队列
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode); // 封装当前线程
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail; // 尾插法,直接加入同步队列
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node); // 再次for(;;)尝试加入同步队列
- return node;
- }
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) { // 又调用tryAcquire()
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- // 以上,让同步队列头节点的后继节点,尝试获取锁,成功的话将node调整为头节点
- // 以下,获取锁失败,将线程挂起,等待被唤醒
- if (shouldParkAfterFailedAcquire(p, node) && // 检查node状态,决定是否应该被挂起
- parkAndCheckInterrupt()) // 挂起线程,并在被唤醒后检查线程是否中断
- interrupted = true; // parkAndCheckInterrupt返回true,代表线程中断了
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
AQS详细源码分析,请移步《面试必考AQS-排它锁的申请与释放》等系列文章。
这里再梳理一次非公平排它锁的调用:
- public final void acquire(int arg){...} // 获取排它锁的入口
- # protected boolean tryAcquire(int arg); // 尝试直接获取锁,这里可以替换为nonfairTryAcquire()
- final boolean nonfairTryAcquire(int acquires) {...} // 非公平排它锁实现tryAcquire(),执行完毕回到AQS流程
- final boolean acquireQueued(final Node node, int arg) {...} // AQS中获取排它锁流程整合
- private Node addWaiter(Node mode){...} // 将node加入到同步队列的尾部
- # protected boolean tryAcquire(int arg); // 如果当前node的前置节点pre变为了head节点,则尝试获取锁(因为head可能正在释放)
- private void setHead(Node node) {...} // 设置 同步队列的head节点
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {...} // 如果获取锁失败,则整理队中节点状态,并判断是否要将线程挂起
- private final boolean parkAndCheckInterrupt() {...} // 将线程挂起,并在挂起被唤醒后检查是否要中断线程(返回是否中断)
- private void cancelAcquire(Node node) {...} // 取消当前节点获取排它锁,将其从同步队列中移除
- static void selfInterrupt() {...} // 操作将当前线程中断
Lock接口的lockInterruptibly()是申请锁过程允许中断,当检测到线程中断时会抛出异常。
public void lockInterruptibly() throws InterruptedException {...}
具体逻辑与acquire()差别不太大,感兴趣可以自行分析。
Lock接口的tryLock()实现
- public boolean tryLock() {
- return sync.nonfairTryAcquire(1);
- }
这个很有趣,它并没有走AQS流程,而是走了tryAcquire()的具体实现nonfairTryAcquire(),也就是说,只是根据state的值、线程持有者,来确定是否申请到锁,并没有执行CLH模型的内容,在实际使用时要当心其语义。
Lock接口的tryLock(long time,TimeUnit unit)实现
这个实现主要源码:
- public boolean tryLock(long timeout, TimeUnit unit)
- throws InterruptedException {
- return sync.tryAcquireNanos(1, unit.toNanos(timeout));
- }
-
- public final boolean tryAcquireNanos(int arg, long nanosTimeout)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- return tryAcquire(arg) ||
- doAcquireNanos(arg, nanosTimeout);
- }
-
- private boolean doAcquireNanos(int arg, long nanosTimeout)
- throws InterruptedException {
- if (nanosTimeout <= 0L)
- return false;
- final long deadline = System.nanoTime() + nanosTimeout;
- final Node node = addWaiter(Node.EXCLUSIVE);
- boolean failed = true;
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return true;
- }
- nanosTimeout = deadline - System.nanoTime();
- if (nanosTimeout <= 0L)
- return false;
- if (shouldParkAfterFailedAcquire(p, node) &&
- nanosTimeout > spinForTimeoutThreshold)
- LockSupport.parkNanos(this, nanosTimeout);
- if (Thread.interrupted())
- throw new InterruptedException();
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
它实际走的是AQS的带超时时间的申请流程,内部在自旋过程中增加了事件的判断,来决定是否继续等待申请,并且它也是支持线程中断的。
申请部分与acquireQueued()方法如出一辙,将与时间有关的代码列出:
- if (nanosTimeout <= 0L) // 非法值校验
- return false;
- final long deadline = System.nanoTime() + nanosTimeout; // 获取超时时刻
-
- // ...
- nanosTimeout = deadline - System.nanoTime(); // 每自旋一次,计算 剩余申请时间
- if (nanosTimeout <= 0L) // 如果剩余时间<=0,结束
- return false;
- if (shouldParkAfterFailedAcquire(p, node) &&
- nanosTimeout > spinForTimeoutThreshold) // 挂起增加一个条件,剩余时间要大于1000L
- LockSupport.parkNanos(this, nanosTimeout); // 挂起时指定超时时间,超时自动结束挂起状态
- if (Thread.interrupted())
- throw new InterruptedException();
-
-
- static final long spinForTimeoutThreshold = 1000L;
比较特殊的地方是在挂起前判断上,如果剩余时间小于1000L,不会进行挂起操作,而是直接进入下一次循环,这个应该是考虑一次挂起唤醒的过程,耗时较高,剩余时间可能不足,也是为尽可能申请到锁做努力。
Lock接口的unlock()实现
相关源码
- public void unlock() {
- sync.release(1);
- }
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
解锁unlock()很简单,直接调用的是AQS的release(),在Sync中并没有重写。
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
看一下tryRelease()的实现
- protected final boolean tryRelease(int releases) {
- int c = getState() - releases; // 获取需要释放的state数量,与当前state值做差
- if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果当前线程没有持有锁
- throw new IllegalMonitorStateException(); // 直接抛出异常
- boolean free = false;
- if (c == 0) { // 如果差值为0,说明已经解锁了
- free = true;
- setExclusiveOwnerThread(null); // 清空锁的持有者
- }
- setState(c); // 修改state的值
- return free;
- }
在tryRelease()中,主要是修改state的值,如果没到0,说明解锁成功了,后面也就不用操作了,如果能修改到0,那么后面还要继续执行AQS的内容,去唤醒后继节点申请锁。
- private void unparkSuccessor(Node node) {
- /*
- * If status is negative (i.e., possibly needing signal) try
- * to clear in anticipation of signalling. It is OK if this
- * fails or if status is changed by waiting thread.
- */
- int ws = node.waitStatus;
- if (ws < 0) // 判断并修改节点状态
- compareAndSetWaitStatus(node, ws, 0);
-
- /*
- * Thread to unpark is held in successor, which is normally
- * just the next node. But if cancelled or apparently null,
- * traverse backwards from tail to find the actual
- * non-cancelled successor.
- */
- Node s = node.next; // 对于后继节点,要遍历出一个正常等待的节点来唤醒
- if (s == null || s.waitStatus > 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread); // 唤醒后继节点
- }
Lock接口的newCondition()实现
这个方法就是返回一个Condition对象:
- public Condition newCondition() {
- return sync.newCondition();
- }
- final ConditionObject newCondition() {
- return new ConditionObject();
- }
也就是说,Condition的使用,是通过ReentrantLock实现的,这也进一步验证,在await()和signal()调用的场景,必须是持有锁的场景,而锁,就是创建Condition对象的ReentrantLock锁持有的,这个在应用时一定要注意,不要ReentrantLock1 创建的Condition 在执行await()是,先申请ReentrantLock2的锁,这就有问题了。
FairSync 的定义中,只有lock()和tryAcquire(),其中lock()并没有像非公平同步器NonfairSync中,直接尝试修改资源state,而是直接调用了AQS的acquire()。
- static final class FairSync extends Sync {
- private static final long serialVersionUID = -3000897897090466540L;
-
- final void lock() {
- acquire(1);
- }
-
- /**
- * Fair version of tryAcquire. Don't grant access unless
- * recursive call or no waiters or is first.
- */
- protected final boolean tryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) { // 如果当前没有线程持有锁
- if (!hasQueuedPredecessors() && // 检查同步队列
- compareAndSetState(0, acquires)) { // 设置资源值
- setExclusiveOwnerThread(current); // 如果成功,说明申请锁成功
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) { // 当前是否持有锁
- int nextc = c + acquires; // 重入锁逻辑,直接改变资源值
- if (nextc < 0)
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
- }
在tryAcquire()中,有个!hasQueuedPredecessors()方法,在无锁情况下,由它决定是否能直接去申请锁:
- public final boolean hasQueuedPredecessors() {
- Node t = tail;
- Node h = head;
- Node s;
- return h != t &&
- ((s = h.next) == null || s.thread != Thread.currentThread());
- }
这个方法的目的是判断 同步队列的节点状态,根据状态:返回false,则会直接去申请锁
AQS的同步队列:head节点只是标识,并不记录线程信息,当调用setHead(Node node)时,会清除node的前后节点指针。而enq()方法在初始化同步队列时,是将tail指向了head,而之后添加节点时,是尾插法,也就是head不知道后置节点,而同步队列中的节点都知道自己的前置节点。
结合同步队列添加节点的方法:
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) { // 初始化队列
- if (compareAndSetHead(new Node()))
- tail = head; // 头尾相等
- } else { // 尾插加入
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
回看释放锁的代码,是没有设置head节点的,也就是说当释放完锁,如果没有后继节点可被唤醒,head节点将保持最后一次加锁时设置的值;也就是除了都为null 以及 首次初始化还未来得及添加节点时head==tail,其他时刻都head!=tail。
分析一下这个方法的判断
- return h != t &&
- ((s = h.next) == null || s.thread != Thread.currentThread());
(h!=t)==true,头尾节点不相等,说明同步队列已经初始化过
(h!=t)==false,头尾节点相等,上面分析的情况【导致外层方法if判断通过,尝试获取锁】
((s = h.next) == null)==true,头节点没有后继节点,可能都已经出队
((s = h.next) == null)==false,头节点有后继节点
(s.thread != Thread.currentThread()),当前线程是否为头节点的后继节点
返回情况有:
上述情况1\2\3下,会导致方法返回false,进而导致外层去直接CAS申请资源;而情况4则会去排队。
- if (!hasQueuedPredecessors() &&
- compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
至于公平同步器的解锁,它直接使用的是Sync的tryRelease(),流程上面已经介绍过。
整个ReentrantLock由两部分组成,一个是实现AQS的Sync同步器,再一个是自身实现的Lock接口,并由Sync同步器去做具体实现。由Sync定义tryAcquire()和tryRelease(),也就是state如何操作、变为何种值才算加锁成功,否则进入AQS的同步队列,排队获取锁。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。