赞
踩
stage提交入口是在DAGScheduler类的handleJobSubmitted方法最后,调用submitStage提交最后一个stage。
submitStage
getMissingParentStages
遍历stage,根据isAvailable判断stage是否完成。
这里只会向上找一层的shuffleMapStage。
如果shuffle输出结果数量达到对应numPartitions,则表示shuffle完成,即shuffleMapStage完成。
首次提交stage,是不会有中间结果,只有重试的stage才会有。
stage.isIndeterminate stage输出结果不确定,随机的。
!sms.isAvailable stage stage未完成
ShuffleMapStage满足上面两个条件则重试的时候清楚中间结果。
可以看到findMissingPartitions是MapOutputTracker根据shuffleId获取对应partition的输出结果,如果哪个partition没有输出,则表示待计算。
获取task对应执行位置
TaskLocation是任务执行的位置。优先是同一个executor,其次是同一个host
获取偏好位置
为了避免数据网络传输,尽量在本地或者本机进行数据传输,所以才有了task启动的偏好位置选择。
根据stage不同,将task不同信息先序列化再广播出去。
根据stage不同生成对应的类型的task任务,汇总到tasks集合中
ShuffleMapTask和ResultTask
为这次taskSet创建新的TaskSetManager并缓存,同时将可能存在的stage旧的TaskSetManager状态置为isZombie。
backend.reviveOffers()提交任务。
yarn对应的backend对应的是CoarseGrainedSchedulerBackend
DriverEndpoint是CoarseGrainedSchedulerBackend的一个内部类,RPC的一个实例。
最后调用的makeOffers方法
makeOffers
获取存活的executor的信息资源,调用TaskSchedulerImpl的resourceOffers方法进行task分配,生成task执行信息。在加载执行task。
根据offers,判断是否有新的executor加入,如果有的话就需要重新计算taskSetManager的本地性。同时根据黑名单过滤不能执行的host、executor得到最后的可以执行的filteredOffers。
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
tasks是最后的结果,生成的TaskDescription放入其中并返回。
sortedTaskSets是对所有的taskSetManager排序过滤僵尸状态,
排序有fico和fair,默认是fifo。
对taskSetManager进行资源分配。
按照上面顺序递增进行分配
在taskSet的resourceOffer方法中有TaskDescription对象创建,放到taskDescs中返回
makeOffers方法中最后调用launchTasks方法。
获取task对应的executor,发送LaunchTask给对应的executor。
附时序图
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。