当前位置:   article > 正文

JavaEE 第9节 阻塞队列详解

JavaEE 第9节 阻塞队列详解

一、概念

阻塞队列是在普通队列(先进先出的数据结构)的基础上增加了阻塞属性的特殊队列

       1)当阻塞队列空的时候,如果继续出队元素会进入阻塞状态,直到其他线程入队元素。

       2)当阻塞队列满的时候,如果继续入队元素也会进入阻塞状态,直到其他线程出元素。

阻塞队列队列的一个典型应用场景就是“生产者消费者模型”。


二、生产者消费者模型

生产者消费者模型共有三大模块:

1、生产者 

2、缓冲区(一般指的就是阻塞队列)

3、消费者

这里面生产者产生的数据需要传递给消费者去处理,但是生产者与消费者之间不是直接通信的而是借助中间模块“缓冲区”,也就是阻塞队列来完成

生产者把产生的数据都抛给阻塞队列,消费者获取数据也全在阻塞队列里面找。

举一个包饺子的形象例子:

如果左孩子,擀饺子皮速度比右孩子包饺子速度快,那么盖帘(阻塞队列)很可能就满了,满了的话,左孩子就会停下来玩会儿手机(进入阻塞状态),等右孩子(消费者)从盖帘(阻塞队列)拿走一个饺子皮,然后左孩子在取擀饺子皮。

阻塞队列优点

1、平衡生产者和消费者的处理能力,保护下游服务器

在上述包饺子的例子中,当生产者产生的产生的数据比较多的时候(擀面擀的包饺子的人快很多),如果没有阻塞队列,数据直接全部都给到消费者那么消费者处理数据的压力就非常大,甚至是服务器崩溃。但是有了阻塞队列作为缓冲区,就可以避免这种事情的发生。

有没有这样的疑问,数据量的激增是从生产者开始,然后是阻塞队列,最后是消费者的,为什么生产者、阻塞的队列不会先挂?


这实际上是与生产者和消费者的处理数据的规模量有关。

生产者实际上就是一个请求接收和转移窗口,它对数据的处理其实非常少(阻塞队列同理),相当于就是负责传输数据的,但是消费者不仅要接收数据,还有对数据进行计算处理,处理每个数据的任务量远远大于生产者、阻塞队列。

2、生产者和消费者之间解耦合

通过阻塞队列,生产者与消费者之间的联系实际上就变少了,他们的代码这和缓冲区有关,当其中的一个模块挂了或者需要增加消费者\生产者,对其他模块的影响或者修改比较少,利于代码的维护。

缺点

缺点也很明显,多增加了一个缓冲区模块,那么程序响应速度必然会降低!


三、JAVA标准库中阻塞队列的使用方式

常用接口认识

调用方法的选择

在BlockingQueue中,与普通队列一样带有poll和offer两个方法,但是一般都不用这两个方法,因为它们不带有阻塞效果

与之对应的带来了两个新的方法takeput。这两个方法是带有阻塞效果的:

  1. public class Queue {
  2. public static void main(String[] args) throws InterruptedException {
  3. BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<Integer>(3);
  4. blockingQueue.put(1);
  5. blockingQueue.put(1);
  6. blockingQueue.put(1);
  7. blockingQueue.put(1);//在插入一个主线程就会WAITING
  8. }
  9. }

接下来用put和take模拟一个生产者


