赞
踩
第二章 Java进阶篇之并发控制:掌握锁、信号量与屏障的艺术
第三章 Java进阶篇之并发工具类:深入Atomic、Concurrent与BlockingQueue
第四章 Java进阶篇之线程池(Executor框架)深度解析
第五章 Java进阶篇之Fork/Join 框架:并行处理的艺术
目录
在多核时代,如何有效利用多核处理器的并行计算能力成为了软件开发中的重要议题。Java 7引入的Fork/Join框架正是为此目的而生,它提供了一套高度优化的并行任务分解和合并机制,极大地简化了并行编程的复杂度,同时提升了应用程序的性能。本文将全面剖析Fork/Join框架的原理、使用方法及最佳实践。
Fork/Join框架的核心在于将一个大任务分解(Fork)成多个较小的子任务,并行执行这些子任务,然后将子任务的结果合并(Join)起来形成最终结果。这一过程通常称为“分治法”,它非常适合处理那些可以自然地被分解为独立子任务的问题。
ForkJoinPool:这是Fork/Join框架的核心组件,一个工作线程池,用于管理并行任务的执行。ForkJoinPool可以自动调整线程数量,以适应系统的硬件环境。
ForkJoinTask:这是所有可并行执行任务的基类。子类可以实现doFork()
、doJoin()
和compute()
方法来定义任务的分解和合并逻辑。
RecursiveAction 和 RecursiveTask:这两个类是ForkJoinTask
的子类,分别用于没有返回值的任务和有返回值的任务。
Fork/Join框架的使用始于任务的提交。开发者首先创建一个ForkJoinPool
实例,然后将一个初始任务提交给这个线程池。这个初始任务通常是一个ForkJoinTask
的子类实例。
当一个ForkJoinTask
被提交后,它会检查是否满足分解条件。如果任务足够大,它可以被分解成两个或更多的子任务,每个子任务都可以独立执行。这个分解过程是递归的,直到任务被分解到足够小,可以立即执行为止。
分解后的子任务会被加入到ForkJoinPool
的工作队列中,由可用的线程进行并行执行。ForkJoinPool
会根据当前的线程数和任务队列的长度来决定如何调度这些任务,以达到最优的负载均衡。
当所有子任务完成时,它们的结果会被合并起来,形成最终的输出。对于RecursiveTask
,这个合并过程通常是通过调用join()
方法来完成的,而RecursiveAction
则没有返回值,因此不需要显式的合并步骤。
递归分解最终会在任务小到一定程度时停止,这时的任务可以直接执行而无需进一步分解。这一阈值可以由开发者设定,也可以由框架根据系统负载自动调整。
为了更好地理解Fork/Join框架的使用,我们来看一个并行求和的例子。我们的目标是计算一个整型数组的所有元素之和。
- public class ForkJoinUtil {
-
- /**
- * 求和
- * @param list 集合
- * @param multiplier 线程倍数(cpu核心×传入参数)
- * @return 结果
- */
- public static BigDecimal sum(List<BigDecimal> list, int multiplier) {
- // Runtime.getRuntime().availableProcessors()为计算当前设备cpu核心数的代码
- int forkSize = Runtime.getRuntime().availableProcessors() * multiplier;
- ForkJoinPool forkJoinPool = new ForkJoinPool(forkSize);
- SumTask sumTask = new SumTask(list, 0, list.size());
- BigDecimal sum = forkJoinPool.invoke(sumTask);
- forkJoinPool.shutdown();
- return sum;
- }
-
- /**
- * 求平均值
- * @param list 集合
- * @param multiplier 线程倍数(cpu核心×传入参数)
- * @return 结果
- */
- public static BigDecimal avg(List<BigDecimal> list, int multiplier) {
- BigDecimal sum = sum(list, multiplier);
- return sum.divide(BigDecimal.valueOf(list.size()), 10, RoundingMode.HALF_UP);
- }
-
- static class SumTask extends RecursiveTask<BigDecimal> {
-
- private final List<BigDecimal> array;
- private final int start;
- private final int end;
-
- SumTask(List<BigDecimal> array, int start, int end) {
- this.array = array;
- this.start = start;
- this.end = end;
- }
-
- @Override
- protected BigDecimal compute() {
- // 判断,如果小于等于10则进行计算,否则一直拆分
- if (end - start <= 10) {
- BigDecimal sum = new BigDecimal(0);
- for (int i = start; i < end; i++) {
- sum = sum.add(array.get(i));
- }
- return sum;
- } else {
- // 此处考虑到拆分list会有不必要的内存消耗,索引我们只拆分下标
- int mid = (start + end) / 2;
- SumTask left = new SumTask(array, start, mid);
- SumTask right = new SumTask(array, mid, end);
- left.fork();
- right.fork();
- return left.join().add(right.join());
- }
- }
-
- }
-
- }
在这个例子中,我们定义了一个SumTask
类,继承自RecursiveTask
,实现了compute()
方法。我们设定了一个阈值10,当数组的长度小于等于这个阈值时,任务将直接执行;否则,任务将被分解为两个子任务,并行执行。
- public class Main {
- public static void main(String[] args) {
- // forkJoin 求和,平均值
- // list 排序,最大值,最小值
- List<BigDecimal> list = new ArrayList<>();
- Random rand = new Random();
- // 生成10000个随机整数
- for (int i = 0; i < 100000; i++) {
- long randomNumber = rand.nextInt(); // 生成一个整数
-
- list.add(BigDecimal.valueOf(randomNumber));
- }
- long startTime = System.currentTimeMillis();
- BigDecimal sum = ForkJoinUtil.sum(list, 8);
- System.out.println(System.currentTimeMillis() - startTime + "ms----------" + sum);
- startTime = System.currentTimeMillis();
- // 使用Stream API求和
- BigDecimal sum1 = list.stream()
- .reduce(BigDecimal.ZERO, BigDecimal::add);
- System.out.println(System.currentTimeMillis() - startTime + "ms----------" + sum1);
-
- // 10w
- // 36ms----------767541652652
- // 115ms----------767541652652
-
- // 50w
- // 59ms----------1962610989443
- // 149ms----------1962610989443
-
- // 100w
- // 366ms-----------1540557861211
- // 137ms-----------1540557861211
-
- // 200w
- // 349ms-----------986094985228
- // 130ms-----------986094985228
- }
-
- }
上边是main方法,其中一个是调用fork join实现的,另一个是用的stream流的方法,最后的注释中是我测试的结果,很明显可以看到再数据量50w以内的时候,fork join的效果还是很不错的,我进行了集中测试,发现数据量是不是特别大的时候使用fork join进行求和和平均值是比stream流效率要高的(有说的不对的地方希望大家可以评论区进行交流)
Fork/Join框架虽然强大,但并不总是优于传统的同步循环或并行流。以下几点是在使用Fork/Join框架时应该考虑的:
Fork/Join框架为Java开发者提供了一个强大而灵活的工具,用于并行任务的分解和合并。通过合理的设计和使用,开发者可以显著提升应用程序的性能,尤其是在处理大规模数据集或执行复杂计算时。然而,正如所有高性能编程技术一样,Fork/Join框架的使用也需要仔细权衡和优化,以避免潜在的性能陷阱。
本文深入探讨了Java中的Fork/Join框架,从理论基础到实际应用,希望能够帮助读者掌握这一重要的并行编程技术,从而在未来的软件开发中取得更好的性能和效率。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。