赞
踩
上一篇文章中说到可以让多个线程在临界区上的相互排斥来让线程间同步,从而让多线程程序能安全的运行。线程同步可以避免竞争条件的发生,但有时候,我们也需要线程之间的相互协作。
如果我们需要让不同的线程也能进行通信,我们可以使用条件对象来实现线程间的通信。
我们可以用条件对象让线程在执行到一半时进入等待阻塞状态,其他线程可以通过条件对象来等待阻塞的线程。
当线程进入临界区后却发现只有满足某个条件之后才能往下执行,这时我们可以使用一个条件对象来管理那些已经获得了一个锁却不能做有用的工作的线程。
简单来说,当一个线程获得了锁准备运行程序的时候,但是代码中有一个if语句,如果if语句不满足,原则上程序不应该往下执行,但是线程占着锁不放是不合理的,这时我们可以使用条件对象让这个线程进入阻塞状态并释放锁,让锁资源能及时的被其他线程使用,当if条件满足后,其他线程会使用条件对象通知这个阻塞的线程可以往下执行了,这时这个线程就能顺利的往下执行了。
条件对象从编程的角度上看就是通过调用Lock对象的newCondition()方法而创建的条件对象。
然后我们可以使用这个条件对象的await(),signal(),signalAll()方法来实现线程之间的通信,让线程获得锁的利用率最大化。
接口图如下:
可以看到这个接口主要有三个方法:await, signal, signaAll。
注意:使用锁的条件对象前必须要先获得条件对象的锁。
下面两个任务,一个是提款任务,一个是存款的。
提款任务的顺利执行的前提是余额足够,所以当余额不足时,它会等待存款任务执行直到符合条件。
程序逻辑图如下:
//完整代码链接在文章底部
public static void main(String[] args) {
ExecutorService executor= Executors.newFixedThreadPool(10);//创建一个并行线程池
executor.execute(new ProducerTask());
executor.execute(new ConsumerTask());
executor.execute(new ConsumerTask());
executor.execute(new ConsumerTask());
executor.execute(new ConsumerTask());
executor.execute(new ConsumerTask());
executor.shutdown();
}
运行结果:
阻塞队列这样类型的一种的队列: 当我们试图向一个满队列中添加元素或者从空队列中删除元素时,会导致线程阻塞的一种队列。
我们可以使用阻塞队列来简化生产者消费者模式的编程,因为阻塞队列本身就是线程安全的并支持阻塞。
BlockingQueue是一个接口,它继承了Queue接口,是Queue的一种增强。
它在Queue的基础上增加了线程安全的同步定义处理。
其接口类图如下:
可以看到,阻塞队列主要的是put,take方法。其中put方法是向队列中安全的添加元素,take从队列中安全的取出元素。
简单来说,阻塞队列就是内部有条件锁的队列。使用它可以简化我们的多线程代码,阻塞队列本身就是线程安全的,我们无需为了保证队列安全性再去进行编程做同步处理。
我们可以看一下BlockQueue接口的定义:
//向队列添加元素,如果队列满了则抛出异常
boolean add(E e);
//向队列添加元素,如果队列满了返回false
boolean offer(E e);
//向队列添加元素,如果队列满了,进入等待阻塞状态,
//直到队列有空余位置或者线程被中断
void put(E e) throws InterruptedException;
//向队列添加元素,如果队列满了,进入等待阻塞状态,
//直到队列有空余位置或者超时或者线程被中断
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//从队列中获取元素,如果队列为空,则进入等待阻塞状态
//直到队列有元素或线程被中断
E take() throws InterruptedException;
从队列中获取元素,如果队列为空,则进入等待阻塞状态
//直到队列有元素或线程被中断或超时
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//预估队列剩余容量
int remainingCapacity();
//从队列中移除指定元素
boolean remove(Object o);
//判断指定的元素是否存在于队列中
public boolean contains(Object o);
//清空队列并向队列添加c中包含的元素
int drainTo(Collection<? super E> c);
//清空队列并向队列添加c中包含的元素,但限制最大数量
int drainTo(Collection<? super E> c, int maxElements);
看图:
我们可以看到在Java中,阻塞队列的实现有三种:
ArrayBlockingQueue 数组阻塞队列
LinkedBlockingQueue 链表阻塞队列
PriorityBlockingQueue 优先阻塞队列
这个队列是基于数组实现的阻塞队列。
阻塞队列中,主要用到的存取方法有:
//存
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//取
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
通过查阅源码可以知道,其中线程安全是依赖于ReentrantLock以及条件对象实现的。
final ReentrantLock lock;
下面我们基于阻塞队列实现生产者消费者,有兴趣的同学可以对比一下上一节的基于锁的实现:
//完整代码链接在文章底部
public class BlockingQueueStudy {
//使用阻塞队列来实现线程安全
private static final ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(10);
public static void main(String[] args) {
//创建一个并行线程池
ExecutorService executor= Executors.newFixedThreadPool(10);
executor.execute(new ProducerTask());
executor.execute(new ConsumerTask());
executor.execute(new ConsumerTask());
executor.execute(new ConsumerTask());
executor.shutdown();
}
}
链表阻塞队列,链表结构实现,通过引用指向将链表的元素串起来。
//完整代码链接在文章底部
private static final BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>(10);
public static void main(String[] args) {
//创建一个并行线程池
ExecutorService executor= Executors.newFixedThreadPool(10);
executor.execute(new ProducerTask());
executor.execute(new ConsumerTask());
executor.execute(new ConsumerTask());
executor.execute(new ConsumerTask());
executor.shutdown();
}
优先阻塞队列,通过数组实现,存储原理和优先队列一样。
private static final BlockingQueue<Integer> buffer = new PriorityBlockingQueue<>(10);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new ProducerTask());
Thread.sleep(5000);
executorService.execute(new ConsumerTask());
executorService.shutdown();
}
Java基础学习/src/main/java/Progress/exa28_2 · 严家豆/Study - 码云 - 开源中国 (gitee.com)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。