当前位置:   article > 正文

RDD的依赖内部解密_list(partitionid - outstart + instart)

list(partitionid - outstart + instart)

RDD的依赖内部解密

视频学习来源:DT-大数据梦工厂 IMF传奇行动视频(后附王家林老师联系方式)

本期内容:
    RDD依赖关系的本质
    依赖关系下的数据流程图
    
窄依赖:每个父RDD的Partition最多被子RDD的一个Partition所使用(例如map和filter、union);
宽依赖:每个父RDD中的Partition会被多个子RDD中的Partition所使用(groupByKey和ReduceByKey等作)。
总结:如果父RDD的一个Partition被一个子RDD的Partition所使用就是窄依赖,否则的话就是宽依赖。
注:对join操作有两种情况,若果join的时候,每个Partition仅仅和已知的Partition进行join,此时的join的窄依赖并不会产生shuffle,其他情况的操作就是宽依赖。 如果子RDD中的Patition对父RDD的Partiton依赖的数量
不会随着RDD数据规模的改变而改变的话,就是窄依赖,否则的话就是宽宽依赖
    因为是确定的partition数量的依赖关系,所有就是窄依赖,得出一个推论, 窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖(也就是说对父RDD的依赖的Partition的数量不会随着RDD数据规模的改变而改变)

  将所有RDD都看做在一个Stage,每个箭头都生成一个task,每一步都产生新的RDD,但有一个弊端就是需要挨个执行,这样会产生大量的中间数据(中间数据会存储起来,下一步才会执行,而内存不会释放),RDD内部的Partition数据不会受干扰,若将G看做最后一个RDD,final级别的RDD,有三个Partition,为每个Partition分配一个Task,此时会产生,第一个数据分片,数据来自B的一个分片,B的分片来自A的三个分片,同时来自F的四个分片,会导致Task太大,此时遇到shuffle级别的依赖关系,必须计算依赖的RDD的所有的Partition,并且都发生在一个Task中计算。
         回溯和血统 pipeline。
         上面两种假设的核心问题都是在遇到shuffle依赖(宽依赖)的时候无法进行pipeline,退而求其次,在有shuffle依赖的时候断开,此时窄依赖的计算链条,
    1、 每个stage里边的task的数量是由该stage中最后一个RDD的partition的数量所决定的。
    2、 从后往前推理,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入该Stage中
    遇到shuffle的时候断开因为数据量太大
    3、上图中C、D、E、F构成了pipeline,最后一个Stage里边的任务的类型是ResultTask,前面其他Stage里边的任务的类型多是shuffleMapTask。
    4、代表当前Stage的算子一定是该Stage的最后一个计算步骤!!!
启动 集群进程和historyServer进程,查看历史运行任务:

DAG逻辑图:



表面上看,数据在流动,实质是算子在流动,背后核心是函数式编程,包含两层意思:
    1、数据不动代码动(从集群计算的角度);
    2、在一个Stage内部,算子为何会流动进行pipeline?,首先是算子合并,也就是函数式编程执行的时候最终进行函数的展开,从而把一个stage内部的多个算子合并成为一个大算子(其内部包含了当前stage所有数据的计算逻辑)。其次是由于transformation的lazy特性,所以才能最后进行合并(不加lazy会产生中间结果);在具体算子交给Executor计算之前,首先会通过SparkFrameWork(DAGScheduler)进行算子的优化(基于数据本地性的pipeline),产生pipeline不会产生中间结果,框架帮我们管理了算子,引擎将所以算子产生一个大算子(若是进行cache或者checkpoint可以添加标记进行cache和checkpoint)。
(coGroupRDD类似于二维表)
  1. /**
  2. * Generic function to combine the elements for each key using a custom set of aggregation聚合
  3. * functions. This method is here for backward compatibility. It does not provide combiner
  4. * classtag information to the shuffle.
  5. *
  6. * @see [[combineByKeyWithClassTag]]
  7. */
  8. def combineByKey[C](
  9. createCombiner: V => C,
  10. mergeValue: (C, V) => C,
  11. mergeCombiners: (C, C) => C,
  12. partitioner: Partitioner,
  13. mapSideCombine: Boolean = true,
  14. serializer: Serializer = null): RDD[(K, C)] = self.withScope {
  15. combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
  16. partitioner, mapSideCombine, serializer)(null)
  17. }
  18. /**
  19. * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
  20. * This method is here for backward compatibility. It does not provide combiner
  21. * classtag information to the shuffle.
  22. *
  23. * @see [[combineByKeyWithClassTag]]
  24. */
  25. def combineByKey[C](
  26. createCombiner: V => C,
  27. mergeValue: (C, V) => C,
  28. mergeCombiners: (C, C) => C,
  29. numPartitions: Int): RDD[(K, C)] = self.withScope {
  30. combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
  31. }
产生shuffle依赖,key不可以是数组,ReduceByKey边抓数据边计算提高效率,其参数createCombiner初始化抓进来的数据,mergeValue,V变成C的算子,mergeCombiner将相同的C加起来。
depandecy中的窄依赖:
  1. @DeveloperApi
  2. abstract class Dependency[T] extends Serializable {
  3. def rdd: RDD[T]
  4. }
  5. /**
  6. * :: DeveloperApi ::
  7. * Base class for dependencies where each partition of the child RDD depends on a small number
  8. * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
  9. */
  10. @DeveloperApi
  11. abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  12. /**
  13. * Get the parent partitions for a child partition.
  14. * @param partitionId a partition of the child RDD
  15. * @return the partitions of the parent RDD that the child partition depends upon
  16. */
  17. def getParents(partitionId: Int): Seq[Int]
  18. override def rdd: RDD[T] = _rdd
  19. }
两种RDD的窄依赖:
  1. /**
  2. * :: DeveloperApi ::
  3. * Represents a one-to-one dependency between partitions of the parent and child RDDs.
  4. */
  5. @DeveloperApi
  6. class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  7. override def getParents(partitionId: Int): List[Int] = List(partitionId)
  8. }
  9. /**
  10. * :: DeveloperApi ::
  11. * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
  12. * @param rdd the parent RDD
  13. * @param inStart the start of the range in the parent RDD
  14. * @param outStart the start of the range in the child RDD
  15. * @param length the length of the range
  16. */
  17. @DeveloperApi
  18. class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  19. extends NarrowDependency[T](rdd) {
  20. override def getParents(partitionId: Int): List[Int] = {
  21. if (partitionId >= outStart && partitionId < outStart + length) {
  22. List(partitionId - outStart + inStart)
  23. } else {
  24. Nil
  25. }
  26. }
  27. }
oneToone一对一传递Partition
RangeDependency,进行join操作
shuffleDependency:
  1. class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
  2. @transient private val _rdd: RDD[_ <: Product2[K, V]],
  3. val partitioner: Partitioner,
  4. val serializer: Option[Serializer] = None,
  5. val keyOrdering: Option[Ordering[K]] = None,
  6. val aggregator: Option[Aggregator[K, V, C]] = None,
  7. val mapSideCombine: Boolean = false)
  8. extends Dependency[Product2[K, V]] {
注:Hadoop中的MapReduce中的mapper和reducer相当于Spark中map和ReduceByKey算子。



王家林老师是大数据技术集大成者,中国Spark第一人:

DT大数据梦工厂

新浪微博:www.weibo.com/ilovepains/

微信公众号:DT_Spark

博客:http://.blog.sina.com.cn/ilovepains

TEL:18610086859

Email:18610086859@vip.126.com








声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/600856
推荐阅读
相关标签
  

闽ICP备14008679号