当前位置:   article > 正文

JUC并发编程 - ReentrantLock源码分析_no error in timeoutthreshold

no error in timeoutthreshold

前言

前面分析了AQS类的源码,但真正实现AQS的实现类都在JUC中,当然AQS也是JUC的一部分,只是它不面向应用,除非自己去继承实现一套逻辑。

在java的java.util.concurrent包,简称JUC,其内包含的类都与多线程有关,是非常重要的一个包。接下来准备针对JUC下常用类进行分析,剖析它们的原理及使用特点。而本文将针对比较常用的ReentrantLock源码来分析。

ReentrantLock锁是AQS的一种实现,它做到了可重入、可中断,分为公平和非公平两类实现。在使用时,需要手动调用o.lock()和o.unlock()来加锁和解锁,并且解锁是必须的。这种形式,将加锁、解锁的时机开放给了开发者,因此更加灵活。

类的定义

看一下ReentrantLock类的结构:

public class ReentrantLock implements Lock, java.io.Serializable

并没继承AQS,反而实现了Lock;因为ReentrantLock是锁,实现Lock很自然。看一下Lock接口的定义:

  1. public interface Lock {
  2. void lock();
  3. void lockInterruptibly() throws InterruptedException;
  4. boolean tryLock();
  5. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  6. void unlock();
  7. Condition newCondition();
  8. }

加锁、解锁,还有一个方法用于创建Condition对象,这个在《xxx》有介绍过,可以移步加深一下印象。Condition与本文内容关系不大,回到ReentrantLock,看一下它内部有哪些成员。

成员变量

只有一个Sync类型的sync实例:

private final Sync sync;

内部类

Sync是内部类,它实现了AQS类,既然AQS成为抽象队列同步器,那么我们可以称Sync为队列同步器、同步器。

abstract static class Sync extends AbstractQueuedSynchronizer

另外还有两个内部类,都是继承自Sync:

  1. static final class NonfairSync extends Sync
  2. static final class FairSync extends Sync

从结构及名称看,ReentrantLock实现了两种锁:公平同步器FairSync和非公平同步器NonfairSync,都源自AQS。

构造函数

两个构造函数,默认无参的创建的是非公平同步器,还有一个根据入参来决定同步器类型:

  1. public ReentrantLock() {
  2. sync = new NonfairSync();
  3. }
  4. public ReentrantLock(boolean fair) {
  5. sync = fair ? new FairSync() : new NonfairSync();
  6. }

实例方法

内部定义很多,重点看一下Lock接口的实现方法:

  1. public void lock() {
  2. sync.lock();
  3. }
  4. public void lockInterruptibly() throws InterruptedException {
  5. sync.acquireInterruptibly(1);
  6. }
  7. public boolean tryLock() {
  8. return sync.nonfairTryAcquire(1);
  9. }
  10. public boolean tryLock(long timeout, TimeUnit unit)
  11. throws InterruptedException {
  12. return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  13. }
  14. public void unlock() {
  15. sync.release(1);
  16. }
  17. public Condition newCondition() {
  18. return sync.newCondition();
  19. }

发现一个现象,都是调用的sync中的实现。那么后面就对Sync的实现做重点分析。由于ReentrantLock默认实现非公平同步器,那么就逐个分析NonfairSync类的实现。

非公平同步器NonfairSync

Lock接口的Lock()实现

实现的内容真少,看来主要内容还是在Sync里。

  1. static final class NonfairSync extends Sync {
  2. private static final long serialVersionUID = 7316153563782823691L;
  3. /**
  4. * Performs lock. Try immediate barge, backing up to normal
  5. * acquire on failure.
  6. */
  7. final void lock() {
  8. if (compareAndSetState(0, 1))
  9. setExclusiveOwnerThread(Thread.currentThread());
  10. else
  11. acquire(1);
  12. }
  13. protected final boolean tryAcquire(int acquires) {
  14. return nonfairTryAcquire(acquires);
  15. }
  16. }

从加锁lock()方法看到,如果AQS.state=0,并且可以设置为1,则设置线程持有者为自己。

逐个逻辑第一次看可能有疑问,为什么state设置为1就加锁成功了? 因为ReentrantLock是排它锁的实现,而“非公平”的特点就是不管AQS排队那一套,只要现在state=0,我就先去改一下值,改成功了锁就是抢到了,否则再去走AQS.acquire(1)的流程。

