赞
踩
ReentrantReadWriteLock是解决计算机场景的读写场景,并发场景有读线程和写线程同时对一个资源操作,且读场景远远多于写场景,多个读线程共享一个资源没有问题,但是多个写线程不能同时共享一个资源。
ReentrantReadWriteLock进入读锁前提:无其他写锁
ReentrantReadWriteLock进入写锁前提:无其他写锁 且 无其他读锁
功能 | 可重入锁 | 不可重入锁 | 公平锁 | 非公平锁 | 独占锁 | 可中断锁 | 锁降级 | 锁升级 |
---|---|---|---|---|---|---|---|---|
ReentrantReadWriteLock | 支持 | 不支持 | 支持 | 支持(默认) | 支持 | 支持 | 支持 | 不支持 |
公平锁:多线程按照申请锁的顺序获取锁,性能差保证顺序。
非公平锁:获取锁顺序和申请锁的顺序无关,大部分的锁都支持非公平锁,性能好。
可重入锁:当一个线程获取资源(锁),别的线程再次获取,不会出现异常(线程等待)。
不可重入锁:当一个线程获取锁后,别的线程无法再次获取锁。(实现需要自定义)
独占锁:一个线程获取资源(锁),其他线程不能获取该资源。(syn,rl都是队列独占锁)
可中断锁:一个线程等待另外一个线程释放资源,超时后不在等待。
锁降级:当线程 先获取写锁->在获取读锁->在释放写锁->最后线程持有读锁,线程由写锁降级为读锁。
锁升级:不支持先获取读锁,在获取写锁,在释放读锁,达到锁升级的目的。错误的:违背前提
内部类解释:
Sync: Sync继承AQS实现锁信号同步
NonfairSync: 非公平锁实现
FairSync:公平锁实现
ReadLock:读锁实现
WriteLock:写锁实现
读写状态设计:
TODO 移位运算相关
锁降级是把持住当前拥有的写锁,再获取到读锁,随后释放先前拥有的写锁。
答案是必要的。锁降级目的就是保证获取数据正确性。主要是为了保证数据的可见性,若果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程T获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新(是说线程A使用数据时,并不知道别的线程已经更改了数据,所以使用的是线程T的修改结果。因此通过锁降级来保证数据每次修改后的可见性)。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁后,线程T才能获取写锁进行数据更新。说白了就是,写锁和读锁的代码块中的操作是要保证串行执行,就相当于synchronized关键字修饰的一段代码块。
ReentrantReadWriteLock
不支持锁升级,因为可能有其他线程同时持有读锁,而读写锁之间是互斥的,因此升级为写锁存在冲突。
- public class ReentrantReadWriteLock
- implements ReadWriteLock, java.io.Serializable {
- private static final long serialVersionUID = -6992448646407690164L;
- /** 读锁 */
- private final ReentrantReadWriteLock.ReadLock readerLock;
- /** 写锁 */
- private final ReentrantReadWriteLock.WriteLock writerLock;
- /** Performs all synchronization mechanics */
- final Sync sync;
-
- /**
- * 默认初始化非公平锁
- */
- public ReentrantReadWriteLock() {
- this(false);
- }
-
- /**
- * true:公平锁
- * false:非公平锁
- */
- public ReentrantReadWriteLock(boolean fair) {
- sync = fair ? new FairSync() : new NonfairSync();
- readerLock = new ReadLock(this);
- writerLock = new WriteLock(this);
- }
-
- public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
- public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
-
- /**
- * Sync构建
- */
- abstract static class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 6317671515068378041L;
-
- //高16位为读锁,低16位为写锁
- static final int SHARED_SHIFT = 16;
- //读锁单位,移位运算2的16次方,TODO移位运算流程
- static final int SHARED_UNIT = (1 << SHARED_SHIFT);
- //读锁最大数量
- static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
- //写锁最大数量
- static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
-
- /** 读锁计数器,持有读锁线程数,位运算 */
- static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
- /** 写锁获取次数,位运算 */
- static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
-
- /**
- * 共享线程缓存计数器
- */
- static final class HoldCounter {
- //读锁数量
- int count = 0;
- // 线程id
- final long tid = getThreadId(Thread.currentThread());
- }
-
- /**
- * 本地线程计数器池
- */
- static final class ThreadLocalHoldCounter
- extends ThreadLocal<HoldCounter> {
- public HoldCounter initialValue() {
- return new HoldCounter();
- }
- }
-
- /**
- * 读线程计数器
- */
- private transient ThreadLocalHoldCounter readHolds;
-
- /** 最近一个成功获取读锁的线程的计数。 这省却了ThreadLocal查找 缓存*/
- private transient HoldCounter cachedHoldCounter;
-
- /** 第一个读线程 */
- private transient Thread firstReader = null;
- /** 针对只有一个读锁的优化处理 重入计数器 */
- private transient int firstReaderHoldCount;
-
- Sync() {
- /**初始化重入计数器*/
- readHolds = new ThreadLocalHoldCounter();
- setState(getState()); // ensures visibility of readHolds
- }
-
-
- /**
- * 读公平策略
- */
- abstract boolean readerShouldBlock();
-
- /**
- * 写公平策略
- */
- abstract boolean writerShouldBlock();
-
- /*
- * 尝试释放独占锁
- */
- protected final boolean tryRelease(int releases) {
- /**如果持有锁线程不是当前线程*/
- if (!isHeldExclusively())
- throw new IllegalMonitorStateException();
- /**获取写锁线程状态*/
- int nextc = getState() - releases;
- //如果写锁线程状态为0,返回true
- boolean free = exclusiveCount(nextc) == 0;
- if (free)
- //释放线程,修改线程状态
- setExclusiveOwnerThread(null);
- setState(nextc);
- return free;
- }
-
- //尝试获取独占锁
- protected final boolean tryAcquire(int acquires) {
- /*
- * 获取当前线程
- */
- Thread current = Thread.currentThread();
- //获取线程状态
- int c = getState();
- //写线程数量
- int w = exclusiveCount(c);
- if (c != 0) {
- // 如果线程状态为非0(占用),【写线程数量为0 或者 当前线程不等于持有线程】,返回尝试获取独占锁失败
- if (w == 0 || current != getExclusiveOwnerThread())
- return false;
- //当前持有线程数+即将持有线程数>最大线容许程数 抛异常
- if (w + exclusiveCount(acquires) > MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- // 修改线程持有状态
- setState(c + acquires);
- return true;
- }
- //返回线程是否持有锁,公平锁判断队列情况,非公平锁直接返回false
- if (writerShouldBlock() ||
- //修改线程状态
- !compareAndSetState(c, c + acquires))
- return false;
- //持有线程
- setExclusiveOwnerThread(current);
- return true;
- }
-
- /**
- * 尝试释放共享锁
- */
- protected final boolean tryReleaseShared(int unused) {
- //获取当前线程
- Thread current = Thread.currentThread();
- //当第一个读线程等于当前线程
- if (firstReader == current) {
- // 读线程计数器长度为1
- if (firstReaderHoldCount == 1)
- //释放此读线程
- firstReader = null;
- else
- //如果线程计数器长度不等于1,计数器自减
- firstReaderHoldCount--;
- } else {
- //获取当前线程计数器
- HoldCounter rh = cachedHoldCounter;
- //当前线程计数器为null 或者 计数器tid不等于当前线程tid
- if (rh == null || rh.tid != getThreadId(current))
- //从readHolds本地线程池获取HoldCounter对象
- rh = readHolds.get();
- //获取资源持有共享锁数量
- int count = rh.count;
- //持有共享锁数量小于等于1,删除共享锁计数器
- if (count <= 1) {
- readHolds.remove();
- if (count <= 0)
- throw unmatchedUnlockException();
- }
- --rh.count;
- }
-
- //死循环修改线程持有锁状态
- for (;;) {
- //线程状态
- int c = getState();
- //TODO 这个赋值难以理解
- int nextc = c - SHARED_UNIT;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
-
- private IllegalMonitorStateException unmatchedUnlockException() {
- return new IllegalMonitorStateException(
- "attempt to unlock read lock, not locked by current thread");
- }
-
- /**
- * 尝试获取共享锁
- */
- protected final int tryAcquireShared(int unused) {
- /*
- * 获取当前线程
- */
- Thread current = Thread.currentThread();
- //获取线程状态
- int c = getState();
- if (exclusiveCount(c) != 0 &&
- //判断持有锁的线程是否为当前线程
- getExclusiveOwnerThread() != current)
- return -1;
- //获取共享锁的数量
- int r = sharedCount(c);
- //独占锁:判断队列第一个节点是不是独占模式 共享锁:判断是否是独占锁,持有锁线程是否为当前线程
- if (!readerShouldBlock() &&
- //共享锁数量要小于锁最大数量
- r < MAX_COUNT &&
- //修改锁状态
- compareAndSetState(c, c + SHARED_UNIT)) {
- //共享锁数目为0
- if (r == 0) {
- //设置当前线程为firstReader,优化部分
- firstReader = current;
- //共享线程缓存计数器加1
- firstReaderHoldCount = 1;
- } else if (firstReader == current) {
- firstReaderHoldCount++;
- } else {
- //获取当前线程计数器
- HoldCounter rh = cachedHoldCounter;
- //如果计数器为null 或 当前线程id不等于线程计数器id
- if (rh == null || rh.tid != getThreadId(current))
- //赋值线程计数器
- cachedHoldCounter = rh = readHolds.get();
- //如果线程计数器线程数等于0
- else if (rh.count == 0)
- //赋值线程计数器
- readHolds.set(rh);
- rh.count++;
- }
- return 1;
- }
- //处理失败后,自旋实现线程重入队列
- return fullTryAcquireShared(current);
- }
-
- /**
- * 自旋方式获取共享锁
- */
- final int fullTryAcquireShared(Thread current) {
- HoldCounter rh = null;
- for (;;) {
- //获取线程占用状态
- int c = getState();
- //如果其他线程获取写锁
- if (exclusiveCount(c) != 0) {
- //持锁线程不等于当前线程
- if (getExclusiveOwnerThread() != current)
- return -1;
- // else we hold the exclusive lock; blocking here
- //独占锁:判断队列第一个节点是不是独占模式 共享锁:判断是否是独占锁,持有锁线程是否为当前线程
- } else if (readerShouldBlock()) {
- // 如果firstReader等于当前线程
- if (firstReader == current) {
- // assert firstReaderHoldCount > 0;
- } else {
- //如果线程计数器池等于null
- if (rh == null) {
- rh = cachedHoldCounter;
- if (rh == null || rh.tid != getThreadId(current)) {
- rh = readHolds.get();
- //如果线程计数器为0,删除此计数器
- if (rh.count == 0)
- readHolds.remove();
- }
- }
- if (rh.count == 0)
- return -1;
- }
- }
- //如果共享线程数等于最大线程数
- if (sharedCount(c) == MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- //修改共享线程状态
- if (compareAndSetState(c, c + SHARED_UNIT)) {
- //共享线程数等于0
- if (sharedCount(c) == 0) {
- firstReader = current;
- firstReaderHoldCount = 1;
- } else if (firstReader == current) {
- firstReaderHoldCount++;
- } else {
- if (rh == null)
- rh = cachedHoldCounter;
- if (rh == null || rh.tid != getThreadId(current))
- rh = readHolds.get();
- else if (rh.count == 0)
- readHolds.set(rh);
- rh.count++;
- cachedHoldCounter = rh; // cache for release
- }
- return 1;
- }
- }
- }
-
- /**
- * 尝试获取写锁(可中断方式,不会傻傻等待)
- */
- final boolean tryWriteLock() {
- //获取当前线程
- Thread current = Thread.currentThread();
- //获取线程状态 1:持有锁 0:未持有锁
- int c = getState();
- if (c != 0) {
- //获取写锁数量
- int w = exclusiveCount(c);
- //如果写锁数量为0 或者 当前线程不等于持锁线程 返回false
- if (w == 0 || current != getExclusiveOwnerThread())
- return false;
- if (w == MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- }
- //CAS更新线程状态
- if (!compareAndSetState(c, c + 1))
- return false;
- //设置当前线程为持锁线程
- setExclusiveOwnerThread(current);
- return true;
- }
-
- /**
- * 尝试获取读锁
- */
- final boolean tryReadLock()
- //获取当前线程
- Thread current = Thread.currentThread();
- for (;;) {
- //获取线程状态
- int c = getState();
- //如果写锁数量不等于0 且 持锁线程不等于当前线程 返回false【说明写锁独占】
- if (exclusiveCount(c) != 0 &&
- getExclusiveOwnerThread() != current)
- return false;
- //获取读锁数量
- int r = sharedCount(c);
- //读锁数量等于最大容许数
- if (r == MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- //修改线程状态
- if (compareAndSetState(c, c + SHARED_UNIT)) {
- //共享线程数等于0
- if (r == 0) {
- //初始化firstReader为当前线程,firstReaderHoldCount=1
- firstReader = current;
- firstReaderHoldCount = 1;
- //如果firstReader当前线程
- } else if (firstReader == current) {
- //线程池数量自增
- firstReaderHoldCount++;
- } else {
- // 获取线程计数器
- HoldCounter rh = cachedHoldCounter;
- //如果线程计数器为null 或者 当前线程id不得能与线程计数器id
- if (rh == null || rh.tid != getThreadId(current))
- //初始化计数器
- cachedHoldCounter = rh = readHolds.get();
- else if (rh.count == 0)
- readHolds.set(rh);
- rh.count++;
- }
- return true;
- }
- }
- }
-
- /**
- * 判断当前线程为持锁线程
- */
- protected final boolean isHeldExclusively() {
- return getExclusiveOwnerThread() == Thread.currentThread();
- }
-
- // 初始化等待队列 TODO 后续研究Condition在补充
- final ConditionObject newCondition() {
- return new ConditionObject();
- }
-
- /**
- * 如果无线程持有写锁返回null,则返回当前持锁线程
- */
- final Thread getOwner() {
- // Must read state before owner to ensure memory consistency
- return ((exclusiveCount(getState()) == 0) ?
- null :
- getExclusiveOwnerThread());
- }
-
- //返回读锁数
- final int getReadLockCount() {
- return sharedCount(getState());
- }
-
- //是否是写锁
- final boolean isWriteLocked() {
- return exclusiveCount(getState()) != 0;
- }
-
- //写锁数量
- final int getWriteHoldCount() {
- return isHeldExclusively() ? exclusiveCount(getState()) : 0;
- }
-
- //返回读锁数量
- final int getReadHoldCount() {
- if (getReadLockCount() == 0)
- return 0;
-
- //如当前线程等于firstReader,返回firstReader读线程数
- Thread current = Thread.currentThread();
- if (firstReader == current)
- return firstReaderHoldCount;
-
- //如果当前线程id等于线程计数器id,返回线程计数器读线程数
- HoldCounter rh = cachedHoldCounter;
- if (rh != null && rh.tid == getThreadId(current))
- return rh.count;
-
- //最后返回缓存中线程计数器读线程数
- int count = readHolds.get().count;
- if (count == 0) readHolds.remove();
- return count;
- }
-
- /**
- * TODO 做什么?
- */
- private void readObject(java.io.ObjectInputStream s)
- throws java.io.IOException, ClassNotFoundException {
- s.defaultReadObject();
- readHolds = new ThreadLocalHoldCounter();
- setState(0); // reset to unlocked state
- }
-
- final int getCount() { return getState(); }
- }
-
- /**
- * 非公平锁实现
- */
- static final class NonfairSync extends Sync {
- private static final long serialVersionUID = -8159625535654395037L;
- //非公平方式获取写锁,不加入队列
- final boolean writerShouldBlock() {
- return false; // writers can always barge
- }
-
- //非公平方式获取读锁 查看AQS源码
- final boolean readerShouldBlock() {
- return apparentlyFirstQueuedIsExclusive();
- }
- }
-
- /**
- * 公平锁实现
- */
- static final class FairSync extends Sync {
- private static final long serialVersionUID = -2274990926593161451L;
- //节点是否加入队列,阻塞方式获取写锁
- final boolean writerShouldBlock() {
- return hasQueuedPredecessors();
- }
- //节点是否加入队列,阻塞方式获取读锁
- final boolean readerShouldBlock() {
- return hasQueuedPredecessors();
- }
- }
-
- /**
- * 读锁定义
- */
- public static class ReadLock implements Lock, java.io.Serializable {
- private static final long serialVersionUID = -5992448646407690164L;
- private final Sync sync;
-
- /**
- * 初始化sync对象
- */
- protected ReadLock(ReentrantReadWriteLock lock) {
- sync = lock.sync;
- }
-
- /**
- * 获取共享锁 参考AQS源码
- */
- public void lock() {
- sync.acquireShared(1);
- }
-
- /**
- * 可中断方式获取共享锁
- */
- public void lockInterruptibly() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
-
- /**
- * 尝试获取读锁
- */
- public boolean tryLock() {
- return sync.tryReadLock();
- }
-
- /**
- * 尝试获取锁 true:获取 false:未获取
- */
- public boolean tryLock(long timeout, TimeUnit unit)
- throws InterruptedException {
- return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- }
-
- /**
- * 锁释放
- */
- public void unlock() {
- sync.releaseShared(1);
- }
-
- /**
- * 不支持conditions抛出异常
- */
- public Condition newCondition() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * 锁信息打印
- */
- public String toString() {
- int r = sync.getReadLockCount();
- return super.toString() +
- "[Read locks = " + r + "]";
- }
- }
-
- /**
- * 写锁定义
- */
- public static class WriteLock implements Lock, java.io.Serializable {
- private static final long serialVersionUID = -4992448646407690164L;
- private final Sync sync;
-
- /**
- * 初始化写锁
- */
- protected WriteLock(ReentrantReadWriteLock lock) {
- sync = lock.sync;
- }
-
- /**
- * 尝试获取锁
- */
- public void lock() {
- sync.acquire(1);
- }
-
- /**
- * 可中断方式获取锁 AQS源码分析
- */
- public void lockInterruptibly() throws InterruptedException {
- sync.acquireInterruptibly(1);
- }
-
- /**
- * 尝试获取锁
- */
- public boolean tryLock( ) {
- return sync.tryWriteLock();
- }
-
- /**
- * 尝试超时等待方式获取锁
- */
- public boolean tryLock(long timeout, TimeUnit unit)
- throws InterruptedException {
- return sync.tryAcquireNanos(1, unit.toNanos(timeout));
- }
-
- /**
- * 释放锁
- */
- public void unlock() {
- sync.release(1);
- }
-
- /**
- * 创建Condition队列
- */
- public Condition newCondition() {
- return sync.newCondition();
- }
-
- /**
- * 打印输出写锁线程信息
- */
- public String toString() {
- Thread o = sync.getOwner();
- return super.toString() + ((o == null) ?
- "[Unlocked]" :
- "[Locked by thread " + o.getName() + "]");
- }
-
- /**
- * 判断持有锁线程是否是当前线程
- */
- public boolean isHeldByCurrentThread() {
- return sync.isHeldExclusively();
- }
-
- /**
- * 获取写锁线程数
- */
- public int getHoldCount() {
- return sync.getWriteHoldCount();
- }
- }
-
- // Instrumentation and status
-
- /**
- * 判断锁释放是公平锁
- */
- public final boolean isFair() {
- return sync instanceof FairSync;
- }
-
- /**
- * 返回持有锁的线程
- */
- protected Thread getOwner() {
- return sync.getOwner();
- }
-
- /**
- * 获取共享锁数量
- */
- public int getReadLockCount() {
- return sync.getReadLockCount();
- }
-
- /**
- * 判断是否是独占锁
- */
- public boolean isWriteLocked() {
- return sync.isWriteLocked();
- }
-
- /**
- * 当前线程是否是持锁线程
- */
- public boolean isWriteLockedByCurrentThread() {
- return sync.isHeldExclusively();
- }
-
- /**
- * 返回独占锁数量
- */
- public int getWriteHoldCount() {
- return sync.getWriteHoldCount();
- }
-
- /**
- * 返回读锁数量
- */
- public int getReadHoldCount() {
- return sync.getReadHoldCount();
- }
-
- /**
- * 返回独占锁线程集合 AQS源码
- */
- protected Collection<Thread> getQueuedWriterThreads() {
- return sync.getExclusiveQueuedThreads();
- }
-
- /**
- * 返回读锁线程集合 AQS源码
- */
- protected Collection<Thread> getQueuedReaderThreads() {
- return sync.getSharedQueuedThreads();
- }
-
- /**
- * 是否有线程排队 AQS源码
- */
- public final boolean hasQueuedThreads() {
- return sync.hasQueuedThreads();
- }
-
- /**
- * 线程是否在队列排队 AQS源码
- */
- public final boolean hasQueuedThread(Thread thread) {
- return sync.isQueued(thread);
- }
-
- /**
- * 队列长度 AQS源码
- */
- public final int getQueueLength() {
- return sync.getQueueLength();
- }
-
- /**
- * 队列线程集合 AQS源码
- */
- protected Collection<Thread> getQueuedThreads() {
- return sync.getQueuedThreads();
- }
-
- /**
- * Condition队列是否有线程排队 AQS源码
- */
- public boolean hasWaiters(Condition condition) {
- if (condition == null)
- throw new NullPointerException();
- if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
- throw new IllegalArgumentException("not owner");
- return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
- }
-
- /**
- * Condition队列长度 AQS源码
- */
- public int getWaitQueueLength(Condition condition) {
- if (condition == null)
- throw new NullPointerException();
- if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
- throw new IllegalArgumentException("not owner");
- return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
- }
-
- /**
- * 获取阻塞在Condition队列的线程集合 AQS源码
- */
- protected Collection<Thread> getWaitingThreads(Condition condition) {
- if (condition == null)
- throw new NullPointerException();
- if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
- throw new IllegalArgumentException("not owner");
- return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
- }
-
- /**
- * 打印线程信息
- */
- public String toString() {
- int c = sync.getCount();
- int w = Sync.exclusiveCount(c);
- int r = Sync.sharedCount(c);
-
- return super.toString() +
- "[Write locks = " + w + ", Read locks = " + r + "]";
- }
-
- /**
- * 获取线程ID
- */
- static final long getThreadId(Thread thread) {
- return UNSAFE.getLongVolatile(thread, TID_OFFSET);
- }
-
- // 初始化Unsafe对象
- private static final sun.misc.Unsafe UNSAFE;
- private static final long TID_OFFSET;
- static {
- try {
- UNSAFE = sun.misc.Unsafe.getUnsafe();
- Class<?> tk = Thread.class;
- TID_OFFSET = UNSAFE.objectFieldOffset
- (tk.getDeclaredField("tid"));
- } catch (Exception e) {
- throw new Error(e);
- }
- }
-
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
参考文档:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。