赞
踩
在Spark中,RDD(Resilient Distributed Dataset)是其最基本的抽象数据集,其中每个RDD是由若干个Partition组成。在Job运行期间,参与运算的Partition数据分布在多台机器的内存当中。这里可将RDD看成一个非常大的数组,其中Partition是数组中的每个元素,并且这些元素分布在多台机器中。图一中,RDD1包含了5个Partition,RDD2包含了3个Partition,这些Partition分布在4个节点中。
Spark包含两种数据分区方式:HashPartitioner(哈希分区)和RangePartitioner(范围分区)。一般而言,对于初始读入的数据是不具有任何的数据分区方式的。数据分区方式只作用于<Key,Value>形式的数据。因此,当一个Job包含Shuffle操作类型的算子时,如groupByKey,reduceByKey etc,此时就会使用数据分区方式来对数据进行分区,即确定某一个Key对应的键值对数据分配到哪一个Partition中。在Spark Shuffle阶段中,共分为Shuffle Write阶段和Shuffle Read阶段,其中在Shuffle Write阶段中,Shuffle Map Task对数据进行处理产生中间数据,然后再根据数据分区方式对中间数据进行分区。最终Shffle Read阶段中的Shuffle Read Task会拉取Shuffle Write阶段中产生的并已经分好区的中间数据。图2中描述了Shuffle阶段与Partition关系。下面则分别介绍Spark中存在的两种数据分区方式。
HashPartitioner采用哈希的方式对<Key,Value>键值对数据进行分区。其数据分区规则为 partitionId = Key.hashCode % numPartitions,其中partitionId代表该Key对应的键值对数据应当分配到的Partition标识,Key.hashCode表示该Key的哈希值,numPartitions表示包含的Partition个数。图3简单描述了HashPartitioner的数据分区过程。
Spark引入RangePartitioner的目的是为了解决HashPartitioner所带来的分区倾斜问题,也即分区中包含的数据量不均衡问题。HashPartitioner采用哈希的方式将同一类型的Key分配到同一个Partition中,因此当某一或某几种类型数据量较多时,就会造成若干Partition中包含的数据过大问题,而在Job执行过程中,一个Partition对应一个Task,此时就会使得某几个Task运行过慢。RangePartitioner基于抽样的思想来对数据进行分区。图4简单描述了RangePartitioner的数据分区过程。
1) Spark分布式程序中网络传输的通信代价很大,所以为了较少传输开销,需要控制RDD分区,和单节点的程序需要选择使用合适的数据结构一样,Spark程序需要选择合适的分区方式
2) 只有数据集是基于键时,分区才会有用,Spark可以确保同一个组的键出现在同一个节点上,比如使用键的哈希值做模运算
3) 如果不使用分区partitionBy(),则每次调用Join()等函数都对从来不会变化的数据重新进行哈希值计算和跨节点数据清洗,效率低。
4) sortByKey()可以使用RangePartitioner分区,groupByKey()可以使用HashPartitioner分区
cogroup()/groupWith()/join()/leftOuterJoin()/rightOuterJoin()/groupByKey()/reduceByKey()/combineByKey()/lookup()
根据键跨节点进行数据混洗的操作,都会从分区获益。尤其时对于 join()和cogroup()这种操作两个数据集的操作,如果事先分区,则其中一个分区不发生混洗。
1) Spark提供mapValues()和flatMapValues()两种操作保证每个二元组的键保持不变,使得转化操作的结果可以按照已知的方式分区,只是使用map()或者flatMap()可能会改变键
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。