Every RDD action on the driver eventually funnels into `SparkContext.runJob` (the code throughout this walkthrough is from the Spark 1.x source). It checks that the context is still alive, cleans the closure so it can be serialized, optionally logs the lineage, and hands the job to the `DAGScheduler`; once the job returns it finalizes the progress bar and triggers any pending checkpoint:

    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        allowLocal: Boolean,
        resultHandler: (Int, U) => Unit) {
      if (stopped) {
        throw new IllegalStateException("SparkContext has been shutdown")
      }
      val callSite = getCallSite
      val cleanedFunc = clean(func)
      logInfo("Starting job: " + callSite.shortForm)
      if (conf.getBoolean("spark.logLineage", false)) {
        logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
      }
      dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
        resultHandler, localProperties.get)
      progressBar.foreach(_.finishAll())
      rdd.doCheckpoint()
    }
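To see the contract from the caller's side, here is a minimal, self-contained sketch (the app name, the two-partition RDD, and the `counts` buffer are mine, not from the source) that drives this `runJob` overload directly instead of going through an action such as `count()`. Note that `allowLocal` exists in the 1.x line quoted here and was removed in later releases:

    import org.apache.spark.{SparkConf, SparkContext, TaskContext}

    object RunJobSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("runJob-sketch").setMaster("local[2]"))
        val rdd = sc.parallelize(1 to 100, 2)
        val counts = new Array[Long](rdd.partitions.length)

        // func runs inside each task on the executors; resultHandler runs back
        // on the driver, once per finished partition, as (partitionId, result).
        sc.runJob(
          rdd,
          (ctx: TaskContext, it: Iterator[Int]) => it.size.toLong,
          0 until rdd.partitions.length,
          allowLocal = false,
          (partitionId: Int, partCount: Long) => counts(partitionId) = partCount)

        println(counts.sum)  // same total that rdd.count() would report
        sc.stop()
      }
    }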
`SparkContext.runJob` delegates to `DAGScheduler.runJob`, which submits the job asynchronously via `submitJob` and then blocks on the returned `JobWaiter` until the job succeeds or fails:

    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        allowLocal: Boolean,
        resultHandler: (Int, U) => Unit,
        properties: Properties = null)
    {
      val start = System.nanoTime
      val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
      waiter.awaitResult() match {
        case JobSucceeded => {
          logInfo("Job %d finished: %s, took %f s".format
            (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        }
        case JobFailed(exception: Exception) =>
          logInfo("Job %d failed: %s, took %f s".format
            (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
          throw exception
      }
    }
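`awaitResult()` is what turns the asynchronous event-loop machinery back into a blocking call for the action that started the job. The `JobWaiter` behind it is essentially a monitor counting finished partitions; a stripped-down sketch of the pattern (simplified names, not the actual Spark class):

    // JobWaiter-style monitor: totalTasks completions (or one failure) release the caller.
    class MiniWaiter[U](totalTasks: Int, resultHandler: (Int, U) => Unit) {
      private var finished = 0
      private var failure: Option[Exception] = None

      // Called once per successful task, with the partition index and its result.
      def taskSucceeded(index: Int, result: U): Unit = synchronized {
        resultHandler(index, result)
        finished += 1
        if (finished == totalTasks) notifyAll()
      }

      def jobFailed(e: Exception): Unit = synchronized {
        failure = Some(e)
        notifyAll()
      }

      // Blocks like JobWaiter.awaitResult(): returns on success, rethrows on failure.
      def awaitResult(): Unit = synchronized {
        while (finished < totalTasks && failure.isEmpty) wait()
        failure.foreach(e => throw e)
      }
    }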
`DAGScheduler.submitJob` (enclosing signature as in the same Spark 1.x branch) does three things: it validates the requested partition indices, short-circuits zero-partition jobs, and posts a `JobSubmitted` event to the scheduler's event loop, returning the `JobWaiter` immediately rather than waiting for the job:

    def submitJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        allowLocal: Boolean,
        resultHandler: (Int, U) => Unit,
        properties: Properties = null): JobWaiter[U] =
    {
      // Check to make sure we are not launching a task on a partition that does not exist.
      val maxPartitions = rdd.partitions.length
      partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
        throw new IllegalArgumentException(
          "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
      }

      val jobId = nextJobId.getAndIncrement()
      if (partitions.size == 0) {
        return new JobWaiter[U](this, jobId, 0, resultHandler)
      }

      assert(partitions.size > 0)
      val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
      val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
      eventProcessLoop.post(JobSubmitted(
        jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
      waiter
    }
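One practical consequence of the bounds check at the top: a bad partition index fails fast on the driver, before any event is posted or task launched. A hypothetical trigger:

    val rdd = sc.parallelize(1 to 10, 2)  // only partitions 0 and 1 exist
    sc.runJob(rdd, (it: Iterator[Int]) => it.size, Seq(5), allowLocal = false,
      (_: Int, _: Int) => ())
    // => IllegalArgumentException: Attempting to access a non-existent partition: 5.
    //    Total number of partitions: 2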
The event loop dispatches that `JobSubmitted` message to `DAGScheduler.handleJobSubmitted`, whose first step is to build the job's final stage (surrounding code elided):

    var finalStage: Stage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
    } catch {
      ......
    }
    ......
A `Stage` is the scheduler's unit of pipelined execution: it records the RDD it computes, the parent stages it depends on, and, for a shuffle map stage, the shuffle it writes:

    private[spark] class Stage(
        val id: Int,
        val rdd: RDD[_],
        val numTasks: Int,
        val shuffleDep: Option[ShuffleDependency[_, _, _]],  // Output shuffle if stage is a map stage
        val parents: List[Stage],
        val jobId: Int,
        val callSite: CallSite)
`newStage` allocates a stage id, computes the parent stages from the RDD's lineage, and registers the new stage in the scheduler's bookkeeping maps:

    private def newStage(
        rdd: RDD[_],
        numTasks: Int,
        shuffleDep: Option[ShuffleDependency[_, _, _]],
        jobId: Int,
        callSite: CallSite)
      : Stage =
    {
      val parentStages = getParentStages(rdd, jobId)
      val id = nextStageId.getAndIncrement()
      val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
      stageIdToStage(id) = stage
      updateJobIdStageIdMaps(jobId, stage)
      stage
    }
`getParentStages` walks backwards through the dependency graph: narrow dependencies are simply followed, while every `ShuffleDependency` it meets becomes (or reuses) a shuffle map stage and is recorded as a parent:

    private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
      val parents = new HashSet[Stage]
      val visited = new HashSet[RDD[_]]
      // We are manually maintaining a stack here to prevent StackOverflowError
      // caused by recursively visiting
      val waitingForVisit = new Stack[RDD[_]]
      def visit(r: RDD[_]) {
        if (!visited(r)) {
          visited += r
          // Kind of ugly: need to register RDDs with the cache here since
          // we can't do it in its constructor because # of partitions is unknown
          for (dep <- r.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>
                parents += getShuffleMapStage(shufDep, jobId)
              case _ =>
                waitingForVisit.push(dep.rdd)
            }
          }
        }
      }
      waitingForVisit.push(rdd)
      while (!waitingForVisit.isEmpty) {
        visit(waitingForVisit.pop())
      }
      parents.toList
    }
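The upshot: stage boundaries fall exactly on `ShuffleDependency` edges, while narrow dependencies are pipelined into the stage that consumes them. A quick way to observe this (the input path is a placeholder):

    val counts = sc.textFile("hdfs://...")   // placeholder path
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)   // ShuffleDependency here => stage boundary
    println(counts.toDebugString)
    // The narrow flatMap/map chain pipelines into one shuffle map stage;
    // the post-shuffle reduceByKey output belongs to a second, final stage.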
`getShuffleMapStage` memoizes shuffle map stages in `shuffleToMapStage`, keyed by `shuffleId`. On a miss it first registers any ancestor shuffle dependencies that still lack a map stage, then creates (or reuses, via `newOrUsedStage`) the stage for the current shuffle:

    private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): Stage = {
      shuffleToMapStage.get(shuffleDep.shuffleId) match {
        case Some(stage) => stage
        case None =>
          // We are going to register ancestor shuffle dependencies
          registerShuffleDependencies(shuffleDep, jobId)
          // Then register current shuffleDep
          val stage =
            newOrUsedStage(
              shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
              shuffleDep.rdd.creationSite)
          shuffleToMapStage(shuffleDep.shuffleId) = stage

          stage
      }
    }
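Because map stages are memoized by `shuffleId`, reaching the same shuffle from a later job returns the cached stage instead of creating a new one, so its map outputs can be reused while they remain available:

    val pairs = sc.parallelize(1 to 100).map(x => (x % 10, x))
    val reduced = pairs.reduceByKey(_ + _)  // one ShuffleDependency, one shuffleId
    reduced.count()    // job 1: registers and runs the shuffle map stage
    reduced.collect()  // job 2: getShuffleMapStage hits shuffleToMapStage and reuses it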
`registerShuffleDependencies` collects the ancestor shuffle dependencies that have no map stage yet and registers one for each, so that every shuffle upstream of the current one is accounted for before the current stage is created:

    private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) = {
      val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
      while (!parentsWithNoMapStage.isEmpty) {
        val currentShufDep = parentsWithNoMapStage.pop()
        val stage =
          newOrUsedStage(
            currentShufDep.rdd, currentShufDep.rdd.partitions.size, currentShufDep, jobId,
            currentShufDep.rdd.creationSite)
        shuffleToMapStage(currentShufDep.shuffleId) = stage
      }
    }
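Like `getParentStages`, the `getAncestorShuffleDependencies` helper it relies on walks the lineage with an explicit stack instead of recursion, since a deep chain of narrow dependencies (thousands of chained transformations) would otherwise overflow the JVM call stack. The traversal pattern in isolation, over a toy DAG (the node type and helper are mine, for illustration):

    import scala.collection.mutable

    // Toy DAG node standing in for an RDD and its dependencies.
    case class Node(id: Int, deps: List[Node])

    // Iterative depth-first visit: no recursion, so depth is bounded by heap, not stack.
    def reachableIds(root: Node): Set[Int] = {
      val visited = mutable.HashSet[Int]()
      val waitingForVisit = mutable.Stack[Node](root)
      while (waitingForVisit.nonEmpty) {
        val n = waitingForVisit.pop()
        if (visited.add(n.id)) n.deps.foreach(waitingForVisit.push)
      }
      visited.toSet
    }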