赞
踩
JUC在JDK1.8版本中引入了Striped64类及其四个实现类LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator,在原有的AtomicXXXXX等系列上,针对多线程并发情况进行了优化。
Striped64类被设计用来支持累加器的的并发组件,可以在多线程高并发环境下安全的累加计数。Striped64内部存储了base、cells数组和cellsBusy锁,计数线程首先会尝试使用CAS对base变量进行更改,若更新成功,则计数完成,此时累加器竞争并不激烈。若未更新成功,表示此时竞争较激烈,需要引入cells数组,将待加数值放入cells内。调用累加器的线程与cells数组中的cell一一对应,各线程只操作自身对应的cell,Striped64根据线程的threadLocalRandomProbe属性值计算哈希值,将线程定位到某个固定的cell上。最后,需要统计计数结果时,若nullcells,则直接返回base值,若nullcells时,则累加cells各索引位置的值后返回。
源码分析:
实现基础:
// cell数组,若不为null,则为2的n次方.
transient volatile Cell[] cells;
// 基础值,在更新操作时基于CAS无锁技术实现原子更新.
transient volatile long base;
// CAS锁,用于保护创建或者扩展Cell表.
transient volatile int cellsBusy;
基本方法:
// 依赖CAS,更新base的值.
final boolean casBase(long cmp, long val);
// 依赖CAS,更新cells的值.0表示未被占用,1表示已被占用.
final boolean casCellsBusy();
// 获取线程的threadLocalRandomProbe.
static final int getProbe();
// 被用来进行创建、初始化、扩展.针对long类型.
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended);
// 被用来进行创建、初始化、扩展.针对double类型.
final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended);
longAccumulate和doubleAccumulate实现思路很类似,我们主要针对longAccumulate来逐步解析一下:
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
// 获取线程的threadLocalRandomProbe.
// 若线程的threadLocalRandomProbe == 0,则重新初始化.
if ((h = getProbe()) == 0) {
// 强制初始化.
ThreadLocalRandom.current();
h = getProbe();
wasUncontended = true;
}
boolean collide = false;
// 无限循环.
for (;;) {
Cell[] as; Cell a; int n; long v;
// as = cells; as != null && n = as.length; n > 0
// cells 不为空时.
if ((as = cells) != null && (n = as.length) > 0) {
// (n - 1) & h 计算出当前线程在cells中的索引值.
// 当cells[(n - 1)&h]不为null时.
if ((a = as[(n - 1) & h]) == null) {
// cellsBusy未被占用时.
if (cellsBusy == 0) {
// 新建cell.
Cell r = new Cell(x);
// cellsBusy未被占用同时更新cellsBusy=1.
if (cellsBusy == 0 && casCellsBusy()) {
// 创建标识.
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// rs = cells; rs != null && m = rs.length; m > 0
// && j = (m - 1)&h; rs[j] == null
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 将新建的Cell放入单元格.
rs[j] = r;
// 设置创建标识.
created = true;
}
} finally {
// 释放cellsBusy.
cellsBusy = 0;
}
if (created)
break;
continue;
}
}
collide = false;
}
// CAS已经失败.需要重新计算索引位置.
else if (!wasUncontended)
wasUncontended = true;
// v = a.value
// fn == null) ? v + x : fn.applyAsLong(v, x))
// a.cas(v, fn)
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// cells最大长度为cpu的数量,cells != as表示cells已经被更新了.
else if (n >= NCPU || cells != as)
collide = false;
else if (!collide)
collide = true;
// cellsBusy未被占用同时更新cellsBusy=1.
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 扩展cells.
if (cells == as) {
// 新建原cells长度*2的新cells.
Cell[] rs = new Cell[n << 1];
// 原cells数据迁移至新cells.
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 整理后的新cells生效.
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
// hash过程可能存在冲突,使用此方法可解决问题.
h = advanceProbe(h);
}
// cellsBusy == 0 && cells == as(as在上面if语句里已经付过钱了).
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {
//
if (cells == as) {
// 默认初始化为2.
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// v = base.
// (fn == null) ? v + x : fn.applyAsLong(v, x)).
// casBase(v, fn);
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
}
}
doubleAccumulate和longAccumulate,差别只在于数据类型的不同,其他逻辑是没有差别的,大家可以自己看一下这部分的源码。
Striped64是个抽象类,Striped64为它的继承者们LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator提供了实现基础,下一篇,我们讲述LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator是使用方法和源码分析。
注:文中源码均来自于JDK1.8版本,不同版本间可能存在差异。
如果有哪里有不明白或不清楚的内容,欢迎留言哦!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。