
Spark Source Code: How Checkpoint Works (readCheckpointFile)


(Figure omitted; image source: 北风网)

Start with the iterator method on RDD:

  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ''not'' be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      // A storage level is set: read the persisted data if it is available,
      // otherwise compute the partition and cache it
      getOrCompute(split, context)
    } else {
      // No storage level: compute the partition, or read it from the checkpoint
      computeOrReadCheckpoint(split, context)
    }
  }
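To see where this read path kicks in from the user side, here is a minimal, hypothetical driver program (the checkpoint directory and the data are made up). checkpoint() only marks the RDD; the files are written when the first action on it finishes, and any later action that cannot be served from the cache falls through to computeOrReadCheckpoint and reads those files back.

import org.apache.spark.{SparkConf, SparkContext}

object CheckpointReadPathDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo").setMaster("local[2]"))
    // Hypothetical directory; any reliable file system path works
    sc.setCheckpointDir("hdfs:///tmp/checkpoint-demo")

    val rdd = sc.parallelize(1 to 100, 4).map(_ * 2)
    rdd.cache()        // recommended: avoids recomputing the RDD when it is re-run for the checkpoint write
    rdd.checkpoint()   // only marks the RDD; nothing is written yet

    rdd.count()        // first action: computes the RDD and materializes the checkpoint files
    rdd.unpersist(blocking = true)  // drop the cache so the next action cannot use getOrCompute

    // This action misses the cache, so iterator() falls into computeOrReadCheckpoint,
    // which now reads the partitions back from the checkpoint directory
    println(rdd.sum())
    sc.stop()
  }
}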

Step into computeOrReadCheckpoint:

  /**
   * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
   */
  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointedAndMaterialized) {
      // The RDD has been checkpointed and materialized: delegate to the parent RDD's iterator,
      // which reads the data back from the external file system that holds the checkpoint
      firstParent[T].iterator(split, context)
    } else {
      // Not checkpointed: compute the partition the normal way
      compute(split, context)
    }
  }
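The reason firstParent points at the checkpointed data is that, once the checkpoint has been materialized, the RDD's dependencies are rewritten to a single OneToOneDependency on a ReliableCheckpointRDD and the original lineage is cut off. A quick, hypothetical spark-shell session makes this visible (the exact toDebugString output differs between versions):

scala> sc.setCheckpointDir("/tmp/ckpt")       // hypothetical local directory
scala> val rdd = sc.parallelize(1 to 10).map(_ + 1).filter(_ % 2 == 0)
scala> rdd.checkpoint()
scala> rdd.dependencies.head.rdd              // before the action: still the parent from the original lineage
scala> rdd.count()                            // materializes the checkpoint
scala> rdd.dependencies.head.rdd              // now a ReliableCheckpointRDD
scala> println(rdd.toDebugString)             // lineage is truncated at the checkpoint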
  /**
   * Return whether this RDD is checkpointed and materialized, either reliably or locally.
   * This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
   * return value. Exposed for testing.
   */
  private[spark] def isCheckpointedAndMaterialized: Boolean =
    checkpointData.exists(_.isCheckpointed)
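On the user-facing side the same flag is surfaced through isCheckpointed and getCheckpointFile, which is a handy way to confirm when materialization has actually happened. Continuing the hypothetical example from above:

println(rdd.isCheckpointed)     // false before the first action, true once the files are written
println(rdd.getCheckpointFile)  // None before materialization, Some(<checkpoint path for this RDD>) after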
  /** Returns the first parent RDD */
  protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
    dependencies.head.rdd.asInstanceOf[RDD[U]]
  }
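firstParent is the same helper that every narrow-dependency RDD uses, and it is exactly the "custom subclasses of RDD" case the iterator scaladoc mentions. A minimal, hypothetical subclass in the MapPartitionsRDD style would look roughly like this (not Spark code, just an illustration of the pattern):

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// The single-parent RDD constructor creates a OneToOneDependency,
// so firstParent resolves to the input RDD.
class PlusOneRDD(prev: RDD[Int]) extends RDD[Int](prev) {
  override def getPartitions: Array[Partition] = firstParent[Int].partitions
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    firstParent[Int].iterator(split, context).map(_ + 1)  // pulls cached / checkpointed / computed parent data
}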

Reading the checkpoint

/**
   * Read the content of the checkpoint file associated with the given partition.
   */
  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
    ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
  }
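checkpointFileName just formats the partition index, so the checkpoint directory ends up containing one file per partition. Roughly (the exact directory layout may differ across Spark versions):

// checkpointFileName(i) is essentially "part-%05d".format(i), so a 4-partition RDD
// is written as part-00000 ... part-00003 under the RDD's checkpoint directory.
println("part-%05d".format(3))   // part-00003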

As you can see, readCheckpointFile uses the Hadoop FileSystem API to read the data back from HDFS (or whichever reliable file system holds the checkpoint directory):


  /**
   * Read the content of the specified checkpoint file.
   */
  def readCheckpointFile[T](
      path: Path,
      broadcastedConf: Broadcast[SerializableConfiguration],
      context: TaskContext): Iterator[T] = {
    val env = SparkEnv.get
    // Resolve the Hadoop FileSystem for the checkpoint path from the broadcast configuration
    val fs = path.getFileSystem(broadcastedConf.value.value)
    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
    // Open the checkpoint file through the Hadoop API
    val fileInputStream = fs.open(path, bufferSize)
    val serializer = env.serializer.newInstance()
    val deserializeStream = serializer.deserializeStream(fileInputStream)

    // Register an on-task-completion callback to close the input stream.
    context.addTaskCompletionListener(context => deserializeStream.close())

    deserializeStream.asIterator.asInstanceOf[Iterator[T]]
  }

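Reading is just the deserialization half of a serializer-stream round trip; the write path in the same object does the mirror image with serializeStream. Here is a self-contained sketch of that pattern, using a local file and the default JavaSerializer instead of Spark's internals (the file name and data are made up):

import java.io.{FileInputStream, FileOutputStream}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

object SerializerStreamRoundTrip {
  def main(args: Array[String]): Unit = {
    val serializer = new JavaSerializer(new SparkConf()).newInstance()

    // Write side: the same stream pattern the checkpoint writer uses per partition
    val out = serializer.serializeStream(new FileOutputStream("/tmp/part-00000"))
    out.writeAll(Iterator(1, 2, 3))
    out.close()

    // Read side: the pattern readCheckpointFile uses, minus the Hadoop FileSystem
    val in = serializer.deserializeStream(new FileInputStream("/tmp/part-00000"))
    val restored = in.asIterator.asInstanceOf[Iterator[Int]].toList
    in.close()
    println(restored)   // List(1, 2, 3)
  }
}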