当前位置:   article > 正文

JUC并发编程 - ReentrantReadWriteLock源码分析

JUC并发编程 - ReentrantReadWriteLock源码分析

前言

ReentrantReadWriteLock锁是AQS的另一种实现,它做到了可重入、可中断,分为公平和非公平两类实现,并且实现了读锁和写锁两类同时控制。在使用时,读写锁持有的同一个Lock实例,通过控制锁的行为,及CLH节点状态,来操作读锁和写锁,对于写少读多的场景,能提高效率。

由于我对AQS及ReentrantLock的源码已经做过分析,因此本篇文章对经常出现的代码流程介绍的会较为粗略,初次接触ReentrantReadWriteLock源码的同学,建议先从前面的文章看起,这样反而更能清晰的理解。

面试必考AQS-AQS源码全局分析​

面试必考AQS-排它锁的申请与释放​

面试必考AQS-共享锁的申请与释放,传播状态

类的定义

读写锁ReentrantReadWriteLock,实现接口ReadWriteLock,其内部定义了两个方法,分别获取读锁和写锁。

  1. public interface ReadWriteLock {
  2. Lock readLock();
  3. Lock writeLock();
  4. }

看一下ReentrantReadWriteLock内部相关实现:

  1. public class ReentrantReadWriteLock
  2. implements ReadWriteLock, java.io.Serializable {
  3. private final ReentrantReadWriteLock.ReadLock readerLock;
  4. private final ReentrantReadWriteLock.WriteLock writerLock;
  5. final Sync sync;
  6. public ReentrantReadWriteLock() {
  7. this(false);
  8. }
  9. public ReentrantReadWriteLock(boolean fair) {
  10. sync = fair ? new FairSync() : new NonfairSync();
  11. readerLock = new ReadLock(this);
  12. writerLock = new WriteLock(this);
  13. }
  14. public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
  15. public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
  16. // ...
  17. }

与ReadWriteLock接口相关的代码,包含两个锁的实例、一个同步器Sync、两个构造函数,其中无参构造函数默认实现了非公平同步器;初始化两个读写锁实例时,将this传入;接下来看一下读写锁类的实现。

实现Lock接口,内部持有一个Sync实例,由构造函数注入。

  1. public static class ReadLock implements Lock, java.io.Serializable {
  2. private final Sync sync;
  3. protected ReadLock(ReentrantReadWriteLock lock) {
  4. sync = lock.sync;
  5. }
  6. // ...
  7. }

在翻看ReadLock的实例方法,都是实现的Lock接口,与ReentrantLock相似。而WriteLock的定义与ReadLock差不多,具体分析时再看。

直接进入重点AQS类的实现类Sync,在ReentrantReadWriteLock中,与ReentrantLock类似,分为公平同步器和非公平同步器,并且默认实现的是非公平同步器。

abstract static class Sync extends AbstractQueuedSynchronizer {...}

在Sync中定义了一组常量及方法,用于记录读锁和写锁数量,它们是:

  1. static final int SHARED_SHIFT = 16; // 共享锁偏移量
  2. static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 1左移16位,共享锁的位置
  3. static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 允许申请锁的最大数量
  4. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 得到低16位补码;这样的int值EXCLUSIVE_MASK的高16位都是0,低16位都是1
  5. /** Returns the number of shared holds represented in count */
  6. static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
  7. /** Returns the number of exclusive holds represented in count */
  8. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

对于AQS的state值,存储的是一个32位int,在ReentrantReadWriteLock中,将高16位记录读锁数量,低16位记录写锁数量。

通过位移等运算,快速的得到需要的值,如sharedCount(int c),传入的c既是state值,通过无符号右移16位,快速得到读锁/共享锁的数量;

