当前位置:   article > 正文

Java中的阻塞队列使用以及详解_java中阻塞队列的使用

java中阻塞队列的使用

文章目录

一、Queue接口

queue

1. 常见方法以及功能(不具有阻塞队列特性)

1.1 add(E e)

1、add(): add()方法在向队列添加元素时,如果队列已满,会抛出IllegalStateException异常。
offer(E e)

1.2. offer():

offer()方法在向队列添加元素时,如果队列已满,会返回false,并且不会抛出异常。

1.3. remove()

remove()方法在从队列头部移除元素时,如果队列为空,会抛出NoSuchElementException异常。

1.4. poll()

poll(): poll()方法在从队列头部移除元素时,如果队列为空,会返回null,并且不会抛出异常。

1.5. element()

element()方法在查看队列头部元素时,如果队列为空,会抛出NoSuchElementException异常。

1.6. peek()

peek()方法在查看队列头部元素时,如果队列为空,会返回null,并且不会抛出异常。

2. add和offer对比:

  1. 两者区别主要在于处理满队列时的行为不同。如果在队列已满时,你不希望抛出异常,而是希望通过返回值来判断添加是否成功,则可以使用offer()方法。如果你希望在队列已满时抛出异常,则可以使用add()方法。

  2. 验证:AbstractQueue#add

