赞
踩
Thread 类本质上是实现了 Runnable 接口的一个实例,代表一个线程的实例。启动线程的方法就是通过 Thread 类的 start() 实例方法。start() 方法是一个 native 方法,它将启动一个线程,并执行 run() 方法。
- public class MyThread extends Thread {
- @Override
- public void run() {
- System.out.println("MyThread.run()");
- }
-
- public static void main(String[] args) {
- MyThread myThread1 = new MyThread();
- myThread1.start();
- }
- }
如果自己的类已经 extends 另一个类,就无法直接 extends Thread,此时,可以实现一个 Runnable 接口。
- public class MyThread implements Runnable {
- @Override
- public void run() {
- System.out.println("MyThread.run()");
- }
- }
启动 Mythread,需要先实例化一个 Thread,并传入自己的 MyThread 实例:
- MyThread myThread = new MyThread();
- Thread thread = new Thread(myThread);
- thread.start();
事实上,当传入一个 Runnable target 参数给 Thread 后,Thread 的 run() 方法就会调用 target.run()。
- //此未 Thread 类的 run 方法的实现。
- public void run() {
- if (target != null) {
- target.run();
- }
- }
有返回值的任务必须实现 Callable 接口,类似的,无返回值的任务必须实现 Runnable 接口。执行 Callable 任务后,可以获取一个 Future 对象,在该对象上调用 get 就可以获取到 Callable 任务返回的 Object 了,再结合线程池接口 ExecutorService 就可以实现传说中有返回结果的线程了。
- //创建一个线程池
- ExecutorService pool = Executors.newFixedThreadPool(taskSize);
- //创建多个有返回值的任务
- List<Future> list = new ArrayList<>();
- for (int i=0; i<taskSize; i++) {
- Callable c = new MyCallable(i+"");
- //执行任务并获取 Future 对象
- Future f = pool.submit(c);
- list.add(f);
- }
- pool.shutdown();
- for (Future f : list) {
- System.out.println("res: "+ f.get().toString());
- }
线程和数据库连接这些资源都是非常宝贵的资源。那么每次需要的时候创建,不需要的时候撤销,是非常浪费资源。那么我们可以使用缓存的策略,也就是线程池。
- //创建线程池
- ExecutorService threadPool = Executors.newFixedThreadPool(10);
- while (true){
- threadPool.execute(new Runnable() { //提交多个线程那屋,并执行
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+"is running..");
- try {
- Thread.sleep(3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
Java 里面线程池的顶级接口是 Executor,但是严格意义上来将 Executor 并不是一个线程池,而只是一个指向线程的工具。真正的线程池接口是 ExecutorService。
创建一个可以根据需要创建新线程的线程池,但是之前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个线程并添加到池中。终止并从缓冲中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任务资源。
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。如果在所有线程处于活动状态时提交附加任务,则在可用线程终止之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。
创建一个线程池,它可安排在给定延迟后运行或者定期地执行。
- ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
- pool.schedule(new Runnable() {
- @Override
- public void run() {
- System.out.println("延迟三秒");
- }
- },3, TimeUnit.SECONDS);
- pool.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- System.out.println("延迟一秒后没3秒执行一次");
- }
- },1,3,TimeUnit.SECONDS);
Executors.newSingleThreadExecutor() 返回一个线程池(这个线程池只有一个线程),这个线程池可以在线程死后(或者发生异常时)重新启动一个线程来替代原来的线程继续执行下去!
线程被创建以后,它既不是一启动就进入了执行状态,也不是一直处于执行状态。在线程生命周期中,它需要经过新建(NEW)、就绪(RUNNABLE)、运行(RUNNING)、阻塞(BLOCKED)和死亡(DEAD)5中状态。尤其是当线程启动以后,它不能一直“霸占”这 CPU 独自运行,所以 CPU 需要在多条线程之间切换,于是线程状态也会多次在运行、阻塞之间切换。
当程序使用 new 关键字创建了一个线程后,该线程就处于新建状态,此时仅由 JVM 为其分配内存,并初始化其成员变量。
当线程对象调用了 start() 方法治好,该线程处于就绪状态。Java 虚拟机会为其创建方法调用栈和程序计数器,等待调度运行。
如果处于就绪状态的线程获得了 CPU,并开始执行 run() 方法的线程执行体,则该线程处于运行状态。
阻塞状态是指线程因为某种原因放弃了 CPU 使用权,即让出了 cpu timeslice,暂时停止其运行。直到线程进入可运行( Runnable )状态,才有机会再次获得 cpu timeslice 转到运行状态。阻塞的情况分三种:
运行的线程执行 o.wait() 方法,JVM 会把该线程放入等待队列(waiting queue)中。
运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则 JVM 会把该线程放入锁池(lock pool)中。
运行的线程执行 Thread.sleep() 或 t.join() 方法,或者发出了 I/O 请求时,JVM 会把该线程置为阻塞状态。当 sleep() 状态超时、join() 等待线程终止或者超时、或者 I/O 处理完毕时,线程重新转入可运行状态。
线程会以下面三种方式结束,结束后就是死亡状态。
run() 或 call() 方法执行完成,线程正常结束。
线程抛出一个未被捕获的 Exception 或 Error。
直接调用该线程的 stop() 方法来结束该线程——该方法通常容易导致死锁,不推荐使用。
程序运行结束,线程自动结束。
一般 run() 方法执行完,线程就会正常结束,然而,常常有些线程是伺服线程。它们需要长时间的运行,只有在外部某些条件满足的情况下,才能关闭这些线程。使用一个变量来控制循环,例如:最直接的方法就是设一个 boolean 类型的标志,并通过设置这个标志位 true 或 false 来控制 while 循环是否退出,代码实例:
- public class ThreadSafe extends Thread {
- public volatile boolean exit = false;
- @Override
- public void run() {
- while (!exit) {
- //do something
- }
- }
- }
定义了一个退出标志 exit,当 exit 为 true 时,while 循环就退出, exit 默认值为false。在定义 exit 时,使用了一个 Java 关键字 volatile ,这个关键字的目的是使 exit 同步,也就是说在同一时刻只能由一个线程来修改 exit 的值。
使用 interrupt() 方法来中断线程有两种情况:
- public class ThreadSafe extends Thread {
- @Override
- public void run() {
- while (!isInterrupted()) { //非阻塞过程中通过判断终端标志来退出
- try {
- System.out.println("ThreadSafe running...");
- Thread.sleep(5000); //阻塞过程捕获中断异常来退出
- } catch (InterruptedException e) {
- System.out.println("捕获到异常");
- e.printStackTrace();
- break;//捕获到异常后,执行 break 跳出循环
- }
- }
- }
- }
程序中可以直接使用 thread.stop() 来强行终止线程,但是 stop 方法很危险,就像突然关闭计算机电源,而不是按正常程序关机一样,可能会产生不可预料的结果,不安全主要是:thread.stop() 调用之后,创建子线程的线程就会抛出 ThreadDeatherror 的错误,并会释放子线程所持有的所有锁。一般任何进行加锁的代码块,都是为了保护数据的一致性,如果在调用 thread.stop() 后导致了该线程所持有的所有锁的突然释放(不可控制),那么被保护数据就有可能呈现不一致性,其他线程在使用这些被破坏的数据时,有可能导致一些很奇怪的应用程序错误。因此,并不推荐使用 stop 方法来终止线程。
乐观锁一种乐观思想,即认为读多写少,遇到并发写的可能性低,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,采用在写时先读出当前的版本号,然后加锁操作(比较跟上一次的版本号,如果一样则更新),如果失败则要重复读-比较-写的操作。
Java 中的乐观锁基本都是通过 CAS 操作实现的, CAS 是一种更新的原子操作,比较当前值跟传入值是否一样,一样则更新,否则失败。
悲观锁就是悲观思想,即认为写多,遇到并发写的可能性高,每次去拿数据的时候都认为别人会修改,所以每次在读写数据的时候都会上锁,这样别人想读写这个数据就会 block 直到拿到锁。 Java 中的悲观锁就是 synchronized, AQS 框架下的锁则是先尝试 CAS 乐观锁去获取锁,获取不到才会转换为悲观锁,如 RetreenLock。
自旋锁原理非常简单,如果持有锁的线程能在很短时间内释放锁资源,那么等待竞争锁的线程就不需要做内核态和用户态之间的切换进入阻塞挂起状态,它们只需要等一等(自旋),等持有锁的线程释放锁以后即可立即获取锁,这样就避免用户线程和内核的切换的消耗。
线程自旋是需要消耗 CPU 的,说白了就是让 CPU 在做无用功,如果一直获取不到锁,那线程也不能一直占用 CPU 自旋做无用功,所以需要设定一个自旋等待的最大时间。
如果持有锁的线程执行时间超过自旋等待的最大时间仍没有释放锁,就会导致其他争用锁的线程在最大等待时间内还是获取不到锁,这时争用线程会停止自旋进入阻塞状态。
自旋锁的优缺点
自旋锁尽可能的减少线程阻塞,这对于锁的竞争不激烈,且占用锁时间非常短的代码来说性能能大幅度的提升,因为自旋的消耗会小于线程阻塞挂起再唤醒的操作的消耗,这些操作会导致线程发生两次上下文切换!
但是如果锁的竞争激烈,或者持有锁的线程需要长时间占用锁执行同步块,这时候就不适合使用自旋锁了,因为自旋锁在获取锁前一直都是占用 CPU 做无用功,占着 XX 不 XX,同时又大量线程在竞争一个锁,会导致获取锁的时间很长,线程自旋的消耗大于线程阻塞挂起操作的消耗,其他需要 CPU 的线程又不能得到 CPU,造成 CPU 的浪费。所以这种情况下我们要关闭自旋。
自旋锁时间阀值(1.6 引入了适应性自旋锁)
自旋锁的目的是为了占着 CPU 的资源不释放,等到获取到锁立即进行处理。但是如何去选择自旋的执行时间呢? 如果自旋执行时间太长,会有大量的线程处于自旋状态占用 CPU 资源,进而会影响整体系统的性能。因此自旋的周期选的额外重要!
JVM 对于自旋周期的选择, JDK1.5 这个限度是写死的,在 1.6 引入了适应性自旋锁,适应性自旋锁的目的意味着自旋的时间不再是固定的了,而是由前一次在同一个锁上的自旋时间以及锁的拥有者状态来决定的,基本认为一个线程上下文切换的时间是最佳的一个时间,同时 JVM 还针对当前 CPU 的符合情况做了较多的优化,如果平均负载小于 CPUs 则一直自旋,如果有超过(CPUs/2)个线程正在自旋,则后来线程直接阻塞,如果正在自旋的线程发现 Owner 发了变化则延迟自旋时间(自旋计数)或进入阻塞,如果 CPU 处于节电模式则停止自旋,自旋时间的最坏情况是 CPU 的存储延迟(CPU A 存储了一个数据,到 CPU B 得到这个数据直接的时间差),自旋会释放放弃线程优先级之间的差异。
自旋锁的开启
JDK 1.6 -XX:+UseSpinning 开启
-XX:PreBlockSpin=10 为自旋次数;
JDK 1.7 后,去掉此参数,由 JVM 控制;
synchronized 它可以把一个非 NULL 的对象当作锁。它属于独占式的悲观锁,同时属于可重入锁。
synchronized 作用范围
synchronized 核心组件
synchronized 实现
ReentrantLock 继承接口 Lock 并实现了接口中定义的方法,它是一种可重入锁,除了能完成 synchronized 所能完成的所有工作外,还提供了诸如可响应中断锁、可轮询锁请求、定时锁等避免多线程死锁的方法。
Lock 接口的主要方法
非公平锁
JVM 按随机、就近原则分配锁的机制则称为非公平锁,ReentrantLock 在构造函数中提供了是否公平锁的初始化方式,默认为非公平锁。非公平锁实际执行的效率要远远超出公平锁,除非程序有特殊需要,否则最常用的是非公平锁的分配机制。
公平锁
公平锁指的是锁的分配机制是公平的,通常先对锁提出获取请求的线程会先分配到锁,ReentrantLock 在构造函数中提供了是否公平锁的初始化方式来定义公平锁。
ReentrantLock 与 synchronized
ReentrantLock 实现
- public class MyService {
- private Lock lock = new ReentrantLock();//默认非公平锁
- // private Lock lock = new ReentrantLock(true);//公平锁
- // private Lock lock = new ReentrantLock(false);//非公平锁
- private Condition condition = lock.newCondition();//创建 Condition
-
- public void testMethod() {
- try {
- lock.lock();//加锁
- //1: await 方法等待
- System.out.println("开始 await");
- condition.await();
- //通过创建 Condition 对象来使线程 wait,必须先执行 lock.lock() 方法获得锁
- //2: signal 方法唤醒
- condition.signal();//condition 对象的 signal() 方法可以唤醒 wait 线程
- for (int i=0; i<5; i++) {
- System.out.println("ThreadName=" + Thread.currentThread().getName() + (i+1));
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- }
Condition 类 和 Object 类锁方法区别
tryLock() 和 lock() 和 lockInterruptibly() 的区别
Semaphore 是一种基于计数的信号量。它可以设定一个阀值,基于此,多个线程竞争获取许可信号,做完自己的事情后归还,超过阀值后,线程申请许可信号将会被阻塞。Semaphore 可以用来构建一些对象池,资源池之类的,比如数据库连接池
实现互斥锁( 计数器为 1 )
我们也可以创建计数为 1 的 Semaphore,将其作为一种类似互斥锁的机制,这也叫做二元信号量,表示两种互斥状态。
代码实现
它的用法如下:
- // 创建一个计数阀值为 5 的信号量对象
- // 只能 5 个线程同时访问
- Semaphore semaphore = new Semaphore(5);
-
- public void testMethod() {
- try {
- // 申请许可
- semaphore.acquire();
- try {
- // 业务逻辑
- } catch (Exception e) {
- } finally {
- // 释放许可
- semaphore.release();
- }
- } catch (InterruptedException e) {
- }
- }
Semaphore 与 ReentrantLock
Semaphore 基本能完成 ReentrantLock 的所有工作,使用方法也与之类似,通过 acquire() 与 release() 方法来获得和释放临界资源。经实测,Semaphore.acquire() 默认为可响应中断锁,与 ReentrantLock.lockInterruptibly() 作用效果一致,也就是说在等待临界资源过程中可以被 Thread.interrupt() 方法中断。
此外,Semaphore 也实现了可轮询的锁请求与定时锁的功能,除了方法名 tryAcquire 与 tryLock 不同,其使用方法域 ReentrantLock 几乎一致。Semaphore 也提供了公平与非公平锁的机制,也可在构造函数中进行设定。
Semaphore 的锁释放操作也由手动进行,因此 ReentrantLock 一样,为避免线程应抛出异常而无法正常释放锁的情况发生,释放锁的操作也必须在 finally 代码块中完成。
首先说明,此处 AtomicInteger ,一个提供原子操作的 Integer 的类,常见的还有 AtomicBoolean、AtomicLong、AtomicReference 等,他们的实现原理相同,区别在于运算对象的类型不同。令人兴奋到,还可以通过 AtomicReference 将一个对象的所有操作转化成原子操作。
我们知道,在多线程中,诸如 ++i 或 i++ 等运算不具有原子性,是不安全的线程操作之一。通过我们会使用 synchronized 将该操作变成一个原子操作,但 JVM 为此类操作特意提供了一些同步类,使得使用更方便,且使程序运行效率变得更高。通过相关资料显示,通常 AtomicInteger 的性能是 ReentrantLock 的好几倍。
本文里面讲的是广义上的可重入锁,而不单指 Java 下的 ReentrantLock。可重入锁也叫递归锁,指的是同一线程外层函数获得锁之后,内层递归函数仍然有获取该锁的代码,但不受影响。在 Java 环境下 ReentrantLock 和 synchronized 都是可重入锁。
公平锁
加锁前检查是否有排队等待的线程,优先排队等待的线程,先来先得。
非公平锁
加锁时不考虑排队等待问题,直接尝试获取锁,获取不到自动到队尾等待。
为了提高性能,Java 提供了读写锁,在读的地方使用读锁,在写的地方使用写锁,灵活控制,如果没有写锁的情况下,读是无阻塞的,在一定程度上提高了程序的执行率。读写锁分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由 jvm 自己控制,你只要上好相应的锁即可。
读锁
如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上锁。
写锁
如果你的代码,只能有一个人在写,但不能同时读取,那就上写锁。总之,读的时候上读锁,写的时候上写锁。
Java 中读写锁有个接口 java.util.concurrent.locks.ReadWriteLock ,也有具体的实现 ReentrantReadWriteLock 。
JAVA 并发包提供的加锁模式分为独占锁和共享锁。
独占锁
独占锁模式下,每次只能有一个线程能持有锁,ReentrantLock 就是以独占方式实现的互斥锁。独占锁是一种悲观保守的加锁策略,它避免了读/写冲突,如果某个只读线程获取锁,则其他读线程只能等待,这种情况下就限制了不必要的并发性,因为读操作并不会影响数据的一致性。
共享锁
共享锁则允许多个线程同时获取锁,并发访问资源,如:ReadWriteLock。共享锁则是一种乐观锁,它放宽了加锁策略,允许多个执行读操作的线程同时访问共享资源。
synchronized 是通过对象内部的一个叫做监视器锁(monitor)来实现的。但是监视器锁本质又依赖于底层的操作系统的 Mutex Lock 来实现的。而操作系统实现线程之间的切换这就需要从用户态转换到核心态,这个成本非常高,状态之间的转换需要相对比较长的时间,这就是为什么 synchronized 效率低的原因。因此,这种依赖于操作系统 Mutex Lock 所实现的锁我们称之为“重量级锁”。JDK 中对 synchronized 做的种种优化,其核心是为了较少这种重量级锁的使用。JDK 1.6 以后,为了减少获得锁和释放锁所带来的性能消耗,提供性能,引入了“轻量级锁”和“偏向锁”。
锁的状态共有四种:无锁状态、偏向锁、轻量级锁和重量级锁。
锁升级
随着锁的竞争,锁可以从偏向锁升级到轻量级锁,在升级到重量级锁(但是锁的升级时单向的,也就是说只能从低到高升级,不会出现锁的降级)。
“轻量级”是相对于使用操作系统互斥量来实现的传统锁而言的。但是,首先需要强调一点的是,轻量级锁并不是用来代替重量级锁的,它的本意是在没有多线程竞争的前提下,减少传统的重量级锁使用产生的性能消耗。在解释轻量级锁的执行过程之前,先明白一点,轻量级锁所适应的场景是线程交替执行同步块的情况,如果存在同一时间访问同一锁的情况,就会导致轻量级锁膨胀为重量级锁。
Hotspot 的作者经过以往的研究发现大多数情况下锁不仅不存在多线程竞争,而且总是由同一线程多次获得。偏向锁的目的是在某个线程获得锁之后,消除这个线程锁重入 ( CAS ) 的开销,看起来这个线程得到了偏护。引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次 CAS 原子指令,而偏向锁只需要在置换 ThreadID 的时候依赖一次 CAS 指令(由于一旦出现多线程竞争的情况就必须撤销偏向锁,所以偏向锁的撤销操作的性能损耗必须小于节省下来的 CAS 原子指令的性能消耗)。上面说过,轻量级锁是为了在线程交替执行同步块时提供性能,而偏向锁则是在只有一个线程执行同步块时进一步提高。
分段锁也并非一种实际的锁,而是一种思想。ConcurrentHashMap 是学习分段锁的最好实践。
减少锁持有时间
只用在有线程安全要求的程序上加锁
减小锁粒度
将大对象(这个对象可能会被很多线程访问),拆成小对象,大大增加并行度,降低锁竞争。降低了锁的竞争,偏向锁,轻量级锁成功率才会提高。最典型的减小锁粒度的案例就是ConcurrentHashMap 。
锁分离
最常见的锁分离就是 ReadWriteLock,根据功能进行分离成读锁和写锁,这样读读不互斥,读写互斥,写写互斥,即保证了线程安全,又提高了性能,具体也请查看【高并发 Java 五】 JDK 并发包 1。读写分离思想可以延伸,只要操作互不影响,锁就可以分离。比如 LinkedBlockingQueue 从头部去除,从尾部放数据。
锁粗化
通常情况下,为了保证多线程间的有效并发,会要求每个线程持有锁的时间尽量短,即在使用完公共资源后,应该立即释放锁。但是,凡是都有一个读,如果对同一个锁不停的进行请求、同步和释放,其本身也会消耗系统宝贵的资源,反而不利于性能的优化。
锁消除
锁消除是在编译器级别的事情。在即时编译时,如果发现不可能被共享的对象,则可以消除这些对象的锁操作,多数是因为程序员编码不规范引起。
线程相关的基本方法有 wait、notify、notifyAll、sleep、join、yield 等。
调用该方法的线程进入 WAITING 状态,只有等待另外线程的通知或被中断才会返回,需要注意的是调用 wait() 方法后,会释放对象的锁。因此,wait 方法一般用在同步方法或同步代码块中。
sleep 导致当前线程休眠,与 wait 方法不同的是 sleep 不会释放当前占有的锁, sleep(long) 会导致线程进入 TIMED-WAITING 状态,而 wait() 方法会导致当前线程进入 WATING 状态。
yield 会使当前线程让出 CPU 执行时间片,与其他线程一起重新竞争 CPU 时间片。一般情况下,优先级高的线程有更大的可能性成功竞争到 CPU 时间片,但这又不是绝对的,有的操作系统对优先级并不敏感。
中断一个线程,其本意是给这个线程一个通知信号,会影响这个线程内部一个中断标识位。这个线程本身并不会因此而改变状态(如阻塞、终止等)。
join() 方法,等待其他线程终止,在当前线程中调用一个线程 t1.join() 方法,则当前线程转为阻塞状态,等到 t1 线程结束,当前线程由阻塞状态变为就绪状态,等待 CPU 的宠幸。
很多情况下,主线程生成并启动了子线程,需要用到子线程返回的结果,也就是主线程需要在子线程活动结束后再结束,这时候就要用到 join() 方法。
- System.out.println(Thread.currentThread().getName()+"线程开始运行");
- Thread1 thread = new Thread1();
- thread.setName("线程 B");
- thread.start();
- thread.join();
- System.out.println("这是 thread 执行完毕之后才能执行主线程");
Object 类中的 notify() 方法,唤醒在此对象监视器上等待的单个线程,如果所有线程都在此对象上等待,则会选择唤醒其中一个线程,选择是任意的。线程通过调用其中的一个对象的 wait() 方法,在对象的监视器上等待,直到当前线程放弃此对象上的锁定,才能继续执行被唤醒的线程,被唤醒的线程将以常规的方式与在该对象上主动同步的其他所有线程进行竞争。类似的方法还有 notifyAll(),唤醒在此监视器上等待的所有线程。
- wai、notify、notifyAll 方法是 Object 类的本地 final 方法,无法被重写。
- wait 使当前线程阻塞,前提是 必须先获得锁,一般配合 synchronized 关键字使用,即一般在 synchronized 同步代码块里使用 wait、notify、notifyAll 方法。
- 由于 wait、notify、notifyAll 在 synchronized 代码块执行,说明当前线程一定获取了锁的。
当线程执行 wait 方法时候,会释放当前的锁,然后让出 CPU,进入等待状态。
只有当 notify|notifyAll 被执行时候, 才会唤醒 一个|多个 正处于等待状态的线程,然后继续往下执行,直到执行完 synchronized 代码块的代码 或是 中途遇到 wait,再次释放锁。
也就是说,notify|notifyAll 的执行只是唤醒沉睡的线程,而不会立即释放锁,锁的释放要看代码块的具体执行情况。所以在编译中,尽量在使用了 notify|notifyAll 后立即退出临界区,以唤醒其他线程让其获得锁
- wait 需要被 try catch 保卫,以便在发送异常中断也可以使 wait 等待的线程唤醒。
- notify 和 wait 的顺序不能错,如果 A 线程先执行 notify 方法,B 线程再执行 wait方法,那么 B 线程是无法被唤醒的。
- notify 唤醒沉睡的线程后,线程会接着上次的执行继续往下执行。所以在进行条件判断时可以先把 wait 语句忽略不计来进行考虑;显然要确保程序一定要执行,并且要保证程序直到 满足一定的条件 再执行,要使用 while 进行等待,直到满足条件才继续往下执行。如下代码:
- public class K {
- //状态锁
- private Object lock;
- //条件变量
- private int now,need;
- public void produce(int num){
- //同步
- synchronized (lock){
- //当前有的不满足需要,进行等待,直到满足条件
- while(now < need){
- try {
- //等待阻塞
- lock.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("我被唤醒了!");
- }
- // 做其他的事情
- }
- }
- }
巧妙地利用了时间片轮转的方式,CPU给每个任务都服务一定的时间,然后把当前任务的状态保存下来,再加载下一个任务的状态后,继续服务下一个任务,任务的状态保存及再加载,这段过程叫做上下文切换。时间片轮转的方式使多个任务在同一颗 CPU 上执行变成了可能。
进程(有时候也称作任务)是指一个程序运行的实例。在 Linux 系统中,线程就是能并行运行且与他们的父进程(创建它们的进程)共享共享同一地址空间(一段内存区域)和其他资源的轻量级进程。
是指某一时间点的寄存器和程序计数器的内容。
是 CPU 内部的数量较少但速度很快的内存(与之对应的是CPU外部相对较慢的 RAM 主内存)。寄存器通过对常用值(通常是运算的中间值)的快速访问来提供计算机程序运行的速度。
是一个专用的寄存器,用于表明指令序列中 CPU 正在执行的位置,存的值为正在执行的指令的位置或者下一个将要被执行的指令的位置,具体依赖于特定的系统,
上下文切换可以认为是内核(操作系统的核心)在 CPU 上对于进程(包括线程)进行切换,上下文切换过程中的信息是保存在进程控制块(PCB, process control block)中的。PCB 还进程被称为“切换桢”(switchframe)。信息会一直保存到 CPU 的内存中,直到他们被再次使用。
当多个线程同时访问一个数据时,很容易出现问题。为了避免这种情况出现,我们要保证线程同步互斥,就是指并发执行的多个线程,在同一时间内只允许一个线程访问共享数据。Java 中可以使用 synchronized 关键字来取得同一个对象的同步锁。
就是指多个线程被阻塞,他们中的一个或者全部都在等待某个资源的释放。
线程池的主要工作是控制运行的线程数量,处理过程中将任务放入队列,然后在创建后启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出来执行。他的主要特点为:线程复用;控制最大并发数;管理线程。
每一个 Thread 的类都有一个 start 方法。 当调用 start 启动线程时 Java虚拟机会调用该类的 run 方法。那么该类的 run 方法中就是调用了 Runnable 对象的 run() 方法。我们可以继承重写 Thread 类,在其 start 方法中添加不断循环调用传递过来的 Runnable 对象。这就是线程池的实现原理。循环方法中不断获取 Runnable 是用 Queue 实现的,在获取下一个 Runnable 之前可以是阻塞的。
一般的线程池主要分为以下4个组成部分:
Java 中的线程是通过 Executor 框架实现的,该框架中用到了 Executor、Executors、ExecutorService、ThreadPoolExecutor、Callable、Future、FutureTask 这几个类。
ThreadPoolExecutor 的构造方法如下:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
- Executors.defaultThreadFactory(), defaultHandler);
- }
线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列也已经排满了,再也塞不下新任务了。这时候我们就需要拒绝策略机制合理的处理这个问题。
JDK 内置的拒绝策略如下:
以上内置拒绝策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际需要,完全可以自己扩展 RejectedExecutionHandler 接口。
阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况:
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查 | element() | peek() | 不可用 | 不可用 |
插入操作
1)public boolean add(E e):将指定元素插入此队列中,成功时返回 true,如果当前没有可用空间,则抛出 IllegalStateException。如果元素是 NULL,则会抛出 NullPointerException 异常。
2)public boolean offer(E e):将指定元素插入此队列中,成功时返回 true,如果当前没有可用空间,则返回 false。
3)public void put(E e) throw InterruptedException:将指定元素插入此队列中,若当前没有可用空间则阻塞等待。源码如下:
- public void put(E e) throws InterruptedException {
- checkNotNull(e);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == items.length)
- notFull.await();
- enqueue(e);
- } finally {
- lock.unlock();
- }
- }
4)public boolean offer(E e, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入 e,则返回失败。
获取数据操作
1)poll(time):取走 BlockingQueue 里排在首位的对象,取不到时返回null。
2)poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回 null 。
3)take():取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻塞进入等待状态直到 BlockingQueue 有新的数据被加入。
4)drainTo():一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或锁释放。
检查操作
1)peek():直接返回 LinkedBlockingQueue 队列的头元素但不删除任何元素,若队列中无元素则返回 null 。
用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
基于链表的有界阻塞队列,同 ArrayBlockingQueue 类似,此队列按照先进先出(FIFO)的原则对元素进行排序。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可并行的操作队列中的数据,以此来提高整个队列的并发性能。
LinkedBlockingQueue 会默认一个类似无限大小的容量 (Integer.MAX_VALUE)。
是一个支持优先级的无界队列。默认情况下元素采取自然顺序升序排列。可以自定义实现 compareTo() 方法来指定元素进行排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将 DelayQueue 运用在以下应用场景:
是一个不存储元素的阻塞队列。每个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
SynchronousQueue 可以看出是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如一个线程中使用的数据,传递给另外一个线程使用,SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue 。
是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列, LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit) 方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的事件再返回,如果超时还没有消费元素,则返回 false,如果在超时时间内消费了元素,则返回 true。
是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirst、addLast、offerFirst、offerLast、peekFirst、peekLast 等方法,以 First 单次结尾的方法,表示插入、获取(peek)或者移除双端队列的第一个元素。以 Last 单词结尾的方法,表示插入、获取或移除队列的最后一个元素。另外,插入方法 add 等同于 addLast,移除方法 remove 等同于removeLast。但是 take 方法却等同于 takeFirst,不知道是不是 JDK 的 bug,使用时还是用带有 First 和 Last 后缀的方法更清楚。
在初始化 LinkedBlockingDeque 时可以设置容量防止其过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。
CountDownLatch 类位于 java.util.concurrent 包下,利用它可以实现类似计数器的功能。比如一个任务 A,它要等待其他 4 个任务执行完毕之后才能执行,此时可以利用 CountDownLatch 来实现这种功能了。
- public static void main(String[] args) throws InterruptedException {
- final CountDownLatch latch = new CountDownLatch(2);
- new Thread(){
- @SneakyThrows
- @Override
- public void run() {
- System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
- Thread.sleep(3000);
- System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
- latch.countDown();
- }
- }.start();
- new Thread(){
- @SneakyThrows
- @Override
- public void run() {
- System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
- Thread.sleep(3000);
- System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
- latch.countDown();
- }
- }.start();
- System.out.println("等待两个子线程支持完毕...");
- latch.await();
- System.out.println("两个子线程支持完毕");
- System.out.println("继续执行主线程");
- }
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有的线程都被释放以后,CyclicBarrier 可以被重用。我们暂且把这个状态就叫做 barrier,当调用 await() 方法之后,线程就处于 barrier 了。
CyclicBarrier 中最重要的方法就是 await() 方法,它由两个重载版本:
具体使用如下,另外 CyclicBarrier 是可以重用的。
- public static void main(String[] args) {
- int N = 4;
- CyclicBarrier barrier = new CyclicBarrier(N);
- for (int i=0;i<N;i++) {
- new Writer(barrier).start();
- }
- }
-
- static class Writer extends Thread {
- private CyclicBarrier cyclicBarrier;
- public Writer(CyclicBarrier cyclicBarrier) {
- this.cyclicBarrier = cyclicBarrier;
- }
-
- @Override
- public void run() {
- try {
- System.out.println("线程"+Thread.currentThread().getName()+"开始写入数据");
- Thread.sleep(5000); //以睡眠来模拟线程需要预定写入数据操作
- System.out.println("线程"+Thread.currentThread().getName()+"写入数据完成,等待其他线程写入完毕");
- cyclicBarrier.await();
- System.out.println("所有线程写入完毕,继续处理其他任务,比如数据操作");
- } catch (InterruptedException | BrokenBarrierException e) {
- e.printStackTrace();
- }
- }
- }
Semaphore 翻译成字母意思为 信号量,Semaphore 可以控制同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
Semaphore 类中比较重要的几个方法:
上面 4 个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:
例子:若一个工程有 5 台机器,但是有 8 个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过 Semaphore 来实现。
- public static void main(String[] args) {
- int N = 8; // 工人数
- Semaphore semaphore = new Semaphore(5);// 机器数
- for (int i=0; i<N;i++){
- new Worker(i, semaphore).start();
- }
- }
-
- static class Worker extends Thread {
- private int num;
- private Semaphore semaphore;
-
- public Worker(int num, Semaphore semaphore) {
- this.num = num;
- this.semaphore = semaphore;
- }
-
- @Override
- public void run() {
- try {
- semaphore.acquire();
- System.out.println("工人"+num+"占用一个机器在生产...");
- Thread.sleep(2000);
- System.out.println("工人"+num+"使用完机器");
- semaphore.release();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
Java 语言提供了一种稍弱的同步机制,即 volatile 变量,用来确保变量的更新操作通知到其他线程。volatile 变量具备两种特性,volatile 变量不会被缓存在寄存器或者对其他处理器不可见的地方,因此在读取 volatile 类型的变量时总会返回最新写入的值。
变量可见性
其一是保证该变量对所有线程可见,这里的可见性指的是当一个线程修改了变量的值,那么新的值对于其他线程是可以立即获取的。
禁止重排序
volatile 禁止了指令重排。
比 synchronized 更轻量级的同步锁
在访问 volatile 变量时不会执行加锁操作,因此也就不会执行线程阻塞,因此 volatile 是一种比 synchronized 关键字更轻量级的同步机制。volatile 适合这种场景:一个变量被多个线程共享,线程直接给这个变量赋值。
当对非 volatile 变量进行读写的时候,每个线程先从内存拷贝变量到 CPU 缓存中。如果计算机有多个 CPU ,每个线程可能在不同的 CPU 上被处理,这意味着每个线程可以拷贝到不同的 CPU cache 中。而声明变量是 volatile 的,JVM 保证了每次读变量都从内存中读,跳过 CPU cache 这一步。
使用场景
值得说明的是对 volatile 变量的单次读/写操作可以保证原子性的,如 long 和 double 类型变量,但是并不能保证 i++ 这种操作的原子性,因为本质上 i++ 是读、写两次操作。在某些场景下可以代替 synchronized。但是 volatile 不能完全取代 synchronized 的位置,只有在一些特殊的场景下,才能使用 volatile 。总的来说,必须同时满足下面两个条件才能保证在并发环境的线程安全:
JAVA 里面进行多线程通讯的主要方式就是解决共享内存的方式,共享内存主要的关注点有两个:可见性和原子性。Java 内存模型解决了可见性和有序性的问题,而锁决绝了原子性的问题,理想情况下,我们希望做到“同步”和“互斥”。有以下常规实现方式:
这么设计可以很容易做到同步,只要在方法上加“synchronized”。
- public class MyData {
- private int j=0;
- public synchronized void add() {
- j++;
- System.out.println("线程"+Thread.currentThread().getName() + "j为:"+j);
- }
- public synchronized void dec() {
- j--;
- System.out.println("线程"+Thread.currentThread().getName() + "j为:"+j);
- }
- public int getData() {
- return j;
- }
- }
-
- public class AddRunnable implements Runnable {
- MyData myData;
-
- public AddRunnable(MyData myData) {
- this.myData = myData;
- }
-
- @Override
- public void run() {
- myData.add();
- }
- }
-
- public class DecRunnable implements Runnable {
- MyData myData;
-
- public DecRunnable(MyData myData) {
- this.myData = myData;
- }
-
- @Override
- public void run() {
- myData.dec();
- }
- }
-
- public static void main(String[] args) {
- MyData myData = new MyData();
- Runnable add = new AddRunnable(myData);
- Runnable dec = new DecRunnable(myData);
- for (int i=0;i<2;i++){
- new Thread(add).start();
- new Thread(dec).start();
- }
- }
将 Runnable 对象作为一个类的内部类,共享数据作为这个类的成员变量,每个线程对共享数据的操作方法也封装在外部类,以便实现对数据的各个操作的同步和互斥,作为内部类的各个 Runnable 对象调用外部类的这些方法。
- public class MyData {
- private int j=0;
- public synchronized void add() {
- j++;
- System.out.println("线程"+Thread.currentThread().getName() + "j为:"+j);
- }
- public synchronized void dec() {
- j--;
- System.out.println("线程"+Thread.currentThread().getName() + "j为:"+j);
- }
- public int getData() {
- return j;
- }
- }
-
- public class ThreadTest {
- public static void main(String[] args) {
- final MyData myData = new MyData();
- for (int i=0;i<2;i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- myData.add();
- }
- }).start();
- new Thread(new Runnable() {
- @Override
- public void run() {
- myData.dec();
- }
- }).start();
- }
- }
- }
ThreadLocal,很多地方叫做线程本地变量,也有些地方叫做线程本地存储,ThreadLocal 的作用是提供线程内的局部变量,这种变量在线程的生命周期内起作用,减少同一个线程内多个函数或者组件之间一些公共变量的传递的复杂度。
ThreadLocalMap(线程的一个属性)
- /* ThreadLocal values pertaining to this thread. This map is maintained
- * by the ThreadLocal class. */
- ThreadLocal.ThreadLocalMap threadLocals = null
使用场景
最常见的 ThreadLocal 使用场景为用来解决数据库连接、Session 管理等。
- private static final ThreadLocal threadSession = new ThreadLocal();
-
- public static Session getSession() throws InfrastructureProxy {
- Session s = (Session) threadSession.get();
- if (s == null) {
- s = getSessionFactory().openSession();
- threadSession.set(s);
- } catch (HibernateException ex) {
- throw new InfrastructureProxy(ex);
- }
- return s;
- }
共同点:
不同点:
减小锁粒度是指缩小锁定对象的范围,从而减小冲突的可能性,从而提高系统的并发能力。减小锁粒度是一种削弱多线程锁竞争的有效手段,这种技术典型的应用方法是 ConcurrentHashMap(高性能 HashMap )类的实现。对于 HashMap 而言,最重要的两个方法是 get 与 set,如果我们对整个 HashMap 加锁,可以得到线程安全的对象,但是加锁粒度太大了。Segment 的大小也被称为 ConcurrentHashMap 的并发度。
ConcurrentHashMap,它内部细分了若干个小的 HashMap,称之为(Segment)。默认情况下 一个 ConcurrentHashMap 被进一步细分为 16 个段,既就是锁的并发度。
如果需要在 ConcurrentHashMap 中添加一个新的表项,并不是将整个 HashMap 加锁,而是首先根据 hasCode 得到该表项存放在哪个段中,然后对该段加锁,并完成 put 操作。在多线程环境中,如果多个线程同时进行 put 操作,只要被加入的表项不存放在同一个段中,则线程可以做到真正的并行。
ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。Segment 是一种可重入锁 ReentrantLock,在 ConcurrentHashMap 里扮演锁的角色,HashEntry 则用户存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组,Segment 的结构和 HashMap 类似,是一种数组和联表结构,一个 Segment 里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素,每个 Segment 守护一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得它对于的 Segment 锁。
抢占式调度指的是每条线程执行的时间、线程的切换都由系统控制,系统控制指的是在系统某种运行机制下,可能每条线程都分同样的执行时间片,也可能是某些线程执行的时间片较长,甚至某些线程得不到执行的时间片。在这种机制下,一个线程的堵塞不会导致整个进程的堵塞。
协同式调度指某一线程执行完后主动通知系统切换到另一线程上执行,这种模式就像接力赛一样,一个人跑完自己的路径就把接力棒交给下一个人,下个人继续往下跑。线程的执行时间由线程本身控制,线程切换可以预知,不存在多线程同步问题,但它由一个致命的弱点:如果一个线程编写有问题,运行到一半就一直堵塞,那么可能导致整个系统崩溃。
java 使用的线程调度是抢占式调度,Java 中的线程会按优先级分配 CPU 时间片运行,且优先级越高越先执行,单优先级高并不代表能独自占有执行时间片,可能是优先级高得到越多的执行时间,反之,优先级低的分到的执行时间少但不会分配不到执行时间。
先来先服务调度算法(FCFS)
当在作业调度中采用该算法时,每次调度都是从后备作业队列中选择一个或多个最先进入该队列的作业,将它们调入内存,为它们分配资源、创建进程,然后放入就绪队列。在进程调度中采用 FCFS 算法时,则每次调度是从就绪队列中选择一个最先进入该队列的进程,为之分配处理器,使之投入运行。该进程一直运行到完成或发生某事件而阻塞后才放弃处理器,特点是:算法比较简单,可以实现基本上的公平。
短作业(进程)优先调度算法
短作业优先(SJF)的调度算法是从后备队列中选择一个或若干个估计运行时间最短的作业,将它们调入内存运行。而短进程优先(SPF)调度算法则是从就绪队列中选出一个估计运行时间最短的进程,将处理器分配给它,使它立即执行到完成,或发生某事件而阻塞放弃处理器时再重新调度。该算法未照顾作业紧迫型作业。
为了照顾紧迫型作业,使之在进入系统后便获得优先处理,引入了最高优先权优先(FPF)调度算法。当把该算法用于作业调度时,系统将从后备队列中选择若干个优先权最高的作业装入内存。当用于进程调度时,该算法是把处理器分配给就绪队列中优先级最高的进程。
非抢占式优先权调度算法
在这种方式下,系统一旦把处理器分配给就绪队列中优先权最高的进程后,该进程便一直执行下去,直至完成;或因发生某事件使该进程放弃处理器时。这种调度算法主要用于批处理系统中;当用于进程调度时,该算法是把处理机分配给就绪队列中优先权最高的进程。
抢占式优先权调度算法
在这种方式下,系统同样是把处理器分配给优先权最高的进程,使之执行。但在其执行期间,只要出现另一个优先权更高的进程,进程调度程序就立即停止当前进程(原优先权最高的进程)的执行,重新将处理机分配给新到的优先权更高的进程。显然,这种抢占式的优先权调度算法能更好地满足紧迫作业的要求,故而常用于要求比较严格的实时系统中,以及对性能要求较高的批处理和分时系统中。
高响应比优先调度算法
在批处理系统中,短作业优先算法是一种比较好的算法,其主要的不足之处是长作业的运行得不到保证。如果我们能为每个作业引入前面所述的动态优先权,并使作业的优先级随着等待时间的增加而以速率 a 提高,则长作业在等待一定的时间后,必然有机会分配到处理器。该优先权的变化规律可描述为:
时间片轮转法
在早期的时间片轮转法中,系统将所有的就绪进程按先来先服务的原则排成一个队列,每次调度时,把 CPU 分配给队首进程,并令其执行一个时间片。时间片的大小从几 ms 到 几百 ms 。当执行的时间片用完时,由一个计时器发出时钟中断请求,调度程序根据此信号来停止该进程的执行,并将它送往就绪队列的末尾;然后再把处理器分配给就绪队列中的队首进程,同时也让它执行一个时间片。这样就可以保证就绪队列中的所有进程在一给定的时间内均能获得一时间片的处理器执行时间。
多级反馈队列调度算法
在多级反馈队列调度算法中,如果规定第一个队列的时间片略大于多数人机交互所需之处理时间时,便能够很好的满足各种类型用户的需要。
CAS(Compare And Swap/Set)比较并交换,CAS 算法的过程是这样:它包含 3 个参数 CAS(V,E,N)。V 表示要更新的变量(内存值),E 表示预期值(旧的),N 表示新值。当且仅当 V 值等于 E 值时,才会将 V 的值设置为 N,如果 V 值 和 E 值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。最后 CAS 返回当前 V 的真实值。
CAS 操作是抱着乐观的态度进行的,它总是认为自己可以成功完成操作。当多个线程同时使用 CAS 操作一个变量时,只会有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并且运行再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS 即使没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。
JDK1.5 的原子包: java.util.concurrent.atomic 这个包里面提供了一组原子类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由 JVM 从等待队列中选择另一个线程进入,这只是一种逻辑上的理解。
相对于 synchronized 这种阻塞算法, CAS 是非阻塞算法的一种常见实现。由于一般 CPU 切换时间比 CPU 指令集操作更加长,所以 J.U.C 在性能上有了很大的提升。
AtomicInteger 的 getAndIncrement 采用了 CAS 操作,每次从内存中读取数据然后将此数据和 +1 后的结果进行 CAS 操作,如果成功就返回结果,否则重试直到成功为止。而 compareAndSet 利用 JNI 来完成 CPU 指令操作。
CAS 会导致 “ABA 问题”。CAS 算法实现一个重要前提需要取出内存中某时刻的数据,而在下时刻比较并替换,那么在这个时间差内会导致数据的变化。
比如一个线程 one 从内存位置 V 取出 A,这时另一个线程 two 也从内存取出 A,并且 two 进行了一些操作变成了 B ,然后 two 又将 V 位置的数据变成 A ,这时候线程 one 进行 CAS 操作发现内存中仍然是 A ,然后 one 操作成功。尽管 one 的 CAS 操作成功,但是不代表这个过程就是没有问题的。
部分乐观锁的实现是通过版本号(version) 的方式来解决 ABA 的问题,乐观锁每次在执行数据的修改操作时,都会带上一个版本号,一旦版本号和数据的版本号一致就可以执行修改操作并对版本号执行 +1 操作,否则就执行失败。因为每次操作的版本号都会随之增加,所以不会出现 ABA 问题,因为版本号只会增加不会减少。
AbstractQueueSynchronizer 类如其名,抽象的队列式的同步器, AQS 定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的 ReentrantLock/Semaphore/CountDownLatch。
它维护了一个 volatile int state(代表共享资源)和一个 FIFO 线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里 volatile 是核心关键词,具体 volatile 的语义,在此不述。state 的访问方式有三种:getState()、setState()、compareAndSetState()。
AQS 定义两种资源共享方式:
AQS 只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现,AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过 state 的 get/set/CAS),之所以没有定义成 abstract,是因为独占模式下只用实现 tryAcquire-tryRelease,而共享模式下只用实现 tryAcquireShared-tryReleaseShared。如果定义成 abstract ,那么每个模式也要去实现另一种模式下的接口。不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在底层实现好了。自定义同步器实现时主要实现以下几种方法:
同步器的实现是 ABS 的核心(state 资源状态技术)
同步器的实现是 ABS 核心,以 ReentrantLock 为例, state 初始化为 0,表示未锁定状态。A 线程 lock() 时,会调用 tryAcquire() 独占该锁并将 state+1 。此后,其他线程再 tryAcquire() 时就会失败,直到 A线程 unlock() 到 state=0 为止,其他线程才有机会获得该锁。当然,释放锁之前, A 线程自己是可以重复获取此锁的 (state 会累加),这就是可重入的概念。单要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到 0 。
以 CountDownLatch 为例,任务分为 N 个子线程去执行,state 也初始化为 N (注意 N 要与线程个数保持一致)。这 N 个子线程是并行执行的,每个子线程执行完成后 countDown() 一次,state 就会 CAS 减 1。等到所有的子线程都执行完成后(即 state=0),会 unpark() 主调用线程,然后主调用线程就会从 await() 函数返回,继续后余动作。
ReentrantReadWriteLock 实现独占和共享两种方式
一般来说,自定义同步器要么是独占方法,要么是共享方法,我们也需要实现 tryAcquire-tryRelease、 tryAcquireShared-tryReleaseShared 中的一种即可。但 AQS 也支持自定义同步器同步实现独占和共享两种方式,如 ReentrantReadWriteLock 。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。