赞
踩
AQS 是什么?
AQS 是 AbstractQueuedSynchronizer 的简称,也被称为抽象给队列同步器,它是一个抽象类。它提供了一个框架,用于实现依赖先进先 出(FIFO)等待队列的阻塞锁和相关的同步器(信号量、事件等),这个类被设计为大多数类型的同步器的有用依据。
AQS 的数据结构什么?
AQS 的实现是基于FIFO队列的,它是一个双向队列(里面还有单向的条件队列)
AQS 内部类有哪些?
AQS 的内部类主要是 Node 和 ConditionObject
AQS 的状态属性:
(AQS对象)属性名 | 作用 |
---|---|
head | 头节点 |
tail | 尾节点 |
state | 同步状态或理解为锁的状态也行,但注意 这个别与 Node 节点对象的 waitStatus 和 nextWaiter 混淆了 |
(Node 节点对象)属性名 | 作用 |
---|---|
CANCELLED | 值为1;在队列中等待的线程超时或被中断时,从队列取消 |
IGNAL | 值为-1;后继节点等待当前节点的唤醒 |
ONDITION | 值为-2;节点在等待队列中,当 condition 被 signal() 后,会从等待队列转到同步队列 |
ROPAGATE | 值为-3;表示下一次共享或同步状态获取将会被无条件传播下取 |
0 | 初始状态 |
HARED | Node节点对象,表示共享模式 |
XCLUSIVE | Node节点对象,表示独占(排它)模式 |
waitStatus | 表示当前节点所表示的线程状态 |
prev | 当前节点的前驱 |
next | 当前节点的后继 |
thread | 线程对象 |
nextWaiter | 下一个条件等待节点对象 |
这个类序列化和反序列化后会怎样?
这个类序列化仅存储底层维持状态的原子整数,因此反序列化对象具有空线程队列
继承该类后,主要重写哪些方法?
tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared、isHeldExclusively等
以上这几个方法在 AQS 中直接抛出异常,需要子类自行实现。
关于默认插入策略
对于默认插入策略,吞吐量和可扩展性通常最高,尽管这不能保证是公平的或则无饥饿的
关于 CLH():
CLH 是一种基于单项链表的高性能、公平的自旋锁。申请加锁的线程通过前驱节点的变量进行自旋。在前置节点解锁后,当前节点会结束自旋,并进行加锁。在 SMP 架构下, CHL 更具有优势,在 NUMA 架构下,如果当前节点与前驱节点,不在同一 CPU 模块下,跨 CPU 模块会带来额外的系统开销,而 MCS 锁更适用于 NUMA 架构, 参考来源,里面有对 (NUMA 和 SMP 的介绍)
它的加锁逻辑:
Ⅰ、获取当前线程的锁节点,如果为空则进行初始化
Ⅱ、通过同步方法获取尾节点,并将当前节点置为为节点,此时获取到的为节点为当前节点的前驱节点
Ⅲ、如果节点为空,则表示当前节点为第一个节点,加锁成功
Ⅳ、如果尾节点不为空,则基于前驱节点的锁值进行自旋(locked = = true ),直到前驱节点的锁值( locked = = false),
它的解锁逻辑:
①、获取当前线程的锁节点,如果节点为空或锁值( locked = = false),则无需解锁,直接返回
②、如果同步方法为尾节点赋空值,赋值不成功则表示当前节点不是尾节点,需要将当前节点的 locked = = false
保证已解锁该节点。如果当前节点为尾节点,则无需设置该节点的锁值。因为该节点没有后置节点,即使设置了
也没什么意义
为什么头节点不在构造器中就被创建
CLH 队列需要虚拟的头节点才能开始,但不需要在构造器中创建他们,因为没有竞争,它将会被浪费
== 注意: == 线程条件等待使用的是相同的节点,但使用的是额外的 link。条件值需要在简单(非并行)链接队列中
链接节点,因为只有在专用时才可以访问它们,等待时,一个节点将插入队列。收到信号时,该节点转移到主队
列,状态字段的指定值用于编辑节点所在的队列。说的直白一点,有两个队列,同步队列 和 条件队列。收到信号,
节点从条件队列转到同步队列。有点儿那种生产者和消费者的意思。① 可能没仔细 Node 定义的你会有点儿困惑,
当你仔细阅读源码时,你会发现,在除了节点前驱( 源码中定义为: volatile Node prev )、节点后继( 源码中定义
为:volatile Node next; ) 外,还存在一个 nextWaiter ( 源码中定义为:Node nextWaiter; )定义,所以一个节点
除了前驱和后继节点外,还可能有一个节点;② 同步队列是双向链表,条件队列是单链表,为了帮助理解,
可以看下面示意的图(这个图是有问题的,这只是一个示意图):
transient 的作用
java 语言关键字,变量修饰符,如果用 transient 声明一个实例变量,当对象存储时,它的值不需要维持。
java 的serialization 提供了一种持久化对象的机制。当持久化对象时,可能有一个特殊的对象数据成员,我们不想用 serialization 机制来保存它。为了在一个特定对象的一个域上关闭 serialization,可以在这个域上加上关键字 transient,当一个对象序列化的时候,transient 型变量的值不包括在序列化的表示中,然而非 transient 型变量时被包括进去的:
添加节点的操作,== 特别要注意的是 addWaiter 方法中的 “ Node node = new Node(Thread.currentThread(), mode);” 这一行代码, Node 构造器中为 :this.nextWaiter = mode; 而不是 this.next = mode; 别弄混淆了, 特别要注意,注意 ,注意!!!==
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ /** * 这个是在 tail 后添加新的节点,这个是在主队列中 * 参数 node : 插入的节点 * 返回 节点的前驱 */ private Node enq(final Node node) { for (;;) { //死循环 Node t = tail; //获取尾部节点对象 if (t == null) { // Must initialize 尾部节点对象为 null,即尾部节点不存在,说明此时队列为空 if (compareAndSetHead(new Node())) // 创建一个节点,并将其设置为头节点 tail = head; // 此时头尾节点为同一节点 } else { // 尾节点不为空 node.prev = t; // 插入节点的前驱指向 尾节点 (这里开始向队列添加节点) if (compareAndSetTail(t, node)) { // 将 新增的节点设置为 尾节点 t.next = node; // t(原尾节点)的后继指向新节点 return t; } } } }
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ /** * 为当前线程和给定的模式创建节点并且入队 * 参数mode: Node.EXCLUSIVE 用于独占 Node.SHARED 同于共享 * 返回 新的节点 */ private Node addWaiter(Node mode) { // 这个要注意,初始化方法为有一行代码为: this.nextWaiter = mode; 并不是 this.next = mode; Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 尝试 end 的快速路径; 失败时备份到完整的 enq Node pred = tail; if (pred != null) { // pred 不为空,即 tail 不为空 node.prev = pred; if (compareAndSetTail(pred, node)) { // CAS 方式设置尾节点,执行失败的话会进 enq 方法 pred.next = node; return node; } } enq(node); return node; }
/** * Wakes up node's successor, if one exists. * * @param node the node */ // 唤醒节点的后继,如果存在的话 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ // 如果状态为 负/否定 的话(即可能需要信号),请尝试清除预期发出信号 // 如果失败或等待线程更改状态是可以的 int ws = node.waitStatus; // 获取节点的等待状态 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //设置等待状态 /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 被释放线程保留在后续线程中,该线程通常只是下一个节点 // 但如果取消或显然为空,请从尾部向后移动以找到实际未取消后继 Node s = node.next; // 获取后继节点对象 if (s == null || s.waitStatus > 0) { //判断后继节点是否为 null 或者 s 后继节点的等待状态值 > 0 s = null; // 将后继节点置为 null // 从尾部tail向前遍历, 注意这里的遍历条件 t != null && t != node; // 这里最多遍历到 node 节点这里 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) // t 的等待状态值 <= 0 s = t; // 将 t 赋给 s } if (s != null) LockSupport.unpark(s.thread); // 真正的唤醒操作 }
/** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ /** * 设置队列头,并且在共享模式下,检查后继节点是否处于等待状态 * 如果是,则传播,如果传播 > 0 或 PROPAGATE 已经被设置 * 参数 node: 节点 * 参数 propagate :tryAcquireShared的返回值 */ // 为了帮助理解,一下为 tryAcquireShared 返回值的含义: // 负值 说明失败 // 0 表示在共享模式下获取成功但后续无法获取成功 // 正值 以共享模式获取成功并且随后的共享模式获取可能成功 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below 记录老的头节点以便在下面进行检查 setHead(node); // 将 node 设置为头节点 /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ /** * 如果出现以下情况,尝试发信号通知下一个队列的节点: * 传播被调用者指示,或由上一个操作记录(作为 setHead 之前或 之后的 h.waitStatus ) * (注意:这使用了 waitStatus 的签名检查(sign-check),因为 PROPAGATE 状态可能转换为 SIGNAL) * 并且下一个节点正在共享模式下等待,或则我们不知道,因为它显示为空 * * 这两项检查中的保守性可能导致不必要的唤醒,但仅当有多个获取/发布时, 因此不论现在 * 还是不久后,大多数都需要发出信号 */ // 参考:https://blog.csdn.net/anlian523/article/details/106319294/ // propagate > 0 说明还有剩余共享锁可以获取 // h == null (h = head) == null 和 s == null 是为了防止空指针异常发生的标准写法,但这不代表就 // 一定会发现它们为空的情况。这里的话, h == null 和 (h = head) == null 是不可能成立,因为只要执行 // addWaiter ,CHL 队列至少也会有一个 node 存在;但 s == null 是可能发生的,比如:node 已经是队列最后一个节点 // 如果 propagate > 0 不成立 ,而h.waitStatus < 0 成立。这说明旧 head 的 status < 0 成立。 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; //1. node不再关联到任何线程 node.thread = null; //2. 跳过被cancel的前继node,找到一个有效的前继节点pred // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; //3. 将node的waitStatus置为CANCELLED // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; //4. 如果node是tail,更新tail为pred,并使pred.next指向null // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. // int ws; //5. 如果node既不是tail,又不是head的后继节点 //则将node的前继节点的waitStatus置为SIGNAL //并使node的前继节点指向node的后继节点 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //6. 如果node是head的后继节点,则直接唤醒node的后继节点 unparkSuccessor(node); } node.next = node; // help GC } }
// 这个方法是 AQS 内部类 ConditionObject 的方法 private Node addConditionWaiter() { Node t = lastWaiter; // lastWaiter 表示的是条件队列的最后一个节点 // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); // 这个方法是去除条件队列中所有非 CONDITION 的节点 t = lastWaiter; // 然后 将最后一个节点赋给 t } Node node = new Node(Thread.currentThread(), Node.CONDITION); // 创建一个 Node 节点 if (t == null) // 对 t 进行判空 firstWaiter = node; // 如果t为空的话,此时队列条件队中没有节点,该 node 将成为首节点 else t.nextWaiter = node; // 将 node 节点添加到条件队列的尾部 lastWaiter = node; // node节点是条件队列的尾部了,需将 lastWaiter 更改为 node return node; // 将新增的 node 节点返回 }
private void unlinkCancelledWaiters() { Node t = firstWaiter; // 条件队列的第一个节点 Node trail = null; while (t != null) { Node next = t.nextWaiter; // 将节点 t 的后继节点赋给 next if (t.waitStatus != Node.CONDITION) { // 若节点t的 waitStatus 不为 CONDITION(以下便是摘掉这个节点的操作) t.nextWaiter = null; // 将 节点 t 的 nextWaiter 设为 null /** * trail 为 null,表明 next 之前的节点等待状态均为 CANCELLED,此时更新 firstWaiter 引用的指向 * trail不为null,表明 next 之前有节点的等待状态为 CONDITION,这时将 trail.nextWaiter 指向 next节点 */ if (trail == null) // 判断 trail 是否等于 null firstWaiter = next; // trail 为 null 成立,则将 next 赋给firstWaiter else trail.nextWaiter = next; // 否则 trail 的 nextWaiter 设为 next if (next == null) // next 为null,表明遍历到队列尾部了,此时将 lastWaiter 指向 trail lastWaiter = trail; } else // 其实就是用 trail 去记录已遍历节点中最后一个 CONDITION 的节点 trail = t; // t.waitStatus = Node.CONDITION,则将 trail 指向 t t = next; } }
在阅读源码和查阅资料的过程中,我发现在别人的文章中有几个有趣的问题(侵权联系删除),也在这里分享出来,
原文地址我忘记录下来了,对这位大兄弟表示抱歉,这位大兄弟的部分内容如下:
① 状态 1 是被中断的, 那 CompareAndSetState(0,1) 不是设置为1 就获取到锁了么,在这里貌似还是等待?
答: 状态 1 是被中断的,但注意这里说的是 waitstatus,而 CompareAndSetState(0,1)这里说的是 state.在AQS里是
两个变量而不是一个变量,waitstatus 用于记录节点的状态,state 用于记录描述 AQS的状态(用于标记是否处于
同步中,以及记录重入的次数)
② 状态 -1 ,后继节点处于等待状态,当前节点在干啥,不是等待么?
答: 状态 -1,后继介蒂安等待,当前节点不一定
如果当前节点为 head,可能在等待(正好被新来的节点抢走了),也可能在执行
如果当前节点不是 head ,肯定在等待
调用 await() 方法,会节点对从同步队列转到条件队列中,详细可参考
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ /** * 实现可打断的条件等待 * 1. 如果当前线程被中断,抛出 InterruptedException * 2. 保存 getState 返回的锁状态 * 3. 以保存的状态作为参数调用 release ,如果失败则抛出 IllegalMonitorStateException * 4. 阻塞直到被通知或被打断 * 5. 通过调用 acquire 的指定版本(以保存的状态作为参数)来重新获取 * 6. 如果当在第 4 步时被中断,抛出 InterruptedException */ public final void await() throws InterruptedException { if (Thread.interrupted()) // 如果当前线程被中断,抛出 InterruptedException throw new InterruptedException(); // 添加到等待队列中 Node node = addConditionWaiter(); //释放当前线程获取的资源 int savedState = fullyRelease(node); //保存 getState 返回的锁状态 int interruptMode = 0; // 此处逻辑为: // 第一次循环时,由于节点不在同步队列中,因此会进入到 while 内部代码中,使用 lockSupport.park 使 // 线程阻塞,要唤醒线程,基本有两种方式:一种是使用sign()或signAll()方法唤醒;另一种则是发生中断 // 判断当前节点是否在同步队列中,当使用sign 或 signAll 唤醒、或发生中断,节点都会进入同步队列中, // 才会跳过 while 循环,执行后续代码 while (!isOnSyncQueue(node)) { // 阻塞直到被通知或被打断 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //通过调用 acquire 的指定版本(以保存的状态作为参数)来重新获取 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。