而exclusiveCount(int c)方法,传入state的值c,通过与EXCLUSIVE_MASK做&操作,能快速得到低16位的值是多少,也就是写锁/排它锁的数量。

  1. c = 0110 0011 0000 1001 0000 0000 0000 0011
  2. c & 0000 0000 0000 0000 1111 1111 1111 1111
  3. 0000 0000 0000 0000 0000 0000 0000 0011 // 因此有4个排它锁


在这之后,还定义了四个成员变量:

  1. private transient ThreadLocalHoldCounter readHolds;
  2. private transient HoldCounter cachedHoldCounter;
  3. private transient Thread firstReader = null;
  4. private transient int firstReaderHoldCount;

他们的作用在后面使用到的时候再介绍,这里看类的实现:

  1. static final class HoldCounter {
  2. int count = 0;
  3. final long tid = getThreadId(Thread.currentThread());
  4. }
  5. static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
  6. public HoldCounter initialValue() {
  7. return new HoldCounter();
  8. }
  9. }
  10. static final long getThreadId(Thread thread) {
  11. return UNSAFE.getLongVolatile(thread, TID_OFFSET);
  12. }

实现了ThreadLocal<?> ,记录了count和tid两个值,看样子是保持某些信息的。

在Sync中还有两个抽象方法

  1. abstract boolean readerShouldBlock();
  2. abstract boolean writerShouldBlock();

既然默认实现的是非公平同步器,那么我们来看一下NofairSync的实现,内容很简单:

  1. static final class NonfairSync extends Sync {
  2. final boolean writerShouldBlock() {
  3. return false; // 永远返回false
  4. }
  5. final boolean readerShouldBlock() {
  6. return apparentlyFirstQueuedIsExclusive(); // 调用一个AQS方法
  7. }
  8. }
  9. final boolean apparentlyFirstQueuedIsExclusive() {
  10. Node h, s;
  11. return (h = head) != null &&
  12. (s = h.next) != null &&
  13. !s.isShared() &&
  14. s.thread != null;
  15. }

同步队列的首个节点不为空、队列不止一个节点、非head节点不是共享(存在排它锁等待)、非head节点的线程不为空。以上条件都满足,则认为当前读锁需要阻塞。

零散的源码介绍的差不多,到这应该已经一头雾水了,没关系,下面开始由读写锁的行为分析源码设计,并将上述内容串起来。

非公平同步器实现的写锁

定义了一个Sync类型对象sync,由构造函数传入,也就是这里定义了当前写锁是由“公平同步器实现”还是由“非公平同步器实现”实现的。

  1. private final Sync sync;
  2. protected WriteLock(ReentrantReadWriteLock lock) {
  3. sync = lock.sync;
  4. }

排它锁加锁流程分析

