当前位置:   article > 正文

SparkSQL与Hive整合 、SparkSQL函数操作

SparkSQL与Hive整合 、SparkSQL函数操作

SparkSQL与Hive整合

SparkSQL和Hive的整合,是一种比较常见的关联处理方式,SparkSQL加载Hive中的数据进行业务处理,同时将计算结果落地回Hive中。

整合需要注意的地方

1)需要引入hive的hive-site.xml,添加classpath目录下面即可,或者放到$SPARK_HOME/conf。

2)为了能够正常解析hive-site.xml中hdfs路径,需要将hdfs-site.xml和core-site.xml到classpath下面。整合编码如下:

  1. object Hive_Support {
  2.   def main(args: Array[String]): Unit = {
  3.     //创建sparkSql程序入口
  4.     val spark: SparkSession = SparkSession.builder()
  5.       .appName("demo")
  6.       .master("local[*]")
  7.       .enableHiveSupport()
  8.       .getOrCreate()
  9.     //调用sparkContext
  10.     val sc: SparkContext = spark.sparkContext
  11.     //设置日志级别
  12.     sc.setLogLevel("WARN")
  13.     //导包
  14.     import spark.implicits._
  15.     //查询hive当中的表
  16.     spark.sql("show tables").show()
  17.     //创建表
  18.     spark.sql("CREATE TABLE person (id int, name string, age int) row format delimited fields terminated by ' '")
  19.     //导入数据
  20.     spark.sql("load data local inpath'./person.txt' into table person")
  21.     //查询表当中数据
  22.     spark.sql("select * from person").show()
  23.   }
  24. }

SparkSQL函数操作

函数的定义

SQL中函数,其实说白了就是各大编程语言中的函数,或者方法,就是对某一特定功能的封装,通过它可以完成较为复杂的统计。这里的函数的学习,就基于Hive中的函数来学习。

函数的分类

函数的分类方式非常多,主要从功能和实现方式上进行区分。

实现方式上分类

1)UDF(User Defined function)用户自定义函数:一路输入,一路输出,比如year,date_add, instr。

2)UDAF(User Defined aggregation function)用户自定义聚合函数:多路输入,一路输出,常见的聚合函数:count、sum、collect_list。

3)UDTF(User Defined table function)用户自定义表函数:一路输入,多路输出,explode。

4)开窗函数:row_number(),sum/max/min over。

用户自定义函数

当系统提供的这些函数,满足不了我们的需要的话,就只能进行自定义相关的函数,一般自定义的函数两种,UDF和UDAF。

1)UDF:一路输入,一路输出,完成就是基于scala函数。

通过模拟获取字符串长度的udf来学习自定义udf操作。

  1. object UDF_Demo {
  2.   def main(args: Array[String]): Unit = {
  3.     //创建sparkSql程序入口
  4.     val spark: SparkSession = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()
  5.     //调用sparkContext
  6.     val sc: SparkContext = spark.sparkContext
  7.     //设置日志级别
  8.     sc.setLogLevel("WARN")
  9.     //导包
  10.     import spark.implicits._
  11.     //加载文件
  12.     val personDF: DataFrame = spark.read.json("E:\\data\\people.json")
  13.     //展示数据
  14.     //personDF.show()
  15.     //注册成为一张表
  16.     personDF.createOrReplaceTempView("t_person")
  17.     //赋予什么功能
  18.     val fun = (x:String)=>{
  19.       "Name:"+x
  20.     }
  21.     //没有addName这个函数,就注册它
  22.     spark.udf.register("addName",fun)
  23.     //查询
  24.     spark.sql("select name,addName(name) from t_person").show()
  25. //释放资源
  26. spark.stop()
  27.   }}

2)开窗函数:over()开窗函数是按照某个字段分组,然后查询出另一字段的前几个的值,相当于分组取topN。

row_number() over (partitin by XXX order by XXX)

rank() 跳跃排序,有两个第二名是,后边跟着的是第四名

dense_rank()  连续排序,有两个第二名是,后边跟着的是第三名

row_number() 连续排序,两个值相同排序也是不同

在使用聚合函数后,会将多行变成一行,而over()开窗函数其实就是给每个分组的数据,按照其排序的顺序,打上一个分组内的行号,直接将所有列信息显示出来。在使用聚合函数后,如果要显示其它的列必须将列加入到group by中,而使用开窗函数后,可以不使用group by。

代码如下:

  1. case class StudentScore(name:String,clazz:Int,score:Int)
  2. object SparkSqlOverDemo {
  3.   def main(args: Array[String]): Unit = {
  4.     val conf = new SparkConf().setMaster("local[*]").setAppName("sparksqlover")
  5.     val sc = new SparkContext(conf)
  6.     val spark = SparkSession.builder().config(conf).getOrCreate()
  7.     val arr01 = Array(("a",1,88),
  8.       ("b",1,78),
  9.       ("c",1,95),
  10.       ("d",2,74),
  11.       ("e",2,92),
  12.       ("f",3,99),
  13.       ("g",3,99),
  14.       ("h",3,45),
  15.       ("i",3,53),
  16.       ("j",3,78))
  17.     import spark.implicits._
  18.     val scoreRDD = sc.makeRDD(arr01).map(x=>StudentScore(x._1,x._2,x._3)).toDS
  19.     scoreRDD.createOrReplaceTempView("t_score")
  20.     //查询t_score表数据
  21.     spark.sql("select * from t_score").show()
  22.     //使用开窗函数查找topN,rank() 跳跃排序,有两个第二名是,后边跟着的是第四名
  23.     spark.sql("select name,clazz,score, rank() over( partition by clazz order by score desc ) rownum from t_score ").show()
  24.     //讲使用开窗函数后的查询结果作为一张临时表,这个临时表有每个班的成绩排名,再取前三名
  25.     spark.sql("select * from (select name,clazz,score, rank() over( partition by clazz order by score desc ) rownum from t_score) t1 where rownum <=3 ").show()
  26.   }
  27. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/987988
推荐阅读
相关标签
  

闽ICP备14008679号