赞
踩
本文主要描述了java并发编程和多线程编程常见面试问题,本文涉及的内容在开发中极其重要,需要在理解的基础上灵活应用,这块内容多且抽象,需要我们反复研究学习,笔者相信多看几遍细细品味定会掌握。
J.U.C实际是java.util.concurrent(java并发包的缩写),这个包的作者是 Doug Lea,java的并发编程相关技术的维护者,对java的贡献巨大,非常厉害的一位老爷子。
AbstractQueuedSynchronizer(抽象队列同步器),是J.U.C包中的多个组件的底层实现,如Lock,CountDownLatch,Semaphore,CyclicBarrier等都用到了AQS。
关键特性如下:
同步状态管理: AQS 维护了一个单一的、原子的同步状态变量,这个状态可以用来表示锁的持有情况或者某种条件是否满足。状态值通常是一个整数,通过 CAS(Compare and Swap)操作来原子地更新。
等待队列: AQS 使用 CLH 锁队列(Craig, Landin, and Hagersten lock)作为其等待队列的基础,这是一个高性能的、非阻塞的队列算法。当线程尝试获取锁而锁不可用时,它们会被插入到队列中并被挂起,直到锁可用。
条件对象: AQS 支持条件变量,允许线程在等待特定条件满足时被挂起,当条件满足后被唤醒。
AQS可以直接理解为多线程同步器,AQS提供了两种锁机制,分别是排他锁和共享锁。
AQS内部是由两个核心部分组成:
一个由volatile修饰的state变量,作为一个竞态条件。
用双向链表结构维护的FIFO线程等待队列。
工作原理是,多个线程通过对这个state共享变量进行修改来实现竞态条件,竞争失败的线程加入FIFO队列并且阻塞,抢占到竞态资源的线程释放资源之后,后续的线程按照FIFO顺序实现有序唤醒。
这个问题考察了我们对数据结构的理解程度。首先双向链表有什么特点呢?
单向链表的所有中间结点(不关注首尾节点)由数据域和一个指针域组成,指针指向的是后置节点的内存地址。
双向链表的中间节点(不关注首尾节点)都有数据域和两个指针域组成,其中一个指针指向后置节点,另外一个指针就指向前置节点。
通过这两种数据数据结构的特点可以看出,双向链表支持在O(1)时间复杂度下找到前置节点。单向链表则是O(n),因为单向链表在查找是需要从头开始遍历,显然在进行插入和删除的时候,双向链表要比单向链表简单高效。这其实也是一种空间换时间的做法,双向需要维护两个指针,自然空间占用就更大点,但是处理时间更快了。
由双链表这个特性分析来看,AQS使用双向链表有3个原因:
没有竞争到锁的线程会加入阻塞队列,并且阻塞等待的前提是,当前线程所在的前置节点是正常状态。这么设计是为了避免链表中存在异常线程导致无法唤醒后续线程的问题。线程在进入阻塞队列前需要判断其前置节点线程的状态是否正常,那如果是单线程,则需要从头开始遍历,性能就很低下了。
Lock接口里有一个方法lockInterruptibly()方法,这个方法表示处于锁阻塞的线程允许被中断。那么这个被中断的线程是不用去竞争锁的,此时还存在于双链表中,意味着后续的锁竞争时需要移除这个中断的线程节点,不然会导致锁阻塞住的线程无法唤醒。和上一个原因类似,涉及到查询被中断的节点并移除的操作,单向链表需要从头开始遍历,效率不如双向链表。
为了避免线程阻塞和唤醒的开销,刚加入链表的线程,首先通过自旋的方式尝试竞争锁,但按照公平锁的设计,只有head节点的下一个节点才有必要竞争锁,后面的节点无需竞争锁。这就涉及到对加入节点的前置节点判断了,判断其前置节点否为head节点,是则进行锁竞争,不是就不去竞争锁。又是涉及到查找前置节点的问题,单向链表效率不如双向链表。
总结为以下三点:
CAS是java中unsafe类的方法,全称是CompareAndSwap,即比较并交换的意思。是为了确保在多线程环境下,对于共享变量的原子修改问题。下面的内容篇幅较长,较为螺丝,请仔细阅读思考。
这里举个例子,假设有个成员变量state,默认为0,定义了一个方法,方法逻辑是,判断state是否为0,如果为0,就修改成1。逻辑没问题,但是多线程环境下,会存在原子性问题。请看代码:
package com.execute.batch.executebatch; /** * @author hulei * @date 2024/7/7 23:05 */ public class Example { private int state = 0; public void doSomething() { if (state == 0) { state = 1; System.out.println("doSomething:" + state); } } }
再提供一个多线程的测试代码:
package com.execute.batch.executebatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author hulei * @date 2024/7/7 23:07 */ public class ExampleThread { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(100); // 创建一个固定大小的线程池 Example example = new Example(); for (int i = 0; i < 10000; i++) { // 多个线程将调用此方法 executor.submit(example::doSomething); } executor.shutdown(); // 关闭线程池 } }
正常合理情况如下:
大部分情况下都是先有一个线程先对state进行了修改+1操作,其他线程没有进入if条件中。但是也有可能多个线程获取到的state的值为0,同时进入if条件中,我继续多点了几次,出现了下面的结果:
这就是因为方法调用没有加锁,导致线程还没有来得及修改变量或者修改了还没有写入到内存中,其他线程就读取了state,仍然为0,出现了原子性问题。
那有什么办法呢,很简单方法上加synchronized就可以了,加了锁,哪个线程先调用就持有锁,后续线程只能阻塞挂起了。
也许会有人说synchronized是重量级锁,线程阻塞需要进行用户态和内核态的之间转换,耗资源性能不好什么之类的,不如使用ReentranLock,但是我想说的是随着近年来的不断优化synchronized,synchronized和Lock(ReentranLock的接口)之间性能已经相差无几了,性能不应该再成为我们选择Lock的理由。有时候我们需要关注下jdk的发展哈。关于synchronized和ReentranLock的区别和适用场景,笔者这里不展开描述,在下文中有专门的章节点论述。
对于Example类,我们适用CAS机制进行优化,解决原子性问题,把Example类修改如下:
package com.execute.batch.executebatch; import sun.misc.Unsafe; import java.lang.reflect.Field; /** * @author hulei * @date 2024/7/7 23:05 */ public class Example { private volatile int state = 0; private static final Unsafe unsafe; private static final long stateOffset; static { try { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); unsafe = (Unsafe) theUnsafe.get(null); stateOffset = unsafe.objectFieldOffset(Example.class.getDeclaredField("state")); } catch (Exception e) { throw new Error(e); } } public void doSomething() { if (unsafe.compareAndSwapInt(this, stateOffset, 0, 1)) { System.out.println("do something:"+state); } } }
在上面的代码中,定义了一个state变量,使用了关键字volatile 修饰,保证内存可见性,说白了就是有线程对其值进行了修改,其他操作这个共享变量的线程都能立即可见。这一点很重要,对于CAS机制在修改变量的时候能否获取到最新内存中存在的值,判断是否已经被其他线程修改过,从而决定是直接修改变量还是自旋一次继续判断。
Unsafe 类中提供了一个int类型比较并交换的cas方法compareAndSwapInt(),我们通过反射的方式获取了Unsafe类。
stateOffset是什么呢?是state这个变量的内存地址偏移量,就是state这个变量的内存地址位置,可以根据这个内存地址偏移量找到state位置,读取state变量的值。
compareAndSwapInt()方法有四个参数,分别是当前对象实例,成员变量state的内存地址偏移量,预期的旧值0,待修改的新值1。
CAS的工作原理笔者在这里解释下:现在有个变量a = 0,且a被volatile修饰,具备内存可见性,有两个线程分别对其进行加一操作,假设两个线程读取到a的值都为0 ,其中一个线程对其进行加一,且值已经更新到内存上,另外一个线程在修改变量时会有以下操作:
当然这里有个ABA问题,即有线程把值修改后,又改回原值,这在其他线程看来无法判断是否修改过,尽管CAS可以操作成功,但这可能会出现程序逻辑上的错误。
解决ABA问题的方法有以下几种:
使用版本号或标记时间戳:为每个变量添加一个版本号或时间戳,在每次修改变量值的同时更新版本号或时间戳。这样即使变量值回到原值,版本号或时间戳也会不同,从而避免ABA问题。
使用原子引用类型:Java中的AtomicStampedReference和AtomicMarkableReference类提供了带有版本号或标记的原子引用,可以用来检测ABA问题。
使用更强的一致性模型:例如,使用锁或其他更复杂的同步机制来确保不会发生ABA问题,但这可能会影响性能。
注意点:在java中,CAS是一个native方法,native方法通常是jvm调用外部的实现方法,使用其他语言实现的,一般是C或C++实现,这种设计允许开发者利用底层语言的性能优势,同时保持高级语言的易用性和抽象性。但是不管怎么实现,最终都会面临read-write问题,就是先从内存地址读取state的值,然后去比较,最后修改。这个过程不管在什么层面上实现,都会存在原子性问题,因为从读取到修改,这中间必然有时间差,其他线程都有可能先操作了这个值。在CAS的底层实现中,在多核CPU的环境下,会增加一个Lock指令对缓存或者总线加锁,从而保证比较并替换这两个指令的原子性。所以CAS不是不加锁,是锁隐藏在了底层。
乐观锁,顾名思义,就是乐观态度的锁。乐观锁在操作数据时认为不会有其他线程同时修改数据,不会加锁,只是在修改数据时进行比较判断有没有其他线程修改过这个数据。比如CAS就是乐观锁的一个实现。
悲观锁,悲观态度的锁,在操作数据时认为别的线程会修改这个数据,所以直接加锁,即读取数据时就是加锁,直到修改完毕后才会释放锁。其他线程在读取数据时就会阻塞,直到拿到锁。
两种锁的应用场景:
乐观锁:适合于写少读多的场景,为什么呢,因为读的时候不加锁,只有修改数据时才比较判断。减少了操作冲突,只有修改的时候才会有操作冲突
悲观锁:适合于写多读少的场景,如果还是用乐观锁,那么会经常出现自旋。大量消耗性能。
我们记住一点:在任何场景下,锁的作用就是解决并发冲突问题,即对于共享资源的操作问题。
死锁可以理解为死节,无解的意思。就是两个或多个线程在等待永远无法发生的事。导致系统的一部分或全部变得无响应。死锁产生的四个必要条件如下:
互斥: 某种资源一次只允许一个进程访问,即该资源一旦分配给某个进程,其他进程就不能再访问,直到该进程访问结束。
占有且等待: 一个进程本身占有资源(一种或多种),同时还有资源未得到满足,正在等待其他进程释放该资源。
不可抢占:别的线程已经占有了某项资源,不能因为自己也需要该资源,就去把别人的资源抢过来。
循环等待:存在一个进程链,使得每个进程都占有下一个进程所需的至少一种资源。
线程如果已经产生死锁,自己是没办法终止的,只能外部干预,比如kill掉程序或重启,这也是死锁后我们唯一的操作手段,没有其他办法。
上面说的是死锁已经发生后的处理方式,那我们如何避免死锁发生呢?其实只要从死锁产生的几个必要条件下手即可:
互斥条件是无法破坏的,这是互斥锁的基本约束,破坏了资源可以被多个线程占有,互斥锁也就没有意义了。
占有且等待条件,我们在首次执行时申请获取线程执行所需要的全部资源即可,这样线程在获取资源时就不存在等待其他线程释放资源的情况。
不可抢占条件,占有部分资源的线程在进一步申请其他资源时如果申请不到,那么我们就释放出自己占有的资源,给其他线程用,这样就破坏了抢占条件。需要注意的是这里是主动释放自己占有的资源,而不是其他线程持有的资源。要么全都要,要么全都不要,得到你的人必须也要得到你的心,要不然就自己主动放手成全他人,对吧铁子们?
synchronized和Lock都是java中用来解决线程安全问题的工具,即解决并发冲突。可以从以下四个方面来比较分析:
特性区别
synchronized是java内置的一个线程同步关键字,而Lock是J.U.C包下面的一个接口,它有很多实现类,我们常用的ReentranLock就是它的一个实现类。
用法区别
synchronized可以写在需要同步的对象、方法或者特定代码块中。主要有两种写法,比如这样:
//控制方法
public synchronized void sync(){
}
再比如代码块同步:
Object lock = new Object();
//控制代码块
public void sync(){
synchronized(lock){
}
}
代码块加锁可以实现更加精准的控制,提升加锁后损失的性能。再看ReentrantLock的相关用法示例
package com.execute.batch.executebatch; import lombok.Getter; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockExample { @Getter private volatile int count = 0; private final ReentrantLock lock = new ReentrantLock(); public void someMethod() { lock.lock(); try { count++; } finally { lock.unlock(); } } }
ReentrantLockExample 类中有一个内存共享的变量,方法中对其进行自增操作,我们使用了ReentrantLock锁机制进行了并发控制。
测试多线程测试代码:
public class ThreadTest { public static void main(String[] args) throws InterruptedException { ReentrantLockExample example = new ReentrantLockExample(); ExecutorService executor = Executors.newFixedThreadPool(10); // 创建一个固定大小的线程池 for (int i = 0; i < 100; i++) { executor.submit(example::someMethod); } executor.shutdown(); // 关闭线程池 boolean finished = executor.awaitTermination(1, TimeUnit.MINUTES); // 等待所有任务完成 if (!finished) { System.out.println("Some tasks did not finish within the timeout."); } System.out.println("Final count: " + example.getCount()); // 输出最终的 count 值 } }
使用多线程对于count变量的自增修改操作是安全的,运行结果始终如下:
Lock加锁的方式相对灵活,可以手动的加锁释放锁,需要注意的是我们一般把unlock()放在finally中确保锁被释放。另外Lock还提供了非阻塞的竞争锁方法tryLock(),就是在竞争锁之前先判断锁有没有被其他线程持有,如果tryLock()返回false,需要我们自己决定做什么操作,执行临界区代码还是干其他的事情。
如果不使用Lock锁,注释相关代码
那么并发自增就会出现安全问题,比如下面的运行结果:
这个前面已经说过,synchronized和Lock的性能已经相差无几,jdk1.6以后做了优化,采用了偏向锁、轻量级锁、重量级锁、锁升级的方式进行优化。而Lock采用了CAS自旋锁进行优化。synchronized实现采用了悲观锁,Lock实现采用了乐观锁。
一般情形下没什么区别,不想自己管理锁的获取和释放,就用synchronized。想在更细粒度的层次上控制锁就用Lock。不过你的同步过程非常复杂,那么建议使用Lock,synchronized只提供了非公平锁的机制,Lock则提供了公平锁和非公平锁两种机制。
公平锁:过来枪锁的线程得加入到FIFO队列里,排队等待。
非公平锁:线程不管先来后到,不管是否在排队,都会去尝试竞争一次锁。
可重入:意思是运行中的某个方法或代码片段,因为抢占资源或者中断等原因,导致方法或者代码片段的运行中断,等待中断程序执行结束后,重新进入这个方法或者代码片段中运行,运行结果不受影响。
可重入锁:简单的说,就是一个线程如果已经抢占到了互斥锁资源,在它自己释放锁资源之前,再去竞争锁的时候不需要等待,只需要记录重入次数。
可重入锁工作原理:
锁的内部计数器: 当一个线程第一次获取到可重入锁时,锁内部会记录这个线程的信息,并设置一个计数器(通常称为嵌套计数器)为1。这意味着锁已经被持有一次。
多次获取锁: 如果同一个线程再次尝试获取同一把锁,锁会检查请求锁的线程是否已经是锁的持有者。如果是,锁不会阻塞这个线程,而是会递增嵌套计数器的值,表示锁被同一线程再次获取。
锁的释放: 当线程释放锁时,嵌套计数器会递减。只有当嵌套计数器归零时,锁才会真正被释放,此时其他线程才有机会获取这把锁。
在多线程并发编程里,绝大部分锁都是可重入的,比如synchronized,ReentranLock等,当然也有不支持重入的锁,比如StampedLock,StampedLock 不支持可重入性是因为它被设计为一种高性能、轻量级的锁,专注于读取密集型场景,并且通过牺牲可重入性来换取更高的并发性能和更低的锁竞争。
锁的可重入性,主要用于避免死锁。因为一个已经获取同步锁X的线程,在释放锁X之前再去竞争锁X的时候,会出现自己等待自己释放锁的情况,显然这是无法成立的。
下面的代码展示了synchronized的可重入特性:
package com.execute.batch.executebatch; public class SynchronizedExample { public synchronized void outerMethod() { System.out.println("Entered outerMethod"); innerMethod(); // 调用内部方法 System.out.println("Exiting outerMethod"); } public synchronized void innerMethod() { System.out.println("Entered innerMethod"); // 执行一些操作 System.out.println("Exiting innerMethod"); } public static void main(String[] args) { SynchronizedExample example = new SynchronizedExample(); // 直接调用 outerMethod example.outerMethod(); } }
这证明了 synchronized 的可重入性,即一个已经拥有锁的线程可以再次获取同一个锁而不会导致死锁。在这个例子中,outerMethod 在调用 innerMethod 时,即使 innerMethod 也需要获取相同的锁,也不会发生死锁,因为两次获取锁都是由同一个线程执行的。
不过需要注意的是,synchronized 的可重入性是基于锁对象的。在上面的例子中,我们使用 this 作为锁对象,这意味着 outerMethod 和 innerMethod 都使用同一个对象实例作为锁,从而实现了可重入性。如果两个方法使用不同的锁对象,那么它们将不会相互重入。
ReenTranLock是一种可重入的排他锁,主要是用来解决多线程对共享资源的竞争问题。
核心特性如下:
支持可重入,也就是获得锁的线程在释放锁之前,再次去获取同一把锁时,不需要加锁就可以直接访问。
支持公平和非公平性。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
Lock lock = new ReenTranLock(),默认创建的是非公平锁,不过可以通过 Lock lock = new ReenTranLock(true),创建公平锁。
在回答这个问题前,先来说下公平和非公平的概念:
ReenTranLock默认采用了非公平锁的策略来实现锁的竞争逻辑,这一点在上一节中通过创建对象时的代码展示过。ReenTranLock内部使用了AQS来实现锁资源的竞争,没有竞争到锁资源的就加入AQS内部维护的一个同步队列,是FIFO的双向链表。
基于此背景,公平锁的实现方式是,线程在竞争锁资源的时候判断AQS同步队列里有没有同步等待的线程。如果有,就加入队列的尾部等待。而非公平锁的实现方式就是,不管队列里有没有线程等待,新线程都会先尝试竞争锁资源,抢不到再加入AQS同步队列进行等待。
ReenTranLock和synchronized默认都是非公平锁机制(synchronized只有非公平锁机制),这么设计的原因是考虑到了性能问题。竞争锁的线程按照公平锁的机制去等待,先不枪锁,直接排队阻塞等待,等到要执行时,需要重新去唤醒线程,涉及用户态和内核态的互相转换,对性能影响较大。如若使用非公平锁的的策略去竞争锁,当前线程正好赶在上一个线程释放锁的临界点抢到了锁,就不需要从用户态切换到内核态了,尽管这对原本排队的线程不公平,性能确实提升了很多。
这些锁是MySQL的InnoDB引擎下解决事务隔离性的一系列排他锁,在MySQL的InnoDB存储引擎中,行锁、间隙锁和临键锁(Next-Key Locks)是用于确保数据一致性和并发控制的重要概念。这些锁类型主要用于处理事务隔离级别下的并发操作,尤其是在可重复读(Repeatable Read)隔离级别下。
行锁(Record Lock) 行锁是最基本的锁类型,它锁定的是具体的数据行。当一个事务开始更新或删除某一行数据时,InnoDB会在这一行上放置一个排他锁(X锁),阻止其他事务对该行的更新或删除操作。同样,当一个事务开始读取某一行数据时,InnoDB会在这行上放置一个共享锁(S锁),允许其他事务同时读取同一行,但阻止它们更新或删除这一行。
间隙锁(Gap Lock) 间隙锁锁定的是索引项之间的“间隙”。在可重复读隔离级别下,为了防止幻读(Phantom Reads),即防止新插入的行出现在两次相同的范围查询结果中,InnoDB会使用间隙锁。例如,如果有两个事务,事务A正在读取一个范围内的数据,而事务B试图在这个范围内插入新的行,那么事务A会使用间隙锁来阻止事务B的插入操作,直到事务A完成。
临键锁(Next-Key Locks) 临键锁实际上是行锁和间隙锁的组合。在可重复读隔离级别下,InnoDB默认使用临键锁。临键锁不仅锁定行本身,还锁定所有可能插入到该行前面的间隙。这意味着,如果事务A正在读取某个范围内的数据,事务B不仅不能在这个范围内插入新的行,也不能在范围的起始点之前插入行,因为这样也会改变事务A看到的数据集。
临键锁的目的是解决幻读问题,同时也解决了不可重复读(Non-Repeatable Read)和脏读(Dirty Read)问题。但是,临键锁也可能导致死锁,因为它们锁定了比行锁更多的资源。在某些情况下,为了提高并发性能,可以考虑使用读已提交(Read Committed)隔离级别,这时InnoDB只使用行锁。
总结来说,行锁、间隙锁和临键锁是InnoDB为了在不同的事务隔离级别下提供数据一致性而采用的不同锁机制。了解这些锁的工作原理可以帮助数据库开发者和管理员更好地优化应用程序的性能和并发性。
乐观锁就是每次拿数据时都认为其他线程不会修改数据,所以不加锁,只在更新数据时才会把之前读到的数据和当前内存实际值相比较,判断有没有其他线程修改了这个数据。比较结果相同则认为数据没有被其他线程修改,当前线程把自己修改的数据写入。比较结果不同则说明已经被其他线程更新了,要么报错,要么自动重试(自旋)。
乐观锁,适合读多写少场景
悲观锁,适合写多读少场景
多个线程是否共享一把锁
在并发情况下,如果多个线程共享一把锁,就使用共享锁,如果不能共享一把锁,那就使用独占锁,也叫排他锁或者共享锁。
共享锁是指锁可以被多个线程持有。如果一个线程对数据加上共享锁,那么其他线程只能对数据再加共享锁,不能加独占锁。获得共享锁的线程只能读数据,不能修改数据。
独占锁是指锁一次只能被一个线程持有。如果一个线程对数据加上独占锁,那么其他线程不能再对该数据加任何类型的锁,获得独占锁的线程既能读取数据又能修改数据。
互斥锁是独占锁的一种常规实现,是指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排他性。
互斥锁一次只能被一个线程持有,其他线程只能等待。读写锁是共享锁的一种实现,读写锁管理一组锁,一个是只读的锁,一个是写锁。
读锁可以在没有写锁的时候被多个线程持有,而写锁是独占的。写锁的优先级要高于读锁,一个获得了读锁的线程必须能看到前一个释放的写锁更新的内容。
读写锁相对于互斥锁的并发程度更高,每次只有一个写线程,但同时可以有多个线程并发读,互斥锁是读和写都只能有一个线程操作。
JDK中定义了一个读写锁的接口ReadWriteLock,代码如下
public interface ReadWriteLock {
/* 获取读锁 */
Lock readLock();
/*获取写锁 */
Lock writeLock();
}
ReentrantReadWriteLock实现了ReadWriteLock接口,ReentrantReadWriteLock支持锁降级,不支持锁升级,可以由写锁降级为读锁。
以下是一个读写锁的代码示例,有两个方法分别使用读锁和写锁完成对共享变量的读取和更新操作:
package com.execute.batch.executebatch; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockExample { private final ReadWriteLock lock = new ReentrantReadWriteLock(); private Object sharedResource = new Object(); public void read() { lock.readLock().lock(); try { // 读取共享资源 System.out.println("Reading: " + sharedResource); } finally { lock.readLock().unlock(); } } public void write(Object newValue) { lock.writeLock().lock(); try { // 更新共享资源 sharedResource = newValue; System.out.println("Writing: " + sharedResource); } finally { lock.writeLock().unlock(); } } }
测试代码如下:
package com.execute.batch.executebatch; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; public class ReadWriteLockTest { public static void main(String[] args) throws InterruptedException { ReadWriteLockExample example = new ReadWriteLockExample(); ExecutorService executor = Executors.newFixedThreadPool(10); // 创建一个固定大小的线程池 CountDownLatch startSignal = new CountDownLatch(1); // 控制所有线程同时开始 CountDownLatch doneSignal = new CountDownLatch(20); // 等待所有线程完成 // 启动读取线程 IntStream.range(0, 15).forEach(i -> executor.submit(() -> { try { startSignal.await(); // 等待开始信号 example.read(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { doneSignal.countDown(); // 完成后减小计数 } })); // 启动写入线程 IntStream.range(0, 5).forEach(i -> executor.submit(() -> { try { startSignal.await(); // 等待开始信号 example.write("New Value " + i); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { doneSignal.countDown(); // 完成后减小计数 } })); startSignal.countDown(); // 发送开始信号 boolean result = doneSignal.await(10, TimeUnit.SECONDS); // 等待所有线程完成或超时 if (!result){ System.out.println("Timeout occurred. Some threads may not have completed."); } executor.shutdown(); // 关闭线程池 } }
程序通过创建一个固定大小的线程池,同时启动15个读取线程和5个写入线程,读取线程执行example.read()方法,写入线程执行example.write("New Value " + i)方法。通过读写锁,可以保证同一时间内只有一个写入线程在执行,而多个读取线程可以同时执行。读写锁确保在写操作进行时没有其他读操作或写操作同时进行,而在读操作进行时可以有多个读操作同时进行,但不能有写操作。程序使用CountDownLatch来控制线程的启动和等待,确保所有线程同时开始执行,并等待所有线程完成执行。
java中通过构造函数初始化公平锁,代码如下:
Lock lock = new ReentrantLock(true)
非公平锁是指多线程获取锁的顺序不是按照申请锁的顺序进行的,有可能后面申请的线程比先申请的线程优先获得锁,高并发场景下,可能造成优先级翻转,或者某个线程一直得不到锁的饥饿状态,示例图如下:
java中syncronized关键字是非公平锁,ReenTrantLock默认是非公平锁,如下:
Lock lock = new ReentrantLock(false);
Lock lock2 = new ReentrantLock();
上述两行代码创建的效果是一样的,都是创建了一个非公平锁。
一个线程的多个流程是否能获得同一把锁
如果一个线程的多个流程能获取同一把锁,就用可重入锁;如果一个线程的多个流程不能获取同一把锁,就使用不可重入锁。可重入锁又称为递归锁,是指同一个线程在外层方法获得了锁,如下图所示:
对于ReenTrantLock,从名字可以看出它是一个可重入锁。syncronized也是一个可重入锁。可重入锁在提升性能的同时,更重要的是可以避免死锁。关于线程自己等待自己释放锁会造成死锁的分析如下。
假设有一个线程T,它获取了一个锁L,然后在没有释放锁的情况下进入了等待状态,等待某个条件满足,而这个条件正是T释放锁L。此时,T将永远等待下去,因为它自身持有锁,而它又在等待自己释放锁。由于T永远不会释放锁(除非外部干预,如中断线程),所以T将永远处于等待状态,这就形成了一个循环依赖,导致线程挂起。
虽然这看起来像是一个死锁,但实际上它是一种特殊的自我挂起情况。在这种情况下,没有任何其他线程参与,只有一个线程陷入了无限等待的状态,这通常是因为逻辑错误或设计缺陷造成的。
再来看下面这段代码:
public syncronized void methodA(){
methodB();
}
public syncronized void methodB(){
//do something
}
这段代码中,methodA调用methodB(),如果一个线程调用methodA()已经获得了锁,再调用methodB()就不需要再获得锁了,这就使用了可重入锁的特性。如果是不可重入锁,则methodB()可能不会被当前线程执行,造成线程无线等待,也就是死锁。
如果锁被另一个线程占用时间较长,即使自旋了,之后当前线程还是会被挂起,之前的忙循环也就变成了无意义的浪费资源的操作,反而降低了系统的性能。所以自旋锁不适合锁占用时间长的并发情况
下面的代码是AtomicInteger原子类就有自旋操作:
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVol
atile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
循环条件weakCompareAndSetInt()就是CAS操作,如果失败就会刷新获取到的预期旧值v为当前内存实际值,继续下一次循环获得获得变量的内存实际值进行CAS比较。
JDK1.6引入了自适应自旋锁,这就比较智能了,自旋时间不固定,而是由前一次在同一个锁上的自旋时间以及锁的拥有者的状态来决定。如果虚拟机认为这次自旋很有可能再次成功,那么就会自旋较多的时间,否则就直接省略自旋过程,避免浪费资源。
第一个线程访问加锁的资源自动获取锁,不存在多线程竞争的情况,资源偏向第一个访问锁的线程,每次访问,这个线程都不需要重复获取锁,这种状态称为偏向锁。偏向锁是通过控制锁对象Mark Word 的标志位来实现的,如果当前是可偏向状态,则需要进一步判断对象头存储的线程ID是否与当前线程ID一致,如果一致则直接进入。
当线程竞争较为激烈,偏向锁会升级成为轻量级锁。轻量级锁认为竞争虽然存在,理想情况下竞争程度较低,所以通过自旋方式等待上一个线程释放锁。
如果并发进一步加剧,线程的自旋超过一定次数,或者自旋时间过久,就会升级成重量级锁,会使拥有锁的线程以外的线程都阻塞挂起。
重量级锁就是互斥锁,java中的syncronized关键字内部的实现原理就是这样一个锁升级的过程。
锁粗化:
请看下面代码:
private static final Object lock = new Object();
for(int i = 0;i<100;i++){
syncronized(lock){
//do something
}
}
经过锁粗化后:
syncronized(lock){
for(int i = 0; i<100 ; i++){
//do something
}
}
锁粗化就是把多次加锁和释放锁的操作合并成一次同步请求。
锁消除:
请看下面的代码:
private String test(String s1,String s2){
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
return sb.toString();
}
上述代码中所有的变量都是局部变量,不涉及到全局变量的操作,局部变量是存储在栈上的,栈是线程私有的,多线程访问test()方法也是线程安全的。
StringBuffer是线程安全的,append方法如下:
public synchronized StringBuffer append(String str) {
toStringCache = null;
super.append(str);
return this;
}
这个方法虽然加了同步锁,但由于其外部test方法是线程安全的,所以内部的同步锁会被虚拟机消除。
这一小节的内容还是挺多的,下面是一张思维导图,方便记忆
首先阻塞队列本身是符合FIFO特性的队列,也就是存储进去的元素符合先进先出的规则。
其次,在阻塞队列里,使用了condition条件等来维护两个等待队列。如下图所示:
一个是存储阻塞生产者线程的队列,此队列是当任务队列满了的时候,生产线程无法再生产任务放入时,生产者线程需要阻塞挂起,为了保证后续的任务生产顺序,需要把阻塞的生产线程有序放入一个FIFO队列中。
另一个是存储消费者线程的队列,此队列是当任务队列为空的时候,消费者线程无法从任务队列中获取任务消费,消费者线程需要阻塞挂起,为了保证后续的任务消费顺序,需要把阻塞的消费者线程有序放入FIFO队列中。
ArrayBlockingQueue实现原理其实就是上一节所描述的那样,阻塞队列在队列的基础上增加了两个附加操作:当队列为空时,获取元素的线程会等待队列变为非空;队列满的时候,存储元素的线程会等待队列可用,如下图所示。
在Java中,Thread和Runnable是用于创建和管理线程的两种不同方式,它们的主要区别在于实现方式和用途上。以下是两者的主要区别:
类与接口
继承与实现
启动线程
资源共享
线程状态
面向对象的设计
总结起来,Runnable和Thread在Java中用于多线程编程的不同层面。Runnable接口用于封装线程要执行的任务逻辑,而Thread类不仅封装了任务逻辑,还提供了线程的生命周期管理。在实际开发中,通常推荐使用Runnable接口,因为它提供了更好的设计灵活性和代码复用性。
守护线程是一种专门为用户线程提供服务的线程,生命周期依赖于用户线程。
只有jvm中仍然存在用户线程,守护线程才有存在的意义。
以下代码是创建守护线程的简单示例:
public class DaemonThreadExample { public static void main(String[] args) { // 创建一个守护线程 Thread daemonThread = new Thread(() -> { while (true) { System.out.println("守护线程正在运行..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); // 设置为守护线程 daemonThread.setDaemon(true); // 启动守护线程 daemonThread.start(); // 主线程做一些工作,然后结束 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("主线程结束,守护线程应该停止了。"); } }
通过代码看到,只需要调用setDaemon(),并传入参数为true即可,就表示这个线程是守护线程。守护线程不会阻止JVM的退出。当JVM中只剩下守护线程时,JVM将会自动退出,JVM退出时,守护线程也就退出了,但是用户线程会影响JVM退出时间,一般情况下,JVM会等到所有用户线程结束才会退出。
Java中的垃圾回收线程,就是典型的使用场景。这个场景的特殊之处在于,当JVM进程结束的时候,内存回收线程存在的意义也就不存在了。
需要注意的是:守护线程不会阻止jvm的退出,并且也会随着jvm退出而结束自己的生命周期,所以在一些I/O场景或者线程池中,不能使用守护线程,否则可能会导致任务没有执行完,或者资源没有正确释放。
首先这两种都是线程的阻塞等待状态。
BLOCKED是指线程在阻塞等待锁释放时的状态。
WAITING是指线程等待状态。
二者核心区别如下:
下面试WAITING方法的示例:
package com.netty.chat; public class WaitingExample { private static final Object lock = new Object(); private static boolean conditionMet = false; public static void main(String[] args) { Thread waitingThread = new Thread(() -> { synchronized (lock) { System.out.println("WaitingThread is waiting for the condition to be true."); while (!conditionMet) { try { lock.wait(); // 进入WAITING状态 } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println("WaitingThread was interrupted."); return; } } System.out.println("WaitingThread was notified and the condition is now true."); } }); Thread notifyingThread = new Thread(() -> { try { Thread.sleep(2000); // 等待2秒后再改变条件 } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } synchronized (lock) { conditionMet = true; System.out.println("NotifyingThread sets the condition to true."); lock.notifyAll(); // 唤醒所有等待的线程 } }); waitingThread.start(); notifyingThread.start(); } }
java中线程的运行状态有如下6种:
New (新建):
线程对象已经创建,但是尚未调用start()方法,此时线程处于新建状态。
Runnable (就绪):
线程调用了start()方法之后,线程状态变为可运行状态。处于此状态的线程可能正在执行,也可能在等待CPU时间片以便执行。
Blocked (阻塞):
这个状态通常指的是线程因为等待某种资源(如锁)而被阻塞的情况。当线程试图获取一个已经被其他线程锁定的锁时,就会进入阻塞状态,直到锁被释放。
Waiting (等待):
线程调用了Object.wait(), Thread.join()或某些带TimeUnit参数的等待方法(如Semaphore.acquire())时,线程会进入等待状态,直到被其他线程唤醒或满足等待条件。
Timed Waiting (计时等待):
当线程调用了一些有时间限制的等待方法(如Thread.sleep(long millis), Object.wait(long timeout)等),线程会进入计时等待状态,直到等待时间到期或被中断。
Terminated (已终止):
线程已经完成了其run()方法的执行,或者由于异常而提前结束,此时线程状态变为终止状态,线程生命周期结束。
当我们调用start()方法时,线程可能处于终止或其他非NEW状态,这是不合理的,所以要先判断下线程的初始状态是不是NEW。
线程池本质是一种池化技术,利用了资源复用的思想,常见的有连接池、内存池、对象池。
线程池设计的核心目标有两个:
线程的频繁创建和销毁带来的性能开销,因为线程创建会涉及CPU上下文切换、内存分配等工作。
线程池本身会有参数控制线程创建的数量,这样就可以避免无休止地创建线程带来的资源利用率过高的问题,发生OOM。
为了实现线程的复用,线程池使用到了阻塞队列,简单来说,就是线程池里的工作线程处于一直运行的状态,它会从阻塞队列中获取待执行的任务,一旦队列空了,这个工作线程就会就会被阻塞,直到下次有新任务进来。一句话就是工作线程会根据任务情况实现阻塞或者唤醒,从而达到线程复用的目的。
线程池里面的资源限制,是通过两个关键参数来控制的,分别是核心线程数和最大线程数。
核心线程数表示默认长期存在的工作线程,最大线程数是指根据任务的情况能动态创建的最大线程数,动态创建线程可以提升阻塞队列中任务的处理效率。
java种默认的线程池是通过JDK中的工具类Executors来构建的,线程池内部的最终实现类是ThreadPoolExecutor,如下图所示:
下面分别对这五种方式进行详细解释
Executors.newCachedThreadPool()
是一个可缓存的线程池。它会根据需要创建新线程来执行任务,当线程空闲超过一定时间后会被终止并移除缓存。这种线程池适用于处理耗时较短的任务,能够快速响应并充分利用系统资源。
Executors.newFixedThreadPool(10)
是一个固定大小的线程池,它包含10个线程。当有任务提交时,如果线程池中的线程都处于活动状态,新任务将会等待,直到有线程空闲出来。这种线程池适用于处理耗时较长的任务,以避免线程过多导致系统资源浪费。
Executors.newSingleThreadExecutor()
是一个单线程的线程池。它只有一个线程用于执行任务,任务会按照提交的顺序依次执行。这种线程池适用于需要保证任务执行顺序的场景。
Executors.newScheduledThreadPool(10)
是一个定长的线程池,可以定时或周期性地执行任务。它包含10个线程,可以使用ScheduledExecutorService接口的方法来安排任务的执行。这种线程池适用于需要定时执行或周期性执行任务的场景,例如定时任务调度。下面是一个代码示例:
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ScheduledThreadPoolExample { public static void main(String[] args) { // 创建一个包含10个线程的ScheduledExecutorService ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10); // 定义一个Runnable任务 Runnable task = () -> System.out.println("Task executed at " + System.currentTimeMillis()); // 延迟5秒后执行一次任务 scheduledThreadPool.schedule(task, 5, TimeUnit.SECONDS); // 每隔2秒重复执行任务,首次执行将在2秒后开始 scheduledThreadPool.scheduleAtFixedRate(task, 2, 2, TimeUnit.SECONDS); // 每次执行间隔2秒,但实际执行间隔取决于上一次任务完成的时间 scheduledThreadPool.scheduleWithFixedDelay(task, 2, 2, TimeUnit.SECONDS); // 在一段时间后关闭线程池 scheduledThreadPool.shutdown(); // 让主线程暂停10秒,确保其他线程有足够的时间运行 Thread.sleep(100000); } }
线程池中的线程分为核心线程和非核心线程。核心线程是常驻在线程池里面的工作线程。有两种初始化方式:
向线程池里添加任务时,被动初始化。
主动调用prestartAllCoreThreads()方法初始化。prestartAllCoreThreads()是ThreadPoolExecutor类里面的方法。
核心线程是常驻的工作线程,会长期存在,没有任务时会阻塞挂起,有任务时会唤醒执行任务。
**ThreadPoolExecutor.getTask()**方法内部有个获取队列任务的方法
底层在调用take()方法时会有一个判断任务数量的循环,为空就进入阻塞等待状态
非核心线程是因为任务太多,为了增加线程池的处理能力而临时创建的,任务执行结束后,这些非核心线程会被回收。
我们来看下ThreadPoolExecutor.getTask()方法的内部实现
这个函数的作用是获取一个任务(Runnable)来执行,它是一个无限循环,直到成功获取到任务或者线程池关闭。在循环中,它首先检查线程池的状态,如果线程池已经关闭或者停止,并且工作队列为空,则减少工作线程数并返回null。接着,它检查是否允许核心线程超时或者当前工作线程数超过了最大线程数,如果是,则减少工作线程数并继续循环。如果工作队列为空,也会减少工作线程数并继续循环。最后,它尝试从工作队列中获取一个任务,如果获取成功,则返回这个任务;如果获取失败,则设置超时标志为true并继续循环。如果发生中断,则重置超时标志为false并继续循环。
线程池采用了生产者-消费者模型。生产者-消费者模型就是通过一个中间容器来解耦生产者和消费者的任务处理过程。生产者不断生产任务保存到容器中,消费者不断从容器中消费任务。
线程池中的核心线程创建后一般不会销毁,为了保证线程资源的重复使用,这些核心工作线程在没有任务时会阻塞挂起,释放CPU资源,有任务时再唤醒,从任务的阻塞队列容器中获取任务执行。
从两个方面去回答:
package com.netty.chat; import java.util.concurrent.*; public class CustomThreadPoolExecutor extends ThreadPoolExecutor { public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future<?>) { try { Future<?> future = (Future<?>) r; if (future.isDone()) { Object result = future.get(); System.out.println("Task " + r + " completed with result: " + result); } } catch (Exception e) { System.out.println("Task " + r + " failed"); } } else if (t != null) { System.err.println("Task " + r.toString() + " threw exception: " + t.getMessage()); } } public static void main(String[] args) { try(CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor( 2, // 核心线程数 4, // 最大线程数 60L, // 空闲线程存活时间 TimeUnit.SECONDS, // 时间单位 new ArrayBlockingQueue<>(10)); // 工作队列 ){ // 提交任务 for (int i = 0; i < 10; i++) { final int index = i; Future<?> future = executor.submit(() -> { try { Thread.sleep(1000); // 模拟耗时操作 System.out.println("Executing task " + index); return index; // 返回一个结果,使任务成为Future } catch (InterruptedException e) { throw new IllegalStateException(e); } }); } executor.shutdown(); }catch (Exception e) { System.out.println("Error: " + e.getMessage()); } } }
package com.netty.chat; import java.util.concurrent.CountDownLatch; public class WorkerThreadExample { public static void main(String[] args) throws InterruptedException { // 初始化CountDownLatch,计数器设置为3 CountDownLatch latch = new CountDownLatch(3); // 创建工作者线程 for (int i = 0; i < 3; i++) { new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " is running..."); Thread.sleep(1000); // 模拟耗时操作 System.out.println(Thread.currentThread().getName() + " finished."); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 计数器减1 latch.countDown(); } }, "Worker-" + i).start(); } // 创建等待者线程 Thread waiter = new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " is waiting..."); // 等待计数器归零 latch.await(); System.out.println(Thread.currentThread().getName() + " all workers finished, continuing..."); } catch (InterruptedException e) { e.printStackTrace(); } }, "Waiter"); // 启动等待者线程 waiter.start(); // 主线程也可以等待,但这不是必须的,取决于具体的应用场景 waiter.join(); } }
这个问题挺有意思,就是不想额外创建非核心线程。我们可以通过SynchronousQueue这个阻塞队列去作为任务的中转,这个队列本身不存储任何元素,每产生一个任务放入时,就必须立刻要有一个消费者线程取出任务,否则会阻塞生产者。
import java.util.concurrent.*; public class ThreadPoolExecutorExample { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, // corePoolSize 2, // maximumPoolSize 0L, // keepAliveTime TimeUnit.MILLISECONDS, // unit new SynchronousQueue<>(), // workQueue Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() // handler ); for (int i = 0; i < 5; i++) { Runnable task = () -> { System.out.println("Task executed by " + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }; executor.execute(task); } executor.shutdown(); } }
结果如下:
在这个例子中,我们使用了SynchronousQueue作为工作队列,这是一个特殊的阻塞队列,它不会保存元素,而是在一个线程放入元素时必须有另一个线程立即取出元素。这样,一旦核心线程数达到上限,后续的任务就会直接触发拒绝策略。
注意,maximumPoolSize被设置为等于corePoolSize,意味着不会有额外的线程创建。keepAliveTime被设置为0,因为SynchronousQueue不会存储任务,所以不需要额外的非核心线程。
还有一种方式就是设置最大线程数为Integer.MAX_VALUE,这样的话,线程池就没有最大线程数的限制,因此新任务可以直接创建新线程来执行,而不是进入阻塞队列。
伪共享(False Sharing)是指在多核处理器架构中,当多个线程访问或修改同一个缓存行(Cache Line)上的不同变量时,由于缓存一致性协议(Cache Coherence Protocol),即使线程间实际上修改的是不相关的变量,也可能导致缓存行的频繁无效化和重载,从而降低程序的性能。
在现代处理器中,缓存行通常是64字节大小,如果多个线程同时访问或修改位于同一缓存行内的不同变量,那么当一个线程修改了其中一个变量时,它会将整个缓存行标记为脏(dirty),其他拥有相同缓存行副本的处理器核心就必须将其对应的缓存行作废,然后从主内存或其他处理器核心重新加载最新版本的缓存行。这种现象称为缓存行的“污染”或“伪共享”。
为了避免伪共享,可以采取以下几种策略:
缓存行对齐:确保每个线程访问的变量位于不同的缓存行上,可以通过在变量前添加足够数量的填充字节(padding)来实现。
使用原子变量:使用AtomicLong、AtomicInteger等原子变量类型,它们通常会被处理器分配到单独的缓存行,减少伪共享的影响。
使用缓存行锁定:虽然这种方法可以减少伪共享,但它可能会引入锁的竞争,因此需要谨慎使用。
伪共享是多线程编程中一个常见的性能陷阱,特别是在高并发场景下,理解并避免伪共享对于优化程序性能至关重要。
wait和notify是成对出现的,wait用于让线程处于阻塞等待状态,notify是用于唤醒线程。
现在有这样的场景:假设有两个线程要对一个共享变量做修改,其中线程1在对变量做修改之前必须要等到线程2先修改后才可以进行修改,这就涉及到一个顺序控制问题,
我们知道线程具有并行执行的特性,在没有加任何控制之前,无法保证线程2先对变量进行了修改,而线程1在其后,有可能线程1先抢到了锁,先执行了修改。
对共享变量的修改,要有一定的互斥条件,谁先抢到锁,另外一个线程便不能再操作,必须等待抢到锁的线程释放锁,否则没有互斥锁的控制,所有线程都能对变量进行修改,就没法实现线程对共享变量修改顺序的控制了。
package com.netty.chat; public class WaitingExample { private static final Object lock = new Object(); private static boolean conditionMet = false; public static void main(String[] args) { Thread waitingThread = new Thread(() -> { synchronized (lock) { System.out.println("WaitingThread is waiting for the condition to be true."); while (!conditionMet) { try { lock.wait(); // 进入WAITING状态 } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println("WaitingThread was interrupted."); return; } } System.out.println("WaitingThread was notified and the condition is now true."); } }); Thread notifyingThread = new Thread(() -> { try { Thread.sleep(2000); // 等待2秒后再改变条件 } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } synchronized (lock) { conditionMet = true; System.out.println("NotifyingThread sets the condition to true."); lock.notifyAll(); // 唤醒所有等待的线程 } }); //等待线程启动 waitingThread.start(); //通知线程启动 notifyingThread.start(); } }
上述代码中的锁对象如下:
Object lock = new Object();
conditionMet 这是一个共享变量,默认为false,根据它的值来判断哪一个线程先修改它,在修改的时候需要互斥锁。notifyingThread 通知唤醒线程会修改它的值为true。
private static boolean conditionMet = false;
对于等待线程,如果它获取到锁,判断conditionMet的值,如果为false,说明waitingThread线程先抢到的锁,进入while循环后,变为WAITING状态,释放了锁,等线程2 notifyingThread执行修改conditionMet的值为true。
对于通知线程notifyingThread,在获取到锁之后,会修改conditionMet的值为true
然后唤醒等待线程waitingThread,继续执行waitingThread任务。
代码中特地让notifyingThread线程内部睡2秒,目的就是让waitingThread先抢到锁进入阻塞等待。
conditionMet的值决定了waitingThread线程是否进入阻塞,若线程notifyingThread先获取锁先执行并修改conditionMet的值为true,则线程waitingThread就不会进入阻塞了。
拓展下ReentrantLock 中也有类似的概念,只是等待和唤醒统一使用Condition 来操作,以下是一个简单的生产者消费者模型,任务队列没有使用阻塞队列,需要自己去管理锁和条件变量:
package com.netty.chat; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class ProducerConsumerExample { private final Queue<Integer> queue = new LinkedList<>(); private final ReentrantLock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public void produce(int item) throws InterruptedException { lock.lock(); try { // Wait for space to become available. int capacity = 10; while (queue.size() == capacity) { System.out.println("Queue is full, producer is waiting, size: " + queue.size()); notFull.await(); } // Add an item to the queue. queue.offer(item); System.out.println("Produced: " + item); // Notify consumers that an item is available. notEmpty.signal(); } finally { lock.unlock(); } } public void consume() throws InterruptedException { lock.lock(); try { // Wait for an item to become available. while (queue.isEmpty()) { System.out.println("Queue is empty, consumer is waiting"); notEmpty.await(); } // Remove an item from the queue. int item = queue.poll(); System.out.println("Consumed: " + item); // Notify producers that space is available. notFull.signal(); } finally { lock.unlock(); } } public static void main(String[] args) { ProducerConsumerExample example = new ProducerConsumerExample(); Thread producerThread = new Thread(() -> { try { for (int i = 0; i < 20; i++) { example.produce(i); TimeUnit.MILLISECONDS.sleep(100); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); Thread consumerThread = new Thread(() -> { try { for (int i = 0; i < 20; i++) { example.consume(); TimeUnit.MILLISECONDS.sleep(200); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); producerThread.start(); consumerThread.start(); } }
如果使用ArrayBlockingQueue阻塞队列,重写上面的代码就更简单了,不需要自己手动判断队列为空,消费者线程阻塞,唤醒生产者线程;队列满了,生产者线程阻塞,唤醒消费者线程:
package com.netty.chat; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; public class ProducerConsumerExampleWithABQ { private final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); public void produce(int item) throws InterruptedException { // Add an item to the queue. The put method will block until there is space. queue.put(item); System.out.println("Produced: " + item); } public void consume() throws InterruptedException { // Remove an item from the queue. The take method will block until an item is available. int item = queue.take(); System.out.println("Consumed: " + item); } public static void main(String[] args) { ProducerConsumerExampleWithABQ example = new ProducerConsumerExampleWithABQ(); Thread producerThread = new Thread(() -> { try { for (int i = 0; i < 20; i++) { example.produce(i); TimeUnit.MILLISECONDS.sleep(100); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); Thread consumerThread = new Thread(() -> { try { for (int i = 0; i < 20; i++) { example.consume(); TimeUnit.MILLISECONDS.sleep(200); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); producerThread.start(); consumerThread.start(); } }
wait() 方法:
sleep(long millis) 方法:
总结:
wait() 会释放锁并释放 CPU 资源。
sleep() 只会释放 CPU 资源,但不会释放锁。
这个问题老八股文了,比较简单,核心就两点:
内存可见性: 当一个线程修改了 volatile 变量的值,新值对所有线程都是立即可见的。这意味着,一旦某个线程对 volatile 变量进行了写操作,那么这个新的值将会对所有线程可见,从而确保了数据的最新状态能够被所有线程获取到。
禁止指令重排序: 编译器和处理器会对代码进行优化,可能会重新安排语句的执行顺序。然而,这种重排序可能会影响程序的正确性,尤其是在多线程环境下。volatile 关键字可以阻止编译器和处理器对涉及该变量的代码进行重排序,从而避免了潜在的并发问题。
另外还有以下两点:
不保证原子性: 虽然 volatile 提供了可见性和禁止指令重排序的特性,但它并不保证复合操作的原子性。例如,即使一个变量是 volatile 的,像 i++ 这样的操作在多线程环境下仍然不是原子性的,可能需要使用锁或其他同步机制来保证原子性。
读取和写入缓存一致性: volatile 关键字确保了不同 CPU 核心之间的缓存一致性。在多核处理器上,每个核心都有自己的缓存,volatile 变量的读写操作会强制更新这些缓存,确保所有核心看到的是最新的值。
CountDownLatch实际开发中有两个场景:
单个线程等待多个线程的场景
让多个线程等待的场景
CountDownLatch的简单示例代码如下:
package com.netty.chat; import java.util.concurrent.CountDownLatch; public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException { // 创建一个 CountDownLatch 实例,参数表示需要计数的事件数量 final CountDownLatch latch = new CountDownLatch(3); // 创建并启动三个线程,每个线程在完成工作后都会调用 latch.countDown() for (int i = 0; i < 3; i++) { new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 开始执行"); Thread.sleep(1000); // 模拟耗时操作 System.out.println(Thread.currentThread().getName() + " 完成执行"); latch.countDown(); // 完成任务后减少计数器 } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("线程被中断:" + e.getMessage()); } }).start(); } // 主线程等待所有子线程完成 System.out.println("主线程等待所有子线程完成..."); latch.await(); // 当所有子线程完成,主线程继续执行 System.out.println("所有子线程已完成,主线程继续执行..."); } }
结果如下:
从结果看出CountDownLatch 可以实现线程的等待场景,不过有个问题需要注意:
CountDownLatch的await可能会引起死锁。如果是线程池中线程数量少,高并发时多个请求占用全部线程,每个请求需要“await”其他线程,这些被“await”的线程又拿不到资源,无法执行,多个线程进入阻塞,形成死锁。
再看一个例子,CountDownLatch 和 CompletableFuture结合使用:
package com.netty.chat; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CompletableFutureCountDownLatchExample { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(5); CountDownLatch latch = new CountDownLatch(1); // 创建并提交三个 CompletableFuture 任务 CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { System.out.println("Task 1 started"); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); } System.out.println("Task 1 completed"); latch.countDown(); }, executor); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { System.out.println("Task 2 started"); try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); } System.out.println("Task 2 completed"); latch.countDown(); }, executor); CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> { System.out.println("Task 3 started"); try { Thread.sleep(1500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); } System.out.println("Task 3 completed"); latch.countDown(); }, executor); // 等待所有 CompletableFuture 任务完成 //CompletableFuture.allOf(future1, future2, future3).join(); // 等待 CountDownLatch 计数器归零 System.out.println("Waiting for all tasks to complete..."); latch.await(); // 所有任务完成后,执行后续操作 System.out.println("All tasks have completed. Proceeding with further actions..."); executor.shutdown(); } }
这个列子中创建了一个计数器CountDownLatch 为1,有三个CompletableFuture异步任务,只要有一个任务完成后,计数器归零,就表示完成任务,此时使用CountDownLatch 非常方便。如果要等待全部任务完成,可以使用CompletableFuture.allOf()方法,当然也可以创建数量为3的计数器,CountDownLatch的await()方法等待其归零。
CyclicBarrier,直译是循环栅栏,好像不太容易理解。举个例子,假设有三个线程,每个线程内部都有较复杂的逻辑,暂定为1,2,3,4,5这几个步骤,需求是必须要等待三个线程都执行到步骤3后,才能分别继续执行后面的4,5步骤,因为线程执行有并行性,每个线程内部执行速度也不一样,不知道哪个线程先执行到了步骤3,其他线程还没有执行到3,所以需要一个屏障等待的控制,先执行到3的先阻塞等着,知道其他线程都执行到了这个屏障,才一起执行后续步骤。
下面的代码是一个较为简单的示例,可以理解CyclicBarrier的用法
package com.netty.chat; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierExample { public static void main(String[] args) { final int threadCount = 3; CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> { System.out.println("所有线程都已经到达屏障点,现在可以继续执行..."); }); Thread thread1 = new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 正在执行初始化..."); Thread.sleep(4000); System.out.println(Thread.currentThread().getName() + " 步骤一执行完成"); Thread.sleep(4000); System.out.println(Thread.currentThread().getName() + " 步骤二执行完成"); Thread.sleep(4000); System.out.println(Thread.currentThread().getName() + " 步骤三执行完成"); System.out.println(Thread.currentThread().getName() + " 到达屏障点"); barrier.await(); // 等待所有线程到达这个屏障 System.out.println(Thread.currentThread().getName() + " 继续执行..."); } catch (Exception e) { System.out.println("线程1执行发生异常"); } }, "线程1"); Thread thread2 = new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 正在执行初始化..."); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " 步骤一执行完成"); Thread.sleep(3000); System.out.println(Thread.currentThread().getName() + " 步骤二执行完成"); Thread.sleep(3000); System.out.println(Thread.currentThread().getName() + " 步骤三执行完成"); System.out.println(Thread.currentThread().getName() + " 到达屏障点"); barrier.await(); // 等待所有线程到达这个屏障 System.out.println(Thread.currentThread().getName() + " 继续执行..."); } catch (Exception e) { System.out.println("线程2执行发生异常"); } }, "线程2"); Thread thread3 = new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 正在执行初始化..."); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " 步骤一执行完成"); Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " 步骤二执行完成"); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " 步骤三执行完成"); System.out.println(Thread.currentThread().getName() + " 到达屏障点"); barrier.await(); // 等待所有线程到达这个屏障 System.out.println(Thread.currentThread().getName() + " 继续执行..."); } catch (Exception e) { System.out.println("线程3执行发生异常"); } }, "线程3"); thread1.start(); thread2.start(); thread3.start(); } }
上面的屏障点不一定都是步骤三,可以分别是步骤一、步骤二、步骤三,只要保证每个线程内部执行到对应的屏障点就阻塞等待其他线程执行到对应的屏障点即可。
CountDownLatch 和CyclicBarrier区别:
CountDownLatch 的计数器只能使用一次,CyclicBarrier可以使用reset方法重置
CyclicBarrier能处理更为复杂的业务场景,比如计算错误,可以结束阻塞,重置计数器,重新执行程序
CyclicBarrier提供getNumberWaiting,可以获得CyclicBarrier阻塞的线程数量,还提供isBroken方法,判断阻塞的线程是否被中断等
CountDownLatch 会阻塞主线程,CyclicBarrier则不会,只会阻塞子线程,因为它本来就是用在子线程中的。
Semaphore 通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。
使用场景
通常用于那些资源有明确访问数量限制的场景,常用于限流 。
比如:数据库连接池,同时进行连接的线程有数量限制,连接不能超过一定的数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获得数据库连接。
比如:停车场场景,车位数量有限,同时只能容纳多少台车,车位满了之后只有等里面的车离开停车场外面的车才可以进入。
Semaphore核心方法如下:
acquire() 获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。 acquire(int permits) 获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。 acquireUninterruptibly() 获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)。 tryAcquire() 尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。 tryAcquire(long timeout, TimeUnit unit) 尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。 release() 释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。 hasQueuedThreads() 等待队列里是否还存在等待线程。 getQueueLength() 获取等待队列里阻塞的线程数。 drainPermits() 清空令牌把可用令牌数置为0,返回清空令牌的数量。 availablePermits() 返回可用的令牌数量。
业务场景 :每个停车场入口都有一个提示牌,上面显示着停车场的剩余车位还有多少,当剩余车位为0时,不允许车辆进入停车场,直到停车场里面有车离开停车场,这时提示牌上会显示新的剩余车位数。
1、停车场容纳总停车量10。
2、当一辆车进入停车场后,显示牌的剩余车位数响应的减1.
3、每有一辆车驶出停车场后,显示牌的剩余车位数响应的加1。
4、停车场剩余车位不足时,车辆只能在外面等待。
下面时一个停车场灯牌显示代码的经典实现:
package com.netty; import java.util.Random; import java.util.concurrent.Semaphore; public class TestCar { //停车场同时容纳的车辆10 private static final Semaphore semaphore = new Semaphore(10); public static void main(String[] args) { //模拟100辆车进入停车场 for (int i = 0; i < 100; i++) { Thread thread = new Thread(new Runnable() { public void run() { try { System.out.println("====" + Thread.currentThread().getName() + "来到停车场"); if (semaphore.availablePermits() == 0) { System.out.println("车位不足,请耐心等待"); } semaphore.acquire();//获取令牌尝试进入停车场 System.out.println(Thread.currentThread().getName() + "成功进入停车场"); Thread.sleep(new Random().nextInt(10000));//模拟车辆在停车场停留的时间 System.out.println(Thread.currentThread().getName() + "驶出停车场"); semaphore.release();//释放令牌,腾出停车场车位 } catch (InterruptedException e) { System.out.println("error"); } } }, i + "号车"); thread.start(); } } }
Happens-Before原则是Java内存模型(Java Memory Model, JMM)中的一个核心概念,用于描述在多线程环境中操作之间的偏序关系,确保了操作的内存可见性和有序性。当一个操作A happens-before 操作B时,意味着操作A对操作B可见,并且操作B可以依赖于操作A的结果。具体而言,这意味着:
内存可见性:如果操作A修改了一个共享变量的值,而操作B读取了这个变量的值,那么如果A happens-before B,B将看到A修改后的值。
有序性:操作A happens-before 操作B,保证了从某个观察者的角度看,操作A不会被重排序到操作B之后。
在Java中,Happens-Before原则可以通过以下几种方式建立:
程序顺序规则:在一个线程内的操作按照代码的顺序发生,前面的操作 happens-before 后面的操作。
锁规则:解锁一个锁 happens-before 在另一个线程中对同一锁的加锁。
volatile变量规则:写入一个volatile变量 happens-before 之后对该变量的读取。
线程启动规则:线程的启动调用 happens-before 任何在该线程中执行的操作。
线程中断规则:线程中断的请求 happens-before 检查到中断状态。
线程终止规则:线程的所有操作 happens-before 该线程的终止。
对象终结规则:对象的构造完成 happens-before 对该对象的引用赋给volatile变量或同步锁定的字段。
线程安全是针对多线程来说的,单线程没有这个概念。
具体来说就是在多线程环境下,访问同一个对象时,不考虑线程本身的调度和交替执行,在不做任何干预情况下,多线程怎么操作这个对象,都能得到正确的预期结果,那么就称这个对象是线程安全的。
要在多线程环境下保证线程安全,无非就是保证对对象访问的原子性、有序性、可见性。
原子性问题:指当一个线程执行一系列程序指令时,应该是不可中断的,一旦中断,可能会出现执行前后预期结果不一致问题。
有序性问题:指程序的编写指令的顺序和最终执行的顺序不一致,称为指令重排序。
可见性问题:指多线程环境下,读和写可能发生在不同线程,可能出现某个线程对共享变量的修改,对其他线程不是实时可见的。
针对原子性
JDK提供了很多原子类AtomicInteger、AtomicLong、AtomicBoolean,通过CAS来保证原子性的。
针对有序性
可以使用syncronized关键字定义同步代码块或者同步方法,保证原子性,或者使用Lock接口相关实现类保证有序性。
针对可见性
可以使用syncronized关键字加锁解决,当然也可以使用volatile,性能更高。
在Java中,安全地中断一个正在运行的线程涉及到使用Thread.interrupt()方法和检查线程的中断状态。下面是一种推荐的做法:
调用interrupt()方法: 当你想要中断一个线程时,应该调用该线程的interrupt()方法。这不会立即停止线程,而是设置线程的中断标志。
检查中断状态: 在线程的运行代码中,应该定期检查线程的中断状态。这通常是通过调用Thread.currentThread().isInterrupted()或thread.isInterrupted()来完成的。如果线程被中断,这个方法会返回true。
响应中断: 如果检测到线程已被中断,线程应做出适当的响应,比如退出循环或清理资源。通常,这涉及抛出InterruptedException,然后在调用者那里捕获并处理这个异常。
示例代码如下:
package com.netty.chat; public class SafeThread extends Thread { @Override public void run() { try { while (!Thread.currentThread().isInterrupted()) { // 执行任务... // 模拟耗时操作,实际应用中可能是IO操作或计算 Thread.sleep(1000); // 检查中断状态 if (Thread.currentThread().isInterrupted()) { throw new InterruptedException(); } } } catch (InterruptedException e) { // 处理中断 System.out.println("线程被中断"); } finally { // 清理资源 } } } // 在主线程中中断子线程 class Main { public static void main(String[] args) throws InterruptedException { SafeThread thread = new SafeThread(); thread.start(); // 假设我们想在5秒后中断线程 Thread.sleep(5000); thread.interrupt(); // 中断线程 } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。