赞
踩
fork/join 秉承分而治之思想,从JDK1.7开始,Java提供Fork/Join框架用于并行执行任务。它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。如下图:
可以简单的理解为:将规模为N的问题,当N<阈值,直接解决;当N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解。
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。如果现场A的任务执行完成后,于是它就去其他线程B的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行,也就是线程B从头部拿取,而线程A从尾部获取任务开始执行。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
//有返回值 public abstract class RecursiveTask<V> extends ForkJoinTask<V>;
//没有返回值 public abstract class RecursiveAction extends ForkJoinTask<Void>;
- package cn.hsa.iep.usc.emc.job;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.RecursiveAction;
- import java.util.concurrent.RecursiveTask;
-
- /**
- * @author lsj
- * @date 2022/5/7 14:09
- */
- public class NoRetureValueTest extends RecursiveAction {
- //每个线程执行的阈值
- private static final int THRESHOLD_NUM = 10000;
- //需要处理的数据从构造函数传入
- List<Integer> sumList;
-
- public NoRetureValueTest(List<Integer> sumList) {
- this.sumList = sumList;
- }
-
- @Override
- protected void compute() {
- if (sumList.size() > THRESHOLD_NUM) {
- System.out.println("大于阈值重新划分sumList大小"+sumList.size());
- //大于阈值重新划分
- List<Integer> leftList = sumList.subList(0, THRESHOLD_NUM);
- List<Integer> rightList = sumList.subList(THRESHOLD_NUM, sumList.size());
- NoRetureValueTest leftTask = new NoRetureValueTest(leftList);
- NoRetureValueTest rightTask = new NoRetureValueTest(rightList);
- //执行任务
- invokeAll(leftTask, rightTask);
- }
-
- //如果小于阈值直接执行任务
- taskExcute(sumList);
-
- }
-
- private Integer taskExcute(List<Integer> sumList) {
- int sum = 0;
- for (Integer num :
- sumList) {
- sum += num;
- }
- System.out.println(Thread.currentThread().getName()+"执行完毕,sum="+sum);
- return sum;
- }
-
- public static void main(String[] args) throws InterruptedException {
- // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
- ForkJoinPool forkjoinPool = new ForkJoinPool();
- //任务数据
- List<Integer> sumList = new ArrayList<>();
- for (int i = 0 ;i<100000; i++){
- sumList.add(i);
- }
- //创建初始任务
- NoRetureValueTest retureValueTest = new NoRetureValueTest(sumList);
- forkjoinPool.execute(retureValueTest);//异步执行任务,不需要等待所有线程都执行完
- //关闭线程池
- forkjoinPool.shutdown();
- }
- }
- package cn.hsa.iep.usc.emc.job;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.RecursiveTask;
-
- /**
- * @author lsj
- * @date 2022/5/7 14:09
- */
- public class RetureValueTest extends RecursiveTask<Integer> {
- //每个线程执行的阈值
- private static final int THRESHOLD_NUM = 10000;
- //需要处理的数据从构造函数传入
- List<Integer> sumList;
-
- public RetureValueTest(List<Integer> sumList) {
- this.sumList = sumList;
- }
-
- @Override
- protected Integer compute() {
- if (sumList.size() > THRESHOLD_NUM) {
- System.out.println("大于阈值重新划分sumList大小"+sumList.size());
- //大于阈值重新划分
- List<Integer> leftList = sumList.subList(0, THRESHOLD_NUM);
- List<Integer> rightList = sumList.subList(THRESHOLD_NUM, sumList.size());
- RetureValueTest leftTask = new RetureValueTest(leftList);
- RetureValueTest rightTask = new RetureValueTest(rightList);
- //执行任务
- invokeAll(leftTask, rightTask);
- //获取结果
- Integer leftResult = leftTask.join();
- Integer rightResult = rightTask.join();
-
- return leftResult + rightResult;
- }
-
- //如果小于阈值直接执行任务
- return taskExcute(sumList);
-
- }
-
- private Integer taskExcute(List<Integer> sumList) {
- int sum = 0;
- for (Integer num :
- sumList) {
- sum += num;
- }
- try {
- Thread.sleep(3000L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(1/0);
- System.out.println(Thread.currentThread().getName()+"执行完毕,sum="+sum);
- return sum;
- }
-
- public static void main(String[] args) throws InterruptedException {
- // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
- ForkJoinPool forkjoinPool = new ForkJoinPool();
- //任务数据
- List<Integer> sumList = new ArrayList<>();
- for (int i = 0 ;i<100000; i++){
- sumList.add(i);
- }
- //创建初始任务
- RetureValueTest retureValueTest = new RetureValueTest(sumList);
- Integer invoke = forkjoinPool.invoke(retureValueTest);//执行任务
- // forkjoinPool.awaitTermination(10, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
- // if(retureValueTest.isCompletedAbnormally()){
- // System.out.println("异常:"+ retureValueTest.getException());
- // }
-
- System.out.println("结果1:"+ retureValueTest.join());
- System.out.println("结果2:"+ invoke);
- //关闭线程池
- forkjoinPool.shutdown();
- }
- }
1 .在有大量计算任务时,此框架方法可进行并行计算效率高,以上示例,可以根据具体的业务需求更改属性及相关方法用于匹配自己的业务逻辑
2 .JDK1.8后由于加入Stream流的操作,集合框架可以使用Collection<E> default Stream<E> parallelStream()的方法转换成并行流进行计算,此时效果与Fork/Join任务同效
3 .ForkJoinPool中的多种方法
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task);//等待获取结果 public void execute(ForkJoinTask<?> task);//异步执行 public <T> T invoke(ForkJoinTask<T> task);//执行,获取Future
4.ForkJoinTask在执行的时候可能会抛出异常,但是没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。getException方 法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。
if(task.isCompletedAbnormally()) { System.out.println(task.getException()); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。