当前位置:   article > 正文

hashpartitioner-Spark分区计算器

spark hashpartition

首先,我们回顾的知识点是RDD的五大特性:

1,一系列的分区。

2,一个函数作用于分区上。

3,RDD之间有一系列的依赖。

4,分区器。

5,最佳位置。

Spark属于链式计算,rdd之间有着依赖关系:窄依赖,宽依赖。

RDD执行的时候会将计算链条分为很多task,rdd的task分为:ResultTask和ShuffleMapTask。

1.Partitioner简介

书归正传,RDD之间的依赖如果是宽依赖,那么上游RDD该如何确定每个分区的输出将交由下游RDD的哪些分区呢?Spark提供了分区计算器来解决这个问题。ShuffleDependency的partitioner属性的类型是partitioner,抽象类Partitioner定义了分区计算器的接口规范,ShuffleDependency的分区取决于Partitioner的具体实现。Partitioner的定义如下:

 
 
  1. abstract class Partitioner extends Serializable {
  2. def numPartitions: Int
  3. def getPartition(key: Any): Int
  4. }

Partitioner的numPartitions方法用于获取分区数量。Partitioner的getPartition方法用于将输入的key映射到下游的RDD的从0到numPartitions-1这个范围中的某一个分区中去。

Partitioner根据不同的需求有着具体的实现类,在idea打开源码,在该抽象类上按下F4键,可以看到继承关系,如下图:

640?wx_fmt=png

本分区系列,会将CoalescedPartitioner,GridPartitioner,HashPartitioner,RangePartitioner及自定义分区器逐个介绍。本文重点在hashPartitioner。

2.HashPartitioner

首先,我们先看HashPartitioner的源码实现。

 
 
  1. class HashPartitioner(partitions: Int) extends Partitioner {
  2. require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  3. def numPartitions: Int = partitions
  4. def getPartition(key: Any): Int = key match {
  5. case null => 0
  6. case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  7. }
  8. override def equals(other: Any): Boolean = other match {
  9. case h: HashPartitioner =>
  10. h.numPartitions == numPartitions
  11. case _ =>
  12. false
  13. }
  14. override def hashCode: Int = numPartitions
  15. }

根据上面代码我们可以看到,其传入的参数partitions,决定总的分区数,重写的numPartitions方法也只是简单返回该值。重写的getPartition方法实际上是以key的hashcode和numPartitions作为参数调用了Utils工具类的nonNegativeMod方法,该方法的具体实现如下:

 
 
  1. def nonNegativeMod(x: Int, mod: Int): Int = {
  2. val rawMod = x % mod
  3. rawMod + (if (rawMod < 0) mod else 0)
  4. }

nonNegativeMod方法将对key的hashCode和numPartitions进行取模运算,得到key对应的分区索引。使用哈希和取模的方式,可以方便地计算出下游RDD的各个分区将具体处理哪些key。由于上游RDD所处理的key的哈希值在取模后很可能产生数据倾斜,所以HashPartitioner并不是一个均衡的分区计算器。

根据HashPartitioner的实现,我们知道ShuffleDependency中的分区依赖关系并不再是一对一的,而取决于key,并且当前RDD的某个分区将可能依赖于ShuffleDependcy的RDD的任何一个分区。我们分析的内容可以作图如下:

640?wx_fmt=png

3.用HashPartitioner的RDD算子

举几个常见的使用HashPartitioner的例子。

Reducebykey

 
 
  1. def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  2. reduceByKey(defaultPartitioner(self), func)
  3. }

aggregateByKey

 
 
  1. def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
  2. combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  3. aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
  4. }

join

 
 
  1. def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
  2. join(other, defaultPartitioner(self, other))
  3. }

4.简单介绍defaultPartitioner

简单看一一下,上面代码片段提到的defaultPartitioner方法,该方法的源码如下:

 
 
  1. def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  2. val rdds = (Seq(rdd) ++ others)
  3. val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
  4. val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
  5. Some(hasPartitioner.maxBy(_.partitions.length))
  6. } else {
  7. None
  8. }
  9. val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
  10. rdd.context.defaultParallelism
  11. } else {
  12. rdds.map(_.partitions.length).max
  13. }
  14. // If the existing max partitioner is an eligible one, or its partitions number is larger
  15. // than the default number of partitions, use the existing partitioner.
  16. if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
  17. defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
  18. hasMaxPartitioner.get.partitioner.get
  19. } else {
  20. new HashPartitioner(defaultNumPartitions)
  21. }
  22. }

该方法首先,会获取带分区数大于零的RDD,然后假如不为空,就采用分区数最大的RDD的分区器当做得到的分区器返回。

假如,都没有分区器,就会默认给定一个HashPartitioner分区器,前面我们也说到了HashPartitioner分区构建的时候要传入一个分区数的参数。这里获取分区数的方式,首先是判断是否设置了spark.default.parallelism参数,假如有的话,可以对rdd.context.defaultParallelism进行追述,最终假如是集群模式调用的是CoarseGrainedSchedulerBackend的下面方法:

 
 
  1. override def defaultParallelism(): Int = {
  2. conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  3. }

结果是假如设定了该参数,采用我们的设定值。没设定的话总core数和2取最大值作为分区数。

后续文章:

RangePartitioner,CoalescedPartitioner,HashPartitioner及自定义分区器

推荐阅读:

Spark源码系列之foreach和foreachPartition的区别

大数据基础系列之提交spark应用及依赖管理

金融反欺诈场景下的Spark实践

Scala语言基础之结合demo和spark讲实现链式计算

640?wx_fmt=png

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/727512
推荐阅读
相关标签
  

闽ICP备14008679号