赞
踩
有如下的需求,需要保证 account.withdraw() 取款方法的线程安全,代码如下:
- interface Account {
- // 获取余额
- Integer getBalance();
-
- // 取款
- void withdraw(Integer amount);
-
- /**
- * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
- * 如果初始余额为 10000 那么正确的结果应当是 0
- */
- static void demo(Account account) {
- List<Thread> ts = new ArrayList<>();
- long start = System.nanoTime();
- for (int i = 0; i < 1000; i++) {
- ts.add(new Thread(() -> {
- account.withdraw(10);
- }));
- }
- ts.forEach(Thread::start);
- ts.forEach(t -> {
- try {
- t.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- long end = System.nanoTime();
- System.out.println(account.getBalance()
- + " cost: " + (end - start) / 1000_000 + " ms");
- }
- }
- class AccountUnsafe implements Account {
- private Integer balance;
-
- public AccountUnsafe(Integer balance) {
- this.balance = balance;
- }
-
- @Override
- public Integer getBalance() {
- return balance;
- }
-
- @Override
- public void withdraw(Integer amount) {
- balance -= amount;
- }
-
- public static void main(String[] args) {
- Account.demo(new AccountUnsafe(10000));
- }
- }
原有的实现并不是线程安全的,执行结果如下所示:
为什么会出现线程安全问题?是因为在多线程的环境下取款的 withdraw() 方法里面是临界区,存在指令交错的行为。
首先想到的解决方式就是给 Account 对象加锁,如下代码:
- class AccountUnsafe implements Account {
- private Integer balance;
- public AccountUnsafe(Integer balance) {
-
- this.balance = balance;
- }
- @Override
- public Integer getBalance() {
- synchronized (this){
- return balance;
- }
- }
- @Override
- public void withdraw(Integer amount) {
- synchronized (this){
- balance -= amount;
- }
-
- }
- public static void main(String[] args) {
- Account.demo(new AccountUnsafe(10000));
- }
- }
运行结果如下,没有任何问题。
也可以通过无锁的方式解决上述的问题,如下代码:
- public class AccountSafe implements Account{
-
- private AtomicInteger balance;
- public AccountSafe(Integer balance) {
-
- this.balance = new AtomicInteger(balance);
- }
- @Override
- public Integer getBalance() {
- return balance.get();
- }
-
- @Override
- public void withdraw(Integer amount) {
- while(true){
- int prev = balance.get();
- int next = prev - amount;
- if(balance.compareAndSet(prev,next)){
- break;
- }
- }
- }
-
- public static void main(String[] args) {
- Account.demo(new AccountSafe(10000));
- }
- }
运行结果如下,没有任何问题。
在上一小结看到的使用 AtomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?
- @Override
- public void withdraw(Integer amount) {
- // 需要不断尝试,直到成功为止
- while(true){
- // 比如拿到了旧值 1000
- int prev = balance.get();
- // 在这个基础上 1000-10 = 990
- int next = prev - amount;
- /*
- * compareAndSet 会做一个检查,在 set 值之前先比较 prev 和当前值
- * 若 prev 值和当前值一致,则用 next 设置为新值,并返回 true 表示成功。
- * 若 prev 值和当前值不一致,则 next 作废,返回 false 表示失败,进入 while 下次循环重试
- * */
- if(balance.compareAndSet(prev,next)){
- break;
- }
- }
- }
这里面最关键的就是 compareAndSet() 方法,它的简称就是 CAS(也有 compare and swap 的说法),此方法是一个原子操作。
其实 CAS 的底层是 lock cmpxchg 指令,在单核 CPU 和多核 CPU 下都能够保证原子性。
获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。
它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。
volatile 仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原子性)
CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果。
无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。
但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。
将 CAS 和 volatile 结合使用可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试。
synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会
CAS 体现的是无锁并发、无阻塞并发。因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一;但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响 。
在 JUC 并发包下提供了一些原子整数的工具类,如:AtomicBoolean、AtomicInteger 和 AtomicLong,这几个类的用法类似,下面就以 AtomicInteger 为例,介绍下常用的方法。
- public class AtomicTest {
-
- public static void main(String[] args) {
- // 无参构造的初始值为 0,有参构造的初始值需要自己指定
- AtomicInteger i = new AtomicInteger(0);
-
- // 获取并自增,返回 0,类似于 i++
- System.out.println(i.getAndIncrement());
-
- // 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
- System.out.println(i.incrementAndGet());
-
- // 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
- System.out.println(i.decrementAndGet());
-
- // 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
- System.out.println(i.getAndDecrement());
-
- // 获取并加值(i = 0, 结果 i = 5, 返回 0)
- System.out.println(i.getAndAdd(5));
-
- // 加值并获取(i = 5, 结果 i = 0, 返回 0)
- System.out.println(i.addAndGet(-5));
-
- // 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
- // 其中函数中的操作能保证原子,但函数需要无副作用
- System.out.println(i.getAndUpdate(p -> p - 2));
-
- // 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
- // 其中函数中的操作能保证原子,但函数需要无副作用
- System.out.println(i.updateAndGet(p -> p + 2));
-
- // 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
- // 其中函数中的操作能保证原子,但函数需要无副作用
- // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
- // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
- System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
-
- // 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
- // 其中函数中的操作能保证原子,但函数需要无副作用
- System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
- }
- }
为什么需要原子引用,因为共享的数据并不一定都是基本数据类型的,还有可能是小数类型,那么我们就可以使用原子引用来保证其中的共享变量操作时的线程安全。
原子引用分为如下几种:AtomicReference、AtomicMarkableReference、AtomicStampedReference
以最开始的例子为例,假设此时的账户余额是 BigDecimal 类型,我们就需要使用 AtomicReferenceAtomicReference,如下代码:
- interface Account {
- // 获取余额
- BigDecimal getBalance();
-
- // 取款
- void withdraw(BigDecimal amount);
-
- /**
- * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
- * 如果初始余额为 10000 那么正确的结果应当是 0
- */
- static void demo(Account account) {
- List<Thread> ts = new ArrayList<>();
- long start = System.nanoTime();
- for (int i = 0; i < 1000; i++) {
- ts.add(new Thread(() -> {
- account.withdraw(BigDecimal.TEN);
- }));
- }
- ts.forEach(Thread::start);
- ts.forEach(t -> {
- try {
- t.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- long end = System.nanoTime();
- System.out.println(account.getBalance()
- + " cost: " + (end - start) / 1000_000 + " ms");
- }
- }
- public class AccountSafe implements Account{
-
- private AtomicReference<BigDecimal> balance;
- public AccountSafe(BigDecimal balance) {
-
- this.balance = new AtomicReference(balance);
- }
- @Override
- public BigDecimal getBalance() {
-
- return balance.get();
- }
- @Override
- public void withdraw(BigDecimal amount) {
- // 需要不断尝试,直到成功为止
- while(true){
- BigDecimal prev = balance.get();
- // 调用 subtract 相当于减的操作
- BigDecimal next = prev.subtract(amount);
- if(balance.compareAndSet(prev,next)){
- break;
- }
- }
- }
- public static void main(String[] args) {
- Account.demo(new AccountSafe(new BigDecimal("10000")));
- }
- }
测试结果如下,没有任何问题。
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,这就是我们所说的 ABA 问题,如下代码:
- @Slf4j(topic = "c.test")
- public class Main8 {
- static AtomicReference<String> ref = new AtomicReference<>("A");
-
- public static void main(String[] args) throws InterruptedException {
- log.debug("main start...");
- // 获取值 A
- // 这个共享变量被它线程修改过?
- String prev = ref.get();
- other();
- Thread.sleep(1000);
- // 尝试改为 C
- log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
- }
-
- private static void other() throws InterruptedException {
- new Thread(() -> {
- log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
- }, "t1").start();
- Thread.sleep(500);
- new Thread(() -> {
- log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
- }, "t2").start();
- }
- }
输出结果如下:
如果主线程希望只要有其它线程动过了共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号 AtomicStampedReference。
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A ->C ,通过 AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。如下代码:
- @Slf4j(topic = "c.test")
- public class Main9 {
- // 不光给变量一个初始值,还给一个初始的版本号
- static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
-
- public static void main(String[] args) throws InterruptedException {
- log.debug("main start...");
- // 获取值 A
- String prev = ref.getReference();
- // 获取版本号
- int stamp = ref.getStamp();
- log.debug("版本 {}", stamp);
- // 如果中间有其它线程干扰,发生了 ABA 现象
- other();
- Thread.sleep(1000);
- // 尝试改为 C,此时 compareAndSet 方法需要四个参数,当前值,期望值,当前版本号,期望版本号
- log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
- }
-
- private static void other() throws InterruptedException {
- new Thread(() -> {
- log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",
- ref.getStamp(), ref.getStamp() + 1));
- log.debug("更新版本为 {}", ref.getStamp());
- }, "t1").start();
- Thread.sleep(500);
- new Thread(() -> {
- log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",
- ref.getStamp(), ref.getStamp() + 1));
- log.debug("更新版本为 {}", ref.getStamp());
- }, "t2").start();
- }
- }
可以看到,解决了 ABA 问题,更新并没有成功。
但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,用一个布尔值就可以搞定,所以就有了 AtomicMarkableReference。
如下案例,主人要检查垃圾袋满没满,是否需要倒垃圾,如果满了则更换新的垃圾袋;如果还空着呢,就用原有的垃圾袋。此时还有另外一个线程保洁阿姨,她负责倒空垃圾袋里面的垃圾,但是她还是用原来的垃圾袋,如果此时主人检查垃圾袋是空的就不用再去更换垃圾袋了。
代码如下所示:
- @Slf4j(topic = "c.test")
- public class Main9 {
- public static void main(String[] args) throws InterruptedException {
- GarbageBag bag = new GarbageBag("装满了垃圾");
- // 参数2 mark 可以看作一个标记,表示垃圾袋满了
- AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
- log.debug("主线程 start...");
- GarbageBag prev = ref.getReference();
- log.debug(prev.toString());
- new Thread(() -> {
- log.debug("打扫卫生的线程 start...");
- bag.setDesc("空垃圾袋");
- while (!ref.compareAndSet(bag, bag, true, false)) {
- }
- log.debug(bag.toString());
- }).start();
- Thread.sleep(1000);
- log.debug("主线程想换一只新垃圾袋?");
- boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
- log.debug("换了么?" + success);
- log.debug(ref.getReference().toString());
- }
- }
-
- class GarbageBag {
- String desc;
-
- public GarbageBag(String desc) {
- this.desc = desc;
- }
-
- public void setDesc(String desc) {
- this.desc = desc;
- }
-
- @Override
- public String toString() {
- return super.toString() + " " + desc;
- }
- }
输出结果如下所示:
原子数据保护的是数组里面的元素,常用的原子数组类是 AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray,测试类的代码如下所示:
- @Slf4j(topic = "c.test")
- public class Main9 {
- public static void main(String[] args) throws InterruptedException {
- // 不安全的数组
- demo(
- () -> new int[10],
- (array) -> array.length,
- (array, index) -> array[index]++,
- array -> System.out.println(Arrays.toString(array))
- );
- // 安全的数组
- demo(
- () -> new AtomicIntegerArray(10),
- (array) -> array.length(),
- (array, index) -> array.getAndIncrement(index),
- array -> System.out.println(array)
- );
- }
-
- /**
- * 参数1,提供数组、可以是线程不安全数组或线程安全数组
- * 参数2,获取数组长度的方法
- * 参数3,自增方法,回传 array, index
- * 参数4,打印数组的方法
- */
- // supplier 提供者 无中生有 ()->结果
- // function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
- // consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->
- private static <T> void demo(
- Supplier<T> arraySupplier,
- Function<T, Integer> lengthFun,
- BiConsumer<T, Integer> putConsumer,
- Consumer<T> printConsumer) {
- List<Thread> ts = new ArrayList<>();
- T array = arraySupplier.get();
- int length = lengthFun.apply(array);
- for (int i = 0; i < length; i++) {
- // 每个线程对数组作 10000 次操作
- ts.add(new Thread(() -> {
- for (int j = 0; j < 10000; j++) {
- putConsumer.accept(array, j % length);
- }
- }));
- }
- ts.forEach(t -> t.start()); // 启动所有线程
- ts.forEach(t -> {
- try {
- t.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }); // 等所有线程结束
- printConsumer.accept(array);
- }
- }
输出结果如下所示:
字段更新器保护的是对象里面的某个属性,即对这个属性进行原子操作,但是需要配合 volatile 修饰的字段使用,否则会出现异常,如下代码:
- public class Test5 {
- private volatile int field;
-
- public static void main(String[] args) {
- AtomicIntegerFieldUpdater fieldUpdater =
- AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field");
- Test5 test5 = new Test5();
- fieldUpdater.compareAndSet(test5, 0, 10);
- // 修改成功 field = 10
- System.out.println(test5.field);
- // 修改成功 field = 20
- fieldUpdater.compareAndSet(test5, 10, 20);
- System.out.println(test5.field);
- // 修改失败 field = 20
- fieldUpdater.compareAndSet(test5, 10, 30);
- System.out.println(test5.field);
- }
- }
输出结果如下:
累加器顾名思义,就是对一个整数进行累加的操作,在 jdk8 以后,新增了几个专门用于累加操作的工具类,比如:LongAdder、LongAccumulator 等,他们的性能要比我们的 AtomicLong 性能要高很多。
性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]... 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。如下测试代码:
- public class Test5 {
-
- public static void main(String[] args) {
- for (int i = 0; i < 5; i++) {
- demo(() -> new LongAdder(), adder -> adder.increment());
- }
- for (int i = 0; i < 5; i++) {
- demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
- }
- }
-
- private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
- T adder = adderSupplier.get();
- long start = System.nanoTime();
- List<Thread> ts = new ArrayList<>();
- // 4 个线程,每人累加 50 万
- for (int i = 0; i < 40; i++) {
- ts.add(new Thread(() -> {
- for (int j = 0; j < 500000; j++) {
- action.accept(adder);
- }
- }));
- }
- ts.forEach(t -> t.start());
- ts.forEach(t -> {
- try {
- t.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- long end = System.nanoTime();
- System.out.println(adder + " cost:" + (end - start) / 1000_000);
- }
- }
输出结果如下所示:可以看到,相差的时间还是蛮大的:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。