当前位置:   article > 正文

spark调度体系——stage/task提交

spark调度体系——stage/task提交

提交stage

stage提交入口是在DAGScheduler类的handleJobSubmitted方法最后,调用submitStage提交最后一个stage。
image.png
submitStage

  1. 判断stage是否已经提交过,提交过则直接退出
  2. 获取上游缺失的stage,如果没有缺失,则表明这个stage的task可以提交了,调用submitMissingTasks提交
  3. 缺失的stage使用submitStage方法提交

image.png
getMissingParentStages
遍历stage,根据isAvailable判断stage是否完成。
这里只会向上找一层的shuffleMapStage。
image.png
如果shuffle输出结果数量达到对应numPartitions,则表示shuffle完成,即shuffleMapStage完成。
image.png

提交task

清除stage的中间结果

首次提交stage,是不会有中间结果,只有重试的stage才会有。
stage.isIndeterminate stage输出结果不确定,随机的。
!sms.isAvailable stage stage未完成
ShuffleMapStage满足上面两个条件则重试的时候清楚中间结果。
image.png

获取需要计算的partitions

image.png
可以看到findMissingPartitions是MapOutputTracker根据shuffleId获取对应partition的输出结果,如果哪个partition没有输出,则表示待计算。
image.png
image.png
image.png
获取task对应执行位置
image.png
TaskLocation是任务执行的位置。优先是同一个executor,其次是同一个host
image.png
获取偏好位置
为了避免数据网络传输,尽量在本地或者本机进行数据传输,所以才有了task启动的偏好位置选择。

  1. 已经遍历过的rdd、partition跳过
  2. 有缓存优先从缓存中获取
  3. rdd、partition有偏好位置,则返回偏好位置
  4. rdd依赖是窄依赖,从窄依赖中遍历得到第一个有偏好位置的返回

image.png

广播task信息

根据stage不同,将task不同信息先序列化再广播出去。

  • ShuffleMapTask:rdd, shuffleDep
  • ResultTask:rdd, func

image.png

生成task任务

根据stage不同生成对应的类型的task任务,汇总到tasks集合中
ShuffleMapTask和ResultTask
image.png

taskScheduler提交任务

image.png
为这次taskSet创建新的TaskSetManager并缓存,同时将可能存在的stage旧的TaskSetManager状态置为isZombie。
backend.reviveOffers()提交任务。
image.png

backend提交任务

yarn对应的backend对应的是CoarseGrainedSchedulerBackend
DriverEndpoint是CoarseGrainedSchedulerBackend的一个内部类,RPC的一个实例。
最后调用的makeOffers方法
image.png
image.png
makeOffers
获取存活的executor的信息资源,调用TaskSchedulerImpl的resourceOffers方法进行task分配,生成task执行信息。在加载执行task。
image.png

对task进行资源分配

根据offers,判断是否有新的executor加入,如果有的话就需要重新计算taskSetManager的本地性。同时根据黑名单过滤不能执行的host、executor得到最后的可以执行的filteredOffers。
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
tasks是最后的结果,生成的TaskDescription放入其中并返回。
sortedTaskSets是对所有的taskSetManager排序过滤僵尸状态,
排序有fico和fair,默认是fifo。
image.png
对taskSetManager进行资源分配。

  1. PROCESS_LOCAL
  2. NODE_LOCAL
  3. NO_PREF
  4. RACK_LOCAL
  5. ANY

按照上面顺序递增进行分配
image.png
image.png

创建TaskDescription对象

在taskSet的resourceOffer方法中有TaskDescription对象创建,放到taskDescs中返回
image.png

task发送到executor

makeOffers方法中最后调用launchTasks方法。
image.png
获取task对应的executor,发送LaunchTask给对应的executor。
image.png
附时序图
image.png

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/770998
推荐阅读
相关标签
  

闽ICP备14008679号