(1) Application: initializing a SparkContext creates one Application.
(2) Job: each action operator triggers one Job.
(3) Stage: the number of stages equals the number of wide (shuffle) dependencies plus 1.
(4) Task: within a stage, the number of partitions of that stage's last RDD is the number of tasks.
Note: each level of Application -> Job -> Stage -> Task is a 1-to-n relationship.
// Sample code
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkCoreTest {
  def main(args: Array[String]): Unit = {
    // 1. Create a SparkConf and set the application name
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    // 2. Create a SparkContext, the entry point for submitting a Spark application
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("input/1.txt")
    val mapRdd = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    mapRdd.saveAsTextFile("outpath")
    // 3. Close the connection
    sc.stop()
  }
}
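The stage split of this job can be sanity-checked from the driver before reading the scheduler source. Below is a minimal sketch, assuming the sc and mapRdd defined in the sample above:

// toDebugString prints the lineage; the ShuffleDependency introduced by
// reduceByKey marks the boundary between the ShuffleMapStage and the ResultStage.
println(mapRdd.toDebugString)
// The partition count of a stage's last RDD is that stage's task count.
println(s"partitions of the final RDD: ${mapRdd.getNumPartitions}")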
First, look at the execution flow diagram (Yarn-Cluster mode).

Now let's analyze it step by step.

Up to this point only the lineage (dependency relationships) has been built; the transformations themselves have not been executed yet.

DAGScheduler: it first splits the job into stages, then splits each stage into tasks.

Number of stages in the job = number of wide dependencies + 1 = 2 (reduceByKey introduces one wide dependency here, i.e. the point where the shuffle happens).

Number of tasks in the job: within each stage, the task count equals the partition count of that stage's last RDD (2 + 2 = 4). The ShuffleMapStage before the shuffle produces two tasks, and the ResultStage after the shuffle produces two.

Number of jobs = number of action operators (there is only one here, saveAsTextFile).
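These counts can also be verified empirically. The snippet below is a sketch rather than part of the original program: it registers a SparkListener (a developer API on SparkContext) before running the action, and for a two-partition input it should report 1 job, 2 stages and 4 tasks. Listener events are delivered asynchronously, so the counters only settle after the job finishes.

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerTaskEnd}

val jobCount   = new AtomicInteger(0)
val stageCount = new AtomicInteger(0)
val taskCount  = new AtomicInteger(0)

// Register the listener before triggering the action.
sc.addSparkListener(new SparkListener {
  override def onJobStart(event: SparkListenerJobStart): Unit = jobCount.incrementAndGet()
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = stageCount.incrementAndGet()
  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = taskCount.incrementAndGet()
})

mapRdd.saveAsTextFile("outpath")  // the single action in the sample => 1 job
// Expected once the job is done: jobCount = 1, stageCount = 2, taskCount = 4.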
Source code analysis

Stepping down from the action (saveAsTextFile here; collect or any other action behaves the same, since every action ends up in SparkContext.runJob, which hands the job to the DAGScheduler), you eventually reach this key piece of code in DAGScheduler.handleJobSubmitted:
—————————————————————————————————1—————————————————————————————————

var finalStage: ResultStage = null
try {
  // New stage creation may throw an exception if, for example, jobs are run on a
  // HadoopRDD whose underlying HDFS files have been deleted.
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
  case e: Exception =>
    logWarning("Creating new stage failed due to exception - job: " + jobId, e)
    listener.jobFailed(e)
    return
}

The key line is finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite).
Following the flow shown in the diagram above, the scheduler takes the last RDD of the job and creates a ResultStage from it.

Step down into createResultStage:
—————————————————————————————————2: creating the ResultStage——————————————————————————————

private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) is where the ResultStage is created.
val parents = getOrCreateParentStages(rdd, jobId) builds its parent stages first.

Step further down:
—————————————————————————————————3———————————————————————————————

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

getShuffleDependencies is the interesting call here.

Step further down:
—————————————————————————————————4———————————————————————————————

private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.push(dependency.rdd)
      }
    }
  }
  parents
}

toVisit.dependencies iterates over the dependencies of each RDD looking for ShuffleDependency, i.e. it walks up the lineage step by step as in the diagram above. It visits every RDD, but it only checks whether a dependency is a shuffle dependency; narrow dependencies just push their parent RDD onto the stack so the walk continues upstream.

The direct ShuffleDependency instances are collected (parents += shuffleDep), and then control returns to step 3:
getShuffleDependencies(rdd).map {......}

The standalone sketch below illustrates this traversal; after it, we continue down into getOrCreateShuffleMapStage.
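A minimal, self-contained sketch of the same traversal, using toy types instead of Spark's RDD and Dependency classes: only the nearest shuffle dependencies are collected, and narrow parents are pushed back onto the stack.

import scala.collection.mutable

sealed trait Dep { def parent: Node }
case class NarrowDep(parent: Node) extends Dep
case class ShuffleDep(parent: Node) extends Dep
case class Node(name: String, deps: List[Dep])

def shuffleDependencies(rdd: Node): Set[ShuffleDep] = {
  val parents = mutable.HashSet[ShuffleDep]()
  val visited = mutable.HashSet[Node]()
  val waitingForVisit = mutable.Stack[Node](rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.deps.foreach {
        case s: ShuffleDep => parents += s                   // stop here: this is a stage boundary
        case n: NarrowDep  => waitingForVisit.push(n.parent) // keep climbing the lineage
      }
    }
  }
  parents.toSet
}

// Toy lineage mirroring the sample job: textFile -> flatMap -> map -(shuffle)-> reduceByKey
val textFile   = Node("textFile", Nil)
val flatMapped = Node("flatMap", List(NarrowDep(textFile)))
val mapped     = Node("map", List(NarrowDep(flatMapped)))
val reduced    = Node("reduceByKey", List(ShuffleDep(mapped)))

println(shuffleDependencies(reduced).size) // 1 direct shuffle dependency => 1 + 1 = 2 stages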
—————————————————————————————————5———————————————————————————————

private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage

    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}

createShuffleMapStage(shuffleDep, firstJobId) creates a ShuffleMapStage for the shuffle dependency.

The sketch below illustrates the get-or-create caching by shuffleId; after it, step down into createShuffleMapStage.
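A toy sketch (not Spark's types) of the get-or-create pattern above: stages are cached by shuffleId, so the same shuffle dependency never produces two ShuffleMapStages, even if several downstream stages or jobs depend on it.

import scala.collection.mutable

val shuffleIdToMapStage = mutable.HashMap[Int, String]()

def getOrCreateStage(shuffleId: Int): String =
  shuffleIdToMapStage.get(shuffleId) match {
    case Some(stage) => stage                         // reuse the cached stage
    case None =>
      val stage = s"ShuffleMapStage-$shuffleId"       // stands in for createShuffleMapStage
      shuffleIdToMapStage(shuffleId) = stage
      stage
  }

println(getOrCreateStage(0)) // creates ShuffleMapStage-0
println(getOrCreateStage(0)) // returns the cached ShuffleMapStage-0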
—————————————————————————————————6: creating the ShuffleMapStage———————————————————————————————

def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
  ...

The last line shown is where the ShuffleMapStage is created. Note numTasks = rdd.partitions.length: a stage's task count is the partition count of its last RDD.

——————————————— At this point both the ResultStage and the ShuffleMapStage have been created (the diagram above reflects this process) ———————————————
—————————————————————————————— Executing the job ————————————————————————————

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}

The key lines are:

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
finalStage.setActiveJob(job)

Once the finalStage (the ResultStage from the source analysis above) has been created, it is passed into the ActiveJob so that the final stage and the job are tied together.

Then submitStage(finalStage) is called. Step down:
——————————————————————————————2————————————————————————————

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)

The finalStage (the ResultStage) is handed to getMissingParentStages, whose purpose is to find the stages that come before it.

Step down:
——————————————————————————————3————————————————————————————

private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}

Focus on def visit(rdd: RDD[_]).
for (dep <- rdd.dependencies) again looks for ShuffleDependency, walking up the lineage until there are none left; for every shuffle dependency whose map stage is not yet available, that ShuffleMapStage is added to missing (this is how the scheduler sees how many parent stages are still outstanding).
——————————————————————————————4 (back in step 2)————————————————————————————

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

The key code is:

for (parent <- missing) {
  submitStage(parent)
}

It recurses over the missing parent stages found in step 3 until missing is empty (there is no stage further upstream); only then does submitMissingTasks(stage, jobId.get) run.
So the ShuffleMapStage is executed first, and when a stage is executed the scheduler works out how many tasks it contains.

The sketch below illustrates this recursion; after it, step down into submitMissingTasks.
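A simplified, self-contained sketch of the recursion just described, using toy types and hypothetical names rather than Spark's scheduler. In the real code the retry of a waiting stage is driven by waitingStages when a parent stage completes, not by the direct re-invocation shown here.

case class ToyStage(id: Int, parents: List[ToyStage], var available: Boolean = false)

def submitToyStage(stage: ToyStage): Unit = {
  val missing = stage.parents.filterNot(_.available).sortBy(_.id)
  if (missing.isEmpty) {
    println(s"submitMissingTasks for stage ${stage.id}")
    stage.available = true // in Spark this flips only once the stage's tasks have finished
  } else {
    missing.foreach(submitToyStage) // submit the upstream stages first
    submitToyStage(stage)           // then retry this stage (Spark parks it in waitingStages instead)
  }
}

val shuffleMapStage = ToyStage(0, Nil)
val resultStage     = ToyStage(1, List(shuffleMapStage))
submitToyStage(resultStage)
// prints "submitMissingTasks for stage 0", then "submitMissingTasks for stage 1"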
——————————————————————————————5————————————————————————————

private def submitMissingTasks(stage: Stage, jobId: Int) {
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId)
        }

      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
        }
    }
  }

partitionsToCompute is what decides how many tasks there are. Step down:
——————————————————————————————6————————————————————————————

override def findMissingPartitions(): Seq[Int] = {
  val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)
  assert(missing.size == numPartitions - _numAvailableOutputs,
    s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
  missing
}

If the last RDD of the ShuffleMapStage has two partitions, missing returns 0 and 1, as the small illustration below shows.
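A tiny illustration with assumed values of the filter above: a partition is "missing" when no shuffle output has been registered for it yet.

val numPartitions = 2
val outputLocs: Array[List[String]] = Array(Nil, Nil) // no map output yet for either partition

val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)
println(missing) // Vector(0, 1) -> two ShuffleMapTasks will be built for this stage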
Step back down to here:

partitionsToCompute.map { id =>
  val locs = taskIdToLocations(id)
  val part = stage.rdd.partitions(id)
  new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
    taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
    Option(sc.applicationId), sc.applicationAttemptId)
}

With two partitions, two ShuffleMapTasks are created, i.e. two tasks.

The ResultStage branch works on the same principle, so it is not repeated here.
——————————————————————————————7 (continuing in the same method as step 5)————————————————————————————

if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  stage.pendingPartitions ++= tasks.map(_.partitionId)
  logDebug("New pending partitions: " + stage.pendingPartitions)
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
}

taskScheduler.submitTasks wraps the tasks in a TaskSet and submits them.

Step down:
——————————————————————————————8————————————————————————————

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    ......

That completes the walk-through: the DAGScheduler has split the job into stages and tasks, and the TaskScheduler takes over from here.