消费者模型:

  1. public class Queue {
  2. public static void main(String[] args) throws InterruptedException {
  3. BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<Integer>(10);
  4. Thread thread1=new Thread(()->{
  5. int i=0;
  6. while(true){
  7. i++;
  8. System.out.println("生产一个元素:"+i);
  9. try {
  10. blockingQueue.put(i);
  11. } catch (InterruptedException e) {//调用put或者take都需要处理中断异常
  12. e.printStackTrace();
  13. }
  14. }
  15. });
  16. Thread thread2=new Thread(()->{
  17. while(true){
  18. try {
  19. int i=blockingQueue.take();
  20. Thread.sleep(1000);//模拟消费者处理速度慢的情况
  21. System.out.println("消费一个元素:"+i);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. });
  27. thread1.start();
  28. thread2.start();
  29. }
  30. }

执行了大约1分钟后的结果,程序稳定执行:

虽然生产速度要快于消费速度,但是没有出现程序崩溃的情况,最终程序会稳定执行下去,虽然可能会慢一点,这就是阻塞队列的优点和缺点的体现。


四、简单的阻塞队列实现

这里就不用泛型了,直接用一个String类的队列来实现,简单了解它的原理。

底层用数组,使用循环队列(含useSize)创建一个普通队列:

  1. class MyBlockingQueue {
  2. //用数组实现
  3. String[] data = null;
  4. //分别表示头、尾、队列长度
  5. int head = 0, tail = 0, useSize = 0;
  6. /**指向有效元素的指针是左闭右开的,[head,tail)
  7. * tail端put进
  8. * head端take出*/
  9. //构造方法,设置最大容量
  10. public MyBlockingQueue(int capacity) {
  11. this.data = new String[capacity];
  12. }
  13. //插入方法
  14. public void put(String s){
  15. //如果队列满了,不做处理
  16. if(useSize==data.length)return;
  17. data[tail]=s;//直接赋值,因为tail区间是开的
  18. tail++;
  19. if(tail>=data.length)tail=0;
  20. useSize++;//记得长度加一
  21. }
  22. //取出方法
  23. public String take(){
  24. //没有就取不出来
  25. if(useSize==0)return null;
  26. String ret=data[head];
  27. head++;
  28. if(head>=data.length)head=0;
  29. useSize--;
  30. return ret;
  31. }
  32. }

在这个普通队列的基础上,我们要实现阻塞的效果,在线程中,所以还需要对put和take方法进行改造,对方法进行加锁,并使用wait和notify方法来相互之间通信:

  1. class MyBlockingQueue {
  2. //用数组实现
  3. private String[] data = null;
  4. //分别表示头、尾、队列长度
  5. private volatile int head = 0, tail = 0, useSize = 0;//加volatile没有坏处,虽然出现内存可加性优化概率很低(while循环执行速度并不快)
  6. /**指向有效元素的指针是左闭右开的,[head,tail)
  7. * tail端put进
  8. * head端take出*/
  9. //构造方法,设置最大容量
  10. public MyBlockingQueue(int capacity) {
  11. this.data = new String[capacity];
  12. }
  13. //插入方法
  14. public void put(String s) throws InterruptedException {
  15. // //如果队列满了,不做处理
  16. // if(useSize==data.length)return;
  17. synchronized(this){/**this指的是当前类,创建的一个实例,其他实例对象也可以,只要对象是匹配的*/
  18. if(useSize==data.length){
  19. this.wait();//让当前线程先睡眠,等其他线程take元素了,然后再唤醒它
  20. }
  21. data[tail]=s;//直接赋值,因为tail区间是开的
  22. tail++;
  23. if(tail>=data.length)tail=0;
  24. useSize++;//记得长度加一
  25. this.notify();//put完了,反过来通知take
  26. }
  27. }
  28. //取出方法
  29. public String take() throws InterruptedException {
  30. // //没有就取不出来
  31. // if(useSize==0)return null;
  32. synchronized(this){
  33. if(useSize==0){
  34. this.wait();
  35. }
  36. String ret=data[head];
  37. head++;
  38. if(head>=data.length)head=0;
  39. useSize--;
  40. this.notify();//take完了,通知一下put
  41. return ret;
  42. }
  43. }
  44. }

这个代码其实还有一个小瑕疵,不知道大家有没有发现。

wait方法除了notify方法可以把他唤醒,之前学过的interrupt方法实际上也会把调用了wait方法的线程唤醒然后终止该线程!

倘若一个不小心在put或者take的时候,调用了interrupt方法,那么即使if条件不成立,程序还会继续往下执行(因为Java中断线程是柔和的方式,没有用try catch具体处理中断后的情况,那么程序就会继续往下执行),这就会造成不可预期的后果:

解决这个问题也很简单,直接把if换成while循环即可,即使中断了循环在进入睡眠即可:

  1. class MyBlockingQueue {
  2. //用数组实现
  3. private String[] data = null;
  4. //分别表示头、尾、队列长度
  5. private volatile int head = 0, tail = 0, useSize = 0;
  6. /**指向有效元素的指针是左闭右开的,[head,tail)
  7. * tail端put进
  8. * head端take出*/
  9. //构造方法,设置最大容量
  10. public MyBlockingQueue(int capacity) {
  11. this.data = new String[capacity];
  12. }
  13. //插入方法
  14. public void put(String s) throws InterruptedException {
  15. // //如果队列满了,不做处理
  16. // if(useSize==data.length)return;
  17. synchronized(this){/**this指的是当前类,创建的一个实例,其他实例对象也可以,只要对象是匹配的*/
  18. while(useSize==data.length){
  19. this.wait();//让当前线程先睡眠,等其他线程take元素了,然后再唤醒它
  20. }
  21. data[tail]=s;//直接赋值,因为tail区间是开的
  22. tail++;
  23. if(tail>=data.length)tail=0;
  24. useSize++;//记得长度加一
  25. this.notify();//put完了,反过来通知take
  26. }
  27. }
  28. //取出方法
  29. public String take() throws InterruptedException {
  30. // //没有就取不出来
  31. // if(useSize==0)return null;
  32. synchronized(this){
  33. while(useSize==0){
  34. this.wait();
  35. }
  36. String ret=data[head];
  37. head++;
  38. if(head>=data.length)head=0;
  39. useSize--;
  40. this.notify();//take完了,通知一下put
  41. return ret;
  42. }
  43. }
  44. }

注意:

以上代码成立的条件是put和take不可能同时调用进入WAITING状态,因为useSize不可能同时满足useSize==0&&useSize==data.length。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/984491
推荐阅读
相关标签
  

闽ICP备14008679号