public boolean add(E e) {
	//调用add方法,如果添加成功,返回true,失败则抛出IllegalStateException
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3. remove和poll对比:

  1. 区别主要在于处理空队列时的行为不同。如果你在移除元素时希望在队列为空时抛出异常,则可以使用remove()方法。如果你希望在队列为空时返回null,则可以使用poll()方法。

  2. 验证:AbstractQueue#remove

public E remove() {
   E x = poll();
   //不为空时直接返回,为空时抛出异常NoSuchElementException
   if (x != null)
       return x;
   else
       throw new NoSuchElementException();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. element#peek方法对比:

区别主要在于处理空队列时的行为不同。如果你在查看队列头部元素时希望在队列为空时抛出异常,则可以使用element()方法。如果你希望在队列为空时返回null,则可以使用peek()方法。
验证:AbstractQueue#element

public E element() {
    E x = peek();
    //不为空直接返回,为空抛出NoSuchElementException
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

二、BlockingQueue阻塞队列

1. BlockingQueue的定义

blockingqueue

  • BlockingQueue是Java中的接口(jdk1.5),它是一个支持阻塞操作的队列。阻塞操作意味着当队列为空时,从队列中获取元素的操作会阻塞,直到队列中有可用元素为止;当队列已满时,向队列中添加元素的操作会阻塞,直到队列有空闲位置为止。
  • BlockingQueue接口继承自java.util.Queue接口,并添加了一些额外的方法,用于支持阻塞操作。
  • 通过使用BlockingQueue,可以实现线程之间的安全通信和协调。例如,一个线程可以将数据放入BlockingQueue中,而另一个线程可以从队列中获取数据并进行处理。BlockingQueue会自动处理线程间的阻塞和唤醒操作,使得线程间的通信更加简单和可靠。

2. 核心方法(具有阻塞队列特性)

2.1 put方法

put方法用于将元素放入队列中。如果队列已满,put()方法会阻塞当前线程,直到队列有空闲位置可放入元素。当队列有空闲位置时,put()方法会将元素放入队列,并立即返回。

2.2 take方法

take方法用于从队列中取出元素。如果队列为空,take()方法会阻塞当前线程,直到队列中有元素可供取出。当队列中有元素可取出时,take()方法会将队列头部的元素取出并返回。

3. put#take对比

  1. 这两个方法都是阻塞方法,可以确保在队列满或空的情况下线程能够正确等待和唤醒。它们适用于生产者-消费者模式,其中生产者通过put()方法将元素放入队列,而消费者通过take()方法从队列中取出元素。
  2. 需要注意的是,put()方法和take()方法都会抛出InterruptedException异常,表示在等待过程中被中断。当线程在等待放入或取出元素时,如果线程被中断,它们会抛出InterruptedException异常,以便进行相应的处理。

4. 常见API以及处理方式

方法\处理方式抛出异常返回特殊值一直阻塞超时退出
插入方法add(e)offer(e)put(e)offer(e,time,unit)
移除方法remove()poll()take()poll(time,unit)
检查方法element()peek()不可用不可用
  • 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。

  • 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null

  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。

  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

5. 阻塞队列的使用场景:生产者和消费者模型

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerExample {
    private static final int BUFFER_SIZE = 5;
    private static BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);

    public static void main(String[] args) {
        Thread producerThread = new Thread(new Producer());
        Thread consumerThread = new Thread(new Consumer());

        producerThread.start();
        consumerThread.start();
    }

    static class Producer implements Runnable {
        @Override
        public void run() {
            try {
                int value = 0;
                while (true) {
                    buffer.put(value);
                    System.out.println("生产者生产了:" + value);
                    value++;
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    int value = buffer.take();
                    System.out.println("消费者消费了:" + value);
                    Thread.sleep(2000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

案例分析:

  • 在这个示例中,我们创建了一个大小为5的阻塞队列作为缓冲区。生产者线程通过不断向队列中放入数据来模拟生产过程,消费者线程通过不断从队列中取出数据来模拟消费过程。当缓冲区已满时,生产者线程将被阻塞,直到有空间可用。当缓冲区为空时,消费者线程将被阻塞,直到有数据可用。

  • 在生产者线程中,我们使用put方法将数据放入队列中,并打印相应的信息。在消费者线程中,我们使用take方法从队列中取出数据,并打印相应的信息。在生产者和消费者的run方法中,我们通过捕获InterruptedException来处理阻塞过程中可能出现的异常。

三、Java里的阻塞队列

seven-queue

队列描述
ArrayBlockingQueue基于数组结构实现的一个有界阻塞队列
LinkedBlockingQueue基于链表结构实现的一个有界阻塞队列
PriorityBlockingQueue支持按优先级排序的无界阻塞队列
DelayQueue基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列
SynchronousQueue不存储元素的阻塞队列
LinkedTransferQueue基于链表结构实现的一个无界阻塞队列
LinkedBlockingDeque基于链表结构实现的一个双端阻塞队列

1. ArrayBlockingQueue

1.1 ArrayBlockingQueue的定义

ArrayBlockingQueue是Java中的一个阻塞队列实现,它在内部使用一个固定大小的数组来存储元素。它的特点是先进先出(FIFO)的顺序,即最先插入的元素会最先被取出。

1.2. ArrayBlockingQueue的一些重要特点:

1.2.1. 有界性:

ArrayBlockingQueue是一个有界队列,即在创建时需要指定队列的容量大小,一旦达到容量上限,后续的插入操作将被阻塞,直到队列中的元素被消费或者被移除。

1.2.2. 公平性:

ArrayBlockingQueue支持公平性策略,即按照线程的到达顺序来处理元素。当公平性设置为true时,线程会按照插入的顺序进行获取元素;当公平性设置为false时,线程获取元素的顺序是不确定的。

1.2.3. 阻塞操作:

ArrayBlockingQueue提供了阻塞的插入和获取操作。当队列已满时,插入操作会被阻塞,直到有空间可用。当队列为空时,获取操作会被阻塞,直到队列中有元素可用。

1.2.4. 线程安全:

ArrayBlockingQueue是线程安全的,多个线程可以同时对队列进行操作。它内部使用了锁和条件变量来保证线程安全性。

1.2.5. 支持可选的超时操作:

ArrayBlockingQueue的插入和获取操作可以设置一个超时时间,如果在指定的时间内仍然无法完成操作,那么操作将返回特定的结果。

1.2.6. 小结

总的来说,ArrayBlockingQueue是一个功能强大且线程安全的阻塞队列实现,适用于多线程环境下的生产者-消费者模式等场景,可以帮助处理并发操作的同步问题。

1.3. ArrayBlockingQueue的基本使用

1.3.1 代码如下:

下面是一个简单的示例代码,展示如何使用ArrayBlockingQueue:

import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueueDemo {
    public static void main(String[] args) {
        // 创建一个容量为3的ArrayBlockingQueue
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);

        // 生产者线程
        Thread producerThread = new Thread(() -> {
            try {
                // 向队列中插入元素
                queue.put(1);
                System.out.println("Producer: 1 added to the queue");
                Thread.sleep(1000);

                queue.put(2);
                System.out.println("Producer: 2 added to the queue");
                Thread.sleep(1000);

                queue.put(3);
                System.out.println("Producer: 3 added to the queue");
                Thread.sleep(1000);

                // 尝试插入第四个元素,会被阻塞
                queue.put(4);
                System.out.println("Producer: 4 added to the queue");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 消费者线程
        Thread consumerThread = new Thread(() -> {
            try {
                Thread.sleep(3000);

                // 从队列中获取元素
                int element1 = queue.take();
                System.out.println("Consumer: " + element1 + " removed from the queue");

                int element2 = queue.take();
                System.out.println("Consumer: " + element2 + " removed from the queue");

                int element3 = queue.take();
                System.out.println("Consumer: " + element3 + " removed from the queue");

                // 尝试获取第四个元素,会被阻塞
                int element4 = queue.take();
                System.out.println("Consumer: " + element4 + " removed from the queue");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 启动生产者和消费者线程
        producerThread.start();
        consumerThread.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
1.3.2 案例分析:

在上面的示例中,我们创建了一个容量为3的ArrayBlockingQueue。生产者线程向队列中不断插入元素,并在插入后等待1秒钟。消费者线程在启动后等待3秒钟,然后从队列中不断获取元素。由于队列的容量是有限的,当队列满时,生产者线程会被阻塞,直到队列中有空间可用。同样地,当队列为空时,消费者线程也会被阻塞,直到队列中有元素可用。

运行上述代码,你将看到类似以下的输出:

Producer: 1 added to the queue
Producer: 2 added to the queue
Producer: 3 added to the queue
Consumer: 1 removed from the queue
Consumer: 2 removed from the queue
Consumer: 3 removed from the queue
Producer: 4 added to the queue
Consumer: 4 removed from the queue
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

1.4. 源码分析

1.4.1 ArrayBlockingQueue类的属性

array

1.4.1.1 items:这是一个数组,用于存储队列中的元素。
1.4.1.2 takeIndex:这是一个指示下一个要取出元素的索引。
1.4.1.3 putIndex:这是一个指示下一个要放入元素的索引。
1.4.1.4 count:这是当前队列中的元素数量。
1.4.1.5 lock:这是一个可重入锁,用于对队列的操作进行同步。
1.4.1.6 notEmpty:这是一个条件变量,用于表示队列非空。
1.4.1.7 notFull:这是一个条件变量,用于表示队列未满。
1.4.1.8 itrs:迭代器的快速失败机制。

通过使用itrs属性,ArrayBlockingQueue可以实现迭代器的快速失败机制,即在迭代器创建后,如果在迭代过程中队列发生结构性修改(例如添加或删除元素),迭代器会立即抛出ConcurrentModificationException异常,以保证迭代器的操作是安全和一致的。

1.4.2 ArrayBlockingQueue类的构造方法:
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
	if (capacity <= 0)
	   throw new IllegalArgumentException();
	this.items = new Object[capacity];
	lock = new ReentrantLock(fair);
	notEmpty = lock.newCondition();
	notFull =  lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
                         Collection<? extends E> c) {
   this(capacity, fair);

   final ReentrantLock lock = this.lock;
   lock.lock(); // Lock only for visibility, not mutual exclusion
   //通过在构造方法中使用锁,可以确保在构造过程中对共享变量(items)的修改对其他线程可见。这是因为锁的获取和释放都会导致主内存和工作内存之间的数据同步,保证了线程之间的可见性。
   try {
       int i = 0;
       try {
           for (E e : c) {
               checkNotNull(e);
               items[i++] = e;
           }
       } catch (ArrayIndexOutOfBoundsException ex) {
           throw new IllegalArgumentException();
       }
       count = i;
       putIndex = (i == capacity) ? 0 : i;
   } finally {
       lock.unlock();
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
1.4.2.1 ArrayBlockingQueue(int capacity)
  1. 作用:创建一个具有给定容量的ArrayBlockingQueue实例。
  2. 参数:capacity:队列的容量,即队列可以存储的最大元素数量。
  3. 描述:这个构造方法创建一个有界队列,其容量由capacity参数指定。队列中的元素按照FIFO(先进先出)的顺序进行存储和访问。
1.4.2.2 ArrayBlockingQueue(int capacity, boolean fair)
  1. 作用:创建一个具有给定容量和公平性设置的ArrayBlockingQueue实例。
  2. 参数:capacity:队列的容量,即队列可以存储的最大元素数量。
    2.1 fair:公平性设置,如果为true,则等待时间最长的线程将优先获得访问队列的权限;
    2.2 如果为false,则不保证公平性。
  3. 描述:这个构造方法与上一个方法类似,但多了一个fair参数,用于设置队列是否具有公平性。如果设置为true,队列将按照FIFO的顺序授予等待时间最长的线程访问权限,即先等待的线程先获得访问权限。如果设置为false,不保证公平性,线程的访问顺序可能是不确定的。
1.4.2.3 ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
  1. 作用:创建一个具有给定容量、公平性设置和初始元素的ArrayBlockingQueue实例。
  2. 参数:
    2.1 capacity:队列的容量,即队列可以存储的最大元素数量。
    2.2 fair:公平性设置,如果为true,则等待时间最长的线程将优先获得访问队列的权限;
    2.3 如果为false,则不保证公平性。
    2.4 c:初始元素的集合,可以为空。
  3. 描述:这个构造方法与上一个方法类似,但多了一个c参数,用于初始化队列。c参数是一个集合,其中的元素将按照FIFO的顺序被添加到队列中。如果c参数为null或为空集合,则创建一个空队列。
  4. 总结来说,ArrayBlockingQueue的三个构造方法分别用于创建具有不同特性的有界阻塞队列实例。通过这些构造方法,可以指定队列的容量、公平性设置和初始元素,以满足不同的需求。
1.4.3 put方法(入队):
 public void put(E e) throws InterruptedException {
     checkNotNull(e);
     final ReentrantLock lock = this.lock;
     lock.lockInterruptibly();
     try {
         while (count == items.length)
             notFull.await();
         enqueue(e);
     } finally {
         lock.unlock();
     }
 }
private void enqueue(E x) {
   // assert lock.getHoldCount() == 1;
   // assert items[putIndex] == null;
   final Object[] items = this.items;
   items[putIndex] = x;
   if (++putIndex == items.length)
       putIndex = 0;
   count++;
   notEmpty.signal();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  1. 针对元素非空校验
  2. 获取一把独占锁,支持中断
  3. 当队列中元素数量等于数组长度(队列满了),调用notFull.await()将生产者线程阻塞
  4. 如果队里没有满,需要调用enqueue入队
  5. 将当前要添加的元素放入数组(putIndex指向的是下一个索引位置,所以可以直接定位)
  6. 如果++putIndex正好等于数组长度将putIndex置为0(环形数组)
  7. count++ 元素个数+1
  8. 准备唤醒从阻塞队列中获取元素的消费者线程notEmpty.signal(),这里涉及条件队列到同步队列的转换(此处不展开)
  9. 最后调用lock.unlock()唤醒队里的消费者线程
1.4.3 take方法(出队)
public E take() throws InterruptedException {
     final ReentrantLock lock = this.lock;
     lock.lockInterruptibly();
     try {
         while (count == 0)
             notEmpty.await();
         return dequeue();
     } finally {
         lock.unlock();
     }
 }
 private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  1. 获取一把独占锁(注意取与放操作是互斥的)
  2. 如果队列是空的,调用notEmpty.await()阻塞消费者线程
  3. 如果不是空的 ,调用dequeue方法,进行出队,将出队位置元素置null
  4. 如果takeIndex移动到数组末尾,则进行重置takeIndex = 0
  5. 数组中的元素个数-1,count–
  6. 调用notFull.signal()唤醒生产者线程,因为此时队列已经有空位置了

2. LinkedBlockingQueue

2.1 LinkedBlockingQueue的定义

LinkedBlockingQueue是Java中的一个可选有界/无界阻塞队列实现,基于链表数据结构。

2.2 LinkedBlockingQueue的优缺点

2.2.1 优点:
  1. 高效性能:LinkedBlockingQueue在高并发情况下表现出很好的性能,因为它使用了两个独立的锁来控制线程的访问,读操作和写操作可以同时进行,提高了并发性能。
  2. 可以作为无界队列:当使用无参构造方法创建LinkedBlockingQueue时,它将成为一个无界队列,可以无限制地添加元素,适用于处理大量任务的情况。
  3. 可以作为有界队列:当使用带参构造方法创建LinkedBlockingQueue时,可以指定队列的容量,限制队列存储的元素数量,避免内存溢出。
2.2.2 缺点:
  1. 内存占用:当使用无界队列时,LinkedBlockingQueue会无限制地添加元素,可能会占用大量的内存资源,导致系统资源受限。
  2. 对公平性的支持有限:LinkedBlockingQueue对于公平性的支持有限,只能保证线程在获取锁时按照FIFO的顺序获取,但不能保证线程在等待时按照FIFO的顺序获得访问权限。
  3. 需要额外的空间:LinkedBlockingQueue使用链表来存储元素,每个节点都需要额外的空间来存储链表节点的引用和元素值,因此会占用更多的内存空间。
2.2.3 小结:

总结来说,LinkedBlockingQueue具有高效性能和灵活性,可以作为无界队列或有界队列使用,但需要注意内存占用和对公平性的限制。根据具体的应用场景和需求,可以选择合适的阻塞队列实现。

2.3 LinkedBlockingQueue的基本使用

2.3.1 代码如下:

LinkedBlockingQueue是一个基于链表的有界或无界的阻塞队列。
下面是一个使用LinkedBlockingQueue的Demo,其中体现了有界和无界的特点:

import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        // 使用无界的LinkedBlockingQueue
        LinkedBlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();
        System.out.println("无界队列:");

        // 生产者线程
        Thread producer1 = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    unboundedQueue.put(i);
                    System.out.println("生产者1:生产了" + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 消费者线程
        Thread consumer1 = new Thread(() -> {
            try {
                while (true) {
                    int value = unboundedQueue.take();
                    System.out.println("消费者1:消费了" + value);
                    Thread.sleep(2000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer1.start();
        consumer1.start();

        Thread.sleep(10000);

        // 使用有界的LinkedBlockingQueue
        LinkedBlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(3);
        System.out.println("有界队列:");

        // 生产者线程
        Thread producer2 = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    boundedQueue.put(i);
                    System.out.println("生产者2:生产了" + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 消费者线程
        Thread consumer2 = new Thread(() -> {
            try {
                while (true) {
                    int value = boundedQueue.take();
                    System.out.println("消费者2:消费了" + value);
                    Thread.sleep(2000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer2.start();
        consumer2.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
2.3.2 案例分析:
  1. 在上面的代码中,首先创建了一个无界的LinkedBlockingQueue unboundedQueue,然后创建了一个生产者线程和一个消费者线程。生产者线程每隔1秒向队列中添加一个元素,消费者线程每隔2秒从队列中取出一个元素。由于LinkedBlockingQueue是无界的,所以生产者线程可以一直向队列中添加元素,消费者线程也可以一直从队列中取出元素。

  2. 接着,创建了一个有界的LinkedBlockingQueue boundedQueue,并创建了另一个生产者线程和消费者线程。有界队列的容量为3,即最多只能容纳3个元素。生产者线程每隔1秒向队列中添加一个元素,消费者线程每隔2秒从队列中取出一个元素。当队列已满时,生产者线程会被阻塞,直到有空间可以添加新元素。当队列为空时,消费者线程会被阻塞,直到有元素可以取出。

  3. 通过运行以上代码,可以观察到无界队列中生产者线程可以一直生产元素,消费者线程可以一直消费元素;而有界队列中生产者线程在队列满时会被阻塞,消费者线程在队列空时会被阻塞,体现了有界队列的特点。

2.4 源码分析

2.4.1 LinkedBlockingQueue类的属性

linkedblockingqueue
LinkedBlockingQueue类是Java中的一个阻塞队列实现,它具有以下属性:

2.4.1.1 capacity:队列的容量。

即队列中最多可以容纳的元素数量。该属性决定了队列的最大长度。

2.4.1.2. count:当前队列中的元素数量。

每当有元素被添加到队列中或从队列中取出时,count属性会相应地增加或减少。

2.4.1.3. head:队列的头部节点,即队列中第一个元素所在的节点。

通过head节点,可以快速获取队列的第一个元素。

2.4.1.4. last:队列的尾部节点,即队列中最后一个元素所在的节点。

通过last节点,可以快速获取队列的最后一个元素。

2.4.1.5. takeLock:用于保护从队列中取出元素的锁。

使用该锁可以确保在多线程环境下,只有一个线程可以进行取出操作。

2.4.1.6. notEmpty:一个条件变量,用于表示队列不为空。

当队列中有元素时,该条件变量会被唤醒,以通知等待中的线程可以进行取出操作。

2.4.1.7. putLock:用于保护向队列中添加元素的锁。

使用该锁可以确保在多线程环境下,只有一个线程可以进行添加操作。

2.4.1.8. notFull:一个条件变量,用于表示队列不满。

当队列中有空间可以容纳新的元素时,该条件变量会被唤醒,以通知等待中的线程可以进行添加操作。

2.4.1.9. 属性总结:

这些属性在实现LinkedBlockingQueue的功能中起到了重要的作用。capacity属性定义了队列的容量,count属性跟踪队列中的元素数量,head和last属性指示队列的头部和尾部位置。takeLock和putLock分别用于互斥地保护取出和添加操作,notEmpty和notFull用于线程间的等待和通知机制,以确保队列的一致性和线程安全性。

2.4.2 LinkedBlockingQueue的构造方法:
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
	if (capacity <= 0) throw new IllegalArgumentException();
	this.capacity = capacity;
	last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

LinkedBlockingQueue类提供了三个构造方法,它们的作用如下:
construct

2.4.2.1. LinkedBlockingQueue()

这是LinkedBlockingQueue类的默认构造方法,创建一个无界队列。无界队列的容量可以无限增长,理论上可以一直添加元素而不会阻塞。

2.4.2.1. LinkedBlockingQueue(int capacity)

这个构造方法创建了一个有界队列,容量由参数capacity指定。有界队列的容量是固定的,一旦达到容量限制,后续的添加操作会被阻塞。

2.4.2.3. LinkedBlockingQueue(Collection<? extends E> c)

这个构造方法创建了一个包含指定集合c中所有元素的队列。元素按照迭代器返回的顺序依次添加到队列中。如果集合c是null,则会抛出NullPointerException。

这三个构造方法提供了不同的方式来创建LinkedBlockingQueue对象,以满足不同的需求。无界队列适用于需要不限制队列长度的场景,而有界队列适用于需要限制队列长度的场景。通过指定初始集合,可以快速初始化一个队列并包含指定的元素。

2.4.3 put方法:
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
private void enqueue(Node<E> node) {
    last = last.next = node;
}
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  1. 构造Node结点,使用put锁加锁,保证入队线程安全
  2. 如果count.get() == capacity队列容量满了,生产者线程调用notFull.await()阻塞
  3. 否则调用enqueue,将node结点入队,插入链表尾部
  4. count.getAndIncrement(),元素个数+1
  5. 如果count+1小于队列容量capacity,准备唤醒生产者线程(队列没有满)
  6. putLock.unlock()释放锁进行唤醒
  7. if (c == 0) 这里代表队列有一个元素(count.getAndIncrement()会返回旧值,实际1)
  8. signalNotEmpty() 唤醒消费者线程
2.4.4 take方法:
public E take() throws InterruptedException {
   E x;
   int c = -1;
   final AtomicInteger count = this.count;
   final ReentrantLock takeLock = this.takeLock;
   takeLock.lockInterruptibly();
   try {
       while (count.get() == 0) {
           notEmpty.await();
       }
       x = dequeue();
       c = count.getAndDecrement();
       if (c > 1)
           notEmpty.signal();
   } finally {
       takeLock.unlock();
   }
   if (c == capacity)
       signalNotFull();
   return x;
}
private E dequeue() {
     Node<E> h = head;
     Node<E> first = h.next;
     h.next = h; // help GC
     head = first;
     E x = first.item;
     first.item = null;
     return x;
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  1. 获取一把take锁,take锁加锁
  2. 如果count.get() == 0,说明队列是空的,执行notEmpty.await() 需要阻塞消费者线程
  3. 执行dequeue方法进行出队,头结点出队
  4. 执行count.getAndDecrement(),将队列元素个数-1
  5. 如果c > 1,队列有元素,准备唤醒消费者线程
  6. 调用takeLock.unlock(),释放锁,进行唤醒
  7. 如果队列还剩一个空闲位置,尝试唤醒一个put的等待线程count.getAndDecrement()返回的是旧值。

3. PriorityBlockingQueue

3.1 PriorityBlockingQueue的定义

  1. PriorityBlockingQueue是一个线程安全的并发队列,它继承自BlockingQueue接口,实现了一个基于优先级的无界阻塞队列。它的特点是当多个线程同时插入元素时,会按照元素的优先级进行排序,并且可以保证在获取元素时,总是返回优先级最高的元素。

  2. PriorityBlockingQueue使用了可重入锁来实现线程安全性,并且使用了条件变量来实现线程间的等待和通知机制。它的内部是通过一个堆数据结构来存储元素,并且在插入和删除元素时会自动进行堆调整,以保证堆的性质。

  3. PriorityBlockingQueue的插入操作和删除操作的时间复杂度都是O(logN),其中N为队列的大小。它提供了一系列的方法来插入、删除和获取元素,包括put、take、offer、poll等。

  4. 需要注意的是,PriorityBlockingQueue的元素必须实现Comparable接口,或者在构造PriorityBlockingQueue时提供一个Comparator对象来指定元素的优先级比较规则。如果元素不具备比较性,那么在插入和删除元素时可能会抛出ClassCastException异常。

  5. 总之,PriorityBlockingQueue是一个线程安全的、基于优先级的无界阻塞队列,它可以在多线程环境下安全地进行元素的插入、删除和获取操作,且保证了获取元素时总是返回优先级最高的元素。

3.2 PriorityBlockingQueue的优缺点

3.2.1 优点:
3.2.1.1. 线程安全:

PriorityBlockingQueue是线程安全的,多个线程可以同时操作PriorityBlockingQueue而不会出现数据不一致的问题。

3.2.1.2. 优先级排序:

PriorityBlockingQueue会根据元素的优先级进行排序,保证在获取元素时总是返回优先级最高的元素。这对于需要按照优先级进行处理的场景非常有用。

3.2.1.3. 无界队列:

PriorityBlockingQueue是一个无界队列,可以存储任意数量的元素。这在需要处理大量元素的场景下非常有优势,不会出现队列溢出的情况。

3.2.2 缺点:
3.2.2.1. 性能开销:

由于PriorityBlockingQueue需要实时地对元素进行排序和调整堆,插入和删除元素的性能开销相对较大。这可能导致在高并发环境下性能下降。

3.2.2.2. 需要实现Comparable接口或提供Comparator对象:

PriorityBlockingQueue要求元素必须实现Comparable接口,或者在构造PriorityBlockingQueue时提供一个Comparator对象来指定元素的优先级比较规则。这对于一些元素类型来说可能是一个额外的工作量。

3.2.3 小结:
  1. 综上所述,PriorityBlockingQueue作为一个线程安全的、基于优先级的无界阻塞队列,在需要按照优先级进行处理的场景下非常有用。
  2. 它具有线程安全、优先级排序和无界队列的优点,但也存在性能开销和需要额外实现接口或提供比较器的缺点。
  3. 因此,在选择使用PriorityBlockingQueue时需要权衡其优点和缺点,并根据具体需求来决定是否使用。

3.3 PriorityBlockingQueue的基本使用

3.3.1 使用场景:医院急诊科室

在医院的急诊科室中,病人的病情需要按照优先级进行处理,例如心脏病、中风等需要优先处理。可以使用PriorityBlockingQueue来管理急诊病人的处理顺序,根据病人的优先级进行排序,保证高优先级的病人先得到医疗救治。

3.3.2 代码如下:

首先,我们需要定义一个Patient类,用于表示病人的信息,包括姓名和病情优先级:

public class Patient implements Comparable<Patient> {
    private String name;
    private int priority;

    public Patient(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }

    public String getName() {
        return name;
    }

    public int getPriority() {
        return priority;
    }

    @Override
    public int compareTo(Patient other) {
        // 比较病情优先级,优先级高的排在前面
        return Integer.compare(other.priority, this.priority);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

然后,我们可以创建一个PriorityBlockingQueue来管理急诊病人的处理顺序:

PriorityBlockingQueue<Patient> emergencyRoom = new PriorityBlockingQueue<>();

// 添加病人到急诊科室
emergencyRoom.add(new Patient("John", 1)); // 心脏病,优先级为1
emergencyRoom.add(new Patient("Emily", 2)); // 中风,优先级为2
emergencyRoom.add(new Patient("David", 3)); // 普通病情,优先级为3

// 处理病人
while (!emergencyRoom.isEmpty()) {
    Patient patient = emergencyRoom.poll();
    System.out.println("处理病人:" + patient.getName());
    // 进行相应的医疗救治操作
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3.3.3 案例分析:
  1. 在上述代码中,我们首先创建一个PriorityBlockingQueue来管理急诊病人的处理顺序。然后,我们通过调用add方法向队列中添加病人,每个病人都有一个病情优先级。接下来,我们使用一个循环来处理病人,每次从队列中取出优先级最高的病人进行处理,并输出病人的姓名。在实际应用中,处理病人的操作可以根据具体需求进行扩展。
  2. 这样,我们就可以使用Java的PriorityBlockingQueue来实现医院急诊科室功能,根据病人的病情优先级进行处理。

4. DelayQueue

4.1 DelayQueue的定义

  1. DelayQueue是Java中的一个实现了BlockingQueue接口的特殊队列,它是一个无界队列,用于存储具有过期时间的元素。在DelayQueue中,元素必须实现Delayed接口,该接口定义了元素的过期时间以及与其他元素的比较方法。

  2. DelayQueue是一个按照元素的过期时间进行排序的队列,即最先过期的元素会被放在队列的头部,最晚过期的元素会被放在队列的尾部。当从DelayQueue中获取元素时,只有在元素的过期时间到达后,该元素才能被取出。

4.2 DelayQueue的优缺点

4.2.1 优点:
4.2.1.1. 提供了可靠的延迟任务调度机制:

DelayQueue允许将任务按照延迟时间进行排序,保证了任务按照预定的时间顺序执行。这对于需要在特定时间执行任务的场景非常有用,比如定时任务、延迟队列等。

4.2.1.2. 线程安全:

DelayQueue是线程安全的,并发环境下多个线程可以安全地读取和操作其中的元素,不需要使用额外的同步机制。

4.2.1.3. 高效的实现:

DelayQueue的内部实现基于优先级队列(PriorityQueue),具有高效的插入和删除操作。当元素被插入到队列中时,会根据元素的延迟时间进行排序,保证了队列中的元素始终是有序的。

4.2.2 缺点:
4.2.2.1. 不支持随机访问:

由于DelayQueue是基于优先级队列实现的,它不支持随机访问元素。只能按照队列的顺序依次取出元素,不能直接访问指定位置的元素。

4.2.2.2. 不支持修改元素的延迟时间:

一旦元素被添加到DelayQueue中,就不能再修改其延迟时间。如果需要修改延迟时间,只能先将元素从队列中移除,然后再重新添加。

4.2.2.3. 对内存的占用较大:

DelayQueue需要维护一个优先级队列,并且每个元素都会占用一定的内存空间。如果队列中的元素非常多,可能会导致内存占用较大。

4.2.3 小结:

DelayQueue是一个方便的并发集合类,适用于需要按照延迟时间排序的场景。它提供了可靠的延迟任务调度机制,是实现定时任务和延迟队列的理想选择。然而,由于其不支持随机访问和修改元素的延迟时间,以及可能导致较大的内存占用,需要根据具体场景来选择使用。

4.3 DelayQueue的基本使用

4.3.1 使用场景:订单超时取消

在电商平台中,当用户下单后,可以将订单信息加入到DelayQueue中,设置一个合理的延迟时间。如果在延迟时间内用户未支付,系统可以自动取消订单。

4.3.2 代码如下:

下面是使用DelayQueue实现订单超时取消的Java代码示例:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// 订单类
class Order implements Delayed {
    private String orderId; // 订单号
    private long timeout; // 超时时间戳

    public Order(String orderId, long timeout) {
        this.orderId = orderId;
        this.timeout = timeout;
    }

    public String getOrderId() {
        return orderId;
    }

    // 计算剩余时间
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    // 比较器,用于排序
    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}

// 订单管理类
class OrderManager {
    private DelayQueue<Order> delayQueue = new DelayQueue<>();

    // 创建订单
    public void createOrder(String orderId, long timeout) {
        Order order = new Order(orderId, System.currentTimeMillis() + timeout);
        delayQueue.put(order);
        System.out.println("订单 " + orderId + " 创建成功");
    }

    // 取消订单
    public void cancelOrder(String orderId) {
        for (Order order : delayQueue) {
            if (order.getOrderId().equals(orderId)) {
                delayQueue.remove(order);
                System.out.println("订单 " + orderId + " 取消成功");
                return;
            }
        }
        System.out.println("订单 " + orderId + " 不存在");
    }

    // 启动处理超时订单的线程
    public void startCancelThread() {
        Thread cancelThread = new Thread(() -> {
            while (true) {
                try {
                    Order order = delayQueue.take(); // 从DelayQueue中取出超时订单
                    System.out.println("订单 " + order.getOrderId() + " 超时,自动取消");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        cancelThread.start();
    }
}

public class Main {
    public static void main(String[] args) {
        OrderManager orderManager = new OrderManager();
        orderManager.startCancelThread();

        // 创建订单
        orderManager.createOrder("1001", 5000);
        orderManager.createOrder("1002", 3000);
        orderManager.createOrder("1003", 7000);

        // 取消订单
        orderManager.cancelOrder("1002");

        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
4.3.3 案例分析:

anli
从输出结果中可以看到,订单1001在5000毫秒后被取消,订单1003在7000毫秒后被取消。这表明订单超时取消功能成功实现。

5. SynchronousQueue

5.1 SynchronousQueue的定义

  1. SynchronousQueue是Java并发包中提供的一种特殊类型的阻塞队列。它是一种没有容量的队列,每个插入操作必须等待另一个线程的相应移除操作,反之亦然。

  2. 简单来说,SynchronousQueue是一种用于线程间同步通信的队列。它的特点是在插入元素时,如果没有其他线程正在等待移除元素,插入操作将会阻塞,直到有其他线程移除该元素;而在移除元素时,如果没有其他线程正在等待插入元素,移除操作也会被阻塞,直到有其他线程插入元素。

  3. 由于SynchronousQueue没有容量,所以它并不存储元素。相反,它只是在插入和移除操作之间传递元素。这使得SynchronousQueue非常适合于一些需要线程间直接传递数据的场景,例如线程池的任务分发等。

5.2 SynchronousQueue的优缺点

5.2.1 优点:
5.2.1.1 提供了有效的线程间同步通信机制:

SynchronousQueue在插入和移除之间直接传递元素,避了使用中间缓冲,从而提供了效的线程间通信制。

5.2.1.2 高并发性能:

SynchronousQueue没有实际存储元素,因此它可以支持非常高的并发度,多个线程可以同时插入和移除元素。

5.2.2 缺点:
5.2.2.1. 队列容量为0:

SynchronousQueue的容量为0,意味着它不能保存任何元素。这可能会限制一些需要缓冲区的场景,例如生产者-消费者模型中的解耦。

5.2.2.2. 容易发生阻塞:

由于SynchronousQueue的特性,插入和移除操作都可能会导致线程阻塞。如果没有其他线程等待相反的操作,插入操作将会阻塞直到有其他线程移除元素,移除操作也会阻塞直到有其他线程插入元素。这可能会导致程序的复杂性增加。

5.2.2.3. 需要额外的线程协调:

由于SynchronousQueue需要一对线程进行插入和移除操作,因此需要额外的线程协调来保证操作的成功。如果某个线程插入元素而没有其他线程移除,或者某个线程移除元素而没有其他线程插入,那么可能会导致线程阻塞,甚至死锁。

5.2.3 小结:

综上所述,SynchronousQueue适用于需要高效的线程间同步通信的场景,但在一些需要缓冲区或者简单的生产者-消费者模型中可能不太适用。使用时需要谨慎考虑其特性和对线程的要求。

5.3 SynchronousQueue的基本使用

5.3.1 使用场景:
  1. 一个贴切的生活实际案例是,假设有一个餐馆的厨房和服务员之间需要进行实时的菜品传递。当厨师烹饪完成一道菜品后,他会将菜品放入一个SynchronousQueue中。服务员会在这个队列中等待,一旦有菜品放入队列,服务员就会立即取出菜品并将其送往对应的餐桌。

  2. 在这个案例中,SynchronousQueue充当了一个中转站的角色,确保了厨师和服务员之间的菜品传递是同步的。当厨师烹饪完成菜品时,他必须等待菜品被取走后才能继续烹饪下一道菜品,而服务员在没有菜品可供上菜时也必须等待。

  3. 这种实时的菜品传递机制可以有效地避免厨师和服务员之间的菜品堆积和混乱,确保了菜品的及时送达,提高了餐馆的服务效率和顾客的用餐体验。

5.3.2 代码如下:
import java.util.concurrent.SynchronousQueue;

class Chef implements Runnable {
    private SynchronousQueue<String> queue;

    public Chef(SynchronousQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            // 模拟烹饪过程
            Thread.sleep(2000);
            String dish = "烤鸭";
            System.out.println("厨师烹饪完成:" + dish);
            queue.put(dish); // 将菜品放入队列
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Waiter implements Runnable {
    private SynchronousQueue<String> queue;

    public Waiter(SynchronousQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            System.out.println("等待菜品...");
            String dish = queue.take(); // 从队列中取出菜品
            System.out.println("服务员取出菜品:" + dish);
            System.out.println("菜品送到餐桌");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Restaurant {
    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        Thread chefThread = new Thread(new Chef(queue));
        Thread waiterThread = new Thread(new Waiter(queue));

        chefThread.start();
        waiterThread.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
5.3.3 案例分析:
  1. 在这个例子中,Chef(厨师)和Waiter(服务员)分别实现了Runnable接口,并在run()方法中执行相应的操作。
  2. Chef线程表示厨师烹饪菜品,当菜品烹饪完成后,将菜品放入SynchronousQueue中。3. Waiter线程表示服务员等待菜品,当有菜品放入队列后,从队列中取出菜品并进行相应的操作。
  3. 通过使用SynchronousQueue,我们实现了厨师和服务员之间的实时菜品传递机制,确保了菜品的同步和及时传递。

6. LinkedTransferQueue

6.1 LinkedTransferQueue的定义

LinkedTransferQueue是Java并发包中的一个类,它是一个基于链表的无界阻塞队列,实现了TransferQueue接口。LinkedTransferQueue可以用于在线程间传递元素,它提供了一种特殊的操作,即如果当前有等待的消费者线程,就直接将元素传递给消费者线程,否则将元素添加到队列中等待消费。

6.2 LinkedTransferQueue的优缺点

6.2.1 优点:
6.2.1.1. 高效性:

LinkedTransferQueue使用链表结构来保存元素,插入和删除操作效率较高。

6.2.1.2. 无界队列:

LinkedTransferQueue没有容量限制,可以根据需要动态地增加元素,可以持续地接收元素而不会阻塞生产者线程。

6.2.1.3. 阻塞操作:

LinkedTransferQueue提供了多种阻塞操作,可以在元素不可用时阻塞线程,直到元素可用或等待超时。

6.2.1.4. 优先级传递:

LinkedTransferQueue支持优先级传递,可以将元素传递给等待时间最长的消费者线程,减少等待时间。

6.2.1.5. 公平性:

LinkedTransferQueue可以选择公平或非公平的访问策略,可以控制元素的传递顺序。

6.2.1.6. 支持消费者优先:

LinkedTransferQueue提供了tryTransfer()方法,可以尝试立即将元素传递给消费者线程,避免生产者线程的阻塞。

6.2.2 缺点:
6.2.2.1. 内存占用:

LinkedTransferQueue使用链表来保存元素,相比于使用数组,链表需要更多的内存空间。

6.2.2.2. 操作复杂性:

LinkedTransferQueue的操作相对复杂,需要考虑到阻塞和传递的逻辑,使用不当可能会导致线程死锁或性能下降。

6.2.2.3. 不适合高吞吐量场景:

由于LinkedTransferQueue是基于链表的,对于高吞吐量的场景,可能存在较多的节点分配和释放操作,导致性能下降。

6.2.2.4. 存在竞争:

LinkedTransferQueue在多线程环境下可能存在竞争,需要合理地处理竞争条件,避免出现数据不一致或线程安全问题。

6.2.3 小结:

综上所述,LinkedTransferQueue具有高效的传递和阻塞操作,适用于需要灵活调整容量和优先级传递的场景。但在一些特定的场景下,如对内存占用和吞吐量要求较高的情况下,可能不适合使用。

6.3 LinkedTransferQueue的基本使用

6.3.1 使用场景:生产者-消费者模型
6.3.2 代码如下:
import java.util.concurrent.LinkedTransferQueue;

public class LinkedTransferQueueDemo {
    public static void main(String[] args) {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        // 生产者线程
        Thread producerThread = new Thread(() -> {
            try {
                // 生产一个元素并立即传递给消费者线程
                queue.transfer("Hello");
                System.out.println("Producer: Element transferred");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 消费者线程
        Thread consumerThread = new Thread(() -> {
            try {
                // 等待接收元素
                String element = queue.take();
                System.out.println("Consumer: Element received - " + element);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 启动生产者和消费者线程
        producerThread.start();
        consumerThread.start();

        // 等待线程执行完毕
        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
6.3.3 案例分析:

以上示例创建了一个LinkedTransferQueue,并通过transfer()方法在生产者线程中传递一个元素给消费者线程。消费者线程使用take()方法等待接收元素,一旦生产者线程传递了元素,消费者线程将会接收到并打印输出。

7. LinkedBlockingDeque

7.1 LinkedBlockingDeque的定义

LinkedBlockingDeque是一个基于链表的双向阻塞队列,它可以在队列的两端进行插入和删除操作,并且支持线程安全的并发访问。

7.2 LinkedBlockingDeque的优缺点

7.2.1 优点:
7.2.1.1. 线程安全:

LinkedBlockingDeque是线程安全的,多个线程可以同时对队列进行插入和删除操作,而不需要额外的同步机制。

7.2.1.2. 高性能:

LinkedBlockingDeque在并发环境下具有良好的性能。它使用了分离锁的机制,使得在多线程并发操作时可以有效地减少锁竞争,提高吞吐量。

7.2.1.3. 可以设置容量限制:

LinkedBlockingDeque可以指定容量限制,即队列中可以存放的最大元素数量。当队列达到容量限制时,后续的插入操作将会被阻塞,直到有空间可用。

7.2.1.4. 双向操作:

LinkedBlockingDeque支持双向操作,可以在队列的两端进行插入和删除操作。这使得它更加灵活,可以适用于更多的场景。

7.2.2 缺点:
7.2.2.1. 内存占用:

LinkedBlockingDeque是基于链表实现的,因此它在内存占用上可能比基于数组的阻塞队列更高。

7.2.2.2. 无界队列可能导致内存溢出:

如果没有设置容量限制,LinkedBlockingDeque可以无限制地增长。在某些情况下,如果生产者的速度大于消费者的速度,可能会导致队列无限增长,最终消耗掉所有可用的内存,导致内存溢出。

7.2.3 小结:

综上所述,LinkedBlockingDeque是一个高性能、线程安全且支持双向操作的阻塞队列。它适用于需要在多个线程之间进行高效、并发操作的场景。但是需要注意设置合理的容量限制,以避免内存溢出的风险。

7.3 LinkedBlockingDeque的基本使用

7.3.1 使用场景:
  1. 一个在生活中常见的使用场景是电影票售卖系统。假设电影院使用LinkedBlockingDeque作为双端队列来管理售票队列。

  2. 当顾客到达电影院购买电影票时,他们可以选择从队列的一端排队,也可以选择从队列的另一端排队。这样,售票员可以根据实际情况决定从哪一端开始售票。

  3. 当售票员售完一张电影票后,顾客可以从队列的另一端离开。这样,售票员可以方便地将已经购买完票的顾客从队列中移除,以便为下一位顾客腾出位置。

7.3.2 代码如下:
import java.util.concurrent.LinkedBlockingDeque;

public class MovieTicketSystem {
    private LinkedBlockingDeque<String> ticketQueue;

    public MovieTicketSystem() {
        ticketQueue = new LinkedBlockingDeque<>();
    }

    public void addCustomerToQueue(String customer) {
        ticketQueue.offerLast(customer);
        System.out.println("顾客 " + customer + " 加入队列");
    }

    public void sellTicket() {
        String customer = ticketQueue.pollFirst();
        if (customer != null) {
            System.out.println("售票给顾客 " + customer);
        } else {
            System.out.println("队列中没有顾客等待购票");
        }
    }

    public void removeCustomerFromQueue(String customer) {
        boolean removed = ticketQueue.remove(customer);
        if (removed) {
            System.out.println("顾客 " + customer + " 已离开队列");
        } else {
            System.out.println("队列中没有顾客 " + customer);
        }
    }

    public void displayQueue() {
        System.out.println("当前队列顺序:");
        for (String customer : ticketQueue) {
            System.out.println(customer);
        }
    }

    public static void main(String[] args) {
        MovieTicketSystem ticketSystem = new MovieTicketSystem();

        // 添加顾客到队列
        ticketSystem.addCustomerToQueue("顾客A");
        ticketSystem.addCustomerToQueue("顾客B");
        ticketSystem.addCustomerToQueue("顾客C");

        // 售票
        ticketSystem.sellTicket();
        ticketSystem.sellTicket();

        // 移除顾客
        ticketSystem.removeCustomerFromQueue("顾客B");

        // 显示队列
        ticketSystem.displayQueue();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
7.3.3 案例分析:

deque
这个例子创建了一个电影票售卖系统的类MovieTicketSystem,其中使用LinkedBlockingDeque作为队列来管理顾客的等待和购票过程。通过调用addCustomerToQueue方法将顾客添加到队列,调用sellTicket方法售票给队列中的顾客,调用removeCustomerFromQueue方法移除队列中的顾客,调用displayQueue方法显示当前队列的顺序。

在main方法中,我们创建了一个MovieTicketSystem对象,并模拟了添加顾客、售票、移除顾客和显示队列的操作。你可以根据需要修改和扩展这个例子,以适应实际的电影票售卖系统需求。

参考资料:Java并发编程的艺术

并发编程的艺术

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/968374
推荐阅读
相关标签
  

闽ICP备14008679号