看一看AQS的acquire():

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) && // t1
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

在t1位置,AQS并没有实现tryAcquire()方法,而是由Sync实现的,这在上面NonfairSync有,而且它调用的是Sync的nonfairTryAcquire()方法,调用链是:

lock()-->acquire()-->tryAcquire()-->nonfairTryAcquire()-->acquire()

看一下nonfairTryAcquire()方法的实现

  1. final boolean nonfairTryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. if (compareAndSetState(0, acquires)) {
  6. setExclusiveOwnerThread(current);
  7. return true;
  8. }
  9. }
  10. else if (current == getExclusiveOwnerThread()) {
  11. int nextc = c + acquires;
  12. if (nextc < 0) // overflow
  13. throw new Error("Maximum lock count exceeded");
  14. setState(nextc);
  15. return true;
  16. }
  17. return false;
  18. }

内容并不复杂,根据AQS中state的值,以及排它锁持有者,来决定不同结果。

  1. 1、state==0,无锁,直接CAS设置state的值,成功了就说明拿到锁,否则肯定有其他线程申请到了。
  2. 2、state!=0,有锁,如果持有者是自己,则对state的值累加,并且返回成功true

在2中的累加,含义是ReentrantLock是一个可重入锁,持有锁的线程可以多次申请锁,但释放锁测次数要与申请次数相等,才能真正释放锁。

以上就是尝试获取非公平锁tryAcquire的过程,再结合AQS中acquire()的实现,梳理下整个申请锁过程。

介绍下AQS的大致结构:

  1. 1、包含头尾指针的同步队列head、tail
  2. 2、同步队列的节点类Node,内部包含Thread线程对象、waitState节点状态、同步队列的前后指针prev、next
  3. 3、资源值state,将其成功改变的线程将会持有锁

在AQS中,参与申请锁流程的逻辑是, 申请锁的线程会被封装为node对象,加入同步队列中,之后会被挂起,当线程被唤醒后,会CAS方式修改state的值,也就是3中的流程;否则会再次挂起,这种自旋会一直持续,直到申请锁成功、取消申请、线程中断。

而申请流程的源码如下,注意看注释信息:

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) && // 如果前面返回false未获得锁,则进acquireQueued()
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // addWaiter加入同步队列,再申请锁
  4. selfInterrupt(); // 中断线程;当申请完成后,会返回线程状态,true==线程中断了,false==线程未中断
  5. }
  6. // 加入同步队列
  7. private Node addWaiter(Node mode) {
  8. Node node = new Node(Thread.currentThread(), mode); // 封装当前线程
  9. // Try the fast path of enq; backup to full enq on failure
  10. Node pred = tail; // 尾插法,直接加入同步队列
  11. if (pred != null) {
  12. node.prev = pred;
  13. if (compareAndSetTail(pred, node)) {
  14. pred.next = node;
  15. return node;
  16. }
  17. }
  18. enq(node); // 再次for(;;)尝试加入同步队列
  19. return node;
  20. }
  21. private Node enq(final Node node) {
  22. for (;;) {
  23. Node t = tail;
  24. if (t == null) { // Must initialize
  25. if (compareAndSetHead(new Node()))
  26. tail = head;
  27. } else {
  28. node.prev = t;
  29. if (compareAndSetTail(t, node)) {
  30. t.next = node;
  31. return t;
  32. }
  33. }
  34. }
  35. }
  36. final boolean acquireQueued(final Node node, int arg) {
  37. boolean failed = true;
  38. try {
  39. boolean interrupted = false;
  40. for (;;) {
  41. final Node p = node.predecessor();
  42. if (p == head && tryAcquire(arg)) { // 又调用tryAcquire()
  43. setHead(node);
  44. p.next = null; // help GC
  45. failed = false;
  46. return interrupted;
  47. }
  48. // 以上,让同步队列头节点的后继节点,尝试获取锁,成功的话将node调整为头节点
  49. // 以下,获取锁失败,将线程挂起,等待被唤醒
  50. if (shouldParkAfterFailedAcquire(p, node) && // 检查node状态,决定是否应该被挂起
  51. parkAndCheckInterrupt()) // 挂起线程,并在被唤醒后检查线程是否中断
  52. interrupted = true; // parkAndCheckInterrupt返回true,代表线程中断了
  53. }
  54. } finally {
  55. if (failed)
  56. cancelAcquire(node);
  57. }
  58. }

