当前位置:   article > 正文

Java并发编程--Fork/Join框架使用_thread join fork

thread join fork

      上篇博客我们介绍了通过CyclicBarrier使线程同步,但是上述方法存在一个问题,那就是如果一个大任务跑了2个线程去完成,如果线程2耗时比线程1多2倍,线程1完成后必须等待线程2完成,等待的过程线程1没法复用。现在我们准备解决这个问题,我们希望线程1完成自己的任务后能去帮助线程2完成一部分任务。Java7引如了Fork/Join框架可以很好的解决这个问题。

         Fork/Join是一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最后汇总每个小任务结果后得到大任务结果的框架。fork是分叉,join是结合。下面看张图来清晰的认识一下:

 

    其实Fork/Join本质上是分治算法的一种实现。下面我们来看怎么具体使用:

    ForkJoinTask:我们要使用Fork/Join框架,必须首先创建一个Fork/Join任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:

        RecursiveAction:用于没有返回结果的任务。

        RecursiveTask :用于有返回结果的任务。

 

    ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

 

下面,我们同样实现考试系统抽题的例子。

GetQuestionsTask.java
  1. public class GetQuestionsTask extends RecursiveTask<List> {
  2. //参数map
  3. private Map map;
  4. //参数list==只放题型
  5. private List questionTypeList;
  6. public GetQuestionsTask(List questionTypeList,Map map) {
  7. this.questionTypeList = questionTypeList;
  8. this.map=map;
  9. }
  10. @Override
  11. protected List compute() {
  12. System.out.println(questionTypeList.size());
  13. List list = new ArrayList();
  14. if (questionTypeList.size() < 2) {
  15. // 抽题
  16. list = getQuestions(questionTypeList,map);
  17. } else {
  18. int size = questionTypeList.size();
  19. int mid = size / 2;
  20. GetQuestionsTask task1 = new GetQuestionsTask(
  21. questionTypeList.subList(0, mid),map);
  22. GetQuestionsTask task2 = new GetQuestionsTask(
  23. questionTypeList.subList(mid, size),map);
  24. invokeAll(task1, task2);
  25. try {
  26. list = groupResults(task1.get(), task2.get());
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. } catch (ExecutionException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. return list;
  34. }
  35. //合并抽题结果
  36. private List groupResults(List list1, List list2) {
  37. System.out.println(Thread.currentThread().getName()+"开始合并结果......");
  38. // 合并返回结果
  39. List list = new ArrayList();
  40. list.addAll(list1);
  41. list.addAll(list2);
  42. System.out.println(Thread.currentThread().getName()+"合并结果结束......");
  43. return list;
  44. }
  45. // 抽题
  46. private List getQuestions(List questTypeList,Map map) {
  47. List list = new ArrayList();
  48. for(int i=0;i<questTypeList.size();i++){
  49. System.out.println(Thread.currentThread().getName()+"开始抽题......"+questionTypeList.get(i).toString());
  50. //假数据,向list中放试题
  51. list.add("0");
  52. list.add("1");
  53. list.add("2");
  54. list.add("3");
  55. list.add("4");
  56. try {
  57. Thread.sleep(1000);
  58. } catch (InterruptedException e) {
  59. e.printStackTrace();
  60. }
  61. System.out.println(Thread.currentThread().getName()+"抽题结束..."+questionTypeList.get(i).toString());
  62. }
  63. return list;
  64. }
  65. }


 

客户端:

 

  1. //该池的线程数量不会超过0*7fff个(32767)
  2. //池中维护着ForkJoinWorkerThread对象数组,数组大小由parallelism属性决定,parallelism默认为处理器个数
  3. ForkJoinPool pool = new ForkJoinPool();
  4. GetQuestionsTask task = new GetQuestionsTask(questionTypeList, map);
  5. pool.execute(task);
  6. // 试题列表=task.get()
  7. try {
  8. List finalList = task.get();
  9. System.out.println("最终结果个数:" + finalList.size());
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. } catch (ExecutionException e) {
  13. e.printStackTrace();
  14. }

总结:

        Fork/Join实现了“工作窃取算法”,当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。当然,fork/join框架的使用有一定的约束条件:

1.除了fork()  和  join()方法外,线程不得使用其他的同步工具。线程最好也不要sleep()

2.线程不得进行I/O操作

3.线程不得抛出checked exception。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/220804
推荐阅读
相关标签
  

闽ICP备14008679号