赞
踩
Spark SQL的DataFrame接口支持多种数据源的操作。可以使用关系转换进行操作,也可以被注册为临时视图。将DataFrame注册为临时视图,即可以通过SQL进行数据查询。
Spark SQL的默认数据源格式为Parquet文件格式,修改配置项spark.sql.sources.default即可更改默认的数据源格式。
操作默认数据源
//load用于read读取文件时加载文件中的数据
val usersDF = spark.read.load("../examples/src/main/resources/users.parquet")
//save用于write写文件时存储写出的数据,未指定具体保存路径的情况下,则文件保存在bin目录下
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
手动指定选项
当数据源格式不是默认的Parquet格式的文件时,需要手动指定将要使用的数据源格式,数据源格式需要指定全名(例如org.apache.spark.sql.parquet),但内置的来源,则使用短名称(json, parquet, jdbc, orc, libsvm, csv, text)来指定数据的格式。从任何数据源类型加载的DataFrame都可以使用此语法转换为其他类型。
可以使用SparkSession提供的read.load方法加载数据,使用write.save方法保存数据。
1.1 加载json文件:
val peopleDF = spark.read.format("json").load("../examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
1.2 加载csv文件:
val peopleDFCsv = spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("../examples/src/main/resources/people.csv")
peopleDFCsv.select("name", "age").write.format("parquet").save("namesAndAges2.parquet")
在写操作期间还会使用额外的选项。例如:可以控制ORC数据源的Bloom过滤器和字典编码。
//创建bloom过滤器,favorite_color并为name和使用字典编码favorite_color。
usersDF.write.format("orc").option("orc.bloom.filter.columns", "favorite_color").option("orc.dictionary.key.threshold", "1.0").save("users_with_options.orc")
除了使用读取API将文件加载到DataFrame中并进行查询之外,还可以直接在文件上运行SQL
val sqlDF = spark.sql("SELECT * FROM parquet.`../examples/src/main/resources/users.parquet`")
Scala/Java | 任何语言 | 含义 |
---|---|---|
SaveMode.ErrorIfExists (默认) | “error” or “errorifexists” (默认) | 将DataFrame保存到数据源时,如果已经存在数据,则将引发异常。 |
SaveMode.Append | “append” | 将DataFrame保存到数据源时,如果已经存在数据/表,则应该将DataFrame的内容附加到现有数据中。 |
SaveMode.Overwrite | “overwrite” | 将DataFrame保存到数据源时,如果已经存在数据/表,则预期现有数据将被DataFrame的内容覆盖。 |
SaveMode.Ignore | “ignore” | 将DataFrame保存到数据源时,如果已经存在数据,则预期保存操作将不保存DataFrame的内容并且不更改现有数据。这类似于CREATE TABLE IF NOT EXISTSSQL中的。 |
与createOrReplaceTempView命令不同,saveAsTable将具体化DataFrame的内容,并在Hive元存储中创建一个指向数据的指针。即使重新启动Spark程序,持久表仍将存在,只要保持与同一metastore的连接即可。可以使用表名在SparkSession上调用方法来创建持久表的DataFrame 。
对于文本,Parquet,json等基于文件的数据源,保存时可以通过path选项指定自定义表路径,例如df.write.option(“path”, “/some/path”).saveAsTable(“t”)。(存储在指定路径下的文件格式为parquet)。删除表后,自定义表路径将不会删除,且表数据仍然存在。如果未指定自定义表路径,Spark会将数据写入仓库目录下的默认表路径。删除表时,默认表路径也将被删除。
从Spark 2.1开始,持久数据源表在Hive元存储中存储了按分区的元数据。这带来了几个好处:
1)由于元存储只能返回查询的必要分区,因此不再需要在第一个查询中将所有分区发现到表中。
2)Hive DDL,例如ALTER TABLE PARTITION … SET LOCATION现在可用于使用Datasource API创建的表。
需要注意的是,在创建外部数据源表(带有path选项的表)时,默认情况下不会收集分区信息。要同步元存储中的分区信息,可以调用MSCK REPAIR 表名,例:MSCK REPAIR people。
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
当使用DataSet的API时,对于save和saveastable,都可以使用分区。
//会在HDFS上的当前用户路径下产生一个文件夹namesPartByColor.parquet,里面包含了两个分区文件夹
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
//会在/user/hive/warehouse/namespartbycolor产生一个分区表。
userDF.write.partitionBy("favorite_color").format("parquet").saveAsTable("namesPartByColor")
可以对单个表同时使用分区和分桶:
usersDF.write.partitionBy("favorite_color").bucketBy(42, "name").saveAsTable("users_partitioned_bucketed")
partitionBy按照以下二.2节“分区自动识别”部分中的描述创建目录结构。因此,它对具有高基数的列的适用性有限。相反, bucketBy将数据分布在固定数量的存储桶中,并且当许多唯一值不受限制时可以使用。
Parquet是一种列存储格式的文件,支持许多的数据处理系统。Spark SQL提供对Parquet文件的读写支持,该文件会自动保留原始数据的结构。编写Parquet文件时,出于兼容性原因,所有列都将自动转换为可为null的类型。
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("../examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEE
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。