当前位置:   article > 正文

Spark性能优化——数据倾斜-groupByKey导致内存溢出_sparksql和空表join会导致内存溢出吗

sparksql和空表join会导致内存溢出吗

数据倾斜shuffle操作时,由于相同key会被分配到同一个reduce端执行,而大部分数据的key值相同,导致部分task处理的数据量过大,分配不均。

触发shuffle的常见算子distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。

查看导致数据倾斜的key的数据分布情况

根据执行操作的不同,可以有很多种查看key分布的方式:

  • 1、如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。

  • 2、如果是Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来各个key出现的次数,collect、take到客户端打印一下,就可以看到key的分布情况。

原理:将原本相同的key通过附加随机前缀的方式,变成多个不同key,就可以让原本被一个task处理的数据分散到多个task上做局部聚合,进行解决单个task处理数据量过多的问题。接着去除随机前缀,再次进行全局的聚合,就可以得到最终的结果。

object _01SparkDataSkewTwoStageOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        val sc = SparkUtil.sparkContext("local[2]", "_01SparkDataSkewTwoStageOps")
        val list = List(
            "hello you hello hello me",
            "hello you hello hello shit",
            "oh hello she study"
        )

        val listRDD = sc.parallelize(list)

        val pairsRDD = listRDD.flatMap(line => line.split("\\s+")).map((_, 1))
        //step 1 找到发生数据倾斜key
        val sampleRDD = pairsRDD.sample(false, 0.6)
        val cbk= sampleRDD.countByKey()
        // cbkRDD.foreach(println)
        val sortedInfo = cbk.toBuffer.sortWith((t1, t2) => t1._2 > t2._2)
        val dataSkewKey = sortedInfo.head._1
        // sortedInfo.foreach(println)
        println("发生了数据倾斜的Key:" + dataSkewKey)
        //step 2 给对应的key打上N以内的随机前缀
        val prefixPairsRDD = pairsRDD.map{case (word, count) => {
            if(word.equals(dataSkewKey)) {
                val random = new Random()
                val prefix = random.nextInt(2)//0 1
                (s"${prefix}_${word}", count)
            } else {
                (word, count)
            }
        }}
        prefixPairsRDD.foreach(println)
        //step 3 局部聚合
        val partAggrInfo = prefixPairsRDD.reduceByKey(_+_)
        println("===============>局部聚合之后的结果:")
        partAggrInfo.foreach(println)
        //step 4 全局聚合
        //step 4.1 去掉前缀
        val unPrefixPairRDD = partAggrInfo.map{case (word, count) => {
            if(word.contains("_")) {
                (word.substring(word.indexOf("_") + 1), count)
            } else {
                (word, count)
            }
        }}
        println("================>去掉随机前缀之后的结果:")
        unPrefixPairRDD.foreach(println)
        // step 4.2 全局聚合
        val fullAggrInfo = unPrefixPairRDD.reduceByKey(_+_)
        println("===============>全局聚合之后的结果:")
        fullAggrInfo.foreach(println)
        sc.stop()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

先聚合小数据集,再聚合整体。

Post author: Tony Wang
Post link: https://www.yeah366.com/posts/3318546812.html
Copyright Notice: All articles in this blog are licensed under BY-NC-SA unless stating additionally.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/975999
推荐阅读
相关标签
  

闽ICP备14008679号