赞
踩
SparkSQL:
1.sparksql可以和hive集成
问题1:sparksql什么时候和hive集成?
答:当开发者需要使用sparksql来代替mapreduce,去计算hive中的表的时候,就需要集成hive。(mapreduce计算太慢了,所以我们采用sparksql去访问hive,来达到提供计算效率的目的)
问题2:如何和hive集成?
SparkSQL和hive集成的步骤:
hive中的数据是储存在HDFS中的,元数据是储存在mysql中的
所以sparksql如果想要访问hive中表
也就是说,sparksql必须要访问HDFS和mysql
1.开启HDFS的服务 sbin/hadoop-daemon.sh start namenode
sbin/hadoop-daemon.sh start datanode
2.spark本身的问题:
当初我们进行spark源码编译的时候,就提出过一个问题:
如果spark需要后期集成hive的话,那么我们在编译的时候
就必须要加上这样的一句话。在make-distribution.sh文件中
添加SPARK_HIVE=1,这个意思是spark编译的时候集成hive。
最后,执行编译命令的时候,还要加上-Phive -Phive-thriftserver
表示编译集成hive,以及集成hive-thriftserver多服务
3.sparksql必须访问mysql,怎么操作。
注意:不管什么框架,想要访问mysql,必须要提交四要素:URL,账号,密码,驱动包
因为连接mysql的四要素在hive-site.xml文件中已经配置好了,所以我们可以直接将
hive-site.xml文件放置到spark的conf目录即可
cp conf/hive-site.xml /opt/cdh5.14.2/modules/spark-2.2.1-bin-2.6.0-cdh5.14.2/conf/
再把mysql的驱动包直接放到spark的jars目录即可
cp lib/mysql-connector-java-5.1.38.jar /opt/cdh5.14.2/modules/spark-2.2.1-bin-2.6.0-cdh5.14.2/jars/
4.sparksql现在可以去访问mysql元数据库了,但是请大家注意,在学习hive的时候
我们学到过一个配置。叫做metastore服务。该服务就是mysql元数据库的远程登入服务
也就是说,如果你hive-site.xml文件中配置了metastore服务,那么必须要先开启该服务,
sparksql才能正常的连接到mysql。不然就失败
<property>
<name>hive.metastore.uris</name>
<value>thrift://superman-bigdata.com:9083</value>
</property>
如果你没有配置matestore服务,那么不需要开启,sparksql可以直接去连接了
命令:bin/hive --service metastore &
================================================================================
sparksql和hive的集成已经完毕,接下来可以使用
SparkSQL和hive集成的使用。
在学习sparkcore的时候,我们可以使用spark-shell命令行进行编写代码
其实sparksql也是一样,我们可以使用spark-sql命令行来进行编写sql语句
1.开启spark-sql的命令行脚本,编写sql语句
命令:bin/spark-sql
我们可以发现,现在spark-sql中看到的就是hive数据仓库中的信息
说明sparksql已经可以正常的访问到hive了
spark-sql的一个小毛病:当我们切换数据库的时候,显示数据库名称的地方并没有正常切换,但是不影响正常操作,其实已经切换成功了。
spark-sql (default)> show databases;
spark-sql (default)> use o2o31;
spark-sql (default)> show tables;
database tableName isTemporary
数据的名称 当前数据库中的表 该表是否是临时表
说明:hive中的表不是临时表。什么是临时表呢? sparksql使用创建临时表的方法,创建的表叫做临时表
测试:
相同的sql语句,分别在sparksql和hive中计算,比较运行时间:
hive:Time taken: 22.9 seconds
sparksql:Time taken: 1.705 seconds
可以很直观的看到,spark的执行速度是非常快的
其实spark-sql命令行中,可以执行hive中的所有的sql语句
比如建表语句create,加载数据load
================================================================================
在spark-shell命令行中进行编程代码:
bin/spark-shell
Spark context available as ‘sc’ (master = local[*], app id = local-1582079672812).
这个是sparkcore的上下文sparkcontext,用来构建核心抽象RDD的
Spark session available as ‘spark’.
这个是sparksql的上下文sparksession,用来构建核心抽象dataframe
如果我想在spark-shell中仍然编写sql语句,怎么操作呢?
spark.sql(sql语句)
scala> spark.sql(“show databases”).show()
scala> spark.sql(“use o2o31”).show()
scala> spark.sql(“select e.ename,e.sal,d.dname,d.loc from emp e join dept d on e.deptno = d.deptno”).show()
================================================================================
SparkSQL的thriftserver服务,多用户服务,基于hive的thriftserver服务
1.hive的thriftserver服务必须配置完成
hive.server2.thrift.port
10000
<property> <name>hive.server2.thrift.bind.host</name> <value>superman-bigdata.com</value> </property> hive中多服务的开启方式: bin/hive --service hiveserver2 & hive中用户连接多服务操作: bin/beeline !connect jdbc:hive2://superman-bigdata.com:10000 输入账号和密码即可 2.sparksql中开启多服务 -1.开启spark-sql的thriftserver服务 sbin/start-thriftserver.sh -2.使用spark自带的beeline脚本 bin/beeline Beeline version 1.2.1.spark2 by Apache Hive beeline> !connect jdbc:hive2://superman-bigdata.com:10000 Connecting to jdbc:hive2://superman-bigdata.com:10000 Enter username for jdbc:hive2://superman-bigdata.com:10000: 默认是没有账号密码的,你可以不输入,也可以随意输入 Connected to: Spark SQL (version 2.2.1) Driver: Hive JDBC (version 1.2.1.spark2) Transaction isolation: TRANSACTION_REPEATABLE_READ 0: jdbc:hive2://superman-bigdata.com:10000> 0: jdbc:hive2://superman-bigdata.com:10000> select * from emp;
注意:
hive的bin/hive --service hiveserver2 &
和sparksql的sbin/start-thriftserver.sh不能同时开启
如果你只是使用hive,那么开启in/hive --service hiveserver2 &即可
如果你想使用sparksql的多服务,只要开启sbin/start-thriftserver.sh就可以了
为什么?
因为sparksql的sbin/start-thriftserver.sh服务就是基于bin/hive --service hiveserver2 &的
所以这两个服务不能同时开启,不然就矛盾了。
!!!!使用IDEA利用scala语言,进行sparksql的sbin/start-thriftserver.sh服务的连接。而不是使用sparksql自带的的beeline脚本
1.在pom文件中添加需要的依赖
org.apache.spark
spark-sql_2.11
2.2.1
2.sparksql和hive集成的多服务的依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_2.11</artifactId>
<version>2.2.1</version>
</dependency>
3.我们还需要访问mysql元数据库,所以还需要添加mysql的驱动包依赖
mysql
mysql-connector-java
5.1.27
4.编写代码
遇到问题:
Exception in thread “main” org.apache.hive.service.cli.HiveSQLException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException: java.net.ConnectException: 拒绝连接 (Connection refused)
报错:拒绝连接到元数据服务
怎么才能连接上元数据服务呢?
连接mysql元数据服务,需要访问四要素。
所以必须要把hive-site.xml文件放在当前项目的中
package com.bigdata.SparkSQL
import java.sql.{DriverManager, PreparedStatement}
object SparkSQLThriftServer {
def main(args: Array[String]): Unit = {
//sparksql连接thriftserver服务,使用的是JDBC的方式 //1.添加驱动类 val driver = "org.apache.hive.jdbc.HiveDriver" Class.forName(driver) //2.创建连接对象 val url = "jdbc:hive2://superman-bigdata.com:10000" val user = "xiaoming" val password = "123456" val connect = DriverManager.getConnection(url,user,password) //3.执行sql语句 connect.prepareStatement("use o2o31").execute() var pstmt: PreparedStatement = connect.prepareStatement("select empno,ename,sal from emp") var rs = pstmt.executeQuery() while (rs.next()){ println(s"empno = ${rs.getInt("empno")},ename = ${rs.getString("ename")},sal = ${rs.getDouble("sal")}") } rs.close() pstmt.close() println("=================================================================") pstmt = connect.prepareStatement("select empno,ename,sal from emp where sal > ?") pstmt.setDouble(1,1550.0) rs = pstmt.executeQuery() while (rs.next()){ println(s"empno = ${rs.getInt("empno")},ename = ${rs.getString("ename")},sal = ${rs.getDouble("sal")}") } rs.close() pstmt.close() connect.close()
}
2.sparksql也可以不和hive集成
案例:
SparkSQL读取HDFS上的json文件(和hive是没有任何关系的)
1.上传people.json文件到HDFS
2.编写代码(使用spark-shell)
scala> val path = “/input/people.json”
scala> val df = spark.read.json(path)
3.将DataFrame注册成为一张临时表
df.createTempView(“temp”)
scala> spark.sql(“select * from temp”).show()
================================================================================
DataFrame
DataFrame是SparkSQL中的核心抽象。
就好比SparkCore中的RDD
DataFrame就是内部以RDD为基础的分布式数据集
问题:
RDD和DataFrame有什么区别??
区别在于:DataFrame 内部就是RDD,除此以外,还有schema。
schema在sparksql中指的是二维表格
所以: DataFrame = RDD + schema
DataFrame的创建方式呢?
使用SparkSession(是SparkSQL中的上下文,类似于SparkCore中的SparkContext)
下的read接口自带的一系列的读取数据的方法
DataFrame有哪些编程的方式(处理DataFrame中的数据的方式)
1.使用HQL语句 ==》 写sql语句
必须将DataFrame注册成为临时表,或者以表的形式保存到hive中
2.DSL语法 ==》 API的链式编程
直接使用dataFrame(不需要注册为表)的一系列的API
3.通常的,我们在开发中会遇到很多表解决不了的问题,我们会将dataframe转成RDD
使用RDD来进行数据处理,处理完毕之后,再转成DataFrame,得到结果表
SparkSQL中的数据处理的流程:
1.读取数据形成DataFrame(read) 怎么读???
2.DataFrame中的数据进行处理 (transformation) 在课程案例的基础上,自己多去找一些案例练习
3.结果数据保存 (write) 怎么写???
================================================================================
SparkSQL中的read和write的编程模型
功能:通过SparkSQL内部定义好的read和write读写接口进行数据的加载和数据的输出保存操作
read(读):def read: DataFrameReader = new DataFrameReader(self)
注明:IDEA中找源码的快捷键:ctrl + N 搜索查看的类名或者object名称
所有的read下的方法都在这个文件中org.apache.spark.sql.DataFrameReader
功能:提供了一系列的读取数据的方法,形成DataFrame
相关的方法:
def format(source: String): DataFrameReader
指定输入数据的数据格式
|王五 |39 |
def option(key: String, value: String): DataFrameReader
表示可以添加的参数,可给可不给,可以给定多个
使用option的时候,一次只能给一个参数
如果你需要添加10个参数,那么久需要使用10次option
def options(options: scala.collection.Map[String, String]): DataFrameReader
表示可以同时添加多个参数
如果你需要添加10个参数,可以先把10个键值对的参数保存在Map集合中,然后只需要使用一次options就可以访问到了
def load(): DataFrame
表示加载数据,但是注意没有参数的,也就是说加载的是没有路径的数据
例子:比如直接加载网上的资源,spark.read.format(“com.databricks.spark.git”).option(…).load()
def load(path: String): DataFrame
表示加载某一个路径下的数据文件
def load(paths: String*): DataFrame
表示加载多个路径下的文件,路径之间使用逗号分隔
例子:spark.read.load("/input/1.txt","/input2/2.txt","/input3/3.txt")
def json(path: String): DataFrame
表示指定读取json格式的文件,给定文件路径即可
def json(paths: String*): DataFrame
表示指定读取多个json格式的文件,给定文件路径即可
def json(jsonRDD: JavaRDD[String]): DataFrame
表示传入一个JavaRDD的参数
例子: java代码
String path = “/input/people.json”
JavaRDD jsonRDD = JavaSparkContext.textFile(path)
spark.read.json(jsonRDD)
def json(jsonRDD: RDD[String]): DataFrame
表示传入一个RDD的参数
例子: scala代码
val path = “/input/people.json”
val RDD = SparkContext.textFile(path)
spark.read.json(RDD)
def json(jsonDataset: Dataset[String]): DataFrame
表示传入一个DataSet的参数
def csv(path: String): DataFrame
表示给定一个csv文件的路径。 csv文件就是每行数据是以逗号分隔的数据。execl文件
def csv(csvDataset: Dataset[String]): DataFrame
表示传入一个DataSet的参数
def csv(paths: String*): DataFrame
表示同时传入多个csv文件的路径
def parquet(path: String): DataFrame
表示传入一个parquet文件的路径
def parquet(paths: String*): DataFrame
表示传入多个parquet文件的路径
def orc(path: String): DataFrame
表示传入一个orc文件的路径
def orc(paths: String*): DataFrame
表示传入多个orc文件的路径
def table(tableName: String): DataFrame
表示读取表
例子:spark.read.table(“表名”)
问题:表是什么表?
sparksql可以直接读取的表:
1.集成hive之后的,hive中的表 eg:spark.read.table(“school.student”)
读取hive中school数据库下的student表
2.sparksession维护的临时表 eg: spark.read.table("temp")
def text(path: String): DataFrame
表示读取一个文本文件的路径
def text(paths: String*): DataFrame
表示给定多个文本文件的路径
def textFile(path: String): Dataset[String]
表示读取一个文本文件的路径,请注意,返回值类型是DataSet,不是DataFrame
def textFile(paths: String*): Dataset[String]
表示读取多个文本文件的路径,请注意,返回值类型是DataSet,不是DataFrame
!!!!读取关系型数据库中的表
def jdbc(
url: String, 数据库的连接地址
table: String, 待读取的表
properties: Properties 配置参数
): DataFrame
例子:假设我们现在需要读取mysql数据中school数据库下的student表
val url = “jdbc:mysql://hostname:3306/school”
val table = “student”
val props = new Properties()
props.put(“user”,“root”)
props.put(“password”,“123456”)
val df = spark.read.jdbc(url,table,properties)
操作:查看当前DataFrame的分区数
df.partitions.size 这个是错误
因为dataframe没有partitions的方法,所以必须将DataFrame转成RDD之后,才能查看分区数
scala> df.rdd.partitions.size
res14: Int = 1
也就是说当我们使用spark的def jdbc(url: String, table: String, properties: Properties): DataFrame 这个方法读取mysql的表的返回值的分区数为1
def jdbc(
url: String, 数据库的连接地址
table: String, 待读取的表
columnName: String, 分区字段 deptno
lowerBound: Long, 分区字段值的下限 10
upperBound: Long, 分区字段值的上限 30
numPartitions: Int, 分区数 4
connectionProperties: Properties 配置参数
): DataFrame
这个方法表示sparksql读取数据库中的表,自定义分区数
以dept表为例,deptno(10,20,30,40),dname,loc三个字段
该方法的分区的规则:
step:(upperBound - lowerBound) / 4 = 5
current = current + step
直到numPartitions - 1次为止
分区:前闭后开
负无穷 - 15
15 - 20
20 - 25
25 - 正无穷
用法:
val df = spark.read.jdbc(url,table,columnName,lowerBound,upperBound,numPartitions,connectionProperties)
查看当前DataFrame的分区数
scala> df.rdd.partitions.size
res19: Int = 4
查看数据的具体存放位置:
df.rdd.glom().collect()
Array(
Array([10,ACCOUNTING,NEW YORK]),
Array(),
Array([20,RESEARCH,DALLAS]),
Array([30,SALES,CHICAGO], [40,OPERATIONS,BOSTON])
)
def jdbc(
url: String,数据库的连接地址
table: String, 待读取的表
predicates: Array[String], 自定分区方式,数组中有几个元素就表示当前的DataFrame有几个分区
connectionProperties: Properties 配置参数
): DataFrame
这种方法也是自定义分区,比较常用
例子:
以dept表为例,三个字段 deptno,dname,loc
val url = “jdbc:mysql://superman-bigdata.com:3306/o2o31”
val table = “dept”
val props = new Properties()
props.put(“user”,“root”)
props.put(“password”,“123456”)
val predicates = Array(“deptno >= 10 and deptno < 20”,“deptno >= 20 and deptno < 30”,“deptno >= 30 and deptno < 50”)
val df = spark.read.jdbc(url,table,predicates,props)
查看当前的DataFrame的分区数,应该为3
scala> df.rdd.partitions.size
res24: Int = 3
实际也是为3
查看具体数据的存放:
scala> df.rdd.glom().collect()
Array(
Array([10,ACCOUNTING,NEW YORK]),
Array([20,RESEARCH,DALLAS]),
Array([30,SALES,CHICAGO], [40,OPERATIONS,BOSTON])
)
验证没有任何问题
================================================================================
write(写):DataFrameWriter
功能:将DataFrame中的数据输出到外部
备注:默认的情况下。输出到HDFS中,那么默认使用的是snappy压缩以及parquet类型的文件
write下的所有的方法都在这个文件中org.apache.spark.sql.DataFrameWriter
相关方法:
def mode(saveMode: SaveMode): DataFrameWriter[T]
表示数据输出的模式是什么
SaveMode.Overwrite
: overwrite the existing data. //覆盖SaveMode.Append
: append the data. //追加SaveMode.Ignore
: ignore the operation (i.e. no-op). //忽视,如果存在,则不作任何操作SaveMode.ErrorIfExists
: default option, throw an exception at runtime. //默认的,如果存在则报错def mode(saveMode: String): DataFrameWriter[T]
表示数据输出的模式是什么
和上面的区别在于,参数的数据类型不一样。
this.mode = saveMode.toLowerCase(Locale.ROOT) match {
case “overwrite” => SaveMode.Overwrite
case “append” => SaveMode.Append
case “ignore” => SaveMode.Ignore
case “error” | “default” => SaveMode.ErrorIfExists
def format(source: String): DataFrameWriter[T]
表示指定数据数据的类型。如果不给定,则默认是parquet类型
def option(key: String, value: String): DataFrameWriter[T]
表示可以添加的参数,可给可不给,可以给定多个
使用option的时候,一次只能给一个参数
如果你需要添加10个参数,那么久需要使用10次option
def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
表示可以同时添加多个参数
如果你需要添加10个参数,可以先把10个键值对的参数保存在Map集合中,然后只需要使用一次options就可以访问到了
def partitionBy(colNames: String*): DataFrameWriter[T]
表示根据给定的字段进行分区。注意:输出的外部系统必须要支持分区,比如hive
def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
表示根据指定的桶数以及分桶的字段,进行分桶操作。
问题1:hive中的分桶是干什么用的??
分桶是优化表之间的join的。增大查询效率
问题2:hive中的分区和分桶有什么区别以及各自的使用场景?
分区:为了将一个庞大的文件按照分区字段进行分割,便于我们获取部分数据,加快查询效率
分桶:是在分区的基础上,进一步的细分,使用场景就是在多表join的情况下,进行桶表之间的join操作
def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
根据给定的字段排序:但是请注意,并不是简单的排序,而是对分桶表中的,桶内数据进行排序
def save(path: String): Unit
表示保存数据,直接给定一个输出的路径即可
def save(): Unit
表示保存数据,但是这个save是没有输入的参数的
这个保存类似于没有输入参数的load方法
df.write.format(“jdbc”).option(url).option(user).option(password).option(table).save()
def insertInto(tableName: String): Unit
表示dataframe中的数据插入表中
注意:这个表是什么表
1.集成hive之后的,hive中的表
2.sparksession维护的临时表
例子:df.write.insertInto(“表名”)
def saveAsTable(tableName: String): Unit
表示将dataframe中的数据以表的形式保存到hive中,并不是临时表
def jdbc(url: String, table: String, connectionProperties: Properties): Unit
表示基于jdbc的方式,将dataframe中的数据,以表的形式保存到数据库中
例子:
import java.util.Properties
val url = “jdbc:mysql://superman-bigdata.com:3306/yangpu1005”
val table = “dept”
val props = new Properties()
props.put(“user”,“root”)
props.put(“password”,“123456”)
spark.read.table(“o2o31.dept”).write.mode(“overwrite”).jdbc(url,table,props)
def json(path: String): Unit
表示以json的形式保存到一个路径
def parquet(path: String): Unit
表示以parquet的形式保存到一个路径
def orc(path: String): Unit
表示以orc的形式保存到一个路径
def text(path: String): Unit
表示以text的形式保存到一个路径
def csv(path: String): Unit
表示以csv的形式保存到一个路径
总结:
当我们使用sparksql读或者写的时候,可以使用read和write下自带的方法
如果当你遇到sparksql没有自带的方法的时候,首先可以从spark的第三方库中
找对应的解决方法:
第三库的地址:
https://github.com/databricks/ https://spark-packages.org/
================================================================================
SparkSQL的案例:
题目:将Hive表数据输出到MySQL表中,将Hive表和MySQL表进行数据Join操 作,并将最终结果保存为Parquet格式的数据,存储在HDFS中
1.在windows下的IDEA中编写代码
-1.在项目的pom文件中添加一些依赖
org.apache.spark
spark-hive_2.11
2.2.1
-2.将hive-site.xml文件放到项目的resource目录中
-3.如果你在执行代码的时候,报错没有权限访问数据库
解决方法:
进入mysql命令行
输入:grant all on *.* to root@'localhost' identifiedy '123456';
flush privileges;
-4.编写代码
代码:
报错:
Failed to connect to /172.31.210.61:50010 for block, add to deadNodes and continue. java.net.ConnectException: Connection timed out: no further information
从报错来看可知:
1.连接超时 ==》 指的是连接HDFS的datanode节点超时了
所以这个问题就是出在了HDFS上面了。
其实呢,这个问题主要是出现在ip地址的通信上面
如果你使用的是虚拟机,则不会出现这个问题
如果你使用的是商业的服务器则就会出现这个问题
因为这边我使用的是阿里云的服务器存在内存ip和外网ip
我们的程序是从windows计算的,那么是通过外网主机名映射外网ip进入的服务器
但是datanode节点在计算的时候,是服务器的内存ip进行通信的,所以当返回结果的时候,仍然
使用的是内网ip,所以出不来。因此报错了
那怎么解决呢?
只要datanode节点计算使用的是主机名,就可以了
因为hadoop的hdfs-site.xml中的配置已经给我们解决了这个问题了,所以我们只需要在
hdfs-site.xml中加上这个配置即可
package com.bigdata.SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
object HiveJoinMysqlSparkSQL {
def main(args: Array[String]): Unit = {
//1.构建上下文
//回忆sparkcore的上下文如何构建
/* val conf = new SparkConf()
.setMaster(“local[*]”)
.setAppName(“HiveJoinMysqlSparkSQL”)
val sc = SparkContext.getOrCreate(conf)*/ //SparkSQL的上下文的构建方式 SparkSession(spark) val spark: SparkSession = SparkSession .builder() .master("local[*]") .appName("HiveJoinMysqlSparkSQL") .enableHiveSupport() //表示当sparksql需要集成hive的时候,添加这个参数,如果不需要集成,则不需要添加 .getOrCreate() //2.案例具体实现 //2.1首先将hive中的dept表保存到mysql中 val url = "jdbc:mysql://superman-bigdata.com:3306/yangpu1005" val table = "dept" val props = new Properties() props.put("user","root") props.put("password","123456") spark .read .table("o2o31.dept") .write .mode("overwrite") .jdbc(url,table,props) //2.2将hive中的o2o31数据库下的emp表和mysql中的dept表进行join操作 //注意:接下来我们需要编写join的sql语句,但是由于mysql中的表不能直接使用sql语句访问到,所以 //我们需要先将mysql中的dept表通过sparksession注册为临时表保存 //2.2.1将mysql中的dept表注册为临时表 val df: DataFrame = spark.read.jdbc(url,table,props) df.createTempView("tmp_dept") //临时表的讲解
/* df.createTempView(“dept1”) //创建临时视图
df.createOrReplaceTempView(“dept2”) //创建临时视图,如果表名存在,则替换
spark.sql("select * from dept1").show()
spark.sql("select * from dept2").show()*/
/* df.createGlobalTempView(“dept1”) //创建全局临时视图
df.createOrReplaceGlobalTempView(“dept2”) //创建全局临时视图,如果表名存在,则替
spark.sql(“select * from dept1”).show()
spark.sql(“select * from dept2”).show()
org.apache.spark.sql.AnalysisException: Table or view not found: dept1;
org.apache.spark.sql.AnalysisException: Table or view not found: dept2;
报错:表示这两个个全局的临时表,都没有找到
解决的方法:普通的临时表示可以直接访问到的,但是全局临时表是默认存放在一个数据库里面的
这个数据库的名字叫做global_temp,所以访问的时候,必须要在表名前加上数据库的名称
spark.sql(“select * from global_temp.dept1”).show()
spark.sql(“select * from global_temp.dept2”).show()*/
/* df.createTempView(“dept1”) //创建临时视图
df.createGlobalTempView(“dept2”) //创建全局临时视图
//临时视图和全局临时视图的区别?
//spark.sql(“select * from dept1”).show() //可以访问
//spark.sql(“select * from global_temp.dept2”).show() //可以访问
//构建一个新的SparkSession对象
val newSpark = spark.newSession()
newSpark.sql(“select * from dept1”).show() // Table or view not found: dept1;
newSpark.sql(“select * from global_temp.dept2”).show() //可以访问
//总结:createTempView和createGlobalTempView的区别在于:
//临时视图只能被当前的sparksession对象访问
//全局临时视图可以被程序中任意的sparksession对象访问*/
//2.2.2hive中的o2o31数据库下的emp表和临时视图tmp_dept表进行join操作 //sql语句中可以直接访问的表是sparksql集成hive后的hive的表或者是sparksession维护的临时表 //如果你觉得代码运行的时候,日志太多了,那么我们可以修改日志输出的等级 //-1.在上下文那边使用sc.setLogLevel("日志等级") //-2.将spark目录下的conf下的log4j文件放置到reources里面,然后修改配置文件 val df1 = spark.sql( """ |select |tmp.empno,tmp.ename,tmp.dname,tmp.loc |from |( select |e.*,d.dname,d.loc |from o2o31.emp e |join tmp_dept d on e.deptno = d.deptno ) as tmp |where tmp.empno < '7900' """.stripMargin) df1.show() df1.createTempView("tmp_hive_join_mysql_result") //2.3将结果保存到HDFS中,默认是parquet文件,使用snappy压缩 df1 .write .mode("overwrite") .save("/yangpu1005/tmp_hive_join_mysql_result") //为什么HDFS上没有出现/yangpu1005/tmp_hive_join_mysql_result这个路径?? //我们当前的环境是windows的环境 //所以默认的文件系统的路径是widnows的路径 //所以这个/yangpu1005/tmp_hive_join_mysql_result表示是当前windows下的路径 //其实完整写法是:file:///c:/yangpu1005/tmp_hive_join_mysql_result //解决方法: //-1.在路径前面加上hdfs://superman-bigdata.com:9000 没有问题 //如果你想把当前spark的文件系统的环境修改为HDFS //可以将core-site.xml和hdfs-site.xml文件放到reosurces里面即可 //2.4将结果保存到hive中 df1 .write .mode("overwrite") .saveAsTable("yangpu1005.tmp_hive_join_mysql_result") //2.5将结果保存到mysql中 val table1= "tmp_hive_join_mysql_result" df1 .write .mode("overwrite") .jdbc(url,table1,props) //3.表的缓存 //为什么要缓存表的数据? //因为当一张表经常被用到,我们将表缓存下来,那么以后再用的时候,直接从内存中获取, // 不需要再执行之前的代码了,这样呢,可以加快代码执行的效率 spark.read.table("tmp_hive_join_mysql_result").cache() spark.sql("select * from tmp_hive_join_mysql_result").cache() spark.sql("cache table tmp_hive_join_mysql_result") df1.cache() //当程序结束了,我们就可以释放缓存 spark.read.table("tmp_hive_join_mysql_result").unpersist() spark.sql("select * from tmp_hive_join_mysql_result").unpersist() spark.sql("uncache table tmp_hive_join_mysql_result") df1.unpersist()
}
}
================================================================================
SparkSQL的应用程序提交在yarn上运行
1.项目代码打包
注意点:
-1.将上下文中的setMaster一行注释掉。
2.上传jar包到linux
3.编写提交命令
bin/spark-submit
–class com.bigdata.SparkSQL.HiveJoinMysqlSparkSQL
–master yarn
–deploy-mode cluster
/opt/cdh5.14.2/modules/spark-2.2.1-bin-2.6.0-cdh5.14.2/extjars/SparkDemo.jar
程序运行成功,但是在控制台上没看到任何输出结果。
在spark on yarn 的模式下,cluster模式,表示driver在集群中的任意一台worker上面,所以当前机器不是driver所在的地方,所以看不到返回的结果
client模式下,driver在提交应用程序的那一台机器上,所以可以看到
bin/spark-submit
–class com.bigdata.SparkSQL.HiveJoinMysqlSparkSQL
–master yarn
–deploy-mode client
–driver-memory 1024m
–executor-memory 1024m
–executor-cores 2
/opt/cdh5.14.2/modules/spark-2.2.1-bin-2.6.0-cdh5.14.2/extjars/SparkDemo.jar
================================================================================
案例:RDD DataFrame DataSet 之间的转换和关系
RDD是什么???
弹性分布式数据集
rdd是不可改变的,分区的,不保存数据,仅仅只是保存元数据信息
rdd的底层实现,rdd使用textFile的时候,通过使用mapreduce的inputformat接口实现读取数据
rdd使用textFile的时候,默认分区数是通过两个公式得到的
minPartitions: Int = defaultMinPartitions
math.min(defaultParallelism, 2)
conf.getInt(“spark.default.parallelism”, math.max(totalCoreCount.get(), 2))
totalCoreCount.get()是集群总的核心数
rdd使用parallelize的时候,默认的分区数是通过一条公式得到的
conf.getInt(“spark.default.parallelism”, math.max(totalCoreCount.get(), 2))
DataFrame是以RDD为基础的分布式数据集
DataFrame = RDD + schema(二维表格)
泛型为Row
DataSet是以RDD为基础的分布式数据集
DataSet泛型是DataSet[T]
DataFrame是特殊的DataSet
DataFrame= RDD + schema = DataSet[Row]
代码:
package com.bigdata.SparkSQL
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object RDDAndDataFrameAndDataSet {
def main(args: Array[String]): Unit = {
//1.构建上下文 val spark = SparkSession .builder() .master("local[*]") .appName("RDDAndDataFrameAndDataSet") .enableHiveSupport() .getOrCreate() //2.读取数据形成DataFrame val df: DataFrame = spark.read.table("o2o31.dept") //3.DataFrame如何转成rdd val rdd: RDD[Row] = df.rdd //4.DataFrame如何转成DataSet //当DataFrmae转成DataSet的时候,必须要导入隐式转换 import spark.implicits._ //4.1如果你想讲当前的dataframe中的字段名称修改掉,你可以再次使用toDF来修改 //|deptno|dname| loc| =》 a,b,c val df1: DataFrame = df.toDF("a","b","c") //4.2DataFrmae转成DataSet,并没有直接toDS的方法,必须使用转换的API,比如: val ds: Dataset[(Int, String, String)] = df.map(row => { val deptno = row.getAs[Int]("deptno") val dname = row.getAs[String]("dname") val loc = row.getAs[String]("loc") (deptno,dname,loc) }) ds.show()
/* ±–±---------±-------+
| _1| _2| _3|
±–±---------±-------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
±–±---------±-------+*/
//4.3DataSet转成RDD
val rdd2: RDD[(Int, String, String)] = ds.rdd
//4.4DataSet转成DataFrame
val arr = Array("id","name","address")
val df2: DataFrame = ds.toDF()
val df3 = ds.toDF(arr:_*)
df2.show()
df3.show()
/* ±–±---------±-------+
| _1| _2| _3|
±–±---------±-------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
±–±---------±-------+
+---+----------+--------+
| id| name| address|
+---+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
+---+----------+--------+*/
}
}
================================================================================
根据一个已经存在的RDD,如何转成DataFrame DataFrame = RDD + schema
官方地址:http://spark.apache.org/docs/2.2.1/sql-programming-guide.html#interoperating-with-rdds
有两种方式:
方式1:Inferring the Schema Using Reflection
利用反射机制来自动推断shcema信息
方式2:Programmatically Specifying the Schema
直接给定schema信息
================================================================================
SparkSQL的内置函数和自定义函数的使用
sparksql的内置函数,都在这个文件中org.apache.spark.sql.functions中
一般请况下,hive中的内置函数,sparksql都可以使用
使用方式,直接使用,和hive一致
select max(sal) from emp;
select min(sal) from emp;
SparkSQL的自定义函数:
hive的自定义函数:
UDF:一对一 转换函数
UDAF:多对一 聚合函数
UDTF:一对多 转换函数
sparksql的自定义函数:
UDF:转换函数
UDAF:聚合函数
注意:在Sparksql中,创建的自定义函数必须要使用sparksession的udf函数注册之后才能使用
================================================================================
案例:使用RDD转成DataFrame和sparksql的自定义函数,这两个知识点,来学习以下案例
报错:
Exception in thread “main” java.lang.IllegalArgumentException: Error while instantiating ‘org.apache.spark.sql.hive.HiveSessionStateBuilder’:
注意:只要在运行代码中,看到HiveSessionStateBuilder。。。metastore。。。。那就是元数据服务的问题
要么元数据服务没开,要么元数据服务配置错误
代码:
package com.bigdata.SparkSQL
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.util.Random
case class student(id:Int,sal:Double)
object UDFandUDAFSparkSQL {
def main(args: Array[String]): Unit = {
//1.构建上下文 val spark: SparkSession = SparkSession .builder() .master("local[*]") .appName("UDFandUDAFSparkSQL") .enableHiveSupport() .getOrCreate() //1.1.因为我们要使用RDD转成DataFrame,所以我们需要先构建RDD,所以需要构建SparkContext //但是我们并不需要使用完整的创建SparkContext的方式,因为SparkSession本身就是基于SparkContext的 //所以直接使用已经构建的SparkSession就可以得到SparkContext val sc: SparkContext = spark.sparkContext //!!!!自定义函数习惯上写在上下文这边 spark.udf.register("UDFRound",(value:Double,scale:Int) => { import java.math.BigDecimal val bd: BigDecimal = new BigDecimal(value) bd.setScale(scale,BigDecimal.ROUND_HALF_UP).doubleValue() }) spark.udf.register("UDAFAVG",UDAFAVG) //2.读取数据形成RDD,这边的数据,我们使用代码模拟产生,最终得到的是键值对数据(Int,Double) val random = Random val rdd: RDD[(Int, Double)] = sc.parallelize((0 to 10).flatMap(i => { //随机得到重复的key的值 (0 to (random.nextInt(10) + 2)).map(j => i) }).map(i => { //随机得到value值 (i,random.nextDouble() * 1000 + 10) })) //println(rdd.collect().mkString("\n")) //3.将RDD转成DataFrame //3.1方式1:Inferring the Schema Using Reflection //利用反射机制来自动推断shcema信息 DataFrame = rdd + schema //注意:使用这种方法,必须导入隐式转换 import spark.implicits._ val df1 = rdd.toDF("id","sal") //df1.show() //3.2方式2:Programmatically Specifying the Schema //直接给定schema信息 //3.2.1 // def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame //rdd: RDD[(Int, Double)] val rdd1: RDD[student] = rdd.map(t => { student(t._1,t._2) }) val df2: DataFrame = spark.createDataFrame(rdd1) //df2.show() //3.2.2 //def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame //rdd: RDD[(Int, Double)] val rowRDD: RDD[Row] = rdd.map(t => Row(t._1,t._2)) val schema = StructType(Array( StructField("id",IntegerType), StructField("sal",DoubleType) )) val df3: DataFrame = spark.createDataFrame(rowRDD,schema) //df3.show() //将得到的DataFrame注册为临时表 df1.createTempView("tmp01") //id,sal //4.需求分析 //4.1需求1:计算每个用户的平均薪资 //4.2需求2:在平均值的基础上,进行保留两位小数,并四舍五入,使用round函数 //4.3需求3:我们不使用round,自己使用自定义函数来创建一个和round一样效果的函数 //4.4需求4:平均值我们可以直接使用avg,但是你知道avg底层是如何实现的吗? //平均值 = 总和 / 总数 自定义UDAF函数来实现 //因为UDAF函数,必须要继承UDAF,所以我们一般会重新创建一个object文件 spark .sql( """ |select |id, |avg(sal) as sal1, |round(avg(sal),2) as sal2, |UDFRound(avg(sal),2) as sal3, |UDAFAVG(sal) as sal4 |from tmp01 |group by id """.stripMargin).show()
}
}
自定义聚合函数
package com.bigdata.SparkSQL
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
//继承UADF
//在scala中,extends即可继承也可以实现
//当extends后面跟的class,那就是继承,后面跟的是特质,那就是实现
object UDAFAVG extends UserDefinedAggregateFunction{
//平均值怎么算?
//总和 / 总数 UDAFAVG(sal:Double)
//输入数据的数据类型
override def inputSchema: StructType = {
StructType(Array(
StructField(“v1”,DoubleType)
))
}
//缓存区的数据类型
override def bufferSchema: StructType = {
StructType(Array(
StructField(“b1”,DoubleType), //总和字段
StructField(“b2”,IntegerType) //总数字段
))
}
//最终的函数的返回值类型
override def dataType: DataType = DoubleType
//多次计算结果,最终的返回值是否一致
override def deterministic: Boolean = true
//初始化缓存区的值
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer.update(0,0.0) //总和字段的初始值
buffer.update(1,0) //总数字段的初始值
}
//缓存区数据输入开始更新(数据的更新是每个分区中都会进行的)
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
//1.读取输入的数据
val inputvalue = input.getDouble(0) //当前输入的数据
//2.取出缓存区的数据
val buffersum = buffer.getDouble(0) //取出缓存区中的总和字段的值
val buffercount = buffer.getInt(1) //取出缓存区中的总数字段的值
//3.更新数据
buffer.update(0,buffersum + inputvalue)
buffer.update(1,buffercount + 1)
}
//合并,在全局范围内,将每个分区中得到的 临时的总和和总数再次聚合
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
//1.获取输入数据
val buffersum1 = buffer2.getDouble(0)
val buffercount1 = buffer2.getInt(1)
//2.获取缓存区的数据
val buffersum2 = buffer1.getDouble(0)
val buffercount2 = buffer1.getInt(1)
//3.更新数据
buffer1.update(0,buffersum1 + buffersum2)
buffer1.update(1,buffercount1 + buffercount2)
}
//计算平均值
override def evaluate(buffer: Row): Double = {
buffer.getDouble(0) / buffer.getInt(1)
}
}
================================================================================
案例:使用SparkSQL读取HDFS上的CSV文件
代码:
package com.bigdata.SparkSQL
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}
object ReadCSVSparkSQL {
def main(args: Array[String]): Unit = {
//1.构建上下文 val spark = SparkSession .builder() .master("local[*]") .appName("ReadCSVSparkSQL") .enableHiveSupport() .config("spark.sql.shuffle.partitions","4") .getOrCreate() //1.1构建SparkContext val sc = spark.sparkContext //2.读取数据形成RDD val path = "hdfs://superman-bigdata.com:9000/input/taxi.csv" val rdd: RDD[String] = sc.textFile(path) //3.强调RDD转成DataFrame的方式 //3.1方式1:基于反射机制自动推断schema信息 toDF的方式 val rdd1: RDD[(Int, Double, Double, String)] = rdd .map(line => line.split(",")) .filter(arr => arr.length == 4) .map(arr => (arr(0).toInt,arr(1).toDouble,arr(2).toDouble,arr(3))) import spark.implicits._ val df1 = rdd1.toDF("id","lat","lon","time") //df1.show() //3.2方式2:直接给定schema信息。 val schema =StructType(Array( StructField("id",IntegerType), StructField("lat",DoubleType), StructField("lon",DoubleType), StructField("time",StringType) )) val df2: DataFrame = spark .read .format("csv") .schema(schema) .load(path) //df2.show() //我们现在使用的是spark2.2.1的版本,因为spark2.0以后是自带读取csv的方法的。 //如果我们在读取数据的时候,sparksql并没有直接提供,就好比写入hbase一样,我们需要将DataFrame转成RDD之后,再写入 //因此,当我们遇到sparksql没有直接提供api的时候,首先去第三方库找有没有现成的api方法 //接下来,举一个spark1.6的时候,没有直接读取csv文件的方法,如何使用第三方库 //-1.查找第三方网址https://github.com/databricks/ https://spark-packages.org/ //https://github.com/databricks/spark-csv //-2.在项目的pom文件中添加第三方依赖 val df3 = spark .read .format("com.databricks.spark.csv") .option("header","false") .schema(schema) .load(path) //df3.show() //将dataframe注册为临时表 df3.createTempView("tmp_taxi") //4.具体的需求实现 //4.1获取出租车司机在同一个时间点(小时)的载客信息 spark .sql( """ |select id, |substring(time,0,2) as hour |from tmp_taxi """.stripMargin).createTempView("tmp01") //4.2得到各个出租车司机在各个时间点的载客数 spark .sql( """ |select |id, |hour, |count(1) as count |from tmp01 |group by id,hour """.stripMargin).createTempView("tmp02") //4.3获取每个时间点,载客数最多的前3名出租车司机的信息 ==》 row_number spark .sql( """ |select |tmp03.id,tmp03.hour,tmp03.count,tmp03.rn |from |(select id,hour,count, |row_number() over(partition by hour order by count desc) as rn |from tmp02) as tmp03 |where tmp03.rn <= 3 """.stripMargin).createTempView("result") spark.read.table("result").cache() //5.结果输出 //5.1输出到HDFS spark .read .table("result") .repartition(2) .write .mode("overwrite") .format("json") .save("hdfs://superman-bigdata.com:9000/yangpu1005/result") //5.2输出到hive中 spark .read .table("result") .repartition(2) .write .mode("overwrite") .saveAsTable("yangpu1005.result") //5.3输出到mysql中 val url = "jdbc:mysql://superman-bigdata.com:3306/yangpu1005" val table = "result" val props = new Properties() props.put("user","root") props.put("password","123456") spark .read .table("result") .repartition(2) .write .mode("overwrite") .jdbc(url,table,props) //5.4作业:自己写代码,将dataframe转成RDD,然后把数据写入hbase的表中 spark.read.table("result").unpersist()
}
}
================================================================================
SparkSQL的开发方式:
1.sql + hql:写sql语句
2.DSL语法:api的链式编程
案例:SparkSQL的DSL语法的编写 (使用DSL语法的时候,dataframe不需要注册为临时表的)
代码:
package com.bigdata.SparkSQL
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}
case class Person(name:String,age:Int,sex:String,sal:Double,deptno:Int)
case class Dept(deptno:Int,dname:String)
object SparkSQLDSLDemo {
def main(args: Array[String]): Unit = {
//1.构建上下文 val spark = SparkSession .builder() .master("local[*]") .appName("SparkSQLDSLDemo") .enableHiveSupport() .getOrCreate() //2.构建SparkContext val sc: SparkContext = spark.sparkContext //自定义函数 ==》UDF spark.udf.register("sexToInt",(sex:String)=>{ //采用什么方式,将M和F 变成 1 或者 0 sex match { case "M" => 1 case "F" => 0 case _ => -1 } }) //3.读取数据形成RDD ==》 模拟创建两张表 ==》 两份数据RDD //这个是员工表的数据 val rdd1: RDD[Person] = sc.parallelize(Array( Person("xiaoming",88,"F",6731.2,2), Person("laowang",56,"F",8392.1,2), Person("caixukun",17,"M",7429.1,2), Person("xiaohong",21,"M",781.4,1), Person("zhangsan",67,"F",134.1,1), Person("lisi",20,"F",1898.3,2), Person("wangwu",51,"M",4532.9,1), Person("lufei",22,"M",8998.4,1) )) //这个是部门表的数据 val rdd2: RDD[Dept] = sc.parallelize(Array( Dept(1,"吃饭部"), Dept(2,"打牌部") )) //将RDD转成DataFrame import spark.implicits._ val personDataFrame: DataFrame = rdd1.toDF() //员工信息表 val deptDataFrame: DataFrame = rdd2.toDF() //部门信息表 personDataFrame.show() deptDataFrame.show() //因为接下来,这两个dataframe会被多次使用,所以建议缓存一下 personDataFrame.cache() deptDataFrame.cache() //4.使用DSL语法,来进行练习 //sql中的select查询 println("=====================select=========================")
/* import org.apache.spark.sql.functions._
//select name,age,sex from person
personDataFrame.select(“name”,“age”,“sex”).show()
personDataFrame.select(
"
n
a
m
e
"
.
a
l
i
a
s
(
"
n
a
m
e
1
"
)
,
"name".alias("name1"),
"name".alias("name1"),“age”.alias(“age1”),$“sex”).show()
personDataFrame.select(col(“name”).alias(“name2”),col(“age”),col(“sex”)).show()
//注意:当我们使用自定义函数查询的时候,不能直接使用select,而是使用selectExpr来查询
personDataFrame.selectExpr(“name”,“age”,“sexToInt(sex) as sex2”).show()*/
//sql中的where条件
println("====================where/filter==================")
/* //select name,age,sex from person where age > ‘30’
personDataFrame
.where(“age > 30”)
.select(“name”,“age”,“sex”)
.show()
personDataFrame .where($"age" > 22) .filter("sex = 'M'") .select("name","age","sex") .show() personDataFrame .where("age > 22 and sex = 'M' and deptno = 1") .show() personDataFrame .where($"age" > 22 && $"sex" === "M" && $"deptno" === 1) .show()*/ //sql中的排序 ==》 全局排序 、 局部排序 //hive中全局排序是order by 局部排序是sort by println("=====================sort=====================") //全局排序 ==> sort orderby
/* personDataFrame
.sort(“sal”) //默认从小到大排序
.select(“name”,“sal”)
.show()
personDataFrame
.sort($"sal".desc) //从大到小排序
.select("name","sal")
.show()
personDataFrame
.orderBy("sal")
.select("name","sal")
.show()*/
//局部排序 => 全局没有顺序,但是分区中的数据是排序的
/* personDataFrame
.repartition(3)
.sortWithinPartitions($“age”.desc) //从大到小
.select(“name”,“age”)
.write
.mode(“overwrite”)
.format(“json”)
.save(“file:///C:\Users\ibf\Desktop\data”)*/
//sql中的分组之后的聚合操作
//select avg(sal) from person group by sex
/* println("===groupby aggregate")
import org.apache.spark.sql.functions._
personDataFrame
.groupBy(“sex”)
.agg(
avg(“sal”),
max(“age”)
)
.show()
//如果需要修改别名 personDataFrame .groupBy("sex") .agg( avg($"sal").as("sal1"), max($"age").as("age1") ) .show() personDataFrame .groupBy("sex") .agg( "sal" -> "avg", "age" -> "max" ) .show()*/ //sql中的join操作 personDataFrame deptDataFrame //select * from emp e join dept d on e.deptno = d.deptno println("==================join=====================")
/* //inner join
personDataFrame
.join(deptDataFrame,“deptno”)
.show()
//left join personDataFrame .join(deptDataFrame,Seq("deptno"),"left") .show() //right join personDataFrame .join(deptDataFrame,Seq("deptno"),"right") .show()*/ //sql中的窗口函数 //select empno,ename,sal,row_number() over(partition by empno order by sal desc) from emp println("===================窗口函数===============") val w = Window.partitionBy("sex").orderBy($"sal".desc) //不同性别中薪资最高的前3名 import org.apache.spark.sql.functions._ personDataFrame .select($"name",$"sex",$"sal",row_number().over(w).as("rn")) .where("rn <= 3") .show() //当程序写完之后,将缓存释放掉 personDataFrame.unpersist() deptDataFrame.unpersist()
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。