当前位置:   article > 正文

Spark源码分析之分区(Partition)_spark getpartitions 是什么时候调用的

spark getpartitions 是什么时候调用的

概述

我们知道Task是Spark计算的最小计算单位,一个Partition(分区)对应一个Task,因此Partition个数也是决定RDD并行计算的关键,合理设置Partition个数能够极大的提高Spark程序执行效率。首先我们看下RDD、Partition和task的关系如下图:
在这里插入图片描述
那Spark中分区个数是如何确定的呢?当发生shuffle时候,子RDD的分区个数又是如何确定的呢?
我们知道默认分区个数是通过spark.default.parallelism参数控制的,我们结合该参数看在Spark中如何起作用的。
我们分别以窄依赖、宽依赖和源RDD等分别介绍。(以下代码以Spark2.4.3版本为准)

Spark的分区器(Partitioner)

Spark中的分区器都会继承Partitioner(注意区别Partition),其是一个抽象类,位于 org.apache.spark.Partitioner 中,有两个接口方法:
在这里插入图片描述
Spark在Partitioner类的伴生类中也实现了一个默认分区器,如下图:
在这里插入图片描述
分析见代码注释,可以重点关注spark.default.parallelism配置参数和父RDD最大分区数如何参与运算,最终可以得出:如果存在大于0的父RDD且父RDD的最大分区数大于默认分区数,则分区取该父RDD的分区;否则新建一个默认分区数的HashPartitioner分区。
除此默认分区器之外,Spark实现了几个系统分区器,他们都继承至Partitioner,如下图:
在这里插入图片描述

  • HashPartitioner:一般是默认分区器,分析源码可知是按key求取hash值,再对hash值除以分区个数取余,如果余数<0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID。
  • RangePartitioner:由于HashPartitioner根据key值hash取模方法可能导致每个分区中数据量不均匀,RangePartitioner则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。参考:https://www.iteblog.com/archives/1522.html
  • GridPartitioner:一个网格Partitioner,采用了规则的网格划分坐标,numPartitions等于行和列之积,一般用于mlib中。
  • PartitionIdPassthrough:一个虚拟Partitioner,用于已计算好分区的记录,例如:在(Int, Row)对的RDD上使用,其中Int就是分区id。
  • CoalescedPartitioner:把父分区映射为新的分区,例如:父分区个数为5,映射后的分区起始索引为[0,2,4],则映射后的新的分区为[[0, 1], [2, 3], [4]]
  • PythonPartitioner:提供给Python Api的分区器

如果Spark提供默认分区器和系统分区器不能满足需要,用户也可以继承Partitioner实现自定义分区器,下面举例一个简单例子:

class CustomPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    if(key == 1){
      0
    } else if (key == 2){
      1
    } else{
      2 
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

RDD分区数确认

窄依赖中分区数

在这里插入图片描述
上图是窄依赖,通过分析源码会发现map、flatMap和filter等常用算子,最后都会返回了MapPartitionsRDD对象,不同的仅仅是传入的function不同而已。我们分析其分区源码如下:
在这里插入图片描述
如上图,子RDD直接获得父RDD的分区,因此:生成MapPartitionsRDD对应的算子的子RDD分区与父RDD分区是一致的。

针对union算子,最后返回的是UnionRDD对象,分析其分区源码如下:
在这里插入图片描述
如上图可知,生成UnionRDD对象的算子子RDD分区数是父RDD分区数之和。

宽依赖中分区数

在这里插入图片描述
上图是宽依赖,我们知道,宽依赖一般是发生shuffle的RDD,其中 子RDD分区数是由分区器决定的,分区器包含:默认分区器、系统分区器和自定义分区器。 首先我们看默认分区器RDD,默认分区器的实现在defaultPartitioner函数中(见上节,即:如果存在大于0的父RDD且父RDD的最大分区数大于默认分区数,则分区取该父RDD的分区;否则新建一个默认分区数的HashPartitioner分区),默认分区器一般用于哪些RDD中,如下图:
在这里插入图片描述
我们以reduceByKey函数源码详细看如何使用
在这里插入图片描述
如上图,调用reduceByKey函数时,针对不同的参数调用不同重载函数:

  • 无分区器或分区数参数,则取默认分区器,例如:testRDD.reduceByKey(func)
  • 有分区数参数,则新建HashPartitioner分区器,例如:testRDD.reduceByKey(func, 3)
  • 有分区器参数,则直接使用参数分区器,参数提供的分区器可以是系统分区器也可以是自定义分区器。例如:testRDD.reduceByKey(new CustomPartitioner(3), func)

源RDD的分区数

上面介绍宽窄依赖都涉及到父RDD的分区,那最源头的RDD如何确定分区的呢?我们知道源头RDD一般都是读取加载各种数据源的数据, 分析源码可以发现Spark对接不同的数据源,得到的分区数是不一样的,我们重点分析加载hdfs文件的源RDD(以sc.textFile("hdfs://xx/test.txt")为例),最终会生成HadoopRDD,如下图:
在这里插入图片描述
在调用textFile函数时候,如果没有传入minPartitions,则取默认的defaultMinPartitions,从上图右面代码可以看出其最大值为2,此值参与hdfs分片大小的计算(先忽略下面再介绍)。我们看 org.apache.spark.rdd.HadoopRDD ,重点看其getPartitions函数:
在这里插入图片描述
在hdfs中,block是物理存储概念,split是逻辑概念,hdfs文件的读写是基于split的,从上面代码分析可看出,读取hdfs文件划分了多少个split就会产生多少个Partition,那么分析的关键就是产生可多少split,对应的代码是val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions),我们继续分析getSplit函数源码(此源码属于Hadoop源码部分)如下:
在这里插入图片描述
分析过程见注释,可以得出如下几点结论:

  • 1、文件是否可分割是指hdfs存储的文件格式是text等,则可分割;而如果是一些压缩格式(例如orc等),则整块block不可分割;
  • 2、如果hdfs文件是不可分割的,那么RDD的分区数与该文件的block数量保持一致;如果可分割,那么RDD的分区数大于等于block数量;
  • 3、根据spark加载hdfs文件的代码分析,它只会把一个文件分得越来越小,而不会对小文件采取合并(小文件较多则会导致rdd产生更多的分区,进而影响性能);

参考:https://www.lagou.com/lgeduarticle/70041.html

RDD的重新分区

重新分区可以通过repartition算子实现,其主要是通过创建更过或更少的分区将数据随机的打散,让数据在不同分区之间相对均匀,此操作会进行shuffle。我们看其源码如下图:
在这里插入图片描述
可以看出repartition函数最终会调用coalesce函数,并设置shuffle参数为true,也就是说分区数无论是增加还是减少都会执行shuffle操作。继续分析coalesce函数可知,首先会对每个item随机生成key值,然后使用HashPartitioner分区器进行shuffle分区,最终实现数据的均匀分散。
适用方法示例为:testRDD.repartition(24)
repartition算子适用的场景包括:通过新增分区扩大并行计算能力,通过均匀打散特性解决数据倾斜和通过合并分区降低下游数据处理的并发量等。

Spark分区编程示例

  • 产生shuffle的操作函数内设置并行度参数,优先级最高。
testRDD.groupByKey(24);
testRDD.groupByKey(new CustomPartitioner(3));
testRDD.repartition(24);
  • 1
  • 2
  • 3
  • 在代码中配置“spark.default.parallelism”设置并行度,优先级次之。
val conf = new SparkConf();
conf.set("spark.default.parallelism", 24);
  • 1
  • 2
  • $SPARK_HOME/conf/spark-defaults.conf文件中配置spark.default.parallelism的值,优先级最低。
spark.default.parallelism 24
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/727465
推荐阅读
相关标签
  

闽ICP备14008679号