在WriteLock中,加锁方法直接使用的是sync.acquire(1); 追溯起来就是AQS的acquire()申请流程。

  1. public void lock() {
  2. sync.acquire(1);
  3. }
  4. public final void acquire(int arg) {
  5. if (!tryAcquire(arg) &&
  6. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  7. selfInterrupt();
  8. }

那么tryAcquire()的实现,则是Sync当中了:

  1. protected final boolean tryAcquire(int acquires) {
  2. /*
  3. * Walkthrough:
  4. * 1. If read count nonzero or write count nonzero
  5. * and owner is a different thread, fail.
  6. * 2. If count would saturate, fail. (This can only
  7. * happen if count is already nonzero.)
  8. * 3. Otherwise, this thread is eligible for lock if
  9. * it is either a reentrant acquire or
  10. * queue policy allows it. If so, update state
  11. * and set owner.
  12. */
  13. Thread current = Thread.currentThread();
  14. int c = getState();
  15. int w = exclusiveCount(c);
  16. if (c != 0) {
  17. // (如果 c != 0 并且写锁数量为0,那么读锁数量肯定不为0)
  18. if (w == 0 || current != getExclusiveOwnerThread())
  19. return false;
  20. if (w + exclusiveCount(acquires) > MAX_COUNT)
  21. throw new Error("Maximum lock count exceeded");
  22. // Reentrant acquire
  23. setState(c + acquires);
  24. return true;
  25. }
  26. if (writerShouldBlock() ||
  27. !compareAndSetState(c, c + acquires))
  28. return false;
  29. setExclusiveOwnerThread(current);
  30. return true;
  31. }

方法内有大段的注释,翻译一下:

1、如果读锁数量不为0 或者 写锁数量不为零,并且 线程拥有者不是当前线程,获取失败
2、如果锁的数量已经达到最大值,获取失败
3、以上不满足,说明 可以申请锁,如果设置state成功,说明获取成功

这3条,囊括了tryAcquire()方法c!=0的大部分逻辑。

在代码中,if (w == 0 || current != getExclusiveOwnerThread()) 的潜在含义有2点:

1、当读写锁中存在读锁时,是不能直接申请到写锁的;
2、读锁是可以重入的

当c==0的情况下,也就是读写锁都为0,此时做了如下处理:

  1. if (writerShouldBlock() ||
  2. !compareAndSetState(c, c + acquires))
  3. return false;
  4. setExclusiveOwnerThread(current);
  5. return true;

由于当前分析的是非公平同步器的实现,因此writerShouldBlock()==false。那么这段的逻辑就是 尝试直接修改state的值,如果设置成功,那么申请锁成功,设置排他线程拥有者、返回true。

在调用完tryAcquire(),后续流程就是AQS标准流程,这个在前文中已经详细描述过,这里不再赘述。

排它锁的tryLock

在WriteLock中实现的tryLock()方法,它的主逻辑是由Sync实现的。

  1. public boolean tryLock( ) {
  2. return sync.tryWriteLock();
  3. }
  4. final boolean tryWriteLock() {
  5. Thread current = Thread.currentThread(); // 获取当前线程
  6. int c = getState();// 获取资源
  7. if (c != 0) { // 有锁
  8. int w = exclusiveCount(c); // 计算排它锁数量
  9. if (w == 0 || current != getExclusiveOwnerThread()) // 与tryAcquire逻辑一致
  10. return false;
  11. if (w == MAX_COUNT)
  12. throw new Error("Maximum lock count exceeded");
  13. }
  14. if (!compareAndSetState(c, c + 1))
  15. return false;
  16. setExclusiveOwnerThread(current);
  17. return true;
  18. }

在tryWriteLock()方法中,相当于是单独执行了一次tryAcquire(),并且只是对state做+1操作。只是不需要考虑排它锁的阻塞情况,这也进一步说明“非公平”的特点。

排它锁带超时的tryLock

带有超时时间的tryLock,实现流程较为复杂,先看一下基本的调用关系:

  1. public boolean tryLock(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  4. }
  5. public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
  6. if (Thread.interrupted())
  7. throw new InterruptedException();
  8. return tryAcquire(arg) ||
  9. doAcquireNanos(arg, nanosTimeout);
  10. }

执行同步器的tryAcquireNanos()方法,具体的会执行tryAcquire()尝试直接申请一次锁,如果不成功会执行AQS的doAcquireNanos()。

  1. private boolean doAcquireNanos(int arg, long nanosTimeout)
  2. throws InterruptedException {
  3. if (nanosTimeout <= 0L)
  4. return false;
  5. final long deadline = System.nanoTime() + nanosTimeout;
  6. final Node node = addWaiter(Node.EXCLUSIVE);
  7. boolean failed = true;
  8. try {
  9. for (;;) {
  10. final Node p = node.predecessor();
  11. if (p == head && tryAcquire(arg)) {
  12. setHead(node);
  13. p.next = null; // help GC
  14. failed = false;
  15. return true;
  16. }
  17. nanosTimeout = deadline - System.nanoTime();
  18. if (nanosTimeout <= 0L)
  19. return false;
  20. if (shouldParkAfterFailedAcquire(p, node) &&
  21. nanosTimeout > spinForTimeoutThreshold)
  22. LockSupport.parkNanos(this, nanosTimeout);
  23. if (Thread.interrupted())
  24. throw new InterruptedException();
  25. }
  26. } finally {
  27. if (failed)
  28. cancelAcquire(node);
  29. }
  30. }

