赞
踩
StampedLock类,在JDK1.8引入,是对ReenrantReadWriteLock的补充增强,它不是依赖AQS框架实现的锁,但还是等待队列这种并发实现机制,和ReenrantReadWriteLock有很多相似之处。StampedLock增加了一些新的功能,如读/写锁的转换、还能实现乐观锁,在竞争少的情况下能提高并发的性能。
如果能熟练灵活使用,搭配其他并发组件,可提高系统性能。使用此类需要对它的实现有一定了解,否则可能出现副作用,出现死锁和其他的问题。另外要注意StampedLock不支持锁重入。
类注释说明1:
StampedLock是一个基于三种模式来控制读/写操作的锁。 StampedLock的状态由版本号和(读/写)模式组成。锁获取方法返回一个表示并控制相对于锁状态的stamp;这些方法的“try”版本可以改为返回特殊值零,以表示获取锁失败。锁释放和转换方法需要使用stamp作为参数,如果它们与锁的状态不匹配,则相应的操作会失败。
三种模式分别是:
①Writing模式 :方法writeLock可能会阻止等待独占访问,其返回的stamp可以在方法unlockWrite中作为入参使用以释放锁 ,还提供了tryWriteLock的非定时版本和定时版本。当锁保持在写模式时,可能不会获得任何读锁,并且所有乐观读验证(使用validate方法)都将失败。
②Reading模式: (悲观读)方法readLock可能会阻止等待非排他性访问,其返回的stamp可以在方法unlockRead中作为入参使用以释放锁, 另外还提供了tryReadLock的非定时版本和定时版本。
③Optimistic Reading模式:(乐观读)仅当锁当前未处于写入模式时,方法tryOptimisticRead才返回非零的stamp。如果自获取给定标记以来未在写入模式下获取锁,则方法validate返回true。可以将这种模式视为悲观读锁的极弱版本,它可能随时被写锁线程给破坏掉。在短的只读代码段中使用乐观读模式通常可以减少竞争并提高吞吐量。然而,这种状态本质上是很脆弱的(容易被破坏)。乐观的读取部分应仅读取全局变量并将其保存在局部变量中,以供验证后使用。在乐观读模式下读取的全局变量可能不一致,因此仅当您熟悉使用数据的表示形式去检查一致性或重复调用方法validate时,此用法才适用。乐观读锁通常需要执行这些步骤,应首先读取对象/数组的引用,然后再去访问其成员变量 、元素(针对数组)或方法。
类注释说明2:
此类还支持有条件地在三种模式之间提供转换的方法。例如,方法tryConvertToWriteLock尝试“升级”模式,如果①已经处于写模式或②处于读模式并且没有其他读锁,或者③处于乐观读模式并且该锁可用,则返回有效的写模式的stamp。这些方法的形式旨在帮助减少某些基于重试的设计中发生的代码膨胀。
StampedLocks设计为在线程安全组件的开发中用作内部实用程序。它们的使用取决于对数据的内部属性、对象、方法的了解程度。它们不是可重入的,因此锁定的主体不应调用可能尝试重新获取锁的其他未知方法(尽管您可以将stamp传递给可以使用或转换模式的其他方法)。读锁模式的使用依赖于相关的代码段无副作用。未经验证的乐观读部分无法调用未知的方法来容忍潜在的不一致。Stamps使用有限表示,并且不是加密安全的(即,有效的stamp可能是可猜测的)。stamp值可能会在连续运行(不超过一年)之后回收。未使用或未验证而持有的stamp超过此期限可能无法正确验证。 StampedLocks是可序列化的,但始终反序列化为初始的unlocked状态,因此它们对于远程锁无用。
StampedLock的调度策略并不总侧重于读线程抑或写线程,反之亦然。所有“try”方法都是尽力而为,不一定符合任何调度或公平性策略。任何用于获取或转换锁的“try”方法的返回值为零时,此时的返回值不会带有关于锁状态的任何信息,随后的调用可能会成功。
因为它支持跨多种锁定模式的协调使用,所以此类不直接实现Lock或ReadWriteLock接口。但是,在仅需要相关功能集的应用程序中,StampedLock可以调用asReadLock(),asWriteLock()或asReadWriteLock()方法间接使用所需的锁。
/** The period for yielding when waiting for overflow spinlock */ private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1 /** The number of bits to use for reader count before overflowing */ private static final int LG_READERS = 7; // Values for lock state and stamp operations private static final long RUNIT = 1L; //,CAS更新state的增加量 private static final long WBIT = 1L << LG_READERS; // 128 二进制形式:0b10000000 private static final long RBITS = WBIT - 1L;// 127 二进制形式:0b01111111 private static final long RFULL = RBITS - 1L;// 126 二进制形式:0b01111110 private static final long ABITS = RBITS | WBIT; // 255 二进制形式:0b11111111 private static final long SBITS = ~RBITS; // -128 二进制形式:0b11111111_11111111_11111111_10000000 // Initial value for lock state; avoid failure value zero private static final long ORIGIN = WBIT << 1; // 256 二进制形式:0b100000000 // Special value from cancelled acquire methods so caller can throw IE private static final long INTERRUPTED = 1L; // Values for node status; order matters private static final int WAITING = -1; private static final int CANCELLED = 1; // Modes for nodes (int not boolean to allow arithmetic) private static final int RMODE = 0; private static final int WMODE = 1;
RUNIT: 值为1,更新读锁状态会用到的常量,“next = s + RUNIT”一个新线程获取获取到了读锁那么state将增加1.
ABITS:值为255,计算写锁状态时会用到的常量,位运算“((s = state) & ABITS)”来取state的低8位。
WBIT: 值为123,更新写锁状态会用到的常量,“next = s + WBIT”一个线程获取到写锁就将state属性加128。
RBITS:值为127,计算读锁状态会到的常量,“s & RBITS”用来取state的低7位。
RFULL:值为126,获取到读锁的最大线程数,“(readers = s & RBITS) >= RFULL”。
SBITS:计算写锁变化量会用到的常量,“(s & SBITS)”将s的低7位全部置0,只关注s的高25位。
ORIGIN:state的初始值,二进制形式只有第9位为1,其他位全为0。主要是不让statet等于0,只有尝试获取锁失败时,state才能为0.
/** Head of CLH queue */
private transient volatile WNode whead; //等待队列头节点
/** Tail (last) of CLH queue */
private transient volatile WNode wtail; //等待队列尾节点
// views
transient ReadLockView readLockView; //读锁视图
transient WriteLockView writeLockView;//写锁视图
transient ReadWriteLockView readWriteLockView;//读写锁视图
/** Lock sequence/state */
private transient volatile long state; //锁状态
/** extra reader count when state read count saturated */
private transient int readerOverflow; //当读锁线程数超过RFULL时,额外记录超出的的线程数
上面有一个特别重要的成员变量,它就是state,它表示了锁的状态。state是int类型变量,它有32个比特位,它的二进制形式的低7位表示读锁状态,它的二进制形式的第8位表示当前是否持有写锁(1表示持有写锁,0表示没持有写锁)。一个线程获取到读锁,其低7位就要加1,一个线程释放读锁,其低7位就要减1,因此在StampedLock的一些方法中常见“ s - RUNIT”和“s - RUNIT”这样的代码片段。
使用位运算"state&0b11111111"可取出state二进制的低8位,而常量ABITS的值则是0b11111111,所以在StampedLock的方法中有很多“(s = state) & ABITS”的代码片段。当取出state的低8位是0b10000000时表示当前持有写锁,而当取出的state的低8位小于0b10000000且不为0时表示当前至少持有1个读锁。
另外state的高24位并不是不用它,每获取或释放一次写锁,都是在state的第8位上加1,加1之后可能就需要向高位进位。如当前是持有写锁的,那么state二进制第8位是1,而此刻要释放写锁,就要在state二进制第8位加1,之前第8位就已经是1了,所以state第9位要进位,而进位后第8位就是0了。此时锁已被释放了,而state的第8位也是0,这也与预设规定“state的第8位若为1表示持有写锁,若为0表示没持有写锁”一致。不得不说,这个类的作者对位运算的特点运用得炉火纯青。
由此可看出,state的高25位都是描述写锁状态的,不管是获取还是释放一次写锁,其高25位都会在原基础上加1。这其实可以看作写锁的一个版本号,这有点类似于集合类的modCount属性(只要结构被修改了,modCount自增1),它本为就是为了解决ABA的问题而产生的。在读锁的释放和锁的转换时均要先比较写锁的这个版本号,通过位运算“state&SBITS”来取state的高25位,若版本号不一致,表明这个期间写锁的状态可能被改变过,这是不正常的状态,读锁的释放或锁的转换直接失败。
WNode类是记录等待节点的内部类,与AQS的Node节点有些类似,最大不同之处在于有cowait这个读模式节点的链表。
static final class WNode {
//前驱节点
volatile WNode prev;
//后继节点
volatile WNode next;
//等待的读模式节点
volatile WNode cowait; // list of linked readers
//当前等待节点对应的线程
volatile Thread thread; // non-null while possibly parked
//记录节点状态
volatile int status; // 0, WAITING, or CANCELLED
//记录节点读、写模式
final int mode; // RMODE or WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}
ReadLockView 、WriteLockView和ReadWriteLockView均是成员内部员,ReadLockView 、WriteLockView的实现基本上是委托外部类StampedLock的相应方法完成的,而ReadWriteLockView又完全是委托ReadLockView 、WriteLockView这两上类实现的。值得注意的是ReadLockView 、WriteLockView均不支持等待条件Condition,执行newCondition方法都要直接抛出异常。
final class ReadLockView implements Lock { public void lock() { readLock(); } public void lockInterruptibly() throws InterruptedException { readLockInterruptibly(); } public boolean tryLock() { return tryReadLock() != 0L; } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return tryReadLock(time, unit) != 0L; } public void unlock() { unstampedUnlockRead(); } public Condition newCondition() {//不支持条件 throw new UnsupportedOperationException(); } } final class WriteLockView implements Lock { public void lock() { writeLock(); } public void lockInterruptibly() throws InterruptedException { writeLockInterruptibly(); } public boolean tryLock() { return tryWriteLock() != 0L; } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return tryWriteLock(time, unit) != 0L; } public void unlock() { unstampedUnlockWrite(); } public Condition newCondition() {//不支持条件 throw new UnsupportedOperationException(); } } final class ReadWriteLockView implements ReadWriteLock { public Lock readLock() { return asReadLock(); } //返回一个ReadLockView public Lock writeLock() { return asWriteLock(); }//返回一个WriteLockView }
public StampedLock() {
state = ORIGIN;//256,state二进制的第9位设为0,避免state为0,在stampedLock中0这种返回值表示相关操作失败。
}
writeLock方法的基本逻辑:
①若写/读锁均没有被任何线程持有,当前线程可以进行CAS更新state后,CAS更新成功就抢锁成功。②若CAS更新失败则调用acquireWrite完整地CAS更新state,acquireWrite方法会阻塞线程直到抢锁成功才能返回。③写/读锁任何之一被某些线程持有,直接调用acquireWrite阻塞当前线程,直到写锁被其他线程释放后且当前线程抢锁成功方法才得以返回
public long writeLock() { long s, next; // bypass acquireWrite in fully unlocked case only /** * ABITS的低8位全为1,二进制形式0b011111111,这里(s&ABITS)位运算就是获取state的低8位。 * 尝试CAS更新state(在state进制位的第8位加1)成功则返回新的state, * * state的低8位是表示读锁和写锁的状态,低8位是0表明写锁和读锁均没有被任何线程持有。 * 写/读锁均没有被任何线程持有,当前线程可以进行CAS更新state后,CAS更新成功就抢锁成功, * 若CAS更新失败则调用acquireWrite完整地CAS更新state,直到更新成功acquireWrite的死循环自旋才能返回. * 写/读锁任何之一被某些线程持有,直接调用acquireWrite会阻塞当前线程,直到写锁被其他线程释放后且当前线程抢锁成功方法才得以返回. */ return ((((s = state) & ABITS) == 0L && U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? next : acquireWrite(false, 0L)); }
完整版本地获取写锁acquireWrite(boolean,long)方法分析
private long acquireWrite(boolean interruptible, long deadline) { WNode node = null, p; //node表示待入队节点,p表示尾节点 /** * 将待入队节点添加的队列的尾部(作为尾节点) */ for (int spins = -1;;) { // spin while enqueuing long m, s, ns; if ((m = (s = state) & ABITS) == 0L) { //无任何读/写锁时 if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) //CAS获取写锁成功,可直接返回 return ns; } else if (spins < 0) //持有写锁且等待队列只有一个节点(或无任何节点) ,重设自旋次数 spins = (m == WBIT && wtail == whead) ? SPINS : 0; else if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins;//自旋次数自减 } else if ((p = wtail) == null) { //空链表 initialize queue //初始化头尾节点 WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd;//只有一个节点时,队列的头尾节点的引用指向同一节点 } else if (node == null) //初始化待入队节点node,并使node的前驱节点是旧尾节点,node插入到了队列尾部 node = new WNode(WMODE, p); else if (node.prev != p)//有线程插入了节点到队列尾部 node.prev = p;//保证当前入队节点node是旧尾节点 else if (U.compareAndSwapObject(this, WTAIL, p, node)) {//将当前入队节点node设为新的尾节点 p.next = node;//更新原尾节点的后继节点。 //至此 新的尾节点设置成功、新旧尾节点间的链接关系更新完成,表明入队成功,可退出入队的自旋 break; } } for (int spins = -1;;) { WNode h, np, pp; int ps; if ((h = whead) == p) {//队列中只有一个或零个节点时 if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1;//自旋次数过大,让其减半 for (int k = spins;;) { // spin at head 自旋更新state,并设置新的头节点。 long s, ns; if (((s = state) & ABITS) == 0L) {//无锁时 if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) { //CAS更新state成功,将入队后的node节点作为头节点。 whead = node; node.prev = null; return ns; } } else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0) //自旋次数用完后,退出头节点的自旋。 break; } } else if (h != null) { // help release stale waiters 队列中有两个及以上节点 WNode c; Thread w; //唤醒头节点上所有cowait读模式锁 while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } if (whead == h) {//没有其他线程修改头节点 if ((np = node.prev) != p) { //尾节点变化了,就将单向更新链接关系 if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) //若p是刚初始化的节点,就CAS更新为等待状态, U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) {//若p是被取消的节点,就将其从队列中移除 if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { long time; // 0 argument to park means no timeout if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) //获取锁超时,就从队列中移除那些CANCELLED状态的节点 return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this);//设置当前线程的被休眠阻塞者 node.thread = wt; //p.status<0表示处于WAITING状态 //(p != h || (state & ABITS) != 0L) 持有锁 // whead == h头节点稳定,未被其他线程修改, // node.prev == p 新旧尾节点关系稳定 if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p) U.park(false, time); // emulate LockSupport.park 休眠阻塞 node.thread = null;//被唤醒后清除线程 U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); //取消被中断节点 } } } }
unlockWrite方法的基本流程:
①先检查锁状态是否正常,若不正常则抛出异常 。②无条件更新state,将表示写锁是否持有的state二进制第8位置为0。 ③唤醒等待队列头节点的后继节点
public void unlockWrite(long stamp) { WNode h; //state!=stamp表示预期的state与实际的state不等,表明状态不对 // (stamp & WBIT) == 0L 表示写/读锁均没有被任何线程持有。写锁都没被获取到就释放锁显然不是正常的状态。 if (state != stamp || (stamp & WBIT) == 0L) throw new IllegalMonitorStateException(); //无条件更新state(在state进制位的第8位加1,因为之前第8位就是1,更新后在第9位进1位,而第8位变成0). //若更新后的state是0,则将其默认初始值ORIGIN(二进制的第9位为1,其他位全是0). //这里state不能使用0,是因为state为0时表示获取锁失败的state值, state = (stamp += WBIT) == 0L ? ORIGIN : stamp; //“(h = whead) != null”头节点为空表明等待队列中还有等待节点,即有线程在等待获取锁 //status为零表示对应节点代表的线程刚释放锁。 if ((h = whead) != null && h.status != 0) //等待队列中有线程在等待抢锁,且此头节点不是刚释放锁 // 调用release方法,更新头节点的status为0,并唤醒下一个等待节点(线程) release(h); }
release(Node)唤醒等待节点的方法逻辑非常简单:先尝试将头节点的state设为0,从后向前找到一个有效后继节点q,然后唤醒这个节点代表的线程。
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
if ((q = h.next) == null || q.status == CANCELLED) {
//从后向前遍历,找到最近的有效节点(非CANCELLED节点)
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
if (q != null && (w = q.thread) != null)
U.unpark(w);
}
}
readLock方法的基本逻辑:
①若写锁未被任何线程持有且读模式线程数小于规定的最大值,尝试CAS更新state(将state加1)。②若CAS更新失败则调用acquireRead完整地CAS更新state,acquireRead方法会阻塞线程直到抢锁成功才能返回。③若写锁已被某线程持有且读模式线程数大于等于规定的最大值,直接调用acquireWrite阻塞当前线程,直到抢锁成功才能返回。
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
//state的第8位是写锁状态,state的低7位是读锁状态,而“(s & ABITS)”是取state的低8位,
// 则可以包含了写锁的状态。但实际情况是,“(s & ABITS) < RFULL”一定能确定没有写锁被获取到。
// 如果此时有写锁,那么state的第8位是1。(s & ABITS)的结果是128(常量WBIT),而RFULL的值是126。
//要想表达式“(s & ABITS) < RFULL”成立,state的第8位不能是1,只能是0.
//而state的第8位是0,就表明当前写锁没有被任何线程持有。
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
//若写锁未被任何线程持有且获取到读锁的线程数小于规定的最大值RFULL,
//可以进行CAS更新state(将state加1),若CAS更新state成功,就获取到读锁,然后返回新的state
//若CAS更新state失败,就进入acquireRead完整的获取读锁,直到获取锁成功,acquireRead方法才能返回
next : acquireRead(false, 0L));
}
完整版本地获取读锁acquireRead(boolean,long)方法分析
private long acquireRead(boolean interruptible, long deadline) { WNode node = null, p;//node表示当前线程对应的节点,表示队列的尾节点 /** * 第一段自旋,将当前线程对应的节点入队列 */ for (int spins = -1;;) { WNode h; //头节点和尾节点是同一节点,表明队列中只有一个节点(或0个节点),可以很快获轮到当前线程抢锁了 if ((h = whead) == (p = wtail)) { for (long m, s, ns;;) { if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) //持有读锁(数量不多),能CAS再获取读锁,读锁状态加1 //读锁的数量太多,且tryIncReaderOverflow成功,也可获取再获取读锁。 return ns; else if (m >= WBIT) { //有线程事先已抢得写锁 if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins;//随机探测,可将自旋次数自减 } else { if (spins == 0) {//自旋剩余次数为0,检测是否可退出 WNode nh = whead, np = wtail; //同步队列未发生变化退出自旋 if ((nh == h && np == p) || (h = nh) != (p = np)) //头尾节点稳定不变或队列中已经不只一个节点了 break; } spins = SPINS;//不能退出的话,重置spins后继续自旋 } } } } if (p == null) { // initialize queue //初始化头尾节点,且指向同一节点 WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } else if (node == null)//初始化待入队节点,并使node的前驱节点是旧尾节点(插入队列尾部) node = new WNode(RMODE, p); //队列中只有一个节点或尾节点是写模式节点 else if (h == p || p.mode != RMODE) { if (node.prev != p)//有线程插入了节点到队列尾部 node.prev = p;//保证当前入队节点node的前驱节点是旧尾节点 else if (U.compareAndSwapObject(this, WTAIL, p, node)) {//将当前入队节点node设为新的尾节点 //至此 新的尾节点设置成功、新旧尾节点间的链接关系更新完成,表明入队成功,可退出入队的自旋 p.next = node; break; } } //队列中有多个节点且尾节点是读模式节点,将尾节点的cowait指向当前线程node节点 else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) //CAS更新失败,将当前线程对应的node节点的cowait重置为null node.cowait = null; else {//上面的CAS成功 for (;;) { WNode pp, c; Thread w; //头节点是读模式,尝试唤醒头节点cowait链上所有的读节点 if ((h = whead) != null && (c = h.cowait) != null && U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) // help release U.unpark(w); //队列中只两个或一个节点(此时node还没完成入队,不算在内)时,可以抢读锁了。 //(只有一个节点时,h==p或pp=null,而两个节点时h=p.prev) if (h == (pp = p.prev) || h == p || pp == null) { long m, s, ns; do { if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) return ns; } while (m < WBIT); } //执行上面的while循环时,其他线程未修改队列 if (whead == h && p.prev == pp) { long time; if (pp == null || h == p || p.status > 0) { //status>0是CANCELLED, node = null; // throw away break; } if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, p, false); //超时就在队列中移除此节点 Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; //(有3个以上节点或持有写锁)且(头节点未变化且尾节点前驱未变化)休眠当前线程 if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) U.park(false, time); node.thread = null;//被唤醒后清空线程 U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, p, true);//若中断,在队列中移除此节点 } } } } for (int spins = -1;;) { WNode h, np, pp; int ps; if ((h = whead) == p) {//队列中只有一个或零个节点时 if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins;;) { // spin at head 在头节点自旋, long m, s, ns; //不持有写锁时 if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) ://尝试获取读锁 (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { WNode c; Thread w; whead = node;//CAS更新成功后,将当前节点设置新的头节点 node.prev = null; while ((c = node.cowait) != null) {//尝试唤醒头节点cowait链上所有读节点 if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } return ns; } else if (m >= WBIT && LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; //如果持有写锁,放弃头节点自旋抢读锁。 } } else if (h != null) {//队列中有两个及以上节点,头节点且不为空 WNode c; Thread w; while ((c = h.cowait) != null) {//尝试唤醒头节点cowait链上所有读节点 if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } if (whead == h) {//头节点未被修改过 if ((np = node.prev) != p) {// 尾节点被修改过,就将单向更新链接关系 if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) //若p是刚初始化的节点,就CAS更新为等待状态 U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) {//若p是被取消的节点,就将其从队列中移除 node.prev = pp; pp.next = node; } } else { long time; if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) //获取锁超时,就在队列中取消node节点 return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; //p.status<0表示处于WAITING状态 //(p != h || (state & ABITS) != 0L) 持有锁 // whead == h头节点稳定,未被其他线程修改, // node.prev == p 新旧尾节点关系稳定 if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) U.park(false, time); node.thread = null;//被唤醒后清除线程 U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true);//取消被中断节点 } } } }
unlockRead方法释放读锁的基本流程:
①先检查锁状态是否正常,若不正常则抛出异常 。②若获取到读锁的线程数小于规定的最大值时,CAS更新state(将state减1),若读模式线程数为0就唤醒等待队列头节点的后继节点。③若获取到读锁的线程数大于等于规定的最大值,调用tryDecReaderOverflow方法,尝试减少成员变量readerOverflow的值,并将其减少的值增加到state上。
public void unlockRead(long stamp) { long s, m; WNode h; for (;;) { //(s = state) & SBITS) != (stamp & SBITS)获取读锁到释放读锁期间,写锁状态变化了。这是不正常状态 // (stamp & ABITS) == 0L表明没有任何线程获取到了读/写锁,没得到锁就释放锁,这也是不正常状态 //"(m = s & ABITS) == 0L || m == WBIT"表明没有线程获取到读/写锁或有线程获取到了写锁,这还是不正常状态。 if (((s = state) & SBITS) != (stamp & SBITS) || (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)//m=WBIT表明有线程获取了写锁 throw new IllegalMonitorStateException();//不正常状态需要抛出异常 if (m < RFULL) { //获取到读锁的线程数小于规定的最大值。 //CAS更新state(将state减1)。此处与写锁不同,不能无条件更新state, // 因为此时可能有多个线程同时释放读锁,而写锁在某一刻只可能有唯一的一个线程释放锁。 if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { if (m == RUNIT && (h = whead) != null && h.status != 0) //CAS更新成功,state已减至1(马上为就为0),且等待队列中还有可用的等待线程 // 调用release方法,更新头节点的status为0,并唤醒下一个等待节点(线程) release(h); break; } } //获取到读锁的线程数大于等于规定的最大值 //调用tryDecReaderOverflow方法,尝试减少readerOverflow的值,并将其减少的值增加到state上。 else if (tryDecReaderOverflow(s) != 0L) break; } }
tryDecReaderOverflow方法基本逻辑:
①若获取到读锁的线程数等于规定的最大值 ,将成员变量readerOverflow的值减少1,将成员变量state的值加1,最终使得state二进制的低7位全为1,返回最新的state.②若获取到读锁的线程数大于规定的最大值,尝试减少readerOverflow失败,返回0
private long tryDecReaderOverflow(long s) { // assert (s & ABITS) >= RFULL; if ((s & ABITS) == RFULL) { //如果获取到读锁的线程数恰好达到规定的最大值 //CAS更新state,这里不太好理解,如果能联系这些常量的二进制值班就好容易理解些。 //"(s & ABITS) == RFULL"成立,可得出s的低8位是:01111110,而RBITS二进制是01111111 //那么这里的位运算“s | RBITS”的效果就是将s的最低位设为1、其他位不变,即s自增1. if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) { //CAS更新state成功,更新后state的二进制的低7位全为1 int r; long next; // readerOverflow大于0,表明之前读锁线程数超过了RFULL, // 并将超出的数量放在了readerOverflow成员变量中。 if ((r = readerOverflow) > 0) { //之前已经将state加1了,这里将readerOverflow减1,一加一减刚好平衡 readerOverflow = r - 1; next = s; } else //readerOverflow 本身就为0,它没有记录读锁线程超出数, //不能将readerOverflow减1,只能将state减1.这前对state加1,这里又对state减1,也能加减平衡 next = s - RUNIT;//对state无条件减1 state = next;// return next; } } //获取到读锁的线程数已经大于规定的最大值了,不能decReaderOverflow else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0) Thread.yield();//随机种子合适,当前线程礼让 return 0L;//获取到读锁的线程数已经大于规定的最大值,tryDecReaderOverflow失败,返回常量0 }
tryOptimisticRead()的基本逻辑是:只要当前写锁被未被任何线程获取到,就返回一个有效的stamp,这是stamp是写锁的累计变化次数。若写锁被某线程获取到了,就返回一个失败状态的零。
public long tryOptimisticRead() {
long s;
//当写锁被某线程持有时,state二进制形式的第8位是1,反之第8位是0。
// 而WBIT的二进制形式(只有第8位是1,其他位均是0)10000000。
//因此只要state的第8位是0,表达式”(s = state) & WBIT“的结果就是0
//当前没有写锁被任何线程持有时,state的第8位是0,可以获取乐观读锁,
//“(s = state) & WBIT”为零,使用“s & SBITS”(将s的低7位全部置0)计算出stamp.
//当前已有线程获取了写锁时,state的第8位是1,不能获取乐观读锁,“(s = state) & WBIT”非零,
//尝试获取乐观读锁失败,返回0.
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;//(s&SBITS)是写锁的变化次数
}
tryOptimisticRead方法常与validate结合使用,我们可以分析下validate如何实现的。
validate方法的逻辑很简单,就是检查从获取到stamp到当前这期间内,写锁的stamp是写锁的累计变化次数是否改变了。
public boolean validate(long stamp) {
U.loadFence();//内存屏障,确保此屏障之前的读操作都已完成。
//当成功获取乐观读锁时,方法tryOptimisticRead返回(s&SBITS),
//而传入的参数stamp是方法tryOptimisticRead的返回值,即stamp=(s&SBITS),
//因数字SBITS的特殊性而有等式(stamp&SBITS)=(s&SBITS)&SBITS=(s&SBITS) 此时下面条件成立返回true.
//当获取乐观读锁失败时,方法tryOptimisticRead返回常量0,
//而传入的参数stamp为0时,此时下面条件不成立返回false
return (stamp & SBITS) == (state & SBITS);
}
tryConvertToOptimisticRead()方法:
所有返回正常戳记的前提都是,传入的参数stamp与实际的state相匹配,不匹配直接返回0. ①首先检查写锁的累计变化次数是否改变了,若发生改变了直接返回0。 ②若当前state表示未持有任何锁,则直接返回当前的state。 ③若当前state表示持有写锁,则无条件更新state,并唤醒后继节点,再返回最新的state 。④若当前state表示持有读锁,且持有读锁的线程数小于规定的最大值,则CAS更新state(将state减1),返回写锁的累计变化次数,若state为1时还需唤醒后继节点 。⑤获取到读锁的线程数大于等于规定的最大值,则调用tryDecReaderOverflow尝试减少readerOverflow的值,若尝试成功就返回写锁的累计变化次数,若尝试失败就返回0。
public long tryConvertToOptimisticRead(long stamp) { //a表示期望的state的低8位,m实际的state的低8位,s表示当前的state,h是等待队列的头节点 long a = stamp & ABITS, m, s, next; WNode h; U.loadFence();//内存屏障,确保此屏障之前的读操作都已完成。 for (;;) { //写锁的变化量不一致 if (((s = state) & SBITS) != (stamp & SBITS)) break; //返回0,将失败 if ((m = s & ABITS) == 0L) {//实际上state的低8位为0,写/读锁均没有被任何线程持有 if (a != 0L) //实际的state的低8位是0,而预期的state的低8位不为0.两者状态不一致,返回0 break; //期望与实际的state的低8位相同,均为0,写/读锁均没有被任何线程持有. // 既然现在没有任何锁,可以马上获取任何锁,返回状态state return s; } else if (m == WBIT) { //表示当前写锁被某线程持有了 if (a != m)//预期与实际状态不一致,将返回0 break; //无条件更新state,state的第8位加1,因为此时之前state的第8位已经是1了, // 所以在更新后state的第9位将进一位,而第8位就变成0了. state = next = (s += WBIT) == 0L ? ORIGIN : s; if ((h = whead) != null && h.status != 0) //有可用的等待节点,调用release方法更新头节点的status为0,并唤醒下一个等待节点(线程) release(h); return next; } //实际的state的低8位不为0也不为WBIT时 else if (a == 0L || a >= WBIT) //预期的state的低8位是0,状态不一致 //a的理论最大值是WBIT,参数stamp不正确。 //将返回0 break; else if (m < RFULL) {//获取到读锁的线程数小于规定的最大值,还可以继续获取读锁。 //CAS更新state(将state减1)。此处与写锁不同,不能无条件更新state, // 因为此时可能有多个线程同时释放读锁,而写锁在某一刻只可能有唯一的一个线程释放锁。 if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT)) { if (m == RUNIT && (h = whead) != null && h.status != 0) //CAS更新成功,state已减至1(马上为就为0),且等待队列中还有可用的等待线程 // 调用release方法,更新头节点的status为0,并唤醒下一个等待节点(线程) release(h); return next & SBITS; //返回写锁的累计变化次数 } } //获取到读锁的线程数大于等于规定的最大值 //调用tryDecReaderOverflow方法,尝试减少readerOverflow的值,并将其减少的值增加到state上。 else if ((next = tryDecReaderOverflow(s)) != 0L) //尝试减少成员变量readerOverflow的值成功 //返回写锁的累计变化次数 return next & SBITS; } return 0L; }
tryConvertToReadLock方法的主要逻辑:
①所有返回正常戳记的前提都是,传入的参数stamp与实际的state相匹配。②若当前state表示未持有任何锁,则获取一个读锁,再返回最新的state 。③若当前的state表示持有写锁,则释放写锁并获得读锁,返回最新的state。④ 若当前的state表示持有读锁,则将直接将这个传入的stamp返回。 ⑤在所有其他情况下,此方法均返回零
public long tryConvertToReadLock(long stamp) { long a = stamp & ABITS, m, s, next; WNode h; //写锁的变化量没变,即期间没有获取/释放写锁,传入的参数stamp与实际的state相匹配才尝试转为读锁 while (((s = state) & SBITS) == (stamp & SBITS)) { // if ((m = s & ABITS) == 0L) {//实际上state的低8位为0,写/读锁均没有被任何线程持有 if (a != 0L) 实际的state的低8位是0,而预期的state的低8位不为0.两者状态不一致,返回0 break; else if (m < RFULL) {//这是一个BUG,此时m=0,m一定小于RFULL // 既然现在没有任何锁,可以马上获取任何锁, if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) //CAS更新成功,获取了一个读锁,返回最新的state return next; } else if ((next = tryIncReaderOverflow(s)) != 0L) return next; } else if (m == WBIT) {//表示当前写锁被某线程持有了 if (a != m) break;//预期与实际状态不一致,将返回0 //释放写锁并获取一个读锁 state = next = s + (WBIT + RUNIT); //写锁状态加1(在state的第8位加1),读锁状态也加1 if ((h = whead) != null && h.status != 0) //有可用的等待节点,调用release方法更新头节点的status为0,并唤醒下一个等待节点(线程) release(h); return next;//返回最新的state } else if (a != 0L && a < WBIT) //已经是读锁了,直接返回这个读锁 return stamp; else break; } return 0L;//写锁的变化量变了,锁转换失败,返回0 }
tryConvertToWriteLock的主要逻辑:
所有返回正常戳记的前提都是,传入的参数stamp与实际的state相匹配 。① 若当前state表示未持有任何锁(可读取乐观)CAS更新state,获取一个读锁,返回最新的state 。② 若当前state表示持有写锁,则直接将传入的参数stamp返回。③若state表示持有读锁,则CAS更新state(同时释放该读锁状态再获取写锁状态),返回最新的state。④在所有其他情况下,此方法均返回零。
public long tryConvertToWriteLock(long stamp) { long a = stamp & ABITS, m, s, next; while (((s = state) & SBITS) == (stamp & SBITS)) {//写锁的变化量没变,即期间没有获取/释放写锁,才尝试转为读锁 if ((m = s & ABITS) == 0L) {//实际上(当前)state的低8位为0,写/读锁均没有被任何线程持有 if (a != 0L) break;//预期与实际状态不一致,将返回0 if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) //CAS更新成功,获取了一个写锁,返回最新的state return next; } else if (m == WBIT) { //表示当前写锁被某线程持有了 if (a != m) break;//预期与实际状态不一致,将返回0 return stamp;//直接返回这个写锁 } else if (m == RUNIT && a != 0L) {//有一个读锁了 if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT + WBIT))//释放一个读锁(减RUNT)再获取一个写锁(加WBIT) //CAS更新state成功,返回新的state return next; } else break; } return 0L; }
这里以StampedLock类使用说明上的Point类为例,Point表示在二维坐标上的一个点,它能移动、可以计算距原点距离、设置新的坐标位置。
class Point { private double x, y; private final StampedLock sl = new StampedLock(); public Point(){ this.x = 0; this.y = 0; } public Point(double x, double y) { this.x = x; this.y = y; } void move(double deltaX, double deltaY) { // 写操作,使用写锁 an exclusively locked method long stamp = sl.writeLock(); try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } double distanceFromOrigin() { // A read-only method 乐观锁, //适用于只读。先不加锁去获取写锁的版本号,直接读出值, // 在要使用此值时再检测从读出值至现在这段时间内是否持有写锁 long stamp = sl.tryOptimisticRead();//如果持有写锁返回0,反之则获取写锁的版本号, double currentX = x, currentY = y; if (!sl.validate(stamp)) {//版本号变化了表明之前有线程抢到写锁,去获取悲观读锁 stamp = sl.readLock(); try { currentX = x; currentY = y; } finally { sl.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } void moveIfAtOrigin(double newX, double newY) { //锁升级适用于,先读取值、然后根据读出的值判断是否要更新值的情况(前后有依赖关系)。 // Could instead start with optimistic, not read mode long stamp = sl.readLock(); try { while (x == 0.0 && y == 0.0) { long ws = sl.tryConvertToWriteLock(stamp); if (ws != 0L) { //锁升级成功,进行写操作后,退出循环 stamp = ws; x = newX; y = newY; break; } else {//锁升级失败, sl.unlockRead(stamp);//释放原读锁 //在获取写锁后,再次锁转换一定成功(此时的锁本就是写锁, // tryConvertToWriteLock锁转换时会直接返回原stamp), stamp = sl.writeLock(); } } } finally { // 释放锁(可能是读锁也可能是写锁) sl.unlock(stamp); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。