AQS详细源码分析,请移步《面试必考AQS-排它锁的申请与释放》等系列文章。

这里再梳理一次非公平排它锁的调用:

  1. public final void acquire(int arg){...} // 获取排它锁的入口
  2. # protected boolean tryAcquire(int arg); // 尝试直接获取锁,这里可以替换为nonfairTryAcquire()
  3. final boolean nonfairTryAcquire(int acquires) {...} // 非公平排它锁实现tryAcquire(),执行完毕回到AQS流程
  4. final boolean acquireQueued(final Node node, int arg) {...} // AQS中获取排它锁流程整合
  5. private Node addWaiter(Node mode){...} // 将node加入到同步队列的尾部
  6. # protected boolean tryAcquire(int arg); // 如果当前node的前置节点pre变为了head节点,则尝试获取锁(因为head可能正在释放)
  7. private void setHead(Node node) {...} // 设置 同步队列的head节点
  8. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {...} // 如果获取锁失败,则整理队中节点状态,并判断是否要将线程挂起
  9. private final boolean parkAndCheckInterrupt() {...} // 将线程挂起,并在挂起被唤醒后检查是否要中断线程(返回是否中断)
  10. private void cancelAcquire(Node node) {...} // 取消当前节点获取排它锁,将其从同步队列中移除
  11. static void selfInterrupt() {...} // 操作将当前线程中断

Lock接口的lockInterruptibly()是申请锁过程允许中断,当检测到线程中断时会抛出异常。

public void lockInterruptibly() throws InterruptedException {...}

具体逻辑与acquire()差别不太大,感兴趣可以自行分析。

Lock接口的tryLock()实现

  1. public boolean tryLock() {
  2. return sync.nonfairTryAcquire(1);
  3. }

这个很有趣,它并没有走AQS流程,而是走了tryAcquire()的具体实现nonfairTryAcquire(),也就是说,只是根据state的值、线程持有者,来确定是否申请到锁,并没有执行CLH模型的内容,在实际使用时要当心其语义。

Lock接口的tryLock(long time,TimeUnit unit)实现

这个实现主要源码:

  1. public boolean tryLock(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  4. }
  5. public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  6. throws InterruptedException {
  7. if (Thread.interrupted())
  8. throw new InterruptedException();
  9. return tryAcquire(arg) ||
  10. doAcquireNanos(arg, nanosTimeout);
  11. }
  12. private boolean doAcquireNanos(int arg, long nanosTimeout)
  13. throws InterruptedException {
  14. if (nanosTimeout <= 0L)
  15. return false;
  16. final long deadline = System.nanoTime() + nanosTimeout;
  17. final Node node = addWaiter(Node.EXCLUSIVE);
  18. boolean failed = true;
  19. try {
  20. for (;;) {
  21. final Node p = node.predecessor();
  22. if (p == head && tryAcquire(arg)) {
  23. setHead(node);
  24. p.next = null; // help GC
  25. failed = false;
  26. return true;
  27. }
  28. nanosTimeout = deadline - System.nanoTime();
  29. if (nanosTimeout <= 0L)
  30. return false;
  31. if (shouldParkAfterFailedAcquire(p, node) &&
  32. nanosTimeout > spinForTimeoutThreshold)
  33. LockSupport.parkNanos(this, nanosTimeout);
  34. if (Thread.interrupted())
  35. throw new InterruptedException();
  36. }
  37. } finally {
  38. if (failed)
  39. cancelAcquire(node);
  40. }
  41. }

它实际走的是AQS的带超时时间的申请流程,内部在自旋过程中增加了事件的判断,来决定是否继续等待申请,并且它也是支持线程中断的。

