赞
踩
RDD,Resiliennt Distributed Datasets,弹性式分布式数据集,是由若干个分区构成的,那么这每一个分区中的数据又是如何产生的呢?这就是RDD分区策略所要解决的问题,下面我们就一道来学习RDD分区相关。
Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数。
分区的决定,就是在宽依赖的过程中才有,窄依赖因为是一对一,分区确定的,所以不需要指定分区操作。
1)Partitioner:在Spark中涉及RDD的分区策略的抽象类为Partitioner,其继承体系如图-27所示,有两个核心的子类实现,一个HashPartitioner,一个RangePartitioner。
图-27 spark Partitioner继承体系
Spark中数据分区的主要工具类(数据分区类),主要用于Spark底层RDD的数据重分布的情况中,主要方法两个,如图-28所示:
图-28 Partitioner抽象类
2)HashPartitioner:Spark中非常重要的一个分区器,也是默认分区器,默认用于90%以上的RDD相关API上。
功能:依据RDD中key值的hashCode的值将数据取模后得到该key值对应的下一个RDD的分区id值,支持key值为null的情况,当key为null的时候,返回0;该分区器基本上适合所有RDD数据类型的数据进行分区操作;但是需要注意的是,由于JAVA中数组的hashCode是基于数组对象本身的,不是基于数组内容的,所以如果RDD的key是数组类型,那么可能导致数据内容一致的数据key没法分配到同一个RDD分区中,这个时候最好自定义数据分区器,采用数组内容进行分区或者将数组的内容转换为集合。HashPartitioner代码如下:
- def main(args: Array[String]): Unit = {
- val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]")
- val sc = new SparkContext(conf)
- sc.setLogLevel("WARN")
- //加载数据
- val rdd = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)
- //通过Hash分区
- val result: RDD[(Int, Int)] = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
- //获取分区方式
- println(result.partitioner)
- //获取分区数
- println(result.getNumPartitions)
- }
我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略,这两种分区策略在很多情况下都适合我们的场景。但是有些情况下,Spark内部不能符合我们的需求,这时候我们就可以自定义分区策略。
要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法。
1)numPartitions: Int:返回创建出来的分区数。
2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。
3)equals():Java判断相等性的标准方法。这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。
案例一:模拟实现HashPartitioner。
- class CustomerPartitoner(numPartiton:Int) extends Partitioner{
-
- // 返回分区的总数
-
- override def numPartitions: Int = numPartiton
-
- // 根据传入的Key返回分区的索引
-
- override def getPartition(key: Any): Int = {
-
- key.hashCode()%numparts
-
- }
-
- }
-
- object CustomerPartitoner {
-
- def main(args: Array[String]): Unit = {
-
- val sparkConf = new SparkConf()
-
- .setAppName("CustomerPartitoner").setMaster("local[*]")
-
- val sc = new SparkContext(sparkConf)
-
- //zipWithIndex该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。
-
- val rdd = sc.parallelize(0 to 10,1).zipWithIndex()
-
- val func = (index:Int,iter:Iterator[(Int,Long)]) =>{
-
- iter.map(x => "[partID:"+index + ", value:"+x+"]")
-
- }
-
- val r = rdd.mapPartitionsWithIndex(func).collect()
-
- for (i <- r){
-
- println(i)
-
- }
-
- val rdd2 = rdd.partitionBy(new CustomerPartitoner(5))
-
- val r1 = rdd2.mapPartitionsWithIndex(func).collect()
-
- println("----------------------------------------")
-
- for (i <- r1){
-
- println(i)
-
- }
-
- println("----------------------------------------")
-
- sc.stop()
-
- }
-
- }
总结:
1)分区主要面对KV结构数据,Spark内部提供了两个比较重要的分区器,Hash分区器和Range分区器。
2)hash分区主要通过key的hashcode来对分区数求余,hash分区可能会导致数据倾斜问题,Range分区是通过水塘抽样的算法来将数据均匀的分配到各个分区中。
3)自定义分区主要通过继承partitioner抽象类来实现,必须要实现两个方法:numPartitions 和 getPartition(key: Any)。
RDD和它依赖的父RDD的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
图-29是源码中的一张图,可以发现一个问题Dependency(依赖)的意思可以发现ShuffleDependency是其子类(即宽依赖),NarrowDependency是其子类(即窄依赖)。
图-29 Dependency体系
1)宽窄依赖:所谓窄依赖,指的是子RDD一个分区中的数据,来自于上游RDD中一个分区。所谓宽依赖,指的是子RDD一个分区中的数据,来自于上游RDD所有的分区。
宽窄依赖关系示例如图-30所示:
图-30 宽窄依赖示例图
2)血统Lineage:RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。关于linage说明示意图如图-31所示:
图-31 lineage示例图
3)DAG有向无环图:如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图。有向图中一个点经过两种路线到达另一个点未必形成环,因此有向无环图未必能转化成树,但任何有向树均为有向无环图。通俗的来说就是有方向,没有回流的图可以称为有向无环图,示意图如图-32所示。
图-32 有像无环图
4)RDD任务的切分:对于RDD的任务切分,可以很形象的如图-33所示。
图-33 RDD任务的切分
并行度:程序同一时间执行作业的线程个数。
原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,如图-34所示。
图-34 RDD stage的切分
对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的重要依据。Stage阶段计算过程如图所示-35所示。
图-35 RDD stage阶段计算过程
Spark任务生产和提交的四个步骤可以归纳如下:
1)构建DAG:用户提交的job将首先被转换成一系列RDD并通过RDD之间的依赖关系构建DAG,然后将DAG提交到调度系统。
DAG描述多个RDD的转换过程,任务执行时,可以按照DAG的描述,执行真正的计算。
DAG是有边界的:开始(通过sparkcontext创建的RDD),结束(触发action,调用runjob就是一个完整的DAG形成了,一旦触发action,就形成了一个完整的DAG)。
一个RDD描述了数据计算过程中的一个环节,而一个DAG包含多个RDD,描述了数据计算过程中的所有环节。
一个spark application可以包含多个DAG,取决于具体有多少个action。
2)DAGScheduler:将DAG切分stage(切分依据是shuffle),将stage中生成的task以taskset的形式发送给TaskScheduler
为什么要切分stage?
一个是复杂业务逻辑(将多台机器上具有相同属性的数据聚合到一台机器上:shuffle)。
如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,下一个阶段的计算依赖上一个阶段的数据。
在同一个stage中,会有多个算子,可以合并到一起,我们很难称其为pipeline(流水线,严格按照流程、顺序执行)。
3)TaskScheduler:调度task(根据资源情况将task调度到Executors).
4)Executors:接收task,然后将task交给线程池执行。
具体可以简化为如图-38所示。
图-38 spark任务生成和提交图
topN就是上述sortBy/sortByKey之后执行action操作take(N),或者直接takeOrderd(N),建议使用后者,效率高于前者。具体操作省略。
所谓二次排序,指的是排序字段不唯一,有多个,共同排序,仍然使用上面的数据,对学生的身高和年龄一次排序。
- object SecondSortOps {
-
- def main(args: Array[String]): Unit = {
-
- val conf = new SparkConf()
-
- .setAppName(s"${SecondSortOps.getClass.getSimpleName}")
-
- .setMaster("local[2]")
-
- val sc = new SparkContext(conf)
-
- //sortByKey 数据类型为k-v,且是按照key进行排序
-
- val personRDD:RDD[Person] = sc.parallelize(List(
-
- Person(1, "吴轩宇", 19, 168),
-
- Person(2, "彭国宏", 18, 175),
-
- Person(3, "随国强", 18, 176),
-
- Person(4, "闫 磊", 20, 180),
-
- Person(5, "王静轶", 18, 168)
-
- ))
-
- personRDD.map(stu => (stu, null)).sortByKey(true, 1).foreach(p => println(p._1))
-
- sc.stop()
-
- }
-
- }
-
- case class Person(id:Int, name:String, age:Int, height:Double) extends Ordered[Person] {
-
- //对学生的身高和年龄依次排序
-
- override def compare(that: Person) = {
-
- var ret = this.height.compareTo(that.height)
-
- if(ret == 0) {
-
- ret = this.age.compareTo(that.age)
-
- }
-
- ret
-
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。