当前位置:   article > 正文

Spark 官方文档(5)——Spark SQL,DataFrames和Datasets 指南

Spark 官方文档(5)——Spark SQL,DataFrames和Datasets 指南

Spark版本:1.6.2

概览

Spark SQL用于处理结构化数据,与Spark RDD API不同,它提供更多关于数据结构信息和计算任务运行信息的接口,Spark SQL内部使用这些额外的信息完成特殊优化。可以通过SQL、DataFrames API、Datasets API与Spark SQL进行交互,无论使用何种方式,SparkSQL使用统一的执行引擎记性处理。用户可以根据自己喜好,在不同API中选择合适的进行处理。本章中所有用例均可以在spark-shell、pyspark shell、sparkR中执行。

SQL

执行SQL语句的方法有多种:

  • 可以使用基础SQL语法或HiveQL语法在Spark SQL上执行查询,SparkSQL可以从已安装的Hive中读取数据。当使用其他编程语言时,结果集以DataFrame类型返回
  • 通过SQL命令行进行交互(spark-sql)
  • 可以通过JDBC/ODBC驱动进行交互

DataFrames

DataFrame是由分布式数据集合组成的一系列命名列,它与关系数据库的表类似,但有很多优化的地方。DataFrame支持多种数据源,包括结构化数据、Hive的表、外部数据库、RDDs等。DataFrame API支持scala 、java、Python和R语言。

Datasets

数据集接口在Spark1.6才加入,它可以使用Spark SQL的优化器对RDD操作进行优化。Dataset有JVM对象构建,并可以进行map、flatMap、filter等操作。Dataset API统一接口支持java和scala语言。

开始

程序入口: SQLContext

SQLContext是Spark SQL所有功能的入口,通过SparkContext可以创建该对象的实例:

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
  • 1
  • 2
  • 3
  • 4
  • 5

除了SQLContext,还可以创建HiveContext对象,它包含更多的功能,例如HiveQL解析器支持更完善的语法、使用Hive用户自定义函数UDFs、从Hive表中读取数据等。HiveContext不依赖Hive是否安装,Spark默认支持HiveContext。从Spark1.3以后,推荐使用HiveContext,未来SQLContext会包含HiveContext中的功能。
可以通过spark.sql.dialect选项更改SQL解析器,这个参数可以再SQLContext的setConf方法设置,也可以通过SQL的ky=value语法设计。在SQLContext中dialect只支持一种简单的SQL解析器“sql”。HiveContext默认解析器是“hiveql”,同时支持“sql”,但一般推荐hiveql,因为它语法更全。

创建DataFrames

DataFrames的数据源多种多样,例如RDD、Hive table或者其他数据源。
下面代码从JSON文件创建了一个DataFrame

JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

DataFrame 操作

DataFrame支持结构化数据领域常用的数据操作,支持Scala、Java、Python和R语言,下面是一些基本操作示例:

JavaSparkContext sc // An existing SparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Show the content of the DataFrame
df.show();
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show();
// age  count
// null 1
// 19   1
// 30   1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

对于DataFrame的所有操作类型可以参考API文档。除了简单的列操作,DataFrame还支持字符串操作、日期算法、数据操作等等,可以参考DataFrame函数文档

编码实现SQL查询

SQLContext的sql方法支持运行sql语法的查询,并返回DataFrame类型的结果集:

SQLContext sqlContext = ... // An existing SQLContext
DataFrame df = sqlContext.sql("SELECT * FROM table")
  • 1
  • 2

创建Datasets

Dataset与RDD类似,但它不适用java序列化也不适用Kryo,而是使用特定的Encoder作为序列化工具。Encoder可以对Spark对象进行序列化和反序列化,同时不需要反序列化在字节级别就能支持filtering、sorting和hashing等操作。

// Encoders for most common types are automatically provided by importing sqlContext.implicits._
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// Encoders are also created for case classes.
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

RDD交互操作

在Spark SQL中有两种方式可以在DataFrame和RDD进行转换,第一种方法是利用反射机制,推导包含某种类型的RDD,通过反射将其转换为指定类型的DataFrame,适用于提前知道RDD的schema。
第二种方法通过编程接口与RDD进行交互获取schema,并动态创建DataFrame,在运行时决定列及其类型。

使用反射推断Schema

Scala支持使用case class类型导入RDD转换为DataFrame,通过case class创建schema,case class的参数名称会被利用反射机制作为列名。case class可以嵌套组合成Sequences或者Array。这种RDD可以高效的转换为DataFrame并注册为表。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

编程指定schema

当case class不能提前定义好时,可以通过以下三步通过代码创建DataFrame

  • 将RDD转为包含row对象的RDD
  • 基于structType类型创建schema,与第一步创建的RDD相匹配
  • 通过SQLContext的createDataFrame方法对第一步的RDD应用schema
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

