赞
踩
线程池是一种管理线程的机制,通过预先创建一定数量的线程,可以在需要时重复使用这些线程,从而避免频繁创建和销毁线程带来的性能开销。
使用线程池的优点包括:
Java 提供了 java.util.concurrent 包中的 Executors 工具类来创建各种类型的线程池。常见的方法包括:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
可以通过 execute 方法或 submit 方法将任务提交到线程池。
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println("Task executed");
}
});
Future<?> future = fixedThreadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "Task completed";
}
});
try {
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
Callable 是一个类似于 Runnable 的接口,但它可以返回一个结果或抛出异常。Future 表示一个异步计算的结果,可以用来获取 Callable 的返回值或检查任务是否完成。
Callable<String> callableTask = new Callable<String>() {
@Override
public String call() throws Exception {
return "Task result";
}
};
Future<String> future = fixedThreadPool.submit(callableTask);
try {
String result = future.get(); // 阻塞等待任务完成并获取结果
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
可以使用 shutdown 或 shutdownNow 方法来关闭线程池。
停止接受新任务,并让已提交的任务执行完毕。
fixedThreadPool.shutdown();
尝试停止所有正在执行的任务,并返回等待执行的任务列表。
List<Runnable> notExecutedTasks = fixedThreadPool.shutdownNow();
try {
if (!fixedThreadPool.awaitTermination(60, TimeUnit.SECONDS)) {
fixedThreadPool.shutdownNow();
}
} catch (InterruptedException e) {
fixedThreadPool.shutdownNow();
}
ThreadPoolExecutor 是 Java 线程池的核心实现类,它的构造函数包含以下参数:
5, // corePoolSize
10, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<Runnable>(), // workQueue
Executors.defaultThreadFactory(), // threadFactory
new ThreadPoolExecutor.AbortPolicy() // handler
);
当线程池无法接受新任务时,会使用拒绝策略来处理这些任务。ThreadPoolExecutor 提供了以下几种拒绝策略:
默认策略,抛出 RejectedExecutionException
new ThreadPoolExecutor.AbortPolicy();
由调用线程执行任务
new ThreadPoolExecutor.CallerRunsPolicy();
直接丢弃任务,不抛出异常。
new ThreadPoolExecutor.DiscardPolicy();
丢弃队列中最旧的任务,然后尝试重新提交任务。
new ThreadPoolExecutor.DiscardOldestPolicy();
ForkJoinPool 是 Java 7 引入的一种特殊的线程池,设计用于处理可以递归拆分成更小任务的并行计算。它基于工作窃取算法,适合处理大规模并行任务。
ForkJoinPool 与 ThreadPoolExecutor 的主要区别在于:
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
class SumTask extends RecursiveTask<Integer> {
private final int[] array;
private final int start, end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= 10) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
leftTask.fork();
return rightTask.compute() + leftTask.join();
}
}
}
public class ForkJoinExample {
public static void main(String[] args) {
int[] array = new int[100];
for (int i = 0; i < array.length; i++) {
array[i] = i + 1;
}
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(array, 0, array.length);
int result = pool.invoke(task);
System.out.println("Sum: " + result);
}
}
在 ThreadPoolExecutor 中,当线程池和队列都满时,可以自定义拒绝策略来处理新提交的任务。可以通过实现 RejectedExecutionHandler 接口来自定义拒绝策略。
import java.util.concurrent.*;
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 自定义拒绝策略,例如记录日志或将任务放入另一个队列
System.out.println("Task " + r.toString() + " rejected from " + executor.toString());
}
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2),
Executors.defaultThreadFactory(),
new CustomRejectedExecutionHandler()
);
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " is executing task");
});
}
executor.shutdown();
}
}
可以通过继承 ThreadPoolExecutor 类来实现一个自定义的线程池,并重写其方法以添加自定义行为。
import java.util.concurrent.*;
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
System.out.println("Before executing task: " + r.toString());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
System.out.println("After executing task: " + r.toString());
}
@Override
protected void terminated() {
super.terminated();
System.out.println("Thread pool terminated");
}
public static void main(String[] args) {
CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2)
);
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " is executing task");
});
}
executor.shutdown();
}
}
可以通过定期调整 ThreadPoolExecutor 的核心参数来实现动态调整线程池的功能。例如,可以使用 ScheduledExecutorService 定期检查任务队列的长度,并根据需要调整核心线程数和最大线程数。
import java.util.concurrent.*;
public class DynamicThreadPool {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2)
);
ScheduledExecutorService adjuster = Executors.newScheduledThreadPool(1);
adjuster.scheduleAtFixedRate(() -> {
int queueSize = executor.getQueue().size();
if (queueSize > 2) {
executor.setCorePoolSize(Math.min(executor.getCorePoolSize() + 1, 10));
executor.setMaximumPoolSize(Math.min(executor.getMaximumPoolSize() + 1, 20));
} else if (queueSize == 0) {
executor.setCorePoolSize(Math.max(executor.getCorePoolSize() - 1, 2));
executor.setMaximumPoolSize(Math.max(executor.getMaximumPoolSize() - 1, 4));
}
System.out.println("Adjusted Pool Size: " + executor.getCorePoolSize() + ", " + executor.getMaximumPoolSize());
}, 0, 1, TimeUnit.SECONDS);
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " is executing task");
});
}
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
adjuster.shutdown();
}
}
可以使用 PriorityBlockingQueue 来实现一个具有优先级的线程池。任务需要实现 Comparable 接口,以定义任务的优先级。
import java.util.concurrent.*;
public class PriorityThreadPool {
static class PriorityTask implements Runnable, Comparable<PriorityTask> {
private final int priority;
private final String name;
public PriorityTask(int priority, String name) {
this.priority = priority;
this.name = name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is executing task: " + name);
}
@Override
public int compareTo(PriorityTask o) {
return Integer.compare(o.priority, this.priority);
}
@Override
public String toString() {
return name + "(priority=" + priority + ")";
}
}
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS, new PriorityBlockingQueue<>()
);
for (int i = 0; i < 10; i++) {
int priority = i % 3;
executor.execute(new PriorityTask(priority, "Task-" + i));
}
executor.shutdown();
}
}
可以使用 ScheduledThreadPoolExecutor 来实现一个支持超时任务的线程池。通过 schedule 方法,可以提交一个带有超时功能的任务。
import java.util.concurrent.*;
public class TimeoutThreadPool {
public static void main(String[] args) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(4);
for (int i = 0; i < 10; i++) {
ScheduledFuture<?> future = executor.schedule(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " is executing task");
}, 1, TimeUnit.SECONDS);
executor.schedule(() -> {
if (!future.isDone()) {
future.cancel(true);
System.out.println("Task timed out and was cancelled");
}
}, 3, TimeUnit.SECONDS);
}
executor.shutdown();
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。