赞
踩
并发编程最佳学习路线
【Java多线程】高并发修炼基础之高并发必须了解的概念
【Java多线程】了解线程的锁池和等待池概念
【Java多线程】了解Java锁机制
【Java多线程】线程通信
【Java基础】多线程从入门到掌握-第十五节.使用Concurrent集合
【Java多线程】JUC之线程池(一)与线程池的初识第四节.线程池的工作队列
每个插入操作(put)必须等到另一个线程调用移除操作(take)
(即: 写入元素必须被移除后才能继续写入新的元素
),否则写入操作一直处于阻塞状态。支持公平锁(TransferQueue-FIFO)和非公平锁(TransferStack-LIFO)。相较于其他可缓存元素队列,适用于单线程同步传递性场景
,比如:消费者没拿走当前的产品,生产者是不能再给产品的,这样可以控制生产者生产的速率和消费者一致
。特点如下:
基于一种名为“Dual stack and Dual queue
”的无锁算法实现。与其他阻塞队列不同,该队列不依赖与AQS实现,而是直接使用CAS操作实现的
,这导致代码中有大量的判断是否数据被并发改写了,并做相应的处理。
入队线程写入元素时,之前的元素没有被取走的时入队线程阻塞
,出队线程获取不到元素的时出队线程阻塞
;
支持公平/非公平策略。公平模式内部数据结构基于“队列”
。非公平模式内部数据结构基于“栈”
来实现,有一个头节点(栈顶)
应用场景:线程池newCachedThreadPool()
就使用SynchronousQueue,这个线程池新任务到了
如果有空闲线程则使用空闲线程执行(复用
),没有就创建新线程,不会对任务进行缓存。不推荐使用
无效操作
isEmpty()、size()、clear()、remove(Object)、contains(Object)、peek()
都不支持了。注意:上述的特点1,和我们之前介绍的Exchanger其实非常相似,可以类比Exchanger的功能来理解。
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//用于自旋控制的可用cpu数量
static final int NCPUS = Runtime.getRuntime().availableProcessors();
//为什么需要自旋这个操作?
//因为线程 挂起 唤醒站在cpu角度去看的话,是非常耗费资源的,涉及到用户态和内核态的切换...
//自旋的好处,自旋期间线程会一直检查自己的状态是否被匹配到,如果自旋期间被匹配到,那么直接就返回了
//如果自旋期间未被匹配到,自旋次数达到某个指标后,还是会将当前线程挂起的...
//NCPUS:当一个平台只有一个CPU时,你觉得还需要自旋么?
//答:肯定不需要自旋了,因为一个cpu同一时刻只能执行一个线程,自旋没有意义了...而且你还站着cpu 其它线程没办法执行..这个
//栈的状态更不会改变了.. 当只有一个cpu时 会直接选择 LockSupport.park() 挂起等待者线程。
// 最大空旋时间
// 表示未指定超时的话,线程等待匹配时,自旋次数。
// 是指定超时限制的请求的自旋次数的16倍.
// 32是一个经验值
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;//单核CPU自旋倍数为0,多核cpu自旋倍数为32
// 无限时的等待的最大空旋时间
// 表示未指定超时的话,线程等待匹配时,自旋次数。
// 是指定超时限制的请求的自旋次数的16倍
static final int maxUntimedSpins = maxTimedSpins * 16;
// 超时自旋等待阈值
// 如果请求是指定超时的话,如果超时nanos参数是< 1000纳秒时,禁止挂起。
// 挂起再唤醒的成本太高了..还不如选择自旋空转呢...
static final long spinForTimeoutThreshold = 1000L;
//tranferer对象, 构造时根据策略类型确定.
private transient volatile Transferer<E> transferer;
volatile修饰的Transferer变量
,它的核心操作都将委托给transferer
。 //默认构造器-TransferStack(非公平策略)
public SynchronousQueue() {
this(false);
}
// 通过 fair 值来决定内部用 使用 queue 还是 stack 存储线程节点的构造器
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
TransferQueue
,而非公平策略则是构造了TransferStack
,默认就是非公平模式。
抽象内部类Transferer
,SynchronousQueue中的所有方法,其实都是委托调用了TransferQueue/TransferStack
的方法 abstract static class Transferer<E> {
/**
* 写入或取出元素
* @param e null时表示这个请求是一个 REQUEST 类型的请求 消费者 -> 生产者
* 非null,说明这个请求是一个 DATA 类型的请求。 生产者 -> 消费者
*
* @param timed 如果为true 表示指定了超时时间 ,如果为false 表示不支持超时,表示当前请求一直等待到匹配为止,或者被中断。
* @param nanos 超时时间限制 单位 纳秒
*
* @return E 1.如果是REQUEST类型的请求,返回值不为null,表示匹配成功,返回null,表示请求超时 或 被中断。
* 2.如果是DATA类型的请求,返回值不为null,表示匹配成功,返回当前线程put的数据。
* 3.如果返回值为null 表示,DATA类型的请求超时 或者 被中断..都会返回Null。
*
*/
@return 非null表示传递的数据; null表示传递失败(超时或中断)
abstract E transfer(E e, boolean timed, long nanos);
}
Transferer类中定义了抽象方法transfer,该方法用转移元素
,是最最核心的方法
参数e
:
生产者将数据传递给消费者时
,传递的参数为e
,是一个非null的元素
。消费者希望生产者提供元素时
,传递的参数为null
。timed
:表示是否设置超时,true表示指定了超时时间 ,false 表示不支持超时,表示当前请求一直等待到匹配为止,或者被中断。nanos
:表示超时时间限制
单位 纳秒
返回值
:非null表示消费者从生产者那得到的值,null表示超时或者中断,具体需要通过检测中断状态得到。transfer是根据这一点来判断是读还是写线程
,接着决定是否匹配等
队列
,有一个头节点和尾节点,对应实现类为:TransferQueue
。关于TransferQueue的transfer方法其思路和TransferStack大致相同,总之就是入队/出队必须一一匹配,否则任意一方就会加入队列并等待匹配线程唤醒
。
队列与栈都是通过链表
来实现的。具体的数据结构如下
//-----------------TransferQueue成员属性start--------------------
//队首
transient volatile QNode head;
//队尾
transient volatile QNode tail;
/**
* 标记节点:用于表示被删除节点的前驱节点。因为入队操作是 两步完成的,
* 第一步:t.next = newNode
* 第二步:tail = newNode
* 所以,队尾节点出队,是一种非常特殊的情况,需要特殊处理,下面有讲!
*
* clean方法中使用,用来保存 需要延后移除的节点的前驱
*/
transient volatile QNode cleanMe;
//UNSAFE获取的成员内存偏移量
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset; //head字段偏移量
private static final long tailOffset; //tail字段偏移量
private static final long cleanMeOffset; //cleanMe字段偏移量
//-------------省略UNSAFE相关获取属性偏移量代码----------------------
//-----------------TransferQueue成员属性 end--------------------
需要注意点cleanMe属性
标记它的下个节点要删除
节点 node 是队列的末尾
, 则开始用这个节点,为什么呢?
A.CASNext(B, B.next)
就可以,但是当 节点 B 是整个队列中的末尾元素时
。 线程A删除节点B, 线程B在节点B之后插入节点 ,这样操作容易致使插入的节点丢失
, 这个cleanMe很像 ConcurrentSkipListMap 中的 删除添加的 marker 节点
, 他们都是起着相同的作用哨兵节点( dummy node)
,然后头尾节点都指向这个哨兵节点
。 //-----------------TransferQueue构造方法 start--------------------
TransferQueue() {
//构造一个哨兵节点, 而整个 queue 中永远会存在这样一个 dummy node
// dummy node 的存在使得 代码中不存在复杂的 if 条件判断
QNode h = new QNode(null, false); // initialize to dummy node.
//头尾节点都指向这个哨兵节点
head = h;
tail = h;
}
//-----------------TransferQueue构造方法 end--------------------
//-----------------TransferQueue静态内部类 QNode-start--------------------
static final class QNode {
//--------成员属性start---------
//指向下一个节点。
volatile QNode next;
//节点数据,用CAS设置
volatile Object item;
// 标记在该节点上等待的线程是哪个
//当Node对应的线程未匹配到节点时,对应的线程 最终会挂起,挂起之前会保留 线程引用到waiter ,
//方法 其它Node匹配当前节点时 唤醒 当前线程..
volatile Thread waiter;
// isData == true表示put线程节点,反之take线程节点
final boolean isData;
// Unsafe获取的item、next属性的内存偏移量
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
//-------------省略UNSAFE相关获取属性偏移量代码----------------------
//--------成员属性end---------
//--------构造方法start---------
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
//--------构造方法end---------
//--------CAS更新成员属性方法start-------
//主要用于删除节点,CAS更新当前节点next为val
boolean casNext(QNode cmp, QNode val) {
return next == cmp && //next等于cmp && this+nextOffset=cmp,更新为val
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
//CAS更新当前节点item为val
boolean casItem(Object cmp, Object val) {
return item == cmp && //item等于cmp && this+itemOffset=cmp,更新为val
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//取消当前节点.将当前节点item指向自身
void tryCancel(Object cmp) {
//item等于cmp && this+itemOffset=cmp,更新为this
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
//--------CAS更新成员属性方法end-----------
//判断当前Node是否为取消状态
boolean isCancelled() {
return item == this;//当前节点item指向自身,表示当前节点被取消
}
//判断当前节点是否 “不在” 队列内,当next指向自己时,说明节点已经出队。
boolean isOffList() {
return next == this;//当前节点的next指向自身,说明节点已出队,返回true
}
}
//-----------------TransferQueue静态内部类 QNode-end--------------------
若队列为空 且 队列中的尾节点和自己的 类型相同, 则添加 node 到队列中, 直到 timeout/interrupt/其他线程和这个线程匹配。
队列不为空, 且队列的head.next
节点是当前节点匹配的节点,
advanceHead()
方法帮助 先前 阻塞的节点 dequeue匹配成功时返回非null值,匹配失败时返回null值
。 /**
*TransferQueue公平模式-写入或取出元素方法
*
* @param e null时,则表示消费(take 操作);不为null时,则表示生产(put 操作)
* @param timed true 如果定时等待,则为True
* @param nanos 超时时间,单位纳秒
* @return 匹配成功时返回非null值,匹配失败时返回null值
*/
E transfer(E e, boolean timed, long nanos) {
// s表示新创建的节点
QNode s = null;
// null时,则表示消费(take 操作);不为null时,则表示生产(put 操作)
boolean isData = (e != null);
//-----自旋1
for (;;) {
QNode t = tail;//队尾节点
QNode h = head;//队头节点
//初始化的队头和队尾都是指向的一个"空" 节点,即队列未初始化.重新开始自旋
if (t == null || h == null) continue;
//-----1.若队列为空 或 队列中的尾节点和当前节点的类型相同, 则执行入队操作------
//条件一:成立,说明head和tail同时指向dummy节点,即当前队列是空队列。此时当前线程需要做入队操作,因为没有任何节点 可以去匹配。
//条件二:队列非空,队尾节点与当前节点类型相同。说明类型不匹配,此时当前节点只能入队...
// (注意这里是和队尾节点比较, 下面进行匹配时是和队头 head.next 进行比较)
if (h == t || t.isData == isData) {
//1.1.获取当前队尾t的后继节点next
QNode tn = t.next;
//1.2.存在并发:队尾被改变了,说明有其他线程入队节点,导致读到的tail不一致,重新开始自旋
if (t != tail) continue;
// 5.
//1.3.存在并发:读的tail一致,其他线程添加了 tail.next, 所以帮助推进tail,尝试将tn设置为新尾节点,重新开始自旋
//条件成立:说明已经有线程 入队了,且只完成了 入队的 第一步:设置t.next = newNode, 第二步可能尚未完成..
if (tn != null) {
advanceTail(t, tn);//新尾节点为tn
continue;
}
//1.4.如果是超时等待操作,但不支持超时,则直接返回NULL
//条件成立:说明当前调用transfer方法的 上层方法 可能是 offer() 无参的这种方法进来的,这种方法不支持 阻塞等待...
if (timed && nanos <= 0) return null;
//1.5.条件成立:说明当前请求尚未创建对应的node, 则构建一个新节点,并赋值给s
if (s == null) s = new QNode(e, isData);
//1.6.新节点入队: 将s新节点CAS链接到tail的后继next ,入队失败, 则开始下一轮自旋
//条件不成立:!t.casNext(null, s) 说明当前t仍然是tail,当前线程对应的Node入队的第一步 完成!
if (!t.casNext(null, s)) continue;
//1.7.更新队尾 为新节点s。
advanceTail(t, s);
//----------------
//当前请求为DATA模式时:e为写入数据
//item == this 当前SNode对应的线程 取消状态
//item == null 表示已经有匹配节点了,并且匹配节点拿走了item数据。
//当前请求为REQUEST模式时:e为null
//item == this 当前SNode对应的线程 取消状态
//item != null 且 item != this 表示当前REQUEST类型的Node已经匹配到一个DATA类型的Node了。
//----------------
//1.8.自旋或者阻塞直到s节点被匹配
Object x = awaitFulfill(s, e, timed, nanos);//自旋或阻塞线程,直到满足s.item != e
//1.9x == s 表示节点为取消状态(中断或超时),需要做出队逻辑。
if (x == s) {
// 对节点 s 进行清除,
//1.若s不是链表的最后一个节点, 则直接 CAS 进行 节点的删除
//2.若 s 是链表的最后一个节点, 则 要么清除以前的 cleamMe 节点(cleamMe != null), 然后将 s.prev 设置为 cleanMe 节点, 下次进行删除 或直接将 s.prev 设置为cleanMe
clean(t, s);
return null;
}
//----执行到这里说明 当前Node 匹配成功了...
//a.当前线程在awaitFulfill()内,已经挂起了...此时运行到这里时是被匹配节点的线程使用LockSupport.unpark() 唤醒的..
//被唤醒:当前请求对应的节点,肯定已经出队了,因为匹配者线程 是先让当前Node出队的,再唤醒当前Node对应线程的。
//b.当前线程在awaitFulfill方法内,处于自旋状态...此时匹配节点 匹配后,它检查发现了,然后返回到上层transfer()
//自旋状态返回时:当前请求对应的节点,不一定就出队了...
//1.10 条件成立:说明当前Node仍然在队列内,需要做 匹配成功后 出队逻辑。
//被唤醒时:s.isOffList() 条件会成立。 !s.isOffList() 不会成立。
if (!s.isOffList()) { //isOffList用于判断节点是否已经出队 next == this
//其实这里面做的事情,就是防止当前Node是自旋检查状态时发现 被匹配了,然后当前线程 需要将
//当前线程对应的Node做出队逻辑.
//1.10.1. t为原队尾,是当前s节点的前驱节点,更新dummy节点为 s节点。表示head.next节点t已经出队了...
advanceHead(t,s);
//x != null 且 item != this 表示当前REQUEST类型的Node已经匹配到一个DATA类型的Node了。
//1.10.2. 因为s节点已经出队了,所以需要把它的s.item指向自身,表示它是个取消出队状态。
if (x != null) s.item = s;
//1.10.3. 设置s节点的waiter为null
s.waiter = null;
}
//1.11. 如果x != null,说明当前请求是REQUEST类型,返回匹配到的数据x,否则返回当前DATA请求提交的元素e
return (x != null) ? (E)x : e;
}
//2.----队列不为空 且当前入队节点与队尾节点类型不同,说明类型匹配(队尾->DATA,请求类型->REQUEST) (队尾->REQUEST, 请求类型->DATA),进行出队操作---
//进行线程的匹配操作, 匹配操作是从 head.next 开始匹配 (注队列刚开始构建时有个 dummynode, 而且 head 节点永远是个 dummynode 这个和AQS一样的)
else {
//2.1.获取队头节点的后继节点next(m是匹配的节点)。
// 为什么这里是从头节点head.next下一个节点开始? 因为TransferQueue总是会存在一个dummy node节点
// h.next节点 其实是真正的队头,TransferQueue是一个公平模式,与LIFO模式不同,需要与队头发生匹配
QNode m = h.next;
//2.2. 存在并发:读取不一致取,说明有其他线程改变了队列的结构,重新自旋
//条件一:t != tail 成立时机? 肯定是并发导致的,其它线程已经修改过tail了,有其它线程入队过了..当前线程看到的是过期数据,需要重新循环
//条件二:m == null 成立时机? 肯定是其它请求先当前请求一步,匹配走了head.next节点。
//条件三:条件成立,说明已经有其它请求匹配走head.next了。。。当前线程看到的是过期数据。。。重新循环...
if (t != tail || m == null || h != head) continue;
/** producer 和 consumer 匹配操作
* 1. 获取 m的 item (注意这里的m是head的next节点)
* 2. 判断 isData 与x的模式是否匹配, 只有produce与consumer才能配成一对
* 3. x == m 判断是否 节点m 是否被取消, 具体看(QNOde#tryCancel)
* 4. m.casItem 将producer与consumer的数据进行交换 (这里存在并发时可能cas操作失败的情况)
* 5. 若 cas操作成功则将h节点dequeue
*
* 疑惑: 为什么将h进行 dequeue, 而不是 m节点
* 答案: 因为每次进行配对时, 都是将 h 是个 dummy node, 正真的数据节点 是 head.next
*/
//2.3.不存在并发获取数据没有改变,获取匹配节点m的元素值
Object x = m.item;
if (isData == (x != null) || // 模式是否匹配
x == m || // m节点是否被取消
!m.casItem(x, e)) { // CAS更新m节点值item为e失败
// 上面三种情况,任意一种发生,都进行h的出队操作,新队头更新为m,然后重新开始自旋
advanceHead(h, m);
continue;
}
//匹配成功,设置m新的头节点,将真正的头节点h 出队。让这个真正的头节点成为dummy节点(next指向自身)
advanceHead(h, m);
//唤醒在m上等待的线程
LockSupport.unpark(m.waiter);
//10.如果x != null,说明当前请求是REQUEST类型,返回匹配到的数据x,否则返回当前DATA请求提交的元素e
return (x != null) ? (E)x : e;
}
}
}
主要逻辑:
判断当前节点类型
。( e为null 时表示消费(take 操作); e 不为 null 时表示生产(put 操作))
进入无限循环
获取头尾节点 head/tail
,若没有初始化,则 continue 继续
。
如果队列为空
或者 尾节点的类型与当前节点类型
相同,则进行入队操作
。执行下面<子步骤>
,否则 执行>>第 5 步骤
尾节点
的 next 节点赋值tn
continue
继续。尝试将tn设置为新尾节点
,然后 continue 继续。超时
,则直接返回 null
队列尾节点的 next 节点
。如果入队成功 执行>>第 4.7 步骤,否则, continue 继续。队尾tail
自旋或者阻塞直到s节点被匹配
。返回 null
如果出队返回元素
,方法结束;否则辅助节点 s 出队。执行至此,说明队列不为空,且当前节点
与 上次入队节点
类型相同(put 匹配 take ,take 匹配 put),进行出队
操作。
赋值 m 节点
continue 继续
。item 赋值x
m 节点已被匹配
或m 节点被取消
或者CAS 更新 m 节点的 item 为e失败
,则CAS更新m为新队头
,并continue 继续
。m为新队头
自旋或者阻塞
,直到满足某些条件。 /**
* TransferQueue-自旋或者阻塞,直到s节点匹配完成
*
* 主逻辑: 若节点是 head.next 则进行 spins 一会,
* 若不是, 则调用 LockSupport.park / parkNanos(), 直到其他的线程对其进行唤醒
*
* @param s 等待节点
* @param e 用于检查匹配的比较值
* @param timed true 如果定时等待,则为True
* @param nanos 超时时间,单位纳秒
* @return 返回匹配值,如果取消节点,则返回s
*/
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
//根据timed标识计算截止时间= 如果true ? 当前纳秒+nanos :0 (只有 timed 为true 时才有用)
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 计算需要自旋的次数
// 当前节点是head.next才自旋,说明恰好s正好是第一个入队节点,则会自旋一段时间,避免阻塞,提高效率,因为其他情况是会涉及到 park挂起线程的
// maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// maxUntimedSpins = maxTimedSpins * 16;
int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
//自旋
for (;;) {
//1.如果当前线程发生中断,那么尝试取消节点s, 将 item指向自身this, 在 transfer 中会对返回值进行判断
if (w.isInterrupted()) s.tryCancel(e);
//----------------------------------
//item有几种情况呢?
//当SNode模式为DATA模式时:
// 1.item != null 且 item != this 表示请求要传递的数据 put(E e)
// 2.item == this 当前SNode对应的线程 取消状态
// 3.item == null 表示已经有匹配节点了,并且匹配节点拿走了item数据。
//当SNode模式为REQUEST模式时:
// 1.item == null 时,正常状态,当前请求仍然未匹配到对应的DATA请求。
// 2.item == this 当前SNode对应的线程 取消状态
// 3.item != null 且 item != this 表示当前REQUEST类型的Node已经匹配到一个DATA类型的Node了。
//条件成立:
//当前请求为DATA模式时:e为写入数据
//item == this 当前SNode对应的线程 取消状态
//item == null 表示已经有匹配节点了,并且匹配节点拿走了item数据。
//当前请求为REQUEST模式时:e为null
//item == this 当前SNode对应的线程 取消状态
//item != null 且 item != this 表示当前REQUEST类型的Node已经匹配到一个DATA类型的Node了。
//----------------------------------
//2.获取s的元素值
Object x = s.item;
//3. x != e,才会退出循环也是唯一的出口,如果进行线程阻塞->唤醒 或中断,x != e 就会成立
//在生成s节点的时候,s.item==e,当取消操作或者匹配了操作的时s会变化。
if (x != e) return x;
//如果设置了超时等待
if (timed) {
// 计算继续等待的时间
nanos = deadline - System.nanoTime();
// 如果超时,尝试取消该节点,然后重新开始自旋,下一次在 x!=e时退出循环
if (nanos <= 0L) {
s.tryCancel(e); //将s的item指向this自身实现取消操作
continue;
}
}
//1.自旋控制:每次减少自旋次数,如果自旋次数大于0,则继续自旋
if (spins > 0) --spins;
//2.自旋次数用完了,s的等待线程为空,设置s的等待线程为当前线程
else if (s.waiter == null) s.waiter = w;// 设置等待线程
//3.s等待线程不为空,当前是非超时等待操作,则当前一直阻塞当前线程,直到被唤醒
else if (!timed) LockSupport.park(this);
//4.s等待线程不为空,当前是超时等待操作,且剩余时间小于spinForTimeoutThreshold = 1000L的时候
//使用剩余时间阻塞当前线程,这样自旋性能的效率更高
else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos);
}
}
这个方法应该比较好理解,就是等待匹配操作的到来
,
timed=true
,那么就等待一定时间parkNanos(this, nanos)
,tryCancel(e)
,park(this)
,直到匹配的操作到来,或者发生中断(取消该操作)。这边总结一下一些注意点:
head.next
,如果是会优先选择自旋
而不是阻塞,自旋次数到了才阻塞
,主要是考虑到阻塞、唤醒需要消耗更多的资源。tryCancel()
的时候就会导致x!=e
,因为该方法会将s的item设置为this
。且在线程被中断,超时的时候都会调用tryCancel()
方法,这些条件下将会退出。用于移除已经被取消的节点
/**
* 对 中断的 或 等待超时的 节点进行清除操作
*
* @param pred pred 是s的前驱节点
* @param s 要移除节点
*/
void clean(QNode pred, QNode s) {
//将清理节点的waiter置为null
s.waiter = null;
/*
* 无论何时,队列中的最后一个节点不能被删除(这里的删除指 通过 cas 直接删除, 因为这样直接删除会有多删除其他节点的风险)
* 当节点 s 是最后一个节点时, 将 s.pred 保存为 cleamMe 节点, 下次再进行清除操作
*/
//pred.next == s 表示节点 s 还在队列中,下面的操作可能导致 pred.next = next
while (pred.next == s) {
// 获取头节点
QNode h = head;
//获取头节点的后继节点next
QNode hn = h.next;
//1.head.next节点被取消,那么CAS更新新队头为原队头的后继节点next,即hn。
if (hn != null && hn.isCancelled()) {
// hn中断或者超时,则推进 head 指针,若这时h是pred则 loop条 "pred.next == s" 不满足, 退出循环
advanceHead(h, hn);
continue;
}
//获取尾节点,确保对tail的读的一致性
QNode t = tail;
//2.队列为空, 说明其他的线程进行操作, 移除了节点(注意这里永远会有个 dummy node),退出
if (t == h) return;
//队列不为空, 获取尾节点的后继节点next
QNode tn = t.next;
//3.t不为尾节点,两次读不一致,说明存在并发.队尾改变了,重新开始自旋
if (t != tail) continue;
//4.尾节点后继节点tn理应为null ,如果不为空,说明存在并发,有其它线程进行了入队操作
if (tn != null) {
//尾节点后继节点tn为tail,重新开始自旋
advanceTail(t, tn);
continue;
}
//5.tn为null,如果删除的节点s不是尾节点t,则直接进行CAS删除s,直接将pred.next = s.next(在队列中间删除是没有风险的)
if (s != t) {
QNode sn = s.next;
// 如果s.next ==s ,则已经离开队列
// || 设置pred的后继为s的后继,将s从队列中删除
if (sn == s || pred.casNext(s, sn)) return; // 删除完毕,退出
}
//6.如果删除的s节点是尾节点t,那么暂时不能删除,需要使用cleanMe,检查 cleanMe 是否已经被标记了
//cleanMe标识的是需要删除节点的前驱
QNode dp = cleanMe;
//7.如果cleanMe不为空,说明cleanMe已经被标记了,如果标记有效则直接CAS删除
if (dp != null) {
/*
* cleamMe 失效的情况有:
* (1)cleanMe的后继为空(cleanMe 标记的是需要删除节点的前驱)
* (2)cleanMe的后继等于自身(这个前面有分析过)
* (3)需要删除节点的操作没有被取消
* (4)被删除的节点不是尾节点且其后继节点有效,并将待删除节点删除
*/
QNode d = dp.next; // d这里指的就是 要删除的节点
QNode dn;
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
// 清除 cleanMe 节点
casCleanMe(dp, null);
//如dp == pred 若成立, 说明清除节点s成功,直接退出, 否则再次循环, 执行到casCleanMe(null, pred), 设置这次的 cleanMe 然后再返回
if (dp == pred) return;
}
//8.原来的 cleanMe 是 null, 则将被删除节点的前驱 pred 标记为 cleamMe 为为下次循环时清除s节点做标识
else if (casCleanMe(null, pred)) return;
}
}
链表的删除只需要设置节点指针的关系就可以了,这个 clean 的方法就在链表基础上增加了一个逻辑:
将pred.next = s.next
cleanMe 保存需要删除的节点的前驱
s.pred 这样在下一轮的 clean
的过程将会清除打了标记的节点
。整体逻辑:
新队头为原队头的后继节点next
,即hn,continue重新开始
。删除的节点s不是尾节点,即s!=t
那么进行 cas 删除操作,将pred.next = s.next先检查 cleanMe 是否已经被标记
了检查标记是否还有效
cleanMe 没有被标
记,那么就标记为被删除节点的前驱 pred
。 //-----------------TransferQueue其他方法 -start--------------------
//推进head节点,新head为oldHead.next
//CAS更新队头节点,蕴含操作:老的头节点出队,即:将 老节点的 oldNode.next = this
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; //后继节点指向自身
}
//CAS更新队尾节点(t 老的队尾,新的队尾)
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
//CAS更新cleanMe为val
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
//-----------------TransferQueue其他方法 -end--------------------
}
公平模式下,底层实现使用队列,它有一个head和tail指针
,用于指向当前正在等待匹配的线程节点。
初始化时的 TransferQueue
线程put1
执行 put(1)操作 ,由于当前没有配对的消费线程,所以 put1线程入队
,自旋一小会后睡眠等待
接着,线程put2
执行 put(2)操作,put2线程入队
,自旋一小会后睡眠等待
这时来了一个线程 take1
,执行了 take操作 ,由于tail 指向 put2 线程
,put2线程
跟 take1 线程匹配,这时take1 线程不需要入队
唤醒
的线程并不是 put2,而是put1
.
谁先入队,谁优先被唤醒,这里显然 put1 应优先被唤醒.
put1 线程被唤醒后
,take1线程的 take()方法返回了1(put1线程的数据),这样就实现了线程间的一对一通信最后,再来一个线程take2
,执行take操作,这时候只有put2线程在等候
,而且2个线程匹配上了,线程put2被唤醒,take2线程take操作返回了2(线程put2的数据),这时候队列又回到了起点
transfer就是在一个循环中,不断地去做下面这些事情:
类型相同
【t.isData== isData】,将当前线程加入队列
,自旋的方式等待匹配。直到被匹配或超时,或中断或取消。存在可以匹配当前线程的节点
,将匹配的线程出队,重新设置队头
,返回数据。注意:无论是上面哪种情况,都会不断检测是否有其他线程在进行操作,如果有的话,会帮助其他线程执行入队出队操作。
栈
,有一个头节点(栈顶),对应实现类为:TransferStack
。队列与栈都是通过链表
来实现的。具体的数据结构如下
//-----------------TransferStack成员属性start--------------------
//表示栈顶节点,因为是栈结构,所以只需要用一个指针来保存栈顶
volatile SNode head;
// 未配对的消费者,即表示Node类型为 请求类型(消费者)
static final int REQUEST = 0;
//未配对的生产者,即 表示Node类型为 数据类型(生产者)
static final int DATA = 1;
// 配对成功的消费者/生产者 ,示该操作节点处于真正匹配状态
/** 表示Node类型为 匹配中类型,也就是说配对需要take配put 或 put配take
* 假设栈顶元素为 REQUEST-NODE,当前请求类型为DATA的话,入栈会修改类型为 FULFILLING 【栈顶 & 栈顶之下的一个node】。
* 假设栈顶元素为 DATA-NODE,当前请求类型为 REQUEST的话,入栈会修改类型为 FULFILLING 【栈顶 & 栈顶之下的一个node】。
*/
static final int FULFILLING = 2;
// Unsafe更新成员属性方法
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
//-------------省略UNSAFE相关获取属性偏移量代码----------------------
//-----------------TransferStack成员属性end--------------------
3种节点类型
,任何线程对TransferStack的操作都会创建下述3种类型的某种节点:
出队
操作时,会创建一个mode值为REQUEST
的SNode节点 )入队
操作时,会创建一个mode值为DATA
的SNode节点 )表示匹配另一个生产者或消费者
//-----------------TransferStack内部类-SNode-start--------------------
static final class SNode {
//--------成员属性start---------
//栈顶的下一个节点
volatile SNode next;
//与当前node匹配的节点
// 1.null:还没有任何匹配
// 2.等于自己:表示当前为取消状态
// 3.等于别的Snode:当前为匹配状态
volatile SNode match;
//阻塞的线程
// 假设当前node对应的线程 自旋期间未被匹配成功,那么node对应的线程需要挂起,挂起前 waiter 保存对应的线程引用,
// 方便 匹配成功后,被唤醒。
volatile Thread waiter;
//节点数据,1.data不为空 表示当前Node对应的请求类型为 DATA类型。 2.反之则表示Node为 REQUEST类型。
Object item;
/**
* 当前节点类型。有4种可能。
* 1) REQUEST 0000
* 2) DATA 0001
* 3) REQUEST(0000)| FULFILLING(0010) = 0010 表示消费者匹配到了生产者。REQUEST为此次操作,即出队
* 4)DATA(0001)| FULFILLING(0010) = 0011 表示生产者匹配到了消费者。DATA为此次操作,即入队
* 后2种是在成功匹配后,将节点入队时设置的节点类型。通过:FULFILLING|mode 计算节点类型
*/
int mode;
// Unsafe获取SNode成员属性
//-------------省略UNSAFE相关获取属性偏移量代码----------------------
//--------成员属性end---------
//--------有参构造方法start--------
SNode(Object item) {
this.item = item;//初始化节点存储数据
}
//--------有参构造方法end---------
//--------CAS更新成员属性方法start-------
/**
* 尝试匹配:调用tryMatch的对象是 栈顶节点的下一个节点,即与栈顶匹配的节点。
* 两个作用:
* 1.唤醒被阻塞的栈头m,2.把当前节点s赋值给 m 的 match 属性
* 这样栈头 m 被唤醒时,就能从 m.match 中得到本次操作 s,其中 s.item 记录着本次的操作节点,也就是记录本次操作的数据
* @return ture true匹配成功,唤醒s 否则匹配失败..
*/
boolean tryMatch(SNode s) {
//条件一:match == null 成立,说明当前Node尚未与任何节点发生过匹配...
//条件二 成立:使用CAS方式更新match字段为节点s,表示当前Node已经被匹配了
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
//------match更新成功----
//获取本节点的等待线程: 当前节点s如果自旋结束,那么会使用LockSupport.park()挂起,挂起之前会将Node对应的Thread 保留到 waiter字段。
Thread w = waiter;
//w不为空,说明节点s对应的Thread已经挂起,则通过unpark(w)唤醒线程。
if (w != null) {
// 将本节点的等待线程重新置为null
waiter = null;
LockSupport.unpark(w);
}
return true;
}
//如果match不为空,且指向的是当前Node本身,则说明当前节点已经被匹配或者被取消,匹配成功,反之 则非取消状态。
return match == s;
}
//--------CAS更新成员属性方法end-------
//CAS更新next为val
boolean casNext(QNode cmp, QNode val) {
//优化:cmp == next 为什么要判断?
//因为cas指令 在平台执行时,同一时刻只能有一个cas指令被执行。
//有了java层面的这一次判断,可以提升一部分性能。 cmp == next 不相等,就没必要走 cas指令。
return next == cmp && //next等于cmp && this+nextOffset=cmp,更新为val
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
//CAS更新match为this,即将match指向自身,表示当前节点被取消
void tryCancel() {
//如果match等于null && this, matchOffset=null,更新为this
//match字段 指向当前Node对象本身,表示这个Node是取消状态,取消状态的Node,最终会被强制移除出栈。
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
//如果match指向的是当前Node本身,那表示当前Node是取消状态,反之 则 非取消状态。
boolean isCancelled() {
return match == this;
}
}
//-----------------TransferStack内部类-SNode-end--------------------
匹配成功时返回非null值,匹配失败时返回null值
。 /**
*TransferStack非公平模式-压栈或弹栈方法
*
* @param e null时,则表示消费(take 操作);不为null时,则表示生产(put 操作)
* @param timed true 如果定时等待,则为True
* @param nanos 超时时间,单位纳秒
* @return 匹配成功时返回非null值,匹配失败时返回null值
*/
E transfer(E e, boolean timed, long nanos) {
// s表示新创建的节点
SNode s = null;
//根据e确定此次转移的类型, 入参e==null, 说明当前是take线程(消费者), 否则是put线程(生产者)
int mode = (e == null) ? REQUEST : DATA;
//--------自旋1
for (;;) {
//获取栈顶节点
// 头节点情况分类
// 1:为空,说明栈中还没有数据
// 2:非空 且 mode=REQUEST,说明头节点线程正等着拿数据
// 3:非空 且 mode=DATA, 说明头节点线程正等着写入数据
SNode h = head;
//------1.栈为空或当前压栈节点 与 栈头节点类型相同,即相同的操作taketake/putput,将节点压入栈-------
//比如都是 put,就把本次 put 放到该栈头的前面即可,让本次put能够先执行,take同理
if (h == null || h.mode == mode) {
//1.如果是超时等待操作,但不支持超时,则直接返回NULL
//条件成立:说明当前调用transfer方法的 上层方法 可能是 offer() 无参的这种方法进来的,这种方法不支持 阻塞等待...
if (timed && nanos <= 0) {
//栈顶节点被取消,协助栈顶出栈后,再一次自旋尝试。
if (h != null && h.isCancelled())
// 丢弃栈头,CAS更新栈头的后一个节点作为栈头(弹出之前的头节点)
casHead(h, h.next);
else
// 栈顶为null或者栈顶节点没有被取消,直接返回null
return null;
}
//1.2.未超时情况,生成SNode节点,CAS尝试把 e 作为新的栈头
//节点s压入栈: snode():生成一个SNode节点;将原来的头节点h设置为该节点s的next节点;casHead():将head头节点设置为该节点
//snode(SNode s, Object e, SNode next, int mode)
else if (casHead(h, s = snode(s, e, h, mode))) {
//---------------入栈成功,等待被匹配---------
//1.2.1自旋或者阻塞直到s节点被匹配
//1.正常情况:返回匹配的节点
//2.取消情况:返回当前节点 s节点进去,返回s节点...
SNode m = awaitFulfill(s, timed, nanos);
//1.2.2.如果匹配对象指向自身, 表示节点s被取消(中断或超时),返回null
if (m == s) {
//条件成立:说明当前Node状态是 取消状态...
clean(s); //将取消状态的节点 出栈...
return null; //取消状态 最终返回null
}
//---------执行到这里 说明当前Node已经被匹配了...
//1.2.3.h重新赋值为head头节点,并且不为null;头节点的next域为s节点,表示有节点插入到s节点之前,完成了匹配
//说明本来s是栈头的,现在s不是栈头了,表示存在并发,有节点压栈到s节点上面,把新的节点更新为栈头next
if ((h = head) != null && h.next == s) casHead(h, s.next);//移除插入在s之前的节点和s节点
//1.2.4.根据此次转移的类型返回元素, 如果是take线程,返回匹配节点m.item, 否则返回s.item,即当前请求提交的元素e
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// 执行到此处说明入栈失败(多个线程同时入栈导致CAS操作head失败),则进入下一次自旋继续执行
}
//2.---------栈不为空 且当前节点与栈顶节点类型不同,即不同操作(put-take或take-put),进行出栈操作--------
// 栈头正在等待其他线程 put 或 take
// 比如栈头正在阻塞,并且是 put 类型,而此次操作正好是 take 类型,走此处
else if (!isFulfilling(h.mode)) {
//2.1.条件成立:栈头已经被取消,把下一个元素作为栈头(因中断或超时)
if (h.isCancelled()) casHead(h, h.next);// 弹出头节点
//2.2.将当前节点s压入栈中:将当前节点mode标记为 FULFILLING|mode(表示h正在匹配), 并设置s为栈顶head
// snode方法第3个参数 h 代表栈头,赋值给 s 的 next 属性
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 条件成立:说明当前节点压栈成功,入栈一个 FULFILLING|mode的NODE
//----------自旋,fulfill 节点 和 fulfill.next 节点进行匹配工作
for (;;) {
//2.2.1.获取s节点的后继节点(m就是栈头,通过上面 snode方法刚刚赋值)。
// 当前节点s压栈后,s.next指向原栈顶节点(也就是与当前节点匹配的节点)
// 为什么这里是从栈顶head.next下一个节点开始?因为TransferStack总是会存在一个dummy node节点
SNode m = s.next;
//2.2.2.如果m为null,说明除了s节点外的节点都被其它线程先一步匹配掉了
// 就清空栈并跳出内部循环,到外部循环再重新入栈判断
if (m == null) { // 如果发现没有匹配对象
casHead(s, null); // 清空整个栈
s = null; // 清空s因为旧s的成员不对
break; //重启主循环,下一次循环将作为普通节点再入队
}
//-----------FULFILLING匹配节点不为null,进行真正的匹配工作-------
//2.2.3.获取待匹配节点的下一个节点,匹配成功,新头节点更新为mn
//链表结构s -> m -> mn
SNode mn = m.next;
//2.2.4.尝试匹配,匹配成功,则将栈头 和 m 一起出栈
// tryMatch两个作用:
// 1.唤醒被阻塞的栈头m,2.把当前节点s赋值给 m 的 match 属性
// 这样栈头m 被唤醒时,就能从 m.match 中得到本次操作s
// 其中 s.item 记录着本次的操作节点,也就是记录本次操作的数据
if (m.tryMatch(s)) {
//条件成立,结对出栈: 将匹配的两个节点s、m全部弹出栈,更新头节点为mn
casHead(s, mn); //弹出s节点和m节点
//根据此次转移的类型返回元素, 如果是take线程,返回匹配m.item, 否则返回s.item
return (E) ((mode == REQUEST) ? m.item : s.item);
}
else
//2.2.5.尝试匹配失败,说明m已经先一步被其它线程匹配了
// ,移除原待匹配节点m,更新新待匹配节点为mh,进入下一个自旋,再一次进行尝试
s.casNext(m, mn); //弹出m节点
}
}
}
//3.------栈顶类型为 FULFILLING类型(m & FULFILLING) != 0,表示栈顶和栈顶下面的栈帧正在发生匹配,当前请求需要做 协助 工作。
/// 走到这,表示有其他线程在进行匹配,处于正在匹配中状态,则帮助进行匹配,接着执行出栈操作
else {
//3.1.h 表示的是 FULFILLING节点,m FULFILLING匹配的节点。
//栈顶的后继节点
SNode m = h.next;
//m == null 什么时候可能成立呢?
//3.2.当s.next节点 超时或者被外部线程中断唤醒后,会执行 clean 操作 将 自己清理出栈,此时
//站在匹配者线程 来看,真有可能拿到一个null。则直接弹出栈顶, 重新进入下一次自旋
if (m == null) casHead(h, null); //弹出h节点
//3.3.大部分情况:走else分支。 栈顶的next不为空 ,尝试和其它线程竞争匹配
else {
//成功, 获取栈顶匹配节点的 下一个节点mn
SNode mn = m.next;
if (m.tryMatch(h))
//3.3.1.条件成立:说明 m 和 栈顶 匹配成功,CAS更新head域(弹出h和m节点)
casHead(h, mn); //弹出h和m节点
else
//3.3.2.尝试匹配失败,说明m已经先一步被其它线程匹配了
// ,移除原待匹配节点m,更新新待匹配节点为mn,进入下一个自旋,再一次进行尝试
h.casNext(m, mn); //移除m节点
}
}
}
}
整个transfer方法考虑了限时等待
的情况,且入队/出队其实都是调用了同一个方法
,其主干逻辑就是在一个自旋中完成以下3种情况之一的操作,直到成功,或者被中断或超时取消
:
大体逻辑如下:
当前节点
类型 与 栈顶节点
类型相同
。则尝试将节点加入栈内,同时通过阻塞
**(或自旋一段时间,如果有超时设置,则进行超时等待)**等待节点匹配,最后返回匹配的节点或者本身(被取消)当前节点
类型 与 栈顶节点
类型不相同
,则尝试将该节点打上FULFILLING 标记
,然后加入栈
中,与相应的节点匹配,成功后将这2个节点弹出栈
并返回匹配节点的数据有节点在匹配
,那么帮助这个节点完成匹配和出栈操作
,然后在主循环中继续执行
。 /**
* TransferStack-自旋或者阻塞,直到s节点匹配完成
*
* @param s 当前请求Node
* @param timed true 如果支撑定时等待,则为True
* @param nanos 超时时间,单位纳秒
* @return 返回匹配值,如果取消节点,则返回s
*/
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
//根据timed标识计算截止时间= 如果true ? 当前纳秒+nanos :0
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 计算需要自旋的次数
// 当前节点=head.next,说明恰好 s 正好是第一个加入的节点,则会自旋一段时间,避免阻塞,因为其他情况是会导致线程被挂起park()
// maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// maxUntimedSpins = maxTimedSpins * 16;
int spins = (shouldSpin(s) ?(timed ? maxTimedSpins : maxUntimedSpins) : 0);
//---------自旋检查逻辑:1.是否匹配 2.是否超时 3.是否被中断..
for (;;) {
//1.如果当前线程发生中断,那么尝试取消该节点
// Node对象的 match 指向 当前Node 说明该Node状态就是 取消状态。 在 transfer 中会对返回值SNode进行判断
if (w.isInterrupted()) s.tryCancel();
//2.s.match保存当前节点的匹配节点
// 0.s.match==null说明还没有匹配节点
// 1.正常情况:有一个请求 与 当前Node 匹配成功,这个时候 s.match 指向 匹配节点。
// 2.取消情况:当前match 指向 当前Node...,说明当前节点s对应的线程被中断、超时了
SNode m = s.match;
//3.如果 m 不为空,则说明节点s已被匹配,直接返回 m
if (m != null) return m;
//4.如果匹配不成功,如果设置了超时等待
if (timed) {
//计算表示距离超时 还有多少纳秒..
nanos = deadline - System.nanoTime();
// 如果超时,尝试取消该节点,然后重新开始自旋,下一次在 x!=e时退出循环
if (nanos <= 0L) {
//Node对象的 match 指向 当前Node 说明该Node状态就是 取消状态
//设置当前Node状态为 取消状态.. match-->当前Node
s.tryCancel();
continue;
}
}
//4.说明当前线程还可以进行自旋检查...减少自旋次数,如果自旋次数大于0,则继续自旋
if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0;
//5.自旋次数用完了,s的等待线程为空,设置s的等待线程为当前线程
//当一个 节点/线程将要阻塞时,它会先设置其waiter段,然后在下次自旋判断waiter不为null时进行阻塞
else if (s.waiter == null) s.waiter = w;
//6.s等待线程不为空,当前请求是非超时等待操作,则当前一直阻塞当前线程,直到被唤醒
else if (!timed) LockSupport.park(this);
//nanos > 1000 纳秒的值,只有这种情况下,才允许挂起当前线程..
//7. s等待线程不为空,当前是超时等待操作,且剩余时间> 1000 纳秒的时,才允许挂起当前线程..
//否则 说明超时给的太少了...挂起和唤醒的成本 远大于 空转自旋..
// 使用剩余时间阻塞当前线程,这样自旋性能的效率更高
else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos);
}
}
这个方法应该比较好理解,就是等待匹配操作的到来
,整体逻辑如下:
超时截止时间
,当前线程
及 自旋次数
中断
。如果是将节点 s 取消
,即将节点 s 的 match指向自身this
,否则进入下一步。match 值 m
。如果 m 不为空,则说明节点 s 已被匹配,直接返回 m
;否则,进行下一步。超时时间timed
则判断是否超时。如果超时则将节点s取消 s.tryCancel
,continue继续循环。自旋
一段时间,最后将当前线程阻塞park(this)
,直到匹配的操作到来,或者发生中断(取消该操作)。从栈顶节点开始到该节点
(不包括 s)之间的所有已取消节点
。 void clean(SNode s) {
//1.清空数据域
// s节点的item设置为null
s.item = null;
// s节点的waiter设置为null
s.waiter = null;
//检查取消节点的截止位置
SNode past = s.next;//被删除节点的后继节点
//如果x后继节点 past 不为 null 且已被取消,则先删除该节点节点
if (past != null && past.isCancelled())
//重新设置past
past = past.next;
SNode p;
//从栈顶头节点开始到past节点(不包括),将连续的取消节点移除
while ((p = head) != null && p != past && p.isCancelled()) //头节点赋值给p,且p不为空,且p不等于s的后继节点,且p被取消
casHead(p, p.next);//更新头节点为删除节点的后继节点next,以便下一次判断(弹出取消的节点)
//因为是单向链表,因此需要从head 开始,遍历到被删除节点s的后继节点next,如有被取消了的操作的节点,那么就移除掉。
while (p != null && p != past) {// 移除上一步骤没有移除的非连续的取消节点
// 获取p的next节点
SNode n = p.next;
// n不为null并且n被取消
if (n != null && n.isCancelled())
p.casNext(n, n.next);//CAS更新p的next为n.next
else
p = n;
}
}
主要功能:从栈顶删除节点 s
。大体逻辑如下:
置为 null
节点 past
past 已被取消
,则先删除其后继节点past
head 操作节点
也被取消,那么就重新更新头节点为删除节点的后继节点next,以便下一次判断从 head开始 遍历到被删除节点 s 的后继
,如有被取消了的操作的节点,那么就移除掉。
//-----------------TransferStack其他方法-start--------------------
/**
* 构造一个SNode
* @param s SNode引用,当这个引用指向空时,snode方法会创建一个SNode对象 并且赋值给这个引用
* @param e SNode对象的item字段
* @param next 指向当前栈帧的下一个栈帧
* @param mode REQUEST/DATA/FULFILLING
*/
static SNode snode(SNode s, Object e, SNode next, int mode) {
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;
}
//CAS更新栈顶元素,同样使用到了优化
boolean casHead(SNode h, SNode nh) {
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
//判断当前模式是否为 匹配中状态
static boolean isFulfilling(int m) {
return (m & FULFILLING) != 0;
}
//判断节点s是否需要自旋
boolean shouldSpin(SNode s) {
//获取栈顶
SNode h = head;
//条件一 h == s :条件成立 说明当前s 就是栈顶,允许自旋检查...
//条件二 h == null : 当前s节点 自旋检查期间,又来了一个 与当前s 节点匹配的请求,双双出栈了...条件会成立。
//条件三 isFulfilling(h.mode) : 前提 当前 s 不是 栈顶元素。并且当前栈顶正在匹配中,这种状态 栈顶下面的元素,都允许自旋检查。
return (h == s || h == null || isFulfilling(h.mode));
}
//-----------------TransferStack其他方法-end--------------------
}
线程put1
执行 put(1)操作 ,由于当前无配对的消费线程,所以put1线程入栈
,自旋一小会后睡眠等待
接着,线程put2
再次执行了put(2)操作 ,put2线程入栈
,自旋一小会后睡眠等待
如果这时来了一个线程take1
,执行take(1)操作 ,这时候发现栈顶为put2线程,匹配成功
,但是实现会先把take1线程
入栈,然后take1线程
循环执行匹配put2线程
逻辑,一旦发现没有并发冲突
,就会把栈顶指针直接指向 put1线程
最后,再来一个线程take2
,执行take(2)操作,这跟上一步的逻辑基本一致,take2线程
入栈,然后在循环中匹配put1线程
,最终全部匹配完毕,栈空
从上面流程看出,虽然put1线程先入栈了,但是却是后匹配
,这就是非公平策略.
transfer方法其实就是在一个循环中持续地去做下面3件事情:
空
的,或者当前线程类型和head节点类型相同
,则将当前线程加入栈中
,通过自旋的方式等待匹配
。最后返回匹配的节点,如果被取消,则返回null
。有节点可以和当前线程进行匹配
【put与take表示匹配,mode不相等】,CAS加上FULFILLING标记
,将当前线程压入栈顶,和栈中的节点进行匹配,匹配成功,出栈这两个节点
。isFulfilling(h.mode)
,则帮助它进行匹配并出栈
,再执行后续操作。可以看到,SynchronousQueue一样不支持入队null元素,实际的入队/出队操作都是委托给了transfer方法
,该方法返回null表示出/入队失败
(通常是线程被中断或超时):
阻塞该入队线程
.
public void put(E e) throws InterruptedException {
//不允许写入为null的元素
if (e == null) throw new NullPointerException();
//transfer()返回 null 就调用 Thread.interrupted() 将中断标志位复位(设为 false),然后抛出异常
if (transferer.transfer(e, false, 0) == null) {// 进行转移操作
Thread.interrupted();
throw new InterruptedException();
}
}
put(E e)
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
超时功能
,传入一个timeout
,获取不到而阻塞的时候,如果时间到了,即使还获取不到,也只能立即返回false
。 public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//元素e为空,抛异常
if (e == null) throw new NullPointerException();
//进行转移操作
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
// 当前线程没有被中断
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
阻塞该入队线程
.
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
//返回 null 就调用 Thread.interrupted() 将中断标志位复位(设为 false),然后抛出异常
Thread.interrupted();
throw new InterruptedException();
}
public E poll() {
return transferer.transfer(null, true, 0);
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = transferer.transfer(null, true, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
//返回 null 或没有被中断 就调用 Thread.interrupted() 将中断标志位复位(设为 false),然后抛出异常
throw new InterruptedException();
}
不存储实际元素
,而是在内部通过栈或队列
结构保存阻塞线程的阻塞队列,每个put的操作必须等待另一个线程进行相应的take操作,反之亦然
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。