赞
踩
static final class Node { /** * 同步队列的头 初始化 或者setHead方法可修改 */ static final Node SHARED = new Node(); /** * 标识这个节点用于 独占模式(排它 反正一个意思) */ static final Node EXCLUSIVE = null; /** 下面是 waitStatus 的几个常量值 */ /** * 表明等待线程已经取消 */ static final int CANCELLED = 1; /** * 表述如果当前节点的前一个节点状态是 SIGNAL 那么就可以阻塞当前自己的线程 不用去争抢资源了 没用 不然会一直尝试去获取资源 */ static final int SIGNAL = -1; /** * 线程在条件队列中等待 */ static final int CONDITION = -2; /** * 共享模式下 无条件传播 该状态的进程处于可运行状态 */ static final int PROPAGATE = -3; /** * 当前node 状态 */ volatile int waitStatus; /** * 同步队列的前置节点 */ volatile Node prev; /** * 同步队列的后置节点 */ volatile Node next; /** * 当前节点所属的线程 */ volatile Thread thread; /** * 用于条件队列 是条件队列的下一个节点 */ Node nextWaiter; /** * 是否是共享模式 这个方法只会在同步队列中使用 nextWaiter在同步队列中复用了 */ final boolean isShared() { return nextWaiter == SHARED; } /** * 获取当前节点的前置节点 没有就抛出异常 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } final class ConditionObject implements Condition { /** * 条件队列的头节点 */ private transient Node firstWaiter; /** * 条件队列的尾节点 */ private transient Node lastWaiter; /** * ConditionObject 默认的构造函数 */ public ConditionObject() { } }
第一篇文章的时候 我和大家也描述过 Condition Queue 实际上是一个单向链表 在分析Node节点的时候 我描述过prev和next都是给Sync Queue使用的 实际上对于Condition Queue node节点 有效的字段 就是 nextWaiter ,waitStatus和thread字段
public void await() throws InterruptedException { if (Thread.interrupted())//判断当前线程 是否被中断了 如果中断了 抛出中断异常 throw new InterruptedException(); Node node = addConditionWaiter();//新增一个新的等待节点到条件队列中 int savedState = fullyRelease(node);//释放当前节点占用的资源 并返回线程持有的状态值 int interruptMode = 0; while (!isOnSyncQueue(node)) {//判断当线程节点是否在同步队列中 LockSupport.park(this);//如果不在同步队列中 那就阻塞当前线程 等待唤醒 /* * 能执行到下面的代码 说明线程从阻塞状态中唤醒了 唤醒可能有2种情况 * 1:是线程发生了中断 * 2:是线程接受到signal信号 从阻塞状态中被唤醒 * checkInterruptWhileWaiting 返回值有3个 * 0表示:线程没有被中断 * 1 REINTERRUPT表示:中断在signal之后发生的 * -1 THROW_IE表示:中断在signal之前发生的 */ if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//检查是否发生过线程中断 0表示没有发生 break;// 如果线程没有中断 说明被signal唤醒 那就继续判断是否唤醒了当前线程 如果是当前线程 会进入到同步队列中 } /* * 这边的代码 就是当前的node已经在Sync Queue 中了 * acquireQueued 我们在之前独占锁加锁的时候 也分析过 就是去获取资源 获取不到的话 就排队等待继续阻塞 * acquireQueued返回true 说明在进入Sync队列中 等待的过程中锁的过程中也发生了中断 *acquireQueued返回true 返回false 说明没有发送过中断 那下面的赋值就不会走到 *如果acquireQueued返回true 而且interruptMode是非THROW_IE 那个整个方法就是REINTERRUPT的结果 因为都不需要抛出异常 */ if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;//记录线程中断的表示位 /* *这边的意思就是如果当前节点nextWaiter不是等于null的说明 node节点还是和Condition * queue 关联着的 那就执行一下清理操作 吧condition queue里面的非等待节点剔除 * 那种情况下会走到这步呢 那就是当前的interruptMode是THROW_IE的时候 * 为什么呢 因为THROW_IE的意思 是中断发送在signal之前 signal * 因为如果是signal的话 当前节点的nextWaiter为被置为null的 可以回看下代码 */ if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0)//这边0 说明一直没发生过中断 reportInterruptAfterWait(interruptMode); }
/** * 新增一个新的等待节点到等待的条件队列中 * * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter;//等待条件的队列的最后一个 //如果最后的lastwaiter 节点状态是非Condition 说明已经取消 就清理ConditionQueue的方法 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter;//unlinkCancelledWaiters方法里面lastWaiter可能又重写赋值了 } Node node = new Node(Thread.currentThread(), Node.CONDITION);//当前线程包装成node节点 /* * t是null 说明尾节点为null 说明条件队列中没有值 所以node 成了firstWaiter * t不为null 那就加入到队尾 * */ if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node;//lastWaiter 重写赋值 因为node是最后加入的 node就是lastWaiter return node; } /** * 条件队列从头部开始 移除非CONDITION节点 */ private void unlinkCancelledWaiters() { Node t = firstWaiter;//头节点赋值给t Node trail = null;//trail是t的next节点的上一个为CONDITION的节点 //这个循环做的是从头节点开始移除不是CONDITION的节点 while (t != null) { Node next = t.nextWaiter;//next为t的下一个节点 if (t.waitStatus != Node.CONDITION) {//如果t的状态不是CONDITION 说明不应该在条件队列中 取消了 要移除 t.nextWaiter = null;//把t的下一个节点设置为null 这样让t 和整个条件队列链表断开 也方便GC /** *trail 是null 说明是第一次进来吧 但是第一次的t是firstWaiter 这个时候firstWaiter的节点为CONDITION * 所以下面有个赋值把 firstWaiter的下一个节点 赋值给firstWaiter 意思就是说 让下个节点成为头节点 * 如果trail不是为null 那就把当地节点的next赋值给trail的下个节点 因为当前节点t 不可用了 所以要将t的 * 下个节点 重新和链表关联起来 也就是说重新指向上一个节点 而trail其所就是t的上一个有效的节点 * 所以有了这个赋值 * */ if (trail == null) firstWaiter = next; else trail.nextWaiter = next; /* * 如果next 等于null 了 说明t没有下个节点了 这个时候trail 应该就是有效的最后一个节点 * */ if (next == null) lastWaiter = trail; } else trail = t;//trail相当于一个临时的变量 这边的赋值就是我上面说的 trail是next的上一个有效的节点值 t = next;//next赋值给t 准备下一次的循环 } }
上面的整个代码 是我注释了addConditionWaiter方法 大家应该能看明白,这个方法主要做的就是包装当前线程为node 然后加入的Condition Queue的队尾 这其中还做了一个条件队列元素清理的工作,清除一些非Condition状态的节点
下面我们来看下第二个方法 fullyRelease 看名字 我们应该也能猜出就是释放当前线程占用的资源,而且是完全释放,为什么是fully呢,那是比如重入锁,可以重入,每次lock的时候同步器的状态State都会+1,可以去看下第一篇的文章,应该有描述过,而且fullyRelease方法是有返回值的 返回的savedState就是当前线程持有的状态值,为什么要记录下来呢,那是后面我们再次争取锁资源的时候 需要用到这个savedState
/** * 释放当前节点持有的所有资源,并且唤醒同步队列中的head节点去获取资源 */ final int fullyRelease(Node node) { boolean failed = true;//表示 是否释放失败 try { int savedState = getState();//获取同步器的状态值state if (release(savedState)) {//就是释放资源 唤醒等待的线程去获取资源 之前已经描述过 不清楚的 看下第二篇文章 failed = false; return savedState; } else { throw new IllegalMonitorStateException();//释放失败 抛出异常 } } finally { if (failed) node.waitStatus = Node.CANCELLED;//如果释放失败 就把当前节点设置去取消 着就解释了 为什么之前加入节点的时候回去做检查 // 丢弃非Condition的节点 } }
isOnSyncQueue 方法 就是判断当前节点是否在同步队列SyncQueue中,如果是的话 就跳出while循环执行后面的方法,如果不在的话 那就要进入while循环体呢 做线程等待了,至于为什么要这样判断,那时因为node 节点加入到ConditionQueue 中,如果执行Signal方法,被唤醒的线程节点,会转移到SyncQueue中,这个具体后面的Signal方法里面 我们具体再说。
看下代码:
/** * 判断当前node 是否在同步队列中 */ final boolean isOnSyncQueue(Node node) { /* *节点的状态是condition一定不再同步队列中 *如果节点加入到同步队列中 使用enq方法 那么当前节点的pre 一定是非空的 *那么如果当前pre是为null *那就不在Sync queue 中 */ if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // 如果当前节点有后继节点 必然是在同步队列中的 因为next是同步队列中的node 才会存在这一的情况 return true; return findNodeFromTail(node);//去同步队列中匹配node 节点 } /** * 从尾部节点开始搜索 看是否能找到当前的node节点 */ private boolean findNodeFromTail(Node node) { Node t = tail;//同步队列的尾部节点 for (; ; ) { if (t == node)// t ==node 说明在同步队列能找到 返回true return true; /* * t==null 第一次循环说明tail节点不存在 说明同步队列就是不存在的 那node更不可能存在于同步队列中返回false * 后面的循环t 就是之前的节点的前pre节点 如果为null 说明已经找到了头部节点了 都没有匹配到node 也返回false */ if (t == null) return false; t = t.prev; } }
每行代码的 具体语义 我都在注释里面了 不清楚的 结和整个方法理解一下
当执行到while 内部的时候,刚才我也分析过,执行到while里面说明 当前的node节点不在SyncQueue中,说明就在ConditionQueue中,首先看到 有个阻塞线程的操作,这个和独占锁 阻塞当前线程是一个道理,这边等待是Signal唤醒当前线程,然后继续往下执行
后面有一个方法checkInterruptWhileWaiting 这个方法其实是要关注一下的,
先看下代码:
/** * Mode meaning to reinterrupt on exit from wait */ private static final int REINTERRUPT = 1; /** * Mode meaning to throw InterruptedException on exit from wait */ private static final int THROW_IE = -1; /** * 检查是否发生过线程中断 * 返回0表示:线程没有被中断 * 1表示:中断在signal之后发生的 * -1表示:中断在signal之前发生的 */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } /** * Transfers node, if necessary, to sync queue after a cancelled wait. * Returns true if thread was cancelled before being signalled. */ final boolean transferAfterCancelledWait(Node node) { /* * 这个地方给大家特别说明下: * 刚才上面我提到过 被唤醒有2中方式 可能是被signalled 或者被interrupted * 下面的有个CAS的操作 就是 将当前节点的状态更新成0 * 如果更新成功说明了 当前节点的状态依旧是CONDITION 也就是说还在条件队列中 那就说明了不是被signal唤醒的 那就是被中断了 * 同理 如果更新失败 则说明当前节点的状态 已经被修改了 那说明就是被signalled了的 因为被signal 会将当前节点状态修改 转移到Sync queue中 */ if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node);//这边更新成功 说明当前线程发生了中断 而且中断在signal之前 这边做一个补偿操作 把节点放入到Sync 队列中 return true; } /* * 这边又判断了下 当前节点是否在同步队列中 为什么还要判断呢 是因为虽然发生了signal * 但是 我们看下transferForSignal的方法能知道 是先执行修改节点状态的CAS操作 然后再执行enq的入队操作 * 所以这边虽然状态已经修改 但是可能线程正在执行enq 方法 所以这边判断了下 如果没有在Sync队列中 * 那当前线程就坐下yield 就是线程执行让出一下 意思就是稍等会儿 */ while (!isOnSyncQueue(node)) Thread.yield(); return false; }
线程在while内部被阻塞 然后被唤醒 只有2中方式:1是线程发生了中断,二是
checkInterruptWhileWaiting 方法返回值有3个 一个是0说明线程从等待到唤醒没有发生过中断
第二个返回值是THROW_IE,它的值是-1,从命名上面我们能知道 这个是要抛出中断异常,它的执行结果其实就是线程的中断在Signal之前发生了
第三个返回结果是REINTERRUPT 它的值是1 意思就是重新做下线程中断,这个是由于中断在Signal之后发生的
这边有个条件就是 如果返回0的话 循环是继续的 不会break 我在网上查询说 这边可能存在“假唤醒”的问题 因为返回0 线程一定是没有中断,那就是被唤醒了,但是被唤醒的node 会进入到SyncQueue中的呀,为什么这边不跳过循环,反而是继续循环判断?这边没搞明白,有知道的小伙伴 可以告知一下!
while 之后的方法 说明当前线程已经在SyncQueue 那就执行和独占锁的获取方法一样的acquireQueued方法,不清楚的这个方法怎么运行的小伙伴,可以回看下第一篇文章,acquireQueued主要做的就是去尝试获取锁资源,如果获取不到线程还是阻塞等待的,直到被唤醒。该方法是有返回值的 如果返回ture 说明在等待过程中发生了中断,如果是false 说明没有。如果返回true 而且上面的interruptMode是非Throw-IE的 那interruptMode值就是ReInterrupt
后面的nextWaiter!=null,说明当前节点还没有和ConditionQueue断开,这边执行下ConditionQueue的清理操作,把非Condition状态的节点从条件队列中剔除出去。最后如果interruptMode非0就执行下对于的状态操作reportInterruptAfterWait
具体代码也很简单:
/**
* 这边就是根据 刚才interruptMode 不同的值 做出不同的回应
* THROW_IE 意思就是抛出异常
* REINTERRUPT 意思就是做出线程重写中断的操作 可以让上层去检测处理
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
await还有几个重构的方法,里面的核心方法上面我都讲了,剩下的有兴趣的可以自己尝试去理解看看,具体方法有什么区别 我在上篇分析Condition接口的时候 接口方法上面都注释了!
第一步 执行await方法执行的时候当前线程一定是获得了锁的,不然执行这个方法的时候回报错的,有兴趣的可以自己写下Demo,自己看下在哪一步报错,偷偷告诉你下在释放tryRelease的时候!
第二步 就是将当前线程封装成node节点 放入ConditionQueue的尾部
第三步 释放当前线程持有的所有同步器State
第四步 判断当前节点是否在SyncQueue中 如果是 就第五步 如果不是 就线程阻塞 等待Signal信号 唤醒
第五步 执行acquireQueued方法 去重新获取锁资源
最后一步 获取到锁后 根据前面的中断状态 做出对应的处理 方法返回
signal方法 是从Contidion头部开始选一个合法的节点 转换到SyncQueue中
public void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } /** * 一个判断 判断是否用于锁的线程和释放线程是同一个 子类从写实现 */ protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } /** * 使得条件队列中的第一个没有被cancel的节点 enq到同步队列的尾部 */ private void doSignal(Node first) { do { /* * 这边说明条件队列只有first 一个节点转移完first节点设置lastWaiter也为null * 设置first的nextWaiter 等于null */ if ((firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null;//first 要被加入到同步队列中 修改nextWaiter==null } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * 将node节点从调节队列中转换到同步队列中 如果返回是true 那说明转换成功 */ final boolean transferForSignal(Node node) { /* * 如果当前的CAS操作失败 说明node节点的状态已经不是condition了 可能已经被cancel了 所以返回false */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node);//将当前的node节点 加入到同步队列中 独占锁的时候已经分析过 返回的节点p是node节点的prev节点 int ws = p.waitStatus; /* * 这边ws是node的prev 节点p的状态 如果p的ws 大于0 那说明p已经cancel了 那就可以直接唤醒node节点 * 这边不明白的可以去结合shouldParkAfterFailedAcquire 方法看下 这个方法里面有如果node的pre节点是Cancel的话 会做重写寻找pre节点 * 同样的下面的CAS 操作将node的前驱节点P的ws状态修改为signal失败 说明当前的p节点的状态已经被别的线程修改了 * 那就要去唤醒node节点线程去获取资源锁 * 之前我们独占锁的时候都说过 同步队列中 节点都是通过自己的前驱节点去唤醒的 */ if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
signal 方法比较简单 上面我也描述过了 有些地方如果看不懂 还是要结合整个await方法互相看下 每一个判断都存在道理
signalAll方法 是将所有ConditiaonQueue中node节点转换到SyncQueue中
public void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } /** * 移除条件队列中所有节点 挨个转移到同步队列中 */ private void doSignalAll(Node first) { lastWaiter = firstWaiter = null;//因为所以节点 都已经转移 所以条件队列就为null 了 do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null);//循环转移 直到最后一个nextWaiter等于null }
从代码上 我们也能看到signalAll就是做一个所有节点的转移操作,doSignalAll方法入口就是设置当前的 lastWaiter = firstWaiter = null 保证了一个整体的操作,如果有人想问 为什么不直接把ConditionQueue接到SyncQueue的后面 不就好了么,为什么还要挨个去循环,那是因为2中队列的结构不一样,没法直接全部转移,Sync是用next和prev连接前后节点的但是Condition 是用NextWaiter连接后面的节点的,是一个单向链表,2者没法直接关联!
Sync-Queue:
Condition-Queue:
上面就是SyncQueue和ConditionQueue的流程图
写了5篇文章分析了下AQS的源码 大部分源码都已经做了注解,如果看不明白的,多看几遍 ,对着源码看,第一篇可以看我的注解,第二遍可以尝试自己单独看源码,是否能看明白,最好自己能debug走一遍 看下,一定的能够加深影响,最后文中如果有些的不对的,希望大家能够指正
后面我会整理下,把所有代码放到github 里面 方便大家看
预告:后面几篇我会写下具体实现AQS的java的的类,ReentrantLock,Semaphore,CountDownLatch,DelayQueue等等
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。