赞
踩
val inputRDD = sc.parallelize(Array[(Int,Char)] ((1, 'a'),(2,'b'),(3,'C') (4,'d'),(2,'e'),(3,'f'),(2,'g'),(1, 'h')),3)
val resultRDD = inputRDD.partitionBy(new HashPartitioner (2))//使用HashPartitioner重新分区
val resultRDD = inputRDD.partitionBy(new RangePartitioner(2,inputRDD))//使用RangePartitioner重新分区
用来改变分区数,根据随机生成的key,使用随机策略均匀的分布数据,只能传入分区数,不能指定partitioner
val sc = new SparkContext()
val inputRDD = sc.parallelize(Array[(Int, Char)]((3, 'c'), (3, 'f'), (1, 'a'), (4, 'd'), (1, 'h'), (2, 'b'), (5, 'e'), (2, 'g')), 5)
var coalesceRDD = inputRDD.coalesce(2) //图3.19中的第1个图
coalesceRDD = inputRDD.coalesce(6) //图3.19中的第2个图
coalesceRDD = inputRDD.coalesce(2, true) // 图3.19中的第3个图
coalesceRDD = inputRDD.coalesce(6, true) //图3.19中的第4个图
随机
将数据打乱,从而使得生成的RDD中每个分区中的数据比较均衡。具体采用的方法是为rdd1中的每个record添加一个特殊的Key,如第3个图中的MapPartitionsRDD,Key是 Int类型,并从[0, numPartitions)中随机生成
,如<3,f >=><2,(3,f)>
中,2是随机生成的Key,接下来的record的Key递增1,如<1,a> =><3,(1,a)>
。这样,Spark可以根据Key的 Hash值将rdd1中的数据分发到rdd2的不同的分区中,然后去掉Key即可(见最后的 MapPartitionsRDD)。repartition(partitionNums): Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
中文:通过创建更多或更少的分区将数据随机的打散,让数据在不同分区之间相对均匀。这个操作经常是通过网络进行数打散。
从设计的角度上来说,repartition 是用来让数据更均匀分布的
语义上的区别:repartition = coalesce(numPartitions,true)
coalesce算子默认只能减少分区数量,如果设置为false且参数大于调用RDD的分区数,那调用RDD的分区数不会变化。
coalesce的作用常常是减少分区数,已达到输出时合并小文件的效果。减少分区数有2种情况:
repartition 返回一定一个parNum个分区的RDD,一定会shuffle,一般用这个就是为了增加分区数,提高并行度!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。