当前位置:   article > 正文

浅谈 CAS 及其缺点和解决方案_cas的问题

cas的问题

什么是 CAS 机制?

CAS 英文就是 compare and swap ,也就是比较并交换,首先它是一个原子操作,可以避免被其他线程打断。在Java并发中,我们最初接触的应该就是synchronized关键字了,但是synchronized属于重量级锁,很多时候会引起性能问题,虽然在新的JDK中对其已经进行了优化。volatile也是个不错的选择,但是volatile不能保证原子性,只能在某些场合下使用。那么问题来了,这个 CAS 机制是怎么在不加锁的情况下来保证共享资源的互斥呢?

synchronized补充:首先,synchronized会对第一个线程会有偏向,所以会给第一个线程添加偏向锁,如果偏向锁有其他线程来竞争时,这个锁会升级,变成轻量级锁(多数情况下是自旋锁),再如果这个锁在一定次内还是拿不到共享资源,这个轻量级的锁会进一步升级,成为重量级锁。

共享资源:通俗来讲就是指那些可以被多个不同线程一起使用的数据。

首先我们先来看一个例子

  • 当我们有很多个线程同时访问一个贡献资源时,我们无法保证我们的贡献资源只被一个线程使用,如果要保证只有一个线程使用时,我们最先想到的一般就是加锁。当然我们先看不加锁的情况:
  1. java复制代码 static volatile int NUMBER = 0; //volatile可以保证我们每次取值都是取内存的值(最新值),而不是取栈里面的缓存值
  2. private static void add() throws InterruptedException {
  3. Thread.sleep(5); //模拟线程处理数据
  4. NUMBER++;
  5. }
  6. public static void main(String[] args) throws InterruptedException {
  7. CountDownLatch countDownLatch = new CountDownLatch(100);//栅栏,底层为AQS实现,当里面值变为0时,栅栏才打开继续运行下面代码
  8. Long start = System.currentTimeMillis();
  9. for (int i = 0; i < 100; i++){
  10. new Thread(new Runnable() {
  11. @Override
  12. public void run() {
  13. for (int j = 0; j<100; j++){
  14. try {
  15. add();
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. countDownLatch.countDown();
  21. }
  22. }).start();
  23. }
  24. countDownLatch.await();//等待所有线程运行完
  25. System.out.println("最后执行结果:" + NUMBER + "。执行时间为:" + (System.currentTimeMillis() - start) + "ms");
  26. }
  27. //最后执行结果:9637。执行时间为:600ms

当我们在add方法上加上一个锁时,我们可以看到由于加完了锁我们的线程速度慢了很多,当然我们有模拟了线程运行数据的情况。那我们要怎么去优化我们的代码呢?

  • 首先我们要剖析add方法里面的代码,一方面我们在方法上加了锁,导致线程只能串行化,而且线程在方法中sleep了5毫秒,才进行NUMBER++,另一方面**我们在进行NUMBER++中,NUMBER++也并非原子操作。**我们看到代码的注释:
  1. java复制代码 private synchronized static void add() throws InterruptedException {
  2. Thread.sleep(5);
  3. /**
  4. * NUMBER++ 被拆分成三步
  5. * 1、从内存中获取 NUMBER 的值,赋给 A : A = NUMBER
  6. * 2、对 A 进行 +1 得到 B: B = A + 1
  7. * 3、把 B 的值写回 NUMBERNUMBER = B
  8. * 在不加锁的情况下,如果有A.B两个线程同时执行count++,他们通知执行到上面步骤的第一步,得到count是一样的,而当第 3 步操作结束后,count只加1,导致count结果不一致
  9. */
  10. NUMBER++;
  11. }
  12. //最后执行结果:10000。执行时间为:56534ms
  • 理清 JVM 的运行过程,我们可以发现其实把锁加在第三步就行,这样可以保证原子性够小,但是我们要怎么做呢?(重点)

CAS(compare and swap) 就是一种乐观锁,比较和交换,原理是: 它有3个操作数,内存值 V,旧的期望值 expectNumber,要修改的新值 newNumber。当且仅当预期值A和内存值V相同时(比较),它就认为这个期间没有人来访问过这个贡献资源。所以就把这个值改为新值(交换)。

  1. java复制代码 static volatile int NUMBER = 0;//共享资源
  2. private static void add() throws InterruptedException {
  3. Thread.sleep(5);
  4. /**
  5. * NUMBER++ 被拆分成三步
  6. * 1、从内存中获取 NUMBER 的值,赋给 A : A = NUMBER
  7. * 2、对 A 进行 +1 得到 B: B = A + 1
  8. * 3、把 B 的值写回 NUMBERNUMBER = B
  9. * 升级第三步:
  10. * 1、获取锁
  11. * 2、获取 NUMBER 的最新值,记作LV
  12. * 3、判断 LV 是否等于 A,如果相等,则将 B 的值赋给NUMBER,并且返回true,如果不相等返回false,线程再自旋请求修改值
  13. */
  14. int expectNumber;
  15. for (;;)//自旋修改,知道修改成功
  16. if (compareAndSwap((expectNumber=getNumber()),expectNumber+1))
  17. break;
  18. }
  19. /**
  20. * 模拟底层 sun.misc.Unsafe 的 compareAndSwap 方法,所以这里我加了锁,底层并不是加锁,它的底层是native修饰的,是方法区的方法,由C或者C++写的,它是对数据总线加锁,synchronized是对字节码加锁。
  21. * @param expectNumber 期望值
  22. * @param newNumber 要交换的值
  23. * @return 返回交换是否成功
  24. */
  25. public static synchronized boolean compareAndSwap(int expectNumber, int newNumber){
  26. if (expectNumber == getNumber()){
  27. NUMBER = newNumber;
  28. return true;
  29. }
  30. return false;
  31. }
  32. public static int getNumber(){return NUMBER;}
  33. //最后执行结果:10000。执行时间为:596ms

我们来看看 JDK 对 CAS 的支持

首先,我们上面模拟的 compareAndSwap 其实是再 sun.misc.Unsafe 包里面的,JDK 并不希望开发者去使用这个包,这个包里面有三个相关的方法

  1. java复制代码 public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
  2. public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
  3. public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
  4. //var1:表示要访问的对象,对象引用地址
  5. //var2:表示偏移量,即属性再要访问对象地址的偏移量
  6. //var4:表示要修改的数据的期望值
  7. //var5:表示要修改为的新值

CAS 的实现原理

CAS 通过 JNI(Java Native Interface)的代码来实现,允许java调用其他语言。而 compareAndSwap 的方法就是借用C语言来调用CPU底层的指令(cmpxchg)来实现的。cmpxchg是一个原子指令,这个指令是给数据总线进行加锁,所以是线程安全的。

CAS 其实也是有缺点的

1、首先就是经典的ABA问题

CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。这就是CAS的ABA问题。

2、就是循环时间长开销大

如果CAS不成功,则会原地自旋,如果长时间自旋会给CPU带来非常大且没必要的开销。

3、只能保证一个共享变量的原子操作

只能保证一个共享变量的原子操作。当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量 i=2,j=a,合并一下 ij=2a,然后用CAS来操作间。从Java1.5开始DK提供了AtomicRefrence类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。


如何解决 ABA 问题

常见的解决思路是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。 目前在JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

  1. java复制代码//没有版本号的情况
  2. static AtomicInteger a = new AtomicInteger(1);
  3. public static void main(String[] args) throws InterruptedException {
  4. System.out.println("初始值:"+a.get());
  5. new Thread(new Runnable() {
  6. @Override
  7. public void run() {
  8. int expectNum = a.get();//拿到最初的值
  9. int newNum = expectNum+1;
  10. try {
  11. Thread.sleep(1000); //先休眠一会,保证是最后执行的
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. boolean isSuccess = a.compareAndSet(expectNum, newNum);
  16. System.out.println("主线程执行结果:" + isSuccess + ",值为:" + a.get());
  17. }
  18. },"主线程").start();
  19. new Thread(new Runnable() {
  20. @Override
  21. public void run() {
  22. try {
  23. Thread.sleep(20);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. a.incrementAndGet();
  28. System.out.println("干扰线程执行结果:" + a.get());
  29. a.decrementAndGet();
  30. System.out.println("干扰线程执行结果:" + a.get());
  31. }
  32. },"干扰线程").start();
  33. }
  34. 初始值:1
  35. 干扰线程执行结果:2
  36. 干扰线程执行结果:1
  37. 主线程执行结果:ture,值为:2

上面就是我们模拟的ABA情况,那么使用AtomicStampedReference可以怎么解决呢?我们先看到AtomicStampedReference里面关于创建对象和修改版本号和值的 API 。

  1. java复制代码 //initialRef为初始值,initialStamp为初始版本号
  2. public AtomicStampedReference(V initialRef, int initialStamp) {
  3. pair = Pair.of(initialRef, initialStamp);
  4. }
  5. /**
  6. * Atomically sets the value of both the reference and stamp
  7. * to the given update values if the
  8. * current reference is {@code ==} to the expected reference
  9. * and the current stamp is equal to the expected stamp.
  10. *
  11. * @param expectedReference the expected value of the reference 期望的引用
  12. * @param newReference the new value for the reference 引用的新值
  13. * @param expectedStamp the expected value of the stamp 期望的版本号
  14. * @param newStamp the new value for the stamp 新值的版本号
  15. * @return {@code true} if successful
  16. */
  17. public boolean compareAndSet(V expectedReference,
  18. V newReference,
  19. int expectedStamp,
  20. int newStamp) {
  21. Pair<V> current = pair;
  22. return
  23. expectedReference == current.reference &&
  24. expectedStamp == current.stamp &&
  25. //前两个判断当前的版本或者当前的值是否被改过,改过就直接返回false
  26. ((newReference == current.reference &&
  27. newStamp == current.stamp)
  28. //上面两个判断当前的版本号和新值是否和要修改后的一致,一致就不进行修改
  29. //如果不一致,就修改当前的版本值和当前值
  30. ||
  31. casPair(current, Pair.of(newReference, newStamp)));
  32. }

由于每个过程值都会有对应的版本,所以我们在修改过程中需要传入期望版本和当前的值,数据库的多版本并发控制也类似,我们先来看一下修改后的代码:

  1. java复制代码static AtomicStampedReference<Integer> a = new AtomicStampedReference(new Integer(1),1);
  2. public static void main(String[] args) throws InterruptedException {
  3. System.out.println("初始值:"+a.getReference());
  4. new Thread(new Runnable() {
  5. @Override
  6. public void run() {
  7. //获取需要的参数
  8. int expectReference = a.getReference();
  9. int newReference = expectReference + 1;
  10. int expectStamp = a.getStamp();
  11. int newStamp = expectStamp + 1;
  12. try {
  13. Thread.sleep(1000);//睡眠保证最后执行
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. //传入我们之前拿到的期望版本号和期望的值
  18. boolean isSuccess = a.compareAndSet(expectReference, newReference, expectStamp, newStamp);
  19. System.out.println("主线程执行结果:" + isSuccess + ",值为:" + a.getReference());
  20. }
  21. },"主线程").start();
  22. new Thread(new Runnable() {
  23. @Override
  24. public void run() {
  25. try {
  26. Thread.sleep(20);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. //模拟修改ABA模式
  31. a.compareAndSet(a.getReference(),a.getReference()+1,a.getStamp(),a.getStamp()+1);
  32. System.out.println("干扰线程执行结果:" + a.getReference());
  33. a.compareAndSet(a.getReference(),a.getReference()-1,a.getStamp(),a.getStamp()+1);
  34. System.out.println("干扰线程执行结果:" + a.getReference());
  35. }
  36. },"干扰线程").start();
  37. }
  38. 初始值:1
  39. 干扰线程执行结果:2
  40. 干扰线程执行结果:1
  41. 主线程执行结果:false,值为:1

我们可以看到执行结果是false,也就是说只要值被动过,就会修改失败,但是版本号我们需要人工维护。当然,如果我们对于业务的过程不是很注重的话也不需要去关注ABA问题,也不需要去维护版本号,而如果涉及到重要业务(转账),则需要解决ABA问题。

如何解决循环时间长开销大问题

自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。

总结:

  • CAS 可以实现原子性的操作,说白了就是一个native的API,解决并发带来的问题,但是也存在一些自身的问题。
  • 如何解决CAS自身带来的问题。
  • 明白他的应用,我们在读Concurrent包下的类的源码时,发现无论是ReenterLock内部的AQS(后续会出博客讲到),还是各种Atomic开头的原子类,内部都应用到了CAS。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/303172
推荐阅读
  

闽ICP备14008679号