当前位置:   article > 正文

Striped64 深入源码解析

Striped64 深入源码解析

  JUC在JDK1.8版本中引入了Striped64类及其四个实现类LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator,在原有的AtomicXXXXX等系列上,针对多线程并发情况进行了优化。

  Striped64类被设计用来支持累加器的的并发组件,可以在多线程高并发环境下安全的累加计数。Striped64内部存储了base、cells数组和cellsBusy锁,计数线程首先会尝试使用CAS对base变量进行更改,若更新成功,则计数完成,此时累加器竞争并不激烈。若未更新成功,表示此时竞争较激烈,需要引入cells数组,将待加数值放入cells内。调用累加器的线程与cells数组中的cell一一对应,各线程只操作自身对应的cell,Striped64根据线程的threadLocalRandomProbe属性值计算哈希值,将线程定位到某个固定的cell上。最后,需要统计计数结果时,若nullcells,则直接返回base值,若nullcells时,则累加cells各索引位置的值后返回。

  源码分析:

  实现基础:

// cell数组,若不为null,则为2的n次方.
transient volatile Cell[] cells;
// 基础值,在更新操作时基于CAS无锁技术实现原子更新.
transient volatile long base;
// CAS锁,用于保护创建或者扩展Cell表.
transient volatile int cellsBusy;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

  基本方法:

// 依赖CAS,更新base的值.
final boolean casBase(long cmp, long val);
// 依赖CAS,更新cells的值.0表示未被占用,1表示已被占用.
final boolean casCellsBusy();
// 获取线程的threadLocalRandomProbe.
static final int getProbe();
// 被用来进行创建、初始化、扩展.针对long类型.
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended);
// 被用来进行创建、初始化、扩展.针对double类型.
final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

 longAccumulate和doubleAccumulate实现思路很类似,我们主要针对longAccumulate来逐步解析一下:

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    int h;
    // 获取线程的threadLocalRandomProbe.
    // 若线程的threadLocalRandomProbe == 0,则重新初始化.
    if ((h = getProbe()) == 0) {
        // 强制初始化.
        ThreadLocalRandom.current(); 
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                
    // 无限循环.
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // as = cells; as != null && n = as.length; n > 0
        // cells 不为空时.
        if ((as = cells) != null && (n = as.length) > 0) {
            // (n - 1) & h 计算出当前线程在cells中的索引值.
            // 当cells[(n - 1)&h]不为null时.
            if ((a = as[(n - 1) & h]) == null) {
                // cellsBusy未被占用时.
                if (cellsBusy == 0) {
                    // 新建cell.
                    Cell r = new Cell(x);
                    // cellsBusy未被占用同时更新cellsBusy=1.
                    if (cellsBusy == 0 && casCellsBusy()) {
                        // 创建标识.
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            // rs = cells; rs != null && m = rs.length; m > 0
                            // && j = (m - 1)&h; rs[j] == null
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                // 将新建的Cell放入单元格.
                                rs[j] = r;
                                // 设置创建标识.
                                created = true;
                            }
                        } finally {
                            // 释放cellsBusy.
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue; 
                    }
                }
                collide = false;
            }
            // CAS已经失败.需要重新计算索引位置.
            else if (!wasUncontended)
                wasUncontended = true;
            // v = a.value
            // fn == null) ? v + x : fn.applyAsLong(v, x))
            // a.cas(v, fn)
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // cells最大长度为cpu的数量,cells != as表示cells已经被更新了.
            else if (n >= NCPU || cells != as)
                collide = false;
            else if (!collide)
                collide = true;
            // cellsBusy未被占用同时更新cellsBusy=1.
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    // 扩展cells.
                    if (cells == as) {
                        // 新建原cells长度*2的新cells.
                        Cell[] rs = new Cell[n << 1];
                        // 原cells数据迁移至新cells.
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        // 整理后的新cells生效.
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue; 
            }
            // hash过程可能存在冲突,使用此方法可解决问题.
            h = advanceProbe(h);
        }
        // cellsBusy == 0 && cells == as(as在上面if语句里已经付过钱了).
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                          
                // 
                if (cells == as) {
                    // 默认初始化为2.
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // v = base.
        // (fn == null) ? v + x : fn.applyAsLong(v, x)).
        // casBase(v, fn);
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113

  doubleAccumulate和longAccumulate,差别只在于数据类型的不同,其他逻辑是没有差别的,大家可以自己看一下这部分的源码。

  Striped64是个抽象类,Striped64为它的继承者们LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator提供了实现基础,下一篇,我们讲述LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator是使用方法和源码分析。

  注:文中源码均来自于JDK1.8版本,不同版本间可能存在差异。

​  如果有哪里有不明白或不清楚的内容,欢迎留言哦!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/363190
推荐阅读
相关标签
  

闽ICP备14008679号