这个方法在分析AQS时做过详细分析,这里再写一下粗略的过程:

1、将当前线程封装为node节点,并加入同步队列,计算超时时间点
2、如果node的前置节点是head,则尝试申请锁tryAcquire(),否则进入3
3、当前时间如果未超时,并且剩余时间大于阈值,则将node的线程挂起,等待被唤醒或超时中断
4、被唤醒后重复申请直到申请成功或线程被中断

排它锁解锁流程分析

调用流程与加锁差不多,通过AQS的release实现,并且自定义了tryRelease():

  1. public void unlock() {
  2. sync.release(1);
  3. }
  4. protected final boolean tryRelease(int releases) {
  5. if (!isHeldExclusively()) // 根据持有线程,拦截非法释放操作
  6. throw new IllegalMonitorStateException();
  7. // 后续代码,都是有效线程内操作
  8. int nextc = getState() - releases; // 计算释放后资源的值
  9. boolean free = exclusiveCount(nextc) == 0; // 计算释放后,排它锁的数量
  10. if (free) // 如果排它锁数量为0,则清空持有的线程
  11. setExclusiveOwnerThread(null);
  12. setState(nextc); // 设置资源state的值
  13. return free; // 返回排它锁是否还持有
  14. }
  15. // 当前线程是否持有排它锁
  16. protected final boolean isHeldExclusively() {
  17. return getExclusiveOwnerThread() == Thread.currentThread();
  18. }

由此可知tryRelease()的逻辑很简单,只是操作state的值,以及释放后对 写锁值的检查。

到这里,WriteLock中主要成员已经分析完成。

非公平同步器实现的读锁

定义了一个Sync类型对象sync,由构造函数传入,也就是这里定义了当前写锁是由“公平同步器实现”还是由“非公平同步器实现”实现的。

  1. private final Sync sync;
  2. protected ReadLock(ReentrantReadWriteLock lock) {
  3. sync = lock.sync;
  4. }

共享锁加锁流程分析

在WriteLock中,加锁方法直接使用的是sync.acquire(1); 追溯起来就是AQS的acquireShared()申请流程。

  1. public void lock() {
  2. sync.acquireShared(1);
  3. }
  4. public final void acquireShared(int arg) {
  5. if (tryAcquireShared(arg) < 0)
  6. doAcquireShared(arg);
  7. }

那么tryAcquireShared()的实现,则是Sync当中了:

  1. protected final int tryAcquireShared(int unused) {
  2. /*
  3. * Walkthrough:
  4. * 1. If write lock held by another thread, fail.
  5. * 2. Otherwise, this thread is eligible for
  6. * lock wrt state, so ask if it should block
  7. * because of queue policy. If not, try
  8. * to grant by CASing state and updating count.
  9. * Note that step does not check for reentrant
  10. * acquires, which is postponed to full version
  11. * to avoid having to check hold count in
  12. * the more typical non-reentrant case.
  13. * 3. If step 2 fails either because thread
  14. * apparently not eligible or CAS fails or count
  15. * saturated, chain to version with full retry loop.
  16. */
  17. Thread current = Thread.currentThread();
  18. int c = getState();
  19. if (exclusiveCount(c) != 0 &&
  20. getExclusiveOwnerThread() != current)
  21. return -1;
  22. int r = sharedCount(c);
  23. if (!readerShouldBlock() &&
  24. r < MAX_COUNT &&
  25. compareAndSetState(c, c + SHARED_UNIT)) {
  26. if (r == 0) {
  27. firstReader = current;
  28. firstReaderHoldCount = 1;
  29. } else if (firstReader == current) {
  30. firstReaderHoldCount++;
  31. } else {
  32. HoldCounter rh = cachedHoldCounter;
  33. if (rh == null || rh.tid != getThreadId(current))
  34. cachedHoldCounter = rh = readHolds.get();
  35. else if (rh.count == 0)
  36. readHolds.set(rh);
  37. rh.count++;
  38. }
  39. return 1;
  40. }
  41. return fullTryAcquireShared(current);
  42. }

