
Spark / Spark SQL: Handling groupBy Data Skew in DataFrames

Does Spark SQL have a "set hive.groupby.skewindata = true" parameter?

 

Data skew is a problem you are likely to run into. For groupBy skew, Hive already has parameters that handle it well; see the Hive reference article:

https://blog.csdn.net/u010003835/article/details/105495135

Spark SQL does not offer an equivalent hive.groupby.skewindata switch, so below we look at how to handle this kind of GroupBy data skew in Spark SQL ourselves.

 

The idea:

It is essentially the same trick as Hive's tuning parameters, which split the job into two stages:

  1. set hive.map.aggr=true;
  2. set hive.groupby.skewindata=true;

That is, add an extra job that first aggregates on a random grouping, and then aggregates the intermediate result by the intended key.

 

Concrete steps (a code sketch follows this list):

  1. Generate a column of random integers in [0, 10).
  2. Aggregate by this random column together with the intended group-by key.
  3. Aggregate the intermediate result by the intended key alone.
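A minimal sketch of these three steps, using Spark's built-in rand function for the salt (the DataFrame df and the columns key and value are placeholder names assumed here, not taken from the worked example below, which uses a custom UDF instead):

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.{rand, sum}

  // Two-stage aggregation with a random "salt" column.
  def skewSafeSum(df: DataFrame): DataFrame = {
    df
      .withColumn("salt", (rand() * 10).cast("int"))   // 1. random integer in [0, 10)
      .groupBy("salt", "key")                           // 2. partial aggregation by (salt, key)
      .agg(sum("value").as("mid_sum"))
      .groupBy("key")                                   // 3. final aggregation by the intended key
      .agg(sum("mid_sum").as("total"))
  }

The first groupBy spreads a hot key across up to ten groups; the second one is cheap because it sees at most ten pre-aggregated rows per key.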

 

Implementation

There are two ways to implement this:

  1. Method 1: DataFrame functions && UDF
  2. Method 2: SQL + SQL functions

Implementation code

package com.spark.test.offline.skewed_data

import java.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Created by szh on 2020/5/29.
  */
object GroupBySkewedData {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf
    sparkConf
      .setAppName("Union data test")
      .setMaster("local[1]")

    val spark = SparkSession.builder()
      .config(sparkConf)
      .getOrCreate()

    val sparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")

    val arrayA = Array(
      (1, "mm", BigDecimal.valueOf(33.2))
      , (2, "cs", BigDecimal.valueOf(22.1))
      , (3, "cc", BigDecimal.valueOf(22.2))
      , (4, "px", BigDecimal.valueOf(22))
      , (5, "kk", BigDecimal.valueOf(22))
      , (2, "cs", BigDecimal.valueOf(22)))

    val rddA = sparkContext
      .parallelize(arrayA)
      .map(x => Row(x._1, x._2, x._3))
    // .parallelize(arrayA, 4)
    // The second argument of parallelize sets the number of partitions (parallelism).
    println("rddA partition num :" + rddA.partitions.length)

    val rddAStruct = StructType(
      Array(
        StructField("uid", IntegerType, nullable = true)
        , StructField("name", StringType, nullable = true)
        , StructField("money", DecimalType.SYSTEM_DEFAULT, nullable = true)
      )
    )

    val rddADF = spark.createDataFrame(rddA, rddAStruct)
    rddADF.createOrReplaceTempView("tmpA")

    // Define a UDF that returns a random salt in [0, 10)
    val rand = (arg: Int) => {
      val random = new Random()
      random.nextInt(10)
    }
    val randUdf = udf(rand)

    // Method 1: DataFrame functions && UDF
    val midDF = rddADF.withColumn("salt", randUdf(rddADF("uid")))
      .groupBy("salt", "uid", "name")
      .agg(Map("money" -> "sum"))

    val resultDF = midDF
      .groupBy("uid", "name")
      .sum("sum(money)")
      .toDF("uid", "name", "total_money")

    println("resultDF's rdd partition num :" + resultDF.rdd.partitions.length)
    resultDF.explain()
    resultDF.show()

    System.out.println(" ")
    System.out.println(" ----------------------------- ")
    System.out.println(" ----------------------------- ")
    System.out.println(" ----------------------------- ")
    System.out.println(" ----------------------------- ")
    System.out.println(" ----------------------------- ")
    System.out.println(" ")

    // Method 2: SQL + SQL functions
    spark
      .sql("SELECT uid, name, money, cast(rand() * 10 as int) as salt " +
        "FROM tmpA ")
      .createOrReplaceTempView("midResult")

    val resultDF2 = spark.sql("" +
      "SELECT uid, name, SUM(mid_money) AS total_money " +
      "FROM ( " +
      " SELECT uid, name, salt, SUM(money) AS mid_money " +
      " FROM midResult " +
      " GROUP BY uid, name, salt " +
      " ) AS tmp " +
      "GROUP BY uid, name "
    )

    resultDF2.explain()
    resultDF2.show()

    Thread.sleep(60 * 10 * 1000)
    sparkContext.stop()
  }
}
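As a quick sanity check (my addition, not part of the original program), the salted two-stage result can be compared against a plain single-stage aggregation on the same DataFrame; both should produce the same totals:

  // Direct aggregation without salting, for comparison with resultDF / resultDF2.
  val directDF = rddADF
    .groupBy("uid", "name")
    .agg(sum("money").as("total_money"))
  directDF.show()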

 

Output

rddA partition num :1
resultDF's rdd partition num :200

== Physical Plan ==
*HashAggregate(keys=[uid#3, name#4], functions=[sum(sum(money)#22)])
+- Exchange hashpartitioning(uid#3, name#4, 200)
   +- *HashAggregate(keys=[uid#3, name#4], functions=[partial_sum(sum(money)#22)])
      +- *HashAggregate(keys=[salt#11, uid#3, name#4], functions=[sum(money#5)])
         +- Exchange hashpartitioning(salt#11, uid#3, name#4, 200)
            +- *HashAggregate(keys=[salt#11, uid#3, name#4], functions=[partial_sum(money#5)])
               +- *Project [uid#3, name#4, money#5, if (isnull(uid#3)) null else UDF(uid#3) AS salt#11]
                  +- Scan ExistingRDD[uid#3,name#4,money#5]

+---+----+--------------------+
|uid|name|         total_money|
+---+----+--------------------+
|  3|  cc|22.20000000000000...|
|  4|  px|22.00000000000000...|
|  1|  mm|33.20000000000000...|
|  2|  cs|44.10000000000000...|
|  5|  kk|22.00000000000000...|
+---+----+--------------------+

 -----------------------------
 -----------------------------
 -----------------------------
 -----------------------------
 -----------------------------

== Physical Plan ==
*HashAggregate(keys=[uid#3, name#4], functions=[sum(mid_money#65)])
+- Exchange hashpartitioning(uid#3, name#4, 200)
   +- *HashAggregate(keys=[uid#3, name#4], functions=[partial_sum(mid_money#65)])
      +- *HashAggregate(keys=[uid#3, name#4, salt#58], functions=[sum(money#5)])
         +- Exchange hashpartitioning(uid#3, name#4, salt#58, 200)
            +- *HashAggregate(keys=[uid#3, name#4, salt#58], functions=[partial_sum(money#5)])
               +- *Project [uid#3, name#4, money#5, cast((rand(-7591047829286253872) * 10.0) as int) AS salt#58]
                  +- Scan ExistingRDD[uid#3,name#4,money#5]

+---+----+--------------------+
|uid|name|         total_money|
+---+----+--------------------+
|  3|  cc|22.20000000000000...|
|  4|  px|22.00000000000000...|
|  1|  mm|33.20000000000000...|
|  2|  cs|44.10000000000000...|
|  5|  kk|22.00000000000000...|
+---+----+--------------------+
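A side note on the output: the 200 in "resultDF's rdd partition num :200" is Spark's default spark.sql.shuffle.partitions value, which sets the number of partitions produced by the two Exchange (shuffle) steps in the plans above. A minimal sketch of lowering it for a small local test like this one (the value 4 is an arbitrary choice, not from the original code):

  spark.conf.set("spark.sql.shuffle.partitions", "4")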

 

 

 
