当前位置:   article > 正文

【Java多线程】JUC之并发容器—深入剖析并发队列SynchronousQueue(七)_fulfilling|mode

fulfilling|mode

前言

并发编程最佳学习路线
【Java多线程】高并发修炼基础之高并发必须了解的概念
【Java多线程】了解线程的锁池和等待池概念
【Java多线程】了解Java锁机制
【Java多线程】线程通信

【Java基础】多线程从入门到掌握-第十五节.使用Concurrent集合
【Java多线程】JUC之线程池(一)与线程池的初识第四节.线程池的工作队列


什么是栈

【Java基础】Stack基础

一.JDK提供的并发容器

在这里插入图片描述

二.特点

  • SynchronousQueue: 一个 不存储元素(没有容量) 的阻塞队列,,每个插入操作(put)必须等到另一个线程调用移除操作(take) (即: 写入元素必须被移除后才能继续写入新的元素),否则写入操作一直处于阻塞状态。支持公平锁(TransferQueue-FIFO)和非公平锁(TransferStack-LIFO)。相较于其他可缓存元素队列,适用于单线程同步传递性场景比如:消费者没拿走当前的产品,生产者是不能再给产品的,这样可以控制生产者生产的速率和消费者一致

特点如下:

  • 基于一种名为“Dual stack and Dual queue”的无锁算法实现。与其他阻塞队列不同该队列不依赖与AQS实现,而是直接使用CAS操作实现的,这导致代码中有大量的判断是否数据被并发改写了,并做相应的处理

  • 入队线程写入元素时,之前的元素没有被取走的时入队线程阻塞,出队线程获取不到元素的时出队线程阻塞

    • 队头、或者栈顶是第一个的put线程试图添加到队列中的元素
  • 支持公平/非公平策略。公平模式内部数据结构基于“队列”。非公平模式内部数据结构基于“栈”来实现,有一个头节点(栈顶)

  • 应用场景:线程池newCachedThreadPool()就使用SynchronousQueue,这个线程池新任务到了如果有空闲线程则使用空闲线程执行(复用),没有就创建新线程,不会对任务进行缓存。不推荐使用

  • 无效操作

    • SynchronousQueue完全没有容量的概念了,很多常用操作比如isEmpty()、size()、clear()、remove(Object)、contains(Object)、peek()都不支持了。
    • 可以用的公共方法只有:
      • 入队:put(E)、offer(E)、offer(E, long, TimeUnit)
      • 出队:take()、poll()、poll(long, TimeUnit)

注意:上述的特点1,和我们之前介绍的Exchanger其实非常相似,可以类比Exchanger的功能来理解。

三.继承关系

在这里插入图片描述

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

  • 1
  • 2
  • 3

四.主要属性

  	//用于自旋控制的可用cpu数量 
    static final int NCPUS = Runtime.getRuntime().availableProcessors();


//为什么需要自旋这个操作?
//因为线程 挂起 唤醒站在cpu角度去看的话,是非常耗费资源的,涉及到用户态和内核态的切换...

//自旋的好处,自旋期间线程会一直检查自己的状态是否被匹配到,如果自旋期间被匹配到,那么直接就返回了
//如果自旋期间未被匹配到,自旋次数达到某个指标后,还是会将当前线程挂起的...

	//NCPUS:当一个平台只有一个CPU时,你觉得还需要自旋么?
	//答:肯定不需要自旋了,因为一个cpu同一时刻只能执行一个线程,自旋没有意义了...而且你还站着cpu 其它线程没办法执行..这个
	//栈的状态更不会改变了.. 当只有一个cpu时 会直接选择 LockSupport.park() 挂起等待者线程。

  	// 最大空旋时间
	//		表示未指定超时的话,线程等待匹配时,自旋次数。
	//		是指定超时限制的请求的自旋次数的16倍.
	//		32是一个经验值
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;//单核CPU自旋倍数为0,多核cpu自旋倍数为32

 	// 无限时的等待的最大空旋时间
	//		表示未指定超时的话,线程等待匹配时,自旋次数。
	//		是指定超时限制的请求的自旋次数的16倍
    static final int maxUntimedSpins = maxTimedSpins * 16;

  	 // 超时自旋等待阈值
	//		如果请求是指定超时的话,如果超时nanos参数是< 1000纳秒时,禁止挂起。
	//			挂起再唤醒的成本太高了..还不如选择自旋空转呢...
    static final long spinForTimeoutThreshold = 1000L;
    
     //tranferer对象, 构造时根据策略类型确定.
    private transient volatile Transferer<E> transferer;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • SynchronousQueue内部维护了volatile修饰的Transferer变量,它的核心操作都将委托给transferer

五.构造方法

	//默认构造器-TransferStack(非公平策略)
    public SynchronousQueue() {
        this(false);
    }
   
	// 通过 fair 值来决定内部用 使用 queue 还是 stack 存储线程节点的构造器
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 可以看到,对于公平策略,内部构造了一个TransferQueue,而非公平策略则是构造了TransferStack,默认就是非公平模式。
    • 这两个类都继承自抽象内部类Transferer,SynchronousQueue中的所有方法,其实都是委托调用了TransferQueue/TransferStack的方法