同样,方法内有大段的注释,翻译一下:

1、如果存在写锁并被其他线程持有,获取失败
2、读锁是否阻塞、读锁数量是否超过最大值,能否直接修改state
3、如果修改state值成功,根据读锁数量、线程对象等信息,记录一些状态值

其中第二点在修改state时,采用的是compareAndSetState(c, c + SHARED_UNIT),其中c + SHARED_UNIT的含义是:

由于c是当前state的值,它包含读锁(高16位)和写锁(低16位)两部分,如果直接c+1得到的是写锁数量+1,而我们需要对读锁+1时,就要针对高16位进行操作,而SHARED_UNIT=1<<16,它就是读锁的+1标准值。

因此compareAndSetState(c, c + SHARED_UNIT) 的含义类似于compareAndSetState(r, r + 1).

当成个获取到读锁后,需要记录一些信息,在前文我们简单介绍过几个变量及方法,在这里都用到了。

  1. static final class HoldCounter {
  2. int count = 0; // 数量
  3. final long tid = getThreadId(Thread.currentThread()); // 线程id
  4. }
  5. static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
  6. public HoldCounter initialValue() {
  7. return new HoldCounter();
  8. }
  9. }
  10. Sync() {
  11. readHolds = new ThreadLocalHoldCounter();
  12. //...
  13. }
  14. private transient ThreadLocalHoldCounter readHolds; // 记录线程共享锁信息:HoldCounter
  15. private transient HoldCounter cachedHoldCounter; // 当前操作读锁的计数器
  16. private transient Thread firstReader = null; // 首个获取读锁的线程
  17. private transient int firstReaderHoldCount; // 首个获取读锁的线程的 读锁计数器
  18. static final long getThreadId(Thread thread) {
  19. return UNSAFE.getLongVolatile(thread, TID_OFFSET); // 获取线程ID
  20. }

回到tryAcquireShared()当中:

  1. if (r == 0) { // 读锁在申请前数量为0
  2. firstReader = current; // 记录当前线程
  3. firstReaderHoldCount = 1; // 记录读锁申请数量
  4. } else if (firstReader == current) { // 如果r!=0 并且第一个读锁申请者是当前线程
  5. firstReaderHoldCount++; // 累加读锁数量
  6. } else {
  7. HoldCounter rh = cachedHoldCounter; // 获取状态缓存对象
  8. if (rh == null || rh.tid != getThreadId(current)) // 如果线程id与缓存中不一致
  9. cachedHoldCounter = rh = readHolds.get(); // 从ThreadLocal中获取当前线程的计数器对象
  10. else if (rh.count == 0)
  11. readHolds.set(rh); // 设置rh
  12. rh.count++; // 累加读锁数量
  13. }

也就是说,共享锁在持有过程中,同步器会记录:

1、持有共享锁的线程、持有的数量
2、首个持有共享锁的线程、持有的数量

在申请到共享锁后,会更新它们的值;猜测释放时,也会更新。

