(Figure omitted; image source: 北风网)
First, find the iterator method of RDD:
/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    // The storage level is not NONE, i.e. the RDD has been persisted:
    // fetch the persisted data (or compute and cache it) via getOrCompute
    getOrCompute(split, context)
  } else {
    // Otherwise compute the partition, or read it from a checkpoint
    computeOrReadCheckpoint(split, context)
  }
}
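Both branches are easy to trigger from user code. Below is a minimal sketch (assuming a SparkContext named sc; the identifiers raw and cached are made up for illustration):

import org.apache.spark.storage.StorageLevel

val raw = sc.parallelize(1 to 100, 4)
// storageLevel is NONE here, so iterator() falls through to computeOrReadCheckpoint()
raw.count()

val cached = raw.map(_ * 2).persist(StorageLevel.MEMORY_ONLY)
// storageLevel is no longer NONE, so iterator() goes through getOrCompute(),
// which asks the block manager for the cached partition before recomputing it
cached.count()   // first action: computes the partitions and caches them
cached.count()   // second action: reads the cached blocks instead of recomputing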
Step into computeOrReadCheckpoint:
/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    // The RDD has been checkpointed and materialized:
    // call the parent RDD's iterator method, which reads the data back
    // from the external file system the checkpoint was written to
    firstParent[T].iterator(split, context)
  } else {
    // No checkpoint, so just compute the partition
    compute(split, context)
  }
}
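For the checkpoint branch to be taken, the RDD has to be checkpointed and then materialized by an action. A minimal sketch, assuming a SparkContext named sc and a checkpoint directory reachable by all executors (the hdfs:///tmp/spark-ckpt path is just an example):

sc.setCheckpointDir("hdfs:///tmp/spark-ckpt")   // example path

val data = sc.parallelize(1 to 100, 4).map(_ + 1)
data.checkpoint()   // only marks the RDD for checkpointing; nothing is written yet
data.count()        // first action computes the RDD and writes the checkpoint files

// The RDD is now checkpointed and materialized, so on later actions
// computeOrReadCheckpoint() delegates to firstParent[T].iterator(),
// i.e. the checkpoint RDD that reads the files back.
data.count()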
/**
* Return whether this RDD is checkpointed and materialized, either reliably or locally.
* This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
* return value. Exposed for testing.
*/
private[spark] def isCheckpointedAndMaterialized: Boolean =
checkpointData.exists(_.isCheckpointed)
/** Returns the first parent RDD */
// Get the parent RDD, i.e. the first entry in the dependency list
protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
  dependencies.head.rdd.asInstanceOf[RDD[U]]
}
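Once the checkpoint has been materialized, Spark truncates the lineage, so the first (and only) parent seen here is the RDD that reads the checkpoint files. A quick hedged check, reusing the data RDD from the sketch above:

// Before the checkpoint is written, dependencies still describe the map lineage.
// After data.count() has materialized it, the single parent is the checkpoint RDD.
println(data.isCheckpointed)          // true once the checkpoint files exist
println(data.dependencies.head.rdd)   // e.g. a ReliableCheckpointRDD reading from HDFS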
Next, reading the checkpoint. This is the compute method of ReliableCheckpointRDD:
/**
 * Read the content of the checkpoint file associated with the given partition.
 */
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
  ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}
As you can see, readCheckpointFile uses the Hadoop API to read the data back from HDFS:
/**
 * Read the content of the specified checkpoint file.
 */
def readCheckpointFile[T](
    path: Path,
    broadcastedConf: Broadcast[SerializableConfiguration],
    context: TaskContext): Iterator[T] = {
  val env = SparkEnv.get
  // Read the data from HDFS: get the Hadoop FileSystem for the checkpoint path
  val fs = path.getFileSystem(broadcastedConf.value.value)
  val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
  // Open the file through the Hadoop API
  val fileInputStream = fs.open(path, bufferSize)
  val serializer = env.serializer.newInstance()
  val deserializeStream = serializer.deserializeStream(fileInputStream)

  // Register an on-task-completion callback to close the input stream.
  context.addTaskCompletionListener(context => deserializeStream.close())

  deserializeStream.asIterator.asInstanceOf[Iterator[T]]
}
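Note that the read buffer size comes from spark.buffer.size (64 KB by default, as seen in the code above). The checkpoint files themselves live under the configured checkpoint directory, one file per partition. A hedged sketch of listing them with the same Hadoop FileSystem API, reusing the example hdfs:///tmp/spark-ckpt path from earlier (the ckptRoot and ls names are made up for illustration):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val ckptRoot = new Path("hdfs:///tmp/spark-ckpt")   // example path from the sketch above
val fs = ckptRoot.getFileSystem(new Configuration())
// Recursively print whatever has been written under the checkpoint directory;
// each checkpointed RDD gets its own subdirectory with one file per partition.
def ls(p: Path): Unit = fs.listStatus(p).foreach { s =>
  println(s.getPath)
  if (s.isDirectory) ls(s.getPath)
}
ls(ckptRoot)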