赞
踩
目录
Stage的划分那么要想清楚RDD的依赖关系,可点击下面了解
任何一个Action算子就是一个job,因为每一个Action算子都会调用runjob方法
任务包括:ResultTask(最后一个阶段的任务) + ShuffleMapTask(非最后一个阶段)
四个步骤:
1,构建DAG
用户提交的job将首先被转换成一系列RDD并通过RDD之间的依赖关系构建DAG,然后将DAG提交到调度系统;
DAG描述多个RDD的转换过程,任务执行时,可以按照DAG的描述,执行真正的计算;
DAG是有边界的:开始(通过sparkcontext创建的RDD), 结束 (触发action, 调用runjob就是一 个完整的DAG形成了, 一旦触发action,就形成了- -个完整的DAG) ;
一个RDD描述了数据计算过程中的一个环节, 而一个DAG包含多 个RDD,描述了数据计算过程中的所有环节;
一个spark application可以包含多个DAG,取决于具体有多少个action。
2,DAGScheduler将DAG切分stage (切分依据是shuffle) ,将stage中生成的task以taskset的形式发送给 TaskScheduler为什么要切分stage?
一个复杂是业务逻辑(将多台机器上具有相同属性的数据聚合到一台机器上:shuffle)如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一-个阶段,下一 个阶段的计算依赖上一个阶段的数据在同一个stage中,会有多个算子,可以合并到一-起,我们很难” 称其为pipeline (流水线,严格按照流程、顺序执行)
3,TaskScheduler 调度task (根据资源情况将task调度到Executors)
4,Executors接收task, 然后将task交给线程池执行。
划分stage的过程:从最后一个RDD开始 ,调用递归算法查找该RDD的父RDD ,找到父RDD后开始遍历,判断父RDD和该RDD的依赖关系,如果是宽依赖,就把父RDD和前面所有RDD都划分一个stage ,如果是窄依赖,继续递归查找父RDD的父RDD ,递归的出口是直到找不到父RDD.最后把所有的RDD统一划分一个stage.
一个job有一个或多个Stage组成,一个Stage由一个或多个Task组成
下图从源文件到输出总共有4给RDD,textDile-RDD --> flaMap-RDD --> map-RDD都是窄依赖所以划分为一个Stage, 然后reduceByKey-RDD这是宽依赖,会发生shuffle,这里段开,重新一个Stage,后面依照这个来划分Stage
在shulle之前保存数据用于重复计算
还可以提高并行度
sc.textFile("hdfs://hadoop01:9000/wc").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。