赞
踩
我们知道Task是Spark计算的最小计算单位,一个Partition(分区)对应一个Task,因此Partition个数也是决定RDD并行计算的关键,合理设置Partition个数能够极大的提高Spark程序执行效率。首先我们看下RDD、Partition和task的关系如下图:
那Spark中分区个数是如何确定的呢?当发生shuffle时候,子RDD的分区个数又是如何确定的呢?
我们知道默认分区个数是通过spark.default.parallelism
参数控制的,我们结合该参数看在Spark中如何起作用的。
我们分别以窄依赖、宽依赖和源RDD等分别介绍。(以下代码以Spark2.4.3版本为准)
Spark中的分区器都会继承Partitioner(注意区别Partition),其是一个抽象类,位于 org.apache.spark.Partitioner 中,有两个接口方法:
Spark在Partitioner类的伴生类中也实现了一个默认分区器,如下图:
分析见代码注释,可以重点关注spark.default.parallelism
配置参数和父RDD最大分区数如何参与运算,最终可以得出:如果存在大于0的父RDD且父RDD的最大分区数大于默认分区数,则分区取该父RDD的分区;否则新建一个默认分区数的HashPartitioner分区。
除此默认分区器之外,Spark实现了几个系统分区器,他们都继承至Partitioner,如下图:
如果Spark提供默认分区器和系统分区器不能满足需要,用户也可以继承Partitioner实现自定义分区器,下面举例一个简单例子:
class CustomPartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int = {
if(key == 1){
0
} else if (key == 2){
1
} else{
2
}
}
}
上图是窄依赖,通过分析源码会发现map、flatMap和filter等常用算子,最后都会返回了MapPartitionsRDD对象,不同的仅仅是传入的function不同而已。我们分析其分区源码如下:
如上图,子RDD直接获得父RDD的分区,因此:生成MapPartitionsRDD对应的算子的子RDD分区与父RDD分区是一致的。
针对union算子,最后返回的是UnionRDD对象,分析其分区源码如下:
如上图可知,生成UnionRDD对象的算子子RDD分区数是父RDD分区数之和。
上图是宽依赖,我们知道,宽依赖一般是发生shuffle的RDD,其中 子RDD分区数是由分区器决定的,分区器包含:默认分区器、系统分区器和自定义分区器。 首先我们看默认分区器RDD,默认分区器的实现在defaultPartitioner
函数中(见上节,即:如果存在大于0的父RDD且父RDD的最大分区数大于默认分区数,则分区取该父RDD的分区;否则新建一个默认分区数的HashPartitioner分区),默认分区器一般用于哪些RDD中,如下图:
我们以reduceByKey
函数源码详细看如何使用
如上图,调用reduceByKey函数时,针对不同的参数调用不同重载函数:
testRDD.reduceByKey(func)
testRDD.reduceByKey(func, 3)
testRDD.reduceByKey(new CustomPartitioner(3), func)
上面介绍宽窄依赖都涉及到父RDD的分区,那最源头的RDD如何确定分区的呢?我们知道源头RDD一般都是读取加载各种数据源的数据, 分析源码可以发现Spark对接不同的数据源,得到的分区数是不一样的,我们重点分析加载hdfs文件的源RDD(以sc.textFile("hdfs://xx/test.txt")
为例),最终会生成HadoopRDD,如下图:
在调用textFile函数时候,如果没有传入minPartitions,则取默认的defaultMinPartitions,从上图右面代码可以看出其最大值为2,此值参与hdfs分片大小的计算(先忽略下面再介绍)。我们看 org.apache.spark.rdd.HadoopRDD ,重点看其getPartitions
函数:
在hdfs中,block是物理存储概念,split是逻辑概念,hdfs文件的读写是基于split的,从上面代码分析可看出,读取hdfs文件划分了多少个split就会产生多少个Partition,那么分析的关键就是产生可多少split,对应的代码是val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
,我们继续分析getSplit函数源码(此源码属于Hadoop源码部分)如下:
分析过程见注释,可以得出如下几点结论:
参考:https://www.lagou.com/lgeduarticle/70041.html
重新分区可以通过repartition算子实现,其主要是通过创建更过或更少的分区将数据随机的打散,让数据在不同分区之间相对均匀,此操作会进行shuffle。我们看其源码如下图:
可以看出repartition函数最终会调用coalesce函数,并设置shuffle参数为true,也就是说分区数无论是增加还是减少都会执行shuffle操作。继续分析coalesce函数可知,首先会对每个item随机生成key值,然后使用HashPartitioner分区器进行shuffle分区,最终实现数据的均匀分散。
适用方法示例为:testRDD.repartition(24)
repartition算子适用的场景包括:通过新增分区扩大并行计算能力,通过均匀打散特性解决数据倾斜和通过合并分区降低下游数据处理的并发量等。
testRDD.groupByKey(24);
testRDD.groupByKey(new CustomPartitioner(3));
testRDD.repartition(24);
val conf = new SparkConf();
conf.set("spark.default.parallelism", 24);
$SPARK_HOME/conf/spark-defaults.conf
文件中配置spark.default.parallelism
的值,优先级最低。spark.default.parallelism 24
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。