当前位置:   article > 正文

Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作_spark sql操作

spark sql操作

一、基本操作
Spark SQL提供了两个常用的加载数据和写入数据的方法:load()方法和save()方法。load()方法可以加载外部数据源为一个DataFrame,save()方法可以将一个DataFrame写入指定的数据源。
二、默认数据源
(一)默认数据源Parquet
默认情况下,load()方法和save()方法只支持Parquet格式的文件,Parquet文件是以二进制方式存储数据的,因此不可以直接读取,文件中包括该文件的实际数据和Schema信息,也可以在配置文件中通过参数spark.sql.sources.default对默认文件格式进行更改。Spark SQL可以很容易地读取Parquet文件并将其数据转为DataFrame数据集。
(二)案例演示读取Parquet文件
执行命令: cd $SPARK_HOME/examples/src/main/resources,查看Spark的样例数据文件users.parquet


cat命令显示users.parquet文件内容,只会显示乱码

将数据文件users.parquet上传到HDFS的/datasource/input目录


1、在Spark Shell中演示
启动Spark Shell,执行命令:spark-shell --master spark://master:7077 


执行命令:val userdf = spark.read.load("hdfs://master:9000/datasource/input/users.parquet")

 

 


执行命令:userdf.show,查看数据帧内容


执行命令:userdf.printSchema,查看数据帧模式

 

 


执行命令:userdf.select("name", "favorite_color").write.save("hdfs://master:9000/datasource/output"),对数据帧指定列进行查询,查询结果依然是数据帧,然后通过write成员的save()方法写入HDFS指定目录


查看HDFS上的输出结果


除了使用select()方法查询外,也可以使用SparkSession对象的sql()方法执行SQL语句进行查询,该方法的返回结果仍然是一个DataFrame。

 

 

 

基于数据帧创建临时视图,执行命令:userdf.createTempView("t_user")


执行SQL查询,将结果写入HDFS,执行命令:spark.sql("select name, favorite_color from t_user").write.save("hdfs://master:9000/datasource/output2")


查看HDFS上的输出结果


三、手动指定数据源
(一)format()与option()方法概述
使用format()方法可以手动指定数据源。数据源需要使用完全限定名(例如org.apache.spark.sql.parquet),但对于Spark SQL的内置数据源,也可以使用它们的缩写名(JSON、Parquet、JDBC、ORC、Libsvm、CSV、Text)。
通过手动指定数据源,可以将DataFrame数据集保存为不同的文件格式或者在不同的文件格式之间转换。
在指定数据源的同时,可以使用option()方法向指定的数据源传递所需参数。例如,向JDBC数据源传递账号、密码等参数。
(二)案例演示读取不同数据源
1、读取csv文件
执行命令:cd $SPARK_HOME/examples/src/main/resources,查看Spark的样例数据文件people.csv


将people.csv文件上传到HDFS的/datasource/input目录,然后查看文件内

 


在Spark Shell里,执行命令:val peopleDF = spark.read.format("csv").load("hdfs://master:9000/datasource/input/people.csv"),读取人员csv文件,得到人员数据帧


执行命令:peopleDF.show,查看人员数据帧内容

 

 

 


大家可以看到,people.csv文件第一行是字段名列表,但是转成数据帧之后,却成了第一条记录,这样显然是不合理的,怎么办呢?就需要用到option()方法来传递参数,告诉Spark第一行是表头header,而不是表记录。

执行命令:val peopleDF = spark.read.format("csv").option("header", "true").load("hdfs://master:9000/datasource/input/people.csv")


执行命令:peopleDF.show,查看人员数据帧内容

 

 


由于csv文件默认分隔符是逗号,而people.csv的分隔符是分号,因此要利用option("delimiter", ";")告诉Spark

执行命令:val peopleDF = spark.read.format("csv").option("header", "true").option("delimiter", ";").load("hdfs://master:9000/datasource/input/people.csv")


执行命令:peopleDF.show,查看人员数据帧内容

 

 


2、读取json,保存为parquet
查看people.json文件


将people.json上传到HDFS的/datasource/input目录,并查看其内容

 

 

 


在Spark Shell里,执行命令:val peopleDF = spark.read.format("json").load("hdfs://master:9000/datasource/input/people.json")


执行命令:peopleDF.show

 


执行命令:peopleDF.select("name", "age").write.format("parquet").save("hdfs://master:9000/datasource/output4") (注意:format("parquet")其实可以省掉的)


查看生成的parquet文件(/datasource/output4/part-00000-a1e62c69-59e5-40b6-8391-89bdfffe61ff-c000.snappy.parquet)

 

 

 



四、数据写入模式
(一)mode()方法
在写入数据时,可以使用mode()方法指定如何处理已经存在的数据,该方法的参数是一个枚举类SaveMode。
使用SaveMode类,需要import org.apache.spark.sql.SaveMode;
(二)枚举类SaveMode
SaveMode.ErrorIfExists:默认值。当向数据源写入一个DataFrame时,如果数据已经存在,就会抛出异常。
SaveMode.Append:当向数据源写入一个DataFrame时,如果数据或表已经存在,会在原有的基础上进行追加。
SaveMode.Overwrite:当向数据源写入一个DataFrame时,如果数据或表已经存在,就会将其覆盖(包括数据或表的Schema)。
SaveMode.Ignore:当向数据源写入一个DataFrame时,如果数据或表已经存在,就不会写入内容,类似SQL中的CREATE TABLE IF NOT EXISTS。
(三)案例演示不同写入模式
查看数据源:people.json

查询该文件name里,采用覆盖模式写入/result,/result目录里本来有东西的

执行命令:val peopledf = spark.read.format("json").load("hdfs://master:9000/input/people.json")

导入SaveMode类,执行命令:peopledf.select("name").write.mode(SaveMode.Overwrite).format("json").save("hdfs://master:9000/result")

在slave1虚拟机上查看生成的json文件

查询age列,以追加模式写入HDFS的/result目录,执行命令:peopledf.select("age").write.mode(SaveMode.Append).format("json").save("hdfs://master:9000/result")

在slave1虚拟机上查看追加生成的json文件

五、分区自动推断
(一)分区自动推断概述
表分区是Hive等系统中常用的优化查询效率的方法(Spark SQL的表分区与Hive的表分区类似)。在分区表中,数据通常存储在不同的分区目录中,分区目录通常以“分区列名=值”的格式进行命名。
以people作为表名,gender和country作为分区列,给出存储数据的目录结构

(二)分区自动推断演示
1、建四个文件
在master虚拟机上/home里创建如下目录及文件,其中目录people代表表名,gender和country代表分区列,people.json存储实际人口数据

2、读取表数据
执行命令:spark-shell,启动Spark Shell

执行命令:val peopledf = spark.read.format("json").load("file:///home/people")

3、输出Schema信息
执行命令:peopledf.printSchema()

4、显示数据帧内容
执行命令:peopledf.show()

从输出的Schema信息和表数据可以看出,Spark SQL在读取数据时,自动推断出了两个分区列gender和country,并将这两列的值添加到了数据帧peopledf中。
(三)分区自动推断注意事项
分区列的数据类型是自动推断的,目前支持数字、日期、时间戳、字符串数据类型。若不希望自动推断分区列的数据类型,则可以在配置文件中将spark.sql.sources.partitionColumnTypeInference.enabled的值设置为false(默认为true,表示启用)。当禁用自动推断时,分区列将使用字符串数据类型。
 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/516033
推荐阅读
相关标签
  

闽ICP备14008679号