赞
踩
分区只不过是将原来大的数据分成几部分。 比如分布式系统中的分区,我们可以将其定义为大型数据集的分区,并将它们存储为整个群集中的多个部分。
通过分区可以减少网络I/O,从而可以更快地处理数据。在Spark中,co-group
,groupBy
,groupByKey
等操作需要大量的I/O操作。 在这种情况下,如果我们应用分区,那么我们可以快速减少I/O操作的数量,以便我们可以加速数据处理。
Spark适用于数据局部性原则。 工作节点获取更接近它们的处理数据。 通过分区网络,I/O将减少,从而可以更快地处理数据。
同时,Spark可以为RDD的每个分区运行1个并发任务(最多为集群中的核心数)。
Spark有两种类型的分区技术。 一个是HashPartitioner,另一个是RangePartitioner。
key-value pair RDD中的元素是按key的值分进行分区的。
每个key都会被映射到对应的分区ID,从0到numPartitions - 1
。
分区程序必须是确定性的,即它必须在给定相同分区键的情况下返回相同的分区ID。
如果设置了spark.default.parallelism,我们将使用SparkContext defaultParallelism的值作为默认分区号,否则我们将使用上游分区的最大数量。
一般来说,我们尽量从rdds中选择具有最大分区数的分区程序。 如果此分区符合条件(rdds中最大分区数量级别内的分区数),或者分区数大于或等于默认分区数,则使用此分区程序。
否则,我们将使用具有默认分区数的新HashPartitioner。
除非设置了spark.default.parallelism,否则分区数将与最大上游RDD中的分区数相同,因为这种配置最保险, 可以避免导致内存不足。
HashPartitioner,它使用Java的Object.hashCode
实现基于散列的分区。
hashcode()的概念是相等的对象应该具有相同的哈希码。 因此,基于此hashcode()概念,HashPartitioner将划分具有相同hashcode()的键到同一分区。
HashPartitioner是Spark的默认分区程序。 如果我们没有配置任何分区器,那么Spark将使用这个散列分区器来对数据进去分区。
注意
Java数组具有基于数组的身份而不是其内容的hashCode,因此尝试使用HashPartitioner对RDD [Array [_]]或RDD[(Array [_],_)]进行分区将产生意外或不正确的结果。
scala中的示例:
scala> import org.apache.spark.HashPartitioner
scala> val textFile = sc.textFile("file:///Users/lestat/Desktop/test.txt")
textFile: org.apache.spark.rdd.RDD[String] = file:/Users/lestat/Desktop/test.txt MapPartitionsRDD[13] at
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。