赞
踩
ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。
DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列提供了四种处理方法:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
1、消息生产者
- package com.es.queue;
-
- /**
- * Created by Administrator on 2018/7/1 0001.
- */
- import java.util.Random;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicInteger;
-
- /**
- * 生产者线程
- *
- * @author jackyuj
- */
- public class Producer implements Runnable {
-
- private volatile boolean isRunning = true;//是否在运行标志
- private BlockingQueue queue;//阻塞队列
- private static AtomicInteger count = new AtomicInteger();//自动更新的值
- private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
-
- //构造函数
- public Producer(BlockingQueue queue) {
- this.queue = queue;
- }
-
- public void run() {
- String data = null;
- Random r = new Random();
-
- System.out.println("启动生产者线程!");
- try {
- while (isRunning) {
- System.out.println("正在生产数据...");
- Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一个随机数
-
- data = "data:" + count.incrementAndGet();//以原子方式将count当前值加1
- System.out.println("将数据:" + data + "放入队列...");
- if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//设定的等待时间为2s,如果超过2s还没加进去返回true
- System.out.println("放入数据失败:" + data);
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- Thread.currentThread().interrupt();
- } finally {
- System.out.println("退出生产者线程!");
- }
- }
-
- public void stop() {
- isRunning = false;
- }
- }
2、消息消费者
- package com.es.queue;
-
- /**
- * Created by Administrator on 2018/7/1 0001.
- */
- import java.util.Random;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.TimeUnit;
-
- /**
- * 消费者线程
- *
- * @author jackyuj
- */
- public class Consumer implements Runnable {
-
- private BlockingQueue<String> queue;
- private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
-
- //构造函数
- public Consumer(BlockingQueue<String> queue) {
- this.queue = queue;
- }
-
- public void run() {
- System.out.println("启动消费者线程!");
- Random r = new Random();
- boolean isRunning = true;
- try {
- while (isRunning) {
- System.out.println("正从队列获取数据...");
- String data = queue.poll(2, TimeUnit.SECONDS);//有数据时直接从队列的队首取走,无数据时阻塞,在2s内有数据,取走,超过2s还没数据,返回失败
- if (null != data) {
- System.out.println("拿到数据:" + data);
- System.out.println("正在消费数据:" + data);
- Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
- } else {
- // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
- isRunning = false;
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- Thread.currentThread().interrupt();
- } finally {
- System.out.println("退出消费者线程!");
- }
- }
-
-
- }
3、测试程序入口
- package com.es.queue;
-
- /**
- * Created by Administrator on 2018/7/1 0001.
- */
- import com.es.queue.Consumer;
- import com.es.queue.Producer;
-
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.LinkedBlockingQueue;
-
- public class BlockingQueueTest {
-
- public static void main(String[] args) throws InterruptedException {
- // 声明一个容量为10的缓存队列
- BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
-
- //new了三个生产者和一个消费者
- Producer producer1 = new Producer(queue);
- Producer producer2 = new Producer(queue);
- Producer producer3 = new Producer(queue);
- Consumer consumer = new Consumer(queue);
-
- // 借助Executors
- ExecutorService service = Executors.newCachedThreadPool();
- // 启动线程
- service.execute(producer1);
- service.execute(producer2);
- service.execute(producer3);
- service.execute(consumer);
-
- // 执行10s
- Thread.sleep(10 * 1000);
- producer1.stop();
- producer2.stop();
- producer3.stop();
-
- Thread.sleep(2000);
- // 退出Executor
- service.shutdown();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。