Main Spark Roles
Spark Control Processes
Role | Description
Driver | Application entry point that contains the SparkContext instance
Master | In charge of scheduling and resource orchestration
Worker | A control process that runs on a cluster node; responsible for node state and for running executors
Executor | A JVM process that runs on each worker; allocated per job and in charge of executing tasks from that job
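To make the Driver role concrete, here is a minimal, self-contained sketch of a driver program (the app name, master URL, and data are illustrative assumptions): the SparkContext lives in the driver, resources come from the Master, and tasks run in executors on the workers.

import org.apache.spark.{SparkConf, SparkContext}

object DriverDemo {
  def main(args: Array[String]): Unit = {
    // The SparkContext lives in the driver; it asks the Master for resources
    // and ships tasks to executors running on the workers.
    val conf = new SparkConf().setAppName("driver-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    println(rdd.sum())   // triggers a job: tasks run in executors, the result returns to the driver

    sc.stop()
  }
}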
Spark Execution Hierarchy
Example
Submit a job and look at the monitoring view on the Spark UI page (a code sketch that could produce this layout follows the list below):
Jobs:
- Job 0
  - Stage 0:
    1) The 2 tasks in Stage 0, split out by the repartition, read from HDFS in parallel.
    2) The detail page of each stage shows the shuffle volume, GC time, running duration, and other metrics.
  - Stage 1:
    1) 2 tasks run.
  - Stage 2:
- Job 1
  - Stage 4:
  - Stage 5:
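A job layout like the one above could come from code of roughly this shape; the HDFS path, operator choices, and variable names (including pairRDD1, which is referenced again later) are assumptions used only to show how a repartition and a shuffle operator create stage boundaries, and how a second action creates a second job:

// Sketch assuming an existing SparkContext sc.
val lines    = sc.textFile("hdfs:///tmp/data.txt")   // hypothetical input path
val repart   = lines.repartition(2)                  // shuffle: ends the stage that reads from HDFS
val pairRDD1 = repart.map(line => (line, 1))
val counted  = pairRDD1.reduceByKey(_ + _)           // shuffle: another stage boundary

counted.count()     // action 1 -> Job 0
counted.collect()   // action 2 -> Job 1 (already-computed shuffle stages may show up as skipped)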
Task Analysis
Key points:
Properties of an RDD:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
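These five properties map directly onto the members that an RDD subclass overrides. Below is a hypothetical, minimal subclass (all names are made up) that shows where each property lives in code:

import org.apache.spark.{Partition, Partitioner, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type: just remembers its index and the value range it covers.
class RangePartition(override val index: Int, val start: Int, val end: Int) extends Partition

// Hypothetical RDD over the numbers [0, max), shown only to map the five properties onto code.
class SimpleRangeRDD(sc: SparkContext, max: Int, numParts: Int)
  extends RDD[Int](sc, Nil) {   // Nil = property 3: no dependencies on other RDDs

  // Property 1: a list of partitions
  override def getPartitions: Array[Partition] = {
    val step = math.max(1, max / numParts)
    Array.tabulate[Partition](numParts) { i =>
      new RangePartition(i, i * step, if (i == numParts - 1) max else (i + 1) * step)
    }
  }

  // Property 2: a function for computing each split
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val p = split.asInstanceOf[RangePartition]
    (p.start until p.end).iterator
  }

  // Property 4: optionally, a Partitioner (only meaningful for key-value RDDs)
  override val partitioner: Option[Partitioner] = None

  // Property 5: optionally, preferred locations per split (e.g. HDFS block hosts); none here
  override def getPreferredLocations(split: Partition): Seq[String] = Nil
}

With a SparkContext sc in scope, new SimpleRangeRDD(sc, 10, 3).collect() would return the numbers 0 through 9.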
Stage Splitting
Stages are split according to the wide/narrow dependencies between RDDs: chains of narrow dependencies are pipelined into one stage, and each wide (shuffle) dependency starts a new stage. A small sketch follows below.
(Stage splitting diagram)
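A small sketch of where the boundary shows up (the input path is an assumption); toDebugString prints the lineage, and the indentation change marks the shuffle boundary between stages:

// Sketch assuming an existing SparkContext sc.
val words  = sc.textFile("hdfs:///tmp/words.txt")   // hypothetical path
val pairs  = words.map(w => (w, 1))                 // narrow dependency: pipelined into the same stage
val counts = pairs.reduceByKey(_ + _)               // wide (shuffle) dependency: starts a new stage

println(counts.toDebugString)                                // lineage with the shuffle boundary visible
println(counts.dependencies.map(_.getClass.getSimpleName))   // the shuffled RDD's dependency is a ShuffleDependency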
Q: How does sc read a data file?
When sc reads a data file, a LineRecordReader object reads it line by line, driven by the parameters (file path, read start offset, read end offset). The input is wrapped into splits for the subsequent computation.
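In the RDD API this surfaces as follows (path and partition count are assumptions): textFile builds a HadoopRDD on top of TextInputFormat and LineRecordReader, and each partition corresponds to one input split described by (path, start offset, length):

// Sketch assuming an existing SparkContext sc.
val lines = sc.textFile("hdfs:///tmp/data.txt", minPartitions = 4)   // hypothetical path

// Each partition maps to one input split; the actual number of partitions is at least
// minPartitions and depends on the file's block layout.
println(lines.getNumPartitions)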
Source code of the iterative computation:
ResultTask.scala
override def runTask(context: TaskContext): U = {
  ...
  ...
  // Pass the task context and the iterator over the partition being processed into the user function
  func(context, rdd.iterator(partition, context))
}
RDD.scala
/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    // If the RDD has a storage level set, read from the cache (or compute and cache the partition)
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    // If a checkpoint has been materialized, read the data from the checkpoint RDD (the first parent)
    firstParent[T].iterator(split, context)
  } else {
    // Otherwise compute the partition directly
    compute(split, context)
  }
}
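A small usage sketch of the branch above (path and variable names are illustrative): once persist/cache sets a storage level, iterator() takes the getOrCompute branch, so a second action reads cached blocks instead of recomputing the lineage.

import org.apache.spark.storage.StorageLevel

// Sketch assuming an existing SparkContext sc.
val parsed = sc.textFile("hdfs:///tmp/data.txt")   // hypothetical path
  .map(_.split(",")(0))

parsed.persist(StorageLevel.MEMORY_ONLY)   // storageLevel != StorageLevel.NONE from now on

parsed.count()   // first action: computeOrReadCheckpoint runs and the blocks are cached
parsed.count()   // second action: iterator() takes the getOrCompute branch and hits the cache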
MapPartitionsRDD.compute
override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))
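Here f is the function captured when the MapPartitionsRDD was created; its shape is (task context, partition index, parent iterator) => output iterator, and RDD.map(userFunc) stores essentially (ctx, idx, iter) => iter.map(userFunc). A standalone sketch of that shape (the concrete function is made up):

import org.apache.spark.TaskContext

// A function with the same shape as MapPartitionsRDD's f.
val f: (TaskContext, Int, Iterator[String]) => Iterator[Int] =
  (ctx, idx, iter) => iter.map(_.length)

// Exercise the shape without a cluster; null stands in for a real TaskContext.
println(f(null, 0, Iterator("spark", "rdd")).toList)   // List(5, 3)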
HadoopRDD.scala
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {

  // The partition carries the serialized Hadoop InputSplit
  val split = theSplit.asInstanceOf[HadoopPartition]

  // (abridged: the members below are defined inside an anonymous NextIterator[(K, V)] named iter)
  val iter = new NextIterator[(K, V)] {

    // Get the Hadoop JobConf
    private val jobConf = getJobConf()
    // Input-metrics object used to track how many bytes are read
    private val inputMetrics = context.taskMetrics().inputMetrics
    // Bytes already recorded as read before this task
    private val existingBytesRead = inputMetrics.bytesRead

    // Record which file block (path, start offset, length) this split reads
    split.inputSplit.value match {
      case fs: FileSplit =>
        InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
      case _ =>
        InputFileBlockHolder.unset()
    }

    // Create the RecordReader for this split
    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

    private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
    private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()

    override def getNext(): (K, V) = {
      try {
        // Read the next record into key/value; finished becomes true at the end of the split
        finished = !reader.next(key, value)
      } catch {
        case e: FileNotFoundException if ignoreMissingFiles =>
          logWarning(s"Skipped missing file: ${split.inputSplit}", e)
          finished = true
        // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
        case e: FileNotFoundException if !ignoreMissingFiles => throw e
        case e: IOException if ignoreCorruptFiles =>
          logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
          finished = true
      }
      if (!finished) {
        inputMetrics.incRecordsRead(1)
      }
      if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
        updateBytesRead()
      }
      (key, value)
    }
  }

  // Wrap the record iterator so the task can be interrupted between records
  new InterruptibleIterator[(K, V)](context, iter)
}
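getNext() above is not called directly; it is driven by NextIterator, whose hasNext pulls one record ahead and caches it. A simplified, self-contained sketch of that pattern (not the exact Spark class, which also handles closing the reader):

// Simplified version of the NextIterator pattern: hasNext fetches one record ahead
// via getNext() and caches it; subclasses set `finished = true` when the source is exhausted.
abstract class SimpleNextIterator[U] extends Iterator[U] {
  protected var finished = false
  private var gotNext = false
  private var nextValue: U = _

  protected def getNext(): U

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      nextValue = getNext()
      gotNext = true
    }
    !finished
  }

  override def next(): U = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    gotNext = false
    nextValue
  }
}

// Tiny usage example: iterate the lines of a java.io.BufferedReader.
import java.io.{BufferedReader, StringReader}

class LineIterator(reader: BufferedReader) extends SimpleNextIterator[String] {
  override protected def getNext(): String = {
    val line = reader.readLine()
    if (line == null) finished = true
    line
  }
}

val it = new LineIterator(new BufferedReader(new StringReader("a\nb\nc")))
println(it.toList)   // List(a, b, c)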
The InterruptibleIterator then iterates over the data using hasNext and next:
@DeveloperApi
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
  extends Iterator[T] {

  def hasNext: Boolean = {
    // If the task has been marked as killed, stop it here instead of fetching the next record
    context.killTaskIfInterrupted()
    delegate.hasNext
  }

  def next(): T = delegate.next()
}
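This per-record check is what makes task killing cooperative: a running task notices the kill flag on its next hasNext call and stops between records. A hedged usage sketch (group name, sizes, and sleep times are arbitrary) that cancels a running job by job group:

// Sketch assuming an existing SparkContext sc.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val longJob = Future {
  // The job group is registered on the thread that submits the job.
  sc.setJobGroup("demo-group", "cancellable job", interruptOnCancel = true)
  sc.parallelize(1 to 1000000, 8).map { i => Thread.sleep(1); i }.count()
}

Thread.sleep(2000)
sc.cancelJobGroup("demo-group")   // running tasks exit at the next record boundary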
Returning up the iterator chain
When HadoopRDD.compute finishes, it produces the computation result (the record iterator) of the initial RDD. Control then returns to MapPartitionsRDD.compute, which can call the function f:
override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))
f computes the result of the second RDD, and so on, with results returned level by level up the chain.
The logic above behaves like nested higher-order function calls: f(f(…f(compute())…))
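A self-contained sketch of that nesting with plain Scala iterators (no Spark types; the functions are made up): each layer wraps its parent's iterator, and nothing is evaluated until the outermost iterator is consumed.

// Layer 0 plays the role of HadoopRDD.compute: it produces the source iterator.
def compute0(): Iterator[String] = Iterator("1", "2", "3")

// Each further layer plays the role of a MapPartitionsRDD's f:
// it takes the parent's iterator and returns a transformed iterator.
val f1: Iterator[String] => Iterator[Int] = iter => iter.map(_.toInt)        // e.g. a map
val f2: Iterator[Int] => Iterator[Int]    = iter => iter.filter(_ % 2 == 1)  // e.g. a filter

// Equivalent of f2(f1(compute0())): built lazily, evaluated only when consumed.
println(f2(f1(compute0())).toList)   // List(1, 3)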
1) Logic reuse: as in the example code, when the code runs serially and the first action operator is hit, the driver (a JVM process) starts tracing the lineage back. Once the traceback of the first RDD chain finishes and the DAGScheduler has split the lineage into stages according to the dependencies between RDDs, the parsed result is kept. When the second action operator triggers another traceback, as soon as it reaches pairRDD1 the already-parsed DAG is available, so there is no need to parse it again; it is simply reused.
2) How is data reused?
Use the persistence operators: cache, persist, checkpoint (a usage sketch follows the diagram reference below).
cache = persist(StorageLevel.MEMORY_ONLY)
checkpoint: launches a separate job to persist the checkpointed RDD and writes it to disk (it is recommended to cache the RDD first).
(Diagram: the difference between execution with persistence and without persistence)
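A hedged usage sketch of these operators (the input path and checkpoint directory are assumptions):

import org.apache.spark.storage.StorageLevel

// Sketch assuming an existing SparkContext sc.
sc.setCheckpointDir("hdfs:///tmp/chk")             // hypothetical checkpoint directory

val parsed = sc.textFile("hdfs:///tmp/data.txt")   // hypothetical input path
  .map(_.split(","))

parsed.persist(StorageLevel.MEMORY_ONLY)           // equivalent to parsed.cache()
parsed.checkpoint()                                // materialized by a separate job at the first action

parsed.count()   // action 1: computes and caches; the checkpoint job then writes the RDD to disk
parsed.count()   // action 2: served from the cache/checkpoint instead of re-reading HDFS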
Code analysis:
Task flow: