赞
踩
我们在进行计数统计
的时,通常会使用AtomicLong
来实现,AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题。
说到线程安全的计数统计工具类,肯定少不了Atomic下的几个原子类。AtomicLong就是java.util.concurrent
包下重要的原子类
,在并发情况下可以对长整形类型数据进行原子操作,保证并发情况下数据的安全性
。
public class AtomicLong extends Number implements java.io.Serializable { /** * Atomically increments by one the current value. * 以原子方式将当前值递增 1 * @return the previous value * @return 先前值 */ public final long incrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L; } /** * Atomically decrements by one the current value. * 以原子方式将当前值递减 1 * @return the previous value * @return 先前值 */ public final long decrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L; } }
注意:AtomicLong类中的
getAndIncrement()
和getAndDecrement()
和上面两个方法的功能一样
我们在计数的过程中,一般使用incrementAndGet()
和decrementAndGet()
进行加一和减一操作,这里调用了Unsafe类中的getAndAddLong()
方法进行操作。
这里直接进行CAS+自旋操作更新AtomicLong中的value值,进而保证value值的原子性更新。
public final int getAndAddInt(Object var1, long var2, int var4) {
// 定义一个变量
int var5;
//循环
do {
//获取字段对应的值
//注意:如果失败,就会从新获取这个值,要不然就会一直自旋
//如果在获取这个值后,在进入CAS之前,其他线程修改了该值,才会进行CAS
var5 = this.getIntVolatile(var1, var2);
//CAS+自旋:替换成功就返回true,跳出循环,替换失败就返回false,继续自旋
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
//将替换之前的值返回
return var5;
}
/**
* 功能:获取Object对象中offset偏移地址对应的整型字段(field)的值,支持volatile load语义
* var1:包含要读取字段(Field)的对象
* var2:该字段的偏移地址
*/
public native int getIntVolatile(Object var1, long var2);
/**
* var1:包含要读取字段(Field)的对象
* var2:该字段的偏移地址
* var4:期望值
* var5:新值
* 更新成功返回true,失败返回false
*/
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
通过查看上面的源码,我们知道 AtomicLong类中进行加一和减一的操作中都涉及到了Unsafe.getAndAddInt()方法,该方法通过CAS + 自旋来实现并发修改安全,如果在高并发环境下有N个线程进行加一或者减一操作,极端情况下,假如有N-1个线程进入了自旋操作,会出现大量失败并不断自旋的情况,将会占用很多空间
,这就是AtomicLong缺陷。
有了AtomicLong为什么还要说LongAdder,就是因为在高并发环境下,LongAdder的效率要比AtomicLong的效率高
。
LongAdder实现图解:
既然说到LongAdder可以显著提升高并发环境下的性能,那么它是如何做到的?
1,设计思想上,LongAdder采用分段的方式降低CAS失败的频次
我们知道,AtomicLong中有个内部变量value
保存着实际的long值
,所有的操作都是针对该变量进行,也就是说,高并发环境下,value变量其实是一个热点数据
,也就是N个线程竞争一个热点。
LongAdder的基本思路就是分散热点
,将value值的新增操作分散到一个数组中,不同线程
会命中到数组的不同槽中
,各个线程只对自己槽中的那个value值
进行CAS操作
,这样热点就被分散了,冲突的概率就小很多。
LongAdder有一个全局变量volatile long base
值,当并发不高
的情况下都是通过CAS来直接操作base值,如果CAS失败
,则针对LongAdder中的Cell[]数组中的Cell进行CAS操作
,减少失败的概率。
举例
例如当前类中
base = 10
,有三个线程进行CAS原子性的+1操作
,线程1
执行成功,此时base=11
,线程2、线程3执行失败后开始针对于Cell[]数组中的Cell元素进行+1操作
,同样也是CAS操作,此时数组index=1和index=2中Cell的value都被设置为了1.
执行完成后,统计累加数据:sum = 11 + 1 + 1 = 13
,利用LongAdder进行累加的操作就执行完了
2、使用Contended注解来消除伪共享
Striped64
中存在一个 volatile Cell[] cells;
数组,其长度是2 的幂次方,每个Cell都使用 @Contended
注解进行修饰,而@Contended
注解可以进行缓存行填充,从而解决伪共享问题,伪共享会导致缓存行失效,缓存一致性开销变大。@sun.misc.Contended static final class Cell {
}
伪共享指的是多个线程同时读写同一个缓存行的不同变量时导致的 CPU缓存失效,尽管这些变量之间没有任何关系,但由于在主内存中邻近,存在于同一个缓存行之中,它们的相互覆盖会导致频繁的缓存未命中,引发性能下降。
解决伪共享的方法一般都是使用直接填充,我们只需要保证不同线程的变量存在于不同的 CacheLine
即可,使用多余的字节来填充可以做点这一点,这样就不会出现伪共享问题。
3、惰性求值
LongAdder只有在使用longValue()获取当前累加值时才会真正的去结算计数的数据,longValue()方法底层就是调用sum()方法,对base和Cell数组的数据累加然后返回,做到数据写入和读取分离。
而AtomicLong使用incrementAndGet()每次都会返回long类型的计数值,每次递增后还会伴随着数据返回,增加了额外的开销。
一般我们进行计数时都会使用increment()
方法进行+1操作,decrement()
方法进行-1操作,这两个方法的内部都是调用add()
方法,源码如下:
/** * as 表示cells引用 * b 表示获取的base值 * v 表示 期望值, * m 表示 cells 数组的长度 * a 表示当前线程命中的cell单元格 */ public void add(long x) { Cell[] as; long b, v; int m; Cell a; /** * 这里其实对应的就是单线程的情况: * 条件一: * true:表示cell数组已经初始化过了,当前线程应该经数据写入对应的cell中 * false:表示cell未被初始化,当前所有线程豆浆数据写入base中 * 条件二:casBase(b = base, b + x),注意:没取反 * true:表示当前线程cas替换数据成功 * false:表示发生竞争,可能需要重试或者扩容 * * 当数组已经初始化,当前线程需要将数据写入cell中 或者 发生竞争,可能需要重试或者扩容 */ if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; /** * 条件一:as == null || (m = as.length - 1) < 0 * true:说明cells数组未初始化,当前线程写base发生竞争 * false:已经初始化,当前线程需要向cell中写数据 * * 条件二:(a = as[getProbe() & m]) == null * getProbe()获取当前线程的hash值,m表示cells长度-1,cells长度是2的幂次方数, * true:说明当前线程通过hash计算出来数组位置处的cell为空,可以向里面写值 * false:说明当前线程对应的cell不为空 * * 条件三:!(uncontended = a.cas(v = a.value, v + x) * 主要看a.cas(v = a.value, v + x),接着条件二,说明当前线程hash与数组长度取模计算出 * 的位置的cell有值 * true:CAS失败,当前线程对应的cell有竞争 * false:表示替换成功 */ if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } } /** * 加1操作 */ public void increment() { add(1L); } /** * 减1操作 */ public void decrement() { add(-1L); }
通过前面的分析,我们知道了:
longAccumulate(x, null, uncontended);
方法longAccumulate(x, null, uncontended);
方法重试或者扩容
),也会执行longAccumulate(x, null, uncontended);
方法这个方法是 Striped64类中提供的方法,我们先看一下这个类的源码:
@SuppressWarnings("serial") abstract class Striped64 extends Number { // 静态内部类Cell static final class Cell { volatile long value; Cell(long x) { value = x; } //CAS操作 final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // 声明一个Unsafe类的变量 private static final sun.misc.Unsafe UNSAFE; //声明一个长整型的变量:value字段的内存便宜量 private static final long valueOffset; // 静态代码块 static { try { // 获取Unsafe类的实例 UNSAFE = sun.misc.Unsafe.getUnsafe(); // 获取Cell类的class对象 Class<?> ak = Cell.class; //给Cell类的value字段分配地址,返回value字段的内存便宜量 valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } } /** CPUS 数量,用于限制cells数组大小*/ static final int NCPU = Runtime.getRuntime().availableProcessors(); /**Cell数组,当非空时,大小是 2 的幂.*/ transient volatile Cell[] cells; /**基值,没有发生竞争的时候直接将值加到base上, 当扩容是需要将值写到base中,通过 CAS 更新 */ transient volatile long base; /** 调整/或创建Cell数组时使用的自旋锁(通过 CAS 锁定)*/ transient volatile int cellsBusy; /** 包私有默认构造函数 */ Striped64() {} /**CASes*/ final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); } /** 通过CAS方式获取锁,cellsBusy字段为0时才能获取锁,获取之后为1*/ final boolean casCellsBusy() { return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1); } /**获取当前线程的hash值*/ static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); } /** 重置当前线程的hash值,使之更加散列*/ static final int advanceProbe(int probe) { probe ^= probe << 13; // xorshift probe ^= probe >>> 17; probe ^= probe << 5; UNSAFE.putInt(Thread.currentThread(), PROBE, probe); return probe; } /** 这个方法会详细介绍*/ final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { //这个方法会详细介绍 } /** 功能与longAccumulate 相似*/ final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended) { //.......... } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long BASE; private static final long CELLSBUSY; private static final long PROBE; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> sk = Striped64.class; BASE = UNSAFE.objectFieldOffset (sk.getDeclaredField("base")); CELLSBUSY = UNSAFE.objectFieldOffset (sk.getDeclaredField("cellsBusy")); Class<?> tk = Thread.class; PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe")); } catch (Exception e) { throw new Error(e); } } }
下面就来看看这个longAccumulate方法
的源码:
/** * @param x 需要增加的值,一般默认都是1 * @param fn 默认传递的是null * @param wasUncontended 竞争标识,如果是false则代表有竞争,只有cells初始化之后,并且当前线程 * CAS竞争修改失败(在同一个cell位置竞争失败),才会是false */ final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 表示hash值 int h; // getProbe():获取当前线程的hash值,条件成立说明还没有给当前线程分配hash值 if ((h = getProbe()) == 0) { // 分配hash值,分配完之后会将这个值存到: private static final long PROBE; ThreadLocalRandom.current(); // 取出当前线程的hash值 h = getProbe(); // 为什么? 默认情况下,分配完之后现将它放入cells[0]这个位置,如果发生冲突,在通过hash值存放 //这里含义是重新计算了当前线程的hash后认为此次不算是一次竞争。 // hash值被重置就好比一个全新的线程一样,所以设置了竞争状态为true。 wasUncontended = true; } // 扩容意向:false一定不会扩容,true可能会扩容 boolean collide = false; // 自旋 for (;;) { /** * as:cells数组变量 * a:cell实例变量 * n:用来接收cells数组的长度 * v:用来接收base值 * 里面的代码有很多if---if else--else语句,我们可以分批来分析 * 在分析里面的代码之前,我们要记着前面进来的条件 */ Cell[] as; Cell a; int n; long v; // CASE1:此判断表示cells数组已经初始化了,线程在向自己对应的cell存值,且第一次cas失败,才会进入 if ((as = cells) != null && (n = as.length) > 0) { //CASE1.1:当前位置不存在冲突 if ((a = as[(n - 1) & h]) == null) { // 判断锁的状态 if (cellsBusy == 0) { // 新建一个cell实例 Cell r = new Cell(x); // 判断能否获取锁 if (cellsBusy == 0 && casCellsBusy()) { // 状态标志 boolean created = false; try { Cell[] rs; int m, j; //添加操作 if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } //CASE1.2:当前位置存在冲突,当前线程竞争修改失败,wasUncontended=false表示存在冲突 else if (!wasUncontended) wasUncontended = true; //CASE1.3:通过CAS操作尝试对当前数中的value值进行累加x操作,如果CAS成功则直接跳出循环 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; //CASE1.4:如果cells数组的长度达到了CPU核心数,或者cells扩容了,设置扩容意向collide为false else if (n >= NCPU || cells != as) collide = false; //CASE1.5:不满足上面的判断,设置扩容意向为true else if (!collide) collide = true; //CASE1.6:获得锁,进行扩容 /** * 什么时候需要扩容: * T1在添加是时候线程被挂起,T1回来添加的时候发现存在冲突,设置竞争失败标志, * 进入下一次循环, * T1这次循环因为上次存在冲突且竞争失败,对竞争失败的位置进行加X操作,发现这个位置又 * 被别人提前修改了,这时候才会触发扩容操作。 */ else if (cellsBusy == 0 && casCellsBusy()) { try { // 获取旧的cells数组 if (cells == as) { // 创建新的cells数组 Cell[] rs = new Cell[n << 1]; // 旧数组向新数组中赋值 for (int i = 0; i < n; ++i) rs[i] = as[i]; // 重新把新数组赋值给cells变量 cells = rs; } } finally { // 释放锁 cellsBusy = 0; } // 扩容意向设置为false collide = false; // 进入下一次循环 continue; // Retry with expanded table } //重置当前线程hash值 h = advanceProbe(h); } // CASE2:前面判断失败说明:cell还没初始化,线程进来准备获取锁,获取成功才能进入, //对cells数组初始化 // 如果有一个线程获取锁,则cellsBusy==1,其它线程就无法获取锁,就不能对cells数组初始化 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 判断标志 boolean init = false; try { //第二次判断cells==as,是防止cells数组被二次初始化 ,如果有两个线程都执行到了 // 这个判断语句,第一个线程初始化完之后,会释放锁,第二个线程有机会获得锁, // 也会因为cells!=as,防止二次初始化 if (cells == as) { //创建一个长度为2的cells数组 Cell[] rs = new Cell[2]; //创建一个Cell元素,value的值为x,一般默认为1。 rs[h & 1] = new Cell(x); // 将这个数组赋值给cells变量 cells = rs; init = true; } } finally { // 释放锁 cellsBusy = 0; } if (init) break; } //CASE3:只有在初始化Cell数组,多个线程尝试CAS修改cellsBusy加锁的时候,失败 //的线程会走到这个分支,然后直接CAS修改base数据。 //进入到这里说明cells正在或者已经初始化过了 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
当我们使用多线程进行加操作完之后
,最后会计算出这个值,使用的是LongAdder的longValue
方法
public long longValue() {
return sum();
}
public long sum() { /** * 获取cells数组 * */ Cell[] as = cells; Cell a; // 拿到base值 long sum = base; // 进入判断 if (as != null) { //开始循环 for (int i = 0; i < as.length; ++i) { //判断,值不为null,就把这个值加到base上 if ((a = as[i]) != null) sum += a.value; } } // 返回这个sum值 return sum; }
我们用代码来测试一下,直观感受一下,这俩在高并发下面的区别。
public class testAtomicLongAndLongAdder { public static void main(String[] args) throws Exception { testAtomicLongAdder(1, 10000000); testAtomicLongAdder(10, 10000000); testAtomicLongAdder(100, 10000000); } static void testAtomicLongAdder(int threadCount, int times) throws Exception { //打印创建的线程数和加一的次数 System.out.println("threadCount: " + threadCount + ", times: " + times); //获取当前系统时间 long start = System.currentTimeMillis(); //调用:测试LongAdder类 testLongAdder(threadCount, times); //获取执行代码的总时间:差值 System.out.println("LongAdder 耗时:" + (System.currentTimeMillis() - start) + "ms"); //打印创建的线程数和加一的次数 System.out.println("threadCount: " + threadCount + ", times: " + times); //获取当前系统时间 long atomicStart = System.currentTimeMillis(); //测试AtomicLong类 testAtomicLong(threadCount, times); //获取执行代码的总时间:差值 System.out.println("AtomicLong 耗时:" + (System.currentTimeMillis() - atomicStart) + "ms"); System.out.println("----------------------------------------"); } //测试AtomicLong类 static void testAtomicLong(int threadCount, int times) throws Exception { //创建一个AtomicLong的实例 AtomicLong atomicLong = new AtomicLong(); //创建一个集合用来存储线程 List<Thread> list = new ArrayList(); //创建threadCount个数的线程 for (int i = 0; i < threadCount; i++) { //将创建的线程存入集合 list.add(new Thread(()->{ //在线程中调用加一操作 for(int j = 0; j < times; j++) { atomicLong.incrementAndGet(); } })); } //使用增强for循环遍历集合中的线程实例,启动线程 for (Thread thread : list) { thread.start(); } //等待这些线程死亡 for (Thread thread : list) { thread.join(); } //打印这个值 System.out.println("AtomicLong value is : " + atomicLong.get()); } //测试LongAdder类 static void testLongAdder(int threadCount, int times) throws Exception { LongAdder longAdder = new LongAdder(); List<Thread> list = new ArrayList(); for (int i = 0; i < threadCount; i++) { list.add(new Thread(() -> { for (int j = 0; j < times; j++) { longAdder.increment(); } })); } for (Thread thread : list) { thread.start(); } for (Thread thread : list) { thread.join(); } System.out.println("LongAdder value is : " + longAdder.longValue()); } }
threadCount: 1, times: 10000000 LongAdder value is : 10000000 LongAdder 耗时:109ms threadCount: 1, times: 10000000 AtomicLong value is : 10000000 AtomicLong 耗时:46ms ---------------------------------------- threadCount: 10, times: 10000000 LongAdder value is : 100000000 LongAdder 耗时:113ms threadCount: 10, times: 10000000 AtomicLong value is : 100000000 AtomicLong 耗时:1603ms ---------------------------------------- threadCount: 100, times: 10000000 LongAdder value is : 1000000000 LongAdder 耗时:948ms threadCount: 100, times: 10000000 AtomicLong value is : 1000000000 AtomicLong 耗时:17836ms ----------------------------------------
在并发较少的情况下,AtomicLong还能表现出不错的性能,但是在高并发下就不行了,
并发量越大,性能越差。
看上去LongAdder的性能全面超越了AtomicLong,(减少乐观锁的重试次数),但是我们真的就可以舍弃掉LongAdder了吗?
当然不是,我们需要看场景来使用,如果是并发不太高的系统,使用AtomicLong可能会更好一些,而且内存需求也会小一些。
LongAdder中最核心的思想就是利用空间来换时间
,将热点value分散成一个Cell列表来承接并发的CAS,以此来提升性能。
我们看过sum()方法后可以知道LongAdder在统计的时候如果有并发更新,可能导致统计的数据有误差,如果要求是高精度的统计,还需要使用AtmicLong类才可以
。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。