赞
踩
目录
21.1.3.4.1. 创建FixedThreadPool线程池:
21.1.3.4.2. 创建CachedThreadPool线程池:
21.1.3.4.3. 创建ScheduledThreadPool线程池:
二十一 . Java并发包提供了哪些并发工具类
在前面的篇幅中我们进行了,线程,锁,等并发编程的基础元素的讲解。
我们通常所说的并发包也就是 java.util.concurrent 及其子包,集中了 Java 并发的各种基础
工具类。
具体主要包括几个方面:
提供了比 synchronized 更加高级的各种同步结构,包括 CountDownLatch、
CyclicBarrier、Semaphore 等,可以实现更加丰富的多线程操作,比如利用Semap作为资源控制器,限制同时进行工作的线程数量。
创建一个CountDownLatch对象,并将其计数器初始化为3:
CountDownLatch countDownLatch = new CountDownLatch(3);
创建一个InterfaceRequestThread类,继承自Thread类,用于发起接口请求并倒计时:
- public class InterfaceRequestThread extends Thread {
- private final CountDownLatch countDownLatch;
-
- public InterfaceRequestThread(CountDownLatch countDownLatch) {
- this.countDownLatch = countDownLatch;
- }
-
- @Override
- public void run() {
- // 发起接口请求操作
- System.out.println("请求接口数据...");
- try {
- Thread.sleep(2000); // 模拟接口请求耗时
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("接口数据请求完成。");
-
- countDownLatch.countDown(); // 计数器减一
- }
- }
主线程中创建多个InterfaceRequestThread线程,并启动它们:
- // 创建CountDownLatch对象
- CountDownLatch countDownLatch = new CountDownLatch(3);
-
- // 创建三个InterfaceRequestThread线程,并启动它们
- for (int i = 0; i < 3; i++) {
- Thread thread = new InterfaceRequestThread(countDownLatch);
- thread.start();
- }
在主线程中调用await()方法等待计数器达到零,然后进行下一步处理:
- try {
- countDownLatch.await(); // 等待计数器达到零
- System.out.println("所有接口数据请求完成,开始进行下一步处理...");
- // 进行下一步操作,例如数据处理等
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
运行时,会观察到每个InterfaceRequestThread线程都会发起接口请求,并在请求完成后将CountDownLatch的计数器减一。当所有线程的请求都完成后,主线程就会从await()方法中返回,并输出"所有接口数据请求完成,开始进行下一步处理...",表示可以开始下一步操作了。
通过CountDownLatch,我们实现了多个线程之间的协作与同步,确保所有接口数据都请求完毕后再进行下一步操作。这在需要等待其他线程完成某项操作后再继续执行的场景中非常有用。
CyclicBarrier
是 Java 提供的一种同步工具类,它可以用于多个线程之间相互等待,直到所有线程都达到一个共同的屏障点,然后同时继续执行。
CyclicBarrier
的主要特点如下:
await()
方法,此时该线程会被阻塞,直到所有线程都调用了 await()
方法。await()
方法后,所有线程会同时被释放,可以继续执行后续操作。CyclicBarrier
的计数器可以重置,因此它可以被重复使用。- import java.util.concurrent.BrokenBarrierException;
- import java.util.concurrent.CyclicBarrier;
-
- public class CyclicBarrierExample {
- public static void main(String[] args) {
- int numberOfThreads = 3;
- Runnable barrierAction = () -> System.out.println("All threads have reached the barrier");
-
- CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, barrierAction);
-
- for (int i = 0; i < numberOfThreads; i++) {
- Thread thread = new Thread(new Worker(barrier));
- thread.start();
- }
- }
-
- static class Worker implements Runnable {
- private final CyclicBarrier barrier;
-
- Worker(CyclicBarrier barrier) {
- this.barrier = barrier;
- }
-
- @Override
- public void run() {
- try {
- System.out.println("Thread started");
- // 模拟线程的工作
- Thread.sleep(1000);
- System.out.println("Thread finished work and waiting at the barrier");
- barrier.await();
- System.out.println("Thread released from the barrier and continuing its work");
- } catch (InterruptedException | BrokenBarrierException e) {
- e.printStackTrace();
- }
- }
- }
- }
- Thread started
- Thread finished work and waiting at the barrier
- Thread started
- Thread finished work and waiting at the barrier
- Thread started
- Thread finished work and waiting at the barrier
- All threads have reached the barrier
- Thread released from the barrier and continuing its work
- Thread released from the barrier and continuing its work
- Thread released from the barrier and continuing its work
我们创建了一个 CyclicBarrier
对象,并指定等待的线程数量为 3,同时传入一个 barrierAction
,当所有线程都达到屏障点时,会执行该 barrierAction
。
每个线程在 run()
方法中模拟一些工作,并调用 await()
方法等待其他线程。当所有线程都调用了 await()
方法后,它们会同时被释放,继续执行后续操作。
在输出结果中,可以看到三个线程同时执行到了屏障点,并输出了 "All threads have reached the barrier" 的信息,然后它们继续执行剩余的工作。
通过使用 CyclicBarrier
,可以实现多个线程之间的同步,让它们在某个点上进行协调,等待彼此达到同一个状态,然后再继续执行后续操作。这在需要线程间相互等待的场景下非常有用,例如并行计算中将计算结果合并,多个线程同时开始某个任务等等。
屏障点是CyclicBarrier
对象的构造函数中指定的等待线程数量达到了设定值。也就是说,当3个线程都调用了await()
方法后,它们会在CyclicBarrier
对象的屏障点处等待。
具体来说,在该例子中,CyclicBarrier
对象的构造函数参数为3,表示需要等待3个线程。当第一个线程调用await()
方法时,它会被阻塞,直到所有3个线程都调用了await()
方法。当第二个和第三个线程都调用了await()
方法后,所有线程都达到了屏障点并等待。
一旦所有线程都达到屏障点,屏障点就会打开,所有线程都会被释放并继续执行后续操作。
Semaphore
(信号量)是一种并发控制机制,用于限制同时访问某个共享资源的线程数量。它可以用于控制对临界区的访问,或者限制对有限资源的并发访问。
Semaphore
主要有两个操作:acquire()
和 release()
。
acquire()
: 当一个线程需要访问被 Semaphore
保护的资源时,它调用 acquire()
方法进行获取信号量。如果信号量计数器大于0,该线程可以继续执行,同时信号量计数器减1。如果信号量计数器为0,那么该线程将被阻塞,直到有其他线程调用 release()
方法释放信号量。release()
: 当一个线程访问完被 Semaphore
保护的资源后,它调用 release()
方法释放信号量。该操作会使信号量计数器增加1,并且唤醒一个或多个被阻塞的线程,允许它们继续执行。Semaphore
的计数器可以初始化为一个非负整数。如果计数器初始化为1,那么 Semaphore
就变成了互斥锁的功能,也被称为二元信号量。
Semaphore
例子:- import java.util.concurrent.Semaphore;
-
- public class SemaphoreExample {
- public static void main(String[] args) {
- int numberOfPermits = 3; // 允许同时访问资源的线程数量
- Semaphore semaphore = new Semaphore(numberOfPermits);
-
- for (int i = 0; i < 5; i++) {
- Thread thread = new Thread(new Worker(semaphore));
- thread.start();
- }
- }
-
- static class Worker implements Runnable {
- private final Semaphore semaphore;
-
- Worker(Semaphore semaphore) {
- this.semaphore = semaphore;
- }
-
- @Override
- public void run() {
- try {
- System.out.println("Thread acquiring semaphore");
- semaphore.acquire();
- System.out.println("Thread acquired semaphore and performing its task");
- Thread.sleep(2000);
- System.out.println("Thread released semaphore");
- semaphore.release();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- Thread acquiring semaphore
- Thread acquiring semaphore
- Thread acquiring semaphore
- Thread acquired semaphore and performing its task
- Thread acquired semaphore and performing its task
- Thread acquired semaphore and performing its task
- Thread releasing semaphore
- Thread releasing semaphore
- Thread releasing semaphore
我们创建了一个 Semaphore
对象,并指定允许同时访问资源的线程数量为3。然后,我们创建了5个线程,并在每个线程中进行一些任务。
每个线程在运行时首先调用 acquire()
方法来获取信号量。如果有可用的许可证(计数器大于0),线程将继续执行,并且信号量计数器减1。如果没有可用的许可证(计数器为0),线程将被阻塞,直到有其他线程释放信号量。
在线程执行完任务后,它调用 release()
方法来释放信号量,使得其他等待的线程能够继续执行。
需要注意的是,在获取和释放信号量时要确保正确使用,以免导致死锁或访问冲突的问题。因此,在编写使用 Semaphore
的程序时,需要仔细设计和管理信号量的获取和释放操作,以确保线程间的正确协调和资源的正确使用。
各种线程安全的容器,比如常见的ConcurrentHashMap,有序的ConcurrentSkipListMap,或者通过类似快照机制,实现线程安全的动态数组CopyOnWriteArrayList。
以及并发队列实现,如:BlockingQueue实现,比较典型的ArrayBlockingQueue,SynchronousQueue或对特定场景的PriorityBlockingQueue等这些在前面的篇幅中有详细解释。
强大的Executor框架,可以创建各种不同类型的线程池,调度任务运行等,绝大部分情况,不需要自己从头进行实现线程池和任务调度器。
需要处理多个并发任务时,可以使用Executor框架来创建不同类型的线程池和调度任务运行。这样可以大大简化多线程编程的难度,并且可以有效地管理线程资源。下面是几个使用Executor框架创建线程池的实例:
FixedThreadPool线程池是一种固定大小的线程池,它能够同时执行指定数量的任务,如果有新任务提交时,它会将其放入任务队列等待执行。以下代码展示了如何创建一个FixedThreadPool线程池:
- ExecutorService executor = Executors.newFixedThreadPool(4); // 创建一个大小为4的线程池
- executor.execute(new MyTask()); // 提交任务至线程池
例子:
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- public class Main {
-
- public static void main(String[] args) {
- // 创建线程池
- ExecutorService executor = Executors.newFixedThreadPool(4); // 创建一个大小为4的线程池
-
- // 提交任务至线程池
- for (int i = 1; i <= 10; i++) {
- MyTask task = new MyTask(i);
- executor.execute(task); // 提交任务至线程池
- }
-
- // 关闭线程池
- executor.shutdown();
- }
- }
-
- class MyTask implements Runnable {
- private int taskId;
-
- public MyTask(int taskId) {
- this.taskId = taskId;
- }
-
- @Override
- public void run() {
- System.out.println("Task " + taskId + " is running."); // 执行具体的任务逻辑
- try {
- Thread.sleep(1000); // 模拟任务执行时间
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("Task " + taskId + " is complete.");
- }
- }
我们创建了一个大小为4的FixedThreadPool线程池,并提交了10个任务到线程池中。
每个任务由MyTask
类表示,该类实现了Runnable
接口,并重写了run
方法。在run
方法中,我们定义了任务的具体逻辑。在本例中,每个任务打印出任务标识,并模拟了一秒钟的执行时间。
通过循环提交多个任务到线程池中,线程池会同时执行最多4个任务。如果有新的任务提交时,它会将其放入任务队列等待执行,直到有空闲的线程可用。
最后,我们调用executor.shutdown()
方法关闭线程池,在所有任务执行完毕后,线程池停止接受新的任务并逐渐关闭。
可以利用FixedThreadPool线程池来并发执行多个任务,提高程序的效率和性能。
当使用FixedThreadPool线程池时,可以提交多个任务并让线程池同时执行指定数量的任务。
CachedThreadPool线程池是一种自动管理大小的线程池,它会根据任务量自动扩展线程数,如果有多个任务同时提交,CachedThreadPool线程池会启动多个线程来执行这些任务。以下代码展示了如何创建一个CachedThreadPool线程池:
- ExecutorService executor = Executors.newCachedThreadPool(); // 创建一个自动管理线程数的线程池
- executor.execute(new MyTask()); // 提交任务至线程池
详细进行例子讲解:
当使用CachedThreadPool线程池时,它会自动管理线程数,根据任务量的大小来动态调整线程数量。以下是一个详细的示例,展示了如何创建和应用CachedThreadPool线程池:
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- public class Main {
-
- public static void main(String[] args) {
- // 创建线程池
- ExecutorService executor = Executors.newCachedThreadPool(); // 创建一个自动管理线程数的线程池
-
- // 提交任务至线程池
- for (int i = 1; i <= 10; i++) {
- MyTask task = new MyTask(i);
- executor.execute(task); // 提交任务至线程池
- }
-
- // 关闭线程池
- executor.shutdown();
- }
- }
-
- class MyTask implements Runnable {
- private int taskId;
-
- public MyTask(int taskId) {
- this.taskId = taskId;
- }
-
- @Override
- public void run() {
- System.out.println("Task " + taskId + " is running."); // 执行具体的任务逻辑
- try {
- Thread.sleep(1000); // 模拟任务执行时间
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("Task " + taskId + " is complete.");
- }
- }
我们创建了一个CachedThreadPool线程池,并提交了10个任务到线程池中。
每个任务由MyTask
类表示,该类实现了Runnable
接口,并重写了run
方法。在run
方法中,我们定义了任务的具体逻辑。在本例中,每个任务打印出任务标识,并模拟了一秒钟的执行时间。
通过循环提交多个任务到线程池中,CachedThreadPool线程池会自动根据任务量的大小来动态调整线程数量。如果有多个任务同时提交,它会启动多个线程来执行这些任务。
最后,我们调用executor.shutdown()
方法关闭线程池,在所有任务执行完毕后,线程池停止接受新的任务并逐渐关闭。
这样,你可以利用CachedThreadPool线程池来处理大量的短期任务,让线程池动态适应任务的变化,提高程序的效率和性能。
ScheduledThreadPool线程池是一种定时执行任务的线程池,它可以让任务在指定时间或周期性地执行。以下代码展示了如何创建一个ScheduledThreadPool线程池:
- ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); // 创建一个大小为2的定时任务线程池
- executor.schedule(new MyTask(), 10, TimeUnit.SECONDS); // 10秒后执行MyTask任务
- executor.scheduleAtFixedRate(new MyTask(), 1, 3, TimeUnit.SECONDS); // 1秒后开始执行MyTask任务,每隔3秒重复执行一次
详细进行例子讲解:
当使用ScheduledThreadPool线程池时,它可以定时执行任务,让任务在指定的时间或周期性地执行。以下是一个详细的示例,展示了如何创建和应用ScheduledThreadPool线程池:
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
-
- public class Main {
-
- public static void main(String[] args) {
- // 创建线程池
- ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); // 创建一个大小为2的定时任务线程池
-
- // 使用schedule方法,在10秒后执行任务
- executor.schedule(new MyTask(), 10, TimeUnit.SECONDS);
-
- // 使用scheduleAtFixedRate方法,在1秒后开始执行任务,并且每隔3秒重复执行一次
- executor.scheduleAtFixedRate(new MyTask(), 1, 3, TimeUnit.SECONDS);
-
- // 关闭线程池
- // executor.shutdown();
- }
- }
-
- class MyTask implements Runnable {
- @Override
- public void run() {
- System.out.println("Task is running."); // 执行具体的任务逻辑
- try {
- Thread.sleep(1000); // 模拟任务执行时间
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("Task is complete.");
- }
- }
我们创建了一个ScheduledThreadPool线程池,大小为2。
首先,我们使用executor.schedule()
方法,安排一个任务在10秒后执行。这意味着10秒钟后,线程池会执行MyTask
任务的run()
方法。
然后,我们使用executor.scheduleAtFixedRate()
方法,安排一个任务在1秒后开始执行,并且每隔3秒重复执行一次。这意味着任务将会以固定的时间间隔来执行。
每个任务由MyTask
类表示,该类实现了Runnable
接口,并重写了run
方法。在run
方法中,我们定义了任务的具体逻辑。在本例中,每个任务打印输出一条信息,并模拟了一秒钟的执行时间。
我们注释掉了关闭线程池的代码。如果需要,在所有任务执行完毕后,可以调用executor.shutdown()
方法来关闭线程池。
这样可以利用ScheduledThreadPool线程池来安排和执行定时任务,让任务在指定的时间或周期性地执行,从而满足需求。
这些是Executor框架创建线程池的例子。
实际上,Executor框架还提供了很多其他类型的线程池和任务调度器,可以根据需要选择适合自己的线程池和任务调度器来处理并发任务。
1.利用多线程提高程序的扩展能力,以达到业务对吞吐量的要求。
2.协调线程间调度,交互,以完成业务逻辑。
3.线程间传递数据和状态,同样是实现业务逻辑的需要。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。