赞
踩
1、add(): add()方法在向队列添加元素时,如果队列已满,会抛出IllegalStateException异常。
offer(E e)
offer()方法在向队列添加元素时,如果队列已满,会返回false,并且不会抛出异常。
remove()方法在从队列头部移除元素时,如果队列为空,会抛出NoSuchElementException异常。
poll(): poll()方法在从队列头部移除元素时,如果队列为空,会返回null,并且不会抛出异常。
element()方法在查看队列头部元素时,如果队列为空,会抛出NoSuchElementException异常。
peek()方法在查看队列头部元素时,如果队列为空,会返回null,并且不会抛出异常。
两者区别主要在于处理满队列时的行为不同。如果在队列已满时,你不希望抛出异常,而是希望通过返回值来判断添加是否成功,则可以使用offer()方法。如果你希望在队列已满时抛出异常,则可以使用add()方法。
验证:AbstractQueue#add
public boolean add(E e) {
//调用add方法,如果添加成功,返回true,失败则抛出IllegalStateException
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
区别主要在于处理空队列时的行为不同。如果你在移除元素时希望在队列为空时抛出异常,则可以使用remove()方法。如果你希望在队列为空时返回null,则可以使用poll()方法。
验证:AbstractQueue#remove
public E remove() {
E x = poll();
//不为空时直接返回,为空时抛出异常NoSuchElementException
if (x != null)
return x;
else
throw new NoSuchElementException();
}
区别主要在于处理空队列时的行为不同。如果你在查看队列头部元素时希望在队列为空时抛出异常,则可以使用element()方法。如果你希望在队列为空时返回null,则可以使用peek()方法。
验证:AbstractQueue#element
public E element() {
E x = peek();
//不为空直接返回,为空抛出NoSuchElementException
if (x != null)
return x;
else
throw new NoSuchElementException();
}
put方法用于将元素放入队列中。如果队列已满,put()方法会阻塞当前线程,直到队列有空闲位置可放入元素。当队列有空闲位置时,put()方法会将元素放入队列,并立即返回。
take方法用于从队列中取出元素。如果队列为空,take()方法会阻塞当前线程,直到队列中有元素可供取出。当队列中有元素可取出时,take()方法会将队列头部的元素取出并返回。
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class ProducerConsumerExample { private static final int BUFFER_SIZE = 5; private static BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(BUFFER_SIZE); public static void main(String[] args) { Thread producerThread = new Thread(new Producer()); Thread consumerThread = new Thread(new Consumer()); producerThread.start(); consumerThread.start(); } static class Producer implements Runnable { @Override public void run() { try { int value = 0; while (true) { buffer.put(value); System.out.println("生产者生产了:" + value); value++; Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } } static class Consumer implements Runnable { @Override public void run() { try { while (true) { int value = buffer.take(); System.out.println("消费者消费了:" + value); Thread.sleep(2000); } } catch (InterruptedException e) { e.printStackTrace(); } } } }
案例分析:
在这个示例中,我们创建了一个大小为5的阻塞队列作为缓冲区。生产者线程通过不断向队列中放入数据来模拟生产过程,消费者线程通过不断从队列中取出数据来模拟消费过程。当缓冲区已满时,生产者线程将被阻塞,直到有空间可用。当缓冲区为空时,消费者线程将被阻塞,直到有数据可用。
在生产者线程中,我们使用put方法将数据放入队列中,并打印相应的信息。在消费者线程中,我们使用take方法从队列中取出数据,并打印相应的信息。在生产者和消费者的run方法中,我们通过捕获InterruptedException来处理阻塞过程中可能出现的异常。
队列 | 描述 |
---|---|
ArrayBlockingQueue | 基于数组结构实现的一个有界阻塞队列 |
LinkedBlockingQueue | 基于链表结构实现的一个有界阻塞队列 |
PriorityBlockingQueue | 支持按优先级排序的无界阻塞队列 |
DelayQueue | 基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列 |
SynchronousQueue | 不存储元素的阻塞队列 |
LinkedTransferQueue | 基于链表结构实现的一个无界阻塞队列 |
LinkedBlockingDeque | 基于链表结构实现的一个双端阻塞队列 |
ArrayBlockingQueue是Java中的一个阻塞队列实现,它在内部使用一个固定大小的数组来存储元素。它的特点是先进先出(FIFO)的顺序,即最先插入的元素会最先被取出。
ArrayBlockingQueue是一个有界队列,即在创建时需要指定队列的容量大小,一旦达到容量上限,后续的插入操作将被阻塞,直到队列中的元素被消费或者被移除。
ArrayBlockingQueue支持公平性策略,即按照线程的到达顺序来处理元素。当公平性设置为true时,线程会按照插入的顺序进行获取元素;当公平性设置为false时,线程获取元素的顺序是不确定的。
ArrayBlockingQueue提供了阻塞的插入和获取操作。当队列已满时,插入操作会被阻塞,直到有空间可用。当队列为空时,获取操作会被阻塞,直到队列中有元素可用。
ArrayBlockingQueue是线程安全的,多个线程可以同时对队列进行操作。它内部使用了锁和条件变量来保证线程安全性。
ArrayBlockingQueue的插入和获取操作可以设置一个超时时间,如果在指定的时间内仍然无法完成操作,那么操作将返回特定的结果。
总的来说,ArrayBlockingQueue是一个功能强大且线程安全的阻塞队列实现,适用于多线程环境下的生产者-消费者模式等场景,可以帮助处理并发操作的同步问题。
下面是一个简单的示例代码,展示如何使用ArrayBlockingQueue:
import java.util.concurrent.ArrayBlockingQueue; public class ArrayBlockingQueueDemo { public static void main(String[] args) { // 创建一个容量为3的ArrayBlockingQueue ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3); // 生产者线程 Thread producerThread = new Thread(() -> { try { // 向队列中插入元素 queue.put(1); System.out.println("Producer: 1 added to the queue"); Thread.sleep(1000); queue.put(2); System.out.println("Producer: 2 added to the queue"); Thread.sleep(1000); queue.put(3); System.out.println("Producer: 3 added to the queue"); Thread.sleep(1000); // 尝试插入第四个元素,会被阻塞 queue.put(4); System.out.println("Producer: 4 added to the queue"); } catch (InterruptedException e) { e.printStackTrace(); } }); // 消费者线程 Thread consumerThread = new Thread(() -> { try { Thread.sleep(3000); // 从队列中获取元素 int element1 = queue.take(); System.out.println("Consumer: " + element1 + " removed from the queue"); int element2 = queue.take(); System.out.println("Consumer: " + element2 + " removed from the queue"); int element3 = queue.take(); System.out.println("Consumer: " + element3 + " removed from the queue"); // 尝试获取第四个元素,会被阻塞 int element4 = queue.take(); System.out.println("Consumer: " + element4 + " removed from the queue"); } catch (InterruptedException e) { e.printStackTrace(); } }); // 启动生产者和消费者线程 producerThread.start(); consumerThread.start(); } }
在上面的示例中,我们创建了一个容量为3的ArrayBlockingQueue。生产者线程向队列中不断插入元素,并在插入后等待1秒钟。消费者线程在启动后等待3秒钟,然后从队列中不断获取元素。由于队列的容量是有限的,当队列满时,生产者线程会被阻塞,直到队列中有空间可用。同样地,当队列为空时,消费者线程也会被阻塞,直到队列中有元素可用。
运行上述代码,你将看到类似以下的输出:
Producer: 1 added to the queue
Producer: 2 added to the queue
Producer: 3 added to the queue
Consumer: 1 removed from the queue
Consumer: 2 removed from the queue
Consumer: 3 removed from the queue
Producer: 4 added to the queue
Consumer: 4 removed from the queue
通过使用itrs属性,ArrayBlockingQueue可以实现迭代器的快速失败机制,即在迭代器创建后,如果在迭代过程中队列发生结构性修改(例如添加或删除元素),迭代器会立即抛出ConcurrentModificationException异常,以保证迭代器的操作是安全和一致的。
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion //通过在构造方法中使用锁,可以确保在构造过程中对共享变量(items)的修改对其他线程可见。这是因为锁的获取和释放都会导致主内存和工作内存之间的数据同步,保证了线程之间的可见性。 try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
LinkedBlockingQueue是Java中的一个可选有界/无界阻塞队列实现,基于链表数据结构。
总结来说,LinkedBlockingQueue具有高效性能和灵活性,可以作为无界队列或有界队列使用,但需要注意内存占用和对公平性的限制。根据具体的应用场景和需求,可以选择合适的阻塞队列实现。
LinkedBlockingQueue是一个基于链表的有界或无界的阻塞队列。
下面是一个使用LinkedBlockingQueue的Demo,其中体现了有界和无界的特点:
import java.util.concurrent.LinkedBlockingQueue; public class LinkedBlockingQueueDemo { public static void main(String[] args) throws InterruptedException { // 使用无界的LinkedBlockingQueue LinkedBlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>(); System.out.println("无界队列:"); // 生产者线程 Thread producer1 = new Thread(() -> { try { for (int i = 1; i <= 5; i++) { unboundedQueue.put(i); System.out.println("生产者1:生产了" + i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); // 消费者线程 Thread consumer1 = new Thread(() -> { try { while (true) { int value = unboundedQueue.take(); System.out.println("消费者1:消费了" + value); Thread.sleep(2000); } } catch (InterruptedException e) { e.printStackTrace(); } }); producer1.start(); consumer1.start(); Thread.sleep(10000); // 使用有界的LinkedBlockingQueue LinkedBlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(3); System.out.println("有界队列:"); // 生产者线程 Thread producer2 = new Thread(() -> { try { for (int i = 1; i <= 5; i++) { boundedQueue.put(i); System.out.println("生产者2:生产了" + i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); // 消费者线程 Thread consumer2 = new Thread(() -> { try { while (true) { int value = boundedQueue.take(); System.out.println("消费者2:消费了" + value); Thread.sleep(2000); } } catch (InterruptedException e) { e.printStackTrace(); } }); producer2.start(); consumer2.start(); } }
在上面的代码中,首先创建了一个无界的LinkedBlockingQueue unboundedQueue,然后创建了一个生产者线程和一个消费者线程。生产者线程每隔1秒向队列中添加一个元素,消费者线程每隔2秒从队列中取出一个元素。由于LinkedBlockingQueue是无界的,所以生产者线程可以一直向队列中添加元素,消费者线程也可以一直从队列中取出元素。
接着,创建了一个有界的LinkedBlockingQueue boundedQueue,并创建了另一个生产者线程和消费者线程。有界队列的容量为3,即最多只能容纳3个元素。生产者线程每隔1秒向队列中添加一个元素,消费者线程每隔2秒从队列中取出一个元素。当队列已满时,生产者线程会被阻塞,直到有空间可以添加新元素。当队列为空时,消费者线程会被阻塞,直到有元素可以取出。
通过运行以上代码,可以观察到无界队列中生产者线程可以一直生产元素,消费者线程可以一直消费元素;而有界队列中生产者线程在队列满时会被阻塞,消费者线程在队列空时会被阻塞,体现了有界队列的特点。
LinkedBlockingQueue类是Java中的一个阻塞队列实现,它具有以下属性:
即队列中最多可以容纳的元素数量。该属性决定了队列的最大长度。
每当有元素被添加到队列中或从队列中取出时,count属性会相应地增加或减少。
通过head节点,可以快速获取队列的第一个元素。
通过last节点,可以快速获取队列的最后一个元素。
使用该锁可以确保在多线程环境下,只有一个线程可以进行取出操作。
当队列中有元素时,该条件变量会被唤醒,以通知等待中的线程可以进行取出操作。
使用该锁可以确保在多线程环境下,只有一个线程可以进行添加操作。
当队列中有空间可以容纳新的元素时,该条件变量会被唤醒,以通知等待中的线程可以进行添加操作。
这些属性在实现LinkedBlockingQueue的功能中起到了重要的作用。capacity属性定义了队列的容量,count属性跟踪队列中的元素数量,head和last属性指示队列的头部和尾部位置。takeLock和putLock分别用于互斥地保护取出和添加操作,notEmpty和notFull用于线程间的等待和通知机制,以确保队列的一致性和线程安全性。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
LinkedBlockingQueue类提供了三个构造方法,它们的作用如下:
这是LinkedBlockingQueue类的默认构造方法,创建一个无界队列。无界队列的容量可以无限增长,理论上可以一直添加元素而不会阻塞。
这个构造方法创建了一个有界队列,容量由参数capacity指定。有界队列的容量是固定的,一旦达到容量限制,后续的添加操作会被阻塞。
这个构造方法创建了一个包含指定集合c中所有元素的队列。元素按照迭代器返回的顺序依次添加到队列中。如果集合c是null,则会抛出NullPointerException。
这三个构造方法提供了不同的方式来创建LinkedBlockingQueue对象,以满足不同的需求。无界队列适用于需要不限制队列长度的场景,而有界队列适用于需要限制队列长度的场景。通过指定初始集合,可以快速初始化一个队列并包含指定的元素。
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } private void enqueue(Node<E> node) { last = last.next = node; } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
PriorityBlockingQueue是一个线程安全的并发队列,它继承自BlockingQueue接口,实现了一个基于优先级的无界阻塞队列。它的特点是当多个线程同时插入元素时,会按照元素的优先级进行排序,并且可以保证在获取元素时,总是返回优先级最高的元素。
PriorityBlockingQueue使用了可重入锁来实现线程安全性,并且使用了条件变量来实现线程间的等待和通知机制。它的内部是通过一个堆数据结构来存储元素,并且在插入和删除元素时会自动进行堆调整,以保证堆的性质。
PriorityBlockingQueue的插入操作和删除操作的时间复杂度都是O(logN),其中N为队列的大小。它提供了一系列的方法来插入、删除和获取元素,包括put、take、offer、poll等。
需要注意的是,PriorityBlockingQueue的元素必须实现Comparable接口,或者在构造PriorityBlockingQueue时提供一个Comparator对象来指定元素的优先级比较规则。如果元素不具备比较性,那么在插入和删除元素时可能会抛出ClassCastException异常。
总之,PriorityBlockingQueue是一个线程安全的、基于优先级的无界阻塞队列,它可以在多线程环境下安全地进行元素的插入、删除和获取操作,且保证了获取元素时总是返回优先级最高的元素。
PriorityBlockingQueue是线程安全的,多个线程可以同时操作PriorityBlockingQueue而不会出现数据不一致的问题。
PriorityBlockingQueue会根据元素的优先级进行排序,保证在获取元素时总是返回优先级最高的元素。这对于需要按照优先级进行处理的场景非常有用。
PriorityBlockingQueue是一个无界队列,可以存储任意数量的元素。这在需要处理大量元素的场景下非常有优势,不会出现队列溢出的情况。
由于PriorityBlockingQueue需要实时地对元素进行排序和调整堆,插入和删除元素的性能开销相对较大。这可能导致在高并发环境下性能下降。
PriorityBlockingQueue要求元素必须实现Comparable接口,或者在构造PriorityBlockingQueue时提供一个Comparator对象来指定元素的优先级比较规则。这对于一些元素类型来说可能是一个额外的工作量。
在医院的急诊科室中,病人的病情需要按照优先级进行处理,例如心脏病、中风等需要优先处理。可以使用PriorityBlockingQueue来管理急诊病人的处理顺序,根据病人的优先级进行排序,保证高优先级的病人先得到医疗救治。
首先,我们需要定义一个Patient类,用于表示病人的信息,包括姓名和病情优先级:
public class Patient implements Comparable<Patient> { private String name; private int priority; public Patient(String name, int priority) { this.name = name; this.priority = priority; } public String getName() { return name; } public int getPriority() { return priority; } @Override public int compareTo(Patient other) { // 比较病情优先级,优先级高的排在前面 return Integer.compare(other.priority, this.priority); } }
然后,我们可以创建一个PriorityBlockingQueue来管理急诊病人的处理顺序:
PriorityBlockingQueue<Patient> emergencyRoom = new PriorityBlockingQueue<>();
// 添加病人到急诊科室
emergencyRoom.add(new Patient("John", 1)); // 心脏病,优先级为1
emergencyRoom.add(new Patient("Emily", 2)); // 中风,优先级为2
emergencyRoom.add(new Patient("David", 3)); // 普通病情,优先级为3
// 处理病人
while (!emergencyRoom.isEmpty()) {
Patient patient = emergencyRoom.poll();
System.out.println("处理病人:" + patient.getName());
// 进行相应的医疗救治操作
}
DelayQueue是Java中的一个实现了BlockingQueue接口的特殊队列,它是一个无界队列,用于存储具有过期时间的元素。在DelayQueue中,元素必须实现Delayed接口,该接口定义了元素的过期时间以及与其他元素的比较方法。
DelayQueue是一个按照元素的过期时间进行排序的队列,即最先过期的元素会被放在队列的头部,最晚过期的元素会被放在队列的尾部。当从DelayQueue中获取元素时,只有在元素的过期时间到达后,该元素才能被取出。
DelayQueue允许将任务按照延迟时间进行排序,保证了任务按照预定的时间顺序执行。这对于需要在特定时间执行任务的场景非常有用,比如定时任务、延迟队列等。
DelayQueue是线程安全的,并发环境下多个线程可以安全地读取和操作其中的元素,不需要使用额外的同步机制。
DelayQueue的内部实现基于优先级队列(PriorityQueue),具有高效的插入和删除操作。当元素被插入到队列中时,会根据元素的延迟时间进行排序,保证了队列中的元素始终是有序的。
由于DelayQueue是基于优先级队列实现的,它不支持随机访问元素。只能按照队列的顺序依次取出元素,不能直接访问指定位置的元素。
一旦元素被添加到DelayQueue中,就不能再修改其延迟时间。如果需要修改延迟时间,只能先将元素从队列中移除,然后再重新添加。
DelayQueue需要维护一个优先级队列,并且每个元素都会占用一定的内存空间。如果队列中的元素非常多,可能会导致内存占用较大。
DelayQueue是一个方便的并发集合类,适用于需要按照延迟时间排序的场景。它提供了可靠的延迟任务调度机制,是实现定时任务和延迟队列的理想选择。然而,由于其不支持随机访问和修改元素的延迟时间,以及可能导致较大的内存占用,需要根据具体场景来选择使用。
在电商平台中,当用户下单后,可以将订单信息加入到DelayQueue中,设置一个合理的延迟时间。如果在延迟时间内用户未支付,系统可以自动取消订单。
下面是使用DelayQueue实现订单超时取消的Java代码示例:
import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; // 订单类 class Order implements Delayed { private String orderId; // 订单号 private long timeout; // 超时时间戳 public Order(String orderId, long timeout) { this.orderId = orderId; this.timeout = timeout; } public String getOrderId() { return orderId; } // 计算剩余时间 @Override public long getDelay(TimeUnit unit) { return unit.convert(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } // 比较器,用于排序 @Override public int compareTo(Delayed o) { return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); } } // 订单管理类 class OrderManager { private DelayQueue<Order> delayQueue = new DelayQueue<>(); // 创建订单 public void createOrder(String orderId, long timeout) { Order order = new Order(orderId, System.currentTimeMillis() + timeout); delayQueue.put(order); System.out.println("订单 " + orderId + " 创建成功"); } // 取消订单 public void cancelOrder(String orderId) { for (Order order : delayQueue) { if (order.getOrderId().equals(orderId)) { delayQueue.remove(order); System.out.println("订单 " + orderId + " 取消成功"); return; } } System.out.println("订单 " + orderId + " 不存在"); } // 启动处理超时订单的线程 public void startCancelThread() { Thread cancelThread = new Thread(() -> { while (true) { try { Order order = delayQueue.take(); // 从DelayQueue中取出超时订单 System.out.println("订单 " + order.getOrderId() + " 超时,自动取消"); } catch (InterruptedException e) { e.printStackTrace(); } } }); cancelThread.start(); } } public class Main { public static void main(String[] args) { OrderManager orderManager = new OrderManager(); orderManager.startCancelThread(); // 创建订单 orderManager.createOrder("1001", 5000); orderManager.createOrder("1002", 3000); orderManager.createOrder("1003", 7000); // 取消订单 orderManager.cancelOrder("1002"); try { Thread.sleep(8000); } catch (InterruptedException e) { e.printStackTrace(); } } }
从输出结果中可以看到,订单1001在5000毫秒后被取消,订单1003在7000毫秒后被取消。这表明订单超时取消功能成功实现。
SynchronousQueue是Java并发包中提供的一种特殊类型的阻塞队列。它是一种没有容量的队列,每个插入操作必须等待另一个线程的相应移除操作,反之亦然。
简单来说,SynchronousQueue是一种用于线程间同步通信的队列。它的特点是在插入元素时,如果没有其他线程正在等待移除元素,插入操作将会阻塞,直到有其他线程移除该元素;而在移除元素时,如果没有其他线程正在等待插入元素,移除操作也会被阻塞,直到有其他线程插入元素。
由于SynchronousQueue没有容量,所以它并不存储元素。相反,它只是在插入和移除操作之间传递元素。这使得SynchronousQueue非常适合于一些需要线程间直接传递数据的场景,例如线程池的任务分发等。
SynchronousQueue在插入和移除之间直接传递元素,避了使用中间缓冲,从而提供了效的线程间通信制。
SynchronousQueue没有实际存储元素,因此它可以支持非常高的并发度,多个线程可以同时插入和移除元素。
SynchronousQueue的容量为0,意味着它不能保存任何元素。这可能会限制一些需要缓冲区的场景,例如生产者-消费者模型中的解耦。
由于SynchronousQueue的特性,插入和移除操作都可能会导致线程阻塞。如果没有其他线程等待相反的操作,插入操作将会阻塞直到有其他线程移除元素,移除操作也会阻塞直到有其他线程插入元素。这可能会导致程序的复杂性增加。
由于SynchronousQueue需要一对线程进行插入和移除操作,因此需要额外的线程协调来保证操作的成功。如果某个线程插入元素而没有其他线程移除,或者某个线程移除元素而没有其他线程插入,那么可能会导致线程阻塞,甚至死锁。
综上所述,SynchronousQueue适用于需要高效的线程间同步通信的场景,但在一些需要缓冲区或者简单的生产者-消费者模型中可能不太适用。使用时需要谨慎考虑其特性和对线程的要求。
一个贴切的生活实际案例是,假设有一个餐馆的厨房和服务员之间需要进行实时的菜品传递。当厨师烹饪完成一道菜品后,他会将菜品放入一个SynchronousQueue中。服务员会在这个队列中等待,一旦有菜品放入队列,服务员就会立即取出菜品并将其送往对应的餐桌。
在这个案例中,SynchronousQueue充当了一个中转站的角色,确保了厨师和服务员之间的菜品传递是同步的。当厨师烹饪完成菜品时,他必须等待菜品被取走后才能继续烹饪下一道菜品,而服务员在没有菜品可供上菜时也必须等待。
这种实时的菜品传递机制可以有效地避免厨师和服务员之间的菜品堆积和混乱,确保了菜品的及时送达,提高了餐馆的服务效率和顾客的用餐体验。
import java.util.concurrent.SynchronousQueue; class Chef implements Runnable { private SynchronousQueue<String> queue; public Chef(SynchronousQueue<String> queue) { this.queue = queue; } @Override public void run() { try { // 模拟烹饪过程 Thread.sleep(2000); String dish = "烤鸭"; System.out.println("厨师烹饪完成:" + dish); queue.put(dish); // 将菜品放入队列 } catch (InterruptedException e) { e.printStackTrace(); } } } class Waiter implements Runnable { private SynchronousQueue<String> queue; public Waiter(SynchronousQueue<String> queue) { this.queue = queue; } @Override public void run() { try { System.out.println("等待菜品..."); String dish = queue.take(); // 从队列中取出菜品 System.out.println("服务员取出菜品:" + dish); System.out.println("菜品送到餐桌"); } catch (InterruptedException e) { e.printStackTrace(); } } } public class Restaurant { public static void main(String[] args) { SynchronousQueue<String> queue = new SynchronousQueue<>(); Thread chefThread = new Thread(new Chef(queue)); Thread waiterThread = new Thread(new Waiter(queue)); chefThread.start(); waiterThread.start(); } }
LinkedTransferQueue是Java并发包中的一个类,它是一个基于链表的无界阻塞队列,实现了TransferQueue接口。LinkedTransferQueue可以用于在线程间传递元素,它提供了一种特殊的操作,即如果当前有等待的消费者线程,就直接将元素传递给消费者线程,否则将元素添加到队列中等待消费。
LinkedTransferQueue使用链表结构来保存元素,插入和删除操作效率较高。
LinkedTransferQueue没有容量限制,可以根据需要动态地增加元素,可以持续地接收元素而不会阻塞生产者线程。
LinkedTransferQueue提供了多种阻塞操作,可以在元素不可用时阻塞线程,直到元素可用或等待超时。
LinkedTransferQueue支持优先级传递,可以将元素传递给等待时间最长的消费者线程,减少等待时间。
LinkedTransferQueue可以选择公平或非公平的访问策略,可以控制元素的传递顺序。
LinkedTransferQueue提供了tryTransfer()方法,可以尝试立即将元素传递给消费者线程,避免生产者线程的阻塞。
LinkedTransferQueue使用链表来保存元素,相比于使用数组,链表需要更多的内存空间。
LinkedTransferQueue的操作相对复杂,需要考虑到阻塞和传递的逻辑,使用不当可能会导致线程死锁或性能下降。
由于LinkedTransferQueue是基于链表的,对于高吞吐量的场景,可能存在较多的节点分配和释放操作,导致性能下降。
LinkedTransferQueue在多线程环境下可能存在竞争,需要合理地处理竞争条件,避免出现数据不一致或线程安全问题。
综上所述,LinkedTransferQueue具有高效的传递和阻塞操作,适用于需要灵活调整容量和优先级传递的场景。但在一些特定的场景下,如对内存占用和吞吐量要求较高的情况下,可能不适合使用。
import java.util.concurrent.LinkedTransferQueue; public class LinkedTransferQueueDemo { public static void main(String[] args) { LinkedTransferQueue<String> queue = new LinkedTransferQueue<>(); // 生产者线程 Thread producerThread = new Thread(() -> { try { // 生产一个元素并立即传递给消费者线程 queue.transfer("Hello"); System.out.println("Producer: Element transferred"); } catch (InterruptedException e) { e.printStackTrace(); } }); // 消费者线程 Thread consumerThread = new Thread(() -> { try { // 等待接收元素 String element = queue.take(); System.out.println("Consumer: Element received - " + element); } catch (InterruptedException e) { e.printStackTrace(); } }); // 启动生产者和消费者线程 producerThread.start(); consumerThread.start(); // 等待线程执行完毕 try { producerThread.join(); consumerThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
以上示例创建了一个LinkedTransferQueue,并通过transfer()方法在生产者线程中传递一个元素给消费者线程。消费者线程使用take()方法等待接收元素,一旦生产者线程传递了元素,消费者线程将会接收到并打印输出。
LinkedBlockingDeque是一个基于链表的双向阻塞队列,它可以在队列的两端进行插入和删除操作,并且支持线程安全的并发访问。
LinkedBlockingDeque是线程安全的,多个线程可以同时对队列进行插入和删除操作,而不需要额外的同步机制。
LinkedBlockingDeque在并发环境下具有良好的性能。它使用了分离锁的机制,使得在多线程并发操作时可以有效地减少锁竞争,提高吞吐量。
LinkedBlockingDeque可以指定容量限制,即队列中可以存放的最大元素数量。当队列达到容量限制时,后续的插入操作将会被阻塞,直到有空间可用。
LinkedBlockingDeque支持双向操作,可以在队列的两端进行插入和删除操作。这使得它更加灵活,可以适用于更多的场景。
LinkedBlockingDeque是基于链表实现的,因此它在内存占用上可能比基于数组的阻塞队列更高。
如果没有设置容量限制,LinkedBlockingDeque可以无限制地增长。在某些情况下,如果生产者的速度大于消费者的速度,可能会导致队列无限增长,最终消耗掉所有可用的内存,导致内存溢出。
综上所述,LinkedBlockingDeque是一个高性能、线程安全且支持双向操作的阻塞队列。它适用于需要在多个线程之间进行高效、并发操作的场景。但是需要注意设置合理的容量限制,以避免内存溢出的风险。
一个在生活中常见的使用场景是电影票售卖系统。假设电影院使用LinkedBlockingDeque作为双端队列来管理售票队列。
当顾客到达电影院购买电影票时,他们可以选择从队列的一端排队,也可以选择从队列的另一端排队。这样,售票员可以根据实际情况决定从哪一端开始售票。
当售票员售完一张电影票后,顾客可以从队列的另一端离开。这样,售票员可以方便地将已经购买完票的顾客从队列中移除,以便为下一位顾客腾出位置。
import java.util.concurrent.LinkedBlockingDeque; public class MovieTicketSystem { private LinkedBlockingDeque<String> ticketQueue; public MovieTicketSystem() { ticketQueue = new LinkedBlockingDeque<>(); } public void addCustomerToQueue(String customer) { ticketQueue.offerLast(customer); System.out.println("顾客 " + customer + " 加入队列"); } public void sellTicket() { String customer = ticketQueue.pollFirst(); if (customer != null) { System.out.println("售票给顾客 " + customer); } else { System.out.println("队列中没有顾客等待购票"); } } public void removeCustomerFromQueue(String customer) { boolean removed = ticketQueue.remove(customer); if (removed) { System.out.println("顾客 " + customer + " 已离开队列"); } else { System.out.println("队列中没有顾客 " + customer); } } public void displayQueue() { System.out.println("当前队列顺序:"); for (String customer : ticketQueue) { System.out.println(customer); } } public static void main(String[] args) { MovieTicketSystem ticketSystem = new MovieTicketSystem(); // 添加顾客到队列 ticketSystem.addCustomerToQueue("顾客A"); ticketSystem.addCustomerToQueue("顾客B"); ticketSystem.addCustomerToQueue("顾客C"); // 售票 ticketSystem.sellTicket(); ticketSystem.sellTicket(); // 移除顾客 ticketSystem.removeCustomerFromQueue("顾客B"); // 显示队列 ticketSystem.displayQueue(); } }
这个例子创建了一个电影票售卖系统的类MovieTicketSystem,其中使用LinkedBlockingDeque作为队列来管理顾客的等待和购票过程。通过调用addCustomerToQueue方法将顾客添加到队列,调用sellTicket方法售票给队列中的顾客,调用removeCustomerFromQueue方法移除队列中的顾客,调用displayQueue方法显示当前队列的顺序。
在main方法中,我们创建了一个MovieTicketSystem对象,并模拟了添加顾客、售票、移除顾客和显示队列的操作。你可以根据需要修改和扩展这个例子,以适应实际的电影票售卖系统需求。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。