赞
踩
在Spark中分区器直接决定了RDD中分区的个数;也决定了RDD中每条数据经过Shuffle过程属于哪个分区;也决定了Reduce的个数。这三点看起来是不同的方面的,但其深层的含义是一致的。
我们需要注意的是,只有Key-Value类型的RDD才有分区的,非Key-Value类型的RDD分区的值是None的。
注:有的时候,HashPartitioner存在 分区碰撞问题,即不同的值可能计算出来的分区是一样的 key的hashcode % reduce个数.(例如:java.itcast.cn与php.itcast.cn的hashcode值就是一样的,当然这是小概率事件,但是有的时候还真的是会发生的),所以有的时候需要我们自己实现分区程序!在spark中实现自定义的分区只需要实现partitioner trait并实现里面的方法!
当然:在Spark中,存在两类分区函数:HashPartitioner和RangePartitioner,它们都是继承自Partitioner,主要提供了每个RDD有几个分区(numPartitions)以及对于给定的值返回一个分区ID(0~numPartitions-1),也就是决定这个值是属于那个分区的。
HashPartitioner分区的原理很简单,对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID。实现如下:
01 |
/ |
02 |
User : 过往记忆 |
03 |
Date : 2015 - 11 - 10 |
04 |
Time : 06 : 59 |
05 |
bolg : http : //www.iteblog.com |
06 |
本文地址:http : //www.iteblog.com/archives/1522 |
07 |
过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 |
08 |
过往记忆博客微信公共帐号:iteblog _ hadoop |
09 |
/ |
10 |
11 |
class HashPartitioner(partitions : Int) extends Partitioner {
|
12 |
require(partitions > = 0 , s "Number of partitions ($partitions) cannot be negative." ) |
13 |
14 |
def numPartitions : Int = partitions |
15 |
16 |
def getPartition(key : Any) : Int = key match {
|
17 |
case null = > 0 |
18 |
case _ = > Utils.nonNegativeMod(key.hashCode, numPartitions) |
19 |
} |
20 |
21 |
override def equals(other : Any) : Boolean = other match {
|
22 |
case h : HashPartitioner = > |
23 |
h.numPartitions == numPartitions |
24 |
case _ = > |
25 |
false |
26 |
} |
27 |
28 |
override def hashCode : Int = numPartitions |
29 |
} |
从HashPartitioner分区的实现原理我们可以看出,其结果可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据,这显然不是我们需要的。而RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。
前面讨论过,RangePartitioner分区器的主要作用就是将一定范围内的数映射到某一个分区内,所以它的实现中分界的算法尤为重要。这个算法对应的函数是rangeBounds。这个函数主要经历了两个过程:以Spark 1.1版本为界,Spark 1.1版本社区对rangeBounds函数进行了一次重大的重构。
因为在Spark 1.1版本之前,RangePartitioner分区对整个数据集进行了2次的扫描:一次是计算RDD中元素的个数;一次是进行采样。具体的代码如下:
01 |
// An array of upper bounds for the first (partitions - 1) partitions |
02 |
private val rangeBounds : Array[K] = {
|
03 |
if (partitions == 1 ) {
|
04 |
Array() |
05 |
} else {
|
06 |
val rddSize = rdd.count() |
07 |
val maxSampleSize = partitions * 20.0 |
08 |
val frac = math.min(maxSampleSize / math.max(rddSize, 1 ), |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。