赞
踩
本文想通过简单转账过程,来谈谈如何活用java中的锁和相关同步工具,这也常在java面试的思维扩展题中被问到。
上一篇文章主要介绍了java的内存模型,jmm通过定义一套规范,使jvm能按需禁用cpu缓存导致的可见性问题和编译优化导致的有序性问题。这套规范包括对volatile,synchronized和final三个关键字的解析,和7个Happens-Before规则。
但是由线程上下文切换导致的原子性问题又该如何解决呢?
要实现多个操作的原子性执行,最简单的思路就是加锁,在java中我们可以使用synchronized锁或者ReentrantLock。
对于synchronized锁而言,通过管程中锁的规则: 对一个锁的解锁Happens-Before于后续对这个锁的加锁。可知,一个线程的解锁操作对后一个线程的加锁操作可见,综合Happens-Before的传递性规则,我们就能得出前一个线程在临界区修改的共享变量,对后续进入临界区的线程是可见的。
对于锁的使用而言,我们需要以下几点:
我们还需要注意一点: 受保护资源和锁之间的合理关系应该是N:1的关系,也就是可以用一把锁保护多个资源,但是不能用多把锁保护一个资源。
还有一点需要注意,当我们要保护多个资源时,首先需求区分这些资源是否存在关联关系。
账户类 Account 有两个成员变量,分别是账户余额 balance 和账户密码 password :
class Account { private Integer balance; private String password; void withdraw(Integer amt) { if (this.balance > amt) { this.balance -= amt; } } Integer getBalance() { return balance; } void updatePassword(String pw) { password = pw; } String getPassword() { return password; } }
如何确保取款,查询余额和更新密码,查看密码这些操作的并发安全性?
class Account { private final Object balLock = new Object(); private final Object pwdLock = new Object(); private Integer balance; private String password; void withdraw(Integer amt) { synchronized (balLock) { if (this.balance > amt) { this.balance -= amt; } } } Integer getBalance() { synchronized (balLock) { return balance; } } void updatePassword(String pw) { synchronized (pwdLock) { password = pw; } } String getPassword() { synchronized (pwdLock) { return password; } } }
如果多个资源是有关联关系的,那这个问题就有点复杂了。例如银行业务里面的转账操作,账户 A 减少 100 元,账户 B 增加 100 元。这两个账户就是有关联关系的。那对于像转账这种有关联关系的操作,我们应该怎么去解决呢?
class Account {
private int balance;
// 转账
void transfer(Account target, int amt) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
class Account {
private int balance;
// 转账
public synchronized void transfer(
Account target, int amt) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
上面这种做法是错误的,因为this保护的是当前Account自己的余额,他保护不了target的余额,就像你不能用自家的锁来保护别人家的资产一样,you konw ?
上面这种写法何时会出现问题呢?
我们假设线程 1 执行账户 A 转账户 B 的操作,线程 2 执行账户 B 转账户 C 的操作。这两个线程分别在两颗 CPU 上同时执行,那它们是互斥的吗?
我们期望是,但实际上并不是。因为线程 1 锁定的是账户 A 的实例(A.this),而线程 2 锁定的是账户 B 的实例(B.this),所以这两个线程可以同时进入临界区 transfer()。同时进入临界区的结果是什么呢?
线程 1 和线程 2 都会读到账户 B 的余额为 200,导致最终账户 B 的余额可能是 300(线程 1 后于线程 2 写 B.balance,线程 2 写的 B.balance 值被线程 1 覆盖),可能是 100(线程 1 先于线程 2 写 B.balance,线程 1 写的 B.balance 值被线程 2 覆盖),就是不可能是 200。
上面之所以会出问题,是因为this对象只能保护一个资源,而不能保护临界区的所有资源,所以我们需要一把大锁能覆盖所有受保护的资源,最简单的做法就是使用类级别锁:
方案一: 让所有对象都持有一个唯一性的对象,这个对象在创建 Account 时传入
class Account { private int balance; private final Object globalLock; public Account(Object globalLock) { this.globalLock = globalLock; } public void transfer(Account target, int amt) { synchronized (globalLock) { if (this.balance > amt) { this.balance -= amt; target.balance += amt; } } } }
评价: 这个办法确实能解决问题,但是有点小瑕疵,它要求在创建 Account 对象的时候必须传入同一个对象,如果创建 Account 对象时,传入的 lock 不是同一个对象,那可就惨了,会出现锁自家门来保护他家资产的荒唐事。在真实的项目场景中,创建 Account 对象的代码很可能分散在多个工程中,传入共享的 lock 真的很难。
方案二: 用 Account.class 作为共享的锁。Account.class 是所有 Account 对象共享的,而且这个对象是 Java 虚拟机在加载 Account 类的时候创建的,所以我们不用担心它的唯一性。使用 Account.class 作为共享的锁,我们就无需在创建 Account 对象时传入了,代码更简单。
class Account {
private int balance;
// 转账
public void transfer(Account target, int amt) {
synchronized (Account.class) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}
关联关系本质是一种"原子性"特征,原子性的本质是多个资源间有一致性的要求,操作的中间状态对外不可见。
解决原子性问题,关键在于保证中间状态对外不可见。
注意事项:
原因是 balance 字段和 password 字段是可变的。但说它是可变的并不是指字段指向 Integer 对象和 String 对象是可变的(事实上在 openjdk 11 里实测 int 基本类型做引导锁并不会自动装箱,需要显式转换为包装类才能通过编译),而是指引用变量本身由于没有被 final 修饰所以是可变的,所以如果某处修改了引用指向的对象,就会出现 “多个锁管理同一个共享资源” 的问题。在这一阶段,问题的核心是 “引用可变”。
但在我们的例子中,哪怕字段被 final 修饰了,依然是不妥当的。这主要是因为 IntCache 和字符串常量池的存在,因此会出现不必要的锁竞争,从而降低系统性能。在这一阶段,问题的核心不是“锁对象本身可变”,而是可能存在 “锁复用问题”。
但本身用不会产生复用的 Integer 和 String 对象作为锁理论上应该是没有问题,比如用 new Integer() 或者 new String() 在堆上创建新的对象,和创建一个 Object 实例是一样的。
原因:举个例子,假如this.balance = 10 ,多个线程同时竞争同一把锁this.balance,此时只有一个线程拿到了锁,其他线程等待,拿到锁的线程进行this.balance -= 1操作,this.balance = 9。 该线程释放锁, 之前等待锁的线程继续竞争this.balance=10的锁,新加入的线程竞争this.balance=9的锁,导致多个锁对应一个资源
LCK01-J. Do not synchronize on objects that may be reused
现实世界中的转账过程都是可以并行的,而我们上面的解决方案会使得所有转账过程都变为串行化执行,要解决这个问题,我们需要从现实中寻找灵感。
我们试想在古代,没有信息化,账户的存在形式真的就是一个账本,而且每个账户都有一个账本,这些账本都统一存放在文件架上。
银行柜员在给我们做转账时,要去文件架上把转出账本和转入账本都拿到手,然后做转账。这个柜员在拿账本的时候可能遇到以下三种情况:
上面这个过程在编程的世界里怎么实现呢?其实用两把锁就实现了,转出账本一把,转入账本另一把。
在 transfer() 方法内部,我们首先尝试锁定转出账户 this(先把转出账本拿到手),然后尝试锁定转入账户 target(再把转入账本拿到手),只有当两者都成功时,才执行转账操作。
class Account { private int balance; // 转账 void transfer(Account target, int amt) { // 锁定转出账户 synchronized (this) { // 锁定转入账户 synchronized (target) { if (this.balance > amt) { this.balance -= amt; target.balance += amt; } } } } }
相对于用 Account.class 作为互斥锁,锁定的范围太大,而我们锁定两个账户范围就小多了,这样的锁,上一节我们介绍过,叫细粒度锁。使用细粒度锁可以提高并行度,是性能优化的一个重要手段。
但是,使用细粒度锁是有代价的,这个代价就是可能会导致死锁。
死锁: 一组相互竞争资源的现场因为互相等待,导致永久的阻塞的现象。
死锁产生必须具备的四个条件:
解决死锁的最好办法是避免死锁,因此我们只需要破坏上面其中任意一个条件,就可以打破死锁:
上面是理论分析,下面我们来落实到代码层面解决问题。
从理论上讲,要破坏这个条件,可以一次性申请所有资源。在现实世界里,就拿前面我们提到的转账操作来讲,它需要的资源有两个,一个是转出账户,另一个是转入账户,当这两个账户同时被申请时,我们该怎么解决这个问题呢?
可以增加一个账本管理员,然后只允许账本管理员从文件架上拿账本,也就是说柜员不能直接在文件架上拿账本,必须通过账本管理员才能拿到想要的账本。例如,张三同时申请账本 A 和 B,账本管理员如果发现文件架上只有账本 A,这个时候账本管理员是不会把账本 A 拿下来给张三的,只有账本 A 和 B 都在的时候才会给张三。这样就保证了“一次性申请所有资源”。
对应到编程领域,“同时申请”这个操作是一个临界区,我们也需要一个角色(Java 里面的类)来管理这个临界区,我们就把这个角色定为 Allocator。它有两个重要功能,分别是:同时申请资源 apply() 和同时释放资源 free()。
账户 Account 类里面持有一个 Allocator 的单例(必须是单例,只能由一个人来分配资源
)。当账户 Account 在执行转账操作的时候,首先向 Allocator 同时申请转出账户和转入账户这两个资源,成功后再锁定这两个资源;当转账操作执行完,释放锁之后,我们需通知 Allocator 同时释放转出账户和转入账户这两个资源。
class Account { // actr应该为单例 -- 饿汉式直接初始化 private static final Allocator actr = new Allocator(); private int balance; // 转账 void transfer(Account target, int amt) { // 一次性申请转出账户和转入账户,直到成功 while (!actr.apply(this, target)) ; try { // 锁定转出账户 synchronized (this) { // 锁定转入账户 synchronized (target) { if (this.balance > amt) { this.balance -= amt; target.balance += amt; } } } } finally { actr.free(this, target, false); } } public static class Allocator { /** * 资源:占用情况(true表示被占用,false表示未被占用) */ private final Map<Object, Boolean> als = new HashMap<>(); // 一次性申请所有资源 synchronized boolean apply(Object from, Object to) { if (als.getOrDefault(from, false) || als.getOrDefault(to, false)) { return false; } als.put(from, true); als.put(to, true); return true; } // 归还资源,如果资源确定不会用到,设置remove为true synchronized void free(Object from, Object to, boolean remove) { if (remove) { als.remove(from); als.remove(to); } else { als.put(from, false); als.put(to, false); } } } }
破坏不可抢占条件看上去很简单,核心是要能够主动释放它占有的资源,这一点 synchronized 是做不到的。原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。
java.util.concurrent 这个包下面提供的 Lock 是可以轻松解决这个问题的 , 利用Lock接口提供的超时等待获取锁方法可以破坏不可抢占条件。
破坏这个条件,需要对资源进行排序,然后按序申请资源。这个实现非常简单,我们假设每个账户都有不同的属性 id,这个 id 可以作为排序字段,申请的时候,我们可以按照从小到大的顺序来申请。
class Account { private int id; private int balance; // 转账 void transfer(Account target, int amt) { Account first = target.id <= this.id ? target : this; Account second = target.id <= this.id ? this : target; //锁定序号小的账户 synchronized (first) { //锁定序号大的账户 synchronized (second) { if (this.balance > amt) { this.balance -= amt; target.balance += amt; } } } } }
使用细粒度锁锁定多个资源时,要注意死锁问题。
预防死锁问题主要是破坏三个条件的其中一个:
注意:
synchronized(Account.class) 锁了Account类相关的所有操作,只要与Account有关联,通通需要等待当前线程操作完成。而破坏占有并等待条件案例中的while死循环的方式只锁定了当前操作的两个相关的对象。两种影响到的范围不同。
在细粒度锁一节中,我们会面临死锁问题,我们可以通过一次性申请所有需要的资源破坏占有且等待条件,但是上面的案例中,如果不能一次性申请到所有的所需资源,我们是通过死循环的方式不断重试的。
// 一次性申请转出账户和转入账户,直到成功
while (!actr.apply(this, target)) ;
该方案在并发冲突量很大的情况下不适用,因为可能要循环上万次才能获取到锁,太消耗CPU了。
how to deal with it ?
在 Java 语言里,等待 - 通知机制可以有多种实现方式,比如 Java 语言内置的 synchronized 配合 wait()、notify()、notifyAll() 这三个方法就能轻松实现。
等待队列和互斥锁是一对一的关系,每个互斥锁都有自己独立的等待队列。
在并发程序中,当一个线程进入临界区后,由于某些条件不满足,需要进入等待状态,Java 对象的 wait() 方法就能够满足这种需求。如上图所示,当调用 wait() 方法后,当前线程就会被阻塞,并且进入到右边的等待队列中,这个等待队列也是互斥锁的等待队列
。 线程在进入等待队列的同时,会释放持有的互斥锁
,线程释放锁后,其他线程就有机会获得锁,并进入临界区了。
那线程要求的条件满足时,该怎么通知这个等待的线程呢?很简单,就是 Java 对象的 notify() 和 notifyAll() 方法。我在下面这个图里为你大致描述了这个过程,当条件满足时调用 notify(),会通知等待队列(互斥锁的等待队列
)中的线程,告诉它条件曾经满足过。
为什么说是曾经满足过呢?因为 notify() 只能保证在通知时间点,条件是满足的。而被通知线程的执行时间点和通知的时间点基本上不会重合,所以当线程执行的时候,很可能条件已经不满足了(保不齐有其他线程插队)
。这一点你需要格外注意。
除此之外,还有一个需要注意的点,被通知的线程要想重新执行,仍然需要获取到互斥锁(因为曾经获取的锁在调用 wait() 时已经释放了)。
上面我们一直强调 wait()、notify()、notifyAll() 方法操作的等待队列是互斥锁的等待队列,所以如果 synchronized 锁定的是 this,那么对应的一定是 this.wait()、this.notify()、this.notifyAll();如果 synchronized 锁定的是 target,那么对应的一定是 target.wait()、target.notify()、target.notifyAll() 。而且 wait()、notify()、notifyAll() 这三个方法能够被调用的前提是已经获取了相应的互斥锁,所以我们会发现 wait()、notify()、notifyAll() 都是在 synchronized{}内部被调用的。如果在 synchronized{}外部调用,或者锁定的 this,而用 target.wait() 调用的话,JVM 会抛出一个运行时异常:java.lang.IllegalMonitorStateException。
利用wait-notify对资源分配器代码进行优化:
public static class Allocator { /** * 资源:占用情况(true表示被占用,false表示未被占用) */ private final Map<Object, Boolean> als = new HashMap<>(); // 一次性申请所有资源 synchronized void apply(Object from, Object to) { //唤醒后,先看能不能抢到锁,抢到锁了,还要看此时条件满不满足,不满足就继续休眠 while (als.getOrDefault(from, false) || als.getOrDefault(to, false)) { //可以考虑先重试指定次数 try { this.wait(); } catch (InterruptedException e) {} } als.put(from, true); als.put(to, true); } // 归还资源,如果资源确定不会用到,设置remove为true synchronized void free(Object from, Object to, boolean remove) { if (remove) { als.remove(from); als.remove(to); } else { als.put(from, false); als.put(to, false); } this.notifyAll(); } }
等待 - 通知机制是一种非常普遍的线程间协作的方式。工作中经常看到有同学使用轮询的方式来等待某个状态,其实很多情况下都可以用今天我们介绍的等待 - 通知机制来优化。
注意点:
wait() 方法和 sleep() 方法都能让当前线程挂起一段时间,那它们的区别是什么?
由线程上下文切换导致的原子性问题可以通过锁来解决,我们可以直接使用jvm层面提供的synchronized锁。
使用锁时,我们需要注意以下几点:
对于没有关联关系的多个资源而言,通常都是一个资源对应一把锁,这种锁我们也称之为细粒度锁。
对于存在关联关系的多个资源而言,最直接的想法就是使用一把能够覆盖所有资源的锁,进一步优化的想法就是使用细粒度锁,例如转账过程中先锁定转出账户,再锁定转入账户,避免大粒度锁锁定所有账户。
细粒度锁容易导致死锁问题的发生,死锁问题必须具备: 互斥,占有并等待,不可抢占,循环等待这四个条件,我们只需要打破其中一个条件即可。
这里补充一道面试题,看看大家对并发的理解程度:
开启 3 个线程,这三个线程的 ID 分别为 A、B、C,每个线程将自己的 ID 在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。如:ABCABCABC……
线程A输出完自己的ID,接着线程B才能输出自己的ID,然后是线程C,这里面蕴含等待唤醒关系,因此首先想到条件变量实现多个线程之间的同步,这里给出一个我自己写的答案,解法不一定唯一:
public class Main { private static final int count = 10; private static final ReentrantLock lock = new ReentrantLock(); private static final Condition aOver = lock.newCondition(); private static final Condition bOver = lock.newCondition(); private static final Condition cOver = lock.newCondition(); private static int num = 1; public static void main(String[] args) throws InterruptedException, ExecutionException { Thread t1 = new Thread(() -> { task(() -> {System.out.print("A");num=2;}, cOver, aOver, 1); },"A"); Thread t2 = new Thread(() -> { task(() -> {System.out.print("B");num=3;}, aOver, bOver, 2); },"B"); Thread t3 = new Thread(() -> { task(() -> {System.out.print("C");num=1;}, bOver, cOver, 3); },"C"); t1.start(); t2.start(); t3.start(); } private static void task(Runnable r, Condition wait, Condition signal, int expectNum) { lock.lock(); try { for (int i = 0; i < count; i++) { if (num != expectNum) { wait.await(); } r.run(); signal.signalAll(); } } catch (InterruptedException e) { } finally { lock.unlock(); } } }
常见面试题二 : 如何确保库存上限设置和库存下限设置在多线程环境下的并发安全性
public class SafeWM { // 库存上限 private final AtomicLong upper = new AtomicLong(0); // 库存下限 private final AtomicLong lower = new AtomicLong(0); // 设置库存上限 void setUpper(long v){ // 检查参数合法性 if (v < lower.get()) { throw new IllegalArgumentException(); } upper.set(v); } // 设置库存下限 void setLower(long v){ // 检查参数合法性 if (v > upper.get()) { throw new IllegalArgumentException(); } lower.set(v); } // 省略其他业务代码 }
我们将库存上限upper和库存下限lower设置为Atomic原子类型,只能保证库存单个set操作的原子性,当增加库存校验判断逻辑的时候,就引入了竞态条件了 :
我们假设库存的下限和上限分别是 (2,10),线程 A 调用 setUpper(5) 将上限设置为 5,线程 B 调用 setLower(7) 将下限设置为 7,如果线程 A 和线程 B 完全同时执行,你会发现线程 A 能够通过参数校验,因为这个时候,下限还没有被线程 B 设置,还是 2,而 5>2;线程 B 也能够通过参数校验,因为这个时候,上限还没有被线程 A 设置,还是 10,而 7<10。当线程 A 和线程 B 都通过参数校验后,就把库存的下限和上限设置成 (7, 5) 了,显然此时的结果是不符合库存下限要小于库存上限这个约束条件的。
那么如何解决呢?
这里大方向有两种思路: 悲观锁或者乐观锁
悲观锁思路:
public class SafeWM1 { private final AtomicLong upper = new AtomicLong(0); private final AtomicLong lower = new AtomicLong(0); public synchronized void setUpper(long v) { if (v < lower.get()) { throw new IllegalArgumentException(); } upper.set(v); } public synchronized void setLower(long v) { if (v > upper.get()) { throw new IllegalArgumentException(); } lower.set(v); } }
//多把锁的加锁注意事项: 先加的锁后释放 public class SafeWM { private final AtomicLong upper = new AtomicLong(0); private final AtomicLong lower = new AtomicLong(0); private final ReadWriteLock lLock = new ReentrantReadWriteLock(); private final ReadWriteLock uLock = new ReentrantReadWriteLock(); public void setUpper(long v) { lLock.readLock().lock(); uLock.writeLock().lock(); if (v < lower.get()) { throw new IllegalArgumentException(); } upper.set(v); uLock.writeLock().unlock(); lLock.readLock().unlock(); } public void setLower(long v) { lLock.writeLock().lock(); uLock.readLock().lock(); if (v > upper.get()) { throw new IllegalArgumentException(); } lower.set(v); uLock.readLock().unlock(); lLock.writeLock().unlock(); } }
乐观锁思路: 考虑使用cas配合重试的方法
//乐观锁 public class SafeWM2 { private AtomicReference<StockLimit> stockLimit=new AtomicReference<StockLimit>(); public void setUpper(long v) { StockLimit oldObj; StockLimit newObj; do{ oldObj=stockLimit.get(); if(v<=oldObj.lower){ throw new IllegalArgumentException(); } newObj=new StockLimit(v, oldObj.lower); }while(stockLimit.compareAndSet(oldObj,newObj)); } public void setLower(long v) { StockLimit oldObj; StockLimit newObj; do{ oldObj=stockLimit.get(); if(v>=oldObj.upper){ throw new IllegalArgumentException(); } newObj=new StockLimit(oldObj.upper,v); }while(stockLimit.compareAndSet(oldObj,newObj)); } public static class StockLimit { private final long upper; private final long lower; public StockLimit(long upper, long lower) { this.upper = upper; this.lower = lower; } } }
本文参考该专栏部分文章整理而来,该专栏讲的很好,建议大家可以购买阅读原文获得更全面知识体验:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。