数据源

DataFrame接口支持一系列的数据源,它可以按照普通RDD进行操作,也能被注册为临时表进行操作。注册临时表后可以使用SQL查询操作数据集,本章节介绍了常用加载保存数据的方法,同时给出了内部数据源的特殊操作。

常规Load/Save函数

未配置spark.sql.sources.default情况下,默认使用parquet数据源处理所有操作。

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
  • 1
  • 2

手动指定选项

用户可以手动指定数据源加载的选项,对于数据源类型需要使用完整名称指定例如(org.apache.spark.sql.parquet),但对于内部类型可以使用简称,例如(json parquet jdbc等)。可以通过以上方法在不同DataFrame之间进行转换。

val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
  • 1
  • 2

在文件上直接执行SQL

除了需要将文件加载到DataFrame再执行sql以外,还可以直接执行sql

val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
  • 1

保存模式

Save通过SaveMode指定如何维护现有的数据。需要注意的是savemode未对数据加锁,因而不是源自操作。若使用overwrite模式时,原有数据会先被清空。

Scala/JavaAny Language含义
SaveMode.ErrorIfExists (default)“error” (default)当数据输出的位置已存在时,抛出此异常
SaveMode.Append“append”当数据输出的位置已存在时,在文件后面追加
SaveMode.Overwrite“overwrite”当数据输出的位置已存在时,重写
SaveMode.Ignore“ignore”当数据输出的位置已存在时,不执行任何操作,与 CREATE IF NOT EXISTS类似

保存到持久化表中

使用HiveContext时,DataFrame可以使用saveAsTable方法保存到持久化表中。与registerTempTable不同,saveASTable会为其真正创建数据区并创建指向该区域的指针放入HiveMetaStore中。在持有同一个metastore的连接期间,持久化的数据会一直存在,即使spark程序重启也不影响。可以通过SQLContext的table方法创建用于持久化表的DataFrame。
默认的saveASTable会创建“managed table”,其数据位置会被metastore维护,被管理的表数据会在表被删除时清空。

Parquet文件

parquet是一种流行的列式存储格式。SparkSQL支持对parquet的读写以及schema和数据的维护。在写parquet文件时,为了兼容,所有列都会转换为nullable格式。

编程实现数据加载

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.write.parquet("people.parquet")

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

分区推断

表分区是Hive等系统的常用优化手段。在一个分区表中,数据经常分布在不同目录下,分区列的值相同的数据分布在同一目录中。目前支持对parquet文件进行自动推断分区。例如我们可以将之前的数据增加两列gender和country,并将两列作为分区列进行数据分区。

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

将数据路径传给SQLContext后,可以自动推断DataFrame数据的分区信息。注意,数据的分区列是自动推断出来你的,目前分区列支持数值类型和string类型。若用户不希望自动推断分区列时,可以通过spark.sql.sources.partitionColumnTypeInference.enabled配置禁止自动推断,此时会使用string类型列进行分区。
分区类型会根据传入的路径进行推断,但用户可以配置数据源的basePath属性设置分析的路径。

Schema合并

parquet支持列增加等操作,当出现多个互相兼容的schemas时,parquet可以自动检测并合并这些文件的schema。由于schema 合并会消耗大量的资源,默认关闭该操作,可以通过以下方法打开:

  • 设置数据源mergeSchema属性为true
  • 设置SQL的选项spark.sql.parquet.mergeSchema为true
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

Hive metasotre Parquet表转化

SparkSQL使用内部库而不是Hive SerDe,对Hive metasotre Parquet表进行读写,性能很好,可以通过spark.sql.hive.convertMetastoreParquet配置。

Hive/Parquet Schema Reconciliation

由于Hive和Parquet的元数据处理方式不同,如下所示

  • Hive忽略大小写,而Parquet没有
  • Hive所有字段都是nullable,而parquet中null是有意义的值(避免理解错误,贴上原文:Hive considers all columns nullable, while nullability in Parquet is significant)

将Hive metastore Parquet table转换为Spark SQL parquet表时,遵从以下规则:

  • 相同名称的字段的数据类型必须相同,nullable类型被忽略。由于融合的数据类型需要在parquet中有对应的类型,所以nullability类型需要处理。
  • 融合后schema中包含了Hive元数据中定义的值

    • 任何只在Parquet schema中出现的字段被抛弃
    • 任何旨在Hive元数据中出现的字段作为nullable增加到融合后元数据中
元数据刷新

Spark SQL会缓存parquet元数据以便提高性能。若Hive metastore Parquet table转换被启用,则转换的表元数据也会被cache。若这些元数据被外部工具修改,则需要手动更新缓存元数据保持一致性。

// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")
  • 1
  • 2

配置

与parquet相关的配置参数如下所示

参数默认值描述
spark.sql.parquet.binaryAsStringfalse该选项让SparkSQL将string安装二进制数据按照字符串处理,以便兼容老系统
spark.sql.parquet.int96AsTimestamptrueSome Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
spark.sql.parquet.cacheMetadatatrue缓存Parquet的Schema元数据,提高查询静态数据效率
spark.sql.parquet.compression.codecgzip设置Parquet文件的压缩编码方式,支持 uncompressed, snappy, gzip, lzo.
spark.sql.parquet.filterPushdowntrue启用过滤谓词下推优化,将过滤下推到抽取数据时,取得性能的提升
spark.sql.hive.convertMetastoreParquettrue若设为false,Spark SQL使用Hive SerDe支持对Parquet tables的操作.
spark.sql.parquet.output.committer.classorg.apache.parquet.hadoop.ParquetOutputCommitterThe output committer class used by Parquet. The specified class needs to be a subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it’s also a subclass of org.apache.parquet.hadoop.ParquetOutputCommitter.
spark.sql.parquet.mergeSchemafalse是否开启Schema合并

JSON数据集

SQLContext.read.josn()接口可以自动推断JSON文件的schema。SparkSQL支持的JSON文件中每一行需要是一个完整的JSON对象,不支持跨行的json对象。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)

// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

Hive 表

Spark SQL支持从Hive中读取数据,但由于Hive依赖过多,默认不支持Hive,需要在编译时添加-Phive -Phive-thriftserver选项。由于用到Hive的序列化和反序列化需要保证Hive包在各个worker中都存在。

将hive-site.xml、core-site.xml和hdfs-site.xml放入conf目录下配置Hive环境。在Yarn集群上面运行时,需要确定datanucleus jar包和hive-site.xml在driver和所有executor上面都存在。可以通过spark-submit的–jars和–file参数检查是否存在。
若通过Spark SQL操作Hive需要创建HiveContext,增加元数据功能及HiveQL支持。若没有部署Hive环境同样可以创建HiveContext。若没有在hive-site.xml中配置,会自动在当前目录创建metastore_db并在/user/hive/warehouse创建仓储目录,需要给hive对/user/hive/warehouse的写权限。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

与不同版本Hive Metastore交互

由于Spark SQL可以与不同版本的Hive Metastor(而不是Hive的版本)进行交互,只需要修改部分的配置信息,相关配置如下:

属性默认值描述
spark.sql.hive.metastore.version1.2.1Hive metastore的版本信息,从0.12.0到1.2.1
spark.sql.hive.metastore.jarsbuiltin指定metastore的Jar包位置,builtin:该jar被打包到spark应用程序中;maven:使用maven远程仓储下载;类路径:需要包含hive所有的依赖包
spark.sql.hive.metastore.sharedPrefixescom.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc一个逗号分隔的类名前缀列表,这些类使用classloader加载,且可以在Spark SQL和特定版本的Hive间共享。例如,用来访问hive metastore 的JDBC的driver就需要这种共享。其他需要共享的类,是与某些已经共享的类有交互的类。例如,自定义的log4j appender。
spark.sql.hive.metastore.barrierPrefixes(empty)使用逗号分隔的类名前缀列表,Spark SQL所访问的每个Hive版本都会被显式的reload这些类。

JDBC连接其他数据库

SparkSQL通过JdbcRDD实现对支持jdbc的数据库进行数据加载,将其作为DataFrame进行操作。JDBC加载的数据源不需要提供classTag。使用前需要将JDBC Driver包含在spark的classpath中。例如连接postgres需要如下设置

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
  • 1

数据库中的表可以作为DataFrame或SparkSQL的临时表加载,支持以下的选项:

属性描述
urlJDBC连接URL
dbtable需要读取的JDBC表。任何在From子句中的元素都可以,例如表或者子查询等。
partitionColumn, lowerBound, upperBound, numPartitions这些选项需要同时制定,他们制定了如何并发读取数据的同时进行分区。lowerBound, upperBound仅用于确定分区边界不用于过滤数据,所有数据都会被分区
fetchSize决定了每次数据取多少行
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()
  • 1
  • 2
  • 3

疑难问题

  • JDBC的driver类需要在所有executor可见,因为Java的DriverManager会进行安全检查,忽略所有不可见的类。可以通过修改每个worker节点的compute_classpath.sh以便包含Jar包
  • 有些数据库例如H2的名称是大写,需要在SparkSQL中同样使用大写

性能调优

对于一些负载可以通过内存缓存数据或者调整参数提高性能。

内存缓存数据