在tryAcquireShared()方法的底部,还有一个fullTryAcquireShared()的调用,如果当前同步队列的头结点不为空、队列中有等待的节点、等待节点申请的锁是排它锁时,那么共享锁的申请需要进行排队,而不是直接申请。而排队申请的处理,就在fullTryAcquireShared()方法中。

  1. final int fullTryAcquireShared(Thread current) {
  2. /*
  3. * This code is in part redundant with that in
  4. * tryAcquireShared but is simpler overall by not
  5. * complicating tryAcquireShared with interactions between
  6. * retries and lazily reading hold counts.
  7. */
  8. HoldCounter rh = null;
  9. for (;;) { // 自旋
  10. int c = getState();
  11. if (exclusiveCount(c) != 0) { // 存在排它锁
  12. if (getExclusiveOwnerThread() != current) // 并不是当前线程持有排它锁
  13. return -1; // 无法申请读锁
  14. } else if (readerShouldBlock()) { // 读锁申请是否阻塞,也就是同步队列中是否存在写锁申请
  15. // 如果申请读锁阻塞了,也就是有写锁在前面排队,那么就要考虑中断申请读锁了。
  16. if (firstReader == current) {
  17. } else {
  18. if (rh == null) {
  19. rh = cachedHoldCounter;
  20. if (rh == null || rh.tid != getThreadId(current)) {
  21. rh = readHolds.get();
  22. if (rh.count == 0)
  23. readHolds.remove();
  24. }
  25. // 清理无效缓存
  26. }
  27. if (rh.count == 0) // 如果当前线程读锁计数器为0,说明未申请到锁
  28. return -1;
  29. }
  30. }
  31. // 整理、统计 读锁计数器
  32. if (sharedCount(c) == MAX_COUNT)
  33. throw new Error("Maximum lock count exceeded");
  34. // 这里的else if(xxx)判断,则是整个for(;;)自旋的基础
  35. if (compareAndSetState(c, c + SHARED_UNIT)) { // 申请读锁资源
  36. if (sharedCount(c) == 0) { //如果申请前读锁数量为0
  37. firstReader = current;
  38. firstReaderHoldCount = 1;
  39. } else if (firstReader == current) { // 如果首个读锁申请者等于当前线程
  40. firstReaderHoldCount++;
  41. } else { // 当前线程并非首个申请读锁的线程,从ThreadLocal对象中获取计数器并更新
  42. if (rh == null)
  43. rh = cachedHoldCounter;
  44. if (rh == null || rh.tid != getThreadId(current))
  45. rh = readHolds.get();
  46. else if (rh.count == 0)
  47. readHolds.set(rh);
  48. rh.count++;
  49. cachedHoldCounter = rh; // cache for release
  50. }
  51. return 1;
  52. }
  53. }
  54. }

当遇到readerShouldBlock时,ReadLock进入自旋方式,尝试申请锁。

共享锁的tryLock

在ReadLock中实现的tryLock()方法,它的主逻辑是由Sync实现的。

  1. public boolean tryLock() {
  2. return sync.tryReadLock();
  3. }
  4. final boolean tryReadLock() {
  5. Thread current = Thread.currentThread();
  6. for (;;) {
  7. int c = getState();
  8. if (exclusiveCount(c) != 0 &&
  9. getExclusiveOwnerThread() != current)
  10. return false;
  11. int r = sharedCount(c);
  12. if (r == MAX_COUNT)
  13. throw new Error("Maximum lock count exceeded");
  14. if (compareAndSetState(c, c + SHARED_UNIT)) {
  15. if (r == 0) {
  16. firstReader = current;
  17. firstReaderHoldCount = 1;
  18. } else if (firstReader == current) {
  19. firstReaderHoldCount++;
  20. } else {
  21. HoldCounter rh = cachedHoldCounter;
  22. if (rh == null || rh.tid != getThreadId(current))
  23. cachedHoldCounter = rh = readHolds.get();
  24. else if (rh.count == 0)
  25. readHolds.set(rh);
  26. rh.count++;
  27. }
  28. return true;
  29. }
  30. }
  31. }

在tryReadLock()方法中,相当于是单独执行了一次tryAcquireShared(),并且只是对state高16位做+1操作。只是不需要考虑共享锁的阻塞情况,也就没有自旋等待过程,这也进一步说明“非公平”的特点。

共享锁带超时的tryLock