六.抽象静态内部类–Transferer

    abstract static class Transferer<E> {   
    /**
     * 写入或取出元素
     * @param e   null时表示这个请求是一个 REQUEST 类型的请求       消费者 -> 生产者
     *            非null,说明这个请求是一个 DATA 类型的请求。	  生产者 -> 消费者
     *
     * @param timed 如果为true 表示指定了超时时间 ,如果为false 表示不支持超时,表示当前请求一直等待到匹配为止,或者被中断。
     * @param nanos 超时时间限制 单位 纳秒
     *
     * @return E 1.如果是REQUEST类型的请求,返回值不为null,表示匹配成功,返回null,表示请求超时 或 被中断。
     *           2.如果是DATA类型的请求,返回值不为null,表示匹配成功,返回当前线程put的数据。
     *           3.如果返回值为null 表示,DATA类型的请求超时 或者 被中断..都会返回Null。
     *
     */
     @return 非null表示传递的数据; null表示传递失败(超时或中断)
        abstract E transfer(E e, boolean timed, long nanos);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

Transferer类中定义了抽象方法transfer,该方法用转移元素,是最最核心的方法

  • 参数e:
    • 当调用put方法,也就是生产者将数据传递给消费者时,传递的参数为e,是一个非null的元素
    • 而调用take方法,也就是消费者希望生产者提供元素时,传递的参数为null
  • timed:表示是否设置超时,true表示指定了超时时间 ,false 表示不支持超时,表示当前请求一直等待到匹配为止,或者被中断。
  • nanos :表示超时时间限制 单位 纳秒
  • 返回值:非null表示消费者从生产者那得到的值,null表示超时或者中断,具体需要通过检测中断状态得到。

transfer是根据这一点来判断是读还是写线程,接着决定是否匹配等
在这里插入图片描述

在这里插入图片描述

七.静态内部类-TransferQueue(公平模式)

  • 公平模式下,底层数据结构是一个队列有一个头节点和尾节点,对应实现类为:TransferQueue
  • Dual Queue 特点
    • 整个队列有 head, tail 两个节点
    • 队列初始化时会有个 dummy 节点
    • 这个队列的头节点是个 dummy 节点/ 或 哨兵节点, 所以操作的总是队列中的第二个节点(AQS的设计中也是这也)

关于TransferQueue的transfer方法其思路和TransferStack大致相同,总之就是入队/出队必须一一匹配,否则任意一方就会加入队列并等待匹配线程唤醒

队列与栈都是通过链表来实现的。具体的数据结构如下
在这里插入图片描述

TransferQueue-公共成员

		//-----------------TransferQueue成员属性start--------------------
        //队首 
        transient volatile QNode head;
        //队尾 
        transient volatile QNode tail;

		/**
		 * 标记节点:用于表示被删除节点的前驱节点。因为入队操作是 两步完成的,
		 * 第一步:t.next = newNode
		 * 第二步:tail = newNode
		 * 所以,队尾节点出队,是一种非常特殊的情况,需要特殊处理,下面有讲!
		 * 
		 * clean方法中使用,用来保存 需要延后移除的节点的前驱
		 */
        transient volatile QNode cleanMe;

		//UNSAFE获取的成员内存偏移量
        private static final sun.misc.Unsafe UNSAFE;
        private static final long headOffset;    	//head字段偏移量
        private static final long tailOffset;    	//tail字段偏移量
        private static final long cleanMeOffset;    //cleanMe字段偏移量

		//-------------省略UNSAFE相关获取属性偏移量代码----------------------
		//-----------------TransferQueue成员属性 end--------------------
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

需要注意点cleanMe属性

  • cleanMe对应 被删除节点前继节点(中断或超时),这个节点存在的意义是标记它的下个节点要删除
  • 何时使用: 当你要删除节点 node, 若节点 node 是队列的末尾, 则开始用这个节点,

为什么呢?

  • 大家知道 删除一个节点 直接A.CASNext(B, B.next) 就可以,但是当 节点 B 是整个队列中的末尾元素时。 线程A删除节点B, 线程B在节点B之后插入节点 ,这样操作容易致使插入的节点丢失, 这个cleanMe很像 ConcurrentSkipListMap 中的 删除添加的 marker 节点, 他们都是起着相同的作用

TransferQueue-构造方法

  • TransferQueue 的构造函数,构造了一个哨兵节点( dummy node),然后头尾节点都指向这个哨兵节点
		//-----------------TransferQueue构造方法 start--------------------		
        TransferQueue() {
    		//构造一个哨兵节点, 而整个 queue 中永远会存在这样一个 dummy node
    		//	dummy node 的存在使得 代码中不存在复杂的 if 条件判断
            QNode h = new QNode(null, false); // initialize to dummy node.
            
            //头尾节点都指向这个哨兵节点
            head = h;
            tail = h;
        }
		//-----------------TransferQueue构造方法 end--------------------		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

TransferQueue-QNode(队列元素)

  • 用于TransferQueue的存储数据节点类
	//-----------------TransferQueue静态内部类 QNode-start--------------------
        static final class QNode {
        	//--------成员属性start---------
        	//指向下一个节点。
            volatile QNode next;          
            //节点数据,用CAS设置
            volatile Object item;         
            
            // 标记在该节点上等待的线程是哪个
            //当Node对应的线程未匹配到节点时,对应的线程 最终会挂起,挂起之前会保留 线程引用到waiter ,
    		//方法 其它Node匹配当前节点时 唤醒 当前线程..
            volatile Thread waiter;    
              
            // isData == true表示put线程节点,反之take线程节点
            final boolean isData; 		
            
            // Unsafe获取的item、next属性的内存偏移量
            private static final sun.misc.Unsafe UNSAFE;
            private static final long itemOffset;
            private static final long nextOffset;


		//-------------省略UNSAFE相关获取属性偏移量代码----------------------
        	//--------成员属性end---------
        	
        	//--------构造方法start---------
            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }
            //--------构造方法end---------

			//--------CAS更新成员属性方法start-------
			//主要用于删除节点,CAS更新当前节点next为val
            boolean casNext(QNode cmp, QNode val) {
                return next == cmp && //next等于cmp  && this+nextOffset=cmp,更新为val
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }
			//CAS更新当前节点item为val
            boolean casItem(Object cmp, Object val) {
                return item == cmp && //item等于cmp  && this+itemOffset=cmp,更新为val
                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
            }
			//取消当前节点.将当前节点item指向自身
            void tryCancel(Object cmp) {
                //item等于cmp  && this+itemOffset=cmp,更新为this
                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
            }
			//--------CAS更新成员属性方法end-----------

			//判断当前Node是否为取消状态
            boolean isCancelled() {
                return item == this;//当前节点item指向自身,表示当前节点被取消
            }

			//判断当前节点是否 “不在” 队列内,当next指向自己时,说明节点已经出队。
            boolean isOffList() {
                return next == this;//当前节点的next指向自身,说明节点已出队,返回true
            }
        }
		//-----------------TransferQueue静态内部类 QNode-end--------------------
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

