
Spark Task Flow Analysis: Skipped Stages

 

Spark's Main Roles

Spark Control Processes

Driver

Application entry point that contains the SparkContext instance

Master

In charge of scheduling and resource orchestration

Worker

Responsible for node state and running executors

A worker is a control process that runs on a cluster node

Executor

Allocated per application and in charge of executing the tasks of that application

An executor is a JVM process that executes on each worker

Spark Execution Hierarchy

Example

Submit a job and look at the monitoring view on the Spark UI page.

Jobs:

  1. Job 0 and Job 1 are each generated by an action operator.
  2. Job 1 contains one skipped stage, which indicates that an RDD is being reused (a minimal sketch that reproduces this is shown below).
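The Spark UI screenshots are not reproduced here, but a driver program along the following lines typically produces this picture. The object name, the input path, and the partition counts are assumptions for illustration; exactly which stages the UI marks as skipped depends on which shuffle outputs are still available for reuse.

    import org.apache.spark.{SparkConf, SparkContext}

    object SkippedStageDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("skipped-stage-demo"))

        // hdfs:///tmp/words.txt and the partition counts are placeholders
        val lines  = sc.textFile("hdfs:///tmp/words.txt", 2)   // 2 splits -> 2 read tasks
        val pairs  = lines.flatMap(_.split("\\s+")).map((_, 1))
        val counts = pairs.repartition(2)                      // wide dependency -> new stage
                          .reduceByKey(_ + _)                  // wide dependency -> new stage

        counts.count()     // action #1 -> Job 0: all stages run
        counts.collect()   // action #2 -> Job 1: the shuffle output written by the
                           // earlier stages still exists, so they appear as "skipped"
        sc.stop()
      }
    }

Because Job 1 can read the map output that Job 0 already wrote, the DAGScheduler does not resubmit those parent stages; the UI simply lists them as skipped.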

Job 0

 

Stage 0:

1) The two tasks in Stage 0, which is cut at the repartition boundary, read from HDFS in parallel.

2) Each stage's detail view shows the shuffle volume, GC time, running duration, and other metrics.

Stage 1:

1) Two tasks run.

Stage 2:

Job 1

  1. In the resulting DAG and in the stage execution list, you can see which stage was skipped.
  2. When Stage 4 is triggered, it directly reuses the stages that have already been divided.
  3. Stage 3 is not executed; its parsing is skipped entirely.

Stage 4:

Stage 5:

Task Analysis

 

Key points:

  1. The number of jobs is determined by the action operators.
  2. The number of tasks in a job is determined by the number of partitions (see the snippet below).
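A quick illustration of both points, assuming an existing SparkContext `sc` and a placeholder input path:

    val rdd = sc.textFile("hdfs:///tmp/input.txt", minPartitions = 4)

    println(rdd.getNumPartitions)   // at least 4 partitions -> that many tasks in the read stage

    rdd.count()   // 1st action -> one job
    rdd.first()   // 2nd action -> a second job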

Characteristics of an RDD (from the RDD scaladoc):

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
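These five properties correspond directly to members of the RDD class. The sketch below of a hypothetical custom RDD only shows where each property lives; the class name and the data it produces are invented for illustration.

    import org.apache.spark.{Partition, Partitioner, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    // hypothetical partition type, for illustration only
    case class SketchPartition(index: Int) extends Partition

    class RangeSketchRDD(sc: SparkContext, numSlices: Int)
      extends RDD[Int](sc, Nil) {                        // Nil: no parent dependencies (property 3)

      // property 1: a list of partitions
      override protected def getPartitions: Array[Partition] =
        Array.tabulate[Partition](numSlices)(i => SketchPartition(i))

      // property 2: a function for computing each split
      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
        Iterator.range(split.index * 10, split.index * 10 + 10)

      // property 4: optionally, a Partitioner (none, since this is not a key-value RDD)
      override val partitioner: Option[Partitioner] = None

      // property 5: optionally, preferred locations for each split (none here)
      override protected def getPreferredLocations(split: Partition): Seq[String] = Nil
    }

Calling new RangeSketchRDD(sc, 4).collect() would run four tasks, one per partition.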

Stage Division:

Stages are divided according to the wide/narrow dependencies between RDDs:

  • Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD.
  • Wide dependency: a partition of the parent RDD is used by multiple partitions of the child RDD (i.e. each partition of the child RDD depends on all partitions of the parent RDD; these are shuffle-type operations).

Stage division summary: every time a wide dependency is encountered, a new stage is cut (see the sketch below).
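One way to see the cut in practice is RDD.toDebugString, which prints the lineage with an extra indentation level at each shuffle. A small sketch, assuming an existing SparkContext `sc` and a placeholder path:

    val counts = sc.textFile("hdfs:///tmp/words.txt")
      .flatMap(_.split("\\s+"))     // narrow: stays in the same stage as the read
      .map((_, 1))                  // narrow: still the same stage
      .reduceByKey(_ + _)           // wide (shuffle): a new stage is cut here

    // each indentation level / ShuffledRDD in the output marks a stage boundary
    println(counts.toDebugString)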

Q&A

  1. Does loading RDD data read all of the data in one go? No.

When SparkContext reads a data file, a LineRecordReader object reads it line by line, driven by the parameters (file path, read start offset, read end offset). The data is wrapped into splits for the subsequent computation.
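This laziness is also visible from user code: inside mapPartitions a task only receives an Iterator over its split, and records are pulled from the underlying reader one at a time as they are consumed. A small sketch (the path is a placeholder, `sc` an existing SparkContext):

    val lines = sc.textFile("hdfs:///tmp/big.log")

    // take(3) consumes only three records per split; the rest of the split
    // is never pulled through the underlying record reader
    val firstThreePerSplit = lines.mapPartitions(iter => iter.take(3))

    firstThreePerSplit.collect().foreach(println)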

 

Source code of the iterative computation:

ResultTask.scala

    override def runTask(context: TaskContext): U = {
      ...
      // pass in the partition to process and the task context
      func(context, rdd.iterator(partition, context))
    }

RDD.scala

    /**
     * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
     * This should ''not'' be called by users directly, but is available for implementors of custom
     * subclasses of RDD.
     */
    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
      if (storageLevel != StorageLevel.NONE) {
        // if the RDD has been persisted, read it from the cache (computing and caching it if needed)
        getOrCompute(split, context)
      } else {
        computeOrReadCheckpoint(split, context)
      }
    }

    /**
     * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
     */
    private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
    {
      if (isCheckpointedAndMaterialized) {
        // if the RDD has been checkpointed, read the data from the checkpointed parent
        firstParent[T].iterator(split, context)
      } else {
        // otherwise compute the partition directly
        compute(split, context)
      }
    }

MapPartitionsRDD.compute

    override def compute(split: Partition, context: TaskContext): Iterator[U] =
      f(context, split.index, firstParent[T].iterator(split, context))

HadoopRDD.scala

    override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
      // (abridged excerpt: in the Spark source, the members below belong to the
      //  record iterator that compute() builds and returns)

      // get the Hadoop JobConf
      private val jobConf = getJobConf()
      // metrics object used to record the bytes and records read
      private val inputMetrics = context.taskMetrics().inputMetrics
      // bytes already read so far
      private val existingBytesRead = inputMetrics.bytesRead

      // record which file / offset / length this task is reading
      split.inputSplit.value match {
        case fs: FileSplit =>
          InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
        case _ =>
          InputFileBlockHolder.unset()
      }

      // create the RecordReader for this split
      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

      private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
      private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()

      override def getNext(): (K, V) = {
        try {
          finished = !reader.next(key, value)
        } catch {
          case e: FileNotFoundException if ignoreMissingFiles =>
            logWarning(s"Skipped missing file: ${split.inputSplit}", e)
            finished = true
          // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
          case e: FileNotFoundException if !ignoreMissingFiles => throw e
          case e: IOException if ignoreCorruptFiles =>
            logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
            finished = true
        }
        if (!finished) {
          inputMetrics.incRecordsRead(1)
        }
        if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
          updateBytesRead()
        }
        (key, value)
      }

The data is then iterated through an InterruptibleIterator, using hasNext and next:

    @DeveloperApi
    class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
      extends Iterator[T] {

      def hasNext: Boolean = {
        // if the task has been marked as interrupted (killed), abort it here;
        // otherwise delegate to the underlying iterator
        context.killTaskIfInterrupted()
        delegate.hasNext
      }

      def next(): T = delegate.next()
    }

Returning results up the chain

After rdd.HadoopRDD.compute finishes, the computation result of the initial RDD has been produced. Control then returns to MapPartitionsRDD.compute, which can now call the function f:

    override def compute(split: Partition, context: TaskContext): Iterator[U] =
      f(context, split.index, firstParent[T].iterator(split, context))

f produces the computation result of the second RDD, and so on; the results are returned layer by layer.

 

The logic above is similar to a nested higher-order function: f(f(…f(compute())…))
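A plain-Scala sketch of that nesting (the functions below are stand-ins invented for illustration, not Spark APIs): each operator wraps its parent's iterator, and a record only flows through the whole chain when the final iterator is consumed.

    // stands in for the initial HadoopRDD.compute
    val compute: () => Iterator[String] = () => Iterator("1", "2", "3")

    // stand-ins for the per-RDD functions f
    val f1: Iterator[String] => Iterator[Int] = _.map(_.toInt)        // like a map()
    val f2: Iterator[Int] => Iterator[Int]    = _.filter(_ % 2 == 1)  // like a filter()

    // the final RDD's iterator is just the nested composition
    val result: Iterator[Int] = f2(f1(compute()))
    result.foreach(println)   // prints 1 and 3; records are pulled lazily, one at a time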

 

Is RDD reuse a reuse of the DAG logic, or a reuse of the data?

1) Logic reuse. As in the example code, when the code executes sequentially and reaches the first action operator, a trace-back over the lineage is started (by the DAGScheduler in the driver). Once the trace-back reaches the first RDD and the DAGScheduler has divided the lineage into stages, the parsed result is retained. When the second action operator triggers another trace-back, as soon as it reaches pairRDD1 it finds the already-parsed DAG, so there is no need to parse the DAG again; it is simply reused.

2) How is data reused?

Use the persistence operators: cache, persist, and checkpoint.

cache() is equivalent to persist(StorageLevel.MEMORY_ONLY).

checkpoint: a separate job is launched to persist the checkpointed RDD, and the data is written to disk (it is recommended to cache the RDD first). A usage sketch follows.
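A usage sketch of the three operators; the paths and the RDD name are placeholders and `sc` is an existing SparkContext:

    import org.apache.spark.storage.StorageLevel

    sc.setCheckpointDir("hdfs:///tmp/checkpoints")     // checkpoint data goes to reliable storage

    val parsed = sc.textFile("hdfs:///tmp/events.log").map(_.split(","))

    parsed.cache()                                     // same as parsed.persist(StorageLevel.MEMORY_ONLY)
    // parsed.persist(StorageLevel.MEMORY_AND_DISK)    // or choose a level explicitly

    parsed.checkpoint()                                // materialized by a separate job when the
                                                       // first action runs; caching first avoids
                                                       // recomputing the lineage for that job
    parsed.count()                                     // triggers both the cache fill and the checkpoint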

Diagram comparing a persisted RDD with a non-persisted one ->

Code walkthrough:

 

Task flow:

 
