当前位置:   article > 正文

Spark(三)-- SparkSQL扩展(数据操作) -- 聚合(四)_relationalgroupeddataset

relationalgroupeddataset

10. 聚合

导读

  1. groupBy

  2. rollup

  3. cube

  4. pivot

  5. RelationalGroupedDataset 上的聚合操作

10.1 groupBy

groupBy 算子会按照列将 Dataset 分组, 并返回一个 RelationalGroupedDataset 对象, 通过 RelationalGroupedDataset 可以对分组进行聚合

Step 1: 加载实验数据

  1. private val spark = SparkSession.builder()
  2. .master("local[6]")
  3. .appName("aggregation")
  4. .getOrCreate()
  5. import spark.implicits._
  6. private val schema = StructType(
  7. List(
  8. StructField("id", IntegerType),
  9. StructField("year", IntegerType),
  10. StructField("month", IntegerType),
  11. StructField("day", IntegerType),
  12. StructField("hour", IntegerType),
  13. StructField("season", IntegerType),
  14. StructField("pm", DoubleType)
  15. )
  16. )
  17. private val pmDF = spark.read
  18. .schema(schema)
  19. .option("header", value = true)
  20. .csv("dataset/pm_without_null.csv")

Step 2: 使用 functions 函数进行聚合

  1. import org.apache.spark.sql.functions._
  2. val groupedDF: RelationalGroupedDataset = pmDF.groupBy('year)
  3. groupedDF.agg(avg('pm) as "pm_avg")
  4. .orderBy('pm_avg)
  5. .show()

Step 3: 除了使用 functions 进行聚合, 还可以直接使用 RelationalGroupedDataset 的 API 进行聚合

  1. groupedDF.avg("pm")
  2. .orderBy('pm_avg)
  3. .show()
  4. groupedDF.max("pm")
  5. .orderBy('pm_avg)
  6. .show()

整体demo:

  1. val spark = SparkSession.builder().
  2. master("local[6]").appName("app processor").getOrCreate()
  3. import spark.implicits._
  4. @Test
  5. def groupBy():Unit = {
  6. //需求:统计每个月的PM值的平均值 (1)按年月分组(2)求PM平均值
  7. val schema = StructType(
  8. List(
  9. StructField("id",LongType),
  10. StructField("year",IntegerType),
  11. StructField("month",IntegerType),
  12. StructField("day",IntegerType),
  13. StructField("hour",IntegerType),
  14. StructField("season",IntegerType),
  15. StructField("pm",DoubleType)
  16. )
  17. )
  18. //2.读取数据
  19. val sourceDF = spark.read.schema(schema)
  20. .option("header",true)
  21. .csv("dataset/beijingpm_with_nan.csv")
  22. //3.数据去掉空值
  23. val cleanDF = sourceDF.where('pm =!= Double.NaN)
  24. //分组
  25. val groupedDF: RelationalGroupedDataset = cleanDF.groupBy('year,$"month")
  26. //4.使用functions函数来完成函数
  27. import org.apache.spark.sql.functions._
  28. //本质上,avg这个函数定义了一个操作,把表达式设置给pm列
  29. //select avg(pm) from ... group by
  30. groupedDF.agg(avg('pm) as "pm_avg")
  31. .orderBy('pm_avg.desc)
  32. .show()
  33. groupedDF.agg(stddev('pm) as "pm_stddev")
  34. .orderBy('pm_stddev.desc)
  35. .show()//方差
  36. //5.使用 GroupedDataset的API来完成聚合
  37. groupedDF.avg("pm")
  38. .select($"avg(pm)" as "pm_avg")
  39. .orderBy("pm_avg")
  40. .show()
  41. groupedDF.sum("pm")
  42. .select($"sum(pm)" as "pm_avg")
  43. .orderBy("pm_avg")
  44. .show()
  45. }

9.2 多维聚合

我们可能经常需要针对数据进行多维的聚合, 也就是一次性统计小计, 总计等, 一般的思路如下

Step 1: 准备数据

  1. private val spark = SparkSession.builder()
  2. .master("local[6]")
  3. .appName("aggregation")
  4. .getOrCreate()
  5. import spark.implicits._
  6. private val schemaFinal = StructType(
  7. List(
  8. StructField("source", StringType),
  9. StructField("year", IntegerType),
  10. StructField("month", IntegerType),
  11. StructField("day", IntegerType),
  12. StructField("hour", IntegerType),
  13. StructField("season", IntegerType),
  14. StructField("pm", DoubleType)
  15. )
  16. )
  17. private val pmFinal = spark.read
  18. .schema(schemaFinal)
  19. .option("header", value = true)
  20. .csv("dataset/pm_final.csv")

Step 2: 进行多维度聚合

  1. import org.apache.spark.sql.functions._
  2. val groupPostAndYear = pmFinal.groupBy('source, 'year)
  3. .agg(sum("pm") as "pm")
  4. val groupPost = pmFinal.groupBy('source)
  5. .agg(sum("pm") as "pm")
  6. .select('source, lit(null) as "year", 'pm)
  7. groupPostAndYear.union(groupPost)
  8. .sort('source, 'year asc_nulls_last, 'pm)
  9. .show()

整体demo:

  1. @Test
  2. def multiAgg():Unit = {
  3. //需求1:不同来源PM的统计
  4. //需求2:在同一个月,不同来源的PM值的平均值是多少
  5. //需求3:同一年,不同来源的PM值平均是多少
  6. //需求4:整体上来看,不同来源的PM值是多少
  7. //在一个结果集中,包含总计,小计 称为多维聚合
  8. val schemaFinal = StructType(
  9. List(
  10. StructField("source",StringType),
  11. StructField("year",IntegerType),
  12. StructField("month",IntegerType),
  13. StructField("day",IntegerType),
  14. StructField("hour",IntegerType),
  15. StructField("season",IntegerType),
  16. StructField("pm",DoubleType)
  17. )
  18. )
  19. //2.读取数据
  20. val pmFinal = spark.read.schema(schemaFinal)
  21. .option("header",true)
  22. .csv("dataset/pm_final.csv")
  23. //pmFinal.show()
  24. import org.apache.spark.sql.functions._
  25. //需求1:不同年,不同来源,PM值的平均数
  26. //select source,year,avg(pm) as pm from ... group by source,year
  27. val postAndYearDF = pmFinal.groupBy('source,'year)
  28. .agg(avg('pm) as "pm")
  29. //需求2:在整个数据集中,按照不同的来源来统计PM值的平均值
  30. //select source,avg(pm) as pm from ... group by source
  31. val portDF = pmFinal.groupBy('source)
  32. .agg(avg('pm) as "pm")
  33. .select('source,lit(null) as "year",'pm)//多加一列用于union
  34. //合并在一个集合中
  35. postAndYearDF.union(portDF)
  36. .sort('source,'year asc_nulls_last,'pm)
  37. .show()
  38. }

大家其实也能看出来, 在一个数据集中又小计又总计, 可能需要多个操作符, 如何简化呢? 请看下面

9.3 rollup 操作符

rollup 操作符其实就是 groupBy 的一个扩展, rollup 会对传入的列进行滚动 groupBygroupBy 的次数为列数量 + 1, 最后一次是对整个数据集进行聚合

Step 1: 创建数据集

  1. import org.apache.spark.sql.functions._
  2. val sales = Seq(
  3. ("Beijing", 2016, 100),
  4. ("Beijing", 2017, 200),
  5. ("Shanghai", 2015, 50),
  6. ("Shanghai", 2016, 150),
  7. ("Guangzhou", 2017, 50)
  8. ).toDF("city", "year", "amount")

Step 2: rollup 的操作

  1. sales.rollup("city", "year")
  2. .agg(sum("amount") as "amount")
  3. .sort($"city".desc_nulls_last, $"year".asc_nulls_last)
  4. .show()
  5. /**
  6. * 结果集:
  7. * +---------+----+------+
  8. * | city|year|amount|
  9. * +---------+----+------+
  10. * | Shanghai|2015| 50| <-- 上海 2015 的小计
  11. * | Shanghai|2016| 150|
  12. * | Shanghai|null| 200| <-- 上海的总计
  13. * |Guangzhou|2017| 50|
  14. * |Guangzhou|null| 50|
  15. * | Beijing|2016| 100|
  16. * | Beijing|2017| 200|
  17. * | Beijing|null| 300|
  18. * | null|null| 550| <-- 整个数据集的总计
  19. * +---------+----+------+
  20. */

Step 3: 如果使用基础的 groupBy 如何实现效果?

  1. val cityAndYear = sales
  2. .groupBy("city", "year") // 按照 city 和 year 聚合
  3. .agg(sum("amount") as "amount")
  4. val city = sales
  5. .groupBy("city") // 按照 city 进行聚合
  6. .agg(sum("amount") as "amount")
  7. .select($"city", lit(null) as "year", $"amount")
  8. val all = sales
  9. .groupBy() // 全局聚合
  10. .agg(sum("amount") as "amount")
  11. .select(lit(null) as "city", lit(null) as "year", $"amount")
  12. cityAndYear
  13. .union(city)
  14. .union(all)
  15. .sort($"city".desc_nulls_last, $"year".asc_nulls_last)
  16. .show()
  17. /**
  18. * 统计结果:
  19. * +---------+----+------+
  20. * | city|year|amount|
  21. * +---------+----+------+
  22. * | Shanghai|2015| 50|
  23. * | Shanghai|2016| 150|
  24. * | Shanghai|null| 200|
  25. * |Guangzhou|2017| 50|
  26. * |Guangzhou|null| 50|
  27. * | Beijing|2016| 100|
  28. * | Beijing|2017| 200|
  29. * | Beijing|null| 300|
  30. * | null|null| 550|
  31. * +---------+----+------+
  32. */

很明显可以看到, 在上述案例中, rollup 就相当于先按照 cityyear 进行聚合, 后按照 city 进行聚合, 最后对整个数据集进行聚合, 在按照 city 聚合时, year 列值为 null, 聚合整个数据集的时候, 除了聚合列, 其它列值都为 null

demo案例:

  1. @Test
  2. def rollup():Unit = {
  3. import org.apache.spark.sql.functions._
  4. val sales = Seq(
  5. ("Beijing", 2016, 100),
  6. ("Beijing", 2017, 200),
  7. ("Shanghai", 2015, 50),
  8. ("Shanghai", 2016, 150),
  9. ("Guangzhou", 2017, 50)
  10. ).toDF("city", "year", "amount")
  11. //每个城市的销售额、每个城市一共的销售额、总共的销售额
  12. sales.rollup('city,'year)
  13. .agg(sum('amount) as "amount")
  14. .sort('city asc_nulls_last, 'year asc_nulls_last)
  15. .show()
  16. }

使用 rollup 完成 pm 值的统计

上面的案例使用 rollup 来实现会非常的简单

  1. import org.apache.spark.sql.functions._
  2. pmFinal.rollup('source, 'year)
  3. .agg(sum("pm") as "pm_total")
  4. .sort('source.asc_nulls_last, 'year.asc_nulls_last)
  5. .show()

9.4 cube

cube 的功能和 rollup 是一样的, 但也有区别, 区别如下

rollup(A, B).sum©

其结果集中会有三种数据形式: A B CA null Cnull null C

不知道大家发现没, 结果集中没有对 B 列的聚合结果

cube(A, B).sum©

其结果集中会有四种数据形式: A B CA null Cnull null Cnull B C

不知道大家发现没, 比 rollup 的结果集中多了一个 null B C, 也就是说, rollup 只会按照第一个列来进行组合聚合, 但是 cube 会将全部列组合聚合

  1. import org.apache.spark.sql.functions._
  2. pmFinal.cube('source, 'year)
  3. .agg(sum("pm") as "pm_total")
  4. .sort('source.asc_nulls_last, 'year.asc_nulls_last)
  5. .show()
  6. /**
  7. * 结果集为
  8. *
  9. * +-------+----+---------+
  10. * | source|year| pm_total|
  11. * +-------+----+---------+
  12. * | dongsi|2013| 735606.0|
  13. * | dongsi|2014| 745808.0|
  14. * | dongsi|2015| 752083.0|
  15. * | dongsi|null|2233497.0|
  16. * |us_post|2010| 841834.0|
  17. * |us_post|2011| 796016.0|
  18. * |us_post|2012| 750838.0|
  19. * |us_post|2013| 882649.0|
  20. * |us_post|2014| 846475.0|
  21. * |us_post|2015| 714515.0|
  22. * |us_post|null|4832327.0|
  23. * | null|2010| 841834.0| <-- 新增
  24. * | null|2011| 796016.0| <-- 新增
  25. * | null|2012| 750838.0| <-- 新增
  26. * | null|2013|1618255.0| <-- 新增
  27. * | null|2014|1592283.0| <-- 新增
  28. * | null|2015|1466598.0| <-- 新增
  29. * | null|null|7065824.0|
  30. * +-------+----+---------+
  31. */

SparkSQL 中支持的 SQL 语句实现 cube 功能

SparkSQL 支持 GROUPING SETS 语句, 可以随意排列组合空值分组聚合的顺序和组成, 既可以实现 cube 也可以实现 rollup 的功能

  1. pmFinal.createOrReplaceTempView("pm_final")
  2. spark.sql(
  3. """
  4. |select source, year, sum(pm)
  5. |from pm_final
  6. |group by source, year
  7. |grouping sets((source, year), (source), (year), ())
  8. |order by source asc nulls last, year asc nulls last
  9. """.stripMargin)
  10. .show()

9.5 RelationalGroupedDataset

常见的 RelationalGroupedDataset 获取方式有三种

  • groupBy

  • rollup

  • cube

无论通过任何一种方式获取了 RelationalGroupedDataset 对象, 其所表示的都是是一个被分组的 DataFrame, 通过这个对象, 可以对数据集的分组结果进行聚合

val groupedDF: RelationalGroupedDataset = pmDF.groupBy('year)

需要注意的是, RelationalGroupedDataset 并不是 DataFrame, 所以其中并没有 DataFrame 的方法, 只有如下一些聚合相关的方法, 如下这些方法在调用过后会生成 DataFrame 对象, 然后就可以再次使用 DataFrame 的算子进行操作了

操作符解释

avg

求平均数

count

求总数

max

求极大值

min

求极小值

mean

求均数

sum

求和

agg

聚合, 可以使用 sql.functions 中的函数来配合进行操作

  1. pmDF.groupBy('year)
  2. .agg(avg('pm) as "pm_avg")

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/899618
推荐阅读
相关标签
  

闽ICP备14008679号