赞
踩
CAS的全称是Compare and swap, 它是一条CPU并发原语。
CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。整个过程是原子性的。
CAS并发原语体现在JAVA语言就是sun.misc.Unsafe类中的各个方法。调用Unsafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。
在JDK 5之前Java语言是靠synchronized关键字保证同步的,这会导致有锁,出现性能问题。(关于synchronized参考文章解析synchronized实现原理_我不是欧拉_的博客-CSDN博客)。
锁机制存在以下问题:
volatile是不错的机制,但是volatile不能保证原子性(关于volatile参考文章解析volatile实现原理_我不是欧拉_的博客-CSDN博客)。因此对于同步最终还是要回到锁机制上来。
独占锁是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。而另一个更加有效的锁就是乐观锁。
CAS可以看做是乐观锁的一种实现方式,所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。
sun.misc.Unsafe中的递增操作就通过CAS自旋实现的。 CAS是一种无锁算法,在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步。即用户模式下的线程同步,非内核模式
CAS流程图:
通过AtomicInteger类来研究在没有锁的情况下CAS是如何做到数据正确性的。
- public class AtomicInteger extends Number implements java.io.Serializable {
- private static final long serialVersionUID = 6214790243416807050L;
-
- // setup to use Unsafe.compareAndSwapInt for updates
- // 获取unsafe对象
- private static final Unsafe unsafe = Unsafe.getUnsafe();
- private static final long valueOffset;
-
- // objectFieldOffset()方法用于获取某个字段相对Java对象的“起始地址”的偏移量
- // 获取value字段相对于AtomicInteger对象起始位置的偏移量
- static {
- try {
- valueOffset = unsafe.objectFieldOffset
- (AtomicInteger.class.getDeclaredField("value"));
- } catch (Exception ex) { throw new Error(ex); }
- }
-
- // 借助volatile,保证线程间的数据的可见性
- private volatile int value;
-
- /**
- * Creates a new AtomicInteger with the given initial value.
- *
- * @param initialValue the initial value
- */
- // 构造函数初始化值
- public AtomicInteger(int initialValue) {
- value = initialValue;
- }
-
- public final int getAndIncrement() {
- return unsafe.getAndAddInt(this, valueOffset, 1);
- }
- }
-
getAndIncrement 调用的是 unsafe.getAndAddInt 方法。
- public final int getAndAddInt(Object var1, long var2, int var4) {
- int var5;
- do {
- // 获取value值
- var5 = this.getIntVolatile(var1, var2);
- // CAS自旋操作
- // 通过比较var1的值是否和期望值var5相同,如果相同就把var1的值换成var5 + var4(此时var4为1,即做了 +1操作)
- // 如果不相同就不断的尝试,直到相同为止
- } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
-
- return var5;
- }
Unsafe类存在于sun.misc包中是CAS的核心类,该类中的所有方法都是native修饰的。
由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务即Unsafe类可以直接操作特定内存的数据。
getAndAddInt方法中的valueOffset,表示该变量值在内存中的偏移地址,而Unsafe类就是根据内存偏移地址获取数据的。
unsafe.compareAndSwapInt(this, valueOffset, expect, update);
compareAndSwapInt方法是判断内存某个位置的值是否是预期值,如果是就更改为最新的值,这个过程是原子的。这是因为CAS是一种系统原语,原语属于操作系统用于范畴,是由若干条指令组成的,用户完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断。因此CAS是一条CPU原子指令,不会造成所谓的数据不一致问题。
伪代码如下:
- if (this == expect) {
-
- this = update;
-
- return true;
-
- } else {
-
- return false;
- }
- public final native boolean compareAndSwapInt(Object o, long offset,
- int expected,
- int x);
可以看到这是个本地方法调用。这个本地方法在 openjdk 中依次调用的 c++ 代码为:unsafe.cpp,atomic.cpp 和 atomic_windows_x86.inline.hpp。
1. unsafe.cpp
- UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSetInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
- UnsafeWrapper("Unsafe_CompareAndSwapInt");
- oop p = JNIHandles::resolve(obj);
- // 根据偏移量获取对象的变量的地址
- // 这里的offset就是AtomicInteger的valueOffset
- jint* addr = (jint *)index_oop_from_field_offset_long(p, offset);
- // 调用Atomic中的的函数cmpxchg,该函数声明在atomic.cpp中
- // Atomic::cmpxchg(x, addr, e) cas逻辑 x:要交换的值 e:要比较的值
- // cas成功,返回期望值e,等于e,此方法返回true
- // cas失败,返回内存中的value值,不等于e,此方法返回false
- return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
- UNSAFE_END
2. atomic_windows_x86.inline.hpp
cmpxchg函数根据不同操作系统和不同CPU会有不同的实现。这里以 windows 操作系统,X86 处理器为例,最终实现在 openjdk 的如下位置:openjdk-7-fcs-src-b147-27_jun_2011\openjdk\hotspot\src\os_cpu\windows_x86\vm\ atomic_windows_x86.inline.hpp(对应于 windows 操作系统,X86 处理器)。下面是对应于 intel x86 处理器的源代码的片段:
-
- // Adding a lock prefix to an instruction on MP machine
- // VC++ doesn't like the lock prefix to be on a single line
- // so we can't insert a label after the lock prefix.
- // By emitting a lock prefix, we can define a label after it.
- #define LOCK_IF_MP(mp) __asm cmp mp, 0 \
- __asm je L0 \
- __asm _emit 0xF0 \
- __asm L0:
-
- inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
- // alternative for InterlockedCompareExchange
- int mp = os::is_MP();
- __asm {
- mov edx, dest
- mov ecx, exchange_value
- mov eax, compare_value
- LOCK_IF_MP(mp)
- cmpxchg dword ptr [edx], ecx
- }
- }
方法解析:
1. mp是“os::is_MP()”的返回结果,“os::is_MP()”是一个内联函数,用来判断当前系统是否为多处理器(Multi Processor )。如果当前系统是多处理器,该函数返回1。否则,返回0。
2. LOCK_IF_MP(mp)会根据mp的值来决定是否为cmpxchg指令添加lock前缀。
3. cmpxchg是汇编指令,作用是比较并交换操作数。
总结:通过以上代码得到了一条带 lock 的 cmpxchg 指令,即 lock cmpxchg dword ptr [edx], ecx ,这是一个轻量的lock指令,可以让CPU保证原子性的操作。
现代处理器指令集架构基本上都会提供 CAS 指令,例如 x86 和 IA-64 架构中的 cmpxchgl 指令和 comxchgq 指令,sparc 架构中的 cas 指令和 casx 指令。 不管是 Hotspot 中的 Atomic::cmpxchg 方法,还是 Java 中的 compareAndSwapInt 方法,它们本质上都是对相应平台的 CAS 指令的一层简单封装。CAS 指令作为一种硬件原语,有着天然的原子性,这也正是 CAS 的价值所在。
intel手册对lock前缀的说明如下:
- 确保对内存的读-改-写操作原子执行。在Pentium及Pentium之前的处理器中,带有lock前缀的指令在执行期间会锁住总线,使得其他处理器暂时无法通过总线访问内存。很显然,这会带来昂贵的开销。从Pentium 4,Intel Xeon及P6处理器开始,intel在原有总线锁的基础上做了一个很有意义的优化:如果要访问的内存区域(area of memory)在lock前缀指令执行期间已经在处理器内部的缓存中被锁定(即包含该内存区域的缓存行当前处于独占或以修改状态),并且该内存区域被完全包含在单个缓存行(cache line)中,那么处理器将直接执行该指令。由于在指令执行期间该缓存行会一直被锁定,其它处理器无法读/写该指令要访问的内存区域,因此能保证指令执行的原子性。这个操作过程叫做缓存锁定(cache locking),缓存锁定将大大降低lock前缀指令的执行开销,但是当多处理器之间的竞争程度很高或者指令访问的内存地址未对齐时,仍然会锁住总线。
- 禁止该指令与之前和之后的读和写指令重排序。
- 把写缓冲区中的所有数据刷新到内存中。
上面的第1点保证了CAS操作是一个原子操作,第2点和第3点所具有的内存屏障效果,保证了CAS同时具有volatile读和volatile写的内存语义。
当有多个线程对一个原子类进行操作的时候,某个线程在短时间内将原子类的值 A 修改为 B,又马 上将其修改为 A,此时其他线程无感知,还是会修改成功。
ABA问题发生的概率很小,有时候发生了也不会产生什么问题,比如:对某个做加减法,不关心数字过程,那么ABA问题发生了也没啥。但是如果和生活中实际业务有关,ABA就会产生很大的问题,比如:仓库有一批货物,中途拿走一批去使用产生收益,然后还回这批货物,货物总数没变却产生了不合法的收益。
ABA问题解决方案:使用版本号。例如:数据库有个锁称为乐观锁,是一种基于数据版本实现数据同步的机制,每次修改一次数据,版本就会进行累加。 同样,Java也提供了相应的原子引用类 AtomicStampedReference<V>,代码如下:
- public class AtomicStampedReference<V> {
-
- // 私有内部类,用于存储实际对象和版本
- private static class Pair<T> {
-
- // 实际存储的对象
- final T reference;
-
- // 版本号 int类型
- final int stamp;
-
- // 内部类构造函数
- private Pair(T reference, int stamp) {
- this.reference = reference;
- this.stamp = stamp;
- }
-
- // 初始化pair对象
- static <T> Pair<T> of(T reference, int stamp) {
- return new Pair<T>(reference, stamp);
- }
- }
-
- // volatile修饰保证可见性
- private volatile Pair<V> pair;
-
- /**
- * Creates a new {@code AtomicStampedReference} with the given
- * initial values.
- *
- * @param initialRef the initial reference
- * @param initialStamp the initial stamp
- */
- // AtomicStampedReference构造函数,实例并初始化pair对象
- public AtomicStampedReference(V initialRef, int initialStamp) {
- pair = Pair.of(initialRef, initialStamp);
- }
-
- // expectedReference 期望的对象
- // newReference 新的对象
- // expectedStamp 期望的版本
- // newStamp 新的版本
- public boolean compareAndSet(V expectedReference,
- V newReference,
- int expectedStamp,
- int newStamp) {
- Pair<V> current = pair;
- return
- // 1. 比较期望对象和期望版本是否和当前的对象和版本相同
- // 2. 如果相同,则说明没有线程修改,则进行casPair方法
- // 3. casPair调用compareAndSwapObject,将当前对象和版本改为新的对象和版本
- // 4. 返回true。
- // 5. 如果不相同,则说明有其他线程进行了修改,返回false,不执行casPair方法进行交换
- expectedReference == current.reference &&
- expectedStamp == current.stamp &&
- ((newReference == current.reference &&
- newStamp == current.stamp) ||
- casPair(current, Pair.of(newReference, newStamp)));
- }
-
- private static final long pairOffset =
- objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);
-
- private boolean casPair(Pair<V> cmp, Pair<V> val) {
- return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
- }
-
-
- }
在高并发场景下,CAS多线程更新的时候,会造成大量线程进行自旋,消耗CPU资源。这样会出现性能问题(当然比synchronized还是要好很多)。
解决方案:为了解决这一个问题,Java 8提供的一个对AtomicLong改进后的一个类,LongAdder。它具备一个分段CAS的原子操作类。让我们来看一下它的分段CAS优化的思想。
当某一个线程如果对一个值更新是,可以看对这个值进行分段更新,每一段叫做一个Cell,在更新每一个Cell的时候,如果很难更新它的值,出现了多次 CAS失败了,无限循环自旋的时候,进行自动迁移段,它会去尝试更新别的分段Cell的值,这样的话就可以让一个线程不会盲目的CAS自旋等待一个更新分段cell的值。这有点类似于分段锁的意思。
当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。
Java 的 CAS 会使用现代处理器上提供的高效机器级别原子指令,这些原子指令以原子方式对内存执行读 - 改 - 写操作,这是在多处理器中实现同步的关键(从本质上来说,能够支持原子性读 - 改 - 写指令的计算机器,是顺序计算图灵机的异步等价机器,因此任何现代的多处理器都会去支持某种能对内存执行原子性读 - 改 - 写操作的原子指令)。同时,volatile 变量的读 / 写和 CAS 可以实现线程之间的通信。把这些特性整合在一起,就形成了整个 concurrent 包得以实现的基石。如果我们仔细分析 concurrent 包的源代码实现,会发现一个通用化的实现模式:
AQS(参考解析AQS实现原理_我不是欧拉_的博客-CSDN博客),非阻塞数据结构,原子变量类(java.util.concurrent.atomic 包中的类),这些 concurrent 包中的基础类都是使用这种模式来实现的,而 concurrent 包中的高层类又是依赖于这些基础类来实现的。从整体来看,concurrent 包的实现示意图如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。