当前位置:   article > 正文

AQS-Exchanger源码学习

exchanger java 源码

上文:AQS-semaphore&CyclicBarrier&CountDownLatch源码学习

源码下载:https://gitee.com/hong99/jdk8


Exchanger是什么?

3231961cb1b42e4d7f5124a9277aaa36.png

    exchanger是一个极少使用到的交换类,主要用于线程阻塞或者因为阻塞引起但任务又急于执行,这里候就可以进行交换。但是有一个非常的复杂点就是两个并发任务执行过程中交换数据,这一点是非常厉害的,可以看下下面的一些基础实现。

基础功能的学习

  1. package com.aqs;
  2. import java.util.concurrent.Exchanger;
  3. /**
  4.  * @author: csh
  5.  * @Date: 2022/12/17 10:43
  6.  * @Description:Exchanger 学习
  7.  */
  8. public class ExchangerStudy {
  9.     public static void main(String[] args) {
  10.         Exchanger<Integer> exchanger = new Exchanger<Integer>();
  11.         for (int i = 0; i < 10; i++) {
  12.             final int index=i;
  13.             new Thread(new Runnable() {
  14.                 @Override
  15.                 public void run() {
  16.                     System.out.println("当前线程:Thread_"+Thread.currentThread().getName()+"下标为:"+index);
  17.                     try {
  18.                         int newIndex = exchanger.exchange(index);
  19.                         //等待100毫秒
  20.                         Thread.sleep(100);
  21.                         System.out.println("当前线程:Thread_"+Thread.currentThread().getName()+"原来下标为:"+index+"交换后下标为:"+newIndex);
  22.                     } catch (InterruptedException e) {
  23.                         e.printStackTrace();
  24.                     }
  25.                 }
  26.             }).start();
  27.         }
  28.     }
  29. }

结果

  1. 当前线程:Thread_Thread-2下标为:2
  2. 当前线程:Thread_Thread-1下标为:1
  3. 当前线程:Thread_Thread-4下标为:4
  4. 当前线程:Thread_Thread-3下标为:3
  5. 当前线程:Thread_Thread-5下标为:5
  6. 当前线程:Thread_Thread-6下标为:6
  7. 当前线程:Thread_Thread-0下标为:0
  8. 当前线程:Thread_Thread-7下标为:7
  9. 当前线程:Thread_Thread-8下标为:8
  10. 当前线程:Thread_Thread-9下标为:9
  11. 当前线程:Thread_Thread-4原来下标为:4交换后下标为:3
  12. 当前线程:Thread_Thread-6原来下标为:6交换后下标为:5
  13. 当前线程:Thread_Thread-0原来下标为:0交换后下标为:7
  14. 当前线程:Thread_Thread-9原来下标为:9交换后下标为:8
  15. 当前线程:Thread_Thread-8原来下标为:8交换后下标为:9
  16. 当前线程:Thread_Thread-7原来下标为:7交换后下标为:0
  17. 当前线程:Thread_Thread-5原来下标为:5交换后下标为:6
  18. 当前线程:Thread_Thread-2原来下标为:2交换后下标为:1
  19. 当前线程:Thread_Thread-1原来下标为:1交换后下标为:2
  20. 当前线程:Thread_Thread-3原来下标为:3交换后下标为:4

    可以看到两个线程可以交换执行的下标。是比较厉害。接下来我们看看底层的源码实现。

源码学习

c507f071a1ccf3bbbb8f672bc9afa085.png

