赞
踩
java.util.concurrent.atomic
包下的原子类:AtomicBoolean
, AtomicInteger
, AtomicLong
: 这些原子类提供对基本数值类型(布尔、整型、长整型)的原子操作,如递增、递减、比较并交换(CAS)等,能够在不使用锁的情况下保证多线程环境下的线程安全性。java.util.concurrent.atomic
包下的原子类为Java并发编程提供了一组高效的、线程安全的原子操作类,它们主要用于在高并发环境下确保对共享变量的操作具有原子性,即在同一时刻只有一个线程能够修改这些变量的值,而不会出现因多线程并发导致的中间状态或数据竞争问题。这些原子类的使用可以替代传统的基于synchronized
关键字或Lock
接口的同步机制,通常具有更好的性能,尤其是在争用不严重或者仅需对单个变量进行原子操作的情况下。
以下是java.util.concurrent.atomic
包下一些常见原子类的介绍及其使用方式:
AtomicBoolean
AtomicBoolean
实例,通过get()
、set(boolean)
、compareAndSet(boolean, boolean)
等方法进行原子读写或条件更新。AtomicBoolean flag = new AtomicBoolean(false);
// 原子设置为true
flag.set(true);
// 原子读取
boolean currentValue = flag.get();
// 原子条件更新(只有当当前值为预期值时才更新)
boolean wasUpdated = flag.compareAndSet(false, true);
AtomicInteger
和 AtomicLong
int
)和长整型(long
)数值进行原子操作,包括增加、减少、比较并交换(CAS)等。get()
、set(int)
、incrementAndGet()
、decrementAndGet()
、addAndGet(int)
、compareAndSet(int, int)
等方法进行原子操作。AtomicInteger counter = new AtomicInteger(0);
// 原子增加并返回新值
int newValue = counter.incrementAndGet();
// 原子减少并返回新值
newValue = counter.decrementAndGet();
// 原子添加指定值并返回新值
newValue = counter.addAndGet(10);
// 原子条件更新(只有当当前值为预期值时才更新)
boolean updated = counter.compareAndSet(100, 200);
AtomicReference
AtomicReference<T>
实例,通过get()
、set(T)
、compareAndSet(T, T)
、weakCompareAndSet(T, T)
等方法进行原子读写或条件更新。AtomicReference<String> ref = new AtomicReference<>("initial value");
// 原子设置新值
ref.set("new value");
// 原子读取当前值
String currentValue = ref.get();
// 原子条件更新(只有当当前值为预期值时才更新)
boolean wasReplaced = ref.compareAndSet("old value", "new value");
AtomicIntegerArray
、AtomicLongArray
、AtomicReferenceArray
。AtomicIntegerArray array = new AtomicIntegerArray(10);
// 原子增加数组元素并返回新值
int newValue = array.incrementAndGet(3);
// 原子设置数组元素
array.set(5, .png);
// 原子条件更新数组元素(只有当当前值为预期值时才更新)
boolean wasUpdated = array.compareAndSet(2, oldVal, newVal);
AtomicIntegerFieldUpdater
、AtomicLongFieldUpdater
、AtomicReferenceFieldUpdater
。这些类允许通过反射来对指定类的非静态字段进行原子更新。updateAndGet()
、getAndSet()
、compareAndSet()
等方法进行原子操作。由于涉及反射,通常需要在类定义之外使用静态工厂方法创建更新器实例。public class MyClass {
volatile int myInt;
}
AtomicIntegerFieldUpdater<MyClass> updater = AtomicIntegerFieldUpdater.newUpdater(MyClass.class, "myInt");
MyClass instance = new MyClass();
// 原子增加字段值并返回新值
int newValue = updater.incrementAndGet(instance);
// 原子设置字段值
updater.set(instance, 100);
// 原子条件更新字段值(只有当当前值为预期值时才更新)
boolean wasUpdated = updater.compareAndSet(instance, oldValue, newValue);
总结来说,java.util.concurrent.atomic
包下的原子类提供了针对不同数据类型的原子操作能力,使得开发者可以在无需显式同步的情况下,编写出既高效又线程安全的代码。选择合适的原子类并正确使用其提供的方法,可以有效避免并发环境下的数据竞争和同步问题。
java.util.concurrent.CountDownLatch
:countDown()
方法使计数器减一。其他线程通过 await()
方法阻塞等待,直到计数器归零,表示所有任务线程已结束,等待线程才能继续执行。作用:
如何使用:
使用 CountDownLatch
通常遵循以下步骤:
在创建 CountDownLatch
对象时,传递一个整数作为参数,表示需要等待的工作线程数量或任务计数。
CountDownLatch latch = new CountDownLatch(n); // n 为需要等待的线程数或任务计数
每个完成任务的工作线程在任务完成后调用 countDown()
方法,通知 CountDownLatch
计数器减一。
latch.countDown();
主线程调用 await()
方法,该方法将阻塞直到计数器值变为零。这意味着主线程会等待所有工作线程完成任务并调用 countDown()
使得计数器递减到零。
latch.await(); // 可能抛出 InterruptedException
await()
方法可以抛出 InterruptedException
,因此通常需要在调用时捕获或声明抛出此异常。此外,await()
还有一个重载版本接受一个超时时间,如果在指定时间内计数器仍未归零,则会解除阻塞并返回,这有助于防止无限期等待。
try {
boolean isAllTasksDone = latch.await(timeout, TimeUnit.MILLISECONDS);
if (!isAllTasksDone) {
// 处理超时情况
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重新设置中断标记
// 处理中断情况
}
下面是一个简单的示例,展示如何使用 CountDownLatch
让主线程等待三个子线程完成各自任务:
import java.util.concurrent.CountDownLatch; public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException { final int workerCount = 3; final CountDownLatch latch = new CountDownLatch(workerCount); for (int i = 0; i < workerCount; i++) { new Thread(() -> { // 执行子线程任务... System.out.println(Thread.currentThread().getName() + " finished its task."); // 任务完成后通知 CountDownLatch latch.countDown(); }, "Worker-" + (i + 1)).start(); } // 主线程等待所有子线程完成 latch.await(); System.out.println("All worker threads have finished. Main thread can continue now."); } }
在这个例子中,主线程创建了三个工作线程,并为它们设置了相同的 CountDownLatch
。每个工作线程在完成任务后调用 countDown()
,主线程通过调用 await()
阻塞,直到所有工作线程都完成任务并使计数器归零,此时主线程才会解除阻塞并继续执行。
java.util.concurrent.CyclicBarrier
:CountDownLatch
不同的是,它可以重复使用,当所有线程到达屏障时,计数器重置,线程可以再次参与到新的循环中。线程通过 await()
方法进入等待状态,直到所有参与者都到达。作用:
Runnable
),用于执行一些全局的初始化操作、统计结果、清理资源等。如何使用:
使用 CyclicBarrier
通常遵循以下步骤:
在创建 CyclicBarrier
对象时,传递一个整数作为参数,表示需要等待的线程数量。还可以传入一个可选的 Runnable
对象作为屏障动作。
CyclicBarrier barrier = new CyclicBarrier(parties, barrierAction); // parties 为参与线程数,barrierAction 为可选的屏障动作
每个参与同步的工作线程在执行到屏障点时调用 await()
方法,该方法会使线程阻塞,直到所有线程都调用了 await()
并达到屏障点。
try {
barrier.await(); // 可能抛出 BrokenBarrierException 或 InterruptedException
} catch (InterruptedException | BrokenBarrierException e) {
// 处理中断或屏障破坏的情况
}
如果在构造 CyclicBarrier
时指定了屏障动作(Runnable
),当最后一个线程到达屏障点时,系统会自动执行这个动作。屏障动作通常用于执行一些需要所有线程参与或依赖所有线程结果的操作。
一旦所有线程都到达屏障点,屏障会自动重置,等待下一轮线程再次同步。如此反复,直至应用程序不再需要同步。
以下是一个简单的示例,展示了如何使用 CyclicBarrier
让四个线程在每个迭代阶段都同步执行:
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierExample { public static void main(String[] args) { int numberOfThreads = 4; CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> { System.out.println("All threads have reached the barrier. Starting next iteration..."); }); for (int i = 0; i < numberOfThreads; i++) { new Thread(() -> { for (int j = 0; j < 3; j++) { System.out.println(Thread.currentThread().getName() + " working on iteration " + j); try { // 在每次迭代结束时同步 barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }, "Thread-" + (i + 1)).start(); } } }
在这个例子中,创建了一个有四个参与者的 CyclicBarrier
,并在屏障动作中打印一条消息表示所有线程已经到达屏障,即将开始下一迭代。每个线程在完成三次迭代工作后调用 await()
,等待其他线程也到达屏障。当所有线程都到达后,屏障动作执行,然后屏障重置,线程们继续进行下一次迭代,如此反复三次。
java.util.concurrent.Semaphore
:acquire()
方法获取许可证(如果无可用许可证则等待),release()
方法释放许可证。常用于限流或资源池场景。作用:
如何使用:
使用 Semaphore
通常遵循以下步骤:
在创建 Semaphore
对象时,传递一个整数作为参数,表示可用的许可(permit)数量。
Semaphore semaphore = new Semaphore(permits); // permits 为初始许可数量
当一个线程需要访问受保护的资源时,调用 acquire()
方法尝试获取一个许可。如果当前有可用的许可,线程将立即获得并继续执行;否则,线程将被阻塞,直到其他线程释放许可或有新的许可被添加。
try {
semaphore.acquire(); // 可能抛出 InterruptedException
// 在此处访问受保护的资源或执行临界区代码
} finally {
semaphore.release(); // 释放许可,使其他等待的线程有机会获取
}
当线程完成对资源的访问或退出临界区时,必须调用 release()
方法释放之前获取的许可,以便其他等待的线程可以继续执行。
Semaphore
还提供了非阻塞版本的 tryAcquire()
方法,以及带有超时的 tryAcquire(long timeout, TimeUnit unit)
方法,允许线程尝试获取许可而不必立即阻塞。如果无法立即获取许可,非阻塞方法将返回失败(false
),或者在超时后返回失败,不会阻塞线程。
下面是一个简单的示例,展示如何使用 Semaphore
限制同时访问共享资源的线程数量为 2:
import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; public class SemaphoreExample { private static final Semaphore SEMAPHORE = new Semaphore(2); // 允许同时访问资源的线程数为 2 public static void main(String[] args) { for (int i = 0; i < 5; i++) { new Thread(() -> { try { SEMAPHORE.acquire(); // 获取许可 System.out.println(Thread.currentThread().getName() + " acquired a permit and is accessing the shared resource..."); simulateWork(3, TimeUnit.SECONDS); // 模拟工作,此处代表访问共享资源 System.out.println(Thread.currentThread().getName() + " has finished and is releasing the permit."); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println(Thread.currentThread().getName() + " was interrupted while waiting for a permit."); } finally { SEMAPHORE.release(); // 释放许可 } }, "Thread-" + (i + 1)).start(); } } private static void simulateWork(long duration, TimeUnit timeUnit) { try { timeUnit.sleep(duration); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
在这个例子中,创建了一个初始许可数为 2 的 Semaphore
。五个线程试图访问共享资源,但每次只有两个线程能够成功获取许可并执行模拟工作。当一个线程完成工作并释放许可后,等待的线程才有机会继续执行。这样就实现了对共享资源并发访问的控制。
java.util.concurrent.Phaser
:CyclicBarrier
,但支持多个同步阶段和更精细的控制。java.util.concurrent.Phaser
是Java并发编程中的一种高级同步工具类,它提供了更为灵活的多阶段同步机制。Phaser
允许一组线程按照预定的阶段(phase)进行协作,每个阶段可以有多个参与线程。线程在每个阶段开始时注册,完成各自任务后注销,所有参与线程在当前阶段全部注销后,Phaser
进入下一个阶段。Phaser
支持动态注册新的参与线程,且可以循环使用,适用于复杂的多阶段并发任务协调场景。作用:
Phaser
可以重置并用于新一轮的多阶段同步。如何使用:
使用 Phaser
通常遵循以下步骤:
创建 Phaser
对象,可以选择是否指定初始参与线程数。如果不指定,默认为 0。
Phaser phaser = new Phaser(); // 默认初始参与线程数为 0
// 或者指定初始参与线程数
Phaser phaser = new Phaser(initialParties);
arriveAndAwaitAdvance()
或 register()
方法注册自己。arriveAndAwaitAdvance()
会立即返回并等待其他线程完成,而 register()
只是注册线程而不等待。phaser.arriveAndAwaitAdvance(); // 注册并等待所有线程完成当前阶段
// 或
phaser.register(); // 注册线程,不等待
arriveAndDeregister()
或 arrive()
方法注销自己。arriveAndDeregister()
注销并减少参与线程数,适用于一次性参与的线程;arrive()
只注销当前阶段,线程仍参与后续阶段。phaser.arriveAndDeregister(); // 注销并减少参与线程数
// 或
phaser.arrive(); // 注销当前阶段,线程仍参与后续阶段
getPhase()
查询当前阶段数。reset()
方法重置 Phaser
,使其回到初始状态,准备开始新一轮的多阶段同步。以下是一个简单的示例,展示如何使用 Phaser
协调三个阶段的多线程任务:
import java.util.concurrent.Phaser; public class PhaserExample { public static void main(String[] args) { Phaser phaser = new Phaser(3); // 3 个初始参与线程 for (int i = 0; i < 3; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + " registered for phase 0."); phaser.arriveAndAwaitAdvance(); // 注册并等待阶段 0 开始 System.out.println(Thread.currentThread().getName() + " started phase 1 work..."); simulateWork(); // 模拟阶段 1 工作 phaser.arrive(); // 结束阶段 1 System.out.println(Thread.currentThread().getName() + " started phase 2 work..."); simulateWork(); // 模拟阶段 2 工作 phaser.arriveAndDeregister(); // 结束阶段 2 并从 Phaser 中注销 }, "Thread-" + (i + 1)).start(); } } private static void simulateWork() { try { Thread.sleep(1000); // 模拟工作耗时 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
在这个例子中,创建了一个初始参与线程数为 3 的 Phaser
。每个线程在开始时注册参与阶段 0,然后等待所有线程都到达。当所有线程都到达后,所有线程进入阶段 1,执行模拟工作并结束阶段 1。接着,所有线程进入阶段 2,再次执行模拟工作并结束阶段 2,同时从 Phaser
中注销自己。整个过程中,Phaser
有效地协调了线程在三个阶段的同步工作。
java.util.concurrent.Exchanger
:exchange()
方法时会阻塞,直到另一个线程也调用该方法,此时两个线程的数据会被交换。适用于需要在线程间进行数据交互的场景。java.util.concurrent.Exchanger
是Java并发编程中的一种同步工具类,它主要用于在两个线程之间实现数据的直接、同步交换。当两个线程都到达交换点时,它们各自携带的数据会被互换,确保线程间的通信和协作。
作用:
如何使用:
使用 Exchanger
通常遵循以下步骤:
创建 Exchanger
对象,不需要传递任何参数。
Exchanger<DataType> exchanger = new Exchanger<>();
每个参与交换的线程准备待交换的数据。
DataType dataToExchange = ...; // 准备要交换的数据
每个线程在其适当的位置调用 exchange()
方法,该方法会阻塞,直到另一个参与交换的线程也到达交换点。当两个线程都到达时,它们携带的数据会被互换。
DataType receivedData = exchanger.exchange(dataToExchange);
以下是一个简单的示例,展示如何使用 Exchanger
让两个线程交换整数:
import java.util.concurrent.Exchanger; public class ExchangerExample { public static void main(String[] args) { Exchanger<Integer> exchanger = new Exchanger<>(); new Thread(() -> { int dataFromThread1 = 100; System.out.println("Thread 1: Preparing data " + dataFromThread1); try { Integer dataReceived = exchanger.exchange(dataFromThread1); System.out.println("Thread 1: Received data " + dataReceived); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println("Thread 1: Exchange interrupted."); } }, "Thread 1").start(); new Thread(() -> { int dataFromThread2 = 200; System.out.println("Thread 2: Preparing data " + dataFromThread2); try { Integer dataReceived = exchanger.exchange(dataFromThread2); System.out.println("Thread 2: Received data " + dataReceived); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println("Thread 2: Exchange interrupted."); } }, "Thread 2").start(); } }
在这个例子中,创建了一个 Exchanger
实例。两个线程分别准备了整数 100
和 200
,然后调用 exchange()
方法进行交换。当两个线程都到达交换点时,它们携带的数据会被互换,即线程1收到 200
,线程2收到 100
,并打印出交换结果。Exchanger
保证了两个线程的数据交换是同步且准确的。
java.util.concurrent.ThreadLocal
:ThreadLocal
变量副本,从而避免了线程间共享数据时的竞态条件。java.util.concurrent.ThreadLocal
是Java并发编程中的一种工具类,它为每个线程提供一个独立的变量副本,使得每个线程在访问 ThreadLocal
变量时,实际上操作的是自己本地内存中的副本,而不是共享的全局变量。这种设计确保了线程间数据的隔离,避免了线程安全问题,同时也简化了线程间数据传递的复杂度。
作用:
ThreadLocal
实例的 get()
和 set()
方法访问和修改其本地副本,无需额外的同步机制。如何使用:
使用 ThreadLocal
通常遵循以下步骤:
创建 ThreadLocal
对象,通常指定泛型类型以明确存储的变量类型。
ThreadLocal<DataType> threadLocal = new ThreadLocal<>();
在需要的线程中,通过 set()
方法设置其本地变量副本的值。
threadLocal.set(value); // 设置本地变量副本的值
在同一线程中,可以通过 get()
方法获取其本地变量副本的值。
DataType localValue = threadLocal.get(); // 获取本地变量副本的值
如果存储在 ThreadLocal
中的对象需要手动清理(如关闭数据库连接、释放资源等),可以覆盖 ThreadLocal
类的 remove()
方法,或者在应用的适当位置(如 finally
块、Thread.UncaughtExceptionHandler
、ExecutorService
的 ThreadFactory
等)调用 remove()
,以确保线程结束时清理资源。
threadLocal.remove(); // 清除当前线程的本地变量副本
以下是一个简单的示例,展示如何使用 ThreadLocal
存储并访问线程本地的字符串:
import java.util.concurrent.ThreadLocal; public class ThreadLocalExample { public static void main(String[] args) { ThreadLocal<String> threadLocal = new ThreadLocal<>(); Thread thread1 = new Thread(() -> { threadLocal.set("Thread 1 Value"); String value = threadLocal.get(); System.out.println("Thread 1: " + value); }); Thread thread2 = new Thread(() -> { threadLocal.set("Thread 2 Value"); String value = threadLocal.get(); System.out.println("Thread 2: " + value); }); thread1.start(); thread2.start(); try { thread1.join(); thread2.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println("Main thread interrupted."); } } }
在这个例子中,创建了一个 ThreadLocal<String>
实例。两个线程分别设置各自的本地字符串值,并通过 get()
方法获取和打印。尽管两个线程使用的是同一份 ThreadLocal
实例,但由于 ThreadLocal
的特性,它们各自操作的是独立的本地副本,因此输出结果互不影响,展示了线程间数据的隔离。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。