赞
踩
在 Java 中,线程部分是一个重点,本篇文章说的 JUC 也是关于线程的。JUC 就是 java.util .concurrent 工具包的简称。这是一个处理线程的工具包,JDK 1.5 开始出现的。
进程(Process) 是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。 在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体。
线程(thread) 是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流, 一个进程中可以并发多个线程,每条线程并行执行不同的任务。
总结来说:
进程:指在系统中正在运行的一个应用程序;程序一旦运行就是进程;进程— —资源分配的最小单位。
线程:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个单元执行流。线程——程序执行的最小单位。
Thread.State
public static enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
Java 线程在运行的生命周期中的指定时刻只可能处于下面 6 种不同状态的其中一个状态:
NEW: 初始状态,线程被创建出来但没有被调用 start()
。
RUNNABLE: 运行状态,线程被调用了 start()
等待运行的状态。
BLOCKED :阻塞状态,需要等待锁释放。
WAITING:等待状态,表示该线程需要等待其他线程做出一些特定动作(通知或中断)。
TIME_WAITING:超时等待状态,可以在指定的时间后自行返回而不是像 WAITING 那样一直等待。
TERMINATED:终止状态,表示该线程已经运行完毕。
由上图可以看出:线程创建之后它将处于 NEW(新建) 状态,调用 start()
方法后开始运行,线程这时候处于 READY(可运行) 状态。可运行状态的线程获得了 CPU 时间片(timeslice)后就处于 RUNNING(运行) 状态。
在操作系统层面,线程有 READY 和 RUNNING 状态;而在 JVM 层面,只能看到 RUNNABLE 状态,所以 Java 系统一般将这两个状态统称为 RUNNABLE(运行中) 状态 。
为什么 JVM 没有区分这两种状态呢? 现在的时分(time-sharing)多任务(multi-task)操作系统架构通常都是用所谓的“时间分片(time quantum or time slice)”方式进行抢占式(preemptive)轮转调度(round-robin 式)。这个时间分片通常是很小的,一个线程一次最多只能在 CPU 上运行比如 10-20ms 的时间(此时处于 running 状态),也即大概只有 0.01 秒这一量级,时间片用后就要被切换下来放入调度队列的末尾等待再次调度。(也即回到 ready 状态)。线程切换的如此之快,区分这两种状态就没什么意义了。
当线程执行 wait()
方法之后,线程进入 WAITING(等待) 状态。进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态。
TIMED_WAITING(超时等待) 状态相当于在等待状态的基础上增加了超时限制,比如通过 sleep(long millis)
方法或 wait(long millis)
方法可以将线程置于 TIMED_WAITING 状态。当超时时间结束后,线程将会返回到 RUNNABLE 状态。
当线程进入 synchronized
方法/块或者调用 wait
后(被 notify
)重新进入 synchronized
方法/块,但是锁被其它线程占有,这个时候线程就会进入 BLOCKED(阻塞) 状态。
线程在执行完了 run()
方法之后将会进入到 TERMINATED(终止) 状态。
sleep()
方法没有释放锁,而 wait()
方法释放了锁 。wait()
通常被用于线程间交互/通信,sleep()
通常被用于暂停执行。wait()
方法被调用后,线程不会自动苏醒,需要别的线程调用同一个对象上的 notify()
或者 notifyAll()
方法。sleep()
方法执行完成后,线程会自动苏醒,或者也可以使用 wait(long timeout)
超时后线程会自动苏醒。sleep()
是 Thread
类的静态本地方法,wait()
则是 Object
类的本地方法。为什么这样设计呢?wait()
是让获得对象锁的线程实现等待,会自动释放当前线程占有的对象锁。每个对象(Object
)都拥有对象锁,既然要释放当前线程占有的对象锁并让其进入 WAITING 状态,自然是要操作对应的对象(Object
)而非当前的线程(Thread
)。
类似的问题:为什么 sleep()
方法定义在 Thread
中?
因为 sleep()
是让当前线程暂停执行,不涉及到对象类,也不需要获得对象锁。
串行表示所有任务都一一按先后顺序进行。串行意味着必须先装完一车柴才能 运送这车柴,只有运送到了,才能卸下这车柴,并且只有完成了这整个三个步 骤,才能进行下一个步骤。
串行是一次只能取得一个任务,并执行这个任务。
并行意味着可以同时取得多个任务,并同时去执行所取得的这些任务。并行模 式相当于将长长的一条队列,划分成了多条短队列,所以并行缩短了任务队列的长度。并行的效率从代码层次上强依赖于多进程/多线程代码,从硬件角度上 则依赖于多核 CPU。
并发(concurrent)指的是多个程序可以同时运行的现象,更细化的是多进程可 以同时运行或者多指令可以同时运行。但这不是重点,在描述并发的时候也不 会去扣这种字眼是否精确,并发的重点在于它是一种现象, 并发描述的是多进程同时运行的现象。但实际上,对于单核心 CPU 来说,同一时刻只能运行一个线程。所以,这里的"同时运行"表示的不是真的同一时刻有多个线程运行的现象,这是并行的概念,而是提供一种功能让用户看来多个程序同 时运行起来了,但实际上这些程序中的进程不是一直霸占 CPU 的,而是执行一会停一会。
要解决大并发问题,通常是将大任务分解成多个小任务, 由于操作系统对进程的调度是随机的,所以切分成多个小任务后,可能会从任一小任务处执行。这可能会出现一些现象:
可能出现一个小任务执行了多次,还没开始下个任务的情况。这时一般会采用队列或类似的数据结构来存放各个小任务的成果
可能出现还没准备好第一步就执行第二步的可能。这时,一般采用多路复用或异步的方式,比如只有准备好产生了事件通知才执行某个任务。
可以多进程/多线程的方式并行执行这些小任务。也可以单进程/单线程执行这 些小任务,这时很可能要配合多路复用才能达到较高的效率
最关键的点是:是否是 同时/同一时刻 执行。
管程(monitor)是保证了同一时刻只有一个进程在管程内活动,即管程内定义的操作在同一时刻只被一个进程调用(由编译器实现)。但是这样并不能保证进程以设计的顺序执行。
JVM 中同步是基于进入和退出管程(monitor)对象实现的,每个对象都会有一个管程 (monitor)对象,管程(monitor)会随着 java 对象一同创建和销毁
执行线程首先要持有管程对象,然后才能执行方法,当方法完成之后会释放管程,方法在执行时候会持有管程,其他线程无法再获取同一个管程。
用户线程:平时用到的普通线程,自定义线程
守护线程:运行在后台,是一种特殊的线程,比如垃圾回收
通过thread.setDaemon(true)设置为守护线程,默认用户线程。调用 `start`之后 setDaemon 无效
当主线程结束后,用户线程还在运行,JVM 存活
如果没有用户线程,都是守护线程,JVM 结束。守护线程不会保活进程
synchronized 是 Java 中的关键字,是一种同步锁。它修饰的对象有以下几种:
修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号{} 括起来的代码,作用的对象是调用这个代码块的对象;
修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象;
虽然可以使用 synchronized 来定义方法,但 synchronized 并不属于方法定义的一部分,因此,synchronized 关键字不能被继承。如果在父类中的某个方法使用了 synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上synchronized 关键字才可以。当然,还可以在子类方法中调用父类中相应的方法,这样虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此,子类的方法也就相当于同步了。
修改一个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的所有对象;
修改一个类,其作用的范围是 synchronized 后面括号括起来的部分,作用主的对象是这个类的所有对象。
Synchronized 只锁对象,对象头中保存着当前锁信息,线程ID等。所有 Synchronized 只能锁对象,或者类(Class也是对象),不能锁方法.
Java中的每一个对象都可以作为锁
对于普通同步方法,锁的是当前实例对象。
对于静态同步方法,锁的是当前类的class对象。
对于同步方法块,锁是Synchronized 括号里配置的对象
class Ticket {
//票数
private int number = 30;
//操作方法:卖票
public synchronized void sale() {
//判断:是否有票 if(number > 0) {
System.out.println(Thread.currentThread().getName()+" :
"+(number--)+" "+number);
}
}
}
如果一个代码块被 synchronized 修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况:
1) 获取锁的线程执行完了该代码块,然后线程释放对锁的占有;
2) 线程执行发生异常,此时 JVM 会让线程自动释放锁。
那么如果这个获取锁的线程由于要等待 IO 或者其他原因(比如调用 sleep 方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,试想一下,这多么影响程序执行效率。
因此就需要有一种机制可以不让等待的线程一直无期限地等待下去(比如只等待一定的时间或者能够响应中断),通过 Lock 就可以办到。
Lock 锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。Lock 提供了比 synchronized 更多的功能。
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
下面来逐个讲述 Lock 接口中每个方法的使用
lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。
采用 Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用 Lock 必须在 try{}catch{}
块中进行,并且将释放锁的操作放在 finally
块中进行,以保证锁一定被被释放,防止死锁的发生。通常使用 Lock 来进行同步的话,是以下面这种形式去使用的:
Lock lock = ...;
lock.lock();
try{
//处理任务
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}
关键字 synchronized 与 wait()/notify()这两个方法一起使用可以实现等待/通知模式, Lock 锁的 newContition()方法返回 Condition 对象,Condition 类 也可以实现等待/通知模式。 用 notify()通知时,JVM 会随机唤醒某个等待的线程, 使用 Condition 类可以进行选择性通知, Condition 比较常用的两个方法:
await()
会使当前线程等待,同时会释放锁,当其他线程调用signal()
时,线程会重新获得锁并继续执行。signal()
用于唤醒一个等待的线程。注意:在调用 Condition
的 await()/signal()
方法前,也需要线程持有相关 的 Lock 锁,调用 await()后线程会释放这个锁,在 singal()调用后会从当前 Condition 对象的等待队列中,唤醒 一个线程,唤醒的线程尝试获得锁, 一旦获得锁成功就继续执行。
ReentrantLock,意思是“可重入锁”,关于可重入锁的概念将在后面讲述。 ReentrantLock 是唯一实现了 Lock 接口的类,并且 ReentrantLock 提供了更多的方法。下面通过一些实例看具体看一下如何使用。
public class Test { private ArrayList<Integer> arrayList = new ArrayList<>(); public static void main(String[] args) { final Test test = new Test(); new Thread() { public void run() { test.insert(Thread.currentThread()); } }.start(); new Thread() { public void run() { test.insert(Thread.currentThread()); } }.start(); } public void insert(Thread thread) { Lock lock = new ReentrantLock(); //注意这个地方 lock.lock(); try { System.out.println(thread.getName() + "得到了锁"); for (int i = 0; i < 5; i++) { arrayList.add(i); } } catch (Exception e) { // TODO: handle exception } finally { System.out.println(thread.getName() + "释放了锁"); lock.unlock(); } } }
ReadWriteLock 也是一个接口,在它里面只定义了两个方法:
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading.
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing.
*/
Lock writeLock();
}
一个用来获取读锁,一个用来获取写锁。也就是说将文件的读写操作分开,分成 2 个锁来分配给线程,从而使得多个线程可以同时进行读操作。下面的 ReentrantReadWriteLock 实现了 ReadWriteLock 接口。
下面通过几个例子来看一下 ReentrantReadWriteLock 具体用法。
假如有多个线程要同时进行读操作的话,先看一下 synchronized 达到的效果:
public class Test { private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public static void main(String[] args) { final Test test = new Test(); new Thread() { public void run() { test.get(Thread.currentThread()); } }.start(); new Thread() { public void run() { test.get(Thread.currentThread()); } }.start(); } public synchronized void get(Thread thread) { long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start <= 1) { System.out.println(thread.getName() + "正在进行读操作"); } System.out.println(thread.getName() + "读操作完毕"); } }
而改成用读写锁的话:
public class Test { private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public static void main(String[] args) { final Test test = new Test(); new Thread() { public void run() { test.get(Thread.currentThread()); } ; }.start(); new Thread() { public void run() { test.get(Thread.currentThread()); } }.start(); } public void get(Thread thread) { rwl.readLock().lock(); try { long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start <= 1) { System.out.println(thread.getName() + "正在进行读操作"); } System.out.println(thread.getName() + "读操作完毕"); } finally { rwl.readLock().unlock(); } } }
说明 thread1 和 thread2 在同时进行读操作。这样就大大提升了读操作的效率。
注意:
如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则 申请的线程会一直等待释放写锁。
Lock 和 synchronized 有以下几点不同:
在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源 非常激烈时(即有大量线程同时竞争),此时 Lock 的性能要远远优于 synchronized。
线程间通信的模型有两种:共享内存和消息传递,以下方式都是基本这两种模型来实现的。我们来基本一道面试常见的题目来分析
场景一
两个线程,一个线程对当前数值加 1,另一个线程对当前数值减 1,要求用线程间通信
public class TestVolatile { /** * 交替加减 * @param args */ public static void main(String[] args){ DemoClass demoClass = new DemoClass(); new Thread(() ->{ for (int i = 0; i < 5; i++) {demoClass.increment(); } }, "线程 A").start(); new Thread(() ->{ for (int i = 0; i < 5; i++) { demoClass.decrement(); } }, "线程 B").start(); } } class DemoClass{ //加减对象 private int number = 0; /** * 加 1 */ public synchronized void increment() { try { while (number != 0){ this.wait(); } number++; System.out.println("--------" + Thread.currentThread().getName() + "加一成功----------,值为:" + number); notifyAll(); }catch (Exception e){ e.printStackTrace(); } } /** * 减一 */ public synchronized void decrement(){ try { while (number == 0){ this.wait(); } number--; System.out.println("--------" + Thread.currentThread().getName() + "减一成功----------,值为:" + number); notifyAll(); }catch (Exception e){ e.printStackTrace(); } } }
class DemoClass{ //加减对象 private int number = 0; //声明锁 private Lock lock = new ReentrantLock(); //声明钥匙 private Condition condition = lock.newCondition(); /** * 加 1 */ public void increment() { try { lock.lock(); while (number != 0){ condition.await(); } number++; System.out.println("--------" + Thread.currentThread().getName() + "加一成功----------,值为:" + number); condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } /** * 减一 */ public void decrement(){ try { lock.lock(); while (number == 0){ condition.await(); } number--; System.out.println("--------" + Thread.currentThread().getName() + "减一成功----------,值为:" + number); condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } }
问题: A 线程打印 5 次 A,B 线程打印 10 次 B,C 线程打印 15 次 C,按照 此顺序循环 10 轮
class DemoClass { //通信对象:0--打印 A 1---打印 B 2----打印 C private int number = 0; //声明锁 private Lock lock = new ReentrantLock(); //声明钥匙 A private Condition conditionA = lock.newCondition(); //声明钥匙 B private Condition conditionB = lock.newCondition(); //声明钥匙 C private Condition conditionC = lock.newCondition(); /** * A 打印 5 次 */ public void printA(int j) { try { lock.lock(); while (number != 0) { conditionA.await(); } System.out.println(Thread.currentThread().getName() + "输出 A,第" + j + "轮开始"); //输出 5 次 A for (int i = 0; i < 5; i++) { System.out.println("A"); } //开始打印 B number = 1; //唤醒 B conditionB.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } /** * B 打印 10 次 */ public void printB(int j) { try { lock.lock(); while (number != 1) { conditionB.await(); } System.out.println(Thread.currentThread().getName() + "输出 B,第" + j + "轮开始"); //输出 10 次 B for (int i = 0; i < 10; i++) { System.out.println("B"); } //开始打印 C number = 2; //唤醒 C conditionC.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } /** * C 打印 15 次 */ public void printC(int j) { try { lock.lock(); while (number != 2) { conditionC.await(); } System.out.println(Thread.currentThread().getName() + "输出 C,第" + j + "轮开始"); //输出 15 次 C for (int i = 0; i < 15; i++) { System.out.println("C"); } System.out.println("-----------------------------------------"); //开始打印 A number = 0; //唤醒 A conditionA.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
public class NotSafeDemo { /** * 多个线程同时对集合进行修改 * * @param args */ public static void main(String[] args) { List list = new ArrayList(); for (int i = 0; i < 100; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString()); System.out.println(list); }, "线程" + i).start(); } } }
异常内容
java.util.ConcurrentModificationException
问题: 为什么会出现并发修改异常?
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return <tt>true</tt> (as specified by {@link Collection#add})
*/
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
那么我们如何去解决 List 类型的线程安全问题?
Vector 是矢量队列,它是 JDK1.0 版本添加的类。继承于 AbstractList,实现 了 List, RandomAccess, Cloneable 这些接口。 Vector 继承了 AbstractList, 实现了 List;所以,它是一个队列,支持相关的添加、删除、修改、遍历等功能。 Vector 实现了 RandmoAccess 接口,即提供了随机访问功能。 RandmoAccess 是 java 中用来被 List 实现,为 List 提供快速访问功能的。在 Vector 中,我们即可以通过元素的序号快速获取元素对象;这就是快速随机访 问。 Vector 实现了 Cloneable 接口,即实现 clone()函数。它能被克隆。
和 ArrayList 不同,Vector 中的操作是线程安全的。
NotSafeDemo 代码修改
public class NotSafeDemo { /** * 多个线程同时对集合进行修改 * * @param args */ public static void main(String[] args) { List list = new Vector(); for (int i = 0; i < 100; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString()); System.out.println(list); }, "线程" + i).start(); } } }
现在没有运行出现并发异常,为什么?
查看 Vector 的 add 方法
/**
* Appends the specified element to the end of this Vector.
*
* @param e element to be appended to this Vector
* @return {@code true} (as specified by {@link Collection#add})
* @since 1.2
*/
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
add 方法被 synchronized 同步修辞,线程安全!因此没有并发异常
Collections 提供了方法 synchronizedList 保证 list 是同步线程安全的
public class NotSafeDemo {
/**
* 多个线程同时对集合进行修改
* @param args
*/
public static void main(String[] args) {
List list = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < 100; i++) {
new Thread(() ->{
list.add(UUID.randomUUID().toString());
System.out.println(list);
}, "线程" + i).start();
}
}
}
没有并发修改异常
查看方法源码
public static <T> List<T> synchronizedList(List<T> list) {
return (list instanceof RandomAccess ?
new SynchronizedRandomAccessList<>(list) :
new SynchronizedList<>(list));
}
首先我们对 CopyOnWriteArrayList 进行学习,其特点如下:
它相当于线程安全的 ArrayList。和 ArrayList 一样,它是个可变数组;但是和 ArrayList 不同的时,它具有以下特性:
它最适合于具有以下特征的应用程序:List 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突。
它是线程安全的。
因为通常需要复制整个基础数组,所以可变操作(add()、set() 和 remove() 等等)的开销很大。
迭代器支持 hasNext(), next()等不可变操作,但不支持可变 remove()等操作。
使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代 器时,迭代器依赖于不变的数组快照。
独占锁效率低:采用读写分离思想解决
写线程获取到锁,其他写线程阻塞
复制思想:
当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。
这时候会抛出来一个新的问题,也就是数据不一致的问题。如果写线程还没来得及写会内存,其他的线程就会读到了脏数据。
这就是 CopyOnWriteArrayList 的思想和原理。就是拷贝一份。
NotSafeDemo 代码修改
public class NotSafeDemo {
/**
* 多个线程同时对集合进行修改
* @param args
*/
public static void main(String[] args) {
List list = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < 100; i++) {
new Thread(() ->{
list.add(UUID.randomUUID().toString());
System.out.println(list);
}, "线程" + i).start();
}
}
}
没有线程安全问题
原因分析(重点):动态数组与线程安全
下面从“动态数组”和“线程安全”两个方面进一步对 CopyOnWriteArrayList 的原理进行说明。
集合类型中存在线程安全与线程不安全的两种,常见例如:
ArrayList ----- Vector
HashMap -----HashTable
但是以上都是通过 synchronized 关键字实现,效率较低
CopyOnWriteArrayList CopyOnWriteArraySet 类型,通过动态数组与线程安 全个方面保证线程安全
ConcurrentHashMap
: 线程安全的 HashMap
CopyOnWriteArrayList
: 线程安全的 List
,在读多写少的场合性能非常好,远远好于 Vector
。
ConcurrentLinkedQueue
: 高效的并发队列,使用链表实现。可以看做一个线程安全的 LinkedList
,这是一个非阻塞队列。
BlockingQueue
: 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道。
ConcurrentSkipListMap
: 跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找。
公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。
非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。
Synchronized 默认 非公平锁,不可修改。ReentrantLock 可根据入参修改,
new
ReentrantLock(true);
true 公平锁,false / 不传 非公平锁
可重入锁: 就是一个线程不用释放,可以重复的获取同一个锁n次,只是在释放的时候,也需要相应的释放n次。
Synchronized 和 Lock 多事可重入锁
所谓死锁,是指多个进程在运行过程中因争夺同一资源而造成的一种僵局,当进程处于这种僵持状态时,若无外力作用,它们都将无法再向前推进。 因此我们举个例子来描述,如果此时有一个线程A,按照先锁a再获得锁b的的顺序获得锁,而在此同时又有另外一个线程B,按照先锁b再锁a的顺序获得锁。如下代码所示:
public class DeadLock { //创建两个对象 static Object a = new Object(); static Object b = new Object(); public static void main(String[] args) { new Thread(()->{ synchronized (a) { System.out.println(Thread.currentThread().getName()+" 持有锁a,试图获取锁b"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (b) { System.out.println(Thread.currentThread().getName()+" 获取锁b"); } } },"A").start(); new Thread(()->{ synchronized (b) { System.out.println(Thread.currentThread().getName()+" 持有锁b,试图获取锁a"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (a) { System.out.println(Thread.currentThread().getName()+" 获取锁a"); } } },"B").start(); } }
产生死锁的必要条件:
预防死锁:
目前我们学习了有两种创建线程的方法-一种是通过创建 Thread 类,另一种是 通过使用 Runnable 创建线程。但是,Runnable 缺少的一项功能是,当线程终止时(即 run()完成时),我们无法使线程返回结果。为了支持此功能, Java 中提供了 Callable 接口。
Callable 接口的特点如下(重点)
为了实现 Runnable,需要实现不返回任何内容的 run()方法,而对于 Callable,需要实现在完成时返回结果的 call()方法。
call()方法可以引发异常,而 run()则不能。
为实现 Callable 而必须重写 call 方法。
不能直接替换 runnable,因为 Thread 类的构造方法根本没有 Callable。
//创建新类 MyThread 实现 runnable 接口
class MyThread implements Runnable{
@Override
public void run() {
}
}
//新类 MyThread2 实现 callable 接口
class MyThread2 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return 200;
}
}
当 call()方法完成时,结果必须存储在主线程已知的对象中,以便主线程可 以知道该线程返回的结果。为此,可以使用 Future 对象。
将 Future 视为保存结果的对象–它可能暂时不保存结果,但将来会保存(一旦 Callable 返回)。Future 基本上是主线程可以跟踪进度以及其他线程的结果的 一种方式。要实现此接口,必须重写 5 种方法,这里列出了重要的方法,如下:
如果尚未启动,它将停止任务。如果已启动,则仅在 mayInterrupt 为 true 时才会中断任务。
如果任务完成,它将立即返回结果,否则将等待任务完成,然后返回结果。
可以看到 Callable 和 Future 做两件事 Callable 与 Runnable 类似,因为它封装了要在另一个线程上运行的任务,而 Future 用于存储从另一个线程获得的结果。实际上,future 也可以与 Runnable 一起使用。 要创建线程,需要 Runnable。为了获得结果,需要 future。
Java 库具有具体的 FutureTask 类型,该类型实现 Runnable 和 Future,并方便地将两种功能组合在一起。 可以通过为其构造函数提供 Callable 来创建 FutureTask。然后,将 FutureTask 对象提供给 Thread 的构造函数以创建 Thread 对象。因此,间接地使用 Callable 创建线程。
核心原理:(重点)
在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给 Future 对象在后台完成
public class CallableDemo { /** * 实现 runnable 接口 */ static class MyThread1 implements Runnable { /** * run 方法 */ @Override public void run() { try { System.out.println(Thread.currentThread().getName() + "线程进入了 run方法"); } catch (Exception e) { e.printStackTrace(); } } } /** * 实现 callable 接口 */ static class MyThread2 implements Callable { /** * call 方法 * * @return * @throws Exception */ @Override public Long call() throws Exception { try { System.out.println(Thread.currentThread().getName() + "线程进入了 call方法,开始准备睡觉"); Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "睡醒了"); } catch (Exception e) { e.printStackTrace(); } return System.currentTimeMillis(); } } public static void main(String[] args) throws Exception { //声明 runable Runnable runable = new MyThread1(); //声明 callable Callable callable = new MyThread2(); //future-callable FutureTask<Long> futureTask2 = new FutureTask(callable); //线程二 new Thread(futureTask2, "线程二").start(); for (int i = 0; i < 10; i++) { Long result1 = futureTask2.get(); System.out.println(result1); } //线程一 new Thread(runable, "线程一").start(); } }
JUC 中提供了三种常用的辅助类,通过这些辅助类可以很好的解决线程数量过 多时 Lock 锁的频繁操作。这三种辅助类为:
下面我们分别进行详细的介绍和学习
CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行 减 1 的操作,使用 await 方法等待计数器不大于 0,然后继续执行 await 方法 之后的语句。
场景: 6 个同学陆续离开教室后值班同学才可以关门。
CountDownLatchDemo
public class CountDownLatchDemo { //6个同学陆续离开教室之后,班长锁门 public static void main(String[] args) throws InterruptedException { //创建CountDownLatch对象,设置初始值 CountDownLatch countDownLatch = new CountDownLatch(6); //6个同学陆续离开教室之后 for (int i = 1; i <=6; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+" 号同学离开了教室"); //计数 -1 countDownLatch.countDown(); },String.valueOf(i)).start(); } //等待 countDownLatch.await(); System.out.println(Thread.currentThread().getName()+" 班长锁门走人了"); } }
CountDownLatch
是共享锁的一种实现,它默认构造 AQS 的 state
值为 count
。当线程使用 countDown()
方法时,其实使用了tryReleaseShared
方法以 CAS 的操作来减少 state
,直至 state
为 0 。当调用 await()
方法的时候,如果 state
不为 0,那就证明任务还没有执行完毕,await()
方法就会一直阻塞,也就是说 await()
方法之后的语句不会被执行。然后,CountDownLatch
会自旋 CAS 判断 state == 0
,如果 state == 0
的话,就会释放所有等待的线程,await()
方法之后的语句得到执行。
CyclicBarrier 看英文单词可以看出大概就是循环阻塞的意思,在使用中 CyclicBarrier 的构造方法第一个参数是目标障碍数,每次执行 CyclicBarrier 一 次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后 的语句。可以将 CyclicBarrier 理解为加 1 操作
场景: 集齐 7 颗龙珠就可以召唤神龙
CyclicBarrierDemo
public class CyclicBarrierDemo { //创建固定值 private static final int NUMBER = 7; public static void main(String[] args) { //创建CyclicBarrier CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER,()->{ System.out.println("*****集齐7颗龙珠就可以召唤神龙"); }); //集齐七颗龙珠过程 for (int i = 1; i <=7; i++) { new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+" 星龙被收集到了"); //等待 cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } },String.valueOf(i)).start(); } } }
CyclicBarrier
内部通过一个 count
变量作为计数器,count
的初始值为 parties
属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减 1。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。
//每次拦截的线程数
private final int parties;
//计数器
private int count;
下面我们结合源码来简单看看。
1、CyclicBarrier
默认的构造方法是 CyclicBarrier(int parties)
,其参数表示屏障拦截的线程数量,每个线程调用 await()
方法告诉 CyclicBarrier
我已经到达了屏障,然后当前线程被阻塞。
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
其中,parties
就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。
2、当调用 CyclicBarrier
对象调用 await()
方法时,实际上调用的是 dowait(false, 0L)
方法。 await()
方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 parties
的值时,栅栏才会打开,线程才得以通过执行。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
dowait(false, 0L)
方法源码分析如下:
// 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。 private int count; /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 锁住 lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); // 如果线程中断了,抛出异常 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // cout减1 int index = --count; // 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了,也就是达到了可以执行await 方法之后的条件 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 将 count 重置为 parties 属性的初始化值 // 唤醒之前等待的线程 // 下一波执行开始 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
Semaphore 的构造方法中传入的第一个参数是最大信号量(可以看成最大线程池),每个信号量初始化为一个最多只能分发一个许可证。使用 acquire 方 法获得许可证,release 方法释放许可
场景: 抢车位, 6 部汽车 3 个停车位
SemaphoreDemo
public class SemaphoreDemo { public static void main(String[] args) { //创建Semaphore,设置许可数量 Semaphore semaphore = new Semaphore(3); //模拟6辆汽车 for (int i = 1; i <=6; i++) { new Thread(()->{ try { //抢占 semaphore.acquire(); System.out.println(Thread.currentThread().getName()+" 抢到了车位"); //设置随机停车时间 TimeUnit.SECONDS.sleep(new Random().nextInt(5)); System.out.println(Thread.currentThread().getName()+" ------离开了车位"); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放车位 semaphore.release(); } },String.valueOf(i)).start(); } } }
Semaphore
是共享锁的一种实现,它默认构造 AQS 的 state
值为 permits
,你可以将 permits
的值理解为许可证的数量,只有拿到许可证的线程才能执行。
调用semaphore.acquire()
,线程尝试获取许可证,如果 state >= 0
的话,则表示可以获取成功。如果获取成功的话,使用 CAS 操作去修改 state
的值 state=state-1
。如果 state<0
的话,则表示许可证数量不足。此时会创建一个 Node 节点加入阻塞队列,挂起当前线程。
/** * 获取1个许可证 */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * 共享模式下获取许可证,获取成功则返回,失败则加入阻塞队列,挂起线程 */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取许可证,arg为获取许可证个数,当可用许可证数减当前获取的许可证数结果小于0,则创建一个节点加入阻塞队列,挂起当前线程。 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
调用semaphore.release();
,线程尝试释放许可证,并使用 CAS 操作去修改 state
的值 state=state+1
。释放许可证成功之后,同时会唤醒同步队列中的一个线程。被唤醒的线程会重新尝试去修改 state
的值 state=state-1
,如果 state>=0
则获取令牌成功,否则重新进入阻塞队列,挂起线程。
// 释放一个许可证
public void release() {
sync.releaseShared(1);
}
// 释放共享锁,同时会唤醒同步队列中的一个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//唤醒同步队列中的一个线程
doReleaseShared();
return true;
}
return false;
}
现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以 应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源, 就不应该允许其他线程对该资源进行读和写的操作了。
针对这种场景,JAVA 的并发包提供了读写锁 ReentrantReadWriteLock, 它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称 为排他锁
而读写锁有以下三个重要的特性:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { /** * 读锁 */ private final ReentrantReadWriteLock.ReadLock readerLock; /** * 写锁 */ private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; /** * 使用默认(非公平)的排序属性创建一个新的 * ReentrantReadWriteLock */ public ReentrantReadWriteLock() { this(false); } /** * 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */ public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } /** * 返回用于写入操作的锁 */ public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } /** * 返回用于读取操作的锁 */ public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } abstract static class Sync extends AbstractQueuedSynchronizer { } static final class NonfairSync extends Sync { } static final class FairSync extends Sync { } public static class ReadLock implements Lock, java.io.Serializable { } public static class WriteLock implements Lock, java.io.Serializable { } }
可以看到,ReentrantReadWriteLock 实现了 ReadWriteLock 接口, ReadWriteLock 接口定义了获取读锁和写锁的规范,具体需要实现类去实现; 同时其还实现了 Serializable 接口,表示可以进行序列化,在源代码中可以看 到 ReentrantReadWriteLock 实现了自己的序列化逻辑。
场景: 使用 ReentrantReadWriteLock 对一个 hashmap 进行读和写操作
//资源类 class MyCache { //创建map集合 private volatile Map<String,Object> map = new HashMap<>(); //创建读写锁对象 private ReadWriteLock rwLock = new ReentrantReadWriteLock(); //放数据 public void put(String key,Object value) { //添加写锁 rwLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName()+" 正在写操作"+key); //暂停一会 TimeUnit.MICROSECONDS.sleep(300); //放数据 map.put(key,value); System.out.println(Thread.currentThread().getName()+" 写完了"+key); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放写锁 rwLock.writeLock().unlock(); } } //取数据 public Object get(String key) { //添加读锁 rwLock.readLock().lock(); Object result = null; try { System.out.println(Thread.currentThread().getName()+" 正在读取操作"+key); //暂停一会 TimeUnit.MICROSECONDS.sleep(300); result = map.get(key); System.out.println(Thread.currentThread().getName()+" 取完了"+key); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放读锁 rwLock.readLock().unlock(); } return result; } } public class ReadWriteLockDemo { public static void main(String[] args) throws InterruptedException { MyCache myCache = new MyCache(); //创建线程放数据 for (int i = 1; i <=5; i++) { final int num = i; new Thread(()->{ myCache.put(num+"",num+""); },String.valueOf(i)).start(); } TimeUnit.MICROSECONDS.sleep(300); //创建线程取数据 for (int i = 1; i <=5; i++) { final int num = i; new Thread(()->{ myCache.get(num+""); },String.valueOf(i)).start(); } } }
public class ReadWriteLockTest { private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private Lock writeLock = readWriteLock.writeLock(); private Lock readLock = readWriteLock.readLock(); private volatile boolean updateFlag; /**锁降级*/ public void test(){ readLock.lock(); if(!updateFlag){ readLock.unlock();//肯定要先开释了读锁再去获取写锁,如果间接获取写锁,以后线程会被阻塞 writeLock.lock();//step1,获取写锁 try { if(!updateFlag){ //批改数据逻辑,略。 updateFlag = true; } readLock.lock();//step2,获取读锁,排挤其它写锁批改数据 }finally { writeLock.unlock();//step3,开释写锁。到这里,锁降级实现 } } try { //如果step2不先获取读锁,在step3开释了写锁后其它线程会对数据进行批改, //会使得上面‘读取数据逻辑’里呈现数据读取不精确的问题 //读取数据逻辑,略。 }finally { readLock.unlock();step4 } } }
在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发 现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。
在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。
原因: 当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写 锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。
Concurrent 包中,BlockingQueue 很好的解决了多线程中,如何高效安全 “传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建 高质量的多线程程序带来极大的便利。本文详细介绍了 BlockingQueue 家庭中的所有成员,包括他们各自的功能以及常见使用场景。 阻塞队列,顾名思义,首先它是一个队列, 通过一个共享的队列,可以使得数据 由队列的一端输入,从另外一端输出;
当队列是空的,从队列中获取元素的操作将会被阻塞
当队列是满的,从队列中添加元素的操作将会被阻塞
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多 个元素或者完全清空,使队列变得空闲起来并后续新增
常用的队列主要有以下两种:
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了。
在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细 节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和 “消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准 备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一 发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的 数据处理完毕,反之亦然。
public class BlockingQueueDemo { public static void main(String[] args) throws InterruptedException { //创建阻塞队列 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); //第一组 // System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); //System.out.println(blockingQueue.element()); //System.out.println(blockingQueue.add("w")); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); //第二组 // System.out.println(blockingQueue.offer("a")); // System.out.println(blockingQueue.offer("b")); // System.out.println(blockingQueue.offer("c")); // System.out.println(blockingQueue.offer("www")); // // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); //第三组 // blockingQueue.put("a"); // blockingQueue.put("b"); // blockingQueue.put("c"); // //blockingQueue.put("w"); // // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); //第四组 System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); System.out.println(blockingQueue.offer("w",3L, TimeUnit.SECONDS)); }
基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于 LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea 之所以没这样去做,也许是因为 ArrayBlockingQueue 的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue 和 LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的 Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于 GC 的影响还是存在一定的区别。而在创建 ArrayBlockingQueue 时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
一句话总结: 由数组结构组成的有界阻塞队列。
基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回; 只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue 可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。 而LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列 的并发性能。
ArrayBlockingQueue 和 LinkedBlockingQueue 是两个最普通也是最常用 的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个 类足以。
一句话总结: 由链表结构组成的有界(但大小默认值为 integer.MAX_VALUE)阻塞队列。
DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
一句话总结: 使用优先级队列实现的延迟无界阻塞队列。
基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来决定),但需要注意的是 PriorityBlockingQueue 并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。
因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费 数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。
在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁。
一句话总结: 支持优先级排序的无界阻塞队列。
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式 会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。 声明一个 SynchronousQueue 有两种不同的方式,它们之间有着不太一样的行为。
一句话总结: 不存储元素的阻塞队列,也即单个元素的队列。
LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。 LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素 为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。
一句话总结: 由链表组成的无界阻塞队列。
LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。
对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作可能会阻塞住该线程直到队列状态变更为允许操作,这里的阻塞一般有两种情况。
插入元素时: 如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再将该元素插入,该操作可以通过设置超时参数,超时后返回 false 表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出 InterruptedException 异常。
读取元素时: 如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可 以通过设置超时参数
一句话总结: 由链表组成的双向阻塞队列
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销, 进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代 价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
线程池提供了一种限制和管理资源(包括执行一个任务)的方式。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。
这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:
Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors, ExecutorService,ThreadPoolExecutor 这几个类
/** * 用给定的初始参数创建一个新的ThreadPoolExecutor。 */ public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量 int maximumPoolSize,//线程池的最大线程数 long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间 TimeUnit unit,//时间单位 BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列 ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可 RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务 ) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
ThreadPoolExecutor
3 个最重要的参数:
corePoolSize
: 核心线程数定义了最小可以同时运行的线程数量。
maximumPoolSize
: 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
workQueue
: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
ThreadPoolExecutor
其他常见参数:
keepAliveTime
:当线程池中的线程数量大于 corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime
才会被回收销毁;unit
: keepAliveTime
参数的时间单位。threadFactory
:executor 创建新线程的时候会用到。handler
:饱和策略。关于饱和策略下面单独介绍一下。如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor
定义一些策略:
ThreadPoolExecutor.AbortPolicy
: 抛出 RejectedExecutionException
来拒绝新任务的处理。
ThreadPoolExecutor.CallerRunsPolicy
: 调用执行自己的线程运行任务,也就是直接在调用execute
方法的线程中运行(run
)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
ThreadPoolExecutor.DiscardPolicy
: 不处理新任务,直接丢弃掉。
ThreadPoolExecutor.DiscardOldestPolicy
: 此策略将丢弃最早的未处理的任务请求
举个例子: Spring 通过 ThreadPoolTaskExecutor
或者我们直接通过 ThreadPoolExecutor
的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler
饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy
。在默认情况下,ThreadPoolExecutor
将抛出 RejectedExecutionException
来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy
。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 ThreadPoolExecutor
的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了)
作用: 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空 闲线程,若无可回收,则新建线程.
特点:
/** * 可缓存线程池 * @return */ public static ExecutorService newCachedThreadPool(){ /** * corePoolSize 线程池的核心线程数 * maximumPoolSize 能容纳的最大线程数 * keepAliveTime 空闲线程存活时间 * unit 存活的时间单位 * workQueue 存放提交但未执行任务的队列 * threadFactory 创建线程的工厂类:可以省略 * handler 等待队列满后的拒绝策略:可以省略 */ return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); }
场景: 适用于创建一个可无限扩大的线程池,服务器负载压力较轻,执行时间较 短,任务多的场景
作用:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池 中的线程将一直存在。
特征:
/** * 固定长度线程池 * @return */ public static ExecutorService newFixedThreadPool(){ /** * corePoolSize 线程池的核心线程数 * maximumPoolSize 能容纳的最大线程数 * keepAliveTime 空闲线程存活时间 * unit 存活的时间单位 * workQueue 存放提交但未执行任务的队列 * threadFactory 创建线程的工厂类:可以省略 * handler 等待队列满后的拒绝策略:可以省略 */ return new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); }
场景: 适用于可以预测线程数量的业务中,或者服务器负载较重,对线程数有严 格限制的场景
作用: 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程, 那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的newFixedThreadPool 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。
特征: 线程池中最多执行 1 个线程,之后提交的线程活动将会排在队列中以此执行
创建方式:
/** * 单一线程池 * @return */ public static ExecutorService newSingleThreadExecutor(){ /** * corePoolSize 线程池的核心线程数 * maximumPoolSize 能容纳的最大线程数 * keepAliveTime 空闲线程存活时间 * unit 存活的时间单位 * workQueue 存放提交但未执行任务的队列 * threadFactory 创建线程的工厂类:可以省略 * handler 等待队列满后的拒绝策略:可以省略 */ return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); }
场景: 适用于需要保证顺序执行各个任务,并且在任意时间点,不会同时有多个线程的场景
作用: 线程池支持定时以及周期性执行任务,创建一个 corePoolSize 为传入参数,最大线程数为整形的最大数的线程池
特征:
(1)线程池中具有指定数量的线程,即便是空线程也将保留
(2)可定时或者延迟执行线程活动
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
场景: 适用于需要多个后台线程执行周期任务的场景
jdk1.8 提供的线程池,底层使用的是 ForkJoinPool实现,创建一个拥有多个 任务队列的线程池,可以减少连接数,创建当前可用 cpu 核数的线程来并行执行任务。
创建方式:
public static ExecutorService newWorkStealingPool(int parallelism) {
/**
* parallelism:并行级别,通常默认为 JVM 可用的处理器个数
* factory:用于创建 ForkJoinPool 中使用的线程。
* handler:用于处理工作线程未处理的异常,默认为 null
* asyncMode:用于控制 WorkQueue 的工作模式:队列---反队列
*/
return new ForkJoinPool(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
true);
}
场景: 适用于大耗时,可并行执行的场景
public class ThreadPoolDemo1 { /** * 火车站 3 个售票口, 10 个用户买票 * * @param args */ public static void main(String[] args) { //定时线程次:线程数量为 3---窗口数为 3 ExecutorService threadService = new ThreadPoolExecutor(3, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); try { //10 个人买票 for (int i = 1; i <= 10; i++) { threadService.execute(() -> { try { System.out.println(Thread.currentThread().getName() + "窗口,开始卖票"); Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + "窗口买票结束"); } catch (Exception e) { e.printStackTrace(); } }); } } catch (Exception e) { e.printStackTrace(); } finally { //完成后结束 threadService.shutdown(); } } }
在创建了线程池后,线程池中的线程数为零
当调用 execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
2.2 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
2.3 如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize, 那么还是要创建非核心线程立刻运行这个任务;
2.4 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
当一个线程完成任务时,它会从队列中取下一个任务来执行
当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
4.1 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。
4.2 所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
Fork/Join 它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子 任务结果合并成最后的计算结果,并进行输出。Fork/Join 框架要完成两件事情:
Fork:把一个复杂任务进行分拆,大事化小
Join:把分拆任务的结果进行合并
执行任务并合并结果: 分割的子任务分别放到双端队列里,然后几个启动线程 分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里, 启动一个线程从队列里取数据,然后合并这些数据。
在 Java 的 Fork/Join 框架中,使用两个类完成上述操作
ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成, ForkJoinTask 数组负责将存放以及将程序提交给 ForkJoinPool,而 ForkJoinWorkerThread 负责执行这些任务。
递归任务: 继承后可以实现递归(自己调自己)调用任务
Fork 方法的实现原理: 当我们调用 ForkJoinTask 的 fork 方法时,程序会把任务放在 ForkJoinWorkerThread 的 pushTask 的 workQueue 中,异步地执行这个任务,然后立即返回结果。
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread) t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
pushTask 方法把当前任务存放在 ForkJoinTask 数组队列里。然后再调用 ForkJoinPool 的 signalWork()方法唤醒或创建一个工作线程来执行任务。代码如下:
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a;
ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);//执行
} else if (n >= m) growArray();
}
}
Join 方法的主要作用是阻塞当前线程并等待获取结果。让我们一起看看 ForkJoinTask 的 join 方法的实现,代码如下:
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
它首先调用 doJoin 方法,通过 doJoin()方法得到当前任务的状态来判断返回 什么结果,任务状态有 4 种:
已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出 现异常(EXCEPTIONAL)
让我们分析一下 doJoin 方法的实现
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; }
在 doJoin()方法流程如下:
ForkJoinTask 在执行的时候可能会抛出异常,但是我们没办法在主线程里直接 捕获异常,所以 ForkJoinTask 提供了 isCompletedAbnormally()方法来检查 任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的 getException 方法获取异常。
getException 方法返回 Throwable 对象,如果任务被取消了则返回 CancellationException。如果任务没有完成或者没有抛出异常则返回 null。
场景: 生成一个计算任务,计算 1+2+3…+1000,每 100 个数切分一个子任务
class MyTask extends RecursiveTask<Integer> { //拆分差值不能超过10,计算10以内运算 private static final Integer VALUE = 10; private int begin ;//拆分开始值 private int end;//拆分结束值 private int result ; //返回结果 //创建有参数构造 public MyTask(int begin,int end) { this.begin = begin; this.end = end; } //拆分和合并过程 @Override protected Integer compute() { //判断相加两个数值是否大于10 if((end-begin)<=VALUE) { //相加操作 for (int i = begin; i <=end; i++) { result = result+i; } } else {//进一步拆分 //获取中间值 int middle = (begin+end)/2; //拆分左边 MyTask task01 = new MyTask(begin,middle); //拆分右边 MyTask task02 = new MyTask(middle+1,end); //调用方法拆分 task01.fork(); task02.fork(); //合并结果 result = task01.join()+task02.join(); } return result; } } public class ForkJoinDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { //创建MyTask对象 MyTask myTask = new MyTask(0,100); //创建分支合并池对象 ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask); //获取最终合并之后结果 Integer result = forkJoinTask.get(); System.out.println(result); //关闭池对象 forkJoinPool.shutdown(); } }
CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。
CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future 接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture 类。
Futrue 在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Futrue,在 Future 里面有 isDone 方法来判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
(1)不支持手动完成
我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果, 现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成
(2)不支持进一步的非阻塞调用
通过 Future 的 get 方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能。
(3)不支持链式调用
对于 Future 的执行结果,我们想继续传到下一个 Future 处理使用,从而形成 一个链式的 pipline 调用,这在 Future 中是没法实现的。
(4)不支持多个 Future 合并
比如我们有 10 个 Future 并行执行,我们想在所有的 Future 运行完毕之后, 执行某些函数,是没法通过 Future 实现的。
(5)不支持异常处理
Future 的 API 没有任何的异常处理的 api,所以在异步运行时,如果出了问题是不好定位的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。