赞
踩
阻塞队列是在普通队列(先进先出的数据结构)的基础上增加了阻塞属性的特殊队列
1)当阻塞队列空的时候,如果继续出队元素会进入阻塞状态,直到其他线程入队元素。
2)当阻塞队列满的时候,如果继续入队元素也会进入阻塞状态,直到其他线程出元素。
阻塞队列队列的一个典型应用场景就是“生产者消费者模型”。
生产者消费者模型共有三大模块:
1、生产者
2、缓冲区(一般指的就是阻塞队列)
3、消费者
这里面生产者产生的数据需要传递给消费者去处理,但是生产者与消费者之间不是直接通信的而是借助中间模块“缓冲区”,也就是阻塞队列来完成。
生产者把产生的数据都抛给阻塞队列,消费者获取数据也全在阻塞队列里面找。
举一个包饺子的形象例子:
如果左孩子,擀饺子皮速度比右孩子包饺子速度快,那么盖帘(阻塞队列)很可能就满了,满了的话,左孩子就会停下来玩会儿手机(进入阻塞状态),等右孩子(消费者)从盖帘(阻塞队列)拿走一个饺子皮,然后左孩子在取擀饺子皮。
在上述包饺子的例子中,当生产者产生的产生的数据比较多的时候(擀面擀的包饺子的人快很多),如果没有阻塞队列,数据直接全部都给到消费者,那么消费者处理数据的压力就非常大,甚至是服务器崩溃。但是有了阻塞队列作为缓冲区,就可以避免这种事情的发生。
有没有这样的疑问,数据量的激增是从生产者开始,然后是阻塞队列,最后是消费者的,为什么生产者、阻塞的队列不会先挂?
这实际上是与生产者和消费者的处理数据的规模量有关。
生产者实际上就是一个请求接收和转移窗口,它对数据的处理其实非常少(阻塞队列同理),相当于就是负责传输数据的,但是消费者不仅要接收数据,还有对数据进行计算处理,处理每个数据的任务量远远大于生产者、阻塞队列。
通过阻塞队列,生产者与消费者之间的联系实际上就变少了,他们的代码这和缓冲区有关,当其中的一个模块挂了或者需要增加消费者\生产者,对其他模块的影响或者修改比较少,利于代码的维护。
缺点也很明显,多增加了一个缓冲区模块,那么程序响应速度必然会降低!
在BlockingQueue中,与普通队列一样带有poll和offer两个方法,但是一般都不用这两个方法,因为它们不带有阻塞效果!
与之对应的带来了两个新的方法take和put。这两个方法是带有阻塞效果的:
- public class Queue {
- public static void main(String[] args) throws InterruptedException {
-
- BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<Integer>(3);
- blockingQueue.put(1);
- blockingQueue.put(1);
- blockingQueue.put(1);
- blockingQueue.put(1);//在插入一个主线程就会WAITING
-
- }
- }
接下来用put和take模拟一个生产者
消费者模型:
- public class Queue {
- public static void main(String[] args) throws InterruptedException {
-
- BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<Integer>(10);
-
- Thread thread1=new Thread(()->{
- int i=0;
- while(true){
- i++;
- System.out.println("生产一个元素:"+i);
- try {
- blockingQueue.put(i);
- } catch (InterruptedException e) {//调用put或者take都需要处理中断异常
- e.printStackTrace();
- }
- }
- });
-
- Thread thread2=new Thread(()->{
- while(true){
- try {
- int i=blockingQueue.take();
- Thread.sleep(1000);//模拟消费者处理速度慢的情况
- System.out.println("消费一个元素:"+i);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- });
- thread1.start();
- thread2.start();
-
- }
- }
执行了大约1分钟后的结果,程序稳定执行:
虽然生产速度要快于消费速度,但是没有出现程序崩溃的情况,最终程序会稳定执行下去,虽然可能会慢一点,这就是阻塞队列的优点和缺点的体现。
这里就不用泛型了,直接用一个String类的队列来实现,简单了解它的原理。
底层用数组,使用循环队列(含useSize)创建一个普通队列:
- class MyBlockingQueue {
- //用数组实现
- String[] data = null;
-
- //分别表示头、尾、队列长度
- int head = 0, tail = 0, useSize = 0;
- /**指向有效元素的指针是左闭右开的,[head,tail)
- * tail端put进
- * head端take出*/
-
-
-
- //构造方法,设置最大容量
- public MyBlockingQueue(int capacity) {
- this.data = new String[capacity];
- }
-
- //插入方法
- public void put(String s){
- //如果队列满了,不做处理
- if(useSize==data.length)return;
-
- data[tail]=s;//直接赋值,因为tail区间是开的
- tail++;
- if(tail>=data.length)tail=0;
- useSize++;//记得长度加一
- }
-
- //取出方法
- public String take(){
-
- //没有就取不出来
- if(useSize==0)return null;
-
- String ret=data[head];
- head++;
- if(head>=data.length)head=0;
- useSize--;
- return ret;
- }
-
- }
在这个普通队列的基础上,我们要实现阻塞的效果,在线程中,所以还需要对put和take方法进行改造,对方法进行加锁,并使用wait和notify方法来相互之间通信:
- class MyBlockingQueue {
- //用数组实现
- private String[] data = null;
-
- //分别表示头、尾、队列长度
- private volatile int head = 0, tail = 0, useSize = 0;//加volatile没有坏处,虽然出现内存可加性优化概率很低(while循环执行速度并不快)
- /**指向有效元素的指针是左闭右开的,[head,tail)
- * tail端put进
- * head端take出*/
-
-
-
- //构造方法,设置最大容量
- public MyBlockingQueue(int capacity) {
- this.data = new String[capacity];
- }
-
- //插入方法
- public void put(String s) throws InterruptedException {
- // //如果队列满了,不做处理
- // if(useSize==data.length)return;
-
- synchronized(this){/**this指的是当前类,创建的一个实例,其他实例对象也可以,只要对象是匹配的*/
- if(useSize==data.length){
- this.wait();//让当前线程先睡眠,等其他线程take元素了,然后再唤醒它
- }
- data[tail]=s;//直接赋值,因为tail区间是开的
- tail++;
- if(tail>=data.length)tail=0;
- useSize++;//记得长度加一
- this.notify();//put完了,反过来通知take
- }
- }
-
- //取出方法
- public String take() throws InterruptedException {
-
- // //没有就取不出来
- // if(useSize==0)return null;
-
-
- synchronized(this){
- if(useSize==0){
- this.wait();
- }
- String ret=data[head];
- head++;
- if(head>=data.length)head=0;
- useSize--;
- this.notify();//take完了,通知一下put
- return ret;
- }
- }
- }
这个代码其实还有一个小瑕疵,不知道大家有没有发现。
wait方法除了notify方法可以把他唤醒,之前学过的interrupt方法实际上也会把调用了wait方法的线程唤醒然后终止该线程!
倘若一个不小心在put或者take的时候,调用了interrupt方法,那么即使if条件不成立,程序还会继续往下执行(因为Java中断线程是柔和的方式,没有用try catch具体处理中断后的情况,那么程序就会继续往下执行),这就会造成不可预期的后果:
解决这个问题也很简单,直接把if换成while循环即可,即使中断了循环在进入睡眠即可:
- class MyBlockingQueue {
- //用数组实现
- private String[] data = null;
-
- //分别表示头、尾、队列长度
- private volatile int head = 0, tail = 0, useSize = 0;
- /**指向有效元素的指针是左闭右开的,[head,tail)
- * tail端put进
- * head端take出*/
-
-
-
- //构造方法,设置最大容量
- public MyBlockingQueue(int capacity) {
- this.data = new String[capacity];
- }
-
- //插入方法
- public void put(String s) throws InterruptedException {
- // //如果队列满了,不做处理
- // if(useSize==data.length)return;
-
- synchronized(this){/**this指的是当前类,创建的一个实例,其他实例对象也可以,只要对象是匹配的*/
- while(useSize==data.length){
- this.wait();//让当前线程先睡眠,等其他线程take元素了,然后再唤醒它
- }
- data[tail]=s;//直接赋值,因为tail区间是开的
- tail++;
- if(tail>=data.length)tail=0;
- useSize++;//记得长度加一
- this.notify();//put完了,反过来通知take
- }
- }
-
- //取出方法
- public String take() throws InterruptedException {
-
- // //没有就取不出来
- // if(useSize==0)return null;
-
-
- synchronized(this){
- while(useSize==0){
- this.wait();
- }
-
- String ret=data[head];
- head++;
- if(head>=data.length)head=0;
- useSize--;
- this.notify();//take完了,通知一下put
- return ret;
- }
- }
- }
注意:
以上代码成立的条件是put和take不可能同时调用进入WAITING状态,因为useSize不可能同时满足useSize==0&&useSize==data.length。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。