Spark SQL可以通过sqlContext.cacheTable(“tableName”) 或 dataFrame.cache()接口将RDD数据缓存到内存中。SparkSql可以近扫描需要的列并自动压缩、进行垃圾回收等。可以通过sqlContext.uncacheTable(“Tablename”)从内存中移除表。

属性默认值描述
spark.sql.inMemoryColumnarStorage.compressedtrue若设为true,Spark SQL会基于列的统计数据自动选择压缩器进行数据压缩
spark.sql.inMemoryColumnarStorage.batchSize10000控制列缓存的每批次的数据大小,数据越大则内存利用率及压缩比例越大,但OOM风险也越大

其他配置信息

可以通过修改以下配置提高查询执行的性能,以后可能会弃用以下设置,而变为自动进行最优化配置。

属性默认值描述
spark.sql.autoBroadcastJoinThreshold10485760 (10 MB)配置做join操作时被广播变量的表的大小。当设为-1时禁用广播。目前只有Hive元数据支持统计信息,可以通过ANALYZE TABLE <tablename> COMPUTE STATISTICS进行信息统计
spark.sql.tungsten.enabledtrue若为true,或使用tungsten物理优化执行,显式地管理内存并动态生成表达式计算的字节码
spark.sql.shuffle.partitions200配置shuffle操作时的分区数量

分布式SQL引擎

当使用JDBC/ODBC或者命令行进行交互时,SparkSQL可以作为分布式查询引擎执行。在这种模式下,Spark SQL的应用能够不写代码便执行查询。

运行Thrift JDBC/ODBC驱动

这里的实现与HiveServer2类似,可以通过beeline测试Spakr或者Hive1.2.1的JDBC驱动。通过以下命令启动jdbc驱动

./sbin/start-thriftserver.sh
  • 1

这脚本支持所有的spark-submit的参数,还支持–hiveconf指定特定的Hive属性。可以通过–help查看本脚本具体参数。默认server监听的端口是10000,可以覆盖一些环境变量:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...
  • 1
  • 2
  • 3
  • 4
  • 5

或者修改系统属性

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...
  • 1
  • 2
  • 3
  • 4
  • 5

可以通过beeline测试Thrift JDBC/ODBC驱动

./bin/beeline
  • 1

连接JDBC/ODBC驱动

beeline> !connect jdbc:hive2://localhost:10000
  • 1

可能需要输入用户和密码进行安全验证,在非安全模式下,只需要本机的用户名和空密码即可。通过hive-site.xml, core-site.xml 和 hdfs-site.xml配置Hive。ThriftJDBC驱动同时支持通过HTTP端口发送thrift RPC消息。通过hive-site.xml中的配置开启HTTP模式作为系统属性:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
  • 1
  • 2
  • 3

beeline可以通过http模式连接JDBC/ODBC

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
  • 1

通过Spark SQL CLI运行

CLI是在单点模式下执行Hive元数据服务和查询的命令工具,但它不能与Thrift JDBC驱动进行会话。

./bin/spark-sql
  • 1

与Apache Hive的兼容性

Spark SQL设计时考虑对Hive metastore,SerDes以及UDF的兼容。目前是基于Hive-1.2.1版本,并且Spark SQL可以连到不同版本(0.12.0到1.2.1)的Hive metastore。Spark SQL Thrift JDBC可以直接在已经部署Hive的环境运行。

不支持的Hive功能

  • bucket表:butcket是Hive的哈希分区
  • Union功能
  • unique join
  • 字段统计信息
  • Hadoop归档文件
  • Hive的部分优化功能

参考

数据类型

Spark SQL和DataFrame支持以下数据类型

  • numeric类型
    • ByteType:单字节有符号整数
    • ShortType:2个字节的有符号整数
    • IntegerType:4字节整数
    • LongType:8字节整数
    • FloatType:4字节单精度浮点数
    • DoubleType:8字节双精度浮点数
    • DecimalType:任意精度有符号带小数的数值
  • String类型
  • Binary二进制类型
  • Boolean布尔类型
  • Datetime时间类型
    • TimestampType:时间戳类型
    • DateType:日期类型,只包含年月日
  • Complex复杂类型
    • ArrayType:数组类型
    • MapType:map类型
    • StructType:包含StructField序列的结构体

所有的数据类型都在org.apache.spark.sql.types中。

NaN含义

NaN是not a number的简写,用于处理不符合浮点数格式的float和double数据,其语义需要特殊处理:

  • NaN = NaN返回true
  • 聚集过程中,所有NaN会被放到同一分组中
  • NaN在join过程中被看成普通的值
  • NaN在升序排序时放到最后,被认为是最大的数值
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/589198
推荐阅读
相关标签
  

闽ICP备14008679号