申请部分与acquireQueued()方法如出一辙,将与时间有关的代码列出:

  1. if (nanosTimeout <= 0L) // 非法值校验
  2. return false;
  3. final long deadline = System.nanoTime() + nanosTimeout; // 获取超时时刻
  4. // ...
  5. nanosTimeout = deadline - System.nanoTime(); // 每自旋一次,计算 剩余申请时间
  6. if (nanosTimeout <= 0L) // 如果剩余时间<=0,结束
  7. return false;
  8. if (shouldParkAfterFailedAcquire(p, node) &&
  9. nanosTimeout > spinForTimeoutThreshold) // 挂起增加一个条件,剩余时间要大于1000L
  10. LockSupport.parkNanos(this, nanosTimeout); // 挂起时指定超时时间,超时自动结束挂起状态
  11. if (Thread.interrupted())
  12. throw new InterruptedException();
  13. static final long spinForTimeoutThreshold = 1000L;

比较特殊的地方是在挂起前判断上,如果剩余时间小于1000L,不会进行挂起操作,而是直接进入下一次循环,这个应该是考虑一次挂起唤醒的过程,耗时较高,剩余时间可能不足,也是为尽可能申请到锁做努力。

Lock接口的unlock()实现

相关源码

  1. public void unlock() {
  2. sync.release(1);
  3. }
  4. public final boolean release(int arg) {
  5. if (tryRelease(arg)) {
  6. Node h = head;
  7. if (h != null && h.waitStatus != 0)
  8. unparkSuccessor(h);
  9. return true;
  10. }
  11. return false;
  12. }

解锁unlock()很简单,直接调用的是AQS的release(),在Sync中并没有重写。

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }

看一下tryRelease()的实现

  1. protected final boolean tryRelease(int releases) {
  2. int c = getState() - releases; // 获取需要释放的state数量,与当前state值做差
  3. if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果当前线程没有持有锁
  4. throw new IllegalMonitorStateException(); // 直接抛出异常
  5. boolean free = false;
  6. if (c == 0) { // 如果差值为0,说明已经解锁了
  7. free = true;
  8. setExclusiveOwnerThread(null); // 清空锁的持有者
  9. }
  10. setState(c); // 修改state的值
  11. return free;
  12. }

在tryRelease()中,主要是修改state的值,如果没到0,说明解锁成功了,后面也就不用操作了,如果能修改到0,那么后面还要继续执行AQS的内容,去唤醒后继节点申请锁。

  1. private void unparkSuccessor(Node node) {
  2. /*
  3. * If status is negative (i.e., possibly needing signal) try
  4. * to clear in anticipation of signalling. It is OK if this
  5. * fails or if status is changed by waiting thread.
  6. */
  7. int ws = node.waitStatus;
  8. if (ws < 0) // 判断并修改节点状态
  9. compareAndSetWaitStatus(node, ws, 0);
  10. /*
  11. * Thread to unpark is held in successor, which is normally
  12. * just the next node. But if cancelled or apparently null,
  13. * traverse backwards from tail to find the actual
  14. * non-cancelled successor.
  15. */
  16. Node s = node.next; // 对于后继节点,要遍历出一个正常等待的节点来唤醒
  17. if (s == null || s.waitStatus > 0) {
  18. s = null;
  19. for (Node t = tail; t != null && t != node; t = t.prev)
  20. if (t.waitStatus <= 0)
  21. s = t;
  22. }
  23. if (s != null)
  24. LockSupport.unpark(s.thread); // 唤醒后继节点
  25. }

Lock接口的newCondition()实现

这个方法就是返回一个Condition对象:

  1. public Condition newCondition() {
  2. return sync.newCondition();
  3. }
  4. final ConditionObject newCondition() {
  5. return new ConditionObject();
  6. }

也就是说,Condition的使用,是通过ReentrantLock实现的,这也进一步验证,在await()和signal()调用的场景,必须是持有锁的场景,而锁,就是创建Condition对象的ReentrantLock锁持有的,这个在应用时一定要注意,不要ReentrantLock1 创建的Condition 在执行await()是,先申请ReentrantLock2的锁,这就有问题了。

公平同步器FairSync

