当前位置:   article > 正文

73、Spark SQL之开窗函数以及top3销售额统计案例实战

73、Spark SQL之开窗函数以及top3销售额统计案例实战

窗函数以及top3销售额统计案例实战

Spark 1.4.x版本以后,为Spark SQL和DataFrame引入了开窗函数,比如最经典,最常用的,row_number(),可以让我们实现分组取topn的逻辑。

案例:统计每个种类的销售额排名前3的产品

先说明一下,row_number()开窗函数的作用
其实,就是给每个分组的数据,按照其排序顺序,打上一个分组内的行号
比如说,有一个分组date=20181231,里面有3条数据,1122,1121,1124,
那么对这个分组的每一行使用row_number()开窗函数以后,三行,依次会获得一个组内的行号
行号从1开始递增,比如1122 1,1121 2,1124 3

row_number()开窗函数的语法说明
首先可以,在SELECT查询时,使用row_number()函数
其次,row_number()函数后面先跟上OVER关键字
然后括号中,是PARTITION BY,也就是说根据哪个字段进行分组
其次是可以用ORDER BY进行组内排序
然后row_number()就可以给每个组内的行,一个组内行号
Java版本

  1. public class RowNumberWindowFunction {
  2. public static void main(String[] args) {
  3. SparkConf conf = new SparkConf().setAppName("RowNumberWindowFunctionJava");
  4. JavaSparkContext sparkContext = new JavaSparkContext(conf);
  5. // 创建销售额表,sales表
  6. HiveContext hiveContext = new HiveContext(sparkContext.sc());
  7. hiveContext.sql("DROP TABLE IF EXISTS sales");
  8. hiveContext.sql("CREATE TABLE IF NOT EXISTS sales ("
  9. + "product STRING,"
  10. + "category STRING,"
  11. + "revenue BIGINT)");
  12. hiveContext.sql("LOAD DATA "
  13. + "LOCAL INPATH '/opt/module/datas/sparkstudy/sql/resource/sales.txt' "
  14. + "INTO TABLE sales");
  15. DataFrame top3 = hiveContext.sql(
  16. "select s.product, s.category, s.revenue " +
  17. "from ( " +
  18. "select product, category, revenue, " +
  19. "row_number() over(partition by category order by revenue desc) rank " +
  20. "from sales " +
  21. ") s " +
  22. "where s.rank < 4"
  23. );
  24. hiveContext.sql("DROP TABLE IF EXISTS top3_sales");
  25. top3.saveAsTable("top3_sales");
  26. }
  27. }

Scala版本

  1. object RowNumberWindowFunction {
  2. def main(args: Array[String]): Unit = {
  3. val conf = new SparkConf().setAppName("RowNumberWindowFunctionScala")
  4. val sparkContext = new SparkContext(conf)
  5. // 创建销售额表,sales表
  6. val hiveContext = new HiveContext(sparkContext)
  7. hiveContext.sql("DROP TABLE IF EXISTS sales")
  8. hiveContext.sql("CREATE TABLE IF NOT EXISTS sales ("
  9. + "product STRING,"
  10. + "category STRING,"
  11. + "revenue BIGINT)")
  12. hiveContext.sql("LOAD DATA "
  13. + "LOCAL INPATH '/opt/module/datas/sparkstudy/sql/resource/sales.txt' "
  14. + "INTO TABLE sales")
  15. val top3 = hiveContext.sql(
  16. "select s.product, s.category, s.revenue " +
  17. "from ( " +
  18. "select product, category, revenue, " +
  19. "row_number() over(partition by category order by revenue desc) rank " +
  20. "from sales " +
  21. ") s " +
  22. "where s.rank < 4"
  23. )
  24. hiveContext.sql("DROP TABLE IF EXISTS top3_sales")
  25. top3.write.saveAsTable("top3_sales")
  26. }
  27. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/447702
推荐阅读
相关标签
  

闽ICP备14008679号