带有超时时间的tryLock,实现流程较为复杂,先看一下基本的调用关系:

  1. public boolean tryLock(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  4. }
  5. public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
  6. if (Thread.interrupted())
  7. throw new InterruptedException();
  8. return tryAcquireShared(arg) >= 0 ||
  9. doAcquireSharedNanos(arg, nanosTimeout);
  10. }

执行同步器的tryAcquireSharedNanos()方法,具体的会执行tryAcquireShared()尝试直接申请一次锁,如果不成功会执行AQS的doAcquireSharedNanos()。

  1. private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
  2. throws InterruptedException {
  3. if (nanosTimeout <= 0L)
  4. return false;
  5. final long deadline = System.nanoTime() + nanosTimeout;
  6. final Node node = addWaiter(Node.SHARED);
  7. boolean failed = true;
  8. try {
  9. for (;;) {
  10. final Node p = node.predecessor();
  11. if (p == head) {
  12. int r = tryAcquireShared(arg);
  13. if (r >= 0) {
  14. setHeadAndPropagate(node, r);
  15. p.next = null; // help GC
  16. failed = false;
  17. return true;
  18. }
  19. }
  20. nanosTimeout = deadline - System.nanoTime();
  21. if (nanosTimeout <= 0L)
  22. return false;
  23. if (shouldParkAfterFailedAcquire(p, node) &&
  24. nanosTimeout > spinForTimeoutThreshold)
  25. LockSupport.parkNanos(this, nanosTimeout);
  26. if (Thread.interrupted())
  27. throw new InterruptedException();
  28. }
  29. } finally {
  30. if (failed)
  31. cancelAcquire(node);
  32. }
  33. }

这个方法在分析AQS时做过详细分析,这里再写一下粗略的过程:

1、将当前线程封装为node节点,并加入同步队列,计算超时时间点
2、如果node的前置节点是head,则尝试申请锁tryAcquireShared(),设置头节点、唤醒后继节点申请共享锁;否则进入3
3、当前时间如果未超时,并且剩余时间大于阈值,则将node的线程挂起,等待被唤醒或超时中断
4、被唤醒后重复申请直到申请成功或线程被中断

共享锁解锁流程分析

ReadLock的解锁通过AQS的releaseShared()实现,并且自定义了tryRelease():

  1. public void unlock() {
  2. sync.releaseShared(1);
  3. }
  4. protected final boolean tryReleaseShared(int unused) {
  5. Thread current = Thread.currentThread();
  6. if (firstReader == current) { // 判断是否为firstReader,设置计数器
  7. if (firstReaderHoldCount == 1)
  8. firstReader = null;
  9. else
  10. firstReaderHoldCount--;
  11. } else { // 非首次申请读锁的线程,从ThreadLocal中获取计数器并更新
  12. HoldCounter rh = cachedHoldCounter;
  13. if (rh == null || rh.tid != getThreadId(current))
  14. rh = readHolds.get();
  15. int count = rh.count;
  16. if (count <= 1) {
  17. readHolds.remove();
  18. if (count <= 0)
  19. throw unmatchedUnlockException();
  20. }
  21. --rh.count;
  22. }
  23. for (;;) {
  24. int c = getState();
  25. int nextc = c - SHARED_UNIT; // 高16位减一,得到新的读锁数量
  26. if (compareAndSetState(c, nextc))
  27. // Releasing the read lock has no effect on readers,
  28. // but it may allow waiting writers to proceed if
  29. // both read and write locks are now free.
  30. return nextc == 0; // 当state的值为0时,读锁完全释放(有读锁时不会存在写锁)
  31. }
  32. }

关注方法中的注释。值得注意的是,读锁的释放,会先修改计数器数值,然后自旋方式去释放state资源。

尾声

初看ReentrantReadWriteLock类,里面复杂的代码很容易放弃,需要学会对类进行拆解,其核心还是利用AQS来达到读写锁既要分离操作,又要互相影响的目的。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/966854
推荐阅读
相关标签
  

闽ICP备14008679号