赞
踩
Spark SQL用于处理结构化数据,与Spark RDD API不同,它提供更多关于数据结构信息和计算任务运行信息的接口,Spark SQL内部使用这些额外的信息完成特殊优化。可以通过SQL、DataFrames API、Datasets API与Spark SQL进行交互,无论使用何种方式,SparkSQL使用统一的执行引擎记性处理。用户可以根据自己喜好,在不同API中选择合适的进行处理。本章中所有用例均可以在spark-shell、pyspark shell、sparkR中执行。
执行SQL语句的方法有多种:
DataFrame是由分布式数据集合组成的一系列命名列,它与关系数据库的表类似,但有很多优化的地方。DataFrame支持多种数据源,包括结构化数据、Hive的表、外部数据库、RDDs等。DataFrame API支持scala 、java、Python和R语言。
数据集接口在Spark1.6才加入,它可以使用Spark SQL的优化器对RDD操作进行优化。Dataset有JVM对象构建,并可以进行map、flatMap、filter等操作。Dataset API统一接口支持java和scala语言。
SQLContext是Spark SQL所有功能的入口,通过SparkContext可以创建该对象的实例:
- val sc: SparkContext // An existing SparkContext.
- val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._
除了SQLContext,还可以创建HiveContext对象,它包含更多的功能,例如HiveQL解析器支持更完善的语法、使用Hive用户自定义函数UDFs、从Hive表中读取数据等。HiveContext不依赖Hive是否安装,Spark默认支持HiveContext。从Spark1.3以后,推荐使用HiveContext,未来SQLContext会包含HiveContext中的功能。
可以通过spark.sql.dialect选项更改SQL解析器,这个参数可以再SQLContext的setConf方法设置,也可以通过SQL的ky=value语法设计。在SQLContext中dialect只支持一种简单的SQL解析器“sql”。HiveContext默认解析器是“hiveql”,同时支持“sql”,但一般推荐hiveql,因为它语法更全。
DataFrames的数据源多种多样,例如RDD、Hive table或者其他数据源。 下面代码从JSON文件创建了一个DataFrame
- JavaSparkContext sc = ...; // An existing JavaSparkContext.
- SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
-
- DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Displays the content of the DataFrame to stdout df.show();
DataFrame支持结构化数据领域常用的数据操作,支持Scala、Java、Python和R语言,下面是一些基本操作示例:
- JavaSparkContext sc // An existing SparkContext.
- SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)
-
- // Create the DataFrame
- DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");
-
- // Show the content of the DataFrame
- df.show(); // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show(); // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df.col("name"), df.col("age").plus(1)).show(); // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df.col("age").gt(21)).show(); // age name // 30 Andy // Count people by age df.groupBy("age").count().show(); // age count // null 1 // 19 1 // 30 1
对于DataFrame的所有操作类型可以参考API文档。除了简单的列操作,DataFrame还支持字符串操作、日期算法、数据操作等等,可以参考DataFrame函数文档
SQLContext的sql方法支持运行sql语法的查询,并返回DataFrame类型的结果集:
- SQLContext sqlContext = ... // An existing SQLContext
- DataFrame df = sqlContext.sql("SELECT * FROM table")
Dataset与RDD类似,但它不适用java序列化也不适用Kryo,而是使用特定的Encoder作为序列化工具。Encoder可以对Spark对象进行序列化和反序列化,同时不需要反序列化在字节级别就能支持filtering、sorting和hashing等操作。
- // Encoders for most common types are automatically provided by importing sqlContext.implicits._
- val ds = Seq(1, 2, 3).toDS() ds.map(_ + 1).collect() // Returns: Array(2, 3, 4) // Encoders are also created for case classes. case class Person(name: String, age: Long) val ds = Seq(Person("Andy", 32)).toDS() // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name. val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path).as[Person]
在Spark SQL中有两种方式可以在DataFrame和RDD进行转换,第一种方法是利用反射机制,推导包含某种类型的RDD,通过反射将其转换为指定类型的DataFrame,适用于提前知道RDD的schema。
第二种方法通过编程接口与RDD进行交互获取schema,并动态创建DataFrame,在运行时决定列及其类型。
Scala支持使用case class类型导入RDD转换为DataFrame,通过case class创建schema,case class的参数名称会被利用反射机制作为列名。case class可以嵌套组合成Sequences或者Array。这种RDD可以高效的转换为DataFrame并注册为表。
- // sc is an existing SparkContext.
- val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ // Define the schema using a case class. // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, // you can use custom classes that implement the Product interface. case class Person(name: String, age: Int) // Create an RDD of Person objects and register it as a table. val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() people.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by field index: teenagers.map(t => "Name: " + t(0)).collect().foreach(println) // or by field name: teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println) // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println) // Map("name" -> "Justin", "age" -> 19)
当case class不能提前定义好时,可以通过以下三步通过代码创建DataFrame
- // sc is an existing SparkContext.
- val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-
- // Create an RDD
- val people = sc.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Import Row. import org.apache.spark.sql.Row; // Import Spark SQL data types import org.apache.spark.sql.types.{StructType,StructField,StringType}; // Generate the schema based on the string of schema val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) // Convert records of the RDD (people) to Rows. val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) // Apply the schema to the RDD. val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema) // Register the DataFrames as a table. peopleDataFrame.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val results = sqlContext.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by field index or by field name. results.map(t => "Name: " + t(0)).collect().foreach(println)
DataFrame接口支持一系列的数据源,它可以按照普通RDD进行操作,也能被注册为临时表进行操作。注册临时表后可以使用SQL查询操作数据集,本章节介绍了常用加载保存数据的方法,同时给出了内部数据源的特殊操作。
未配置spark.sql.sources.default情况下,默认使用parquet数据源处理所有操作。
- val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
- df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
用户可以手动指定数据源加载的选项,对于数据源类型需要使用完整名称指定例如(org.apache.spark.sql.parquet),但对于内部类型可以使用简称,例如(json parquet jdbc等)。可以通过以上方法在不同DataFrame之间进行转换。
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json") df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
除了需要将文件加载到DataFrame再执行sql以外,还可以直接执行sql
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Save通过SaveMode指定如何维护现有的数据。需要注意的是savemode未对数据加锁,因而不是源自操作。若使用overwrite模式时,原有数据会先被清空。
Scala/Java | Any Language | 含义 |
---|---|---|
SaveMode.ErrorIfExists (default) | "error" (default) | 当数据输出的位置已存在时,抛出此异常 |
SaveMode.Append | "append" | 当数据输出的位置已存在时,在文件后面追加 |
SaveMode.Overwrite | "overwrite" | 当数据输出的位置已存在时,重写 |
SaveMode.Ignore | "ignore" | 当数据输出的位置已存在时,不执行任何操作,与 CREATE IF NOT EXISTS类似 |
使用HiveContext时,DataFrame可以使用saveAsTable方法保存到持久化表中。与registerTempTable不同,saveASTable会为其真正创建数据区并创建指向该区域的指针放入HiveMetaStore中。在持有同一个metastore的连接期间,持久化的数据会一直存在,即使spark程序重启也不影响。可以通过SQLContext的table方法创建用于持久化表的DataFrame。
默认的saveASTable会创建“managed table”,其数据位置会被metastore维护,被管理的表数据会在表被删除时清空。
parquet是一种流行的列式存储格式。SparkSQL支持对parquet的读写以及schema和数据的维护。在写parquet文件时,为了兼容,所有列都会转换为nullable格式。
- // sqlContext from the previous example is used in this example.
- // This is used to implicitly convert an RDD to a DataFrame.
- import sqlContext.implicits._
-
- val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
-
- // The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet. people.write.parquet("people.parquet") // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a Parquet file is also a DataFrame. val parquetFile = sqlContext.read.parquet("people.parquet") //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerTempTable("parquetFile") val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
表分区是Hive等系统的常用优化手段。在一个分区表中,数据经常分布在不同目录下,分区列的值相同的数据分布在同一目录中。目前支持对parquet文件进行自动推断分区。例如我们可以将之前的数据增加两列gender和country,并将两列作为分区列进行数据分区。
- path
- └── to
- └── table
- ├── gender=male
- │ ├── ...
- │ │
- │ ├── country=US
- │ │ └── data.parquet
- │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
将数据路径传给SQLContext后,可以自动推断DataFrame数据的分区信息。注意,数据的分区列是自动推断出来你的,目前分区列支持数值类型和string类型。若用户不希望自动推断分区列时,可以通过spark.sql.sources.partitionColumnTypeInference.enabled配置禁止自动推断,此时会使用string类型列进行分区。 分区类型会根据传入的路径进行推断,但用户可以配置数据源的basePath属性设置分析的路径。
parquet支持列增加等操作,当出现多个互相兼容的schemas时,parquet可以自动检测并合并这些文件的schema。由于schema 合并会消耗大量的资源,默认关闭该操作,可以通过以下方法打开:
- // sqlContext from the previous example is used in this example.
- // This is used to implicitly convert an RDD to a DataFrame.
- import sqlContext.implicits._
-
- // Create a simple DataFrame, stored into a partition directory
- val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") df1.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple") df2.write.parquet("data/test_table/key=2") // Read the partitioned table val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths. // root // |-- single: int (nullable = true) // |-- double: int (nullable = true) // |-- triple: int (nullable = true) // |-- key : int (nullable = true)
SparkSQL使用内部库而不是Hive SerDe,对Hive metasotre Parquet表进行读写,性能很好,可以通过spark.sql.hive.convertMetastoreParquet配置。
由于Hive和Parquet的元数据处理方式不同,如下所示
将Hive metastore Parquet table转换为Spark SQL parquet表时,遵从以下规则:
相同名称的字段的数据类型必须相同,nullable类型被忽略。由于融合的数据类型需要在parquet中有对应的类型,所以nullability类型需要处理。
融合后schema中包含了Hive元数据中定义的值
Spark SQL会缓存parquet元数据以便提高性能。若Hive metastore Parquet table转换被启用,则转换的表元数据也会被cache。若这些元数据被外部工具修改,则需要手动更新缓存元数据保持一致性。
- // sqlContext is an existing HiveContext
- sqlContext.refreshTable("my_table")
与parquet相关的配置参数如下所示
参数 | 默认值 | 描述 |
---|---|---|
spark.sql.parquet.binaryAsString | false | 该选项让SparkSQL将string安装二进制数据按照字符串处理,以便兼容老系统 |
spark.sql.parquet.int96AsTimestamp | true | Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems. |
spark.sql.parquet.cacheMetadata | true | 缓存Parquet的Schema元数据,提高查询静态数据效率 |
spark.sql.parquet.compression.codec | gzip | 设置Parquet文件的压缩编码方式,支持 uncompressed, snappy, gzip, lzo. |
spark.sql.parquet.filterPushdown | true | 启用过滤谓词下推优化,将过滤下推到抽取数据时,取得性能的提升 |
spark.sql.hive.convertMetastoreParquet | true | 若设为false,Spark SQL使用Hive SerDe支持对Parquet tables的操作. |
spark.sql.parquet.output.committer.class | org.apache.parquet.hadoop.ParquetOutputCommitter | The output committer class used by Parquet. The specified class needs to be a subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass of org.apache.parquet.hadoop.ParquetOutputCommitter. |
spark.sql.parquet.mergeSchema | false | 是否开启Schema合并 |
SQLContext.read.josn()接口可以自动推断JSON文件的schema。SparkSQL支持的JSON文件中每一行需要是一个完整的JSON对象,不支持跨行的json对象。
- // sc is an existing SparkContext.
- val sqlContext = new org.apache.spark.sql.SQLContext(sc) // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path) // The inferred schema can be visualized using the printSchema() method. people.printSchema() // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // Register this DataFrame as a table. people.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. val anotherPeopleRDD = sc.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
Spark SQL支持从Hive中读取数据,但由于Hive依赖过多,默认不支持Hive,需要在编译时添加-Phive -Phive-thriftserver
选项。由于用到Hive的序列化和反序列化需要保证Hive包在各个worker中都存在。
将hive-site.xml、core-site.xml和hdfs-site.xml放入conf目录下配置Hive环境。在Yarn集群上面运行时,需要确定datanucleus jar包和hive-site.xml在driver和所有executor上面都存在。可以通过spark-submit的--jars和--file参数检查是否存在。
若通过Spark SQL操作Hive需要创建HiveContext,增加元数据功能及HiveQL支持。若没有部署Hive环境同样可以创建HiveContext。若没有在hive-site.xml中配置,会自动在当前目录创建metastore_db并在/user/hive/warehouse创建仓储目录,需要给hive对/user/hive/warehouse的写权限。
- // sc is an existing SparkContext.
- val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
由于Spark SQL可以与不同版本的Hive Metastor(而不是Hive的版本)进行交互,只需要修改部分的配置信息,相关配置如下:
属性 | 默认值 | 描述 |
---|---|---|
spark.sql.hive.metastore.version | 1.2.1 | Hive metastore的版本信息,从0.12.0到1.2.1 |
spark.sql.hive.metastore.jars | builtin | 指定metastore的Jar包位置,builtin:该jar被打包到spark应用程序中;maven:使用maven远程仓储下载;类路径:需要包含hive所有的依赖包 |
spark.sql.hive.metastore.sharedPrefixes | com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc | 一个逗号分隔的类名前缀列表,这些类使用classloader加载,且可以在Spark SQL和特定版本的Hive间共享。例如,用来访问hive metastore 的JDBC的driver就需要这种共享。其他需要共享的类,是与某些已经共享的类有交互的类。例如,自定义的log4j appender。 |
spark.sql.hive.metastore.barrierPrefixes | (empty) | 使用逗号分隔的类名前缀列表,Spark SQL所访问的每个Hive版本都会被显式的reload这些类。 |
SparkSQL通过JdbcRDD实现对支持jdbc的数据库进行数据加载,将其作为DataFrame进行操作。JDBC加载的数据源不需要提供classTag。使用前需要将JDBC Driver包含在spark的classpath中。例如连接postgres需要如下设置
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
数据库中的表可以作为DataFrame或SparkSQL的临时表加载,支持以下的选项:
属性 | 描述 |
---|---|
url | JDBC连接URL |
dbtable | 需要读取的JDBC表。任何在From子句中的元素都可以,例如表或者子查询等。 |
partitionColumn, lowerBound, upperBound, numPartitions | 这些选项需要同时制定,他们制定了如何并发读取数据的同时进行分区。lowerBound, upperBound仅用于确定分区边界不用于过滤数据,所有数据都会被分区 |
fetchSize | 决定了每次数据取多少行 |
- val jdbcDF = sqlContext.read.format("jdbc").options(
- Map("url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")).load()
对于一些负载可以通过内存缓存数据或者调整参数提高性能。
Spark SQL可以通过sqlContext.cacheTable("tableName") 或 dataFrame.cache()接口将RDD数据缓存到内存中。SparkSql可以近扫描需要的列并自动压缩、进行垃圾回收等。可以通过sqlContext.uncacheTable("Tablename")从内存中移除表。
属性 | 默认值 | 描述 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | true | 若设为true,Spark SQL会基于列的统计数据自动选择压缩器进行数据压缩 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 控制列缓存的每批次的数据大小,数据越大则内存利用率及压缩比例越大,但OOM风险也越大 |
可以通过修改以下配置提高查询执行的性能,以后可能会弃用以下设置,而变为自动进行最优化配置。
属性 | 默认值 | 描述 |
---|---|---|
spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | 配置做join操作时被广播变量的表的大小。当设为-1时禁用广播。目前只有Hive元数据支持统计信息,可以通过ANALYZE TABLE <tablename> COMPUTE STATISTICS 进行信息统计 |
spark.sql.tungsten.enabled | true | 若为true,或使用tungsten物理优化执行,显式地管理内存并动态生成表达式计算的字节码 |
spark.sql.shuffle.partitions | 200 | 配置shuffle操作时的分区数量 |
当使用JDBC/ODBC或者命令行进行交互时,SparkSQL可以作为分布式查询引擎执行。在这种模式下,Spark SQL的应用能够不写代码便执行查询。
这里的实现与HiveServer2类似,可以通过beeline测试Spakr或者Hive1.2.1的JDBC驱动。通过以下命令启动jdbc驱动
./sbin/start-thriftserver.sh
这脚本支持所有的spark-submit的参数,还支持--hiveconf指定特定的Hive属性。可以通过--help查看本脚本具体参数。默认server监听的端口是10000,可以覆盖一些环境变量:
- export HIVE_SERVER2_THRIFT_PORT=<listening-port>
- export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
- ./sbin/start-thriftserver.sh \ --master <master-uri> \ ...
或者修改系统属性
- ./sbin/start-thriftserver.sh \
- --hiveconf hive.server2.thrift.port=<listening-port> \
- --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...
可以通过beeline测试Thrift JDBC/ODBC驱动
./bin/beeline
连接JDBC/ODBC驱动
beeline> !connect jdbc:hive2://localhost:10000
可能需要输入用户和密码进行安全验证,在非安全模式下,只需要本机的用户名和空密码即可。通过hive-site.xml, core-site.xml 和 hdfs-site.xml配置Hive。ThriftJDBC驱动同时支持通过HTTP端口发送thrift RPC消息。通过hive-site.xml中的配置开启HTTP模式作为系统属性:
hive.server2.transport.mode - Set this to value: http hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001 hive.server2.http.endpoint - HTTP endpoint; default is cliservice
beeline可以通过http模式连接JDBC/ODBC
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
CLI是在单点模式下执行Hive元数据服务和查询的命令工具,但它不能与Thrift JDBC驱动进行会话。
./bin/spark-sql
Spark SQL设计时考虑对Hive metastore,SerDes以及UDF的兼容。目前是基于Hive-1.2.1版本,并且Spark SQL可以连到不同版本(0.12.0到1.2.1)的Hive metastore。Spark SQL Thrift JDBC可以直接在已经部署Hive的环境运行。
Spark SQL和DataFrame支持以下数据类型
所有的数据类型都在org.apache.spark.sql.types中。
NaN是not a number的简写,用于处理不符合浮点数格式的float和double数据,其语义需要特殊处理:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。