赞
踩
1、并发编程的三个核心问题:
(1)分工:所谓分工指的是如何高效地拆解任务并分配给线程
(2)同步:而同步指的是线程之间如何协作
(3)互斥:互斥则是保证同一时刻只允许一个线程访问共享资源
(4)应用:Java SDK 并发包很大部分内容都是按照这三个维度组织的,例如 Fork/Join 框架就是一种分工模式,CountDownLatch 就是一种典型的同步方式,而可重入锁则是一种互斥手段。
2、学习方法:
(1)跳出来,看全景:简历一张全景图
① 分工:
其实这就是生产者 - 消费者模式的一个优点,生产者一个一个地生产数据,而消费者可以批处理,这样就提高了性能。
② 同步:
a、一个线程执行完了一个任务,如何通知执行后续任务的线程开工
b、工作中遇到的线程协作问题,基本上都可以描述为这样的一个问题:当某个条件不满足时,线程需要等待,当某个条件满足时,线程需要被唤醒执行。例如,在生产者 - 消费者模型里,也有类似的描述,“当队列满时,生产者线程等待,当队列不满时,生产者线程需要被唤醒执行;当队列空时,消费者线程等待,当队列不空时,消费者线程需要被唤醒执行。
c、在 Java 并发编程领域,解决协作问题的核心技术是管程,上面提到的所有线程协作技术底层都是利用管程解决的。管程是一种解决并发问题的通用模型,除了能解决线程协作问题,还能解决下面我们将要介绍的互斥问题。可以这么说,管程是解决并发问题的万能钥匙。
③ 互斥:
a、线程安全,而导致不确定的主要源头是可见性问题、有序性问题和原子性问题,
b、所谓互斥,指的是同一时刻,只允许一个线程访问共享变量。
c、Java SDK 里提供的 ReadWriteLock、StampedLock 就可以优化读多写少场景下锁的性能。
d、除此之外,还有一些其他的方案,原理是不共享变量或者变量只允许读。这方面,Java 提供了 Thread Local 和 final 关键字,还有一种 Copy-on-write 的模式。
3、钻进去,看本质
我认为工程上的解决方案,一定要有理论做基础。
4、并发程序幕后的故事
(1)CPU、内存、I/O 设备都在不断迭代,不断朝着更快的方向努力。有一个核心矛盾一直存在,就是这三者的速度差异。
(2)为了合理利用 CPU 的高性能,平衡这三者的速度差异:
(3)CPU 增加了缓存,以均衡与内存的速度差异;操作系统增加了进程、线程,以分时复用 CPU,进而均衡 CPU 与 I/O 设备的速度差异;编译程序优化指令执行次序,使得缓存能够得到更加合理地利用。
(4)并发问题
a、源头之一:缓存导致的可见性问题
一个线程对共享变量的修改,另外一个线程能够立刻看到,我们称为可见性。
多核时代,每颗 CPU 都有自己的缓存,这时 CPU 缓存与内存的数据一致性就没那么容易解决了
b、源头之二:线程切换带来的原子性问题
操作系统允许某个进程执行一小段时间,例如 50 毫秒,过了 50 毫秒操作系统就会重新选择一个进程来执行(我们称为“任务切换”),这个 50 毫秒称为“时间片”。
在一个时间片内,如果一个进程进行一个 IO 操作,例如读个文件,这个时候该进程可以把自己标记为“休眠状态”并出让 CPU 的使用权,待文件读进内存,操作系统会把这个休眠的进程醒,唤醒后的进程就有机会重新获得 CPU 的使用权了。
这里的进程在等待 IO 时之所以会释放 CPU 使用权,是为了让 CPU 在这段等待时间里可以做别的事情,这样一来 CPU 的使用率就上来了;此外,如果这时有另外一个进程也读文件,读文件的操作就会排队,磁盘驱动在完成一个进程的读操作后,发现有排队的任务,就会立即启动下一个读操作,这样 IO 的使用率也上来了。
早期的操作系统基于进程来调度 CPU,不同进程间是不共享内存空间的,所以进程要做任务切换就要切换内存映射地址,而一个进程创建的所有线程,都是共享一个内存空间的,所以线程做任务切换成本就很低了。现代的操作系统都基于更轻量的线程来调度,现在我们提到的“任务切换”都是指“线程切换”。
高级语言里一条语句往往需要多条 CPU 指令完成,例如上面代码中的count += 1,至少需要三条 CPU 指令。
指令 1:首先,需要把变量 count 从内存加载到 CPU 的寄存器;
指令 2:之后,在寄存器中执行 +1 操作;
指令 3:最后,将结果写入内存(缓存机制导致可能写入的是 CPU 缓存而不是内存)。
操作系统做任务切换,可以发生在任何一条 CPU 指令执行完,是的,是 CPU 指令,而不是高级语言里的一条语句。
我们把一个或者多个操作在 CPU 执行的过程中不被中断的特性称为原子性。CPU 能保证的原子操作是 CPU 指令级别的,而不是高级语言的操作符,这是违背我们直觉的地方。因此,很多时候我们需要在高级语言层面保证操作的原子性。
c、源头之三:编译优化带来的有序性问题
-
- public class Singleton {
- static Singleton instance;
- static Singleton getInstance(){
- if (instance == null) {
- synchronized(Singleton.class) {
- if (instance == null)
- instance = new Singleton();
- }
- }
- return instance;
- }
- }
特意提到缓存导致的可见性问题,线程切换带来的原子性问题,编译优化带来的有序性问题
5、Java 内存模型
导致可见性的原因是缓存,导致有序性的原因是编译优化,那解决可见性、有序性合理的方案应该是按需禁用缓存以及编译优化。具体来说,这些方法包括 volatile、synchronized 和 final 三个关键字,以及六项 Happens-Before 规则,
(1)使用 volatile 的困惑:
告诉编译器,对这个变量的读写,不能使用 CPU 缓存,必须从内存中读取或者写入。
java1.5版本解决了,用了Happens-Before
(2)Happens-Before 规则
前面一个操作的结果对后续操作是可见的。
① 程序的顺序性规则:程序前面对某个变量的修改一定是对后续操作可见的,按先后顺序执行。
② volatile 变量规则:这条规则是指对一个 volatile 变量的写操作, Happens-Before 于后续对这个 volatile 变量的读操作。
③ 传递性:这条规则是指如果 A Happens-Before B,且 B Happens-Before C,那么 A Happens-Before C。
如果线程 B 读到了“v=true”,那么线程 A 设置的“x=42”对线程 B 是可见的。也就是说,线程 B 能看到 “x == 42”
④ 管程中锁的规则:
这条规则是指对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。
管程是一种通用的同步原语,在 Java 中指的就是 synchronized,synchronized 是 Java 里对管程的实现。
管程中的锁在 Java 里是隐式实现的,例如下面的代码,在进入同步块之前,会自动加锁,而在代码块执行完会自动释放锁,加锁以及释放锁都是编译器帮我们实现的。
-
- synchronized (this) { //此处自动加锁
- // x是共享变量,初始值=10
- if (this.x < 12) {
- this.x = 12;
- }
- } //此处自动解锁
可以这样理解:假设 x 的初始值是 10,线程 A 执行完代码块后 x 的值会变成 12(执行完自动释放锁),线程 B 进入代码块时,能够看到线程 A 对 x 的写操作,也就是线程 B 能够看到 x==12。这个也是符合我们直觉的,应该不难理解。
⑤ 线程 start() 规则:
它是指主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的操作。
-
- Thread B = new Thread(()->{
- // 主线程调用B.start()之前
- // 所有对共享变量的修改,此处皆可见
- // 此例中,var==77
- });
- // 此处对共享变量var修改
- var = 77;
- // 主线程启动子线程
- B.start();
如果线程 A 调用线程 B 的 start() 方法(即在线程 A 中启动线程 B),那么该 start() 操作 Happens-Before 于线程 B 中的任意操作。
⑥ 线程 join() 规则
这条是关于线程等待的。它是指主线程 A 等待子线程 B 完成(主线程 A 通过调用子线程 B 的 join() 方法实现),当子线程 B 完成后(主线程 A 中 join() 方法返回),主线程能够看到子线程的操作。当然所谓的“看到”,指的是对共享变量的操作。
换句话说就是,如果在线程 A 中,调用线程 B 的 join() 并成功返回,那么线程 B 中的任意操作 Happens-Before 于该 join() 操作的返回。
-
- Thread B = new Thread(()->{
- // 此处对共享变量var修改
- var = 66;
- });
- // 例如此处对共享变量修改,
- // 则这个修改结果对线程B可见
- // 主线程启动子线程
- B.start();
- B.join()
- // 子线程所有对共享变量的修改
- // 在主线程调用B.join()之后皆可见
- // 此例中,var==66
(3)final 关键字
final 修饰变量时,初衷是告诉编译器:这个变量生而不变,可以可劲儿优化。
当然了,在 1.5 以后 Java 内存模型对 final 类型变量的重排进行了约束。现在只要我们提供正确构造函数没有“逸出”,就不会出问题了。
“逸出”有点抽象,我们还是举个例子吧,在下面例子中,在构造函数里面将 this 赋值给了全局变量 global.obj,这就是“逸出”,线程通过 global.obj 读取 x 是有可能读到 0 的。因此我们一定要避免“逸出”。
-
- // 以下代码来源于【参考1】
- final int x;
- // 错误的构造函数
- public FinalFieldExample() {
- x = 3;
- y = 4;
- // 此处就是讲this逸出,
- global.obj = this;
- }
5、Java 内存模型
(1)那原子性问题到底该如何解决呢?
原子性问题的源头是线程切换,而操作系统做线程切换是依赖 CPU 中断的,所以禁止 CPU 发生中断就能够禁止线程切换。
这里我们以 32 位 CPU 上执行 long 型变量的写操作为例来说明这个问题,long 型变量是 64 位,在 32 位 CPU 上执行写操作会被拆分成两次写操作(写高 32 位和写低 32 位,如下图所示)。
在多核场景下,同一时刻,有可能有两个线程同时在执行,一个线程执行在 CPU-1 上,一个线程执行在 CPU-2 上,此时禁止 CPU 中断,只能保证 CPU 上的线程连续执行,并不能保证同一时刻只有一个线程执行,如果这两个线程同时写 long 型变量高 32 位的话,那就有可能出现我们开头提及的诡异 Bug 了。
“同一时刻只有一个线程执行”这个条件非常重要,我们称之为互斥。如果我们能够保证对共享变量的修改是互斥的,那么,无论是单核 CPU 还是多核 CPU,就都能保证原子性了。
(2)简易锁模型
我们把一段需要互斥执行的代码称为临界区。线程在进入临界区之前,首先尝试加锁 lock(),如果成功,则进入临界区,此时我们称这个线程持有锁;否则呢就等待,直到持有锁的线程解锁;持有锁的线程执行完临界区的代码后,执行解锁 unlock()。
(3)改进后的锁模型
比如你用你家的锁保护你家的东西,我用我家的锁保护我家的东西。
首先,我们要把临界区要保护的资源标注出来,如图中临界区里增加了一个元素:受保护的资源 R;其次,我们要保护资源 R 就得为它创建一把锁 LR;最后,针对这把锁 LR,我们还需在进出临界区时添上加锁操作和解锁操作。另外,在锁 LR 和受保护资源之间,我特地用一条线做了关联,这个关联关系非常重要。很多并发 Bug 的出现都是因为把它忽略了,然后就出现了类似锁自家门来保护他家资产的事情,这样的 Bug 非常不好诊断,因为潜意识里我们认为已经正确加锁了。
(4)synchronized
-
- class X {
- // 修饰非静态方法
- synchronized void foo() {
- // 临界区
- }
- // 修饰静态方法
- synchronized static void bar() {
- // 临界区
- }
- // 修饰代码块
- Object obj = new Object();
- void baz() {
- synchronized(obj) {
- // 临界区
- }
- }
- }
上面的代码我们看到只有修饰代码块的时候,锁定了一个 obj 对象
当修饰静态方法的时候,锁定的是当前类的 Class 对象,在上面的例子中就是 Class X;
当修饰非静态方法的时候,锁定的是当前实例对象 this。
synchronized 修饰静态方法相当于:
-
- class X {
- // 修饰静态方法
- synchronized(X.class) static void bar() {
- // 临界区
- }
- }
修饰非静态方法,相当于:
-
- class X {
- // 修饰非静态方法
- synchronized(this) void foo() {
- // 临界区
- }
- }
(5)用 synchronized 解决 count+=1 问题
SafeCalc 这个类有两个方法:一个是 get() 方法,用来获得 value 的值;另一个是 addOne() 方法,用来给 value 加 1,并且 addOne() 方法我们用 synchronized 修饰。
-
- class SafeCalc {
- long value = 0L;
- long get() {
- return value;
- }
- synchronized void addOne() {
- value += 1;
- }
- }
被 synchronized 修饰后,无论是单核 CPU 还是多核 CPU,只有一个线程能够执行 addOne() 方法,所以一定能保证原子操作
管程中锁的规则:对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。
管程,就是我们这里的 synchronized(至于为什么叫管程,我们后面介绍),我们知道 synchronized 修饰的临界区是互斥的,也就是说同一时刻只有一个线程执行临界区的代码;而所谓“对一个锁解锁 Happens-Before 后续对这个锁的加锁”,指的是前一个线程的解锁操作对后一个线程的加锁操作可见,综合 Happens-Before 的传递性原则,我们就能得出前一个线程在临界区修改的共享变量(该操作在解锁之前),对后续进入临界区(该操作在加锁之后)的线程是可见的。
你一不小心就忽视了 get() 方法。执行 addOne() 方法后,value 的值对 get() 方法是可见的吗?这个可见性是没法保证的。管程中锁的规则,是只保证后续对这个锁的加锁的可见性,而 get() 方法并没有加锁操作,所以可见性没法保证。那如何解决呢?很简单,就是 get() 方法也 synchronized 一下,完整的代码如下所示。
-
- class SafeCalc {
- long value = 0L;
- synchronized long get() {
- return value;
- }
- synchronized void addOne() {
- value += 1;
- }
- }
上面的代码转换为我们提到的锁模型,就是下面图示这个样子。get() 方法和 addOne() 方法都需要访问 value 这个受保护的资源,这个资源用 this 这把锁来保护。线程要进入临界区 get() 和 addOne(),必须先获得 this 这把锁,这样 get() 和 addOne() 也是互斥的。
(6)锁和受保护资源的关系
受保护资源和锁之间的关联关系是 N:1 的关系。
上面那个例子我稍作改动,把 value 改成静态变量,把 addOne() 方法改成静态方法,此时 get() 方法和 addOne() 方法是否存在并发问题呢?
-
- class SafeCalc {
- static long value = 0L;
- synchronized long get() {
- return value;
- }
- synchronized static void addOne() {
- value += 1;
- }
- }
如果你仔细观察,就会发现改动后的代码是用两个锁保护一个资源。这个受保护的资源就是静态变量 value,两个锁分别是 this 和 SafeCalc.class。我们可以用下面这幅图来形象描述这个关系。由于临界区 get() 和 addOne() 是用两个锁保护的,因此这两个临界区没有互斥关系,临界区 addOne() 对 value 的修改对临界区 get() 也没有可见性保证,这就导致并发问题了。
6、如何用一把锁保护多个资源
(1)保护没有关联关系的多个资源
相关的示例代码如下,账户类 Account 有两个成员变量,分别是账户余额 balance 和账户密码 password。取款 withdraw() 和查看余额 getBalance() 操作会访问账户余额 balance,我们创建一个 final 对象 balLock 作为锁(类比球赛门票);而更改密码 updatePassword() 和查看密码 getPassword() 操作会修改账户密码 password,我们创建一个 final 对象 pwLock 作为锁(类比电影票)。不同的资源用不同的锁保护,各自管各自的,很简单。
-
- class Account {
- // 锁:保护账户余额
- private final Object balLock
- = new Object();
- // 账户余额
- private Integer balance;
- // 锁:保护账户密码
- private final Object pwLock
- = new Object();
- // 账户密码
- private String password;
-
- // 取款
- void withdraw(Integer amt) {
- synchronized(balLock) {
- if (this.balance > amt){
- this.balance -= amt;
- }
- }
- }
- // 查看余额
- Integer getBalance() {
- synchronized(balLock) {
- return balance;
- }
- }
-
- // 更改密码
- void updatePassword(String pw){
- synchronized(pwLock) {
- this.password = pw;
- }
- }
- // 查看密码
- String getPassword() {
- synchronized(pwLock) {
- return password;
- }
- }
- }
用不同的锁对受保护资源进行精细化管理,能够提升性能。这种锁还有个名字,叫细粒度锁。
(2)保护有关联关系的多个资源
-
- class Account {
- private int balance;
- // 转账
- synchronized void transfer(
- Account target, int amt){
- if (this.balance > amt) {
- this.balance -= amt;
- target.balance += amt;
- }
- }
- }
问题就出在 this 这把锁上,this 这把锁可以保护自己的余额 this.balance,却保护不了别人的余额 target.balance,就像你不能用自家的锁来保护别人家的资产,也不能用自己的票来保护别人的座位一样。
(3)使用锁的正确姿势
锁能覆盖所有受保护资源
Account.class 作为共享的锁。Account.class 是所有 Account 对象共享的,而且这个对象是 Java 虚拟机在加载 Account 类的时候创建的,所以我们不用担心它的唯一性。使用 Account.class 作为共享的锁,我们就无需在创建 Account 对象时传入了,代码更简单。
-
- class Account {
- private int balance;
- // 转账
- void transfer(Account target, int amt){
- synchronized(Account.class) {
- if (this.balance > amt) {
- this.balance -= amt;
- target.balance += amt;
- }
- }
- }
- }
7、死锁问题处理
账本问题
1、文件架上恰好有转出账本和转入账本,那就同时拿走;
2、如果文件架上只有转出账本和转入账本之一,那这个柜员就先把文件架上有的账本拿到手,同时等着其他柜员把另外一个账本送回来;
3、转出账本和转入账本都没有,那这个柜员就等着两个账本都被送回来。
其实用两把锁就实现了,转出账本一把,转入账本另一把。在 transfer() 方法内部,我们首先尝试锁定转出账户 this(先把转出账本拿到手),然后尝试锁定转入账户 target(再把转入账本拿到手),只有当两者都成功时,才执行转账操作。
-
- class Account {
- private int balance;
- // 转账
- void transfer(Account target, int amt){
- // 锁定转出账户
- synchronized(this) {
- // 锁定转入账户
- synchronized(target) {
- if (this.balance > amt) {
- this.balance -= amt;
- target.balance += amt;
- }
- }
- }
- }
- }
细粒度锁。使用细粒度锁可以提高并行度,是性能优化的一个重要手段。
使用细粒度锁是有代价的,这个代价就是可能会导致死锁。
死锁的一个比较专业的定义是:一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象。
-
- class Account {
- private int balance;
- // 转账
- void transfer(Account target, int amt){
- // 锁定转出账户
- synchronized(this){ ①
- // 锁定转入账户
- synchronized(target){ ②
- if (this.balance > amt) {
- this.balance -= amt;
- target.balance += amt;
- }
- }
- }
- }
- }
如何预防死锁:
(1)只有以下这四个条件都发生时才会出现死锁:
① 互斥,共享资源 X 和 Y 只能被一个线程占用;
② 占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;
③ 不可抢占,其他线程不能强行抢占线程 T1 占有的资源;
④ 循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待。
也就是说只要我们破坏其中一个,就可以成功避免死锁的发生。
处理方案:
其中,互斥这个条件我们没有办法破坏,因为我们用锁为的就是互斥。不过其他三个条件都是有办法破坏掉的,到底如何做呢?
① 对于“占用且等待”这个条件,我们可以一次性申请所有的资源,这样就不存在等待了。
② 对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
③ 对于“循环等待”这个条件,可以靠按序申请资源来预防。所谓按序申请,是指资源是有线性顺序的,申请的时候可以先申请资源序号小的,再申请资源序号大的,这样线性化后自然就不存在循环了。
代码实现:
① 破坏占用且等待条件:
可以增加一个账本管理员,然后只允许账本管理员从文件架上拿账本,也就是说柜员不能直接在文件架上拿账本,必须通过账本管理员才能拿到想要的账本。例如,张三同时申请账本 A 和 B,账本管理员如果发现文件架上只有账本 A,这个时候账本管理员是不会把账本 A 拿下来给张三的,只有账本 A 和 B 都在的时候才会给张三。这样就保证了“一次性申请所有资源”。
“同时申请”这个操作是一个临界区,我们也需要一个角色(Java 里面的类)来管理这个临界区,我们就把这个角色定为 Allocator。它有两个重要功能,分别是:同时申请资源 apply() 和同时释放资源 free()。账户 Account 类里面持有一个 Allocator 的单例(必须是单例,只能由一个人来分配资源)。当账户 Account 在执行转账操作的时候,首先向 Allocator 同时申请转出账户和转入账户这两个资源,成功后再锁定这两个资源;当转账操作执行完,释放锁之后,我们需通知 Allocator 同时释放转出账户和转入账户这两个资源。
-
- class Allocator {
- private List<Object> als =
- new ArrayList<>();
- // 一次性申请所有资源
- synchronized boolean apply(
- Object from, Object to){
- if(als.contains(from) ||
- als.contains(to)){
- return false;
- } else {
- als.add(from);
- als.add(to);
- }
- return true;
- }
- // 归还资源
- synchronized void free(
- Object from, Object to){
- als.remove(from);
- als.remove(to);
- }
- }
-
- class Account {
- // actr应该为单例
- private Allocator actr;
- private int balance;
- // 转账
- void transfer(Account target, int amt){
- // 一次性申请转出账户和转入账户,直到成功
- while(!actr.apply(this, target))
- ;
- try{
- // 锁定转出账户
- synchronized(this){
- // 锁定转入账户
- synchronized(target){
- if (this.balance > amt){
- this.balance -= amt;
- target.balance += amt;
- }
- }
- }
- } finally {
- actr.free(this, target)
- }
- }
- }
② 破坏不可抢占条件:
破坏不可抢占条件看上去很简单,核心是要能够主动释放它占有的资源,这一点 synchronized 是做不到的。原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。
java.util.concurrent 这个包下面提供的 Lock 是可以轻松解决这个问题的。
③ 破坏循环等待条件
破坏这个条件,需要对资源进行排序,然后按序申请资源。这个实现非常简单,我们假设每个账户都有不同的属性 id,这个 id 可以作为排序字段,申请的时候,我们可以按照从小到大的顺序来申请。
-
- class Account {
- private int id;
- private int balance;
- // 转账
- void transfer(Account target, int amt){
- Account left = this ①
- Account right = target; ②
- if (this.id > target.id) { ③
- left = target; ④
- right = this; ⑤
- } ⑥
- // 锁定序号小的账户
- synchronized(left){
- // 锁定序号大的账户
- synchronized(right){
- if (this.balance > amt){
- this.balance -= amt;
- target.balance += amt;
- }
- }
- }
- }
- }
8、 用“等待-通知”机制优化循环等待
在破坏占用且等待条件的时候,如果转出账本和转入账本不满足同时在文件架上这个条件,就用死循环的方式来循环等待,核心代码如下:
-
- // 一次性申请转出账户和转入账户,直到成功
- while(!actr.apply(this, target))
- ;
但是如果 apply() 操作耗时长,或者并发冲突量大的时候,循环等待这种方案就不适用了,因为在这种场景下,可能要循环上万次才能获取到锁,太消耗 CPU 了。
一个完整的等待 - 通知机制:线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程,重新获取互斥锁。
(1)用 synchronized 实现等待 - 通知机制
比如 Java 语言内置的 synchronized 配合 wait()、notify()、notifyAll() 这三个方法就能轻松实现。
左边有一个等待队列,同一时刻,只允许一个线程进入 synchronized 保护的临界区(这个临界区可以看作大夫的诊室),当有一个线程进入临界区后,其他线程就只能进入图中左边的等待队列里等待(相当于患者分诊等待)。这个等待队列和互斥锁是一对一的关系,每个互斥锁都有自己独立的等待队列。
如上图所示,当调用 wait() 方法后,当前线程就会被阻塞,并且进入到右边的等待队列中,这个等待队列也是互斥锁的等待队列。 线程在进入等待队列的同时,会释放持有的互斥锁,线程释放锁后,其他线程就有机会获得锁,并进入临界区了。
就是 Java 对象的 notify() 和 notifyAll() 方法。我在下面这个图里为你大致描述了这个过程,当条件满足时调用 notify(),会通知等待队列(互斥锁的等待队列)中的线程,告诉它条件曾经满足过。
为什么说是曾经满足过呢?因为 notify() 只能保证在通知时间点,条件是满足的。而被通知线程的执行时间点和通知的时间点基本上不会重合,所以当线程执行的时候,很可能条件已经不满足了(保不齐有其他线程插队)。这一点你需要格外注意。
除此之外,还有一个需要注意的点,被通知的线程要想重新执行,仍然需要获取到互斥锁(因为曾经获取的锁在调用 wait() 时已经释放了)
(2)一个更好地资源分配器
互斥锁:上一篇文章我们提到 Allocator 需要是单例的,所以我们可以用 this 作为互斥锁。
线程要求的条件:转出账户和转入账户都没有被分配过。
何时等待:线程要求的条件不满足就等待。
何时通知:当有线程释放账户时就通知。
-
- while(条件不满足) {
- wait();
- }
利用这种范式可以解决上面提到的条件曾经满足过这个问题。
-
- class Allocator {
- private List<Object> als;
- // 一次性申请所有资源
- synchronized void apply(
- Object from, Object to){
- // 经典写法
- while(als.contains(from) ||
- als.contains(to)){
- try{
- wait();
- }catch(Exception e){
- }
- }
- als.add(from);
- als.add(to);
- }
- // 归还资源
- synchronized void free(
- Object from, Object to){
- als.remove(from);
- als.remove(to);
- notifyAll();
- }
- }
(3)尽量使用 notifyAll()
notify() 是会随机地通知等待队列中的一个线程,而 notifyAll() 会通知等待队列中的所有线程。从感觉上来讲,应该是 notify() 更好一些,因为即便通知所有线程,也只有一个线程能够进入临界区。但那所谓的感觉往往都蕴藏着风险,实际上使用 notify() 也很有风险,它的风险在于可能导致某些线程永远不会被通知到。
9、并发编程中问题
(1)安全性问题:
① 问题:就要避免出现原子性问题、可见性问题和有序性问题。
② 条件:存在共享数据并且该数据会发生变化,通俗地讲就是有多个线程会同时读写同一数据。
③ 原因:当多个线程同时访问同一数据,并且至少有一个线程会写这个数据的时候,如果我们不采取防护措施,那么就会导致并发 Bug,叫做数据竞争(Data Race)。
-
- public class Test {
- private long count = 0;
- void add10K() {
- int idx = 0;
- while(idx++ < 10000) {
- count += 1;
- }
- }
- }
对于修改后的代码,所有访问共享变量 value 的地方,我们都增加了互斥锁,此时是不存在数据竞争的。但很显然修改后的 add10K() 方法并不是线程安全的。
-
- public class Test {
- private long count = 0;
- synchronized long get(){
- return count;
- }
- synchronized void set(long v){
- count = v;
- }
- void add10K() {
- int idx = 0;
- while(idx++ < 10000) {
- set(get()+1)
- }
- }
- }
假设 count=0,当两个线程同时执行 get() 方法时,get() 方法会返回相同的值 0,两个线程执行 get()+1 操作,结果都是 1,之后两个线程再将结果 1 写入了内存。你本来期望的是 2,而结果却是 1。
所谓竞态条件,指的是程序的执行结果依赖线程执行的顺序。
其实这两类问题,都可以用互斥这个技术方案,而实现互斥的方案有很多,CPU 提供了相关的互斥指令,操作系统、编程语言也会提供相关的 API。从逻辑上来看,我们可以统一归为:锁。
(2)活跃性问题:
所谓活跃性问题,指的是某个操作无法执行下去。我们常见的“死锁”就是一种典型的活跃性问题,当然除了死锁外,还有两种情况,分别是“活锁”和“饥饿”。
① 有时线程虽然没有发生阻塞,但仍然会存在执行不下去的情况,这就是所谓的“活锁”。
解决“活锁”的方案很简单,谦让时,尝试等待一个随机的时间就可以了。
② 所谓“饥饿”指的是线程因无法访问所需资源而无法执行下去的情况。
如果线程优先级“不均”,在 CPU 繁忙的情况下,优先级低的线程得到执行的机会很小,就可能发生线程“饥饿”;持有锁的线程,如果执行的时间过长,也可能导致“饥饿”问题。
解决“饥饿”问题的方案很简单,有三种方案:一是保证资源充足,二是公平地分配资源,三就是避免持有锁的线程长时间执行。倒是方案二的适用场景相对来说更多一些。
在并发编程里,主要是使用公平锁。所谓公平锁,是一种先来后到的方案,线程的等待是有顺序的,排在等待队列前面的线程会优先获得资源。
(3)性能问题:
锁”的过度使用可能导致串行化的范围过大,这样就不能够发挥多线程的优势了,而我们之所以使用多线程搞并发程序,为的就是提升性能。
第一,既然使用锁会带来性能问题,那最好的方案自然就是使用无锁的算法和数据结构了。
例如线程本地存储 (Thread Local Storage, TLS)、写入时复制 (Copy-on-write)、乐观锁等;Java 并发包里面的原子类也是一种无锁的数据结构;Disruptor 则是一个无锁的内存队列,性能都非常好……
第二,减少锁持有的时间。互斥锁本质上是将并行的程序串行化,所以要增加并行度,一定要减少持有锁的时间。这个方案具体的实现技术也有很多,例如使用细粒度的锁,一个典型的例子就是 Java 并发包里的 ConcurrentHashMap,它使用了所谓分段锁的技术(这个技术后面我们会详细介绍);还可以使用读写锁,也就是读是无锁的,只有写的时候才会互斥。
性能方面的度量指标有很多,我觉得有三个指标非常重要,就是:吞吐量、延迟和并发量。
吞吐量:指的是单位时间内能处理的请求数量。吞吐量越高,说明性能越好。
延迟:指的是从发出请求到收到响应的时间。延迟越小,说明性能越好。
并发量:指的是能同时处理的请求数量,一般来说随着并发量的增加、延迟也会增加。所以延迟这个指标,一般都会是基于并发量来说的。例如并发量是 1000 的时候,延迟是 50 毫秒。总结
10、管程:并发编程的万能钥匙
(1)什么是管程:
Java 采用的是管程技术,synchronized 关键字及 wait()、notify()、notifyAll() 这三个方法都是管程的组成部分。而管程和信号量是等价的,所谓等价指的是用管程能够实现信号量,也能用信号量实现管程。但是管程更容易使用,所以 Java 选择了管程。
管程,对应的英文是 Monitor,很多 Java 领域的同学都喜欢将其翻译成“监视器”,这是直译。操作系统领域一般都翻译成“管程”,这个是意译,而我自己也更倾向于使用“管程”。
所谓管程,指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。翻译为 Java 领域的语言,就是管理类的成员变量和成员方法,让这个类是线程安全的。
(2)管程模型:MESA 模型
在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。这两大问题,管程都是能够解决的。
① 我们先来看看管程是如何解决互斥问题的:
管程解决互斥问题的思路很简单,就是将共享变量及其对共享变量的操作统一封装起来。假如我们要实现一个线程安全的阻塞队列,一个最直观的想法就是:将线程不安全的队列封装起来,对外提供线程安全的操作方法,例如入队操作和出队操作。
利用管程,可以快速实现这个直观的想法。在下图中,管程 X 将共享变量 queue 这个线程不安全的队列和相关的操作入队操作 enq()、出队操作 deq() 都封装起来了;线程 A 和线程 B 如果想访问共享变量 queue,只能通过调用管程提供的 enq()、deq() 方法来实现;enq()、deq() 保证互斥性,只允许一个线程进入管程。
管程模型和面向对象高度契合的。
② 那管程如何解决线程间的同步问题:
在管程模型里,共享变量和对共享变量的操作是被封装起来的,图中最外层的框就代表封装的意思。框的上面只有一个入口,并且在入口旁边还有一个入口等待队列。当多个线程同时试图进入管程内部时,只允许一个线程进入,其他线程则在入口等待队列中等待。这个过程类似就医流程的分诊,只允许一个患者就诊,其他患者都在门口等待。
管程里还引入了条件变量的概念,而且每个条件变量都对应有一个等待队列,如下图,条件变量 A 和条件变量 B 分别都有自己的等待队列。
条件变量和条件变量等待队列的作用是解决线程同步问题。
一定要注意阻塞队列和等待队列是不同的
假设有个线程 T1 执行阻塞队列的出队操作,执行出队操作,需要注意有个前提条件,就是阻塞队列不能是空的(空队列只能出 Null 值,是不允许的),阻塞队列不空这个前提条件对应的就是管程里的条件变量。 如果线程 T1 进入管程后恰好发现阻塞队列是空的,那怎么办呢?等待啊,去哪里等呢?就去条件变量对应的等待队列里面等。此时线程 T1 就去“队列不空”这个条件变量的等待队列中等待。这个过程类似于大夫发现你要去验个血,于是给你开了个验血的单子,你呢就去验血的队伍里排队。线程 T1 进入条件变量的等待队列后,是允许其他线程进入管程的。这和你去验血的时候,医生可以给其他患者诊治,道理都是一样的。
再假设之后另外一个线程 T2 执行阻塞队列的入队操作,入队操作执行成功之后,“阻塞队列不空”这个条件对于线程 T1 来说已经满足了,此时线程 T2 要通知 T1,告诉它需要的条件已经满足了。当线程 T1 得到通知后,会从等待队列里面出来,但是出来之后不是马上执行,而是重新进入到入口等待队列里面。这个过程类似你验血完,回来找大夫,需要重新分诊。
wait()、notify()、notifyAll() 这三个操作。前面提到线程 T1 发现“阻塞队列不空”这个条件不满足,需要进到对应的等待队列里等待。这个过程就是通过调用 wait() 来实现的。如果我们用对象 A 代表“阻塞队列不空”这个条件,那么线程 T1 需要调用 A.wait()。同理当“阻塞队列不空”这个条件满足时,线程 T2 需要调用 A.notify() 来通知 A 等待队列中的一个线程,此时这个等待队列里面只有线程 T1。至于 notifyAll() 这个方法,它可以通知等待队列中的所有线程。
用管程实现了一个线程安全的阻塞队列。阻塞队列有两个操作分别是入队和出队,这两个方法都是先获取互斥锁,类比管程模型中的入口。
-
- public class BlockedQueue<T>{
- final Lock lock =
- new ReentrantLock();
- // 条件变量:队列不满
- final Condition notFull =
- lock.newCondition();
- // 条件变量:队列不空
- final Condition notEmpty =
- lock.newCondition();
-
- // 入队
- void enq(T x) {
- lock.lock();
- try {
- while (队列已满){
- // 等待队列不满
- notFull.await();
- }
- // 省略入队操作...
- //入队后,通知可出队
- notEmpty.signal();
- }finally {
- lock.unlock();
- }
- }
- // 出队
- void deq(){
- lock.lock();
- try {
- while (队列已空){
- // 等待队列不空
- notEmpty.await();
- }
- // 省略出队操作...
- //出队后,通知可入队
- notFull.signal();
- }finally {
- lock.unlock();
- }
- }
- }
1、对于阻塞队列的入队操作,如果阻塞队列已满,就需要等待直到阻塞队列不满,所以这里用了notFull.await();。
2、对于阻塞出队操作,如果阻塞队列为空,就需要等待直到阻塞队列不空,所以就用了notEmpty.await();。
3、如果入队成功,那么阻塞队列就不空了,就需要通知条件变量:阻塞队列不空notEmpty对应的等待队列。
4、如果出队成功,那就阻塞队列就不满了,就需要通知条件变量:阻塞队列不满notFull对应的等待队列。
await() 和前面我们提到的 wait() 语义是一样的;signal() 和前面我们提到的 notify() 语义是一样的。
(3)wait()的正确姿势
对于 MESA 管程来说,有一个编程范式,就是需要在一个 while 循环里面调用 wait()。这个是 MESA 管程特有的。
- while(条件不满足) {
- wait();
- }
Hasen 模型、Hoare 模型和 MESA 模型的一个核心区别就是当条件满足后,如何通知相关线程。管程要求同一时刻只允许一个线程执行,那当线程 T2 的操作使线程 T1 等待的条件满足时,T1 和 T2 究竟谁可以执行呢?
① Hasen 模型里面,要求 notify() 放在代码的最后,这样 T2 通知完 T1 后,T2 就结束了,然后 T1 再执行,这样就能保证同一时刻只有一个线程执行。
② Hoare 模型里面,T2 通知完 T1 后,T2 阻塞,T1 马上执行;等 T1 执行完,再唤醒 T2,也能保证同一时刻只有一个线程执行。但是相比 Hasen 模型,T2 多了一次阻塞唤醒操作。
③ MESA 管程里面,T2 通知完 T1 后,T2 还是会接着执行,T1 并不立即执行,仅仅是从条件变量的等待队列进到入口等待队列里面。这样做的好处是 notify() 不用放到代码的最后,T2 也没有多余的阻塞唤醒操作。但是也有个副作用,就是当 T1 再次执行的时候,可能曾经满足的条件,现在已经不满足了,所以需要以循环方式检验条件变量。
(4)notify() 何时可以使用
除非经过深思熟虑,否则尽量使用 notifyAll()。
使用notify条件,同时满足3个条件:
① 所有等待线程拥有相同的等待条件;
② 所有等待线程被唤醒后,执行相同的操作;
③ 只需要唤醒一个线程。
重点是 while 里面的等待条件是完全相同的。
- while (阻塞队列已满){
- // 等待队列不满
- notFull.await();
- }
所有等待线程被唤醒后执行的操作也是相同的,都是下面这几行:
- // 省略入队操作...
- // 入队后,通知可出队
- notEmpty.signal();
同时也满足第 3 条,只需要唤醒一个线程。所以上面阻塞队列的代码,使用 signal() 是可以的。
11、Java线程的生命周期
(1)通用的线程生命周期
初始状态,指的是线程已经被创建,但是还不允许分配 CPU 执行。这个状态属于编程语言特有的,不过这里所谓的被创建,仅仅是在编程语言层面被创建,而在操作系统层面,真正的线程还没有创建。
可运行状态,指的是线程可以分配 CPU 执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配 CPU 执行。
当有空闲的 CPU 时,操作系统会将其分配给一个处于可运行状态的线程,被分配到 CPU 的线程的状态就转换成了运行状态。
运行状态的线程如果调用一个阻塞的 API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放 CPU 使用权,休眠状态的线程永远没有机会获得 CPU 使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。
(2)Java 中线程的生命周期
NEW(初始化状态)
RUNNABLE(可运行 / 运行状态)
BLOCKED(阻塞状态)
WAITING(无时限等待)
TIMED_WAITING(有时限等待)
TERMINATED(终止状态)
Java 线程中的 BLOCKED、WAITING、TIMED_WAITING 是一种状态,即前面我们提到的休眠状态。也就是说只要 Java 线程处于这三种状态之一,那么这个线程就永远没有 CPU 的使用权。
BLOCKED、WAITING、TIMED_WAITING 可以理解为线程导致休眠状态的三种原因
(3)RUNNABLE 与 BLOCKED 的状态转换
只有一种场景会触发这种转换,就是线程等待 synchronized 的隐式锁。synchronized 修饰的方法、代码块同一时刻只允许一个线程执行,其他线程只能等待,这种情况下,等待的线程就会从 RUNNABLE 转换到 BLOCKED 状态。而当等待的线程获得 synchronized 隐式锁时,就又会从 BLOCKED 转换到 RUNNABLE 状态。
JVM 层面并不关心操作系统调度相关的状态,因为在 JVM 看来,等待 CPU 使用权(操作系统层面此时处于可执行状态)与等待 I/O(操作系统层面此时处于休眠状态)没有区别,都是在等待某个资源,所以都归入了 RUNNABLE 状态。
而我们平时所谓的 Java 在调用阻塞式 API 时,线程会阻塞,指的是操作系统线程的状态,并不是 Java 线程的状态。
(4)RUNNABLE 与 WAITING 的状态转换
第一种场景,获得 synchronized 隐式锁的线程,调用无参数的 Object.wait() 方法。其中,wait() 方法我们在上一篇讲解管程的时候已经深入介绍过了,这里就不再赘述。
第二种场景,调用无参数的 Thread.join() 方法。其中的 join() 是一种线程同步方法,例如有一个线程对象 thread A,当调用 A.join() 的时候,执行这条语句的线程会等待 thread A 执行完,而等待中的这个线程,其状态会从 RUNNABLE 转换到 WAITING。当线程 thread A 执行完,原来等待它的线程又会从 WAITING 状态转换到 RUNNABLE。
第三种场景,调用 LockSupport.park() 方法。其中的 LockSupport 对象,也许你有点陌生,其实 Java 并发包中的锁,都是基于它实现的。调用 LockSupport.park() 方法,当前线程会阻塞,线程的状态会从 RUNNABLE 转换到 WAITING。调用 LockSupport.unpark(Thread thread) 可唤醒目标线程,目标线程的状态又会从 WAITING 状态转换到 RUNNABLE。
(5)RUNNABLE 与 TIMED_WAITING 的状态转换
调用带超时参数的 Thread.sleep(long millis) 方法;
获得 synchronized 隐式锁的线程,调用带超时参数的 Object.wait(long timeout) 方法;
调用带超时参数的 Thread.join(long millis) 方法;
调用带超时参数的 LockSupport.parkNanos(Object blocker, long deadline) 方法;
调用带超时参数的 LockSupport.parkUntil(long deadline) 方法。
TIMED_WAITING 和 WAITING 状态的区别,仅仅是触发条件多了超时参数。
(6)从 NEW 到 RUNNABLE 状态
Java 刚创建出来的 Thread 对象就是 NEW 状态,而创建 Thread 对象主要有两种方法。一种是继承 Thread 对象,重写 run() 方法。示例代码如下:
-
- // 自定义线程对象
- class MyThread extends Thread {
- public void run() {
- // 线程需要执行的代码
- ......
- }
- }
- // 创建线程对象
- MyThread myThread = new MyThread();
另一种是实现 Runnable 接口,重写 run() 方法,并将该实现类作为创建 Thread 对象的参数。示例代码如下:
-
- // 实现Runnable接口
- class Runner implements Runnable {
- @Override
- public void run() {
- // 线程需要执行的代码
- ......
- }
- }
- // 创建线程对象
- Thread thread = new Thread(new Runner());
从 NEW 状态转换到 RUNNABLE 状态很简单,只要调用线程对象的 start() 方法就可以了,
-
- MyThread myThread = new MyThread();
- // 从NEW状态转换到RUNNABLE状态
- myThread.start();
(7)从 RUNNABLE 到 TERMINATED 状态
线程执行完 run() 方法后,会自动转换到 TERMINATED 状态,当然如果执行 run() 方法的时候异常抛出,也会导致线程终止。
有时候我们需要强制中断 run() 方法的执行,例如 run() 方法访问一个很慢的网络,我们等不下去了,想终止,正确的姿势其实是调用 interrupt() 方法。
(8) stop() 和 interrupt() 方法的主要区别是什么呢?
① stop() 方法会真的杀死线程,不给线程喘息的机会,如果线程持有 ReentrantLock 锁,被 stop() 的线程并不会自动调用 ReentrantLock 的 unlock() 去释放锁,那其他线程就再也没机会获得 ReentrantLock 锁,这实在是太危险了,所以该方法就不建议使用了
② 而 interrupt() 方法就温柔多了,interrupt() 方法仅仅是通知线程,线程有机会执行一些后续操作,同时也可以无视这个通知。被 interrupt 的线程,是怎么收到通知的呢?一种是异常,另一种是主动检测。
a、当线程 A 处于 WAITING、TIMED_WAITING 状态时,如果其他线程调用线程 A 的 interrupt() 方法,会使线程 A 返回到 RUNNABLE 状态,同时线程 A 的代码会触发 InterruptedException 异常。上面我们提到转换到 WAITING、TIMED_WAITING 状态的触发条件,都是调用了类似 wait()、join()、sleep() 这样的方法,我们看这些方法的签名,发现都会 throws InterruptedException 这个异常。这个异常的触发条件就是:其他线程调用了该线程的 interrupt() 方法。
b、当线程 A 处于 RUNNABLE 状态时,并且阻塞在 java.nio.channels.InterruptibleChannel 上时,如果其他线程调用线程 A 的 interrupt() 方法,线程 A 会触发 java.nio.channels.ClosedByInterruptException 这个异常;而阻塞在 java.nio.channels.Selector 上时,如果其他线程调用线程 A 的 interrupt() 方法,线程 A 的 java.nio.channels.Selector 会立即返回。
c、上面这两种情况属于被中断的线程通过异常的方式获得了通知。还有一种是主动检测,如果线程处于 RUNNABLE 状态,并且没有阻塞在某个 I/O 操作上,例如中断计算圆周率的线程 A,这时就得依赖线程 A 主动检测中断状态了。如果其他线程调用线程 A 的 interrupt() 方法,那么线程 A 可以通过 isInterrupted() 方法,检测是不是自己被中断了
(9)诊断多线程问题
上面这两种情况属于被中断的线程通过异常的方式获得了通知。还有一种是主动检测,如果线程处于 RUNNABLE 状态,并且没有阻塞在某个 I/O 操作上,例如中断计算圆周率的线程 A,这时就得依赖线程 A 主动检测中断状态了。如果其他线程调用线程 A 的 interrupt() 方法,那么线程 A 可以通过 isInterrupted() 方法,检测是不是自己被中断了
12、创建多少线程才是合适的
(1)为什么要使用多线程
有两个指标是最核心的,它们就是延迟和吞吐量。延迟指的是发出请求到收到响应这个过程的时间;延迟越短,意味着程序执行得越快,性能也就越好。 吞吐量指的是在单位时间内能处理请求的数量;吞吐量越大,意味着程序能处理的请求越多,性能也就越好。这两个指标内部有一定的联系(同等条件下,延迟越短,吞吐量越大),但是由于它们隶属不同的维度(一个是时间维度,一个是空间维度),并不能互相转换。
(2)多线程的应用场景
① 一个方向是优化算法,另一个方向是将硬件的性能发挥到极致。前者属于算法范畴,后者则是在并发编程领域,提升性能本质上就是提升硬件的利用率,再具体点来说,就是提升 I/O 的利用率和 CPU 的利用率。
② 但是操作系统解决硬件利用率问题的对象往往是单一的硬件设备,而我们的并发程序,往往需要 CPU 和 I/O 设备相互配合工作,也就是说,我们需要解决 CPU 和 I/O 设备综合利用率的问题。
如下图所示,如果只有一个线程,执行 CPU 计算的时候,I/O 设备空闲;执行 I/O 操作的时候,CPU 空闲,所以 CPU 的利用率和 I/O 设备的利用率都是 50%。
如果有两个线程,如下图所示,当线程 A 执行 CPU 计算的时候,线程 B 执行 I/O 操作;当线程 A 执行 I/O 操作的时候,线程 B 执行 CPU 计算,这样 CPU 的利用率和 I/O 设备的利用率就都达到了 100%。
如果 CPU 和 I/O 设备的利用率都很低,那么可以尝试通过增加线程来提高吞吐量。
在多核时代,这种纯计算型的程序也可以利用多线程来提升性能。
一个线程,对于 4 核的 CPU,CPU 的利用率只有 25%,而 4 个线程,则能够将 CPU 的利用率提高到 100%。
(3)创建多少线程合适?
我们的程序一般都是 CPU 计算和 I/O 操作交叉执行的,由于 I/O 设备的速度相对于 CPU 来说都很慢,所以大部分情况下,I/O 操作执行的时间相对于 CPU 计算来说都非常长,这种场景我们一般都称为 I/O 密集型计算;和 I/O 密集型计算相对的就是 CPU 密集型计算了,CPU 密集型计算大部分场景下都是纯 CPU 计算。I/O 密集型程序和 CPU 密集型程序,计算最佳线程数的方法是不同的。
① 对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个 4 核的 CPU,每个核一个线程,理论上创建 4 个线程就可以了,再多创建线程也只是增加线程切换的成本。所以,对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
② 对于 I/O 密集型的计算场景,比如前面我们的例子中,如果 CPU 计算和 I/O 操作的耗时是 1:1,那么 2 个线程是最合适的。如果 CPU 计算和 I/O 操作的耗时是 1:2,那多少个线程合适呢?是 3 个线程,如下图所示:CPU 在 A、B、C 三个线程之间切换,对于线程 A,当 CPU 从 B、C 切换回来时,线程 A 正好执行完 I/O 操作。这样 CPU 和 I/O 设备的利用率都达到了 100%。
对于 I/O 密集型计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关的,我们可以总结出这样一个公式:
最佳线程数 =1 +(I/O 耗时 / CPU 耗时)
我们令 R=I/O 耗时 / CPU 耗时,综合上图,可以这样理解:当线程 A 执行 IO 操作时,另外 R 个线程正好执行完各自的 CPU 计算。这样 CPU 的利用率就达到了 100%。
上面这个公式是针对单核 CPU 的,至于多核 CPU,也很简单,只需要等比扩大就可以了,计算公式如下:
最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]
(4)应用场景
对于 I/O 密集型计算场景,I/O 耗时和 CPU 耗时的比值是一个关键参数,不幸的是这个参数是未知的,而且是动态变化的,所以工程上,我们要估算这个参数,然后做各种不同场景下的压测来验证我们的估计。不过工程上,原则还是将硬件的性能发挥到极致,所以压测时,我们需要重点关注 CPU、I/O 设备的利用率和性能指标(响应时间、吞吐量)之间的关系。
13、为什么局部变量是线程安全的
斐波那契数列方法
-
- // 返回斐波那契数列
- int[] fibonacci(int n) {
- // 创建结果数组
- int[] r = new int[n];
- // 初始化第一、第二个数
- r[0] = r[1] = 1; // ①
- // 计算2..n
- for(int i = 2; i < n; i++) {
- r[i] = r[i-2] + r[i-1];
- }
- return r;
- }
你自己可以在大脑里模拟一下多个线程调用 fibonacci() 方法的情景,假设多个线程执行到 ① 处,多个线程都要对数组 r 的第 1 项和第 2 项赋值,这里看上去感觉是存在数据竞争的,但实际不存在数据竞争。
在CPU 层面,是没有方法概念的,CPU 的眼里,只有一条条的指令。编译程序,负责把高级语言里的方法转换成一条条的指令。
(1)方法是如何被执行的
- int a = 7;
- int[] b = fibonacci(a);
- int[] c = b;
第 1 行,声明一个 int 变量 a;
第 2 行,调用方法 fibonacci(a);
第 3 行,将 b 赋值给 c。
当你调用 fibonacci(a) 的时候,CPU 要先找到方法 fibonacci() 的地址,然后跳转到这个地址去执行代码,最后 CPU 执行完方法 fibonacci() 之后,要能够返回。首先找到调用方法的下一条语句的地址:也就是int[] c=b;的地址,再跳转到这个地址去执行。 你可以参考下面这个图再加深一下理解。
CPU 去哪里找到调用方法的参数和返回地址:
通过 CPU 的堆栈寄存器。CPU 支持一种栈结构,栈你一定很熟悉了,就像手枪的弹夹,先入后出。因为这个栈是和方法调用相关的,因此经常被称为调用栈。
例如,有三个方法 A、B、C,他们的调用关系是 A->B->C(A 调用 B,B 调用 C),在运行时,会构建出下面这样的调用栈。每个方法在调用栈里都有自己的独立空间,称为栈帧,每个栈帧里都有对应方法需要的参数和返回地址。当调用方法时,会创建新的栈帧,并压入调用栈;当方法返回时,对应的栈帧就会被自动弹出。也就是说,栈帧和方法是同生共死的。
利用栈结构来支持方法调用这个方案非常普遍,以至于 CPU 里内置了栈寄存器。虽然各家编程语言定义的方法千奇百怪,但是方法的内部执行原理却是出奇的一致:都是靠栈结构解决的。Java 语言虽然是靠虚拟机解释执行的,但是方法的调用也是利用栈结构解决的。
(2)局部变量存哪里?
局部变量的作用域是方法内部,也就是说当方法执行完,局部变量就没用了,局部变量应该和方法同生共死。此时你应该会想到调用栈的栈帧,调用栈的栈帧就是和方法同生共死的,所以局部变量放到调用栈里那儿是相当的合理。事实上,的确是这样的,局部变量就是放到了调用栈里。于是调用栈的结构就变成了下图这样。
这个结论相信很多人都知道,因为学 Java 语言的时候,基本所有的教材都会告诉你 new 出来的对象是在堆里,局部变量是在栈里,只不过很多人并不清楚堆和栈的区别,以及为什么要区分堆和栈。现在你应该很清楚了,局部变量是和方法同生共死的,一个变量如果想跨越方法的边界,就必须创建在堆里。
(3)调用栈与线程
两个线程可以同时用不同的参数调用相同的方法,那调用栈和线程之间是什么关系呢?答案是:每个线程都有自己独立的调用栈。因为如果不是这样,那两个线程就互相干扰了。如下面这幅图所示,线程 A、B、C 每个线程都有自己独立的调用栈。
Java 方法里面的局部变量是否存在并发问题?现在你应该很清楚了,一点问题都没有。因为每个线程都有自己的调用栈,局部变量保存在线程各自的调用栈里面,不会共享,所以自然也就没有并发问题。再次重申一遍:没有共享,就没有伤害。
(4)线程封闭
线程封闭,比较官方的解释是:仅在单线程内访问数据。由于不存在共享,所以即便不同步也不会有并发问题,性能杠杠的。
采用线程封闭技术的案例非常多,例如从数据库连接池里获取的连接 Connection,在 JDBC 规范里并没有要求这个 Connection 必须是线程安全的。数据库连接池通过线程封闭技术,保证一个 Connection 一旦被一个线程获取之后,在这个线程关闭 Connection 之前的这段时间里,不会再分配给其他线程,从而保证了 Connection 不会有并发问题。
14、如何用面向对象思想写好并发程序?
在 Java 语言里,面向对象思想能够让并发编程变得更简单。
从封装共享变量、识别共享变量间的约束条件和制定并发访问策略这三个方面下手。
(1)封装共享变量
现实世界里门票管理的一个核心问题是:所有观众只能通过规定的入口进入,否则检票就形同虚设。在编程世界这个问题也很重要,编程领域里面对于共享变量的访问路径就类似于球场的入口,必须严格控制。好在有了面向对象思想,对共享变量的访问路径可以轻松把控。
面向对象思想里面有一个很重要的特性是封装,封装的通俗解释就是将属性和实现细节封装在对象内部,外界对象只能通过目标对象提供的公共方法来间接访问这些内部属性,这和门票管理模型匹配度相当的高,球场里的座位就是对象属性,球场入口就是对象的公共方法。我们把共享变量作为对象的属性,那对于共享变量的访问路径就是对象的公共方法,所有入口都要安排检票程序就相当于我们前面提到的并发访问策略。
将共享变量作为对象属性封装在内部,对所有公共方法制定并发访问策略。
下面的计数器程序共享变量只有一个,就是 value,我们把它作为 Counter 类的属性,并且将两个公共方法 get() 和 addOne() 声明为同步方法,这样 Counter 类就成为一个线程安全的类了。
-
- public class Counter {
- private long value;
- synchronized long get(){
- return value;
- }
- synchronized long addOne(){
- return ++value;
- }
- }
很多共享变量的值是不会变的,例如信用卡账户的卡号、姓名、身份证。对于这些不会发生变化的共享变量,建议你用 final 关键字来修饰。这样既能避免并发问题,也能很明了地表明你的设计意图,让后面接手你程序的兄弟知道,你已经考虑过这些共享变量的并发安全问题了。
(2)识别共享变量间的约束条件
因为识别共享变量间的约束条件,决定了并发访问策略。例如,库存管理里面有个合理库存的概念,库存量不能太高,也不能太低,它有一个上限和一个下限。关于这些约束条件,我们可以用下面的程序来模拟一下。在类 SafeWM 中,声明了两个成员变量 upper 和 lower,分别代表库存上限和库存下限,这两个变量用了 AtomicLong 这个原子类,原子类是线程安全的,所以这两个成员变量的 set 方法就不需要同步了。
-
- public class SafeWM {
- // 库存上限
- private final AtomicLong upper =
- new AtomicLong(0);
- // 库存下限
- private final AtomicLong lower =
- new AtomicLong(0);
- // 设置库存上限
- void setUpper(long v){
- upper.set(v);
- }
- // 设置库存下限
- void setLower(long v){
- lower.set(v);
- }
- // 省略其他业务代码
- }
虽说上面的代码是没有问题的,但是忽视了一个约束条件,就是库存下限要小于库存上限,这个约束条件能够直接加到上面的 set 方法上吗?我们先直接加一下看看效果(如下面代码所示)。我们在 setUpper() 和 setLower() 中增加了参数校验,这乍看上去好像是对的,但其实存在并发问题,问题在于存在竞态条件。
其实当你看到代码里出现 if 语句的时候,就应该立刻意识到可能存在竞态条件。
我们假设库存的下限和上限分别是 (2,10),线程 A 调用 setUpper(5) 将上限设置为 5,线程 B 调用 setLower(7) 将下限设置为 7,如果线程 A 和线程 B 完全同时执行,你会发现线程 A 能够通过参数校验,因为这个时候,下限还没有被线程 B 设置,还是 2,而 5>2;线程 B 也能够通过参数校验,因为这个时候,上限还没有被线程 A 设置,还是 10,而 7<10。当线程 A 和线程 B 都通过参数校验后,就把库存的下限和上限设置成 (7, 5) 了,显然此时的结果是不符合库存下限要小于库存上限这个约束条件的。
-
- public class SafeWM {
- // 库存上限
- private final AtomicLong upper =
- new AtomicLong(0);
- // 库存下限
- private final AtomicLong lower =
- new AtomicLong(0);
- // 设置库存上限
- void setUpper(long v){
- // 检查参数合法性
- if (v < lower.get()) {
- throw new IllegalArgumentException();
- }
- upper.set(v);
- }
- // 设置库存下限
- void setLower(long v){
- // 检查参数合法性
- if (v > upper.get()) {
- throw new IllegalArgumentException();
- }
- lower.set(v);
- }
- // 省略其他业务代码
- }
在没有识别出库存下限要小于库存上限这个约束条件之前,我们制定的并发访问策略是利用原子类,但是这个策略,完全不能保证库存下限要小于库存上限这个约束条件。所以说,在设计阶段,我们一定要识别出所有共享变量之间的约束条件,如果约束条件识别不足,很可能导致制定的并发访问策略南辕北辙。
共享变量之间的约束条件,反映在代码里,基本上都会有 if 语句,所以,一定要特别注意竞态条件。
(3)制定并发访问策略
① 避免共享:避免共享的技术主要是利于线程本地存储以及为每个任务分配独立的线程。
② 不变模式:这个在 Java 领域应用的很少,但在其他领域却有着广泛的应用,例如 Actor 模式、CSP 模式以及函数式编程的基础都是不变模式。
③ 管程及其他同步工具:Java 领域万能的解决方案是管程,但是对于很多特定场景,使用 Java 并发包提供的读写锁、并发容器等同步工具会更好。
(4)宏观原则,有助于你写出“健壮”的并发程序。这些原则主要有以下三条。
优先使用成熟的工具类:Java SDK 并发包里提供了丰富的工具类,基本上能满足你日常的需要,建议你熟悉它们,用好它们,而不是自己再“发明轮子”,毕竟并发工具类不是随随便便就能发明成功的。
迫不得已时才使用低级的同步原语:低级的同步原语主要指的是 synchronized、Lock、Semaphore 等,这些虽然感觉简单,但实际上并没那么简单,一定要小心使用。
避免过早优化:安全第一,并发程序首先要保证安全,出现性能瓶颈后再优化。在设计期和开发期,很多人经常会情不自禁地预估性能的瓶颈,并对此实施优化,但残酷的现实却是:性能瓶颈不是你想预估就能预估的。
15、 用锁的最佳实践
(1)synchronized (new Object()) 这行代码很多同学已经分析出来了,每次调用方法 get()、addOne() 都创建了不同的锁,相当于无锁。这里需要你再次加深一下记忆,“一个合理的受保护资源与锁之间的关联关系应该是 N:1”。只有共享一把锁才能起到互斥的作用。
-
- class SafeCalc {
- long value = 0L;
- long get() {
- synchronized (new Object()) {
- return value;
- }
- }
- void addOne() {
- synchronized (new Object()) {
- value += 1;
- }
- }
- }
(2)它的核心问题有两点:一个是锁有可能会变化,另一个是 Integer 和 String 类型的对象不适合做锁。如果锁发生变化,就意味着失去了互斥功能。 Integer 和 String 类型的对象在 JVM 里面是可能被重用的,除此之外,JVM 里可能被重用的对象还有 Boolean,那重用意味着什么呢?意味着你的锁可能被其他代码使用,如果其他代码 synchronized(你的锁),而且不释放,那你的程序就永远拿不到锁,这是隐藏的风险。
-
- class Account {
- // 账户余额
- private Integer balance;
- // 账户密码
- private String password;
- // 取款
- void withdraw(Integer amt) {
- synchronized(balance) {
- if (this.balance > amt){
- this.balance -= amt;
- }
- }
- }
- // 更改密码
- void updatePassword(String pw){
- synchronized(password) {
- this.password = pw;
- }
- }
- }
通过这两个反例,我们可以总结出这样一个基本的原则:锁,应是私有的、不可变的、不可重用的。
正确用法如下
-
- // 普通对象锁
- private final Object
- lock = new Object();
- // 静态对象锁
- private static final Object
- lock = new Object();
16、锁的性能要看场景
比较while(!actr.apply(this, target));这个方法和synchronized(Account.class)的性能哪个更好。
这个要看具体的应用场景,不同应用场景它们的性能表现是不同的。在这个思考题里面,如果转账操作非常费时,那么前者的性能优势就显示出来了,因为前者允许 A->B、C->D 这种转账业务的并行。不同的并发场景用不同的方案,这是并发编程里面的一项基本原则;没有通吃的技术和方案,因为每种技术和方案都是优缺点和适用场景的。
17、竞态条件需要格外关注
竞态条件问题非常容易被忽略,contains() 和 add() 方法虽然都是线程安全的,但是组合在一起却不是线程安全的。所以你的程序里如果存在类似的组合操作,一定要小心。
-
- void addIfNotExist(Vector v,
- Object o){
- if(!v.contains(o)) {
- v.add(o);
- }
- }
你需要将共享变量 v 封装在对象的内部,而后控制并发访问的路径,这样就能有效防止对 Vector v 变量的滥用,从而导致并发问题。
-
- class SafeVector{
- private Vector v;
- // 所有公共方法增加同步控制
- synchronized
- void addIfNotExist(Object o){
- if(!v.contains(o)) {
- v.add(o);
- }
- }
- }
18、方法调用是先计算参数
方法的调用,是先计算参数,然后将参数压入调用栈之后才会执行方法体,
-
- while(idx++ < 10000) {
- set(get()+1);
- }
先计算参数这个事情也是容易被忽视的细节。例如,下面写日志的代码,如果日志级别设置为 INFO,虽然这行代码不会写日志,但是会计算"The var1:" + var1 + ", var2:" + var2的值,因为方法调用前会先计算参数。
-
- logger.debug("The var1:" +
- var1 + ", var2:" + var2);
更好地写法应该是下面这样,这种写法仅仅是讲参数压栈,而没有参数的计算。使用{}占位符是写日志的一个良好习惯。
-
- logger.debug("The var1:{}, var2:{}",
- var1, var2);
19、InterruptedException 异常处理需小心
注意 InterruptedException 的处理方式。当你调用 Java 对象的 wait() 方法或者线程的 sleep() 方法时,需要捕获并处理 InterruptedException 异常,在思考题里面(如下所示),本意是通过 isInterrupted() 检查线程是否被中断了,如果中断了就退出 while 循环。当其他线程通过调用th.interrupt().来中断 th 线程时,会设置 th 线程的中断标志位,从而使th.isInterrupted()返回 true,这样就能退出 while 循环了。
-
- Thread th = Thread.currentThread();
- while(true) {
- if(th.isInterrupted()) {
- break;
- }
- // 省略业务代码无数
- try {
- Thread.sleep(100);
- }catch (InterruptedException e){
- e.printStackTrace();
- }
- }
这看上去一点问题没有,实际上却是几乎起不了作用。原因是这段代码在执行的时候,大部分时间都是阻塞在 sleep(100) 上,当其他线程通过调用th.interrupt().来中断 th 线程时,大概率地会触发 InterruptedException 异常,在触发 InterruptedException 异常的同时,JVM 会同时把线程的中断标志位清除,所以这个时候th.isInterrupted()返回的是 false。
正确的处理方式应该是捕获异常之后重新设置中断标志位,也就是下面这样:
-
- try {
- Thread.sleep(100);
- }catch(InterruptedException e){
- // 重新设置中断标志位
- th.interrupt();
- }
20、理论值 or 经验值
经验值对于很多“I/O 耗时 / CPU 耗时”不太容易确定的系统来说,却是一个很好到初始值。
最佳线程数最终还是靠压测来确定的,实际工作中大家面临的系统,“I/O 耗时 / CPU 耗时”往往都大于 1,所以基本上都是在这个初始值的基础上增加。增加的过程中,应关注线程数是如何影响吞吐量和延迟的。一般来讲,随着线程数的增加,吞吐量会增加,延迟也会缓慢增加;但是当线程数增加到一定程度,吞吐量就会开始下降,延迟会迅速增加。这个时候基本上就是线程能够设置的最大值了。
实际工作中,不同的 I/O 模型对最佳线程数的影响非常大,例如大名鼎鼎的 Nginx 用的是非阻塞 I/O,采用的是多进程单线程结构,Nginx 本来是一个 I/O 密集型系统,但是最佳进程数设置的却是 CPU 的核数,完全参考的是 CPU 密集型的算法。所以,理论我们还是要活学活用。
21、Lock和Condition:隐藏在并发包中的管程
在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。这两大问题,管程都是能够解决的。Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。
(1)再造管程的理由
死锁问题的时候,提出了一个破坏不可抢占条件方案,但是这个方案 synchronized 没有办法解决。原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。但我们希望的是:
对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
我们重新设计一把互斥锁去解决这个问题,那该怎么设计呢?我觉得有三种方案。
① 能够响应中断。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
② 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
③ 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
体现在 API 上,就是 Lock 接口的三个方法。详情如下:
-
- // 支持中断的API
- void lockInterruptibly()
- throws InterruptedException;
- // 支持超时的API
- boolean tryLock(long time, TimeUnit unit)
- throws InterruptedException;
- // 支持非阻塞获取锁的API
- boolean tryLock();
(2)如何保证可见性
Java SDK 里面 Lock 的使用,有一个经典的范例,就是try{}finally{},需要重点关注的是在 finally 里面释放锁。这个范例无需多解释,你看一下下面的代码就明白了。但是有一点需要解释一下,那就是可见性是怎么保证的。你已经知道 Java 里多线程的可见性是通过 Happens-Before 规则保证的,而 synchronized 之所以能够保证可见性,也是因为有一条 synchronized 相关的规则:synchronized 的解锁 Happens-Before 于后续对这个锁的加锁。那 Java SDK 里面 Lock 靠什么保证可见性呢?例如在下面的代码中,线程 T1 对 value 进行了 +=1 操作,那后续的线程 T2 能够看到 value 的正确结果吗?
-
- class X {
- private final Lock rtl =
- new ReentrantLock();
- int value;
- public void addOne() {
- // 获取锁
- rtl.lock();
- try {
- value+=1;
- } finally {
- // 保证锁能释放
- rtl.unlock();
- }
- }
- }
答案必须是肯定的。Java SDK 里面锁的实现非常复杂,这里我就不展开细说了,但是原理还是需要简单介绍一下:它是利用了 volatile 相关的 Happens-Before 规则。Java SDK 里面的 ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state 的值(简化后的代码如下面所示)。也就是说,在执行 value+=1 之前,程序先读写了一次 volatile 变量 state,在执行 value+=1 之后,又读写了一次 volatile 变量 state。根据相关的 Happens-Before 规则:
① 顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
② volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作;
③ 传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作。
-
- class SampleLock {
- volatile int state;
- // 加锁
- lock() {
- // 省略代码无数
- state = 1;
- }
- // 解锁
- unlock() {
- // 省略代码无数
- state = 0;
- }
- }
(3)什么是可重入锁
我们创建的锁的具体类名是 ReentrantLock,这个翻译过来叫可重入锁,这个概念前面我们一直没有介绍过。所谓可重入锁,顾名思义,指的是线程可以重复获取同一把锁。例如下面代码中,当线程 T1 执行到 ① 处时,已经获取到了锁 rtl ,当在 ① 处调用 get() 方法时,会在 ② 再次对锁 rtl 执行加锁操作。此时,如果锁 rtl 是可重入的,那么线程 T1 可以再次加锁成功;如果锁 rtl 是不可重入的,那么线程 T1 此时会被阻塞。
除了可重入锁,可能你还听说过可重入函数,可重入函数怎么理解呢?指的是线程可以重复调用?显然不是,所谓可重入函数,指的是多个线程可以同时调用该函数,每个线程都能得到正确结果;同时在一个线程内支持线程切换,无论被切换多少次,结果都是正确的。多线程可以同时执行,还支持线程切换,这意味着什么呢?线程安全啊。所以,可重入函数是线程安全的。
-
- class X {
- private final Lock rtl =
- new ReentrantLock();
- int value;
- public int get() {
- // 获取锁
- rtl.lock(); ②
- try {
- return value;
- } finally {
- // 保证锁能释放
- rtl.unlock();
- }
- }
- public void addOne() {
- // 获取锁
- rtl.lock();
- try {
- value = 1 + get(); ①
- } finally {
- // 保证锁能释放
- rtl.unlock();
- }
- }
- }
(3)公平锁与非公平锁
在使用 ReentrantLock 的时候,你会发现 ReentrantLock 这个类有两个构造函数,一个是无参构造函数,一个是传入 fair 参数的构造函数。fair 参数代表的是锁的公平策略,如果传入 true 就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。
-
- //无参构造函数:默认非公平锁
- public ReentrantLock() {
- sync = new NonfairSync();
- }
- //根据公平策略参数创建锁
- public ReentrantLock(boolean fair){
- sync = fair ? new FairSync()
- : new NonfairSync();
- }
入口等待队列,锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。
(4)用锁的最佳实践
① 永远只在更新对象的成员变量时加锁
② 永远只在访问可变的成员变量时加锁
③ 永远不在调用其他对象的方法时加锁
④ 减少锁的持有时间
⑤ 减小锁的粒度
这三条规则,前两条估计你一定会认同,最后一条你可能会觉得过于严苛。但是我还是倾向于你去遵守,因为调用其他对象的方法,实在是太不安全了,也许“其他”方法里面有线程 sleep() 的调用,也可能会有奇慢无比的 I/O 操作,这些都会严重影响性能。更可怕的是,“其他”类的方法可能也会加锁,然后双重加锁就可能导致死锁。
22、Lock和Condition:Dubbo如何用管程实现异步转同步?
Java 语言内置的管程里只有一个条件变量,而 Lock&Condition 实现的管程是支持多个条件变量的,这是二者的一个重要区别。
在很多并发场景下,支持多个条件变量能够让我们的并发程序可读性更好,实现起来也更容易。例如,实现一个阻塞队列,就需要两个条件变量。
(1)那如何利用两个条件变量快速实现阻塞队列呢?
一个阻塞队列,需要两个条件变量,一个是队列不空(空队列不允许出队),另一个是队列不满(队列已满不允许入队),这个例子我们前面在介绍管程的时候详细说过,这里就不再赘述。相关的代码,我这里重新列了出来,你可以温故知新一下。
-
- public class BlockedQueue<T>{
- final Lock lock =
- new ReentrantLock();
- // 条件变量:队列不满
- final Condition notFull =
- lock.newCondition();
- // 条件变量:队列不空
- final Condition notEmpty =
- lock.newCondition();
-
- // 入队
- void enq(T x) {
- lock.lock();
- try {
- while (队列已满){
- // 等待队列不满
- notFull.await();
- }
- // 省略入队操作...
- //入队后,通知可出队
- notEmpty.signal();
- }finally {
- lock.unlock();
- }
- }
- // 出队
- void deq(){
- lock.lock();
- try {
- while (队列已空){
- // 等待队列不空
- notEmpty.await();
- }
- // 省略出队操作...
- //出队后,通知可入队
- notFull.signal();
- }finally {
- lock.unlock();
- }
- }
- }
不过,这里你需要注意,Lock 和 Condition 实现的管程,线程等待和通知需要调用 await()、signal()、signalAll(),它们的语义和 wait()、notify()、notifyAll() 是相同的。但是不一样的是,Lock&Condition 实现的管程里只能使用前面的 await()、signal()、signalAll(),而后面的 wait()、notify()、notifyAll() 只有在 synchronized 实现的管程里才能使用。如果一不小心在 Lock&Condition 实现的管程里调用了 wait()、notify()、notifyAll(),那程序可就彻底玩儿完了。
(2)同步与异步
通俗点来讲就是调用方是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步。
比如在下面的代码里,有一个计算圆周率小数点后 100 万位的方法pai1M(),这个方法可能需要执行俩礼拜,如果调用pai1M()之后,线程一直等着计算结果,等俩礼拜之后结果返回,就可以执行 printf("hello world")了,这个属于同步;如果调用pai1M()之后,线程不用等待计算结果,立刻就可以执行 printf("hello world"),这个就属于异步。
-
- // 计算圆周率小说点后100万位
- String pai1M() {
- //省略代码无数
- }
-
- pai1M()
- printf("hello world")
同步,是 Java 代码默认的处理方式。如果你想让你的程序支持异步,可以通过下面两种方式来实现:
① 调用方创建一个子线程,在子线程中执行方法调用,这种调用我们称为异步调用;
② 方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接 return,这种方法我们一般称为异步方法。
(3)Dubbo 源码分析
其实在编程领域,异步的场景还是挺多的,比如 TCP 协议本身就是异步的,我们工作中经常用到的 RPC 调用,在 TCP 协议层面,发送完 RPC 请求后,线程是不会等待 RPC 的响应结果的。可能你会觉得奇怪,平时工作中的 RPC 调用大多数都是同步的啊?这是怎么回事呢?
其实很简单,一定是有人帮你做了异步转同步的事情。例如目前知名的 RPC 框架 Dubbo 就给我们做了异步转同步的事情,那它是怎么做的呢?下面我们就来分析一下 Dubbo 的相关源码。
对于下面一个简单的 RPC 调用,默认情况下 sayHello() 方法,是个同步方法,也就是说,执行 service.sayHello(“dubbo”) 的时候,线程会停下来等结果。
- DemoService service = 初始化部分省略
- String message =
- service.sayHello("dubbo");
- System.out.println(message);
如果此时你将调用线程 dump 出来的话,会是下图这个样子,你会发现调用线程阻塞了,线程状态是 TIMED_WAITING。本来发送请求是异步的,但是调用线程却阻塞了,说明 Dubbo 帮我们做了异步转同步的事情。通过调用栈,你能看到线程是阻塞在 DefaultFuture.get() 方法上,所以可以推断:Dubbo 异步转同步的功能应该是通过 DefaultFuture 这个类实现的。
不过为了理清前后关系,还是有必要分析一下调用 DefaultFuture.get() 之前发生了什么。DubboInvoker 的 108 行调用了 DefaultFuture.get(),这一行很关键,我稍微修改了一下列在了下面。这一行先调用了 request(inv, timeout) 方法,这个方法其实就是发送 RPC 请求,之后通过调用 get() 方法等待 RPC 返回结果。
-
- public class DubboInvoker{
- Result doInvoke(Invocation inv){
- // 下面这行就是源码中108行
- // 为了便于展示,做了修改
- return currentClient
- .request(inv, timeout)
- .get();
- }
- }
DefaultFuture 这个类是很关键,我把相关的代码精简之后,列到了下面。不过在看代码之前,你还是有必要重复一下我们的需求:当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。不知道你有没有似曾相识的感觉,这不就是经典的等待 - 通知机制吗?这个时候想必你的脑海里应该能够浮现出管程的解决方案了。有了自己的方案之后,我们再来看看 Dubbo 是怎么实现的。
-
- // 创建锁与条件变量
- private final Lock lock
- = new ReentrantLock();
- private final Condition done
- = lock.newCondition();
-
- // 调用方通过该方法等待结果
- Object get(int timeout){
- long start = System.nanoTime();
- lock.lock();
- try {
- while (!isDone()) {
- done.await(timeout);
- long cur=System.nanoTime();
- if (isDone() ||
- cur-start > timeout){
- break;
- }
- }
- } finally {
- lock.unlock();
- }
- if (!isDone()) {
- throw new TimeoutException();
- }
- return returnFromResponse();
- }
- // RPC结果是否已经返回
- boolean isDone() {
- return response != null;
- }
- // RPC结果返回时调用该方法
- private void doReceived(Response res) {
- lock.lock();
- try {
- response = res;
- if (done != null) {
- done.signal();
- }
- } finally {
- lock.unlock();
- }
- }
调用线程通过调用 get() 方法等待 RPC 返回结果,这个方法里面,你看到的都是熟悉的“面孔”:调用 lock() 获取锁,在 finally 里面调用 unlock() 释放锁;获取锁后,通过经典的在循环中调用 await() 方法来实现等待。
当 RPC 结果返回时,会调用 doReceived() 方法,这个方法里面,调用 lock() 获取锁,在 finally 里面调用 unlock() 释放锁,获取锁后通过调用 signal() 来通知调用线程,结果已经返回,不用继续等待了。
最近这几年,工作中需要异步处理的越来越多了,其中有一个主要原因就是有些 API 本身就是异步 API。例如 websocket 也是一个异步的通信协议,如果基于这个协议实现一个简单的 RPC,你也会遇到异步转同步的问题。现在很多公有云的 API 本身也是异步的,例如创建云主机,就是一个异步的 API,调用虽然成功了,但是云主机并没有创建成功,你需要调用另外一个 API 去轮询云主机的状态。如果你需要在项目内部封装创建云主机的 API,你也会面临异步转同步的问题,因为同步的 API 更易用。
23、Semaphore:如何快速实现一个限流器?
Semaphore,现在普遍翻译为“信号量”,以前也曾被翻译成“信号灯”,因为类似现实生活里的红绿灯,车辆能不能通行,要看是不是绿灯。同样,在编程世界里,线程能不能执行,也要看信号量是不是允许。
信号量是由大名鼎鼎的计算机科学家迪杰斯特拉(Dijkstra)于 1965 年提出,在这之后的 15 年,信号量一直都是并发编程领域的终结者,直到 1980 年管程被提出来,我们才有了第二选择。目前几乎所有支持并发编程的语言都支持信号量机制,所以学好信号量还是很有必要的。
(1)信号量模型
可以简单概括为:一个计数器,一个等待队列,三个方法
这三个方法详细的语义具体如下所示。
① init():设置计数器的初始值。
② down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
③ up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。
这里提到的 init()、down() 和 up() 三个方法都是原子性的,并且这个原子性是由信号量模型的实现方保证的。在 Java SDK 里面,信号量模型是由 java.util.concurrent.Semaphore 实现的,Semaphore 这个类能够保证这三个方法都是原子操作。
信号量模型
-
- class Semaphore{
- // 计数器
- int count;
- // 等待队列
- Queue queue;
- // 初始化操作
- Semaphore(int c){
- this.count=c;
- }
- //
- void down(){
- this.count--;
- if(this.count<0){
- //将当前线程插入等待队列
- //阻塞当前线程
- }
- }
- void up(){
- this.count++;
- if(this.count<=0) {
- //移除等待队列中的某个线程T
- //唤醒线程T
- }
- }
- }
这里再插一句,信号量模型里面,down()、up() 这两个操作历史上最早称为 P 操作和 V 操作,所以信号量模型也被称为 PV 原语。另外,还有些人喜欢用 semWait() 和 semSignal() 来称呼它们,虽然叫法不同,但是语义都是相同的。在 Java SDK 并发包里,down() 和 up() 对应的则是 acquire() 和 release()。
(2)如何使用信号量
那具体该如何使用呢?其实你想想红绿灯就可以了。十字路口的红绿灯可以控制交通,得益于它的一个关键规则:车辆在通过路口前必须先检查是否是绿灯,只有绿灯才能通行。
其实,信号量的使用也是类似的。这里我们还是用累加器的例子来说明信号量的使用吧。在累加器的例子里面,count+=1 操作是个临界区,只允许一个线程执行,也就是说要保证互斥。那这种情况用信号量怎么控制呢?
其实很简单,就像我们用互斥锁一样,只需要在进入临界区之前执行一下 down() 操作,退出临界区之前执行一下 up() 操作就可以了。下面是 Java 代码的示例,acquire() 就是信号量里的 down() 操作,release() 就是信号量里的 up() 操作。
-
- static int count;
- //初始化信号量
- static final Semaphore s
- = new Semaphore(1);
- //用信号量保证互斥
- static void addOne() {
- s.acquire();
- try {
- count+=1;
- } finally {
- s.release();
- }
- }
下面我们再来分析一下,信号量是如何保证互斥的。假设两个线程 T1 和 T2 同时访问 addOne() 方法,当它们同时调用 acquire() 的时候,由于 acquire() 是一个原子操作,所以只能有一个线程(假设 T1)把信号量里的计数器减为 0,另外一个线程(T2)则是将计数器减为 -1。对于线程 T1,信号量里面的计数器的值是 0,大于等于 0,所以线程 T1 会继续执行;对于线程 T2,信号量里面的计数器的值是 -1,小于 0,按照信号量模型里对 down() 操作的描述,线程 T2 将被阻塞。所以此时只有线程 T1 会进入临界区执行count+=1;。
当线程 T1 执行 release() 操作,也就是 up() 操作的时候,信号量里计数器的值是 -1,加 1 之后的值是 0,小于等于 0,按照信号量模型里对 up() 操作的描述,此时等待队列中的 T2 将会被唤醒。于是 T2 在 T1 执行完临界区代码之后才获得了进入临界区执行的机会,从而保证了互斥性。
(3)快速实现一个限流器
其实实现一个互斥锁,仅仅是 Semaphore 的部分功能,Semaphore 还有一个功能是 Lock 不容易实现的,那就是:Semaphore 可以允许多个线程访问一个临界区。
比较常见的需求就是我们工作中遇到的各种池化资源,例如连接池、对象池、线程池等等。其中,你可能最熟悉数据库连接池,在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的。
其实前不久,我在工作中也遇到了一个对象池的需求。所谓对象池呢,指的是一次性创建出 N 个对象,之后所有的线程重复利用这 N 个对象,当然对象在被释放前,也是不允许其他线程使用的。对象池,可以用 List 保存实例对象,这个很简单。但关键是限流器的设计,这里的限流,指的是不允许多于 N 个线程同时进入临界区。那如何快速实现一个这样的限流器呢?这种场景,我立刻就想到了信号量的解决方案。
信号量的计数器,在上面的例子中,我们设置成了 1,这个 1 表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数 N,就能完美解决对象池的限流问题了。下面就是对象池的示例代码。
其实前不久,我在工作中也遇到了一个对象池的需求。所谓对象池呢,指的是一次性创建出 N 个对象,之后所有的线程重复利用这 N 个对象,当然对象在被释放前,也是不允许其他线程使用的。对象池,可以用 List 保存实例对象,这个很简单。但关键是限流器的设计,这里的限流,指的是不允许多于 N 个线程同时进入临界区。那如何快速实现一个这样的限流器呢?这种场景,我立刻就想到了信号量的解决方案。
信号量的计数器,在上面的例子中,我们设置成了 1,这个 1 表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数 N,就能完美解决对象池的限流问题了。下面就是对象池的示例代码
-
- class ObjPool<T, R> {
- final List<T> pool;
- // 用信号量实现限流器
- final Semaphore sem;
- // 构造函数
- ObjPool(int size, T t){
- pool = new Vector<T>(){};
- for(int i=0; i<size; i++){
- pool.add(t);
- }
- sem = new Semaphore(size);
- }
- // 利用对象池的对象,调用func
- R exec(Function<T,R> func) {
- T t = null;
- sem.acquire();
- try {
- t = pool.remove(0);
- return func.apply(t);
- } finally {
- pool.add(t);
- sem.release();
- }
- }
- }
- // 创建对象池
- ObjPool<Long, String> pool =
- new ObjPool<Long, String>(10, 2);
- // 通过对象池获取t,之后执行
- pool.exec(t -> {
- System.out.println(t);
- return t.toString();
- });
我们用一个 List来保存对象实例,用 Semaphore 实现限流器。关键的代码是 ObjPool 里面的 exec() 方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用 acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是 10,信号量的计数器初始化为 10,那么前 10 个线程调用 acquire() 方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在 acquire() 方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0) 实现的),分配完之后会执行一个回调函数 func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过 pool.add(t) 实现的),同时调用 release() 方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。
使用信号量,我们可以轻松地实现一个限流器,使用起来还是非常简单的。
Java 在并发编程领域走的很快,重点支持的还是管程模型。 管程模型理论上解决了信号量模型的一些不足,主要体现在易用性和工程化方面,例如用信号量解决我们曾经提到过的阻塞队列问题,就比管程模型麻烦很多
24、ReadWriteLock:如何快速实现一个完备的缓存?
那 Java SDK 并发包里为什么还有很多其他的工具类呢?原因很简单:分场景优化性能,提升易用性。
介绍一种非常普遍的并发场景:读多写少场景。实际工作中,为了优化性能,我们经常会使用缓存,例如缓存元数据、缓存基础数据等,这就是一种典型的读多写少应用场景。缓存之所以能提升性能,一个重要的条件就是缓存的数据一定是读多写少的,例如元数据和基础数据基本上不会发生变化(写少),但是使用它们的地方却很多(读多)。
针对读多写少这种并发场景,Java SDK 并发包提供了读写锁——ReadWriteLock,非常容易使用,并且性能很好。
(1)那什么是读写锁呢?
读写锁,并不是 Java 语言特有的,而是一个广为使用的通用技术,所有的读写锁都遵守以下三条基本原则:
① 允许多个线程同时读共享变量;
② 只允许一个线程写共享变量;
③ 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。
读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量,而互斥锁是不允许的,这是读写锁在读多写少场景下性能优于互斥锁的关键。但读写锁的写操作是互斥的,当一个线程在写共享变量的时候,是不允许其他线程执行写操作和读操作。
(2)快速实现一个缓存
用 ReadWriteLock 快速实现一个通用的缓存工具类。
在下面的代码中,我们声明了一个 Cache 类,其中类型参数 K 代表缓存里 key 的类型,V 代表缓存里 value 的类型。缓存的数据保存在 Cache 类内部的 HashMap 里面,HashMap 不是线程安全的,这里我们使用读写锁 ReadWriteLock 来保证其线程安全。ReadWriteLock 是一个接口,它的实现类是 ReentrantReadWriteLock,通过名字你应该就能判断出来,它是支持可重入的。下面我们通过 rwl 创建了一把读锁和一把写锁。
Cache 这个工具类,我们提供了两个方法,一个是读缓存方法 get(),另一个是写缓存方法 put()。读缓存需要用到读锁,读锁的使用和前面我们介绍的 Lock 的使用是相同的,都是 try{}finally{}这个编程范式。写缓存则需要用到写锁,写锁的使用和读锁是类似的。这样看来,读写锁的使用还是非常简单的。
-
- class Cache<K,V> {
- final Map<K, V> m =
- new HashMap<>();
- final ReadWriteLock rwl =
- new ReentrantReadWriteLock();
- // 读锁
- final Lock r = rwl.readLock();
- // 写锁
- final Lock w = rwl.writeLock();
- // 读缓存
- V get(K key) {
- r.lock();
- try { return m.get(key); }
- finally { r.unlock(); }
- }
- // 写缓存
- V put(K key, V value) {
- w.lock();
- try { return m.put(key, v); }
- finally { w.unlock(); }
- }
- }
如果你曾经使用过缓存的话,你应该知道使用缓存首先要解决缓存数据的初始化问题。缓存数据的初始化,可以采用一次性加载的方式,也可以使用按需加载的方式。
如果源头数据的数据量不大,就可以采用一次性加载的方式,这种方式最简单(可参考下图),只需在应用启动的时候把源头数据查询出来,依次调用类似上面示例代码中的 put() 方法就可以了。
如果源头数据量非常大,那么就需要按需加载了,按需加载也叫懒加载,指的是只有当应用查询缓存,并且数据不在缓存里的时候,才触发加载源头相关数据进缓存的操作。下面你可以结合文中示意图看看如何利用 ReadWriteLock 来实现缓存的按需加载。
(3)实现缓存的按需加载
文中下面的这段代码实现了按需加载的功能,这里我们假设缓存的源头是数据库。需要注意的是,如果缓存中没有缓存目标对象,那么就需要从数据库中加载,然后写入缓存,写缓存需要用到写锁,所以在代码中的⑤处,我们调用了 w.lock() 来获取写锁。
另外,还需要注意的是,在获取写锁之后,我们并没有直接去查询数据库,而是在代码⑥⑦处,重新验证了一次缓存中是否存在,再次验证如果还是不存在,我们才去查询数据库并更新本地缓存。为什么我们要再次验证呢?
-
- class Cache<K,V> {
- final Map<K, V> m =
- new HashMap<>();
- final ReadWriteLock rwl =
- new ReentrantReadWriteLock();
- final Lock r = rwl.readLock();
- final Lock w = rwl.writeLock();
-
- V get(K key) {
- V v = null;
- //读缓存
- r.lock(); ①
- try {
- v = m.get(key); ②
- } finally{
- r.unlock(); ③
- }
- //缓存中存在,返回
- if(v != null) { ④
- return v;
- }
- //缓存中不存在,查询数据库
- w.lock(); ⑤
- try {
- //再次验证
- //其他线程可能已经查询过数据库
- v = m.get(key); ⑥
- if(v == null){ ⑦
- //查询数据库
- v=省略代码无数
- m.put(key, v);
- }
- } finally{
- w.unlock();
- }
- return v;
- }
- }
原因是在高并发的场景下,有可能会有多线程竞争写锁。假设缓存是空的,没有缓存任何东西,如果此时有三个线程 T1、T2 和 T3 同时调用 get() 方法,并且参数 key 也是相同的。那么它们会同时执行到代码⑤处,但此时只有一个线程能够获得写锁,假设是线程 T1,线程 T1 获取写锁之后查询数据库并更新缓存,最终释放写锁。此时线程 T2 和 T3 会再有一个线程能够获取写锁,假设是 T2,如果不采用再次验证的方式,此时 T2 会再次查询数据库。T2 释放写锁之后,T3 也会再次查询一次数据库。而实际上线程 T1 已经把缓存的值设置好了,T2、T3 完全没有必要再次查询数据库。所以,再次验证的方式,能够避免高并发场景下重复查询数据的问题。
(4)读写锁的升级与降级
上面按需加载的示例代码中,在①处获取读锁,在③处释放读锁,那是否可以在②处的下面增加验证缓存并更新缓存的逻辑呢?详细的代码如下。
-
- //读缓存
- r.lock(); ①
- try {
- v = m.get(key); ②
- if (v == null) {
- w.lock();
- try {
- //再次验证并更新缓存
- //省略详细代码
- } finally{
- w.unlock();
- }
- }
- } finally{
- r.unlock(); ③
- }
这样看上去好像是没有问题的,先是获取读锁,然后再升级为写锁,对此还有个专业的名字,叫锁的升级。可惜 ReadWriteLock 并不支持这种升级。在上面的代码示例中,读锁还没有释放,此时获取写锁,会导致写锁永久等待,最终导致相关线程都被阻塞,永远也没有机会被唤醒。锁的升级是不允许的,这个你一定要注意。
不过,虽然锁的升级是不允许的,但是锁的降级却是允许的。以下代码来源自 ReentrantReadWriteLock 的官方示例,略做了改动。你会发现在代码①处,获取读锁的时候线程还是持有写锁的,这种锁的降级是支持的。
-
- class CachedData {
- Object data;
- volatile boolean cacheValid;
- final ReadWriteLock rwl =
- new ReentrantReadWriteLock();
- // 读锁
- final Lock r = rwl.readLock();
- //写锁
- final Lock w = rwl.writeLock();
-
- void processCachedData() {
- // 获取读锁
- r.lock();
- if (!cacheValid) {
- // 释放读锁,因为不允许读锁的升级
- r.unlock();
- // 获取写锁
- w.lock();
- try {
- // 再次检查状态
- if (!cacheValid) {
- data = ...
- cacheValid = true;
- }
- // 释放写锁前,降级为读锁
- // 降级是可以的
- r.lock(); ①
- } finally {
- // 释放写锁
- w.unlock();
- }
- }
- // 此处仍然持有读锁
- try {use(data);}
- finally {r.unlock();}
- }
- }
读写锁类似于 ReentrantLock,也支持公平模式和非公平模式。读锁和写锁都实现了 java.util.concurrent.locks.Lock 接口,所以除了支持 lock() 方法外,tryLock()、lockInterruptibly() 等方法也都是支持的。但是有一点需要注意,那就是只有写锁支持条件变量,读锁是不支持条件变量的,读锁调用 newCondition() 会抛出 UnsupportedOperationException 异常。
今天我们用 ReadWriteLock 实现了一个简单的缓存,这个缓存虽然解决了缓存的初始化问题,但是没有解决缓存数据与源头数据的同步问题,这里的数据同步指的是保证缓存数据和源头数据的一致性。解决数据同步问题的一个最简单的方案就是超时机制。所谓超时机制指的是加载进缓存的数据不是长久有效的,而是有时效的,当缓存的数据超过时效,也就是超时之后,这条数据在缓存中就失效了。而访问缓存中失效的数据,会触发缓存重新从源头把数据加载进缓存。
当然也可以在源头数据发生变化时,快速反馈给缓存,但这个就要依赖具体的场景了。例如 MySQL 作为数据源头,可以通过近实时地解析 binlog 来识别数据是否发生了变化,如果发生了变化就将最新的数据推送给缓存。另外,还有一些方案采取的是数据库和缓存的双写方案。
具体采用哪种方案,还是要看应用的场景。
25、StampedLock:有没有比读写锁更快的锁?
Java 在 1.8 这个版本里,提供了一种叫 StampedLock 的锁,它的性能就比读写锁还要好。
(1)StampedLock 支持的三种锁模式
我们先来看看在使用上 StampedLock 和上一篇文章讲的 ReadWriteLock 有哪些区别。
ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。而 StampedLock 支持三种模式,分别是:写锁、悲观读锁和乐观读。其中,写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp。相关的示例代码如下。
-
- final StampedLock sl =
- new StampedLock();
-
- // 获取/释放悲观读锁示意代码
- long stamp = sl.readLock();
- try {
- //省略业务相关代码
- } finally {
- sl.unlockRead(stamp);
- }
-
- // 获取/释放写锁示意代码
- long stamp = sl.writeLock();
- try {
- //省略业务相关代码
- } finally {
- sl.unlockWrite(stamp);
- }
StampedLock 的性能之所以比 ReadWriteLock 还要好,其关键是 StampedLock 支持乐观读的方式。ReadWriteLock 支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;而 StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。
注意这里,我们用的是“乐观读”这个词,而不是“乐观读锁”,是要提醒你,乐观读这个操作是无锁的,所以相比较 ReadWriteLock 的读锁,乐观读的性能更好一些。
文中下面这段代码是出自 Java SDK 官方示例,并略做了修改。在 distanceFromOrigin() 这个方法中,首先通过调用 tryOptimisticRead() 获取了一个 stamp,这里的 tryOptimisticRead() 就是我们前面提到的乐观读。之后将共享变量 x 和 y 读入方法的局部变量中,不过需要注意的是,由于 tryOptimisticRead() 是无锁的,所以共享变量 x 和 y 读入方法局部变量时,x 和 y 有可能被其他线程修改了。因此最后读完之后,还需要再次验证一下是否存在写操作,这个验证操作是通过调用 validate(stamp) 来实现的。
-
- class Point {
- private int x, y;
- final StampedLock sl =
- new StampedLock();
- //计算到原点的距离
- int distanceFromOrigin() {
- // 乐观读
- long stamp =
- sl.tryOptimisticRead();
- // 读入局部变量,
- // 读的过程数据可能被修改
- int curX = x, curY = y;
- //判断执行读操作期间,
- //是否存在写操作,如果存在,
- //则sl.validate返回false
- if (!sl.validate(stamp)){
- // 升级为悲观读锁
- stamp = sl.readLock();
- try {
- curX = x;
- curY = y;
- } finally {
- //释放悲观读锁
- sl.unlockRead(stamp);
- }
- }
- return Math.sqrt(
- curX * curX + curY * curY);
- }
- }
在上面这个代码示例中,如果执行乐观读操作的期间,存在写操作,会把乐观读升级为悲观读锁。这个做法挺合理的,否则你就需要在一个循环里反复执行乐观读,直到执行乐观读操作的期间没有写操作(只有这样才能保证 x 和 y 的正确性和一致性),而循环读会浪费大量的 CPU。升级为悲观读锁,代码简练且不易出错,建议你在具体实践时也采用这样的方法。
(2)进一步理解乐观读
如果你曾经用过数据库的乐观锁,可能会发现 StampedLock 的乐观读和数据库的乐观锁有异曲同工之妙。的确是这样的,就拿我个人来说,我是先接触的数据库里的乐观锁,然后才接触的 StampedLock,我就觉得我前期数据库里乐观锁的学习对于后面理解 StampedLock 的乐观读有很大帮助,所以这里有必要再介绍一下数据库里的乐观锁。
还记得我第一次使用数据库乐观锁的场景是这样的:在 ERP 的生产模块里,会有多个人通过 ERP 系统提供的 UI 同时修改同一条生产订单,那如何保证生产订单数据是并发安全的呢?我采用的方案就是乐观锁。
乐观锁的实现很简单,在生产订单的表 product_doc 里增加了一个数值型版本号字段 version,每次更新 product_doc 这个表的时候,都将 version 字段加 1。生产订单的 UI 在展示的时候,需要查询数据库,此时将这个 version 字段和其他业务字段一起返回给生产订单 UI。假设用户查询的生产订单的 id=777,那么 SQL 语句类似下面这样:
-
- select id,... ,version
- from product_doc
- where id=777
用户在生产订单 UI 执行保存操作的时候,后台利用下面的 SQL 语句更新生产订单,此处我们假设该条生产订单的 version=9。
-
- update product_doc
- set version=version+1,...
- where id=777 and version=9
如果这条 SQL 语句执行成功并且返回的条数等于 1,那么说明从生产订单 UI 执行查询操作到执行保存操作期间,没有其他人修改过这条数据。因为如果这期间其他人修改过这条数据,那么版本号字段一定会大于 9。
你会发现数据库里的乐观锁,查询的时候需要把 version 字段查出来,更新的时候要利用 version 字段做验证。这个 version 字段就类似于 StampedLock 里面的 stamp。这样对比着看,相信你会更容易理解 StampedLock 里乐观读的用法。
(3)StampedLock 使用注意事项
对于读多写少的场景 StampedLock 性能很好,简单的应用场景基本上可以替代 ReadWriteLock,但是 StampedLock 的功能仅仅是 ReadWriteLock 的子集,在使用的时候,还是有几个地方需要注意一下。
StampedLock 在命名上并没有增加 Reentrant,想必你已经猜测到 StampedLock 应该是不可重入的。事实上,的确是这样的,StampedLock 不支持重入。这个是在使用中必须要特别注意的。
另外,StampedLock 的悲观读锁、写锁都不支持条件变量,这个也需要你注意。
还有一点需要特别注意,那就是:如果线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升。例如下面的代码中,线程 T1 获取写锁之后将自己阻塞,线程 T2 尝试获取悲观读锁,也会阻塞;如果此时调用线程 T2 的 interrupt() 方法来中断线程 T2 的话,你会发现线程 T2 所在 CPU 会飙升到 100%。
-
- final StampedLock lock
- = new StampedLock();
- Thread T1 = new Thread(()->{
- // 获取写锁
- lock.writeLock();
- // 永远阻塞在此处,不释放写锁
- LockSupport.park();
- });
- T1.start();
- // 保证T1获取写锁
- Thread.sleep(100);
- Thread T2 = new Thread(()->
- //阻塞在悲观读锁
- lock.readLock()
- );
- T2.start();
- // 保证T2阻塞在读锁
- Thread.sleep(100);
- //中断线程T2
- //会导致线程T2所在CPU飙升
- T2.interrupt();
- T2.join();
所以,使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()。这个规则一定要记清楚。总结
StampedLock 的使用看上去有点复杂,但是如果你能理解乐观锁背后的原理,使用起来还是比较流畅的。建议你认真揣摩 Java 的官方示例,这个示例基本上就是一个最佳实践。我们把 Java 官方示例精简后,形成下面的代码模板,建议你在实际工作中尽量按照这个模板来使用 StampedLock。
StampedLock 读模板:
-
- final StampedLock sl =
- new StampedLock();
-
- // 乐观读
- long stamp =
- sl.tryOptimisticRead();
- // 读入方法局部变量
- ......
- // 校验stamp
- if (!sl.validate(stamp)){
- // 升级为悲观读锁
- stamp = sl.readLock();
- try {
- // 读入方法局部变量
- .....
- } finally {
- //释放悲观读锁
- sl.unlockRead(stamp);
- }
- }
- //使用方法局部变量执行业务操作
- ......
StampedLock 写模板:
-
- long stamp = sl.writeLock();
- try {
- // 写共享变量
- ......
- } finally {
- sl.unlockWrite(stamp);
- }
26、CountDownLatch和CyclicBarrier:如何让多线程步调一致?
对账系统最近越来越慢了,能不能快速优化一下。我了解了对账系统的业务后,发现还是挺简单的,用户通过在线商城下单,会生成电子订单,保存在订单库;之后物流会生成派送单给用户发货,派送单保存在派送单库。为了防止漏派送或者重复派送,对账系统每天还会校验是否存在异常订单。
对账系统的处理逻辑很简单,你可以参考下面的对账系统流程图。目前对账系统的处理逻辑是首先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。
对账系统的代码抽象之后,也很简单,核心代码如下,就是在一个单线程里面循环查询订单、派送单,然后执行对账,最后将写入差异库。
-
- while(存在未对账订单){
- // 查询未对账订单
- pos = getPOrders();
- // 查询派送单
- dos = getDOrders();
- // 执行对账操作
- diff = check(pos, dos);
- // 差异写入差异库
- save(diff);
- }
(1)利用并行优化对账系统
首先要找到这个对账系统的瓶颈所在。
目前的对账系统,由于订单量和派送单量巨大,所以查询未对账订单 getPOrders() 和查询派送单 getDOrders() 相对较慢,那有没有办法快速优化一下呢?目前对账系统是单线程执行的,图形化后是下图这个样子。对于串行化的系统,优化性能首先想到的是能否利用多线程并行处理。
所以,这里你应该能够看出来这个对账系统里的瓶颈:查询未对账订单 getPOrders() 和查询派送单 getDOrders() 是否可以并行处理呢?显然是可以的,因为这两个操作并没有先后顺序的依赖。这两个最耗时的操作并行之后,执行过程如下图所示。对比一下单线程的执行示意图,你会发现同等时间里,并行执行的吞吐量近乎单线程的 2 倍,优化效果还是相对明显的。
思路有了,下面我们再来看看如何用代码实现。在下面的代码中,我们创建了两个线程 T1 和 T2,并行执行查询未对账订单 getPOrders() 和查询派送单 getDOrders() 这两个操作。在主线程中执行对账操作 check() 和差异写入 save() 两个操作。不过需要注意的是:主线程需要等待线程 T1 和 T2 执行完才能执行 check() 和 save() 这两个操作,为此我们通过调用 T1.join() 和 T2.join() 来实现等待,当 T1 和 T2 线程退出时,调用 T1.join() 和 T2.join() 的主线程就会从阻塞态被唤醒,从而执行之后的 check() 和 save()。
-
- while(存在未对账订单){
- // 查询未对账订单
- Thread T1 = new Thread(()->{
- pos = getPOrders();
- });
- T1.start();
- // 查询派送单
- Thread T2 = new Thread(()->{
- dos = getDOrders();
- });
- T2.start();
- // 等待T1、T2结束
- T1.join();
- T2.join();
- // 执行对账操作
- diff = check(pos, dos);
- // 差异写入差异库
- save(diff);
- }
(2)用 CountDownLatch 实现线程等待
经过上面的优化之后,基本上可以跟老板汇报收工了,但还是有点美中不足,相信你也发现了,while 循环里面每次都会创建新的线程,而创建线程可是个耗时的操作。所以最好是创建出来的线程能够循环利用,估计这时你已经想到线程池了,是的,线程池就能解决这个问题。
而下面的代码就是用线程池优化后的:我们首先创建了一个固定大小为 2 的线程池,之后在 while 循环里重复利用。一切看上去都很顺利,但是有个问题好像无解了,那就是主线程如何知道 getPOrders() 和 getDOrders() 这两个操作什么时候执行完。前面主线程通过调用线程 T1 和 T2 的 join() 方法来等待线程 T1 和 T2 退出,但是在线程池的方案里,线程根本就不会退出,所以 join() 方法已经失效了。
-
- // 创建2个线程的线程池
- Executor executor =
- Executors.newFixedThreadPool(2);
- while(存在未对账订单){
- // 查询未对账订单
- executor.execute(()-> {
- pos = getPOrders();
- });
- // 查询派送单
- executor.execute(()-> {
- dos = getDOrders();
- });
-
- /* ??如何实现等待??*/
-
- // 执行对账操作
- diff = check(pos, dos);
- // 差异写入差异库
- save(diff);
- }
那如何解决这个问题呢?你可以开动脑筋想出很多办法,最直接的办法是弄一个计数器,初始值设置成 2,当执行完pos = getPOrders();这个操作之后将计数器减 1,执行完dos = getDOrders();之后也将计数器减 1,在主线程里,等待计数器等于 0;当计数器等于 0 时,说明这两个查询操作执行完了。等待计数器等于 0 其实就是一个条件变量,用管程实现起来也很简单。
不过我并不建议你在实际项目中去实现上面的方案,因为 Java 并发包里已经提供了实现类似功能的工具类:CountDownLatch,我们直接使用就可以了。下面的代码示例中,在 while 循环里面,我们首先创建了一个 CountDownLatch,计数器的初始值等于 2,之后在pos = getPOrders();和dos = getDOrders();两条语句的后面对计数器执行减 1 操作,这个对计数器减 1 的操作是通过调用 latch.countDown(); 来实现的。在主线程中,我们通过调用 latch.await() 来实现对计数器等于 0 的等待。
-
- // 创建2个线程的线程池
- Executor executor =
- Executors.newFixedThreadPool(2);
- while(存在未对账订单){
- // 计数器初始化为2
- CountDownLatch latch =
- new CountDownLatch(2);
- // 查询未对账订单
- executor.execute(()-> {
- pos = getPOrders();
- latch.countDown();
- });
- // 查询派送单
- executor.execute(()-> {
- dos = getDOrders();
- latch.countDown();
- });
-
- // 等待两个查询操作结束
- latch.await();
-
- // 执行对账操作
- diff = check(pos, dos);
- // 差异写入差异库
- save(diff);
- }
(3)进一步优化性能
经过上面的重重优化之后,长出一口气,终于可以交付了。不过在交付之前还需要再次审视一番,看看还有没有优化的余地,仔细看还是有的。
前面我们将 getPOrders() 和 getDOrders() 这两个查询操作并行了,但这两个查询操作和对账操作 check()、save() 之间还是串行的。很显然,这两个查询操作和对账操作也是可以并行的,也就是说,在执行对账操作的时候,可以同时去执行下一轮的查询操作,这个过程可以形象化地表述为下面这幅示意图。
那接下来我们再来思考一下如何实现这步优化,两次查询操作能够和对账操作并行,对账操作还依赖查询操作的结果,这明显有点生产者 - 消费者的意思,两次查询操作是生产者,对账操作是消费者。既然是生产者 - 消费者模型,那就需要有个队列,来保存生产者生产的数据,而消费者则从这个队列消费数据。
不过针对对账这个项目,我设计了两个队列,并且两个队列的元素之间还有对应关系。具体如下图所示,订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送单插入派送单队列,这两个队列的元素之间是有一一对应的关系的。两个队列的好处是,对账操作可以每次从订单队列出一个元素,从派送单队列出一个元素,然后对这两个元素执行对账操作,这样数据一定不会乱掉。
下面再来看如何用双队列来实现完全的并行。一个最直接的想法是:一个线程 T1 执行订单的查询工作,一个线程 T2 执行派送单的查询工作,当线程 T1 和 T2 都各自生产完 1 条数据的时候,通知线程 T3 执行对账操作。这个想法虽看上去简单,但其实还隐藏着一个条件,那就是线程 T1 和线程 T2 的工作要步调一致,不能一个跑得太快,一个跑得太慢,只有这样才能做到各自生产完 1 条数据的时候,通知线程 T3。
下面这幅图形象地描述了上面的意图:线程 T1 和线程 T2 只有都生产完 1 条数据的时候,才能一起向下执行,也就是说,线程 T1 和线程 T2 要互相等待,步调要一致;同时当线程 T1 和 T2 都生产完一条数据的时候,还要能够通知线程 T3 执行对账操作。
(4)用 CyclicBarrier 实现线程同步
下面我们就来实现上面提到的方案。这个方案的难点有两个:一个是线程 T1 和 T2 要做到步调一致,另一个是要能够通知到线程 T3。
你依然可以利用一个计数器来解决这两个难点,计数器初始化为 2,线程 T1 和 T2 生产完一条数据都将计数器减 1,如果计数器大于 0 则线程 T1 或者 T2 等待。如果计数器等于 0,则通知线程 T3,并唤醒等待的线程 T1 或者 T2,与此同时,将计数器重置为 2,这样线程 T1 和线程 T2 生产下一条数据的时候就可以继续使用这个计数器了。
同样,还是建议你不要在实际项目中这么做,因为 Java 并发包里也已经提供了相关的工具类:CyclicBarrier。在下面的代码中,我们首先创建了一个计数器初始值为 2 的 CyclicBarrier,你需要注意的是创建 CyclicBarrier 的时候,我们还传入了一个回调函数,当计数器减到 0 的时候,会调用这个回调函数。
线程 T1 负责查询订单,当查出一条时,调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;线程 T2 负责查询派送单,当查出一条时,也调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;当 T1 和 T2 都调用 barrier.await() 的时候,计数器会减到 0,此时 T1 和 T2 就可以执行下一条语句了,同时会调用 barrier 的回调函数来执行对账操作。
非常值得一提的是,CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值。这个功能用起来实在是太方便了。
-
- // 订单队列
- Vector<P> pos;
- // 派送单队列
- Vector<D> dos;
- // 执行回调的线程池
- Executor executor =
- Executors.newFixedThreadPool(1);
- final CyclicBarrier barrier =
- new CyclicBarrier(2, ()->{
- executor.execute(()->check());
- });
-
- void check(){
- P p = pos.remove(0);
- D d = dos.remove(0);
- // 执行对账操作
- diff = check(p, d);
- // 差异写入差异库
- save(diff);
- }
-
- void checkAll(){
- // 循环查询订单库
- Thread T1 = new Thread(()->{
- while(存在未对账订单){
- // 查询订单库
- pos.add(getPOrders());
- // 等待
- barrier.await();
- }
- });
- T1.start();
- // 循环查询运单库
- Thread T2 = new Thread(()->{
- while(存在未对账订单){
- // 查询运单库
- dos.add(getDOrders());
- // 等待
- barrier.await();
- }
- });
- T2.start();
- }
CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,这两个工具类用法的区别在这里还是有必要再强调一下:CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;而 CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但 CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。
示例代码中有两处用到了线程池,线程池提供了 Future 特性,我们也可以利用 Future 特性来实现线程之间的等待
27、并发容器:都有哪些“坑”需要我们填?
Java 1.5 之前提供的同步容器虽然也能保证线程安全,但是性能很差,而 Java 1.5 版本之后提供的并发容器在性能方面则做了很多优化,并且容器的类型也更加丰富了。下面我们就对比二者来学习这部分的内容。
(1)同步容器及其注意事项
Java 中的容器主要可以分为四个大类,分别是 List、Map、Set 和 Queue,但并不是所有的 Java 容器都是线程安全的。例如,我们常用的 ArrayList、HashMap 就不是线程安全的。在介绍线程安全的容器之前,我们先思考这样一个问题:如何将非线程安全的容器变成线程安全的容器?
实现思路其实很简单,只要把非线程安全的容器封装在对象内部,然后控制好访问路径就可以了。
下面我们就以 ArrayList 为例,看看如何将它变成线程安全的。在下面的代码中,SafeArrayList 内部持有一个 ArrayList 的实例 c,所有访问 c 的方法我们都增加了 synchronized 关键字,需要注意的是我们还增加了一个 addIfNotExist() 方法,这个方法也是用 synchronized 来保证原子性的。
-
- SafeArrayList<T>{
- //封装ArrayList
- List<T> c = new ArrayList<>();
- //控制访问路径
- synchronized
- T get(int idx){
- return c.get(idx);
- }
-
- synchronized
- void add(int idx, T t) {
- c.add(idx, t);
- }
-
- synchronized
- boolean addIfNotExist(T t){
- if(!c.contains(t)) {
- c.add(t);
- return true;
- }
- return false;
- }
- }
看到这里,你可能会举一反三,然后想到:所有非线程安全的类是不是都可以用这种包装的方式来实现线程安全呢?其实这一点不止你想到了,Java SDK 的开发人员也想到了,所以他们在 Collections 这个类中还提供了一套完备的包装类,比如下面的示例代码中,分别把 ArrayList、HashSet 和 HashMap 包装成了线程安全的 List、Set 和 Map。
-
- List list = Collections.
- synchronizedList(new ArrayList());
- Set set = Collections.
- synchronizedSet(new HashSet());
- Map map = Collections.
- synchronizedMap(new HashMap());
我们曾经多次强调,组合操作需要注意竞态条件问题,例如上面提到的 addIfNotExist() 方法就包含组合操作。组合操作往往隐藏着竞态条件问题,即便每个操作都能保证原子性,也并不能保证组合操作的原子性,这个一定要注意。
在容器领域一个容易被忽视的“坑”是用迭代器遍历容器,例如在下面的代码中,通过迭代器遍历容器 list,对每个元素调用 foo() 方法,这就存在并发问题,这些组合的操作不具备原子性。
-
- List list = Collections.
- synchronizedList(new ArrayList());
- Iterator i = list.iterator();
- while (i.hasNext())
- foo(i.next());
而正确做法是下面这样,锁住 list 之后再执行遍历操作。如果你查看 Collections 内部的包装类源码,你会发现包装类的公共方法锁的是对象的 this,其实就是我们这里的 list,所以锁住 list 绝对是线程安全的。
-
- List list = Collections.
- synchronizedList(new ArrayList());
- synchronized (list) {
- Iterator i = list.iterator();
- while (i.hasNext())
- foo(i.next());
- }
上面我们提到的这些经过包装后线程安全容器,都是基于 synchronized 这个同步关键字实现的,所以也被称为同步容器。Java 提供的同步容器还有 Vector、Stack 和 Hashtable,这三个容器不是基于包装类实现的,但同样是基于 synchronized 实现的,对这三个容器的遍历,同样要加锁保证互斥。
(2)并发容器及其注意事项
Java 在 1.5 版本之前所谓的线程安全的容器,主要指的就是同步容器。不过同步容器有个最大的问题,那就是性能差,所有方法都用 synchronized 来保证互斥,串行度太高了。因此 Java 在 1.5 及之后版本提供了性能更高的容器,我们一般称为并发容器。
并发容器虽然数量非常多,但依然是前面我们提到的四大类:List、Map、Set 和 Queue,下面的并发容器关系图,基本上把我们经常用的容器都覆盖到了。
鉴于并发容器的数量太多,再加上篇幅限制,所以我并不会一一详细介绍它们的用法,只是把关键点介绍一下。
(3)List:
List 里面只有一个实现类就是 CopyOnWriteArrayList。CopyOnWrite,顾名思义就是写的时候会将共享变量新复制一份出来,这样做的好处是读操作完全无锁。
CopyOnWriteArrayList 的实现原理
CopyOnWriteArrayList 内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组。
如果在遍历 array 的同时,还有一个写操作,例如增加元素,CopyOnWriteArrayList 是如何处理的呢?CopyOnWriteArrayList 会将 array 复制一份,然后在新复制处理的数组上执行增加元素的操作,执行完之后再将 array 指向这个新的数组。通过下图你可以看到,读写是可以并行的,遍历操作一直都是基于原 array 执行,而写操作则是基于新 array 进行。
使用 CopyOnWriteArrayList 需要注意的“坑”主要有两个方面。一个是应用场景,CopyOnWriteArrayList 仅适用于写操作非常少的场景,而且能够容忍读写的短暂不一致。例如上面的例子中,写入的新元素并不能立刻被遍历到。另一个需要注意的是,CopyOnWriteArrayList 迭代器是只读的,不支持增删改。因为迭代器遍历的仅仅是一个快照,而对快照进行增删改是没有意义的。
(4) Map:
Map 接口的两个实现是 ConcurrentHashMap 和 ConcurrentSkipListMap,它们从应用的角度来看,主要区别在于 ConcurrentHashMap 的 key 是无序的,而 ConcurrentSkipListMap 的 key 是有序的。所以如果你需要保证 key 的顺序,就只能使用 ConcurrentSkipListMap。
使用 ConcurrentHashMap 和 ConcurrentSkipListMap 需要注意的地方是,它们的 key 和 value 都不能为空,否则会抛出NullPointerException这个运行时异常。下面这个表格总结了 Map 相关的实现类对于 key 和 value 的要求,你可以对比学习。
ConcurrentSkipListMap 里面的 SkipList 本身就是一种数据结构,中文一般都翻译为“跳表”。跳表插入、删除、查询操作平均的时间复杂度是 O(log n),理论上和并发线程数没有关系,所以在并发度非常高的情况下,若你对 ConcurrentHashMap 的性能还不满意,可以尝试一下 ConcurrentSkipListMap。
(5)Set
Set 接口的两个实现是 CopyOnWriteArraySet 和 ConcurrentSkipListSet,使用场景可以参考前面讲述的 CopyOnWriteArrayList 和 ConcurrentSkipListMap,它们的原理都是一样的,这里就不再赘述了。
(6)Queue
Java 并发包里面 Queue 这类并发容器是最复杂的,你可以从以下两个维度来分类。一个维度是阻塞与非阻塞,所谓阻塞指的是当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。另一个维度是单端与双端,单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入队出队。Java 并发包里阻塞队列都用 Blocking 关键字标识,单端队列使用 Queue 标识,双端队列使用 Deque 标识。
这两个维度组合后,可以将 Queue 细分为四大类,分别是:
① 单端阻塞队列
其实现有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、LinkedTransferQueue、PriorityBlockingQueue 和 DelayQueue。内部一般会持有一个队列,这个队列可以是数组(其实现是 ArrayBlockingQueue)也可以是链表(其实现是 LinkedBlockingQueue);甚至还可以不持有队列(其实现是 SynchronousQueue),此时生产者线程的入队操作必须等待消费者线程的出队操作。而 LinkedTransferQueue 融合 LinkedBlockingQueue 和 SynchronousQueue 的功能,性能比 LinkedBlockingQueue 更好;PriorityBlockingQueue 支持按照优先级出队;DelayQueue 支持延时出队。
② 双端阻塞队列:
其实现是 LinkedBlockingDeque。
③ 单端非阻塞队列:
其实现是 ConcurrentLinkedQueue。
④ .双端非阻塞队列:
其实现是 ConcurrentLinkedDeque。
另外,使用队列时,需要格外注意队列是否支持有界(所谓有界指的是内部的队列是否有容量限制)。实际工作中,一般都不建议使用无界的队列,因为数据量大了之后很容易导致 OOM。上面我们提到的这些 Queue 中,只有 ArrayBlockingQueue 和 LinkedBlockingQueue 是支持有界的,所以在使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患。
而在实际工作中,你不单要清楚每种容器的特性,还要能选对容器,这才是关键,至于每种容器的用法,用的时候看一下 API 说明就可以了,这些容器的使用都不难。在文中,我们甚至都没有介绍 Java 容器的快速失败机制(Fail-Fast),原因就在于当你选对容器的时候,根本不会触发它。
28、原子类:无锁工具类的典范
一个累加器的例子,示例代码如下。在这个例子中,add10K() 这个方法不是线程安全的,问题就出在变量 count 的可见性和 count+=1 的原子性上。可见性问题可以用 volatile 来解决,而原子性问题我们前面一直都是采用的互斥锁方案。
-
- public class Test {
- long count = 0;
- void add10K() {
- int idx = 0;
- while(idx++ < 10000) {
- count += 1;
- }
- }
- }
其实对于简单的原子性问题,还有一种无锁方案。Java SDK 并发包将这种无锁方案封装提炼之后,实现了一系列的原子类。不过,在深入介绍原子类的实现之前,我们先看看如何利用原子类解决累加器问题,这样你会对原子类有个初步的认识。
在下面的代码中,我们将原来的 long 型变量 count 替换为了原子类 AtomicLong,原来的 count +=1 替换成了 count.getAndIncrement(),仅需要这两处简单的改动就能使 add10K() 方法变成线程安全的,原子类的使用还是挺简单的。
-
- public class Test {
- AtomicLong count =
- new AtomicLong(0);
- void add10K() {
- int idx = 0;
- while(idx++ < 10000) {
- count.getAndIncrement();
- }
- }
- }
无锁方案相对互斥锁方案,最大的好处就是性能。互斥锁方案为了保证互斥性,需要执行加锁、解锁操作,而加锁、解锁操作本身就消耗性能;同时拿不到锁的线程还会进入阻塞状态,进而触发线程切换,线程切换对性能的消耗也很大。 相比之下,无锁方案则完全没有加锁、解锁的性能消耗,同时还能保证互斥性,既解决了问题,又没有带来新的问题,可谓绝佳方案。那它是如何做到的呢?
(1)无锁方案的实现原理
其实原子类性能高的秘密很简单,硬件支持而已。CPU 为了解决并发问题,提供了 CAS 指令(CAS,全称是 Compare And Swap,即“比较并交换”)。CAS 指令包含 3 个参数:共享变量的内存地址 A、用于比较的值 B 和共享变量的新值 C;并且只有当内存中地址 A 处的值等于 B 时,才能将内存中地址 A 处的值更新为新值 C。作为一条 CPU 指令,CAS 指令本身是能够保证原子性的。
你可以通过下面 CAS 指令的模拟代码来理解 CAS 的工作原理。在下面的模拟程序中有两个参数,一个是期望值 expect,另一个是需要写入的新值 newValue,只有当目前 count 的值和期望值 expect 相等时,才会将 count 更新为 newValue。
-
- class SimulatedCAS{
- int count;
- synchronized int cas(
- int expect, int newValue){
- // 读目前count的值
- int curValue = count;
- // 比较目前count值是否==期望值
- if(curValue == expect){
- // 如果是,则更新count的值
- count = newValue;
- }
- // 返回写入前的值
- return curValue;
- }
- }
只有当目前 count 的值和期望值 expect 相等时,才会将 count 更新为 newValue
对于前面提到的累加器的例子,count += 1 的一个核心问题是:基于内存中 count 的当前值 A 计算出来的 count+=1 为 A+1,在将 A+1 写入内存的时候,很可能此时内存中 count 已经被其他线程更新过了,这样就会导致错误地覆盖其他线程写入的值。也就是说,只有当内存中 count 的值等于期望值 A 时,才能将内存中 count 的值更新为计算结果 A+1,这不就是 CAS 的语义吗!
使用 CAS 来解决并发问题,一般都会伴随着自旋,而所谓自旋,其实就是循环尝试。例如,实现一个线程安全的count += 1操作,“CAS+ 自旋”的实现方案如下所示,首先计算 newValue = count+1,如果 cas(count,newValue) 返回的值不等于 count,则意味着线程在执行完代码①处之后,执行代码②处之前,count 的值被其他线程更新过。那此时该怎么处理呢?可以采用自旋方案,就像下面代码中展示的,可以重新读 count 最新的值来计算 newValue 并尝试再次更新,直到成功。
-
- class SimulatedCAS{
- volatile int count;
- // 实现count+=1
- addOne(){
- do {
- newValue = count+1; //①
- }while(count !=
- cas(count,newValue) //②
- }
- // 模拟实现CAS,仅用来帮助理解
- synchronized int cas(
- int expect, int newValue){
- // 读目前count的值
- int curValue = count;
- // 比较目前count值是否==期望值
- if(curValue == expect){
- // 如果是,则更新count的值
- count= newValue;
- }
- // 返回写入前的值
- return curValue;
- }
- }
通过上面的示例代码,想必你已经发现了,CAS 这种无锁方案,完全没有加锁、解锁操作,即便两个线程完全同时执行 addOne() 方法,也不会有线程被阻塞,所以相对于互斥锁方案来说,性能好了很多。
但是在 CAS 方案中,有一个问题可能会常被你忽略,那就是 ABA 的问题。什么是 ABA 问题呢?
前面我们提到“如果 cas(count,newValue) 返回的值不等于count,意味着线程在执行完代码①处之后,执行代码②处之前,count 的值被其他线程更新过”,那如果 cas(count,newValue) 返回的值等于count,是否就能够认为 count 的值没有被其他线程更新过呢?显然不是的,假设 count 原本是 A,线程 T1 在执行完代码①处之后,执行代码②处之前,有可能 count 被线程 T2 更新成了 B,之后又被 T3 更新回了 A,这样线程 T1 虽然看到的一直是 A,但是其实已经被其他线程更新过了,这就是 ABA 问题。
可能大多数情况下我们并不关心 ABA 问题,例如数值的原子递增,但也不能所有情况下都不关心,例如原子化的更新对象很可能就需要关心 ABA 问题,因为两个 A 虽然相等,但是第二个 A 的属性可能已经发生变化了。所以在使用 CAS 方案的时候,一定要先 check 一下。
(2)看 Java 如何实现原子化的 count += 1
在本文开始部分,我们使用原子类 AtomicLong 的 getAndIncrement() 方法替代了count += 1,从而实现了线程安全。原子类 AtomicLong 的 getAndIncrement() 方法内部就是基于 CAS 实现的,下面我们来看看 Java 是如何使用 CAS 来实现原子化的count += 1的。
在 Java 1.8 版本中,getAndIncrement() 方法会转调 unsafe.getAndAddLong() 方法。这里 this 和 valueOffset 两个参数可以唯一确定共享变量的内存地址。
-
- final long getAndIncrement() {
- return unsafe.getAndAddLong(
- this, valueOffset, 1L);
- }
unsafe.getAndAddLong() 方法的源码如下,该方法首先会在内存中读取共享变量的值,之后循环调用 compareAndSwapLong() 方法来尝试设置共享变量的值,直到成功为止。compareAndSwapLong() 是一个 native 方法,只有当内存中共享变量的值等于 expected 时,才会将共享变量的值更新为 x,并且返回 true;否则返回 fasle。compareAndSwapLong 的语义和 CAS 指令的语义的差别仅仅是返回值不同而已。
-
- public final long getAndAddLong(
- Object o, long offset, long delta){
- long v;
- do {
- // 读取内存中的值
- v = getLongVolatile(o, offset);
- } while (!compareAndSwapLong(
- o, offset, v, v + delta));
- return v;
- }
- //原子性地将变量更新为x
- //条件是内存中的值等于expected
- //更新成功则返回true
- native boolean compareAndSwapLong(
- Object o, long offset,
- long expected,
- long x);
另外,需要你注意的是,getAndAddLong() 方法的实现,基本上就是 CAS 使用的经典范例。所以请你再次体会下面这段抽象后的代码片段,它在很多无锁程序中经常出现。Java 提供的原子类里面 CAS 一般被实现为 compareAndSet(),compareAndSet() 的语义和 CAS 指令的语义的差别仅仅是返回值不同而已,compareAndSet() 里面如果更新成功,则会返回 true,否则返回 false。
-
- do {
- // 获取当前值
- oldV = xxxx;
- // 根据当前值计算新值
- newV = ...oldV...
- }while(!compareAndSet(oldV,newV);
(3)原子类概览
Java SDK 并发包里提供的原子类内容很丰富,我们可以将它们分为五个类别:原子化的基本数据类型、原子化的对象引用类型、原子化数组、原子化对象属性更新器和原子化的累加器。这五个类别提供的方法基本上是相似的,并且每个类别都有若干原子类,你可以通过下面的原子类组成概览图来获得一个全局的印象。下面我们详细解读这五个类别。
① 原子化的基本数据类型
相关实现有 AtomicBoolean、AtomicInteger 和 AtomicLong,提供的方法主要有以下这些,详情你可以参考 SDK 的源代码,都很简单,这里就不详细介绍了。
-
- getAndIncrement() //原子化i++
- getAndDecrement() //原子化的i--
- incrementAndGet() //原子化的++i
- decrementAndGet() //原子化的--i
- //当前值+=delta,返回+=前的值
- getAndAdd(delta)
- //当前值+=delta,返回+=后的值
- addAndGet(delta)
- //CAS操作,返回是否成功
- compareAndSet(expect, update)
- //以下四个方法
- //新值可以通过传入func函数来计算
- getAndUpdate(func)
- updateAndGet(func)
- getAndAccumulate(x,func)
- accumulateAndGet(x,func)
② 原子化的对象引用类型
相关实现有 AtomicReference、AtomicStampedReference 和 AtomicMarkableReference,利用它们可以实现对象引用的原子化更新。AtomicReference 提供的方法和原子化的基本数据类型差不多,这里不再赘述。不过需要注意的是,对象引用的更新需要重点关注 ABA 问题,AtomicStampedReference 和 AtomicMarkableReference 这两个原子类可以解决 ABA 问题。
解决 ABA 问题的思路其实很简单,增加一个版本号维度就可以了,这个和的乐观锁机制很类似,每次执行 CAS 操作,附加再更新一个版本号,只要保证版本号是递增的,那么即便 A 变成 B 之后再变回 A,版本号也不会变回来(版本号递增的)。AtomicStampedReference 实现的 CAS 方法就增加了版本号参数,方法签名如下:
-
- boolean compareAndSet(
- V expectedReference,
- V newReference,
- int expectedStamp,
- int newStamp)
AtomicMarkableReference 的实现机制则更简单,将版本号简化成了一个 Boolean 值,方法签名如下:
-
- boolean compareAndSet(
- V expectedReference,
- V newReference,
- boolean expectedMark,
- boolean newMark)
③ 原子化数组
相关实现有 AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray,利用这些原子类,我们可以原子化地更新数组里面的每一个元素。这些类提供的方法和原子化的基本数据类型的区别仅仅是:每个方法多了一个数组的索引参数,所以这里也不再赘述了。
④ 原子化对象属性更新器
相关实现有 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater,利用它们可以原子化地更新对象的属性,这三个方法都是利用反射机制实现的,创建更新器的方法如下:
-
- public static <U>
- AtomicXXXFieldUpdater<U>
- newUpdater(Class<U> tclass,
- String fieldName)
需要注意的是,对象属性必须是 volatile 类型的,只有这样才能保证可见性;如果对象属性不是 volatile 类型的,newUpdater() 方法会抛出 IllegalArgumentException 这个运行时异常。
你会发现 newUpdater() 的方法参数只有类的信息,没有对象的引用,而更新对象的属性,一定需要对象的引用,那这个参数是在哪里传入的呢?是在原子操作的方法参数中传入的。例如 compareAndSet() 这个原子操作,相比原子化的基本数据类型多了一个对象引用 obj。原子化对象属性更新器相关的方法,相比原子化的基本数据类型仅仅是多了对象引用参数,所以这里也不再赘述了。
-
- boolean compareAndSet(
- T obj,
- int expect,
- int update)
⑤ 原子化的累加器
DoubleAccumulator、DoubleAdder、LongAccumulator 和 LongAdder,这四个类仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,但是不支持 compareAndSet() 方法。如果你仅仅需要累加操作,使用原子化的累加器性能会更好。
无锁方案相对于互斥锁方案,优点非常多,首先性能好,其次是基本不会出现死锁问题(但可能出现饥饿和活锁问题,因为自旋会反复重试)。Java 提供的原子类大部分都实现了 compareAndSet() 方法,基于 compareAndSet() 方法,你可以构建自己的无锁数据结构,但是建议你不要这样做,这个工作最好还是让大师们去完成,原因是无锁算法没你想象的那么简单。
Java 提供的原子类能够解决一些简单的原子性问题,但你可能会发现,上面我们所有原子类的方法都是针对一个共享变量的,如果你需要解决多个变量的原子性问题,建议还是使用互斥锁方案。原子类虽好,但使用要慎之又慎。
29、Executor与线程池:如何创建正确的线程池?
虽然在 Java 语言中创建线程看上去就像创建一个对象一样简单,只需要 new Thread() 就可以了,但实际上创建线程远不是创建一个对象那么简单。创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了,所以线程是一个重量级的对象,应该避免频繁创建和销毁。
那如何避免呢?应对方案估计你已经知道了,那就是线程池。
线程池的需求是如此普遍,所以 Java SDK 并发包自然也少不了它。但是很多人在初次接触并发包里线程池相关的工具类时,多少会都有点蒙,不知道该从哪里入手,我觉得根本原因在于线程池和一般意义上的池化资源是不同的。一般意义上的池化资源,都是下面这样,当你需要资源的时候就调用 acquire() 方法来申请资源,用完之后就调用 release() 释放资源。若你带着这个固有模型来看并发包里线程池相关的工具类时,会很遗憾地发现它们完全匹配不上,Java 提供的线程池里面压根就没有申请线程和释放线程的方法。
-
- class XXXPool{
- // 获取池化资源
- XXX acquire() {
- }
- // 释放池化资源
- void release(XXX x){
- }
- }
(1)线程池是一种生产者 - 消费者模式
为什么线程池没有采用一般意义上池化资源的设计方法呢?如果线程池采用一般意义上池化资源的设计方法,应该是下面示例代码这样。你可以来思考一下,假设我们获取到一个空闲线程 T1,然后该如何使用 T1 呢?你期望的可能是这样:通过调用 T1 的 execute() 方法,传入一个 Runnable 对象来执行具体业务逻辑,就像通过构造函数 Thread(Runnable target) 创建线程一样。可惜的是,你翻遍 Thread 对象的所有方法,都不存在类似 execute(Runnable target) 这样的公共方法。
-
- //采用一般意义上池化资源的设计方法
- class ThreadPool{
- // 获取空闲线程
- Thread acquire() {
- }
- // 释放线程
- void release(Thread t){
- }
- }
- //期望的使用
- ThreadPool pool;
- Thread T1=pool.acquire();
- //传入Runnable对象
- T1.execute(()->{
- //具体业务逻辑
- ......
- });
所以,线程池的设计,没有办法直接采用一般意义上池化资源的设计方法。那线程池该如何设计呢?目前业界线程池的设计,普遍采用的都是生产者 - 消费者模式。线程池的使用方是生产者,线程池本身是消费者。在下面的示例代码中,我们创建了一个非常简单的线程池 MyThreadPool,你可以通过它来理解线程池的工作原理。
-
- //简化的线程池,仅用来说明工作原理
- class MyThreadPool{
- //利用阻塞队列实现生产者-消费者模式
- BlockingQueue<Runnable> workQueue;
- //保存内部工作线程
- List<WorkerThread> threads
- = new ArrayList<>();
- // 构造方法
- MyThreadPool(int poolSize,
- BlockingQueue<Runnable> workQueue){
- this.workQueue = workQueue;
- // 创建工作线程
- for(int idx=0; idx<poolSize; idx++){
- WorkerThread work = new WorkerThread();
- work.start();
- threads.add(work);
- }
- }
- // 提交任务
- void execute(Runnable command){
- workQueue.put(command);
- }
- // 工作线程负责消费任务,并执行任务
- class WorkerThread extends Thread{
- public void run() {
- //循环取任务并执行
- while(true){ ①
- Runnable task = workQueue.take();
- task.run();
- }
- }
- }
- }
-
- /** 下面是使用示例 **/
- // 创建有界阻塞队列
- BlockingQueue<Runnable> workQueue =
- new LinkedBlockingQueue<>(2);
- // 创建线程池
- MyThreadPool pool = new MyThreadPool(
- 10, workQueue);
- // 提交任务
- pool.execute(()->{
- System.out.println("hello");
- });
在 MyThreadPool 的内部,我们维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务,相关的代码就是代码①处的 while 循环。线程池主要的工作原理就这些,是不是还挺简单的?
(2)如何使用 Java 中的线程池
Java 并发包里提供的线程池,远比我们上面的示例代码强大得多,当然也复杂得多。Java 提供的线程池相关的工具类中,最核心的是 ThreadPoolExecutor,通过名字你也能看出来,它强调的是 Executor,而不是一般意义上的池化资源。
ThreadPoolExecutor 的构造函数非常复杂,如下面代码所示,这个最完备的构造函数有 7 个参数。
-
- ThreadPoolExecutor(
- int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
下面我们一一介绍这些参数的意义,你可以把线程池类比为一个项目组,而线程就是项目组的成员。
① corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
② maximumPoolSize:表示线程池创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
③ keepAliveTime & unit:上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
④ workQueue:工作队列,和上面示例代码的工作队列同义。
⑤ threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
⑥ handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。ThreadPoolExecutor 已经提供了以下 4 种策略。
CallerRunsPolicy:提交任务的线程自己去执行该任务。
AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
DiscardPolicy:直接丢弃任务,没有任何异常抛出。
DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
Java 在 1.6 版本还增加了 allowCoreThreadTimeOut(boolean value) 方法,它可以让所有线程都支持超时,这意味着如果项目很闲,就会将项目组的成员都撤走。
(3)使用线程池要注意些什么
考虑到 ThreadPoolExecutor 的构造函数实在是有些复杂,所以 Java 并发包里提供了一个线程池的静态工厂类 Executors,利用 Executors 你可以快速创建线程池。不过目前大厂的编码规范中基本上都不建议使用 Executors 了,所以这里我就不再花篇幅介绍了。
不建议使用 Executors 的最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列。
使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。
使用线程池,还要注意异常处理的问题,例如通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理,你可以参考下面的示例代码。
-
- try {
- //业务逻辑
- } catch (RuntimeException x) {
- //按需处理
- } catch (Throwable x) {
- //按需处理
- }
线程池在 Java 并发编程领域非常重要,很多大厂的编码规范都要求必须通过线程池来管理线程。线程池和普通的池化资源有很大不同,线程池实际上是生产者 - 消费者模式的一种实现,理解生产者 - 消费者模式是理解线程池的关键所在。
30、Future:如何用多线程实现最优的“烧水泡茶”程序?
创建完线程池,我们该如何使用呢?在上一篇文章中,我们仅仅介绍了 ThreadPoolExecutor 的 void execute(Runnable command) 方法,利用这个方法虽然可以提交任务,但是却没有办法获取任务的执行结果(execute() 方法没有返回值)。而很多场景下,我们又都是需要获取任务的执行结果的。那 ThreadPoolExecutor 是否提供了相关功能呢?必须的,这么重要的功能当然需要提供了。
下面我们就来介绍一下使用 ThreadPoolExecutor 的时候,如何获取任务执行结果。
(1)如何获取任务执行结果
Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方法签名如下。
-
- // 提交Runnable任务
- Future<?>
- submit(Runnable task);
- // 提交Callable任务
- <T> Future<T>
- submit(Callable<T> task);
- // 提交Runnable任务及结果引用
- <T> Future<T>
- submit(Runnable task, T result);
你会发现它们的返回值都是 Future 接口,Future 接口有 5 个方法,我都列在下面了,它们分别是取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()以及2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。通过 Future 接口的这 5 个方法你会发现,我们提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。
-
- // 取消任务
- boolean cancel(
- boolean mayInterruptIfRunning);
- // 判断任务是否已取消
- boolean isCancelled();
- // 判断任务是否已结束
- boolean isDone();
- // 获得任务执行结果
- get();
- // 获得任务执行结果,支持超时
- get(long timeout, TimeUnit unit);
这 3 个 submit() 方法之间的区别在于方法参数不同,下面我们简要介绍一下。
① 提交 Runnable 任务 submit(Runnable task) :这个方法的参数是一个 Runnable 接口,Runnable 接口的 run() 方法是没有返回值的,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join()。
② 提交 Callable 任务 submit(Callable task):这个方法的参数是一个 Callable 接口,它只有一个 call() 方法,并且这个方法是有返回值的,所以这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果。
③ 提交 Runnable 任务及结果引用 submit(Runnable task, T result):这个方法很有意思,假设这个方法返回的 Future 对象是 f,f.get() 的返回值就是传给 submit() 方法的参数 result。这个方法该怎么用呢?下面这段示例代码展示了它的经典用法。需要你注意的是 Runnable 接口的实现类 Task 声明了一个有参构造函数 Task(Result r) ,创建 Task 对象的时候传入了 result 对象,这样就能在类 Task 的 run() 方法中对 result 进行各种操作了。result 相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。
-
- ExecutorService executor
- = Executors.newFixedThreadPool(1);
- // 创建Result对象r
- Result r = new Result();
- r.setAAA(a);
- // 提交任务
- Future<Result> future =
- executor.submit(new Task(r), r);
- Result fr = future.get();
- // 下面等式成立
- fr === r;
- fr.getAAA() === a;
- fr.getXXX() === x
-
- class Task implements Runnable{
- Result r;
- //通过构造函数传入result
- Task(Result r){
- this.r = r;
- }
- void run() {
- //可以操作result
- a = r.getAAA();
- r.setXXX(x);
- }
- }
下面我们再来介绍 FutureTask 工具类。前面我们提到的 Future 是一个接口,而 FutureTask 是一个实实在在的工具类,这个工具类有两个构造函数,它们的参数和前面介绍的 submit() 方法类似,所以这里我就不再赘述了。
-
- FutureTask(Callable<V> callable);
- FutureTask(Runnable runnable, V result);
那如何使用 FutureTask 呢?其实很简单,FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行。
-
- // 创建FutureTask
- FutureTask<Integer> futureTask
- = new FutureTask<>(()-> 1+2);
- // 创建线程池
- ExecutorService es =
- Executors.newCachedThreadPool();
- // 提交FutureTask
- es.submit(futureTask);
- // 获取计算结果
- Integer result = futureTask.get();
FutureTask 对象直接被 Thread 执行的示例代码如下所示。相信你已经发现了,利用 FutureTask 对象可以很容易获取子线程的执行结果。
-
- // 创建FutureTask
- FutureTask<Integer> futureTask
- = new FutureTask<>(()-> 1+2);
- // 创建并启动线程
- Thread T1 = new Thread(futureTask);
- T1.start();
- // 获取计算结果
- Integer result = futureTask.get();
(2)实现最优的“烧水泡茶”程序
记得以前初中语文课文里有一篇著名数学家华罗庚先生的文章《统筹方法》,这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:
下面我们用程序来模拟一下这个最优工序。我们专栏前面曾经提到,并发编程可以总结为三个核心问题:分工、同步和互斥。编写并发程序,首先要做的就是分工,所谓分工指的是如何高效地拆解任务并分配给线程。对于烧水泡茶这个程序,一种最优的分工方案可以是下图所示的这样:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶、烧开水、泡茶这三道工序,T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序。对于 T1 的这个等待动作,你应该可以想出很多种办法,例如 Thread.join()、CountDownLatch,甚至阻塞队列都可以解决,不过今天我们用 Future 特性来实现。
下面的示例代码就是用这一章提到的 Future 特性来实现的。首先,我们创建了两个 FutureTask——ft1 和 ft2,ft1 完成洗水壶、烧开水、泡茶的任务,ft2 完成洗茶壶、洗茶杯、拿茶叶的任务;这里需要注意的是 ft1 这个任务在执行泡茶任务前,需要等待 ft2 把茶叶拿来,所以 ft1 内部需要引用 ft2,并在执行泡茶之前,调用 ft2 的 get() 方法实现等待。
-
- // 创建任务T2的FutureTask
- FutureTask<String> ft2
- = new FutureTask<>(new T2Task());
- // 创建任务T1的FutureTask
- FutureTask<String> ft1
- = new FutureTask<>(new T1Task(ft2));
- // 线程T1执行任务ft1
- Thread T1 = new Thread(ft1);
- T1.start();
- // 线程T2执行任务ft2
- Thread T2 = new Thread(ft2);
- T2.start();
- // 等待线程T1执行结果
- System.out.println(ft1.get());
-
- // T1Task需要执行的任务:
- // 洗水壶、烧开水、泡茶
- class T1Task implements Callable<String>{
- FutureTask<String> ft2;
- // T1任务需要T2任务的FutureTask
- T1Task(FutureTask<String> ft2){
- this.ft2 = ft2;
- }
- @Override
- String call() throws Exception {
- System.out.println("T1:洗水壶...");
- TimeUnit.SECONDS.sleep(1);
-
- System.out.println("T1:烧开水...");
- TimeUnit.SECONDS.sleep(15);
- // 获取T2线程的茶叶
- String tf = ft2.get();
- System.out.println("T1:拿到茶叶:"+tf);
-
- System.out.println("T1:泡茶...");
- return "上茶:" + tf;
- }
- }
- // T2Task需要执行的任务:
- // 洗茶壶、洗茶杯、拿茶叶
- class T2Task implements Callable<String> {
- @Override
- String call() throws Exception {
- System.out.println("T2:洗茶壶...");
- TimeUnit.SECONDS.sleep(1);
-
- System.out.println("T2:洗茶杯...");
- TimeUnit.SECONDS.sleep(2);
-
- System.out.println("T2:拿茶叶...");
- TimeUnit.SECONDS.sleep(1);
- return "龙井";
- }
- }
- // 一次执行结果:
- T1:洗水壶...
- T2:洗茶壶...
- T1:烧开水...
- T2:洗茶杯...
- T2:拿茶叶...
- T1:拿到茶叶:龙井
- T1:泡茶...
- 上茶:龙井
利用 Java 并发包提供的 Future 可以很容易获得异步任务的执行结果,无论异步任务是通过线程池 ThreadPoolExecutor 执行的,还是通过手工创建子线程来执行的。Future 可以类比为现实世界里的提货单,比如去蛋糕店订生日蛋糕,蛋糕店都是先给你一张提货单,你拿到提货单之后,没有必要一直在店里等着,可以先去干点其他事,比如看场电影;等看完电影后,基本上蛋糕也做好了,然后你就可以凭提货单领蛋糕了。
利用多线程可以快速将一些串行的任务并行化,从而提高性能;如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,这种问题基本上都可以用 Future 来解决。在分析这种问题的过程中,建议你用有向图描述一下任务之间的依赖关系,同时将线程的分工也做好,类似于烧水泡茶最优分工方案那幅图。对照图来写代码,好处是更形象,且不易出错。
31、CompletableFuture:异步编程没那么难
用多线程优化性能,其实不过就是将串行操作变成并行操作。如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得把它们并行化,那具体实施起来该怎么做呢?
-
- //以下两个方法都是耗时操作
- doBizA();
- doBizB();
还是挺简单的,就像下面代码中这样,创建两个子线程去执行就可以了。你会发现下面的并行方案,主线程无需等待 doBizA() 和 doBizB() 的执行结果,也就是说 doBizA() 和 doBizB() 两个操作已经被异步化了。
-
- new Thread(()->doBizA())
- .start();
- new Thread(()->doBizB())
- .start();
异步化,是并行方案得以实施的基础,更深入地讲其实就是:利用多线程优化性能这个核心方案得以实施的基础。看到这里,相信你应该就能理解异步编程最近几年为什么会大火了,因为优化性能是互联网大厂的一个核心需求啊。Java 在 1.8 版本提供了 CompletableFuture 来支持异步编程,CompletableFuture 有可能是你见过的最复杂的工具类了,不过功能也着实让人感到震撼。
(1)CompletableFuture 的核心优势
为了领略 CompletableFuture 异步编程的优势,这里我们用 CompletableFuture 重新实现前面曾提及的烧水泡茶程序。首先还是需要先完成分工方案,在下面的程序中,我们分了 3 个任务:任务 1 负责洗水壶、烧开水,任务 2 负责洗茶壶、洗茶杯和拿茶叶,任务 3 负责泡茶。其中任务 3 要等待任务 1 和任务 2 都完成后才能开始。这个分工如下图所示。
下面是代码实现,你先略过 runAsync()、supplyAsync()、thenCombine() 这些不太熟悉的方法,从大局上看,你会发现:
① 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
② 语义更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务 3 要等待任务 1 和任务 2 都完成后才能开始”;
③ 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
-
- //任务1:洗水壶->烧开水
- CompletableFuture<Void> f1 =
- CompletableFuture.runAsync(()->{
- System.out.println("T1:洗水壶...");
- sleep(1, TimeUnit.SECONDS);
-
- System.out.println("T1:烧开水...");
- sleep(15, TimeUnit.SECONDS);
- });
- //任务2:洗茶壶->洗茶杯->拿茶叶
- CompletableFuture<String> f2 =
- CompletableFuture.supplyAsync(()->{
- System.out.println("T2:洗茶壶...");
- sleep(1, TimeUnit.SECONDS);
-
- System.out.println("T2:洗茶杯...");
- sleep(2, TimeUnit.SECONDS);
-
- System.out.println("T2:拿茶叶...");
- sleep(1, TimeUnit.SECONDS);
- return "龙井";
- });
- //任务3:任务1和任务2完成后执行:泡茶
- CompletableFuture<String> f3 =
- f1.thenCombine(f2, (__, tf)->{
- System.out.println("T1:拿到茶叶:" + tf);
- System.out.println("T1:泡茶...");
- return "上茶:" + tf;
- });
- //等待任务3执行结果
- System.out.println(f3.join());
-
- void sleep(int t, TimeUnit u) {
- try {
- u.sleep(t);
- }catch(InterruptedException e){}
- }
- // 一次执行结果:
- T1:洗水壶...
- T2:洗茶壶...
- T1:烧开水...
- T2:洗茶杯...
- T2:拿茶叶...
- T1:拿到茶叶:龙井
- T1:泡茶...
- 上茶:龙井
领略 CompletableFuture 异步编程的优势之后,下面我们详细介绍 CompletableFuture 的使用,首先是如何创建 CompletableFuture 对象。
(2)创建 CompletableFuture 对象
创建 CompletableFuture 对象主要靠下面代码中展示的这 4 个静态方法,我们先看前两个。在烧水泡茶的例子中,我们已经使用了runAsync(Runnable runnable)和supplyAsync(Supplier supplier),它们之间的区别是:Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。
前两个方法和后两个方法的区别在于:后两个方法可以指定线程池参数。
默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
-
- //使用默认线程池
- static CompletableFuture<Void>
- runAsync(Runnable runnable)
- static <U> CompletableFuture<U>
- supplyAsync(Supplier<U> supplier)
- //可以指定线程池
- static CompletableFuture<Void>
- runAsync(Runnable runnable, Executor executor)
- static <U> CompletableFuture<U>
- supplyAsync(Supplier<U> supplier, Executor executor)
创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法,对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为 CompletableFuture 类实现了 Future 接口,所以这两个问题你都可以通过 Future 接口来解决。另外,CompletableFuture 类还实现了 CompletionStage 接口,这个接口内容实在是太丰富了,在 1.8 版本里有 40 个方法,这些方法我们该如何理解呢?
(3)如何理解 CompletionStage 接口
我觉得,你可以站在分工的角度类比一下工作流。任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等。这样说可能有点抽象,这里还举前面烧水泡茶的例子,其中洗水壶和烧开水就是串行关系,洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间就是并行关系,而烧开水、拿茶叶和泡茶就是汇聚关系。
CompletionStage 接口可以清晰地描述任务之间的这种时序关系,例如前面提到的 f3 = f1.thenCombine(f2, ()->{}) 描述的就是一种汇聚关系。烧水泡茶程序中的汇聚关系是一种 AND 聚合关系,这里的 AND 指的是所有依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)。既然有 AND 聚合关系,那就一定还有 OR 聚合关系,所谓 OR 指的是依赖的任务只要有一个完成就可以执行当前任务。
在编程领域,还有一个绕不过去的山头,那就是异常处理,CompletionStage 接口也可以方便地描述异常处理。
下面我们就来一一介绍,CompletionStage 接口如何描述串行关系、AND 聚合关系、OR 聚合关系以及异常处理。
① 描述串行关系
CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。
thenApply 系列函数里参数 fn 的类型是接口 Function<T, R>,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage<R>。
而 thenAccept 系列方法里参数 consumer 的类型是接口Consumer,这个接口里与 CompletionStage 相关的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage。
thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage。
这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。其中,需要你注意的是 thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。
-
- CompletionStage<R> thenApply(fn);
- CompletionStage<R> thenApplyAsync(fn);
- CompletionStage<Void> thenAccept(consumer);
- CompletionStage<Void> thenAcceptAsync(consumer);
- CompletionStage<Void> thenRun(action);
- CompletionStage<Void> thenRunAsync(action);
- CompletionStage<R> thenCompose(fn);
- CompletionStage<R> thenComposeAsync(fn);
通过下面的示例代码,你可以看一下 thenApply() 方法是如何使用的。首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。
-
- CompletableFuture<String> f0 =
- CompletableFuture.supplyAsync(
- () -> "Hello World") //①
- .thenApply(s -> s + " QQ") //②
- .thenApply(String::toUpperCase);//③
-
- System.out.println(f0.join());
- //输出结果
- HELLO WORLD QQ
② 描述 AND 汇聚关系
CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。它们的使用你可以参考上面烧水泡茶的实现程序,这里就不赘述了。
-
- CompletionStage<R> thenCombine(other, fn);
- CompletionStage<R> thenCombineAsync(other, fn);
- CompletionStage<Void> thenAcceptBoth(other, consumer);
- CompletionStage<Void> thenAcceptBothAsync(other, consumer);
- CompletionStage<Void> runAfterBoth(other, action);
- CompletionStage<Void> runAfterBothAsync(other, action);
③ 描述 OR 汇聚关系
CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。
-
- CompletionStage applyToEither(other, fn);
- CompletionStage applyToEitherAsync(other, fn);
- CompletionStage acceptEither(other, consumer);
- CompletionStage acceptEitherAsync(other, consumer);
- CompletionStage runAfterEither(other, action);
- CompletionStage runAfterEitherAsync(other, action);
下面的示例代码展示了如何使用 applyToEither() 方法来描述一个 OR 汇聚关系。
-
- CompletableFuture<String> f1 =
- CompletableFuture.supplyAsync(()->{
- int t = getRandom(5, 10);
- sleep(t, TimeUnit.SECONDS);
- return String.valueOf(t);
- });
-
- CompletableFuture<String> f2 =
- CompletableFuture.supplyAsync(()->{
- int t = getRandom(5, 10);
- sleep(t, TimeUnit.SECONDS);
- return String.valueOf(t);
- });
-
- CompletableFuture<String> f3 =
- f1.applyToEither(f2,s -> s);
-
- System.out.println(f3.join());
④ 异常处理
虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行 7/0 就会出现除零错误这个运行时异常。非异步编程里面,我们可以使用 try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢?
-
- CompletableFuture<Integer>
- f0 = CompletableFuture.
- .supplyAsync(()->(7/0))
- .thenApply(r->r*10);
- System.out.println(f0.join());
CompletionStage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。
-
- CompletionStage exceptionally(fn);
- CompletionStage<R> whenComplete(consumer);
- CompletionStage<R> whenCompleteAsync(consumer);
- CompletionStage<R> handle(fn);
- CompletionStage<R> handleAsync(fn);
下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。
-
- CompletableFuture<Integer>
- f0 = CompletableFuture
- .supplyAsync(()->(7/0))
- .thenApply(r->r*10)
- .exceptionally(e->0);
- System.out.println(f0.join());
曾经一提到异步编程,大家脑海里都会随之浮现回调函数,例如在 JavaScript 里面异步问题基本上都是靠回调函数来解决的,回调函数在处理异常以及复杂的异步任务关系时往往力不从心,对此业界还发明了个名词:回调地狱(Callback Hell)。应该说在前些年,异步编程还是声名狼藉的。
不过最近几年,伴随着ReactiveX的发展(Java 语言的实现版本是 RxJava),回调地狱已经被完美解决了,异步编程已经慢慢开始成熟,Java 语言也开始官方支持异步编程:在 1.8 版本提供了 CompletableFuture,在 Java 9 版本则提供了更加完备的 Flow API,异步编程目前已经完全工业化。因此,学好异步编程还是很有必要的。
CompletableFuture 已经能够满足简单的异步编程需求,如果你对异步编程感兴趣,可以重点关注 RxJava 这个项目,利用 RxJava,即便在 Java 1.6 版本也能享受异步编程的乐趣。
32、CompletionService:如何批量执行异步任务?
“ThreadPoolExecutor+Future”的方案,你的优化结果很可能是下面示例代码这样:用三个线程异步执行询价,通过三次调用 Future 的 get() 方法获取询价结果,之后将询价结果保存在数据库中。
-
- // 创建线程池
- ExecutorService executor =
- Executors.newFixedThreadPool(3);
- // 异步向电商S1询价
- Future<Integer> f1 =
- executor.submit(
- ()->getPriceByS1());
- // 异步向电商S2询价
- Future<Integer> f2 =
- executor.submit(
- ()->getPriceByS2());
- // 异步向电商S3询价
- Future<Integer> f3 =
- executor.submit(
- ()->getPriceByS3());
-
- // 获取电商S1报价并保存
- r=f1.get();
- executor.execute(()->save(r));
-
- // 获取电商S2报价并保存
- r=f2.get();
- executor.execute(()->save(r));
-
- // 获取电商S3报价并保存
- r=f3.get();
- executor.execute(()->save(r));
上面的这个方案本身没有太大问题,但是有个地方的处理需要你注意,那就是如果获取电商 S1 报价的耗时很长,那么即便获取电商 S2 报价的耗时很短,也无法让保存 S2 报价的操作先执行,因为这个主线程都阻塞在了 f1.get() 操作上。这点小瑕疵你该如何解决呢?
估计你已经想到了,增加一个阻塞队列,获取到 S1、S2、S3 的报价都进入阻塞队列,然后在主线程中消费阻塞队列,这样就能保证先获取到的报价先保存到数据库了。下面的示例代码展示了如何利用阻塞队列实现先获取到的报价先保存到数据库。
-
- // 创建阻塞队列
- BlockingQueue<Integer> bq =
- new LinkedBlockingQueue<>();
- //电商S1报价异步进入阻塞队列
- executor.execute(()->
- bq.put(f1.get()));
- //电商S2报价异步进入阻塞队列
- executor.execute(()->
- bq.put(f2.get()));
- //电商S3报价异步进入阻塞队列
- executor.execute(()->
- bq.put(f3.get()));
- //异步保存所有报价
- for (int i=0; i<3; i++) {
- Integer r = bq.take();
- executor.execute(()->save(r));
- }
(1)利用 CompletionService 实现询价系统
不过在实际项目中,并不建议你这样做,因为 Java SDK 并发包里已经提供了设计精良的 CompletionService。利用 CompletionService 不但能帮你解决先获取到的报价先保存到数据库的问题,而且还能让代码更简练。
CompletionService 的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是 CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列中,而上面的示例代码是把任务最终的执行结果放入了阻塞队列中。
(2)那到底该如何创建 CompletionService 呢?
CompletionService 接口的实现类是 ExecutorCompletionService,这个实现类的构造方法有两个,分别是:
① ExecutorCompletionService(Executor executor);
② ExecutorCompletionService(Executor executor, BlockingQueue<future> completionQueue)。
这两个构造方法都需要传入一个线程池,如果不指定 completionQueue,那么默认会使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到 completionQueue 中。
下面的示例代码完整地展示了如何利用 CompletionService 来实现高性能的询价系统。其中,我们没有指定 completionQueue,因此默认使用无界的 LinkedBlockingQueue。之后通过 CompletionService 接口提供的 submit() 方法提交了三个询价操作,这三个询价操作将会被 CompletionService 异步执行。最后,我们通过 CompletionService 接口提供的 take() 方法获取一个 Future 对象(前面我们提到过,加入到阻塞队列中的是任务执行结果的 Future 对象),调用 Future 对象的 get() 方法就能返回询价操作的执行结果了。
-
- // 创建线程池
- ExecutorService executor =
- Executors.newFixedThreadPool(3);
- // 创建CompletionService
- CompletionService<Integer> cs = new
- ExecutorCompletionService<>(executor);
- // 异步向电商S1询价
- cs.submit(()->getPriceByS1());
- // 异步向电商S2询价
- cs.submit(()->getPriceByS2());
- // 异步向电商S3询价
- cs.submit(()->getPriceByS3());
- // 将询价结果异步保存到数据库
- for (int i=0; i<3; i++) {
- Integer r = cs.take().get();
- executor.execute(()->save(r));
- }
(3)CompletionService 接口说明
下面我们详细地介绍一下 CompletionService 接口提供的方法,CompletionService 接口提供的方法有 5 个,这 5 个方法的方法签名如下所示。
其中,submit() 相关的方法有两个。一个方法参数是Callable task,前面利用 CompletionService 实现询价系统的示例代码中,我们提交任务就是用的它。另外一个方法有两个参数,分别是Runnable task和V result,这个方法类似于 ThreadPoolExecutor 的 Future submit(Runnable task, T result)
CompletionService 接口其余的 3 个方法,都是和阻塞队列相关的,take()、poll() 都是从阻塞队列中获取并移除一个元素;它们的区别在于如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞,而 poll() 方法会返回 null 值。 poll(long timeout, TimeUnit unit) 方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了 timeout unit 时间,阻塞队列还是空的,那么该方法会返回 null 值。
-
- Future<V> submit(Callable<V> task);
- Future<V> submit(Runnable task, V result);
- Future<V> take()
- throws InterruptedException;
- Future<V> poll();
- Future<V> poll(long timeout, TimeUnit unit)
- throws InterruptedException;
(4)利用 CompletionService 实现 Dubbo 中的 Forking Cluster
Dubbo 中有一种叫做 Forking 的集群模式,这种集群模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。例如你需要提供一个地址转坐标的服务,为了保证该服务的高可用和性能,你可以并行地调用 3 个地图服务商的 API,然后只要有 1 个正确返回了结果 r,那么地址转坐标这个服务就可以直接返回 r 了。这种集群模式可以容忍 2 个地图服务商服务异常,但缺点是消耗的资源偏多。
-
- geocoder(addr) {
- //并行执行以下3个查询服务,
- r1=geocoderByS1(addr);
- r2=geocoderByS2(addr);
- r3=geocoderByS3(addr);
- //只要r1,r2,r3有一个返回
- //则返回
- return r1|r2|r3;
- }
利用 CompletionService 可以快速实现 Forking 这种集群模式,比如下面的示例代码就展示了具体是如何实现的。首先我们创建了一个线程池 executor 、一个 CompletionService 对象 cs 和一个Future类型的列表 futures,每次通过调用 CompletionService 的 submit() 方法提交一个异步任务,会返回一个 Future 对象,我们把这些 Future 对象保存在列表 futures 中。通过调用 cs.take().get(),我们能够拿到最快返回的任务执行结果,只要我们拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果了。
-
- // 创建线程池
- ExecutorService executor =
- Executors.newFixedThreadPool(3);
- // 创建CompletionService
- CompletionService<Integer> cs =
- new ExecutorCompletionService<>(executor);
- // 用于保存Future对象
- List<Future<Integer>> futures =
- new ArrayList<>(3);
- //提交异步任务,并保存future到futures
- futures.add(
- cs.submit(()->geocoderByS1()));
- futures.add(
- cs.submit(()->geocoderByS2()));
- futures.add(
- cs.submit(()->geocoderByS3()));
- // 获取最快返回的任务执行结果
- Integer r = 0;
- try {
- // 只要有一个成功返回,则break
- for (int i = 0; i < 3; ++i) {
- r = cs.take().get();
- //简单地通过判空来检查是否成功返回
- if (r != null) {
- break;
- }
- }
- } finally {
- //取消所有任务
- for(Future<Integer> f : futures)
- f.cancel(true);
- }
- // 返回结果
- return r;
当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。
CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽看上去有些啰嗦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
33、 Fork/Join:单机版的MapReduce
线程池、Future、CompletableFuture 和 CompletionService,仔细观察你会发现这些工具类都是在帮助我们站在任务的视角来解决并发问题,而不是让我们纠缠在线程之间如何协作的细节上(比如线程之间如何实现等待、通知等)。对于简单的并行任务,你可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;而批量的并行任务,则可以通过 CompletionService 来解决。
我们一直讲,并发编程可以分为三个层面的问题,分别是分工、协作和互斥,当你关注于任务的时候,你会发现你的视角已经从并发编程的细节中跳出来了,你应用的更多的是现实世界的思维模式,类比的往往是现实世界里的分工,所以我把线程池、Future、CompletableFuture 和 CompletionService 都列到了分工里面。
下面我用现实世界里的工作流程图描述了并发编程领域的简单并行任务、聚合任务和批量并行任务,辅以这些流程图,相信你一定能将你的思维模式转换到现实世界里来。
上面提到的简单并行、聚合、批量并行这三种任务模型,基本上能够覆盖日常工作中的并发场景了,但还是不够全面,因为还有一种“分治”的任务模型没有覆盖到。分治,顾名思义,即分而治之,是一种解决复杂问题的思维方法和模式;具体来讲,指的是把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解。理论上来讲,解决每一个问题都对应着一个任务,所以对于问题的分治,实际上就是对于任务的分治。
分治思想在很多领域都有广泛的应用,例如算法领域有分治算法(归并排序、快速排序都属于分治算法,二分法查找也是一种分治算法);大数据领域知名的计算框架 MapReduce 背后的思想也是分治。既然分治这种任务模型如此普遍,那 Java 显然也需要支持,Java 并发包里提供了一种叫做 Fork/Join 的并行计算框架,就是用来支持分治这种任务模型的。
(1)分治任务模型
这里你需要先深入了解一下分治任务模型,分治任务模型可分为两个阶段:一个阶段是任务分解,也就是将任务迭代地分解为子任务,直至子任务可以直接计算出结果;另一个阶段是结果合并,即逐层合并子任务的执行结果,直至获得最终结果。下图是一个简化的分治任务模型图,你可以对照着理解。
在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,我们往往都采用递归算法。
(2)Fork/Join 的使用
Fork/Join 是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的 Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask。这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。
ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,其中 fork() 方法会异步地执行一个子任务,而 join() 方法则会阻塞当前线程来等待子任务的执行结果。ForkJoinTask 有两个子类——RecursiveAction 和 RecursiveTask,通过名字你就应该能知道,它们都是用递归的方式来处理分治任务的。这两个子类都定义了抽象方法 compute(),不过区别是 RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是有返回值的。这两个子类也是抽象类,在使用的时候,需要你定义子类去扩展。
接下来我们就来实现一下,看看如何用 Fork/Join 这个并行计算框架计算斐波那契数列(下面的代码源自 Java 官方示例)。首先我们需要创建一个分治任务线程池以及计算斐波那契数列的分治任务,之后通过调用分治任务线程池的 invoke() 方法来启动分治任务。由于计算斐波那契数列需要有返回值,所以 Fibonacci 继承自 RecursiveTask。分治任务 Fibonacci 需要实现 compute() 方法,这个方法里面的逻辑和普通计算斐波那契数列非常类似,区别之处在于计算 Fibonacci(n - 1) 使用了异步子任务,这是通过 f1.fork() 这条语句实现的。
-
- static void main(String[] args){
- //创建分治任务线程池
- ForkJoinPool fjp =
- new ForkJoinPool(4);
- //创建分治任务
- Fibonacci fib =
- new Fibonacci(30);
- //启动分治任务
- Integer result =
- fjp.invoke(fib);
- //输出结果
- System.out.println(result);
- }
- //递归任务
- static class Fibonacci extends
- RecursiveTask<Integer>{
- final int n;
- Fibonacci(int n){this.n = n;}
- protected Integer compute(){
- if (n <= 1)
- return n;
- Fibonacci f1 =
- new Fibonacci(n - 1);
- //创建子任务
- f1.fork();
- Fibonacci f2 =
- new Fibonacci(n - 2);
- //等待子任务结果,并合并结果
- return f2.compute() + f1.join();
- }
- }
(2)ForkJoinPool 工作原理
Fork/Join 并行计算的核心组件是 ForkJoinPool,所以下面我们就来简单介绍一下 ForkJoinPool 的工作原理。
通过专栏前面文章的学习,你应该已经知道 ThreadPoolExecutor 本质上是一个生产者 - 消费者模式的实现,内部有一个任务队列,这个任务队列是生产者和消费者通信的媒介;ThreadPoolExecutor 可以有多个工作线程,但是这些工作线程都共享一个任务队列。
ForkJoinPool 本质上也是一个生产者 - 消费者的实现,但是更加智能,你可以参考下面的 ForkJoinPool 工作原理图来理解其原理。ThreadPoolExecutor 内部只有一个任务队列,而 ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。
如果工作线程对应的任务队列空了,是不是就没活儿干了呢?不是的,ForkJoinPool 支持一种叫做“任务窃取”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务,例如下图中,线程 T2 对应的任务队列已经空了,它可以“窃取”线程 T1 对应的任务队列的任务。如此一来,所有的工作线程都不会闲下来了。
ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争。我们这里介绍的仅仅是简化后的原理,ForkJoinPool 的实现远比我们这里介绍的复杂,如果你感兴趣,建议去看它的源码。
(3)模拟 MapReduce 统计单词数量
学习 MapReduce 有一个入门程序,统计一个文件里面每个单词的数量,下面我们来看看如何用 Fork/Join 并行计算框架来实现。
我们可以先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果,你可以对照前面的简版分治任务模型图来理解这个过程。
思路有了,我们马上来实现。下面的示例程序用一个字符串数组 String[] fc 来模拟文件内容,fc 里面的元素与文件里面的行数据一一对应。关键的代码在 compute() 这个方法里面,这是一个递归方法,前半部分数据 fork 一个递归任务去处理(关键代码 mr1.fork()),后半部分数据则在当前任务中递归处理(mr2.compute())。
-
- static void main(String[] args){
- String[] fc = {"hello world",
- "hello me",
- "hello fork",
- "hello join",
- "fork join in world"};
- //创建ForkJoin线程池
- ForkJoinPool fjp =
- new ForkJoinPool(3);
- //创建任务
- MR mr = new MR(
- fc, 0, fc.length);
- //启动任务
- Map<String, Long> result =
- fjp.invoke(mr);
- //输出结果
- result.forEach((k, v)->
- System.out.println(k+":"+v));
- }
- //MR模拟类
- static class MR extends
- RecursiveTask<Map<String, Long>> {
- private String[] fc;
- private int start, end;
- //构造函数
- MR(String[] fc, int fr, int to){
- this.fc = fc;
- this.start = fr;
- this.end = to;
- }
- @Override protected
- Map<String, Long> compute(){
- if (end - start == 1) {
- return calc(fc[start]);
- } else {
- int mid = (start+end)/2;
- MR mr1 = new MR(
- fc, start, mid);
- mr1.fork();
- MR mr2 = new MR(
- fc, mid, end);
- //计算子任务,并返回合并的结果
- return merge(mr2.compute(),
- mr1.join());
- }
- }
- //合并结果
- private Map<String, Long> merge(
- Map<String, Long> r1,
- Map<String, Long> r2) {
- Map<String, Long> result =
- new HashMap<>();
- result.putAll(r1);
- //合并结果
- r2.forEach((k, v) -> {
- Long c = result.get(k);
- if (c != null)
- result.put(k, c+v);
- else
- result.put(k, v);
- });
- return result;
- }
- //统计单词数量
- private Map<String, Long>
- calc(String line) {
- Map<String, Long> result =
- new HashMap<>();
- //分割单词
- String [] words =
- line.split("\\s+");
- //统计单词数量
- for (String w : words) {
- Long v = result.get(w);
- if (v != null)
- result.put(w, v+1);
- else
- result.put(w, 1L);
- }
- return result;
- }
- }
Fork/Join 并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的 MapReduce,所以你可以把 Fork/Join 看作单机版的 MapReduce。
Fork/Join 并行计算框架的核心组件是 ForkJoinPool。ForkJoinPool 支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。
34、while(true) 总不让人省心
本意是通过破坏不可抢占条件来避免死锁问题,但是它的实现中有一个致命的问题,那就是: while(true) 没有 break 条件,从而导致了死循环。除此之外,这个实现虽然不存在死锁问题,但还是存在活锁问题的,解决活锁问题很简单,只需要随机等待一小段时间就可以了。
修复后的代码如下所示,我仅仅修改了两个地方,一处是转账成功之后 break,另一处是在 while 循环体结束前增加了Thread.sleep(随机时间)。
-
- class Account {
- private int balance;
- private final Lock lock
- = new ReentrantLock();
- // 转账
- void transfer(Account tar, int amt){
- while (true) {
- if(this.lock.tryLock()) {
- try {
- if (tar.lock.tryLock()) {
- try {
- this.balance -= amt;
- tar.balance += amt;
- //新增:退出循环
- break;
- } finally {
- tar.lock.unlock();
- }
- }//if
- } finally {
- this.lock.unlock();
- }
- }//if
- //新增:sleep一个随机时间避免活锁
- Thread.sleep(随机时间);
- }//while
- }//transfer
- }
这个思考题里面的 while(true) 问题还是比较容易看出来的,但不是所有的 while(true) 问题都这么显而易见的,很多都隐藏得比较深。
例如下面代码中本质上也是一个 while(true),不过它隐藏得就比较深了。看上去 while(!rf.compareAndSet(or, nr)) 是有终止条件的,而且跑单线程测试一直都没有问题。实际上却存在严重的并发问题,问题就出在对 or 的赋值在 while 循环之外,这样每次循环 or 的值都不会发生变化,所以一旦有一次循环 rf.compareAndSet(or, nr) 的值等于 false,那之后无论循环多少次,都会等于 false。也就是说在特定场景下,变成了 while(true) 问题。既然找到了原因,修改就很简单了,只要把对 or 的赋值移到 while 循环之内就可以了,修改后的代码如下所示:
-
- public class SafeWM {
- class WMRange{
- final int upper;
- final int lower;
- WMRange(int upper,int lower){
- //省略构造函数实现
- }
- }
- final AtomicReference<WMRange>
- rf = new AtomicReference<>(
- new WMRange(0,0)
- );
- // 设置库存上限
- void setUpper(int v){
- WMRange nr;
- WMRange or;
- //原代码在这里
- //WMRange or=rf.get();
- do{
- //移动到此处
- //每个回合都需要重新获取旧值
- or = rf.get();
- // 检查参数合法性
- if(v < or.lower){
- throw new IllegalArgumentException();
- }
- nr = new
- WMRange(v, or.lower);
- }while(!rf.compareAndSet(or, nr));
- }
- }
35、signalAll() 总让人省心
关于 signal() 和 signalAll() 的,Dubbo 最近已经把 signal() 改成 signalAll() 了,我觉得用 signal() 也不能说错,但的确是用 signalAll() 会更安全。我个人也倾向于使用 signalAll(),因为我们写程序,不是做数学题,而是在搞工程,工程中会有很多不稳定的因素,更有很多你预料不到的情况发生,所以不要让你的代码铤而走险,尽量使用更稳妥的方案和设计。Dubbo 修改后的相关代码如下所示:
-
- // RPC结果返回时调用该方法
- private void doReceived(Response res) {
- lock.lock();
- try {
- response = res;
- done.signalAll();
- } finally {
- lock.unlock();
- }
- }
36、Semaphore 需要锁中锁
对象池的例子中 Vector 能否换成 ArrayList,答案是不可以的。Semaphore 可以允许多个线程访问一个临界区,那就意味着可能存在多个线程同时访问 ArrayList,而 ArrayList 不是线程安全的,所以对象池的例子中是不能够将 Vector 换成 ArrayList 的。Semaphore 允许多个线程访问一个临界区,这也是一把双刃剑,当多个线程进入临界区时,如果需要访问共享变量就会存在并发问题,所以必须加锁,也就是说 Semaphore 需要锁中锁。
37、锁的申请和释放要成对出现
bug出在没有正确地释放锁。锁的申请和释放要成对出现,对此我们有一个最佳实践,就是使用 try{}finally{},但是 try{}finally{}并不能解决所有锁的释放问题。比如示例代码中,锁的升级会生成新的 stamp ,而 finally 中释放锁用的是锁升级前的 stamp,本质上这也属于锁的申请和释放没有成对出现,只是它隐藏得有点深。解决这个问题倒也很简单,只需要对 stamp 重新赋值就可以了,修复后的代码如下所示:
-
- private double x, y;
- final StampedLock sl = new StampedLock();
- // 存在问题的方法
- void moveIfAtOrigin(double newX, double newY){
- long stamp = sl.readLock();
- try {
- while(x == 0.0 && y == 0.0){
- long ws = sl.tryConvertToWriteLock(stamp);
- if (ws != 0L) {
- //问题出在没有对stamp重新赋值
- //新增下面一行
- stamp = ws;
- x = newX;
- y = newY;
- break;
- } else {
- sl.unlockRead(stamp);
- stamp = sl.writeLock();
- }
- }
- } finally {
- //此处unlock的是stamp
- sl.unlock(stamp);
- }
38、回调总要关心执行线程是谁
CyclicBarrier 的回调函数使用了一个固定大小为 1 的线程池,是否合理?我觉得是合理的,可以从以下两个方面来分析。
第一个是线程池大小是 1,只有 1 个线程,主要原因是 check() 方法的耗时比 getPOrders() 和 getDOrders() 都要短,所以没必要用多个线程,同时单线程能保证访问的数据不存在并发问题。
第二个是使用了线程池,如果不使用,直接在回调函数里调用 check() 方法是否可以呢?绝对不可以。为什么呢?这个要分析一下回调函数和唤醒等待线程之间的关系。下面是 CyclicBarrier 相关的源码,通过源码你会发现 CyclicBarrier 是同步调用回调函数之后才唤醒等待的线程,如果我们在回调函数里直接调用 check() 方法,那就意味着在执行 check() 的时候,是不能同时执行 getPOrders() 和 getDOrders() 的,这样就起不到提升性能的作用。
-
- try {
- //barrierCommand是回调函数
- final Runnable command = barrierCommand;
- //调用回调函数
- if (command != null)
- command.run();
- ranAction = true;
- //唤醒等待的线程
- nextGeneration();
- return 0;
- } finally {
- if (!ranAction)
- breakBarrier();
- }
所以,当遇到回调函数的时候,你应该本能地问自己:执行回调函数的线程是哪一个?这个在多线程场景下非常重要。因为不同线程 ThreadLocal 里的数据是不同的,有些框架比如 Spring 就用 ThreadLocal 来管理事务,如果不清楚回调函数用的是哪个线程,很可能会导致错误的事务管理,并最终导致数据不一致。
CyclicBarrier 的回调函数究竟是哪个线程执行的呢?如果你分析源码,你会发现执行回调函数的线程是将 CyclicBarrier 内部计数器减到 0 的那个线程。所以我们前面讲执行 check() 的时候,是不能同时执行 getPOrders() 和 getDOrders(),因为执行这两个方法的线程一个在等待,一个正在忙着执行 check()。
再次强调一下:当看到回调函数的时候,一定问一问执行回调函数的线程是谁。
39、共享线程池:有福同享就要有难同当
以下代码的问题,例如没有异常处理、逻辑不严谨等等,不过我更想让你关注的是:findRuleByJdbc() 这个方法隐藏着一个阻塞式 I/O,这意味着会阻塞调用线程。默认情况下所有的 CompletableFuture 共享一个 ForkJoinPool,当有阻塞式 I/O 时,可能导致所有的 ForkJoinPool 线程都阻塞,进而影响整个系统的性能。
-
- //采购订单
- PurchersOrder po;
- CompletableFuture<Boolean> cf =
- CompletableFuture.supplyAsync(()->{
- //在数据库中查询规则
- return findRuleByJdbc();
- }).thenApply(r -> {
- //规则校验
- return check(po, r);
- });
- Boolean isOk = cf.join();
利用共享,往往能让我们快速实现功能,所谓是有福同享,但是代价就是有难要同当。在强调高可用的今天,大多数人更倾向于使用隔离的方案。
40、线上问题定位的利器:线程栈 dump
本质上都是定位线上并发问题,方案很简单,就是通过查看线程栈来定位问题。重点是查看线程状态,分析线程进入该状态的原因是否合理,你可以参考《09 | Java 线程(上):Java 线程的生命周期》来加深理解。
为了便于分析定位线程问题,你需要给线程赋予一个有意义的名字,对于线程池可以通过自定义 ThreadFactory 来给线程池中的线程赋予有意义的名字,也可以在执行 run() 方法时通过Thread.currentThread().setName();来给线程赋予一个更贴近业务的名字。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。