赞
踩
用于处理结构化数据的Spark模块。
可以通过DataFrame和DataSet处理数据。
1、易整合
可以使用java、scala、python、R等语言的API操作。
2、统一的数据访问
连接到任何数据源的方式相同。
3、兼容Hive
4、标准的数据连接(JDBC/ODBC)
优点:表达非常清晰,难度低、易学习。
缺点:复杂的业务需要复杂的SQL, 复杂分析,SQL嵌套较多。机器学习较难实现。
Hive是将sql转化成MapReduce进行计算
SparkSQL是将sql转化成rdd集进行计算
什么RDD??
弹性分布式数据集。
DataFrame
什么是DataFrame??
DataFrame是以RDD为基础的带有Schema元信息的分布式数据集。
(DataFrame=Schema+RDD*n)
什么是DataSaet??
含有类型信息的DataFrame就是DataSet
(DataSaet=DataFrame+类型= Schema+RDD*n+类型)
DataSet包含了DataFrame的功能
SparkSQL驱动为SparkSession
SparkSession可以执行SparkSQL也可以执行HiveSQL
在llinux中进行操作 开启hadoop和spark 进入 spark-shell窗口 //读取数据 val lineRDD= sc.textFile("hdfs://node01:8020/tt.txt").map(_.split(" ")) //实例样例类(类似于表的结构) case class Person(id:Int, name:String, age:Int) //遍历数据,将数据填充到样例类中 val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //将RDD转换成DataFrame val personDF = personRDD.toDF //查看数据 personDF.show //输出表结构 personDF.printSchema //将DataFrame注册为张表 personDF.createOrReplaceTempView("t_person") //通过SQL语句进行查询 spark.sql("select id,name from t_person where id > 3").show |
使用SparkSession对象(spark)直接读取数据,读取文本文件是没有元数据信息,读取json文件有元数据信息。
1.通过spark.createDataset创建Dataset
val fileRdd = sc.textFile("hdfs://node01:8020/person.txt") //RDD[String] val ds1 = spark.createDataset(fileRdd) //DataSet[String] ds1.show |
2.通RDD.toDS方法生成DataSet
case class Person(name:String, age:Int) val data = List(Person("zhangsan",20),Person("lisi",30)) //List[Person] val dataRDD = sc.makeRDD(data) val ds2 = dataRDD.toDS //Dataset[Person] ds2.showS |
3.通过DataFrame.as[泛型]转化生成DataSet
case class Person(name:String, age:Long) val jsonDF= spark.read.json("file:///export/servers/spark/examples/src/main/resources/people.json") val jsonDS = jsonDF.as[Person] //DataSet[Person] jsonDS.show |
准备数据
val lineRDD= sc.textFile("hdfs://node01:8020/tt.txt").map(_.split(" "))
case class Person(id:Int, name:String, age:Int)
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
val personDF = personRDD.toDF
personDF.show
查询
personDF.select("id","name","age").show
personDF.select($"id",$"name",$"age"+1).show
personDF.select($"id",$"name",$"age"+1).filter($"age">25).show
注册成一张表
personDF.createOrReplaceTempView("t_person")
查询
spark.sql("select * from t_person").show
spark.sql("select * from personDFT ").show
spark.sql("select * from personDFT where age >25").show
总结:
1.DataFrame和DataSet都可以通过RDD来进行创建
2.也可以通过读取普通文本创建--注意:直接读取没有完整的约束,需要通过RDD+Schema
3.通过josn/parquet会有完整的约束
4.不管是DataFrame还是DataSet都可以注册成表,之后就可以使用SQL进行查询了! 也可以使用DSL!
第1种:指定列名添加Schema
第2种:通过StructType指定Schema
第3种:编写样例类,利用反射机制推断Schema
第一种:指定列名添加Schema
|
第二种:通过StructType指定Schema
|
第三种:编写样例类,利用反射机制推断Schema
|
1、SQL查询的方式
//0.注册表
personDF.createOrReplaceTempView("t_person")
//1.查询所有数据
spark.sql("select * from t_person").show()
2、DSL查询的方式
//1.查询所有数据
personDF.select("name","age")
//2.查询age+1
personDF.select($"name",$"age" + 1)
//1.RDD-->DF
.toDF .rdd .toDS |
SQL风格
package cn.itcast.sql |
package cn.itcast.sql |
Spark SQL可以与多种数据源交互,如普通文本、json、parquet、csv、MySQL等
1.写入mysql数据库中不同数据源
2.从mysql数据库中读取不同数据源
|
1.SparkSQL写数据:
DataFrame/DataSet.write.json/csv/jdbc
2.SparkSQL读数据:
SparkSession.read.json/csv/text/jdbc/format
类似于hive当中的自定义函数, spark同样可以使用自定义函数来实现新的功能。
spark中的自定义函数有如下3类
1.UDF(User-Defined-Function)
输入一行,输出一行
2.UDAF(User-Defined Aggregation Funcation)
输入多行,输出一行
3.UDTF(User-Defined Table-Generating Functions)
输入一行,输出多行
自定义UDF
有udf.txt数据格式如下:
Hello
abc
study
small
通过自定义UDF函数将每一行数据转换成大写
select value,smallToBig(value) from t_word
代码演示
package cn.itcast.sql //register为参数 smallToBig为使用时的名字自定义 类型为str:String 将没行单词转化为大写str.toUpperCase() 如果业务需求过多可以=》后面加{} |
需求
有udaf.json数据内容如下
{"name":"Michael","salary":3000}
{"name":"Andy","salary":4500}
{"name":"Justin","salary":3500}
{"name":"Berta","salary":4000}
求取平均工资
继承UserDefinedAggregateFunction方法重写说明
inputSchema:输入数据的类型
bufferSchema:产生中间结果的数据类型
dataType:最终返回的结果类型
deterministic:确保一致性,一般用true
initialize:指定初始值
update:每有一条数据参与运算就更新一下中间结果(update相当于在每一个分区中的运算)
merge:全局聚合(将每个分区的结果进行聚合)
evaluate:计算最终的结果
代码演示
package cn.itcast.sql |
介绍
开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。
开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用 GROUP BY 子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。
聚合函数和开窗函数
聚合函数是将多行变成一行,count,avg....
开窗函数是将一行变成多行;
聚合函数如果要显示其他的列必须将列加入到group by中
开窗函数可以不使用group by,直接将所有信息显示出来
开窗函数分类
1.聚合开窗函数
聚合函数(列) OVER(选项),这里的选项可以是PARTITION BY 子句,但不可以是 ORDER BY 子句。
2.排序开窗函数
排序函数(列) OVER(选项),这里的选项可以是ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是 PARTITION BY 子句。
/export/servers/spark/bin/spark-shell
//实例一个样例类分数
case class Score(name: String, clazz: Int, score: Int)
//存入些数据 转化成DF
val scoreDF = spark.sparkContext.makeRDD(Array(
Score("a1", 1, 80),
Score("a2", 1, 78),
Score("a3", 1, 95),
Score("a4", 2, 74),
Score("a5", 2, 92),
Score("a6", 3, 99),
Score("a7", 3, 99),
Score("a8", 3, 45),
Score("a9", 3, 55),
Score("a10", 3, 78),
Score("a11", 3, 100))
).toDF("name", "class", "score")
scoreDF.show()
示例1
OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。
SQL标准允许将所有聚合函数用做聚合开窗函数。
spark.sql("select count(name) from scores").show
spark.sql("select name, class, score, count(name)over()name_count from scores").show
查询结果如下所示:
OVER 关键字后的括号中还可以添加选项用以改变进行聚合运算的窗口范围。
如果 OVER 关键字后的括号中的选项为空,则开窗函数会对结果集中的所有行进行聚合运算。
开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。与 GROUP BY 子句不同,PARTITION BY 子句创建的分区是独立于结果集的,创建的分区只是供进行聚合计算的,而且不同的开窗函数所创建的分区也不互相影响。
下面的 SQL 语句用于显示按照班级分组后每组的人数:
OVER(PARTITION BY class)表示对结果集按照 class 进行分区,并且计算当前行所属的组的聚合计算结果。
//按照class进行分区,分区返回class分区后的总数,并按照开窗函数显示出来
spark.sql("select name, class, score, count(name) over(partition by class) name_count from scores").show
查询结果如下所示:
row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号
注意:
在排序开窗函数中使用 PARTITION BY 子句需要放置在ORDER BY 子句之前。
●示例1
//按照分数进行排序,并且按开窗函数显示出来
spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show()
//按照相同class进行分区,然后按照分区后的分数排序
spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show()
rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。
这个函数求出来的排名结果可以并列(并列第一/并列第二),并列排名之后的排名将是并列的排名加上并列数
简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,但是有两个第一名
//按照分数进行排序
spark.sql("select name, class, score, rank() over(order by score) rank from scores").show()
//按照相同的class进行分区,然后按照分数进行排序 如果相同排序的序号相同,下一个序号就加上相同的个数
spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show()
dense_rank() over(order by score) as dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。
这个函数并列排名之后的排名是并列排名加1
简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名
//然后按照分数进行排序 如果相同排序的序号相同,下一个数就按照上一个数的序号加一
spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show()
spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show()
ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,然后 6 等分成 6 个组,并显示所在组的序号。
spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show()
执行HQL时,先到MySQL元数据库中查找描述信息,然后解析HQL并根据描述信息生成MR任务 Hive将SQL转成MapReduce执行速度慢
使用SparkSQL整合Hive其实就是让SparkSQL去加载Hive 的元数据库,然后通过SparkSQL执行引擎去操作Hive表内的数据 所以首先需要开启Hive的元数据库服务,让SparkSQL能够加载元数据 |
1: 修改 hive/conf/hive-site.xml 新增如下配置
<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>hive.metastore.warehouse.dir</name> <value>/user/hive/warehouse</value> </property> <property> <name>hive.metastore.local</name> <value>false</value> </property> <property> <name>hive.metastore.uris</name> <value>thrift://node01:9083</value> </property> </configuration> |
2: 后台启动 Hive MetaStore服务
nohup /export/servers/hive/bin/hive --service metastore &
Spark 有一个内置的 MateStore,使用 Derby 嵌入式数据库保存数据,但是这种方式不适合生产环境,因为这种模式同一时间只能有一个 SparkSession 使用,所以生产环境更推荐使用 Hive 的 MetaStore
SparkSQL 整合 Hive 的 MetaStore 主要思路就是要通过配置能够访问它, 并且能够使用 HDFS 保存 WareHouse,所以可以直接拷贝 Hadoop 和 Hive 的配置文件到 Spark 的配置目录
hive-site.xml 元数据仓库的位置等信息
core-site.xml 安全相关的配置
hdfs-site.xml HDFS 相关的配置
将上面三个文件拷贝到/export/servers/spark/conf目录下
使用IDEA本地测试直接把以上配置文件放在resources目录即可
hive-site.xml core-site.xml hdfs-site.xml拷贝到项目得resources中,清空target.
如果没有msql驱动包记得下载 下载/查找mysql连接驱动包添加到spark得jars文件夹中
如下链接讲解无msql驱动包
https://blog.csdn.net/weixin_44519124/article/details/105515411
package cn.itcast.sql |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。