赞
踩
数据倾斜:shuffle操作时,由于相同key会被分配到同一个reduce端执行,而大部分数据的key值相同,导致部分task处理的数据量过大,分配不均。
触发shuffle的常见算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
查看导致数据倾斜的key的数据分布情况
根据执行操作的不同,可以有很多种查看key分布的方式:
1、如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。
2、如果是Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来各个key出现的次数,collect、take到客户端打印一下,就可以看到key的分布情况。
原理:将原本相同的key通过附加随机前缀的方式,变成多个不同key,就可以让原本被一个task处理的数据分散到多个task上做局部聚合,进行解决单个task处理数据量过多的问题。接着去除随机前缀,再次进行全局的聚合,就可以得到最终的结果。
object _01SparkDataSkewTwoStageOps { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) val sc = SparkUtil.sparkContext("local[2]", "_01SparkDataSkewTwoStageOps") val list = List( "hello you hello hello me", "hello you hello hello shit", "oh hello she study" ) val listRDD = sc.parallelize(list) val pairsRDD = listRDD.flatMap(line => line.split("\\s+")).map((_, 1)) //step 1 找到发生数据倾斜key val sampleRDD = pairsRDD.sample(false, 0.6) val cbk= sampleRDD.countByKey() // cbkRDD.foreach(println) val sortedInfo = cbk.toBuffer.sortWith((t1, t2) => t1._2 > t2._2) val dataSkewKey = sortedInfo.head._1 // sortedInfo.foreach(println) println("发生了数据倾斜的Key:" + dataSkewKey) //step 2 给对应的key打上N以内的随机前缀 val prefixPairsRDD = pairsRDD.map{case (word, count) => { if(word.equals(dataSkewKey)) { val random = new Random() val prefix = random.nextInt(2)//0 1 (s"${prefix}_${word}", count) } else { (word, count) } }} prefixPairsRDD.foreach(println) //step 3 局部聚合 val partAggrInfo = prefixPairsRDD.reduceByKey(_+_) println("===============>局部聚合之后的结果:") partAggrInfo.foreach(println) //step 4 全局聚合 //step 4.1 去掉前缀 val unPrefixPairRDD = partAggrInfo.map{case (word, count) => { if(word.contains("_")) { (word.substring(word.indexOf("_") + 1), count) } else { (word, count) } }} println("================>去掉随机前缀之后的结果:") unPrefixPairRDD.foreach(println) // step 4.2 全局聚合 val fullAggrInfo = unPrefixPairRDD.reduceByKey(_+_) println("===============>全局聚合之后的结果:") fullAggrInfo.foreach(println) sc.stop() } }
先聚合小数据集,再聚合整体。
Post author: Tony Wang
Post link: https://www.yeah366.com/posts/3318546812.html
Copyright Notice: All articles in this blog are licensed under BY-NC-SA unless stating additionally.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。