赞
踩
上文:AQS-semaphore&CyclicBarrier&CountDownLatch源码学习
源码下载:https://gitee.com/hong99/jdk8
Exchanger是什么?
exchanger是一个极少使用到的交换类,主要用于线程阻塞或者因为阻塞引起但任务又急于执行,这里候就可以进行交换。但是有一个非常的复杂点就是两个并发任务执行过程中交换数据,这一点是非常厉害的,可以看下下面的一些基础实现。
基础功能的学习
- package com.aqs;
-
- import java.util.concurrent.Exchanger;
-
- /**
- * @author: csh
- * @Date: 2022/12/17 10:43
- * @Description:Exchanger 学习
- */
- public class ExchangerStudy {
-
- public static void main(String[] args) {
- Exchanger<Integer> exchanger = new Exchanger<Integer>();
- for (int i = 0; i < 10; i++) {
- final int index=i;
- new Thread(new Runnable() {
- @Override
- public void run() {
- System.out.println("当前线程:Thread_"+Thread.currentThread().getName()+"下标为:"+index);
- try {
- int newIndex = exchanger.exchange(index);
- //等待100毫秒
- Thread.sleep(100);
- System.out.println("当前线程:Thread_"+Thread.currentThread().getName()+"原来下标为:"+index+"交换后下标为:"+newIndex);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
- }
-
- }
- }
结果
- 当前线程:Thread_Thread-2下标为:2
- 当前线程:Thread_Thread-1下标为:1
- 当前线程:Thread_Thread-4下标为:4
- 当前线程:Thread_Thread-3下标为:3
- 当前线程:Thread_Thread-5下标为:5
- 当前线程:Thread_Thread-6下标为:6
- 当前线程:Thread_Thread-0下标为:0
- 当前线程:Thread_Thread-7下标为:7
- 当前线程:Thread_Thread-8下标为:8
- 当前线程:Thread_Thread-9下标为:9
- 当前线程:Thread_Thread-4原来下标为:4交换后下标为:3
- 当前线程:Thread_Thread-6原来下标为:6交换后下标为:5
- 当前线程:Thread_Thread-0原来下标为:0交换后下标为:7
- 当前线程:Thread_Thread-9原来下标为:9交换后下标为:8
- 当前线程:Thread_Thread-8原来下标为:8交换后下标为:9
- 当前线程:Thread_Thread-7原来下标为:7交换后下标为:0
- 当前线程:Thread_Thread-5原来下标为:5交换后下标为:6
- 当前线程:Thread_Thread-2原来下标为:2交换后下标为:1
- 当前线程:Thread_Thread-1原来下标为:1交换后下标为:2
- 当前线程:Thread_Thread-3原来下标为:3交换后下标为:4
可以看到两个线程可以交换执行的下标。是比较厉害。接下来我们看看底层的源码实现。
源码学习
java.util.concurrent.Exchanger 源码实现时
- //交换机
- public class Exchanger<V> {
- //避免伪共享 左移数据下标(获取内存偏移量)
- private static final int ASHIFT = 7;
-
- //节点最大的数组下标
- private static final int MMASK = 0xff;
-
- //用于递增,每次加一个seq
- private static final int SEQ = MMASK + 1;
-
- //获取cpu的核数
- private static final int NCPU = Runtime.getRuntime().availableProcessors();
-
- //实际组数长度(线程数)
- static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
-
- //自旋次数 当cpu为单核时,该参为禁用
- private static final int SPINS = 1 << 10;
-
- //用于提供给其他线程的交换对象
- private static final Object NULL_ITEM = new Object();
-
- //用于超时的传递对象
- private static final Object TIMED_OUT = new Object();
-
- //交换节点 保存交换的数据
- @sun.misc.Contended static final class Node {
- int index; // 多槽数据索引
- int bound; // 上一次的边界
- int collides; // 记录边界范围内cas失败次数
- int hash; // 代表hash值 用于自旋优化
- Object item; // 节点带的数据
- volatile Object match; // 未来 配对成功 交换的数据
- volatile Thread parked; // 匹配的线程
- }
- //本地线程类实现初始化值
- static final class Participant extends ThreadLocal<Node> {
- public Node initialValue() { return new Node(); }
- }
-
- // 存放node节点 保障线程安全
- private final Participant participant;
-
- //多槽数组
- private volatile Node[] arena;
-
- /**
- * 交换的槽位
- */
- private volatile Node slot;
-
- //上次记录
- private volatile int bound;
-
- //多槽位的交换实现(带过期时间)
- private final Object arenaExchange(Object item, boolean timed, long ns) {
- //获取多槽位数组
- Node[] a = arena;
- //获取当前线程node对象
- Node p = participant.get();
- for (int i = p.index;;) { // access slot at i
- int b, m, c; long j; // 初始化相关的交换变量
- //获取交换节点信息(先获取偏移地址)
- Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
- //如果不为空(证明已经有线程) 进行交换
- if (q != null && U.compareAndSwapObject(a, j, q, null)) {
- //获取交换q的内容
- Object v = q.item; // release
- //将当前线程的内容赋值给q的match
- q.match = item;
- //获取被交换线程
- Thread w = q.parked;
- //不为空进行唤醒
- if (w != null)
- U.unpark(w);
- //这个是交换后的值
- return v;
- }
- //槽位还没被占的场景
- else if (i <= (m = (b = bound) & MMASK) && q == null) {
- //交换对象值
- p.item = item; // offer
- //cas交换
- if (U.compareAndSwapObject(a, j, null, p)) {
- //计算超时时间
- long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
- Thread t = Thread.currentThread(); // wait
- for (int h = p.hash, spins = SPINS;;) {
- //用来携带交换线程的数
- Object v = p.match;
- //已被交换 则清标识
- if (v != null) {
- U.putOrderedObject(p, MATCH, null);
- p.item = null; // clear for next use
- p.hash = h;
- //交换成功
- return v;
- }
- //判断自旋是否大于0
- else if (spins > 0) {
- //太复杂了这里 反正就是各各自旋次数获取 然后释放线程资源
- h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
- if (h == 0) // initialize hash
- h = SPINS | (int)t.getId();
- else if (h < 0 && // approx 50% true
- (--spins & ((SPINS >>> 1) - 1)) == 0)
- Thread.yield(); // two yields per wait
- }
- //已有交换线程 准备数据中
- else if (U.getObjectVolatile(a, j) != p)
- spins = SPINS; // releaser hasn't set match yet
- // //线程不挂起 不是 多槽 时间没结束
- else if (!t.isInterrupted() && m == 0 &&
- (!timed ||
- (ns = end - System.nanoTime()) > 0L)) {
- U.putObject(t, BLOCKER, this); // emulate LockSupport
- p.parked = t; // minimize window
- //注意这里 park则挂起线程
- if (U.getObjectVolatile(a, j) == p)
- U.park(false, ns);
- p.parked = null;
- U.putObject(t, BLOCKER, null);
- }
- //换槽位(原因可能一直没有线程) 逻辑有点复杂
- else if (U.getObjectVolatile(a, j) == p &&
- U.compareAndSwapObject(a, j, p, null)) {
- if (m != 0) // try to shrink
- U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
- p.item = null;
- p.hash = h;
- i = p.index >>>= 1; // descend
- if (Thread.interrupted())
- return null;
- if (timed && m == 0 && ns <= 0L)
- return TIMED_OUT;
- break; // expired; restart
- }
- }
- }
- else
- //获取槽位失败,先清空数据
- p.item = null; // clear offer
- }
- //不在有效范围内,或者已经被其它线程抢了~
- else {
- //更新bound
- if (p.bound != b) { // stale; reset
- p.bound = b;
- p.collides = 0;
- i = (i != m || m == 0) ? m : m - 1;
- }
- else if ((c = p.collides) < m || m == FULL ||
- !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
- p.collides = c + 1;
- i = (i == 0) ? m : i - 1; // cyclically traverse
- }
- else
- //往后挪动
- i = m + 1; // grow
- //更新下标
- p.index = i;
- }
- }
- }
-
- //单槽位交换方法
- private final Object slotExchange(Object item, boolean timed, long ns) {
- //获取当前线程携带的数据
- Node p = participant.get();
- //获取当前线程
- Thread t = Thread.currentThread();
- //判断中断不为中断
- if (t.isInterrupted()) // preserve interrupt status so caller can recheck
- return null;
- //自旋
- for (Node q;;) {
- //赋给q
- if ((q = slot) != null) {
- //cas交换对象并将slot置为null
- if (U.compareAndSwapObject(this, SLOT, q, null)) {
- Object v = q.item;
- //将当前的对象赋给交换对象的match
- q.match = item;
- //取出交换的线程
- Thread w = q.parked;
- //唤醒线程
- if (w != null)
- U.unpark(w);
- return v;
- }
- // create arena on contention, but continue until slot null
- if (NCPU > 1 && bound == 0 &&
- U.compareAndSwapInt(this, BOUND, 0, SEQ))
- arena = new Node[(FULL + 2) << ASHIFT];
- }
- //多槽交换不为空则退出
- else if (arena != null)
- return null; // caller must reroute to arenaExchange
- else {
- //把数据赋给当前node节点
- p.item = item;
- //进行交换数据 成功则退出
- if (U.compareAndSwapObject(this, SLOT, null, p))
- break;
- //如果交换不成功则赋为空
- p.item = null;
- }
- }
-
- // await release
- //拿到节点hash
- int h = p.hash;
- //计算时间
- long end = timed ? System.nanoTime() + ns : 0L;
- //获取自旋次数
- int spins = (NCPU > 1) ? SPINS : 1;
- //返回值
- Object v;
- //判断不为空
- while ((v = p.match) == null) {
- //自旋次数
- if (spins > 0) {
- h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
- if (h == 0)
- h = SPINS | (int)t.getId();
- else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
- //让出线程权限
- Thread.yield();
- }
- //优化操作 自旋线程准备中
- else if (slot != p)
- spins = SPINS;
- //线程不挂起 不是 多槽 时间没结束
- else if (!t.isInterrupted() && arena == null &&
- (!timed || (ns = end - System.nanoTime()) > 0L)) {
- //
- U.putObject(t, BLOCKER, this);
- p.parked = t;
- if (slot == p)
- U.park(false, ns);
- p.parked = null;
- U.putObject(t, BLOCKER, null);
- }
- else if (U.compareAndSwapObject(this, SLOT, p, null)) {
- v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
- break;
- }
- }
- //
- U.putOrderedObject(p, MATCH, null);
- p.item = null;
- p.hash = h;
- //返回这个v是线程交换线程的。
- return v;
- }
-
- //构造方法
- public Exchanger() {
- participant = new Participant();
- }
-
- //交换的数据
- @SuppressWarnings("unchecked")
- public V exchange(V x) throws InterruptedException {
- //当前线程用于交换的数据
- Object v;
- //获取item值
- Object item = (x == null) ? NULL_ITEM : x; // translate null args
- //不是多槽
- if ((arena != null ||
- //slotExchange 单槽位交换实现的方法
- (v = slotExchange(item, false, 0L)) == null) &&
- ((Thread.interrupted() || // disambiguates null return
- (v = arenaExchange(item, false, 0L)) == null)))
- throw new InterruptedException();
- return (v == NULL_ITEM) ? null : (V)v;
- }
-
- //
- @SuppressWarnings("unchecked")
- public V exchange(V x, long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException {
- Object v;
- Object item = (x == null) ? NULL_ITEM : x;
- long ns = unit.toNanos(timeout);
- if ((arena != null ||
- (v = slotExchange(item, true, ns)) == null) &&
- ((Thread.interrupted() ||
- (v = arenaExchange(item, true, ns)) == null)))
- throw new InterruptedException();
- if (v == TIMED_OUT)
- throw new TimeoutException();
- return (v == NULL_ITEM) ? null : (V)v;
- }
-
- //字段偏移量信息
- private static final sun.misc.Unsafe U;
- private static final long BOUND;
- private static final long SLOT;
- private static final long MATCH;
- private static final long BLOCKER;
- private static final int ABASE;
- //初始化信息
- static {
- int s;
- try {
- U = sun.misc.Unsafe.getUnsafe();
- Class<?> ek = Exchanger.class;
- Class<?> nk = Node.class;
- Class<?> ak = Node[].class;
- Class<?> tk = Thread.class;
- BOUND = U.objectFieldOffset
- (ek.getDeclaredField("bound"));
- SLOT = U.objectFieldOffset
- (ek.getDeclaredField("slot"));
- MATCH = U.objectFieldOffset
- (nk.getDeclaredField("match"));
- BLOCKER = U.objectFieldOffset
- (tk.getDeclaredField("parkBlocker"));
- s = U.arrayIndexScale(ak);
- // ABASE absorbs padding in front of element 0
- ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
-
- } catch (Exception e) {
- throw new Error(e);
- }
- if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
- throw new Error("Unsupported array scale");
- }
-
- }
最后
exchanger默认也是使用了CAS+park/unpark进行实现的,我这里是基于jdk8,jdk7与jdk8是有区别的。本身解决的问题是通过两个线程进行交换执行值,没想到这个exchanger代码不多但是非常复杂,有些可能写得不好,但是有想深入同学可以看看下面两个文章。
参考资料:
https://www.bilibili.com/video/BV17P4y177SD/?spm_id_from=333.788&vd_source=7d0e42b081e08cb3cefaea55cc1fa8b7
https://blog.csdn.net/weixin_30612769/article/details/97769773
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。