赞
踩
Java面试——并发编程:线程基础、并发安全、线程池…
程序由指令和数据组成,但这些指令要运行、数据要读写,就必须将指令加载至 CPU、数据加载至内存。在指令运行过程中还需用到磁盘、网络等设备。进程(Process)就是用于加载指令、管理内存、管理IO。
当一个程序被运行,从磁盘加载这个程序的代码至内存,此时就开启了一个进程。
一个进程中可以分为一到多个线程(Thread)。一个线程就是一个指令流,将指令流中的指令以一定的顺序交给CPU执行。
Java中,线程是最小调度单位,进程是资源分配的最小单位。在windows中进程是不活动的,只是作为线程的容器。
对比
单核CPU中,线程实际是串行执行的。
操作系统中有一个组件叫做任务调度器,将cpu的时间片(windows下时间片最小约为 15 毫秒)分给不同的程序使用,由于cpu在线程间(时间片很短)的切换非常快,人类感觉是同时运行的——微观串行,宏观并行。
这种线程轮流使用CPU的做法称为并发(Concurrent)。
多核CPU中,每个核都可以调度运行线程,这时候线程是并行(Parallel)的。
并发(concurrent)是同一时间应对(dealing with)多件事情的能力
并行(parallel)是同一时间动手做(doing)多件事情的能力
共有四种方式可以创建线程:继承Thread类、实现Runnable接口、实现Callable接口、线程池创建线程
public class MyThread extends Thread { @Override public void run() { System.out.println("MyThread...run..."); } public static void main(String[] args) { // 创建Thread子类对象(或直接Thread传入lambda表达式有参构造,即传入run()函数) MyThread t1 = new MyThread() ; MyThread t2 = new MyThread() ; // 调用start()方法启动线程 t1.start(); t2.start(); } }
public class MyRunnable implements Runnable { @Override public void run() { System.out.println("MyRunnable...run..."); } public static void main(String[] args) { // 创建Runnable实现类对象 MyRunnable mr = new MyRunnable() ; // 创建Thread对象 Thread t1 = new Thread(mr) ; Thread t2 = new Thread(mr) ; // 调用start()方法启动线程 t1.start(); t2.start(); } }
public class MyCallable implements Callable<String> { @Override public String call() throws Exception { System.out.println("MyCallable...call..."); return "OK"; } public static void main(String[] args) throws ExecutionException, InterruptedException { // 创建Callable实现类对象 MyCallable mc = new MyCallable() ; // 创建FutureTask对象(泛型与实现Callable时一致,为返回值类型) FutureTask<String> ft = new FutureTask<String>(mc) ; // 创建Thread对象 Thread t1 = new Thread(ft) ; Thread t2 = new Thread(ft) ; // 调用start()方法启动线程 t1.start(); // 调用FutureTask对象的get()方法获取执行结果 String result = ft.get(); System.out.println(result); } }
public class MyExecutors implements Runnable {
@Override
public void run() {
System.out.println("MyRunnable...run...");
}
public static void main(String[] args) {
// 创建线程池对象
ExecutorService threadPool = Executors.newFixedThreadPool(3);
threadPool.submit(new MyExecutors()) ;
// 关闭线程池
threadPool.shutdown();
}
}
run()
方法没有返回值;Callable接口的call()
方法有返回值,是个泛型,和Future、FutureTask配合可以用来获取异步执行的结果。get()
得到返回的执行结果,此方法会阻塞主进程的继续往下执行,若不调用则不会阻塞。call()
方法允许抛出异常;而Runnable接口的run()
方法的异常只能在内部消化,不能继续上抛start()
: 用来启动线程,通过该线程调用run()
方法执行run()
中所定义的逻辑代码。start()
方法只能在启动时被调用一次。run()
: 封装了要被线程执行的代码,可以被调用多次。JDK5之后,Ready和Running合称Runnable状态
JDK中的Thread类中的枚举State封装了操作系统的线程状态
start()
方法转变为可执行状态wait()
方法则进入等待状态,其他线程调用notify()
唤醒后切换为可执行状态sleep()
方法则进入计时等待状态,到时间后切换为可执行状态在多线程中有多种方法让线程按特定顺序执行,可用线程类的join()
方法在一个线程中启动另一个线程,另外一个线程完成该线程继续执行。
【例】新建T1、T2、T3三个线程,为确保三个线程的顺序,应先启动最后一个(T3调用T2,T2调用T1),使得T1先完成而T3最后完成
public class JoinTest { public static void main(String[] args) { // 创建线程对象 Thread t1 = new Thread(() -> { System.out.println("t1"); }) ; Thread t2 = new Thread(() -> { try { t1.join(); // 加入线程t1,仅当t1线程执行完毕以后,再执行该线程 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("t2"); }) ; Thread t3 = new Thread(() -> { try { t2.join(); // 加入线程t2,仅当t2线程执行完毕以后,再执行该线程 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("t3"); }) ; // 启动线程 t1.start(); t2.start(); t3.start(); } }
notify()
:只随机唤醒一个wait()
线程notifyAll()
:唤醒所有wait()
的线程【例】
public class WaitNotify { static boolean flag = false; static Object lock = new Object(); public static void main(String[] args) { Thread t1 = new Thread(() -> { synchronized (lock){ while (!flag){ System.out.println(Thread.currentThread().getName() + "...wating..."); try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "...flag is true"); } }); Thread t2 = new Thread(() -> { synchronized (lock){ while (!flag){ System.out.println(Thread.currentThread().getName() + "...wating..."); try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "...flag is true"); } }); Thread t3 = new Thread(() -> { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " hold lock"); lock.notifyAll(); flag = true; try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); t1.start(); t2.start(); t3.start(); } }
wait()
、wait(long)
和sleep(long)
的效果都是让当前线程暂时放弃CPU的使用权,进入阻塞状态Thread.sleep(long)
是Thread的静态方法wait()
、wait(long)
是Object的成员方法,每个对象都有sleep(long)
和wait(long)
的线程都会在等待相应毫秒后醒来wait(long)
和wait()
还可以被notify()
打断唤醒,其中wait()
如果不唤醒就一直等下去wait()
的调用必须先获取该对象的锁,而sleep()
则无此限制wait()
执行后会释放对象锁,允许其它线程获得该对象锁;而sleep()
如果在synchronized代码块中执行,并不会释放对象锁【例】
public class WaitSleepCase { static final Object LOCK = new Object(); public static void main(String[] args) throws InterruptedException { sleeping(); } private static void illegalWait() throws InterruptedException { LOCK.wait(); } private static void waiting() throws InterruptedException { Thread t1 = new Thread(() -> { synchronized (LOCK) { try { get("t").debug("waiting..."); LOCK.wait(5000L); } catch (InterruptedException e) { get("t").debug("interrupted..."); e.printStackTrace(); } } }, "t1"); t1.start(); Thread.sleep(100); synchronized (LOCK) { main.debug("other..."); } } private static void sleeping() throws InterruptedException { Thread t1 = new Thread(() -> { synchronized (LOCK) { try { get("t").debug("sleeping..."); Thread.sleep(5000L); } catch (InterruptedException e) { get("t").debug("interrupted..."); e.printStackTrace(); } } }, "t1"); t1.start(); Thread.sleep(100); synchronized (LOCK) { main.debug("other..."); } } }
有三种方式可以停止线程:
run()
方法完成后线程终止public class MyInterrupt1 extends Thread { volatile boolean flag = false ; // 线程执行的退出标记 @Override public void run() { while(!flag) { System.out.println("MyThread...run..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { // 创建线程对象 MyInterrupt1 t1 = new MyInterrupt1() ; t1.start(); // 主线程休眠6秒 Thread.sleep(6000); // 更改标记为true t1.flag = true; } }
stop()
方法强行终止(不推荐,该方法已作废)public class MyInterrupt2 extends Thread { volatile boolean flag = false ; // 线程执行的退出标记 @Override public void run() { while(!flag) { System.out.println("MyThread...run..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { // 创建线程对象 MyInterrupt2 t1 = new MyInterrupt2() ; t1.start(); // 主线程休眠2秒 Thread.sleep(6000); // 调用stop方法 t1.stop(); } }
interrupt()
方法中断线程public class MyInterrupt3 { public static void main(String[] args) throws InterruptedException { //1.打断阻塞的线程 /* Thread t1 = new Thread(()->{ System.out.println("t1 正在运行..."); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1"); t1.start(); Thread.sleep(500); t1.interrupt(); System.out.println(t1.isInterrupted()); */ //2.打断正常的线程 Thread t2 = new Thread(()->{ while(true) { Thread current = Thread.currentThread(); boolean interrupted = current.isInterrupted(); if(interrupted) { System.out.println("打断状态:" + interrupted); break; } } }, "t2"); t2.start(); Thread.sleep(500); // t2.interrupt(); } }
在引入多线程后,由于线程执行的异步性,会给系统造成混乱,特别是在急用临界资源时,如多个线程急用同一台打印机,会使打印结果交织在一起,难于区分。当多个线程急用共享变量,表格,链表时,可能会导致数据处理出错,因此线程同步的主要任务是使并发执行的各线程之间能够有效的共享资源和相互合作,从而使程序的执行具有可再现性。
当线程并发执行时,由于资源共享和线程协作,使用线程之间会存在以下两种制约关系。
对于互斥可以这样理解:线程A和线程B互斥访问某个资源则它们之间就会产个顺序问题——要么线程A等待线程B操作完毕,要么线程B等待线程操作完毕,这其实就是线程的同步。因此互斥其实是一种特殊的同步,同步包括互斥。
- Synchronized对象锁采用互斥的方式让同一时刻至多只有一个线程能持有对象锁
- 其底层由monitor实现,monitor是JVM级别的对象( C++实现),线程获得锁需要使用对象(锁)关联monitor
- 在monitor内部有三个属性——Owner、EntryList、WaitSet。其中owner是关联的获得锁的线程,并且只能关联一个线程;EntryList关联的是处于阻塞状态的线程;WaitSet关联的是处于Waiting状态的线程
Synchronized(对象锁)采用互斥的方式让同一时刻至多只有一个线程能持有对象锁,其它线程再想获取这个对象锁时就会阻塞住
【例】如下的抢票代码,如果不加锁,就会出现超卖或者一张票卖给多个人
public class TicketDemo { static Object lock = new Object(); int ticketNum = 10; public synchronized void getTicket() { synchronized (this) { if (ticketNum <= 0) { return; } System.out.println(Thread.currentThread().getName() + "抢到一张票,剩余:" + ticketNum); // 非原子性操作 ticketNum--; } } public static void main(String[] args) { TicketDemo ticketDemo = new TicketDemo(); for (int i = 0; i < 20; i++) { new Thread(() -> { ticketDemo.getTicket(); }).start(); } } }
Monitor(监视器)由JVM提供,用C++语言实现。在代码中想要体现monitor需要借助javap
命令查看clsss的字节码。
【例】找到以下类的class文件,在class文件目录下执行javap -v SyncTest.class
,反编译效果如下:
public class SyncTest {
static final Object lock = new Object();
static int counter = 0;
public static void main(String[] args) {
synchronized (lock) {
counter++;
}
}
}
其中monitorenter为上锁开始的地方,monitorexit为解锁的地方。被monitorenter和monitorexit包围住的指令就是上锁的代码。
有两个monitorexit的原因:第二个monitorexit是为了防止锁住的代码抛异常后不能及时释放锁。
在使用了synchornized代码块时需要指定一个对象,所以synchornized也被称为对象锁。
monitor跟该对象产生关联,如下图所示
Monitor内部具体的存储结构:
wait()
方法的线程,即处于等待(Waiting)状态的线程具体执行流程:
wait()
方法,则让当前线程进入WaitSet进行等待。Monitor实现的锁属于重量级锁。何为锁升级?
- Monitor实现的锁属于重量级锁,里面涉及到了用户态和内核态的切换、进程的上下文切换,成本较高,性能比较低。
- 在JDK 1.6引入了两种新型锁机制——轻量级锁和偏向锁。它们的引入是为了解决在没有多线程竞争或基本没有竞争的场景下因使用传统锁机制带来的性能开销问题。
Java中的synchronized有偏向锁、轻量级锁、重量级锁三种形式,分别对应了锁只被一个线程持有、不同线程交替持有锁、多线程竞争锁三种情况。
描述 | |
---|---|
重量级锁 | 底层使用的Monitor实现,涉及到用户态和内核态的切换、进程的上下文切换,成本较高,性能比较低 |
轻量级锁 | 若线程加锁的时间是错开的(即没有竞争),可以使用轻量级锁来优化。轻量级修改了对象头的锁标志,相对重量级锁性能提升很多。每次修改都是CAS操作,保证原子性 |
偏向锁 | 若很长一段时间内都只有一个线程使用锁,可以使用偏向锁。在第一次获得锁时,会有一个CAS操作,之后该线程再获取锁,只需要判断MarkWord中是否是自己的线程id即可,而不是开销相对较大的CAS命令 |
在HotSpot虚拟机中,对象在内存中存储的布局可分为3块区域:对象头(Header)、实例数据(Instance Data)和对齐填充
可以通过lock的标识来判断是哪一种锁的等级:
Monitor实现的锁属于重量级锁。每个Java对象都可以关联一个Monitor对象,使用synchronized给对象上锁(重量级)之后,该对象头MarkWord中就被设置了指向Monitor对象的指针,这就使得对象与monitor产生关联。
在很多的情况下,在Java程序运行时,同步块中的代码都是不存在竞争的,不同的线程交替的执行同步块中的代码。这种情况下,用重量级锁是没必要的。因此JVM引入了轻量级锁的概念。
static final Object obj = new Object();
public static void method1() {
synchronized (obj) {
// 同步块 A
method2();
}
}
public static void method2() {
synchronized (obj) {
// 同步块 B
}
}
4. 若CAS修改失败,说明发生了竞争,需要膨胀为重量级锁。
轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行CAS操作。
Java 6中引入了偏向锁来做进一步优化:只有第一次使用CAS将线程ID设置到对象的MarkWord头,之后发现
这个线程ID是自己的就表示没有竞争,不用重新CAS。以后只要不发生竞争,这个对象就归该线程所有。
static final Object obj = new Object(); public static void m1() { synchronized (obj) { // 同步块 A m2(); } } public static void m2() { synchronized (obj) { // 同步块 B m3(); } } public static void m3() { synchronized (obj) { } }
加锁流程如下,解锁流程参考轻量级锁
JMM(Java Memory Model,Java内存模型)是java虚拟机规范中所定义的一种内存模型,描述了Java程序中各种变量(线程共享变量)的访问规则,以及在JVM中将变量存储到内存和从内存中读取变量这样的底层细节。
特点:
CAS(Compare And Swap,比较再交换)体现了一种乐观锁的思想,在无锁情况下保证线程操作共享数据的原子性。在JUC(java.util.concurrent)包下实现的很多类都用到了CAS操作:AbstractQueuedSynchronizer(AQS框架)、AtomicXXX类等。
【例】基于上文的JMM内存模型进行说明
int a = 100
,同时放到各个线程的工作内存中int a = 100
,A:int a = 100
,B:修改后的值 int a = 101
(a++
)
int a = 100
,A:int a = 100
,B:修改后的值:int a = 99
(a--
)
需要不断尝试获取共享内存V中最新的值,然后再在新的值的基础上进行更新操作,如果失败就继续尝试获取新的值,直到更新成功
CAS 底层依赖于一个Unsafe类来直接调用操作系统底层的 CAS 指令:
如上所示,均为native修饰的方法,由系统提供的接口执行,并非java代码实现,一般的思路也都是自旋锁实现
在java中比较常用的如ReentrantLock和Atomic开头的线程安全类,都调用了Unsafe中的方法
【例】ReentrantLock中的一段CAS代码
一旦一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,其就具备了以下两层语义:
保证了不同线程对该变量进行操作时的可见性,即一个线程修改了某个变量的值,该新值对其他线程来说是立即可见的。volatile关键字会强制将修改的值立即写入主存。
【例】永不停止的循环:当执行以下代码时,会发现foo()
方法中的循环无法结束,即读取不到共享变量的值结束循环
public class ForeverLoop { static boolean stop = false; public static void main(String[] args) { new Thread(() -> { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } stop = true; System.out.println("modify stop to true..."); }).start(); foo(); } static void foo() { int i = 0; while (!stop) { i++; } System.out.println("stopped... c:"+ i); } }
原因:在JVM虚拟机中有一个JIT(即时编辑器)对代码做了优化。
while (!stop) { i++; }
- 1
- 2
- 3
上述这段代码在短时间内执行次数太多,当达到阈值时JIT就会优化此代码,如下所示
while (true) { i++; }
- 1
- 2
- 3
代码优化后,即使
stop
变量变为false
也依旧无法停止循环
解决方案:
-Xint
,表示禁用即时编辑器。(不推荐,得不偿失,因为其他程序还要使用)stop
变量时加上volatile,表示当前代码禁用了即时编辑器。代码如下:static volatile boolean stop = false;
用volatile修饰共享变量会在读、写共享变量时加入不同的屏障,阻止其他读写操作越过屏障,从而达到阻止重排序的效果。
【例】如下代码所示,在获取结果时,有可能会出现4种情况
actor2()
获取结果 → 0, 0
(正常)actor1()
中的第一行代码,然后执行actor2()
获取结果 → 0, 1
(正常)actor1()
中所有代码,然后执行actor2()
获取结果 → 1, 1
(正常)actor1()
中第二行代码,然后执行actor2()
获取结果 → 1, 0
(发生了指令重排序,影响结果)解决方案:在变量上添加volatile,禁止指令重排序,即可以解决问题。代码与屏障添加的示意图如下所示
两个原则:
【补充】上面的解决方案将volatile加在变量y
上。将volatile加在变量x
上是不可行的,因为违反了上述两个原则。代码与屏障添加的示意图如下所示
由此可总结出volatile的使用技巧:
AQS(AbstractQueuedSynchronizer)是阻塞式锁和相关的同步器工具的框架,它是构建锁或者其他同步组件的基础框架。
AQS常见的实现类:ReentrantLock阻塞式锁、Semaphore信号量、CountDownLatch倒计时锁
synchronized | AQS |
---|---|
是关键字,C++语言实现 | Java语言实现 |
悲观锁,自动释放锁 | 悲观锁,手动开启和关闭 |
锁竞争激烈,都是重量级锁,性能差 | 锁竞争激烈时提供了多种解决方案 |
【例】如下图所示
常见问答
比较典型的AQS实现类ReentrantLock默认就是非公平锁,新的线程与队列中的线程共同来抢资源
ReentrantLock(可重入锁)相对于synchronized具备以下特点:
ReentrantLock主要利用CAS+AQS队列来实现。它支持公平锁和非公平锁,两者的实现类似。
构造方法接受一个可选的公平参数(默认非公平锁),当设为true时,表示公平锁,否则为非公平锁。公平锁的效率往往没有非公平锁的效率高,在许多线程访问的情况下,公平锁表现出较低的吞吐量。
其中NonfairSync和FairSync这两个类父类都是Sync
而Sync的父类是AQS,所以可以得出ReentrantLock底层主要实现就是基于AQS来实现的
公平锁:按照先后顺序获取锁;非公平锁:不在排队的线程也可以抢锁
死锁:一个线程需要同时获取多把锁
【例】t1线程获得A对象锁,接下来想获取B对象的锁;t2线程获得B对象锁,接下来想获取A对象的锁
public class Deadlock { public static void main(String[] args) { Object A = new Object(); Object B = new Object(); Thread t1 = new Thread(() -> { synchronized (A) { System.out.println("lock A"); try { sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } synchronized (B) { System.out.println("lock B"); System.out.println("操作..."); } } }, "t1"); Thread t2 = new Thread(() -> { synchronized (B) { System.out.println("lock B"); try { sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } synchronized (A) { System.out.println("lock A"); System.out.println("操作..."); } } }, "t2"); t1.start(); t2.start(); } }
此时程序并没有结束,这种现象就是死锁现象——线程t1持有A的锁等待获取B锁,线程t2持有B的锁等待获取A的锁。
当程序出现了死锁现象,可以使用jdk自带的工具:jps和jstack
死锁诊断步骤如下:
jstack -l 46032
其他解决工具、可视化工具:
ConcurrentHashMap是一种线程安全的高效Map集合
底层数据结构:
数据结构:
存储流程:
在JDK1.8中放弃了Segment臃肿的设计,数据结构与HashMap一样:数组+红黑树+链表
采用CAS + Synchronized来保证并发安全进行实现
根本原因:Java并发编程三大特性——原子性、内存可见性、有序性
一个线程在CPU中操作不可暂停,也不可中断,要不执行完成,要不不执行
【例】以下代码会出现超卖或者是一张票卖给同一个人,执行并不是原子性的
解决方案:
让一个线程对共享变量的修改对另一个线程可见
【例】以下代码不能保证内存可见性
解决方案:
指令重排:处理器为了提高程序运行效率,可能会对输入代码进行优化。它不保证程序中各个语句的执行先后顺序同代码中的顺序一致,但是它会保证程序最终执行结果和代码顺序执行的结果是一致的。
【例】使用2.5.2中的例子
解决方案:volatile
拒绝策略:
【例】
public class TestThreadPoolExecutor { static class MyTask implements Runnable { private final String name; private final long duration; public MyTask(String name) { this(name, 0); } public MyTask(String name, long duration) { this.name = name; this.duration = duration; } @Override public void run() { try { LoggerUtils.get("myThread").debug("running..." + this); Thread.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString() { return "MyTask(" + name + ")"; } } public static void main(String[] args) throws InterruptedException { AtomicInteger c = new AtomicInteger(1); ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(2); ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 3, 0, TimeUnit.MILLISECONDS, queue, r -> new Thread(r, "myThread" + c.getAndIncrement()), new ThreadPoolExecutor.AbortPolicy()); showState(queue, threadPool); threadPool.submit(new MyTask("1", 3600000)); showState(queue, threadPool); threadPool.submit(new MyTask("2", 3600000)); showState(queue, threadPool); threadPool.submit(new MyTask("3")); showState(queue, threadPool); threadPool.submit(new MyTask("4")); showState(queue, threadPool); threadPool.submit(new MyTask("5",3600000)); showState(queue, threadPool); threadPool.submit(new MyTask("6")); showState(queue, threadPool); } private static void showState(ArrayBlockingQueue<Runnable> queue, ThreadPoolExecutor threadPool) { try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } List<Object> tasks = new ArrayList<>(); for (Runnable runnable : queue) { try { Field callable = FutureTask.class.getDeclaredField("callable"); callable.setAccessible(true); Object adapter = callable.get(runnable); Class<?> clazz = Class.forName("java.util.concurrent.Executors$RunnableAdapter"); Field task = clazz.getDeclaredField("task"); task.setAccessible(true); Object o = task.get(adapter); tasks.add(o); } catch (Exception e) { e.printStackTrace(); } } LoggerUtils.main.debug("pool size: {}, queue: {}", threadPool.getPoolSize(), tasks); } }
workQueue:当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务。常见的有如下4个
用的最多为LinkedBlockingQueue和ArrayBlockingQueue,区别如下
LinkedBlockingQueue | ArrayBlockingQueue |
---|---|
默认无界,支持有界 | 强制有界 |
底层是链表 | 底层是数组 |
是懒惰的,创建节点的时候添加数据 | 提前初始化Node 数组 |
入队会生成新Node | Node需要提前创建好 |
两把锁(头尾) | 一把锁 |
如下图所示,左、右分别为LinkedBlockingQueue、ArrayBlockingQueue的加锁方式
CPU核数 + 1
,减少线程上下文的切换2 * CPU核数 + 1
CPU核数 + 1
java代码查看CPU核数:
在java.util.concurrent.Executors类中提供了大量创建连接池的静态方法,常见的有以下4种
Integer.MAX_VALUE
【例】
public class FixedThreadPoolCase { static class FixedThreadDemo implements Runnable{ @Override public void run() { String name = Thread.currentThread().getName(); for (int i = 0; i < 2; i++) { System.out.println(name + ":" + i); } } } public static void main(String[] args) throws InterruptedException { //创建一个固定大小的线程池,核心线程数和最大线程数都是3 ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 5; i++) { executorService.submit(new FixedThreadDemo()); Thread.sleep(10); } executorService.shutdown(); } }
只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO)执行
Integer.MAX_VALUE
【例】
public class NewSingleThreadCase { static int count = 0; static class Demo implements Runnable { @Override public void run() { count++; System.out.println(Thread.currentThread().getName() + ":" + count); } } public static void main(String[] args) throws InterruptedException { //单个线程池,核心线程数和最大线程数都是1 ExecutorService exec = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { exec.execute(new Demo()); Thread.sleep(5); } exec.shutdown(); } }
Integer.MAX_VALUE
【例】
public class CachedThreadPoolCase { static class Demo implements Runnable { @Override public void run() { String name = Thread.currentThread().getName(); try { //修改睡眠时间,模拟线程执行需要花费的时间 Thread.sleep(100); System.out.println(name + "执行完了"); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { //创建一个缓存的线程,没有核心线程数,最大线程数为Integer.MAX_VALUE ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { exec.execute(new Demo()); Thread.sleep(1); } exec.shutdown(); } }
【例】
public class ScheduledThreadPoolCase { static class Task implements Runnable { @Override public void run() { try { String name = Thread.currentThread().getName(); System.out.println(name + ", 开始:" + new Date()); Thread.sleep(1000); System.out.println(name + ", 结束:" + new Date()); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { //按照周期执行的线程池,核心线程数为2,最大线程数为Integer.MAX_VALUE ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2); System.out.println("程序开始:" + new Date()); /** * schedule 提交任务到线程池中 * 第1个参数:提交的任务 * 第2个参数:任务执行的延迟时间 * 第3个参数:时间单位 */ scheduledThreadPool.schedule(new Task(), 0, TimeUnit.SECONDS); scheduledThreadPool.schedule(new Task(), 1, TimeUnit.SECONDS); scheduledThreadPool.schedule(new Task(), 5, TimeUnit.SECONDS); Thread.sleep(5000); // 关闭线程池 scheduledThreadPool.shutdown(); } }
参考阿里开发手册《Java开发手册-嵩山版》
CountDownLatch(闭锁/倒计时锁)用来进行线程同步协作,等待所有线程完成倒计时(一个或者多个线程,等待其他多个线程完成某件事情之后才能执行)
await()
:等待计数归零countDown()
:让计数减一public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { // 初始化一个倒计时锁,参数为3 CountDownLatch latch = new CountDownLatch(3); new Thread(() -> { System.out.println(Thread.currentThread().getName()+"-begin..."); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } // count-- latch.countDown(); System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount()); }).start(); new Thread(() -> { System.out.println(Thread.currentThread().getName()+"-begin..."); try { Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } // count-- latch.countDown(); System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount()); }).start(); new Thread(() -> { System.out.println(Thread.currentThread().getName()+"-begin..."); try { Thread.sleep(1500); } catch (InterruptedException e) { throw new RuntimeException(e); } // count-- latch.countDown(); System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount()); }).start(); String name = Thread.currentThread().getName(); System.out.println(name + "-waiting..."); // 等待其他线程完成 latch.await(); System.out.println(name + "-wait end..."); } }
Semaphore(信号量)是JUC包下的一个工具类,可以通过其限制执行的线程数量,达到限流的效果。
当一个线程执行时先通过其方法进行获取许可操作,获取到许可的线程继续执行业务逻辑,当线程执行完成后进行释放许可操作,未获取达到许可的线程进行等待或者直接结束。
两个重要的方法:
acquire()
:请求一个信号量,这时候的信号量个数-1(一旦没有可使用的信号量,即信号量个数变为负数时,再次请求的时候就会阻塞,直到其他线程释放了信号量)release()
:释放一个信号量,此时信号量个数+1【例】线程任务类
public class SemaphoreCase { public static void main(String[] args) { // 1. 创建 semaphore 对象 Semaphore semaphore = new Semaphore(3); // 2. 10个线程同时运行 for (int i = 0; i < 10; i++) { new Thread(() -> { try { // 3. 获取许可 semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { System.out.println("running..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("end..."); } finally { // 4. 释放许可 semaphore.release(); } }).start(); } } }
ThreadLocal(线程局部变量)是多线程中对于解决线程安全的一个操作类。用于实现线程内的数据共享,即对于相同的程序代码,多个模块在同一个线程中运行时要共享一份数据,而在另外线程中运行时又共享另外一份数据。它会为每个线程都分配一个独立的线程副本从而解决了变量并发访问冲突的问题。
案例:使用JDBC操作数据库时,会将每一个线程的Connection放入各自的ThreadLocal中,从而保证每个线程都在各自的Connection上进行数据库的操作,避免A线程关闭了B线程的连接。
三个主要方法:
set(value)
:设置值get()
:获取值remove()
:清除值public class ThreadLocalTest { static ThreadLocal<String> threadLocal = new ThreadLocal<>(); public static void main(String[] args) { new Thread(() -> { String name = Thread.currentThread().getName(); threadLocal.set("itcast"); print(name); System.out.println(name + "-after remove : " + threadLocal.get()); }, "t1").start(); new Thread(() -> { String name = Thread.currentThread().getName(); threadLocal.set("itheima"); print(name); System.out.println(name + "-after remove : " + threadLocal.get()); }, "t2").start(); } static void print(String str) { //打印当前线程中本地内存中本地变量的值 System.out.println(str + " :" + threadLocal.get()); //清除本地内存中的本地变量 threadLocal.remove(); } }
ThreadLocal本质是一个线程内部存储类,让多个线程只操作自己内部的值,从而实现线程数据隔离。
在ThreadLocal中有一个内部类叫做ThreadLocalMap,类似于HashMap。ThreadLocalMap中有一个属性table数组,这才是真正存储数据的位置。
set()
方法get()
/ remove()
方法Java对象中的四种引用类型:强引用、软引用、弱引用、虚引用
每一个Thread维护一个ThreadLocalMap,在ThreadLocalMap中的Entry对象继承了WeakReference。其中key为使用弱引用的ThreadLocal实例,value为线程变量的副本
在使用ThreadLocal的时候,强烈建议 务必手动remove!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。