FairSync 的定义中,只有lock()和tryAcquire(),其中lock()并没有像非公平同步器NonfairSync中,直接尝试修改资源state,而是直接调用了AQS的acquire()。

  1. static final class FairSync extends Sync {
  2. private static final long serialVersionUID = -3000897897090466540L;
  3. final void lock() {
  4. acquire(1);
  5. }
  6. /**
  7. * Fair version of tryAcquire. Don't grant access unless
  8. * recursive call or no waiters or is first.
  9. */
  10. protected final boolean tryAcquire(int acquires) {
  11. final Thread current = Thread.currentThread();
  12. int c = getState();
  13. if (c == 0) { // 如果当前没有线程持有锁
  14. if (!hasQueuedPredecessors() && // 检查同步队列
  15. compareAndSetState(0, acquires)) { // 设置资源值
  16. setExclusiveOwnerThread(current); // 如果成功,说明申请锁成功
  17. return true;
  18. }
  19. }
  20. else if (current == getExclusiveOwnerThread()) { // 当前是否持有锁
  21. int nextc = c + acquires; // 重入锁逻辑,直接改变资源值
  22. if (nextc < 0)
  23. throw new Error("Maximum lock count exceeded");
  24. setState(nextc);
  25. return true;
  26. }
  27. return false;
  28. }
  29. }

在tryAcquire()中,有个!hasQueuedPredecessors()方法,在无锁情况下,由它决定是否能直接去申请锁:

  1. public final boolean hasQueuedPredecessors() {
  2. Node t = tail;
  3. Node h = head;
  4. Node s;
  5. return h != t &&
  6. ((s = h.next) == null || s.thread != Thread.currentThread());
  7. }

这个方法的目的是判断 同步队列的节点状态,根据状态:返回false,则会直接去申请锁

AQS的同步队列:head节点只是标识,并不记录线程信息,当调用setHead(Node node)时,会清除node的前后节点指针。而enq()方法在初始化同步队列时,是将tail指向了head,而之后添加节点时,是尾插法,也就是head不知道后置节点,而同步队列中的节点都知道自己的前置节点。

 结合同步队列添加节点的方法:

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. if (t == null) { // 初始化队列
  5. if (compareAndSetHead(new Node()))
  6. tail = head; // 头尾相等
  7. } else { // 尾插加入
  8. node.prev = t;
  9. if (compareAndSetTail(t, node)) {
  10. t.next = node;
  11. return t;
  12. }
  13. }
  14. }
  15. }

回看释放锁的代码,是没有设置head节点的,也就是说当释放完锁,如果没有后继节点可被唤醒,head节点将保持最后一次加锁时设置的值;也就是除了都为null 以及 首次初始化还未来得及添加节点时head==tail,其他时刻都head!=tail。

分析一下这个方法的判断

  1. return h != t &&
  2. ((s = h.next) == null || s.thread != Thread.currentThread());

(h!=t)==true,头尾节点不相等,说明同步队列已经初始化过

(h!=t)==false,头尾节点相等,上面分析的情况【导致外层方法if判断通过,尝试获取锁】

((s = h.next) == null)==true,头节点没有后继节点,可能都已经出队

((s = h.next) == null)==false,头节点有后继节点

(s.thread != Thread.currentThread()),当前线程是否为头节点的后继节点

返回情况有:

  1. 队列还未初始化 false && (who care!)
  2. 队列已经初始化过,并且 head没有后继节点 true && (false || who care!)
  3. 当前队列存在有效节点s,并且s的线程与当前线程相同 ->这种情况我不认为存在,不可能一个线程还在排队,又操作一次申请锁 true && (true || false)
  4. 当前队列存在有效节点s,并且s的线程与当前线程不相同 --> 这种情况 会让当前线程进入同步队列,这种情况是在同步队列中有节点正在申请锁,而还未申请完成state==0,又有新线程来竞争,这种情况必须入队。 true && (true || true)

上述情况1\2\3下,会导致方法返回false,进而导致外层去直接CAS申请资源;而情况4则会去排队。

  1. if (!hasQueuedPredecessors() &&
  2. compareAndSetState(0, acquires)) {
  3. setExclusiveOwnerThread(current);
  4. return true;
  5. }

至于公平同步器的解锁,它直接使用的是Sync的tryRelease(),流程上面已经介绍过。

尾声

整个ReentrantLock由两部分组成,一个是实现AQS的Sync同步器,再一个是自身实现的Lock接口,并由Sync同步器去做具体实现。由Sync定义tryAcquire()和tryRelease(),也就是state如何操作、变为何种值才算加锁成功,否则进入AQS的同步队列,排队获取锁。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/966814
推荐阅读
相关标签
  

闽ICP备14008679号