java.util.concurrent.Exchanger  源码实现时

  1. //交换机
  2. public class Exchanger<V> {
  3.     //避免伪共享 左移数据下标(获取内存偏移量)
  4.     private static final int ASHIFT = 7;
  5.     //节点最大的数组下标
  6.     private static final int MMASK = 0xff;
  7.     //用于递增,每次加一个seq
  8.     private static final int SEQ = MMASK + 1;
  9.     //获取cpu的核数
  10.     private static final int NCPU = Runtime.getRuntime().availableProcessors();
  11.     //实际组数长度(线程数)
  12.     static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
  13.     //自旋次数 当cpu为单核时,该参为禁用
  14.     private static final int SPINS = 1 << 10;
  15.     //用于提供给其他线程的交换对象
  16.     private static final Object NULL_ITEM = new Object();
  17.     //用于超时的传递对象
  18.     private static final Object TIMED_OUT = new Object();
  19.     //交换节点 保存交换的数据
  20.     @sun.misc.Contended static final class Node {
  21.         int index; // 多槽数据索引
  22.         int bound; // 上一次的边界
  23.         int collides; // 记录边界范围内cas失败次数
  24.         int hash; // 代表hash值 用于自旋优化
  25.         Object item; // 节点带的数据
  26.         volatile Object match; // 未来 配对成功 交换的数据
  27.         volatile Thread parked; // 匹配的线程
  28.     }
  29.     //本地线程类实现初始化值
  30.     static final class Participant extends ThreadLocal<Node> {
  31.         public Node initialValue() { return new Node(); }
  32.     }
  33.     // 存放node节点 保障线程安全
  34.     private final Participant participant;
  35.    //多槽数组
  36.     private volatile Node[] arena;
  37.     /**
  38.      * 交换的槽位
  39.      */
  40.     private volatile Node slot;
  41.     //上次记录
  42.     private volatile int bound;
  43.     //多槽位的交换实现(带过期时间)
  44.     private final Object arenaExchange(Object item, boolean timed, long ns) {
  45.         //获取多槽位数组
  46.         Node[] a = arena;
  47.         //获取当前线程node对象
  48.         Node p = participant.get();
  49.         for (int i = p.index;;) { // access slot at i
  50.             int b, m, c; long j; // 初始化相关的交换变量
  51.             //获取交换节点信息(先获取偏移地址)
  52.             Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
  53.             //如果不为空(证明已经有线程) 进行交换
  54.             if (q != null && U.compareAndSwapObject(a, j, q, null)) {
  55.                 //获取交换q的内容
  56.                 Object v = q.item; // release
  57.                 //将当前线程的内容赋值给q的match
  58.                 q.match = item;
  59.                 //获取被交换线程
  60.                 Thread w = q.parked;
  61.                 //不为空进行唤醒
  62.                 if (w != null)
  63.                     U.unpark(w);
  64.                 //这个是交换后的值
  65.                 return v;
  66.             }
  67.             //槽位还没被占的场景
  68.             else if (i <= (m = (b = bound) & MMASK) && q == null) {
  69.                 //交换对象值
  70.                 p.item = item; // offer
  71.                 //cas交换
  72.                 if (U.compareAndSwapObject(a, j, null, p)) {
  73.                     //计算超时时间
  74.                     long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
  75.                     Thread t = Thread.currentThread(); // wait
  76.                     for (int h = p.hash, spins = SPINS;;) {
  77.                         //用来携带交换线程的数
  78.                         Object v = p.match;
  79.                         //已被交换 则清标识
  80.                         if (v != null) {
  81.                             U.putOrderedObject(p, MATCH, null);
  82.                             p.item = null; // clear for next use
  83.                             p.hash = h;
  84.                             //交换成功
  85.                             return v;
  86.                         }
  87.                         //判断自旋是否大于0
  88.                         else if (spins > 0) {
  89.                             //太复杂了这里 反正就是各各自旋次数获取 然后释放线程资源
  90.                             h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
  91.                             if (h == 0) // initialize hash
  92.                                 h = SPINS | (int)t.getId();
  93.                             else if (h < 0 && // approx 50% true
  94.                                      (--spins & ((SPINS >>> 1) - 1)) == 0)
  95.                                 Thread.yield(); // two yields per wait
  96.                         }
  97.                         //已有交换线程 准备数据中
  98.                         else if (U.getObjectVolatile(a, j) != p)
  99.                             spins = SPINS; // releaser hasn't set match yet
  100.                         // //线程不挂起 不是 多槽 时间没结束
  101.                         else if (!t.isInterrupted() && m == 0 &&
  102.                                  (!timed ||
  103.                                   (ns = end - System.nanoTime()) > 0L)) {
  104.                             U.putObject(t, BLOCKER, this); // emulate LockSupport
  105.                             p.parked = t; // minimize window
  106.                             //注意这里 park则挂起线程
  107.                             if (U.getObjectVolatile(a, j) == p)
  108.                                 U.park(false, ns);
  109.                             p.parked = null;
  110.                             U.putObject(t, BLOCKER, null);
  111.                         }
  112.                         //换槽位(原因可能一直没有线程) 逻辑有点复杂
  113.                         else if (U.getObjectVolatile(a, j) == p &&
  114.                                  U.compareAndSwapObject(a, j, p, null)) {
  115.                             if (m != 0) // try to shrink
  116.                                 U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
  117.                             p.item = null;
  118.                             p.hash = h;
  119.                             i = p.index >>>= 1; // descend
  120.                             if (Thread.interrupted())
  121.                                 return null;
  122.                             if (timed && m == 0 && ns <= 0L)
  123.                                 return TIMED_OUT;
  124.                             break; // expired; restart
  125.                         }
  126.                     }
  127.                 }
  128.                 else
  129.                     //获取槽位失败,先清空数据
  130.                     p.item = null; // clear offer
  131.             }
  132.             //不在有效范围内,或者已经被其它线程抢了~
  133.             else {
  134.                 //更新bound
  135.                 if (p.bound != b) { // stale; reset
  136.                     p.bound = b;
  137.                     p.collides = 0;
  138.                     i = (i != m || m == 0) ? m : m - 1;
  139.                 }
  140.                 else if ((c = p.collides) < m || m == FULL ||
  141.                          !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
  142.                     p.collides = c + 1;
  143.                     i = (i == 0) ? m : i - 1; // cyclically traverse
  144.                 }
  145.                 else
  146.                     //往后挪动
  147.                     i = m + 1; // grow
  148.                 //更新下标
  149.                 p.index = i;
  150.             }
  151.         }
  152.     }
  153.     //单槽位交换方法
  154.     private final Object slotExchange(Object item, boolean timed, long ns) {
  155.         //获取当前线程携带的数据
  156.         Node p = participant.get();
  157.         //获取当前线程
  158.         Thread t = Thread.currentThread();
  159.         //判断中断不为中断
  160.         if (t.isInterrupted()) // preserve interrupt status so caller can recheck
  161.             return null;
  162.         //自旋
  163.         for (Node q;;) {
  164.             //赋给q
  165.             if ((q = slot) != null) {
  166.                 //cas交换对象并将slot置为null
  167.                 if (U.compareAndSwapObject(this, SLOT, q, null)) {
  168.                     Object v = q.item;
  169.                     //将当前的对象赋给交换对象的match
  170.                     q.match = item;
  171.                     //取出交换的线程
  172.                     Thread w = q.parked;
  173.                     //唤醒线程
  174.                     if (w != null)
  175.                         U.unpark(w);
  176.                     return v;
  177.                 }
  178.                 // create arena on contention, but continue until slot null
  179.                 if (NCPU > 1 && bound == 0 &&
  180.                     U.compareAndSwapInt(this, BOUND, 0, SEQ))
  181.                     arena = new Node[(FULL + 2) << ASHIFT];
  182.             }
  183.             //多槽交换不为空则退出
  184.             else if (arena != null)
  185.                 return null; // caller must reroute to arenaExchange
  186.             else {
  187.                 //把数据赋给当前node节点
  188.                 p.item = item;
  189.                 //进行交换数据 成功则退出
  190.                 if (U.compareAndSwapObject(this, SLOT, null, p))
  191.                     break;
  192.                  //如果交换不成功则赋为空
  193.                 p.item = null;
  194.             }
  195.         }
  196.         // await release
  197.         //拿到节点hash
  198.         int h = p.hash;
  199.         //计算时间
  200.         long end = timed ? System.nanoTime() + ns : 0L;
  201.         //获取自旋次数
  202.         int spins = (NCPU > 1) ? SPINS : 1;
  203.         //返回值
  204.         Object v;
  205.         //判断不为空
  206.         while ((v = p.match) == null) {
  207.             //自旋次数
  208.             if (spins > 0) {
  209.                 h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
  210.                 if (h == 0)
  211.                     h = SPINS | (int)t.getId();
  212.                 else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
  213.                     //让出线程权限
  214.                     Thread.yield();
  215.             }
  216.             //优化操作 自旋线程准备中
  217.             else if (slot != p)
  218.                 spins = SPINS;
  219.             //线程不挂起 不是 多槽 时间没结束
  220.             else if (!t.isInterrupted() && arena == null &&
  221.                      (!timed || (ns = end - System.nanoTime()) > 0L)) {
  222.                 //
  223.                 U.putObject(t, BLOCKER, this);
  224.                 p.parked = t;
  225.                 if (slot == p)
  226.                     U.park(false, ns);
  227.                 p.parked = null;
  228.                 U.putObject(t, BLOCKER, null);
  229.             }
  230.             else if (U.compareAndSwapObject(this, SLOT, p, null)) {
  231.                 v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
  232.                 break;
  233.             }
  234.         }
  235.         //
  236.         U.putOrderedObject(p, MATCH, null);
  237.         p.item = null;
  238.         p.hash = h;
  239.         //返回这个v是线程交换线程的。
  240.         return v;
  241.     }
  242.     //构造方法
  243.     public Exchanger() {
  244.         participant = new Participant();
  245.     }
  246.     //交换的数据
  247.     @SuppressWarnings("unchecked")
  248.     public V exchange(V x) throws InterruptedException {
  249.         //当前线程用于交换的数据
  250.         Object v;
  251.         //获取item值
  252.         Object item = (x == null) ? NULL_ITEM : x; // translate null args
  253.         //不是多槽
  254.         if ((arena != null ||
  255.               //slotExchange 单槽位交换实现的方法
  256.              (v = slotExchange(item, false, 0L)) == null) &&
  257.             ((Thread.interrupted() || // disambiguates null return
  258.               (v = arenaExchange(item, false, 0L)) == null)))
  259.             throw new InterruptedException();
  260.         return (v == NULL_ITEM) ? null : (V)v;
  261.     }
  262.     //
  263.     @SuppressWarnings("unchecked")
  264.     public V exchange(V x, long timeout, TimeUnit unit)
  265.         throws InterruptedException, TimeoutException {
  266.         Object v;
  267.         Object item = (x == null) ? NULL_ITEM : x;
  268.         long ns = unit.toNanos(timeout);
  269.         if ((arena != null ||
  270.              (v = slotExchange(item, true, ns)) == null) &&
  271.             ((Thread.interrupted() ||
  272.               (v = arenaExchange(item, true, ns)) == null)))
  273.             throw new InterruptedException();
  274.         if (v == TIMED_OUT)
  275.             throw new TimeoutException();
  276.         return (v == NULL_ITEM) ? null : (V)v;
  277.     }
  278.     //字段偏移量信息
  279.     private static final sun.misc.Unsafe U;
  280.     private static final long BOUND;
  281.     private static final long SLOT;
  282.     private static final long MATCH;
  283.     private static final long BLOCKER;
  284.     private static final int ABASE;
  285.     //初始化信息
  286.     static {
  287.         int s;
  288.         try {
  289.             U = sun.misc.Unsafe.getUnsafe();
  290.             Class<?> ek = Exchanger.class;
  291.             Class<?> nk = Node.class;
  292.             Class<?> ak = Node[].class;
  293.             Class<?> tk = Thread.class;
  294.             BOUND = U.objectFieldOffset
  295.                 (ek.getDeclaredField("bound"));
  296.             SLOT = U.objectFieldOffset
  297.                 (ek.getDeclaredField("slot"));
  298.             MATCH = U.objectFieldOffset
  299.                 (nk.getDeclaredField("match"));
  300.             BLOCKER = U.objectFieldOffset
  301.                 (tk.getDeclaredField("parkBlocker"));
  302.             s = U.arrayIndexScale(ak);
  303.             // ABASE absorbs padding in front of element 0
  304.             ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
  305.         } catch (Exception e) {
  306.             throw new Error(e);
  307.         }
  308.         if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
  309.             throw new Error("Unsupported array scale");
  310.     }
  311. }

最后

    exchanger默认也是使用了CAS+park/unpark进行实现的,我这里是基于jdk8,jdk7与jdk8是有区别的。本身解决的问题是通过两个线程进行交换执行值,没想到这个exchanger代码不多但是非常复杂,有些可能写得不好,但是有想深入同学可以看看下面两个文章。

参考资料:

https://www.bilibili.com/video/BV17P4y177SD/?spm_id_from=333.788&vd_source=7d0e42b081e08cb3cefaea55cc1fa8b7

https://blog.csdn.net/weixin_30612769/article/details/97769773

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/149085
推荐阅读
相关标签
  

闽ICP备14008679号