当前位置:   article > 正文

多线程消费阻塞队列(生产者消费者模型)_中间件 消费者 生产者 线程 队列 阻塞 延时

中间件 消费者 生产者 线程 队列 阻塞 延时

一.几种主要的阻塞队列

ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。

PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

方法\处理方式抛出异常返回特殊值一直阻塞超时退出
插入方法add(e)offer(e)put(e)offer(e,time,unit)
移除方法remove()poll()take()poll(time,unit)
检查方法element()peek()不可用不可用
  • 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
  • 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。


1、消息生产者


  1. package com.es.queue;
  2. /**
  3. * Created by Administrator on 2018/7/1 0001.
  4. */
  5. import java.util.Random;
  6. import java.util.concurrent.BlockingQueue;
  7. import java.util.concurrent.TimeUnit;
  8. import java.util.concurrent.atomic.AtomicInteger;
  9. /**
  10. * 生产者线程
  11. *
  12. * @author jackyuj
  13. */
  14. public class Producer implements Runnable {
  15. private volatile boolean isRunning = true;//是否在运行标志
  16. private BlockingQueue queue;//阻塞队列
  17. private static AtomicInteger count = new AtomicInteger();//自动更新的值
  18. private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
  19. //构造函数
  20. public Producer(BlockingQueue queue) {
  21. this.queue = queue;
  22. }
  23. public void run() {
  24. String data = null;
  25. Random r = new Random();
  26. System.out.println("启动生产者线程!");
  27. try {
  28. while (isRunning) {
  29. System.out.println("正在生产数据...");
  30. Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一个随机数
  31. data = "data:" + count.incrementAndGet();//以原子方式将count当前值加1
  32. System.out.println("将数据:" + data + "放入队列...");
  33. if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//设定的等待时间为2s,如果超过2s还没加进去返回true
  34. System.out.println("放入数据失败:" + data);
  35. }
  36. }
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. Thread.currentThread().interrupt();
  40. } finally {
  41. System.out.println("退出生产者线程!");
  42. }
  43. }
  44. public void stop() {
  45. isRunning = false;
  46. }
  47. }



2、消息消费者

  1. package com.es.queue;
  2. /**
  3. * Created by Administrator on 2018/7/1 0001.
  4. */
  5. import java.util.Random;
  6. import java.util.concurrent.BlockingQueue;
  7. import java.util.concurrent.TimeUnit;
  8. /**
  9. * 消费者线程
  10. *
  11. * @author jackyuj
  12. */
  13. public class Consumer implements Runnable {
  14. private BlockingQueue<String> queue;
  15. private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
  16. //构造函数
  17. public Consumer(BlockingQueue<String> queue) {
  18. this.queue = queue;
  19. }
  20. public void run() {
  21. System.out.println("启动消费者线程!");
  22. Random r = new Random();
  23. boolean isRunning = true;
  24. try {
  25. while (isRunning) {
  26. System.out.println("正从队列获取数据...");
  27. String data = queue.poll(2, TimeUnit.SECONDS);//有数据时直接从队列的队首取走,无数据时阻塞,在2s内有数据,取走,超过2s还没数据,返回失败
  28. if (null != data) {
  29. System.out.println("拿到数据:" + data);
  30. System.out.println("正在消费数据:" + data);
  31. Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
  32. } else {
  33. // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
  34. isRunning = false;
  35. }
  36. }
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. Thread.currentThread().interrupt();
  40. } finally {
  41. System.out.println("退出消费者线程!");
  42. }
  43. }
  44. }



3、测试程序入口


  1. package com.es.queue;
  2. /**
  3. * Created by Administrator on 2018/7/1 0001.
  4. */
  5. import com.es.queue.Consumer;
  6. import com.es.queue.Producer;
  7. import java.util.concurrent.BlockingQueue;
  8. import java.util.concurrent.ExecutorService;
  9. import java.util.concurrent.Executors;
  10. import java.util.concurrent.LinkedBlockingQueue;
  11. public class BlockingQueueTest {
  12. public static void main(String[] args) throws InterruptedException {
  13. // 声明一个容量为10的缓存队列
  14. BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
  15. //new了三个生产者和一个消费者
  16. Producer producer1 = new Producer(queue);
  17. Producer producer2 = new Producer(queue);
  18. Producer producer3 = new Producer(queue);
  19. Consumer consumer = new Consumer(queue);
  20. // 借助Executors
  21. ExecutorService service = Executors.newCachedThreadPool();
  22. // 启动线程
  23. service.execute(producer1);
  24. service.execute(producer2);
  25. service.execute(producer3);
  26. service.execute(consumer);
  27. // 执行10s
  28. Thread.sleep(10 * 1000);
  29. producer1.stop();
  30. producer2.stop();
  31. producer3.stop();
  32. Thread.sleep(2000);
  33. // 退出Executor
  34. service.shutdown();
  35. }
  36. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/968500
推荐阅读
相关标签
  

闽ICP备14008679号