TransferQueue(入队出队)-E transfer(E e, boolean timed, long nanos)

  1. 若队列为空 且 队列中的尾节点和自己的 类型相同, 则添加 node 到队列中, 直到 timeout/interrupt/其他线程和这个线程匹配。

    • timeout/interrupt时awaitFulfill方法返回的是 node 本身
    • 匹配成功的话, 要么返回 null (producer返回的), 或真正的传递值 (consumer 返回的)
  2. 队列不为空, 且队列的head.next节点是当前节点匹配的节点,

    • 进行数据的传递匹配, 并且通过advanceHead()方法帮助 先前 阻塞的节点 dequeue
  • transfer() 方法在匹配成功时返回非null值,匹配失败时返回null值
    /**
     *TransferQueue公平模式-写入或取出元素方法
     *
     * @param e  null时,则表示消费(take 操作);不为null时,则表示生产(put 操作)
     * @param timed true 如果定时等待,则为True
     * @param nanos 超时时间,单位纳秒
     * @return 匹配成功时返回非null值,匹配失败时返回null值
     */ 
     E transfer(E e, boolean timed, long nanos) {
   			// s表示新创建的节点
            QNode s = null;
           // null时,则表示消费(take 操作);不为null时,则表示生产(put 操作)
            boolean isData = (e != null);
			
			//-----自旋1
            for (;;) {
                QNode t = tail;//队尾节点
                QNode h = head;//队头节点
                
                  //初始化的队头和队尾都是指向的一个"空" 节点,即队列未初始化.重新开始自旋
                if (t == null || h == null) continue; 
                                      
				//-----1.若队列为空 或 队列中的尾节点和当前节点的类型相同, 则执行入队操作------
       			 //条件一:成立,说明head和tail同时指向dummy节点,即当前队列是空队列。此时当前线程需要做入队操作,因为没有任何节点 可以去匹配。
        		 //条件二:队列非空,队尾节点与当前节点类型相同。说明类型不匹配,此时当前节点只能入队...
        		 // (注意这里是和队尾节点比较, 下面进行匹配时是和队头 head.next 进行比较)
                if (h == t || t.isData == isData) {
                     //1.1.获取当前队尾t的后继节点next 
                    QNode tn = t.next;
                    
                    //1.2.存在并发:队尾被改变了,说明有其他线程入队节点,导致读到的tail不一致,重新开始自旋
                    if (t != tail)  continue; 
              
                    // 5. 
                    //1.3.存在并发:读的tail一致,其他线程添加了 tail.next, 所以帮助推进tail,尝试将tn设置为新尾节点,重新开始自旋
                    //条件成立:说明已经有线程 入队了,且只完成了 入队的 第一步:设置t.next = newNode, 第二步可能尚未完成..
                    if (tn != null) {  
                        advanceTail(t, tn);//新尾节点为tn
                        continue;
                    }
                    
                    //1.4.如果是超时等待操作,但不支持超时,则直接返回NULL
   					//条件成立:说明当前调用transfer方法的 上层方法 可能是 offer() 无参的这种方法进来的,这种方法不支持 阻塞等待...
                    if (timed && nanos <= 0)  return null; 
                 
                     //1.5.条件成立:说明当前请求尚未创建对应的node, 则构建一个新节点,并赋值给s
                    if (s == null) s = new QNode(e, isData);
                     
                     //1.6.新节点入队: 将s新节点CAS链接到tail的后继next ,入队失败, 则开始下一轮自旋
                     //条件不成立:!t.casNext(null, s)  说明当前t仍然是tail,当前线程对应的Node入队的第一步 完成!
                    if (!t.casNext(null, s))  continue; 

					//1.7.更新队尾 为新节点s。
                    advanceTail(t, s);

//----------------

        //当前请求为DATA模式时:e为写入数据
        //item == this 当前SNode对应的线程 取消状态
        //item == null 表示已经有匹配节点了,并且匹配节点拿走了item数据。

        //当前请求为REQUEST模式时:e为null
        //item == this 当前SNode对应的线程 取消状态
        //item != null 且 item != this  表示当前REQUEST类型的Node已经匹配到一个DATA类型的Node了。
	//---------------- 
                   	
                    //1.8.自旋或者阻塞直到s节点被匹配
                    Object x = awaitFulfill(s, e, timed, nanos);//自旋或阻塞线程,直到满足s.item != e
   
                    //1.9x == s 表示节点为取消状态(中断或超时),需要做出队逻辑。
                    if (x == s) {    
                    	// 对节点 s 进行清除,
                    	//1.若s不是链表的最后一个节点, 则直接 CAS 进行 节点的删除
                    	//2.若 s 是链表的最后一个节点, 则 要么清除以前的 cleamMe 节点(cleamMe != null), 然后将 s.prev 设置为 cleanMe 节点, 下次进行删除 或直接将 s.prev 设置为cleanMe           
                        clean(t, s);
                        return null;
                    }
                    
             		//----执行到这里说明 当前Node 匹配成功了...
           		   //a.当前线程在awaitFulfill()内,已经挂起了...此时运行到这里时是被匹配节点的线程使用LockSupport.unpark() 唤醒的..
                   //被唤醒:当前请求对应的节点,肯定已经出队了,因为匹配者线程 是先让当前Node出队的,再唤醒当前Node对应线程的。

           			//b.当前线程在awaitFulfill方法内,处于自旋状态...此时匹配节点 匹配后,它检查发现了,然后返回到上层transfer()
					//自旋状态返回时:当前请求对应的节点,不一定就出队了...

					//1.10 条件成立:说明当前Node仍然在队列内,需要做 匹配成功后 出队逻辑。
            			//被唤醒时:s.isOffList() 条件会成立。  !s.isOffList() 不会成立。
                    if (!s.isOffList()) { //isOffList用于判断节点是否已经出队 next == this    
                      //其实这里面做的事情,就是防止当前Node是自旋检查状态时发现 被匹配了,然后当前线程 需要将
               		  //当前线程对应的Node做出队逻辑.
               		  
                       //1.10.1. t为原队尾,是当前s节点的前驱节点,更新dummy节点为 s节点。表示head.next节点t已经出队了...
                        advanceHead(t,s); 
                             
                        //x != null 且 item != this  表示当前REQUEST类型的Node已经匹配到一个DATA类型的Node了。
                		//1.10.2. 因为s节点已经出队了,所以需要把它的s.item指向自身,表示它是个取消出队状态。      
                        if (x != null)  s.item = s;
                        
                         //1.10.3. 设置s节点的waiter为null
                        s.waiter = null;
                    }

					//1.11. 如果x != null,说明当前请求是REQUEST类型,返回匹配到的数据x,否则返回当前DATA请求提交的元素e
                    return (x != null) ? (E)x : e;
                } 

                //2.----队列不为空 且当前入队节点与队尾节点类型不同,说明类型匹配(队尾->DATA,请求类型->REQUEST)  (队尾->REQUEST, 请求类型->DATA),进行出队操作---
                //进行线程的匹配操作, 匹配操作是从 head.next 开始匹配 (注队列刚开始构建时有个 dummynode, 而且 head 节点永远是个 dummynode 这个和AQS一样的)
				else {                         

                    //2.1.获取队头节点的后继节点next(m是匹配的节点)。
                   	// 为什么这里是从头节点head.next下一个节点开始? 因为TransferQueue总是会存在一个dummy node节点
                   	//		h.next节点 其实是真正的队头,TransferQueue是一个公平模式,与LIFO模式不同,需要与队头发生匹配
                    QNode m = h.next;          
                    
                   //2.2. 存在并发:读取不一致取,说明有其他线程改变了队列的结构,重新自旋
                    //条件一:t != tail 成立时机? 肯定是并发导致的,其它线程已经修改过tail了,有其它线程入队过了..当前线程看到的是过期数据,需要重新循环
            	    //条件二:m == null 成立时机? 肯定是其它请求先当前请求一步,匹配走了head.next节点。
            	    //条件三:条件成立,说明已经有其它请求匹配走head.next了。。。当前线程看到的是过期数据。。。重新循环...
                    if (t != tail || m == null || h != head)  continue;


                    
            /** producer 和 consumer 匹配操作
             *  1. 获取 m的 item (注意这里的m是head的next节点)
             *  2. 判断 isData 与x的模式是否匹配, 只有produce与consumer才能配成一对
             *  3. x == m 判断是否 节点m 是否被取消, 具体看(QNOde#tryCancel)
             *  4. m.casItem 将producer与consumer的数据进行交换 (这里存在并发时可能cas操作失败的情况)
             *  5. 若 cas操作成功则将h节点dequeue
             *
             *  疑惑: 为什么将h进行 dequeue, 而不是 m节点
             *  答案: 因为每次进行配对时, 都是将 h 是个 dummy node, 正真的数据节点 是 head.next
             */
             		//2.3.不存在并发获取数据没有改变,获取匹配节点m的元素值
                    Object x = m.item;
                    
                   
                    if (isData == (x != null) ||     // 模式是否匹配
                        x == m ||                    // m节点是否被取消
                        !m.casItem(x, e)) {          // CAS更新m节点值item为e失败
                        // 上面三种情况,任意一种发生,都进行h的出队操作,新队头更新为m,然后重新开始自旋
                        advanceHead(h, m); 
                        continue;
                    }
					
                    //匹配成功,设置m新的头节点,将真正的头节点h 出队。让这个真正的头节点成为dummy节点(next指向自身)
                    advanceHead(h, m);       
                   //唤醒在m上等待的线程
                    LockSupport.unpark(m.waiter);
                    
                   //10.如果x != null,说明当前请求是REQUEST类型,返回匹配到的数据x,否则返回当前DATA请求提交的元素e
                    return (x != null) ? (E)x : e;
                }
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155

主要逻辑:

  1. 判断当前节点类型。( e为null 时表示消费(take 操作); e 不为 null 时表示生产(put 操作))

  2. 进入无限循环

  3. 获取头尾节点 head/tail,若没有初始化,则 continue 继续

  4. 如果队列为空或者 尾节点的类型与当前节点类型相同,则进行入队操作。执行下面<子步骤>,否则 执行>>第 5 步骤

    • 获取尾节点的 next 节点赋值tn
    • 如果队列存在并发,则 continue继续。
    • 如果 tn 不为null,说明有并发,尝试将tn设置为新尾节点,然后 continue 继续。
    • 如果超时,则直接返回 null
    • 将元素 e 构建成QNode节点 s
    • 将节点 s 设置为队列尾节点的 next 节点。如果入队成功 执行>>第 4.7 步骤,否则, continue 继续。
    • 将节点 s 设置为队尾tail
    • 调用awaitFulfill方法,自旋或者阻塞直到s节点被匹配
    • 如果操作被取消(因中断或者超时),则清除节点 s,并返回 null
    • 判断节点 s 用于判断节点是否已经出队,如果出队返回元素,方法结束;否则辅助节点 s 出队。
  5. 执行至此,说明队列不为空,且当前节点上次入队节点 类型相同(put 匹配 take ,take 匹配 put),进行出队操作

    • 获取头节点的 next 赋值 m 节点
    • 如果有并发情景,则continue 继续
    • 获取 m 节点的item 赋值x
    • 如果 m 节点已被匹配m 节点被取消 或者CAS 更新 m 节点的 item 为e失败,则CAS更新m为新队头,并continue 继续
    • 匹配成功,则CAS更新m为新队头
    • 唤醒匹配操作的线程
    • 返回元素

TransferQueue-SNode awaitFulfill(SNode s, boolean timed, long nanos)

  • 这个方法将会进行自旋或者阻塞,直到满足某些条件。
        /**
         * TransferQueue-自旋或者阻塞,直到s节点匹配完成
         *
         * 主逻辑: 若节点是 head.next 则进行 spins 一会, 
         * 			若不是, 则调用 LockSupport.park / parkNanos(), 直到其他的线程对其进行唤醒
         * 
         * @param s 等待节点  
         * @param e 用于检查匹配的比较值
         * @param timed true 如果定时等待,则为True
         * @param nanos 超时时间,单位纳秒
         * @return 返回匹配值,如果取消节点,则返回s
         */        
        Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
        	//根据timed标识计算截止时间= 如果true ? 当前纳秒+nanos :0  (只有 timed 为true 时才有用)
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            
            // 计算需要自旋的次数 
            // 当前节点是head.next才自旋,说明恰好s正好是第一个入队节点,则会自旋一段时间,避免阻塞,提高效率,因为其他情况是会涉及到 park挂起线程的
            // maxTimedSpins = (NCPUS < 2) ? 0 : 32;
            // maxUntimedSpins = maxTimedSpins * 16;
            int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            	
            //自旋
            for (;;) {
              //1.如果当前线程发生中断,那么尝试取消节点s, 将 item指向自身this, 在 transfer 中会对返回值进行判断
                if (w.isInterrupted())	s.tryCancel(e);
//----------------------------------                
        //item有几种情况呢?
        //当SNode模式为DATA模式时:
        //	1.item != null 且 item != this  表示请求要传递的数据 put(E e)
        //	2.item == this 当前SNode对应的线程 取消状态
        //	3.item == null 表示已经有匹配节点了,并且匹配节点拿走了item数据。

        //当SNode模式为REQUEST模式时:
        //	1.item == null 时,正常状态,当前请求仍然未匹配到对应的DATA请求。
        //	2.item == this 当前SNode对应的线程 取消状态
        //	3.item != null 且 item != this  表示当前REQUEST类型的Node已经匹配到一个DATA类型的Node了。

        //条件成立:
        //当前请求为DATA模式时:e为写入数据
        //item == this 当前SNode对应的线程 取消状态
        //item == null 表示已经有匹配节点了,并且匹配节点拿走了item数据。

        //当前请求为REQUEST模式时:e为null
        //item == this 当前SNode对应的线程 取消状态
        //item != null 且 item != this  表示当前REQUEST类型的Node已经匹配到一个DATA类型的Node了。
//----------------------------------    
                 //2.获取s的元素值
                Object x = s.item;
                 
                //3. x != e,才会退出循环也是唯一的出口,如果进行线程阻塞->唤醒 或中断,x != e 就会成立                
                  //在生成s节点的时候,s.item==e,当取消操作或者匹配了操作的时s会变化。
                if (x != e) return x;
                
  				  //如果设置了超时等待
                if (timed) {
                     // 计算继续等待的时间
                    nanos = deadline - System.nanoTime();
                      // 如果超时,尝试取消该节点,然后重新开始自旋,下一次在 x!=e时退出循环
                    if (nanos <= 0L) {
                        s.tryCancel(e);   //将s的item指向this自身实现取消操作
                        continue;
                    }
                }
                
                //1.自旋控制:每次减少自旋次数,如果自旋次数大于0,则继续自旋
                if (spins > 0) 	--spins;
                //2.自旋次数用完了,s的等待线程为空,设置s的等待线程为当前线程
                else if (s.waiter == null) 	s.waiter = w;// 设置等待线程
                //3.s等待线程不为空,当前是非超时等待操作,则当前一直阻塞当前线程,直到被唤醒
                else if (!timed) 	LockSupport.park(this);
                //4.s等待线程不为空,当前是超时等待操作,且剩余时间小于spinForTimeoutThreshold = 1000L的时候
                //使用剩余时间阻塞当前线程,这样自旋性能的效率更高
                else if (nanos > spinForTimeoutThreshold)	LockSupport.parkNanos(this, nanos);
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

这个方法应该比较好理解,就是等待匹配操作的到来

  1. 如果设置了超时等待timed=true,那么就等待一定时间parkNanos(this, nanos)
  2. 如果发生超时,那么就取消该操作tryCancel(e)
  3. 如果没有设置超时等待,那么就一直等待park(this),直到匹配的操作到来,或者发生中断(取消该操作)。

这边总结一下一些注意点:

  • 为了优化阻塞,先判断当前的节点s是不是head.next,如果是会优先选择自旋而不是阻塞,自旋次数到了才阻塞主要是考虑到阻塞、唤醒需要消耗更多的资源。
  • 自旋的过程如何退出,也就是何时满足x!=e的条件呢?
    • 其实在tryCancel()的时候就会导致x!=e,因为该方法会将s的item设置为this。且在线程被中断,超时的时候都会调用tryCancel()方法,这些条件下将会退出。

TransferQueue-void clean(SNode s)

用于移除已经被取消的节点

        /**
         *  对 中断的 或 等待超时的 节点进行清除操作
         * 
         * @param pred pred 是s的前驱节点
         * @param s 要移除节点
         */           
        void clean(QNode pred, QNode s) {
          //将清理节点的waiter置为null
            s.waiter = null;
            /*
             * 无论何时,队列中的最后一个节点不能被删除(这里的删除指 通过 cas 直接删除, 因为这样直接删除会有多删除其他节点的风险)
             * 当节点 s 是最后一个节点时, 将 s.pred 保存为 cleamMe 节点, 下次再进行清除操作
             */
             //pred.next == s 表示节点 s 还在队列中,下面的操作可能导致 pred.next = next
            while (pred.next == s) { 
             	// 获取头节点
                QNode h = head;
                 //获取头节点的后继节点next
                QNode hn = h.next;  
                     
                //1.head.next节点被取消,那么CAS更新新队头为原队头的后继节点next,即hn。
                if (hn != null && hn.isCancelled()) { 
               		// hn中断或者超时,则推进 head 指针,若这时h是pred则 loop条 "pred.next == s" 不满足, 退出循环
                    advanceHead(h, hn);
                    continue;
                }
                
                //获取尾节点,确保对tail的读的一致性
                QNode t = tail;    
                
                //2.队列为空, 说明其他的线程进行操作, 移除了节点(注意这里永远会有个 dummy node),退出
                if (t == h) return;
                
                 //队列不为空, 获取尾节点的后继节点next
                QNode tn = t.next; 
                
                //3.t不为尾节点,两次读不一致,说明存在并发.队尾改变了,重新开始自旋
                if (t != tail)  continue;
                
                //4.尾节点后继节点tn理应为null ,如果不为空,说明存在并发,有其它线程进行了入队操作
                if (tn != null) { 
                	//尾节点后继节点tn为tail,重新开始自旋
                    advanceTail(t, tn);
                    continue;
                }
                    
                 //5.tn为null,如果删除的节点s不是尾节点t,则直接进行CAS删除s,直接将pred.next = s.next(在队列中间删除是没有风险的)
                if (s != t) { 
                    QNode sn = s.next;
                     // 如果s.next ==s ,则已经离开队列
                     // || 设置pred的后继为s的后继,将s从队列中删除
                    if (sn == s || pred.casNext(s, sn)) return; // 删除完毕,退出
                }
                
                //6.如果删除的s节点是尾节点t,那么暂时不能删除,需要使用cleanMe,检查 cleanMe 是否已经被标记了
                //cleanMe标识的是需要删除节点的前驱
                QNode dp = cleanMe;
                 
				//7.如果cleanMe不为空,说明cleanMe已经被标记了,如果标记有效则直接CAS删除
                if (dp != null) {
            /*
             * cleamMe 失效的情况有:
             *   (1)cleanMe的后继为空(cleanMe 标记的是需要删除节点的前驱)
             *   (2)cleanMe的后继等于自身(这个前面有分析过)
             *   (3)需要删除节点的操作没有被取消
             *   (4)被删除的节点不是尾节点且其后继节点有效,并将待删除节点删除
             */       
                    QNode d = dp.next; // d这里指的就是 要删除的节点
                    QNode dn;
                    if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                         (dn = d.next) != null &&  //   has successor
                         dn != d &&                //   that is on list
                         dp.casNext(d, dn)))       // d unspliced
                        // 清除 cleanMe 节点
                        casCleanMe(dp, null);
                      
                      //如dp == pred 若成立, 说明清除节点s成功,直接退出, 否则再次循环, 执行到casCleanMe(null, pred), 设置这次的 cleanMe 然后再返回  
                    if (dp == pred)   return; 
                } 
                //8.原来的 cleanMe 是 null, 则将被删除节点的前驱 pred 标记为 cleamMe 为为下次循环时清除s节点做标识
				else if (casCleanMe(null, pred))   return; 
                  
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87

链表的删除只需要设置节点指针的关系就可以了,这个 clean 的方法就在链表基础上增加了一个逻辑:

  1. 如果删除的节点s不是尾节点t,那么可以直接进行删除,将pred.next = s.next
  2. 如果删除的节点S是尾节点t,那么用cleanMe 保存需要删除的节点的前驱s.pred 这样在下一轮的 clean的过程将会清除打了标记的节点

整体逻辑:

  1. pred.next == s 表示节点 s 还在队列中
  2. 从队头开始,如果节点被取消,那么新队头为原队头的后继节点next,即hn,continue重新开始
  3. 如果队列发生变化,则continue重新开始 或 CAS更新新队尾tail
  4. 如果删除的节点s不是尾节点,即s!=t 那么进行 cas 删除操作,将pred.next = s.next
  5. 删除的节点是尾节点,那么需要先检查 cleanMe 是否已经被标记
  6. 如果 cleanMe 已经被标记了,那么检查标记是否还有效
    cleamMe 失效的情况有:
    • cleanMe 的后继为空(cleanMe 标记的是需要删除节点的前驱)
    • cleanMe 的后继等于自身(这个前面有分析过)
    • 需要删除节点的操作没有被取消
    • 被删除的节点不是尾节点且其后继节点有效。
  7. 如果cleanMe 没有被标记,那么就标记为被删除节点的前驱 pred

TransferQueue-其他重要方法

		//-----------------TransferQueue其他方法 -start--------------------

 		//推进head节点,新head为oldHead.next
		//CAS更新队头节点,蕴含操作:老的头节点出队,即:将 老节点的 oldNode.next = this
        void advanceHead(QNode h, QNode nh) {
            if (h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
                h.next = h; //后继节点指向自身
        }
	
 		//CAS更新队尾节点(t 老的队尾,新的队尾)
        void advanceTail(QNode t, QNode nt) {
            if (tail == t)
                UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
        }

		//CAS更新cleanMe为val
        boolean casCleanMe(QNode cmp, QNode val) {
            return cleanMe == cmp &&
                UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
        }
		//-----------------TransferQueue其他方法 -end--------------------
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

TransferQueue-图解公平模型

公平模式下,底层实现使用队列,它有一个head和tail指针,用于指向当前正在等待匹配的线程节点

  1. 初始化时的 TransferQueue
    在这里插入图片描述

  2. 线程put1 执行 put(1)操作 ,由于当前没有配对的消费线程,所以 put1线程入队,自旋一小会后睡眠等待

在这里插入图片描述

  1. 接着,线程put2 执行 put(2)操作put2线程入队,自旋一小会后睡眠等待
    在这里插入图片描述

  2. 这时来了一个线程 take1,执行了 take操作 ,由于tail 指向 put2 线程put2线程跟 take1 线程匹配,这时take1 线程不需要入队

    • 注意了!! 这时要唤醒的线程并不是 put2,而是put1.
      • 因为现在是公平策略谁先入队,谁优先被唤醒,这里显然 put1 应优先被唤醒.
      • 公平策略总结一句话就是:>>>>>>>队尾匹配,队头出队
    1. put1 线程被唤醒后take1线程的 take()方法返回了1(put1线程的数据),这样就实现了线程间的一对一通信
      在这里插入图片描述
  3. 最后,再来一个线程take2,执行take操作,这时候只有put2线程在等候,而且2个线程匹配上了,线程put2被唤醒,take2线程take操作返回了2(线程put2的数据),这时候队列又回到了起点

在这里插入图片描述

TransferQueue总结

transfer就是在一个循环中,不断地去做下面这些事情:

  1. 当调用transfer()方法时,如果队列为空或队尾节点的类型和当前线程类型相同【t.isData== isData】,将当前线程加入队列自旋的方式等待匹配。直到被匹配或超时,或中断或取消。
  2. 如果队列不为空且队中存在可以匹配当前线程的节点,将匹配的线程出队,重新设置队头,返回数据。

注意:无论是上面哪种情况,都会不断检测是否有其他线程在进行操作,如果有的话,会帮助其他线程执行入队出队操作。
在这里插入图片描述

八静态内部类-TransferStack(非公平模式)

  • 底层数据结构是一个有一个头节点(栈顶),对应实现类为:TransferStack

队列与栈都是通过链表来实现的。具体的数据结构如下
在这里插入图片描述

TransferStack-公共成员

		//-----------------TransferStack成员属性start--------------------
		//表示栈顶节点,因为是栈结构,所以只需要用一个指针来保存栈顶
        volatile SNode head;

        // 未配对的消费者,即表示Node类型为 请求类型(消费者) 
        static final int REQUEST    = 0;
        
        //未配对的生产者,即 表示Node类型为 数据类型(生产者)
        static final int DATA       = 1;
        
        // 配对成功的消费者/生产者 ,示该操作节点处于真正匹配状态
        /** 表示Node类型为 匹配中类型,也就是说配对需要take配put 或 put配take
		 * 假设栈顶元素为 REQUEST-NODE,当前请求类型为DATA的话,入栈会修改类型为 FULFILLING 【栈顶 & 栈顶之下的一个node】。
		 * 假设栈顶元素为 DATA-NODE,当前请求类型为 REQUEST的话,入栈会修改类型为 FULFILLING 【栈顶 & 栈顶之下的一个node】。
		 */
        static final int FULFILLING = 2;

        // Unsafe更新成员属性方法
        private static final sun.misc.Unsafe UNSAFE;
        private static final long headOffset;

		//-------------省略UNSAFE相关获取属性偏移量代码----------------------
		//-----------------TransferStack成员属性end--------------------
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • TransferStack一共定义了3种节点类型,任何线程对TransferStack的操作都会创建下述3种类型的某种节点:
    1. REQUEST:表示未匹配的消费者(当线程进行出队操作时,会创建一个mode值为REQUEST的SNode节点 )
    2. DATA:表示未匹配的生产者(当线程进行入队操作时,会创建一个mode值为DATA的SNode节点 )
    3. FULFILLING表示匹配另一个生产者或消费者
    4. 同时还包含一个 head 域,表示栈顶。

TransferStack-SNode(栈元素)

  • SNode表示栈元素:put时往原栈顶中放数据。take 时将栈顶弹出取数据,两者操作都是`在栈顶上操作数据.
//-----------------TransferStack内部类-SNode-start--------------------
        static final class SNode {
            //--------成员属性start---------
            //栈顶的下一个节点
            volatile SNode next;     
            
   			//与当前node匹配的节点
    	    //	1.null:还没有任何匹配   
    	    //  2.等于自己:表示当前为取消状态  
    	    //  3.等于别的Snode:当前为匹配状态
            volatile SNode match;   
            
            //阻塞的线程    
            //	假设当前node对应的线程 自旋期间未被匹配成功,那么node对应的线程需要挂起,挂起前 waiter 保存对应的线程引用,
   			//	方便 匹配成功后,被唤醒。
            volatile Thread waiter; 
             
             //节点数据,1.data不为空 表示当前Node对应的请求类型为 DATA类型。 2.反之则表示Node为 REQUEST类型。
            Object item;   


         	 /**
             * 当前节点类型。有4种可能。
             *  1) REQUEST 0000
             *  2) DATA    0001
             *  3) REQUEST(0000)| FULFILLING(0010) =  0010 表示消费者匹配到了生产者。REQUEST为此次操作,即出队
             *  4)DATA(0001)| FULFILLING(0010) =  0011    表示生产者匹配到了消费者。DATA为此次操作,即入队
             * 		 后2种是在成功匹配后,将节点入队时设置的节点类型。通过:FULFILLING|mode 计算节点类型
             */
            int mode;
            
            // Unsafe获取SNode成员属性

		//-------------省略UNSAFE相关获取属性偏移量代码----------------------
            //--------成员属性end---------

            //--------有参构造方法start--------
            SNode(Object item) {
                this.item = item;//初始化节点存储数据
            }
            //--------有参构造方法end---------


			//--------CAS更新成员属性方法start-------
   		   /**
   			* 尝试匹配:调用tryMatch的对象是 栈顶节点的下一个节点,即与栈顶匹配的节点。
   			* 两个作用:
   			* 1.唤醒被阻塞的栈头m,2.把当前节点s赋值给 m 的 match 属性
   			* 	这样栈头 m 被唤醒时,就能从 m.match 中得到本次操作 s,其中 s.item 记录着本次的操作节点,也就是记录本次操作的数据
   			* @return ture true匹配成功,唤醒s 否则匹配失败..
   			*/
            boolean tryMatch(SNode s) {
                 //条件一:match == null 成立,说明当前Node尚未与任何节点发生过匹配...
                 //条件二 成立:使用CAS方式更新match字段为节点s,表示当前Node已经被匹配了
                if (match == null &&
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    //------match更新成功----             
                    //获取本节点的等待线程: 当前节点s如果自旋结束,那么会使用LockSupport.park()挂起,挂起之前会将Node对应的Thread 保留到 waiter字段。
                    Thread w = waiter;
                    
                     //w不为空,说明节点s对应的Thread已经挂起,则通过unpark(w)唤醒线程。
                    if (w != null) {
                   		 // 将本节点的等待线程重新置为null
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                 //如果match不为空,且指向的是当前Node本身,则说明当前节点已经被匹配或者被取消,匹配成功,反之 则非取消状态。
                return match == s;
            }
			//--------CAS更新成员属性方法end-------
			
			//CAS更新next为val
            boolean casNext(QNode cmp, QNode val) {
                    //优化:cmp == next  为什么要判断?
        			//因为cas指令 在平台执行时,同一时刻只能有一个cas指令被执行。
       				 //有了java层面的这一次判断,可以提升一部分性能。 cmp == next 不相等,就没必要走 cas指令。 
                return next == cmp && //next等于cmp  && this+nextOffset=cmp,更新为val
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }

            //CAS更新match为this,即将match指向自身,表示当前节点被取消
            void tryCancel() {
                 //如果match等于null  && this, matchOffset=null,更新为this
                 //match字段 指向当前Node对象本身,表示这个Node是取消状态,取消状态的Node,最终会被强制移除出栈。
                UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
            }
            
			 //如果match指向的是当前Node本身,那表示当前Node是取消状态,反之 则 非取消状态。
            boolean isCancelled() {
                return match == this;
            }
        }
		//-----------------TransferStack内部类-SNode-end--------------------
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95

TransferStack(入栈出栈)-E transfer(E e, boolean timed, long nanos)

  • transfer() 方法在匹配成功时返回非null值,匹配失败时返回null值
    /**
     *TransferStack非公平模式-压栈或弹栈方法
     *
     * @param e  null时,则表示消费(take 操作);不为null时,则表示生产(put 操作)
     * @param timed true 如果定时等待,则为True
     * @param nanos 超时时间,单位纳秒
     * @return 匹配成功时返回非null值,匹配失败时返回null值
     */          
     E transfer(E e, boolean timed, long nanos) {
          	// s表示新创建的节点
            SNode s = null;
    		//根据e确定此次转移的类型, 入参e==null, 说明当前是take线程(消费者), 否则是put线程(生产者)
            int mode = (e == null) ? REQUEST : DATA;

			//--------自旋1
            for (;;) {
            	 //获取栈顶节点 
            	 // 头节点情况分类
       			 // 1:为空,说明栈中还没有数据
        		 //  2:非空 且 mode=REQUEST,说明头节点线程正等着拿数据
        		 // 3:非空 且  mode=DATA,  说明头节点线程正等着写入数据
                SNode h = head;
                
                //------1.栈为空或当前压栈节点 与 栈头节点类型相同,即相同的操作taketake/putput,将节点压入栈-------
                //比如都是 put,就把本次 put 放到该栈头的前面即可,让本次put能够先执行,take同理
                if (h == null || h.mode == mode) { 
                    //1.如果是超时等待操作,但不支持超时,则直接返回NULL
   					//条件成立:说明当前调用transfer方法的 上层方法 可能是 offer() 无参的这种方法进来的,这种方法不支持 阻塞等待...
                    if (timed && nanos <= 0) { 
                         //栈顶节点被取消,协助栈顶出栈后,再一次自旋尝试。
                        if (h != null && h.isCancelled())
                          	// 丢弃栈头,CAS更新栈头的后一个节点作为栈头(弹出之前的头节点)
                            casHead(h, h.next);
                        else
                          // 栈顶为null或者栈顶节点没有被取消,直接返回null
                            return null;
                    }
                    
                    //1.2.未超时情况,生成SNode节点,CAS尝试把 e 作为新的栈头
                    //节点s压入栈: snode():生成一个SNode节点;将原来的头节点h设置为该节点s的next节点;casHead():将head头节点设置为该节点
           			 //snode(SNode s, Object e, SNode next, int mode) 
  					 else if (casHead(h, s = snode(s, e, h, mode))) { 
  					 	//---------------入栈成功,等待被匹配---------
  					    //1.2.1自旋或者阻塞直到s节点被匹配
  					    //1.正常情况:返回匹配的节点
               			//2.取消情况:返回当前节点  s节点进去,返回s节点...
                        SNode m = awaitFulfill(s, timed, nanos);
                        
                        //1.2.2.如果匹配对象指向自身, 表示节点s被取消(中断或超时),返回null
                        if (m == s) {
                           //条件成立:说明当前Node状态是 取消状态...
                            clean(s); //将取消状态的节点 出栈...
                            return null;  //取消状态 最终返回null
                        }
       
                       //---------执行到这里 说明当前Node已经被匹配了...
                        //1.2.3.h重新赋值为head头节点,并且不为null;头节点的next域为s节点,表示有节点插入到s节点之前,完成了匹配
                       //说明本来s是栈头的,现在s不是栈头了,表示存在并发,有节点压栈到s节点上面,把新的节点更新为栈头next
                        if ((h = head) != null && h.next == s)  casHead(h, s.next);//移除插入在s之前的节点和s节点

                        //1.2.4.根据此次转移的类型返回元素, 如果是take线程,返回匹配节点m.item, 否则返回s.item,即当前请求提交的元素e
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                   // 执行到此处说明入栈失败(多个线程同时入栈导致CAS操作head失败),则进入下一次自旋继续执行
			  }
			  

	          //2.---------栈不为空 且当前节点与栈顶节点类型不同,即不同操作(put-take或take-put),进行出栈操作--------
	             // 栈头正在等待其他线程 put 或 take
        	    // 比如栈头正在阻塞,并且是 put 类型,而此次操作正好是 take 类型,走此处
			  else if (!isFulfilling(h.mode)) { 
			     	 //2.1.条件成立:栈头已经被取消,把下一个元素作为栈头(因中断或超时)
                    if (h.isCancelled())    casHead(h, h.next);// 弹出头节点
                    
                    //2.2.将当前节点s压入栈中:将当前节点mode标记为 FULFILLING|mode(表示h正在匹配), 并设置s为栈顶head
                    // snode方法第3个参数 h 代表栈头,赋值给 s 的 next 属性
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        // 条件成立:说明当前节点压栈成功,入栈一个 FULFILLING|mode的NODE
                    	//----------自旋,fulfill 节点 和 fulfill.next 节点进行匹配工作
                        for (;;) { 
                        	//2.2.1.获取s节点的后继节点(m就是栈头,通过上面 snode方法刚刚赋值)。
                            //	当前节点s压栈后,s.next指向原栈顶节点(也就是与当前节点匹配的节点)
                   			// 为什么这里是从栈顶head.next下一个节点开始?因为TransferStack总是会存在一个dummy node节点
                            SNode m = s.next; 
                            
                           	//2.2.2.如果m为null,说明除了s节点外的节点都被其它线程先一步匹配掉了
                    		// 就清空栈并跳出内部循环,到外部循环再重新入栈判断
                            if (m == null) {        // 如果发现没有匹配对象
                                casHead(s, null);  // 清空整个栈
                                s = null;          // 清空s因为旧s的成员不对
                                break;             //重启主循环,下一次循环将作为普通节点再入队
                            }
                            
                            //-----------FULFILLING匹配节点不为null,进行真正的匹配工作-------
                            //2.2.3.获取待匹配节点的下一个节点,匹配成功,新头节点更新为mn
                            //链表结构s -> m -> mn
                            SNode mn = m.next;
                            
                            //2.2.4.尝试匹配,匹配成功,则将栈头 和 m 一起出栈
                            // tryMatch两个作用:
                    		// 1.唤醒被阻塞的栈头m,2.把当前节点s赋值给 m 的 match 属性
                     		//    这样栈头m 被唤醒时,就能从 m.match 中得到本次操作s
                     		//    其中 s.item 记录着本次的操作节点,也就是记录本次操作的数据
                            if (m.tryMatch(s)) {
             					//条件成立,结对出栈: 将匹配的两个节点s、m全部弹出栈,更新头节点为mn
                                casHead(s, mn); //弹出s节点和m节点
                                
                                //根据此次转移的类型返回元素, 如果是take线程,返回匹配m.item, 否则返回s.item
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            }
 							else  
 							   //2.2.5.尝试匹配失败,说明m已经先一步被其它线程匹配了
                       		  // ,移除原待匹配节点m,更新新待匹配节点为mh,进入下一个自旋,再一次进行尝试                
                                s.casNext(m, mn); //弹出m节点
                        }
                    }
                }
                
                 //3.------栈顶类型为 FULFILLING类型(m & FULFILLING) != 0,表示栈顶和栈顶下面的栈帧正在发生匹配,当前请求需要做 协助 工作。 
                  /// 走到这,表示有其他线程在进行匹配,处于正在匹配中状态,则帮助进行匹配,接着执行出栈操作
				else {   
				  //3.1.h 表示的是 FULFILLING节点,m FULFILLING匹配的节点。   
				  //栈顶的后继节点                 
                    SNode m = h.next;
                    
					//m == null 什么时候可能成立呢?
            		//3.2.当s.next节点 超时或者被外部线程中断唤醒后,会执行 clean 操作 将 自己清理出栈,此时
            		//站在匹配者线程 来看,真有可能拿到一个null。则直接弹出栈顶, 重新进入下一次自旋           
                    if (m == null)    casHead(h, null);  //弹出h节点
                    
                    //3.3.大部分情况:走else分支。 栈顶的next不为空 ,尝试和其它线程竞争匹配  
                    else {
                    	//成功,  获取栈顶匹配节点的 下一个节点mn 
                        SNode mn = m.next;
                        
                        if (m.tryMatch(h))
                           //3.3.1.条件成立:说明 m 和 栈顶 匹配成功,CAS更新head域(弹出h和m节点)
                           casHead(h, mn);  //弹出h和m节点
                        else 
                         	//3.3.2.尝试匹配失败,说明m已经先一步被其它线程匹配了
                       	 	// ,移除原待匹配节点m,更新新待匹配节点为mn,进入下一个自旋,再一次进行尝试              
                            h.casNext(m, mn);  //移除m节点
                    }
                }
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146

整个transfer方法考虑了限时等待的情况,且入队/出队其实都是调用了同一个方法,其主干逻辑就是在一个自旋中完成以下3种情况之一的操作,直到成功,或者被中断或超时取消

大体逻辑如下

  1. 如果栈为空 且 当前节点类型 与 栈顶节点类型相同。则尝试将节点加入栈内,同时通过阻塞**(或自旋一段时间,如果有超时设置,则进行超时等待)**等待节点匹配,最后返回匹配的节点或者本身(被取消)
  2. 如果栈为不空 且 当前节点类型 与 栈顶节点类型不相同,则尝试将该节点打上FULFILLING 标记,然后加入栈中,与相应的节点匹配,成功后将这2个节点弹出栈并返回匹配节点的数据
  3. 如果有节点在匹配,那么帮助这个节点完成匹配和出栈操作,然后在主循环中继续执行

TransferStack-SNode awaitFulfill(SNode s, boolean timed, long nanos)

        /**
         * TransferStack-自旋或者阻塞,直到s节点匹配完成
         * 
         * @param s 当前请求Node
         * @param timed true 如果支撑定时等待,则为True
         * @param nanos 超时时间,单位纳秒
         * @return 返回匹配值,如果取消节点,则返回s
         */  
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        	//根据timed标识计算截止时间= 如果true ? 当前纳秒+nanos :0
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            
           // 计算需要自旋的次数
           // 当前节点=head.next,说明恰好 s 正好是第一个加入的节点,则会自旋一段时间,避免阻塞,因为其他情况是会导致线程被挂起park()
           // maxTimedSpins = (NCPUS < 2) ? 0 : 32;
           // maxUntimedSpins = maxTimedSpins * 16;
            int spins = (shouldSpin(s) ?(timed ? maxTimedSpins : maxUntimedSpins) : 0);
            
            //---------自旋检查逻辑:1.是否匹配  2.是否超时  3.是否被中断..
			for (;;) {
			    //1.如果当前线程发生中断,那么尝试取消该节点
            	//	Node对象的 match 指向 当前Node 说明该Node状态就是 取消状态。 在 transfer 中会对返回值SNode进行判断
                if (w.isInterrupted()) s.tryCancel();


         		//2.s.match保存当前节点的匹配节点
         		//	0.s.match==null说明还没有匹配节点
        		//	1.正常情况:有一个请求 与 当前Node 匹配成功,这个时候 s.match 指向 匹配节点。
       			//	2.取消情况:当前match 指向 当前Node...,说明当前节点s对应的线程被中断、超时了       
                SNode m = s.match;
                
                //3.如果 m 不为空,则说明节点s已被匹配,直接返回 m
                if (m != null) return m;
                
                 //4.如果匹配不成功,如果设置了超时等待
                if (timed) {
                    //计算表示距离超时 还有多少纳秒..
                    nanos = deadline - System.nanoTime();
                     // 如果超时,尝试取消该节点,然后重新开始自旋,下一次在 x!=e时退出循环
                    if (nanos <= 0L) {
                        //Node对象的 match 指向 当前Node 说明该Node状态就是 取消状态
                       //设置当前Node状态为 取消状态.. match-->当前Node
                        s.tryCancel();
                        continue;
                    }
                }
                
                //4.说明当前线程还可以进行自旋检查...减少自旋次数,如果自旋次数大于0,则继续自旋
                if (spins > 0)	 spins = shouldSpin(s) ? (spins-1) : 0;
                //5.自旋次数用完了,s的等待线程为空,设置s的等待线程为当前线程
                //当一个 节点/线程将要阻塞时,它会先设置其waiter段,然后在下次自旋判断waiter不为null时进行阻塞
                else if (s.waiter == null) s.waiter = w; 
               //6.s等待线程不为空,当前请求是非超时等待操作,则当前一直阻塞当前线程,直到被唤醒
                else if (!timed) LockSupport.park(this);
                //nanos > 1000 纳秒的值,只有这种情况下,才允许挂起当前线程..
                //7. s等待线程不为空,当前是超时等待操作,且剩余时间> 1000 纳秒的时,才允许挂起当前线程..
               //否则 说明超时给的太少了...挂起和唤醒的成本 远大于 空转自旋..
                //		使用剩余时间阻塞当前线程,这样自旋性能的效率更高
                else if (nanos > spinForTimeoutThreshold)  LockSupport.parkNanos(this, nanos);
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

这个方法应该比较好理解,就是等待匹配操作的到来,整体逻辑如下:

  • 获取超时截止时间当前线程自旋次数
  • 进入无限循环
    • 判断当前线程是否被中断。如果是将节点 s 取消,即将节点 s 的 match指向自身this,否则进入下一步
    • 获取节点 s 的match 值 m。如果 m 不为空,则说明节点 s 已被匹配,直接返回 m;否则,进行下一步
    • 如果设置了超时时间timed则判断是否超时。如果超时则将节点s取消 s.tryCancel,continue继续循环。
    • 如果没有设置超时,则自旋一段时间,最后将当前线程阻塞park(this) ,直到匹配的操作到来,或者发生中断(取消该操作)。

TransferStack-void clean(SNode s)

  • 移除从栈顶节点开始到该节点(不包括 s)之间的所有已取消节点
        void clean(SNode s) {
         	//1.清空数据域
         	// s节点的item设置为null
            s.item = null;  
            // s节点的waiter设置为null
            s.waiter = null;
            
            //检查取消节点的截止位置
            SNode past = s.next;//被删除节点的后继节点
             
            //如果x后继节点 past 不为 null 且已被取消,则先删除该节点节点
            if (past != null && past.isCancelled())  
            	//重新设置past
                past = past.next;

            SNode p;
            //从栈顶头节点开始到past节点(不包括),将连续的取消节点移除
            while ((p = head) != null && p != past && p.isCancelled()) //头节点赋值给p,且p不为空,且p不等于s的后继节点,且p被取消
                casHead(p, p.next);//更新头节点为删除节点的后继节点next,以便下一次判断(弹出取消的节点)

             //因为是单向链表,因此需要从head 开始,遍历到被删除节点s的后继节点next,如有被取消了的操作的节点,那么就移除掉。
            while (p != null && p != past) {// 移除上一步骤没有移除的非连续的取消节点
                // 获取p的next节点
                SNode n = p.next;
                 // n不为null并且n被取消
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);//CAS更新p的next为n.next
                else
                    p = n;
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

主要功能:从栈顶删除节点 s。大体逻辑如下:

  • 将待删除节点 s 的 item 、waiter 域置为 null
  • 获取待删除节点 s 的 next赋值给 节点 past
  • 如果节点 past 不为 null 且节点past 已被取消,则先删除其后继节点past
  • 如果head 操作节点也被取消,那么就重新更新头节点为删除节点的后继节点next,以便下一次判断
  • 因为是单链表,因此需要从 head开始 遍历到被删除节点 s 的后继,如有被取消了的操作的节点,那么就移除掉。

TransferStack-其他重要方法


		//-----------------TransferStack其他方法-start--------------------
		/**
		* 构造一个SNode
		* @param s  SNode引用,当这个引用指向空时,snode方法会创建一个SNode对象 并且赋值给这个引用
		* @param e SNode对象的item字段
		* @param next 指向当前栈帧的下一个栈帧
		* @param mode REQUEST/DATA/FULFILLING
		*/
        static SNode snode(SNode s, Object e, SNode next, int mode) {
            if (s == null) s = new SNode(e);
            s.mode = mode;
            s.next = next;
            return s;
        }
        
		//CAS更新栈顶元素,同样使用到了优化
        boolean casHead(SNode h, SNode nh) {
            return h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
        }

        //判断当前模式是否为 匹配中状态
        static boolean isFulfilling(int m) { 
          return (m & FULFILLING) != 0; 
       }

         //判断节点s是否需要自旋
        boolean shouldSpin(SNode s) {
        	//获取栈顶
            SNode h = head;
           //条件一 h == s :条件成立 说明当前s 就是栈顶,允许自旋检查...
    		//条件二 h == null : 当前s节点 自旋检查期间,又来了一个 与当前s 节点匹配的请求,双双出栈了...条件会成立。
    		//条件三 isFulfilling(h.mode) : 前提 当前 s 不是 栈顶元素。并且当前栈顶正在匹配中,这种状态 栈顶下面的元素,都允许自旋检查。
            return (h == s || h == null || isFulfilling(h.mode));
        }

      //-----------------TransferStack其他方法-end-------------------- 
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

TransferStack-图解非公平模型

  1. 线程put1执行 put(1)操作 ,由于当前无配对的消费线程,所以put1线程入栈自旋一小会后睡眠等待
    在这里插入图片描述

  2. 接着,线程put2再次执行了put(2)操作 ,put2线程入栈自旋一小会后睡眠等待
    在这里插入图片描述

  3. 如果这时来了一个线程take1,执行take(1)操作 ,这时候发现栈顶为put2线程,匹配成功,但是实现会先把take1线程入栈,然后take1线程循环执行匹配put2线程逻辑,一旦发现没有并发冲突,就会把栈顶指针直接指向 put1线程
    在这里插入图片描述

  4. 最后,再来一个线程take2,执行take(2)操作,这跟上一步的逻辑基本一致,take2线程入栈,然后在循环中匹配put1线程,最终全部匹配完毕,栈空
    在这里插入图片描述

从上面流程看出,虽然put1线程先入栈了,但是却是后匹配,这就是非公平策略.

TransferStack总结

transfer方法其实就是在一个循环中持续地去做下面3件事情:

  1. 当调用transfer时,如果栈是的,或者当前线程类型和head节点类型相同,则将当前线程加入栈中,通过自旋的方式等待匹配。最后返回匹配的节点,如果被取消,则返回null
  2. 如果栈不为空,且有节点可以和当前线程进行匹配【put与take表示匹配,mode不相等】,CAS加上FULFILLING标记,将当前线程压入栈顶,和栈中的节点进行匹配,匹配成功,出栈这两个节点
  3. 如果栈顶是正在进行匹配的节点isFulfilling(h.mode),则帮助它进行匹配并出栈,再执行后续操作。

在这里插入图片描述

九.写入元素

可以看到,SynchronousQueue一样不支持入队null元素,实际的入队/出队操作都是委托给了transfer方法该方法返回null表示出/入队失败(通常是线程被中断或超时):

阻塞式入队/入栈-void put(E e)

  • put()用于 入队指定元素, 如果没有另一个线程进行出队操作, 则阻塞该入队线程.
    • 即:生产者传递给消费者的元素
    public void put(E e) throws InterruptedException {
    	//不允许写入为null的元素
        if (e == null) throw new NullPointerException();

		//transfer()返回 null 就调用 Thread.interrupted() 将中断标志位复位(设为 false),然后抛出异常
        if (transferer.transfer(e, false, 0) == null) {// 进行转移操作
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

非阻塞入队/入栈-boolean offer(E e)

  • offer(E e)功能同 put(E e)
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }

  • 1
  • 2
  • 3
  • 4
  • 5

阻塞式超时入队/入栈-boolean offer(E,long,TimeUnit)

  • 在put()阻塞式插入方法的基础上额外增加超时功能,传入一个timeout,获取不到而阻塞的时候,如果时间到了,即使还获取不到,也只能立即返回false
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
         //元素e为空,抛异常
        if (e == null) throw new NullPointerException();
        //进行转移操作
        if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
            return true;
        // 当前线程没有被中断
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

八.获取元素

阻塞式出队/出栈-E take()

  • take()用于 出队一个元素, 如果没有另一个线程进行出队操作, 则阻塞该入队线程.
    • 即: 消费者等待生产者提供元素
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        //返回 null 就调用 Thread.interrupted() 将中断标志位复位(设为 false),然后抛出异常    
        Thread.interrupted();
        throw new InterruptedException();
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

阻塞式出队/出栈-E poll()

  • 获取并移除此队列的头,以便另一个线程插入它
    public E poll() {
        return transferer.transfer(null, true, 0);
    }
  • 1
  • 2
  • 3

阻塞式超时出队/出栈-E poll(timeout, unit)

  • 获取并移除此队列的头,如有必要则等待指定的时间,以便另一个线程插入它
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = transferer.transfer(null, true, unit.toNanos(timeout));
       
        if (e != null || !Thread.interrupted())
            return e;
         //返回 null 或没有被中断 就调用 Thread.interrupted() 将中断标志位复位(设为 false),然后抛出异常    
        throw new InterruptedException();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

九.总结

  • SynchronousQueue不存储实际元素,而是在内部通过栈或队列结构保存阻塞线程的阻塞队列,每个put的操作必须等待另一个线程进行相应的take操作,反之亦然
    • 该类两种不同的策略实现,TransferQueue实现公平模式、TransferStack实现非公平模式。

SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue性能测试

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/137517
推荐阅读
相关标签
  

闽ICP备14008679号