当前位置:   article > 正文

28.2 Java进阶之线程间协作,生产者消费者Demo,阻塞队列_java队列 drainto

java队列 drainto

1.线程间协作

上一篇文章中说到可以让多个线程在临界区上的相互排斥来让线程间同步,从而让多线程程序能安全的运行。线程同步可以避免竞争条件的发生,但有时候,我们也需要线程之间的相互协作。
如果我们需要让不同的线程也能进行通信,我们可以使用条件对象来实现线程间的通信。
我们可以用条件对象让线程在执行到一半时进入等待阻塞状态,其他线程可以通过条件对象来等待阻塞的线程。

1.1 条件对象是什么?

当线程进入临界区后却发现只有满足某个条件之后才能往下执行,这时我们可以使用一个条件对象来管理那些已经获得了一个锁却不能做有用的工作的线程。

简单来说,当一个线程获得了锁准备运行程序的时候,但是代码中有一个if语句,如果if语句不满足,原则上程序不应该往下执行,但是线程占着锁不放是不合理的,这时我们可以使用条件对象让这个线程进入阻塞状态并释放锁,让锁资源能及时的被其他线程使用,当if条件满足后,其他线程会使用条件对象通知这个阻塞的线程可以往下执行了,这时这个线程就能顺利的往下执行了。

条件对象从编程的角度上看就是通过调用Lock对象的newCondition()方法而创建的条件对象。
在这里插入图片描述

然后我们可以使用这个条件对象的await(),signal(),signalAll()方法来实现线程之间的通信,让线程获得锁的利用率最大化。

1.1.1 Condition是一个接口

接口图如下:
在这里插入图片描述
可以看到这个接口主要有三个方法:await, signal, signaAll。
注意:使用锁的条件对象前必须要先获得条件对象的锁。

1.1.2 用例子来理解Condition

下面两个任务,一个是提款任务,一个是存款的。
提款任务的顺利执行的前提是余额足够,所以当余额不足时,它会等待存款任务执行直到符合条件。
在这里插入图片描述

1.2.使用条件对象实现生产者消费者多线程程序

程序逻辑图如下:
在这里插入图片描述

1.2.1 示例代码

//完整代码链接在文章底部
public static void main(String[] args) {
    ExecutorService executor= Executors.newFixedThreadPool(10);//创建一个并行线程池
    executor.execute(new ProducerTask());
    executor.execute(new ConsumerTask());
    executor.execute(new ConsumerTask());
    executor.execute(new ConsumerTask());
    executor.execute(new ConsumerTask());
    executor.execute(new ConsumerTask());
    executor.shutdown();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

运行结果:
在这里插入图片描述

2.阻塞队列

阻塞队列这样类型的一种的队列: 当我们试图向一个满队列中添加元素或者从空队列中删除元素时,会导致线程阻塞的一种队列。
我们可以使用阻塞队列来简化生产者消费者模式的编程,因为阻塞队列本身就是线程安全的并支持阻塞。

2.1.BlockingQueue

BlockingQueue是一个接口,它继承了Queue接口,是Queue的一种增强。
它在Queue的基础上增加了线程安全的同步定义处理。
其接口类图如下:
在这里插入图片描述

可以看到,阻塞队列主要的是put,take方法。其中put方法是向队列中安全的添加元素,take从队列中安全的取出元素。

简单来说,阻塞队列就是内部有条件锁的队列。使用它可以简化我们的多线程代码,阻塞队列本身就是线程安全的,我们无需为了保证队列安全性再去进行编程做同步处理。

我们可以看一下BlockQueue接口的定义:

//向队列添加元素,如果队列满了则抛出异常
boolean add(E e);
//向队列添加元素,如果队列满了返回false
boolean offer(E e);

//向队列添加元素,如果队列满了,进入等待阻塞状态,
//直到队列有空余位置或者线程被中断
void put(E e) throws InterruptedException;

//向队列添加元素,如果队列满了,进入等待阻塞状态,
//直到队列有空余位置或者超时或者线程被中断
boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException;
    
//从队列中获取元素,如果队列为空,则进入等待阻塞状态
//直到队列有元素或线程被中断
E take() throws InterruptedException;

从队列中获取元素,如果队列为空,则进入等待阻塞状态
//直到队列有元素或线程被中断或超时
E poll(long timeout, TimeUnit unit)
    throws InterruptedException;
    
//预估队列剩余容量
int remainingCapacity();

//从队列中移除指定元素
boolean remove(Object o);

//判断指定的元素是否存在于队列中
public boolean contains(Object o);

//清空队列并向队列添加c中包含的元素
int drainTo(Collection<? super E> c);

//清空队列并向队列添加c中包含的元素,但限制最大数量
int drainTo(Collection<? super E> c, int maxElements);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

2.2 .阻塞队列的具体实现

看图:
在这里插入图片描述

我们可以看到在Java中,阻塞队列的实现有三种:
ArrayBlockingQueue 数组阻塞队列
LinkedBlockingQueue 链表阻塞队列
PriorityBlockingQueue 优先阻塞队列

2.2.1 ArrayBlockingQueue 数组阻塞队列

这个队列是基于数组实现的阻塞队列。
阻塞队列中,主要用到的存取方法有:

//存
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException;

//取
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
    throws InterruptedException;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

通过查阅源码可以知道,其中线程安全是依赖于ReentrantLock以及条件对象实现的。

final ReentrantLock lock;
  • 1

下面我们基于阻塞队列实现生产者消费者,有兴趣的同学可以对比一下上一节的基于锁的实现:

//完整代码链接在文章底部
public class BlockingQueueStudy {
   //使用阻塞队列来实现线程安全
private static final ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(10);

public static void main(String[] args) {
    //创建一个并行线程池
    ExecutorService executor= Executors.newFixedThreadPool(10);
    executor.execute(new ProducerTask());
    executor.execute(new ConsumerTask());
    executor.execute(new ConsumerTask());
    executor.execute(new ConsumerTask());
    executor.shutdown();
}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

在这里插入图片描述

2.2.2 LinkedBlockingQueue

链表阻塞队列,链表结构实现,通过引用指向将链表的元素串起来。

//完整代码链接在文章底部
private static final BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>(10);

public static void main(String[] args) {
    //创建一个并行线程池
    ExecutorService executor= Executors.newFixedThreadPool(10);
    executor.execute(new ProducerTask());
    executor.execute(new ConsumerTask());
    executor.execute(new ConsumerTask());
    executor.execute(new ConsumerTask());
    executor.shutdown();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在这里插入图片描述

2.2.3 PriorityBlockingQueue

优先阻塞队列,通过数组实现,存储原理和优先队列一样。

private static final BlockingQueue<Integer> buffer = new PriorityBlockingQueue<>(10);

public static void main(String[] args) throws InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    executorService.execute(new ProducerTask());
    Thread.sleep(5000);
    executorService.execute(new ConsumerTask());
    executorService.shutdown();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

在这里插入图片描述

3 .代码仓库地址

Java基础学习/src/main/java/Progress/exa28_2 · 严家豆/Study - 码云 - 开源中国 (gitee.com)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/968503?site
推荐阅读
相关标签
  

闽ICP备14008679号