赞
踩
在分布式程序中,网络通信的开销是很大的,因此控制数据分布以获得最少的网络传输可以极大的提升整体性能,Spark程序可以通过控制RDD分区方式来减少通信开销。Spark中所有的RDD都可以进行分区,系统会根据一个针对键的函数对元素进行分区。虽然Sgark不能控制每个键具体划分到哪个节点上,但是可以确保相同的键出现在同一个分区上。
RDD的分区原则是分区的个数尽星等于集群中的CPU核心(Core)数目。对于不同的Spark部署模式而言,都可以通过设置spark.default.parallelism(配置位置在:spark-env.sh)这个参数值来配置默认的分区数目。一般而言,各种模式下的默认分区数目如下:
●Local模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N。
●Standalone或者Yarn模式:在“集群中所有CPU核数总和”和"2”这两者中取较大值作为默认值。
●Mesos模式:默认的分区数是8。
Spark框架为RDD提供了两种分区方式,分别是哈希分区(HashPartitioner)和范围分区(RangePartitioner)。其中,哈希分区是根据哈希值进行分区;范围分区是将一定范围的数据映射到一个分区中。这两种分区方式已经可以满足大多数应用场景的需求。
与此同时,Spark也支持自定义分区方式,即通过一个自定义的Partitioner对象来控制RDD的分区,从而进一步减少通信开销。
需要注意的是,RDD的分区函数是针对(Key,Value)类型的RDD,分区函数根据Key对RDD元素进行分区。因此,当需要对一些非(Key,Value)类型的RDD进行自定义分区时,需要先把RDD元素转换为(Key,Value)类型,再通过分区函数进行分区操作。
如果想要实现自定义分区,就需要定义一个类,使得这个自定义的类继承org.apache.spark.Partitioner类,并实现其中的3个方法,具体如下:
(1) .def numPartitions:Int:用于返回创建的分区个数。
(2) .def getPartition(Key:Any):用于对输入的Key做处理,并返回该Key的分区ID,分区ID的范围是0~numPartitions-1。
(3) .equals(other:Any):用于Spark判断自定义的Partitioner对象和其他的Partitioner对象是否相同,从而判断两个RDD的分区方式是否相同。
在Spark中,不同的RDD之间具有依赖的关系。RDD与它所依赖的RDD的依赖关系有两种类型,分别是窄依赖(narrow dependency)和宽依赖(wide dependency) 。
窄依赖是指父RDD的每一个分区最多被一个子RDD的分区使用,即OneToOneDependencies。窄依赖的表现一般分为两类,第一类表现为一个父RDD的分区对应于一个子RDD的分区;第二类表现为多个父RDD的分区对应于一个子RDD的分区。也就是说,一个父RDD的一个分区不可能对应一个子RDD的多个分区。为了便于理解,我们通常把窄依赖形象的比喻为独生子女。当RDD执行map、filter及union和join操作时,都会产生窄依赖。
RDD做map、filter和union算子操作时,是属于窄依赖的第一类表现;而RDD做join算子操作(对输入进行协同划分)时,是属于窄依赖表现的第二类。这里的输入协同划分是指多个父RDD的某一个分区的所有Key,被划分到子RDD的同一分区,而不是指同一个父RDD的某一个分区,被划分到子RDD的两个分区中。当子RDD做算子操作,因为某个分区操作失败导致数据丢失时,只需要重新对父RDD中对应的分区(与子RDD相对应的分区)做算子操作即可恢复数据。
宽依赖是指子RDD的每一个分区都会使用所有父RDD的所有分区或多个分区,即OneToManyDependecies。为了便于理解,我们通常把宽依赖形象的比喻为超生。当RDD做groupByKey和join操作时,会产生宽依赖,如图所示。
父RDD做groupByKey和join(输入未协同划分)算子操作时,子RDD的每一个分区都会依赖于所有父RDD的所有分区。当子RDD做算子操作,因为某个分区操作失败导致数据丢失时,则需要重新对父RDD中的所有分区进行算子操作才能恢复数据。
需要注意的是,join算子操作既可以属于窄依赖,也可以属于宽依赖。当join算子操作后,分区数量没有变化则为窄依赖(如join with inputs co-partitioned,输入协同划分)﹔当join算子操作后,分区数量发生变化则为宽依赖(如join with inputs not co-partitioned,输入非协同划分)。
Spark为RDD提供了两个重要的机制,分别是持久化机制(即缓存机制)和容错机制。接下来,本节将针对持久化机制和容错机制进行详细介绍。I
在Spark中,RDD是采用惰性求值,即每次调用行动算子操作,都会从头开始计算。然而,每次调用行动算子操作,都会触发一次从头开始的计算,这对于迭代计算来说,代价是很大的,因为迭代计算经常需要多次重复的使用同一组数据集,所以,为了避免重复计算的开销,可以让Spark
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。