赞
踩
导读
groupBy
rollup
cube
pivot
RelationalGroupedDataset
上的聚合操作
groupBy
算子会按照列将 Dataset
分组, 并返回一个 RelationalGroupedDataset
对象, 通过 RelationalGroupedDataset
可以对分组进行聚合
Step 1: 加载实验数据
- private val spark = SparkSession.builder()
- .master("local[6]")
- .appName("aggregation")
- .getOrCreate()
-
- import spark.implicits._
-
- private val schema = StructType(
- List(
- StructField("id", IntegerType),
- StructField("year", IntegerType),
- StructField("month", IntegerType),
- StructField("day", IntegerType),
- StructField("hour", IntegerType),
- StructField("season", IntegerType),
- StructField("pm", DoubleType)
- )
- )
-
- private val pmDF = spark.read
- .schema(schema)
- .option("header", value = true)
- .csv("dataset/pm_without_null.csv")
Step 2: 使用 functions
函数进行聚合
- import org.apache.spark.sql.functions._
-
- val groupedDF: RelationalGroupedDataset = pmDF.groupBy('year)
-
- groupedDF.agg(avg('pm) as "pm_avg")
- .orderBy('pm_avg)
- .show()
Step 3: 除了使用 functions
进行聚合, 还可以直接使用 RelationalGroupedDataset
的 API
进行聚合
- groupedDF.avg("pm")
- .orderBy('pm_avg)
- .show()
-
- groupedDF.max("pm")
- .orderBy('pm_avg)
- .show()
整体demo:
- val spark = SparkSession.builder().
- master("local[6]").appName("app processor").getOrCreate()
- import spark.implicits._
-
- @Test
- def groupBy():Unit = {
- //需求:统计每个月的PM值的平均值 (1)按年月分组(2)求PM平均值
- val schema = StructType(
- List(
- StructField("id",LongType),
- StructField("year",IntegerType),
- StructField("month",IntegerType),
- StructField("day",IntegerType),
- StructField("hour",IntegerType),
- StructField("season",IntegerType),
- StructField("pm",DoubleType)
- )
- )
-
- //2.读取数据
- val sourceDF = spark.read.schema(schema)
- .option("header",true)
- .csv("dataset/beijingpm_with_nan.csv")
- //3.数据去掉空值
- val cleanDF = sourceDF.where('pm =!= Double.NaN)
- //分组
- val groupedDF: RelationalGroupedDataset = cleanDF.groupBy('year,$"month")
- //4.使用functions函数来完成函数
- import org.apache.spark.sql.functions._
- //本质上,avg这个函数定义了一个操作,把表达式设置给pm列
- //select avg(pm) from ... group by
- groupedDF.agg(avg('pm) as "pm_avg")
- .orderBy('pm_avg.desc)
- .show()
-
- groupedDF.agg(stddev('pm) as "pm_stddev")
- .orderBy('pm_stddev.desc)
- .show()//方差
- //5.使用 GroupedDataset的API来完成聚合
- groupedDF.avg("pm")
- .select($"avg(pm)" as "pm_avg")
- .orderBy("pm_avg")
- .show()
-
- groupedDF.sum("pm")
- .select($"sum(pm)" as "pm_avg")
- .orderBy("pm_avg")
- .show()
-
- }
我们可能经常需要针对数据进行多维的聚合, 也就是一次性统计小计, 总计等, 一般的思路如下
Step 1: 准备数据
- private val spark = SparkSession.builder()
- .master("local[6]")
- .appName("aggregation")
- .getOrCreate()
-
- import spark.implicits._
-
- private val schemaFinal = StructType(
- List(
- StructField("source", StringType),
- StructField("year", IntegerType),
- StructField("month", IntegerType),
- StructField("day", IntegerType),
- StructField("hour", IntegerType),
- StructField("season", IntegerType),
- StructField("pm", DoubleType)
- )
- )
-
- private val pmFinal = spark.read
- .schema(schemaFinal)
- .option("header", value = true)
- .csv("dataset/pm_final.csv")
Step 2: 进行多维度聚合
- import org.apache.spark.sql.functions._
-
- val groupPostAndYear = pmFinal.groupBy('source, 'year)
- .agg(sum("pm") as "pm")
-
- val groupPost = pmFinal.groupBy('source)
- .agg(sum("pm") as "pm")
- .select('source, lit(null) as "year", 'pm)
-
- groupPostAndYear.union(groupPost)
- .sort('source, 'year asc_nulls_last, 'pm)
- .show()
整体demo:
- @Test
- def multiAgg():Unit = {
- //需求1:不同来源PM的统计
- //需求2:在同一个月,不同来源的PM值的平均值是多少
- //需求3:同一年,不同来源的PM值平均是多少
- //需求4:整体上来看,不同来源的PM值是多少
- //在一个结果集中,包含总计,小计 称为多维聚合
-
- val schemaFinal = StructType(
- List(
- StructField("source",StringType),
- StructField("year",IntegerType),
- StructField("month",IntegerType),
- StructField("day",IntegerType),
- StructField("hour",IntegerType),
- StructField("season",IntegerType),
- StructField("pm",DoubleType)
- )
- )
-
- //2.读取数据
- val pmFinal = spark.read.schema(schemaFinal)
- .option("header",true)
- .csv("dataset/pm_final.csv")
-
- //pmFinal.show()
-
- import org.apache.spark.sql.functions._
- //需求1:不同年,不同来源,PM值的平均数
- //select source,year,avg(pm) as pm from ... group by source,year
- val postAndYearDF = pmFinal.groupBy('source,'year)
- .agg(avg('pm) as "pm")
- //需求2:在整个数据集中,按照不同的来源来统计PM值的平均值
- //select source,avg(pm) as pm from ... group by source
- val portDF = pmFinal.groupBy('source)
- .agg(avg('pm) as "pm")
- .select('source,lit(null) as "year",'pm)//多加一列用于union
- //合并在一个集合中
- postAndYearDF.union(portDF)
- .sort('source,'year asc_nulls_last,'pm)
- .show()
-
- }
大家其实也能看出来, 在一个数据集中又小计又总计, 可能需要多个操作符, 如何简化呢? 请看下面
9.3 rollup
操作符rollup
操作符其实就是 groupBy
的一个扩展, rollup
会对传入的列进行滚动 groupBy
, groupBy
的次数为列数量 + 1
, 最后一次是对整个数据集进行聚合
Step 1: 创建数据集
- import org.apache.spark.sql.functions._
-
- val sales = Seq(
- ("Beijing", 2016, 100),
- ("Beijing", 2017, 200),
- ("Shanghai", 2015, 50),
- ("Shanghai", 2016, 150),
- ("Guangzhou", 2017, 50)
- ).toDF("city", "year", "amount")
Step 2: rollup
的操作
- sales.rollup("city", "year")
- .agg(sum("amount") as "amount")
- .sort($"city".desc_nulls_last, $"year".asc_nulls_last)
- .show()
-
- /**
- * 结果集:
- * +---------+----+------+
- * | city|year|amount|
- * +---------+----+------+
- * | Shanghai|2015| 50| <-- 上海 2015 的小计
- * | Shanghai|2016| 150|
- * | Shanghai|null| 200| <-- 上海的总计
- * |Guangzhou|2017| 50|
- * |Guangzhou|null| 50|
- * | Beijing|2016| 100|
- * | Beijing|2017| 200|
- * | Beijing|null| 300|
- * | null|null| 550| <-- 整个数据集的总计
- * +---------+----+------+
- */
Step 3: 如果使用基础的 groupBy 如何实现效果?
- val cityAndYear = sales
- .groupBy("city", "year") // 按照 city 和 year 聚合
- .agg(sum("amount") as "amount")
-
- val city = sales
- .groupBy("city") // 按照 city 进行聚合
- .agg(sum("amount") as "amount")
- .select($"city", lit(null) as "year", $"amount")
-
- val all = sales
- .groupBy() // 全局聚合
- .agg(sum("amount") as "amount")
- .select(lit(null) as "city", lit(null) as "year", $"amount")
-
- cityAndYear
- .union(city)
- .union(all)
- .sort($"city".desc_nulls_last, $"year".asc_nulls_last)
- .show()
-
- /**
- * 统计结果:
- * +---------+----+------+
- * | city|year|amount|
- * +---------+----+------+
- * | Shanghai|2015| 50|
- * | Shanghai|2016| 150|
- * | Shanghai|null| 200|
- * |Guangzhou|2017| 50|
- * |Guangzhou|null| 50|
- * | Beijing|2016| 100|
- * | Beijing|2017| 200|
- * | Beijing|null| 300|
- * | null|null| 550|
- * +---------+----+------+
- */
很明显可以看到, 在上述案例中, rollup
就相当于先按照 city
, year
进行聚合, 后按照 city
进行聚合, 最后对整个数据集进行聚合, 在按照 city
聚合时, year
列值为 null
, 聚合整个数据集的时候, 除了聚合列, 其它列值都为 null
demo案例:
- @Test
- def rollup():Unit = {
- import org.apache.spark.sql.functions._
-
- val sales = Seq(
- ("Beijing", 2016, 100),
- ("Beijing", 2017, 200),
- ("Shanghai", 2015, 50),
- ("Shanghai", 2016, 150),
- ("Guangzhou", 2017, 50)
- ).toDF("city", "year", "amount")
-
- //每个城市的销售额、每个城市一共的销售额、总共的销售额
- sales.rollup('city,'year)
- .agg(sum('amount) as "amount")
- .sort('city asc_nulls_last, 'year asc_nulls_last)
- .show()
- }
使用 rollup
完成 pm
值的统计
上面的案例使用 rollup
来实现会非常的简单
- import org.apache.spark.sql.functions._
-
- pmFinal.rollup('source, 'year)
- .agg(sum("pm") as "pm_total")
- .sort('source.asc_nulls_last, 'year.asc_nulls_last)
- .show()
cube
的功能和 rollup
是一样的, 但也有区别, 区别如下
rollup(A, B).sum©
其结果集中会有三种数据形式: A B C
, A null C
, null null C
不知道大家发现没, 结果集中没有对 B
列的聚合结果
cube(A, B).sum©
其结果集中会有四种数据形式: A B C
, A null C
, null null C
, null B C
不知道大家发现没, 比 rollup
的结果集中多了一个 null B C
, 也就是说, rollup
只会按照第一个列来进行组合聚合, 但是 cube
会将全部列组合聚合
- import org.apache.spark.sql.functions._
-
- pmFinal.cube('source, 'year)
- .agg(sum("pm") as "pm_total")
- .sort('source.asc_nulls_last, 'year.asc_nulls_last)
- .show()
-
- /**
- * 结果集为
- *
- * +-------+----+---------+
- * | source|year| pm_total|
- * +-------+----+---------+
- * | dongsi|2013| 735606.0|
- * | dongsi|2014| 745808.0|
- * | dongsi|2015| 752083.0|
- * | dongsi|null|2233497.0|
- * |us_post|2010| 841834.0|
- * |us_post|2011| 796016.0|
- * |us_post|2012| 750838.0|
- * |us_post|2013| 882649.0|
- * |us_post|2014| 846475.0|
- * |us_post|2015| 714515.0|
- * |us_post|null|4832327.0|
- * | null|2010| 841834.0| <-- 新增
- * | null|2011| 796016.0| <-- 新增
- * | null|2012| 750838.0| <-- 新增
- * | null|2013|1618255.0| <-- 新增
- * | null|2014|1592283.0| <-- 新增
- * | null|2015|1466598.0| <-- 新增
- * | null|null|7065824.0|
- * +-------+----+---------+
- */
SparkSQL
中支持的 SQL
语句实现 cube
功能
SparkSQL
支持 GROUPING SETS
语句, 可以随意排列组合空值分组聚合的顺序和组成, 既可以实现 cube
也可以实现 rollup
的功能
- pmFinal.createOrReplaceTempView("pm_final")
- spark.sql(
- """
- |select source, year, sum(pm)
- |from pm_final
- |group by source, year
- |grouping sets((source, year), (source), (year), ())
- |order by source asc nulls last, year asc nulls last
- """.stripMargin)
- .show()
常见的 RelationalGroupedDataset
获取方式有三种
groupBy
rollup
cube
无论通过任何一种方式获取了 RelationalGroupedDataset
对象, 其所表示的都是是一个被分组的 DataFrame
, 通过这个对象, 可以对数据集的分组结果进行聚合
val groupedDF: RelationalGroupedDataset = pmDF.groupBy('year)
需要注意的是, RelationalGroupedDataset
并不是 DataFrame
, 所以其中并没有 DataFrame
的方法, 只有如下一些聚合相关的方法, 如下这些方法在调用过后会生成 DataFrame
对象, 然后就可以再次使用 DataFrame
的算子进行操作了
操作符 | 解释 |
---|---|
| 求平均数 |
| 求总数 |
| 求极大值 |
| 求极小值 |
| 求均数 |
| 求和 |
| 聚合, 可以使用 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。