赞
踩
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduc的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
DataFrame也是懒执行的,但性能上比RDD要高,主要原因:
优化的执行计划,即查询计划通过Spark catalyst optimiser进行优化。
DataSet是分布式数据集合。DataSet是Spark 1.6中添加的一个新抽象,是DataFrame的一个扩展。它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。DataSet也可以使用功能性的转换(操作map,flatMap,filter等等)。
1)是DataFrame API的一个扩展,是SparkSQL最新的数据抽象;
2)用户友好的API风格,既具有类型安全检查也具有DataFrame的查询优化特性;
3)用样例类来对DataSet中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称;
4)DataSet是强类型的。比如可以有DataSet[Car],DataSet[Person]。
在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。
在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从Hive Table进行查询返回。
- 1)查看Spark数据源进行创建的文件格式
- scala> spark.read.
- csv format jdbc json load option options orc parquet schema table text textFile
- (2)读取json文件创建DataFrame
- scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
- df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- (3)展示结果
- scala> df.show
- +----+-------+
- | age| name|
- +----+-------+
- |null|Michael|
- | 30| Andy|
- | 19| Justin|
- +----+-------+
- 1)创建一个DataFrame
- scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
- df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 2)对DataFrame创建一个临时表
- scala> df.createOrReplaceTempView("people")
- 3)通过SQL语句实现查询全表
- scala> val sqlDF = spark.sql("SELECT * FROM people")
- sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 4)结果展示
- scala> sqlDF.show
- +----+-------+
- | age| name|
- +----+-------+
- |null|Michael|
- | 30| Andy|
- | 19| Justin|
- +----+-------+
注意:普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people
- 5)对于DataFrame创建一个全局表
- scala> df.createGlobalTempView("people")
- 6)通过SQL语句实现查询全表
- scala> spark.sql("SELECT * FROM global_temp.people").show()
- +----+-------+
- | age| name|
- +----+-------+
- |null|Michael|
- | 30| Andy|
- | 19| Justin|
-
- scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
- +----+-------+
- | age| name|
- +----+-------+
- |null|Michael|
- | 30| Andy|
- | 19| Justin|
- +----+-------+
- 1)创建一个DataFrame
- scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
- df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 2)查看DataFrame的Schema信息
- scala> df.printSchema
- root
- |-- age: long (nullable = true)
- |-- name: string (nullable = true)
- 3)只查看”name”列数据
- scala> df.select("name").show()
- +-------+
- | name|
- +-------+
- |Michael|
- | Andy|
- | Justin|
- +-------+
- 4)查看”name”列数据以及”age+1”数据
- scala> df.select($"name", $"age" + 1).show()
- +-------+---------+
- | name|(age + 1)|
- +-------+---------+
- |Michael| null|
- | Andy| 31|
- | Justin| 20|
- +-------+---------+
- 5)查看”age”大于”21”的数据
- scala> df.filter($"age" > 21).show()
- +---+----+
- |age|name|
- +---+----+
- | 30|Andy|
- +---+----+
- 6)按照”age”分组,查看数据条数
- scala> df.groupBy("age").count().show()
- +----+-----+
- | age|count|
- +----+-----+
- | 19| 1|
- |null| 1|
- | 30| 1|
- +----+-----+
注意:如果需要RDD与DF或者DS之间操作,那么都需要引入 import spark.implicits._ 【spark不是包名,而是sparkSession对象的名称】
- 前置条件:导入隐式转换并创建一个RDD
- scala> import spark.implicits._
- import spark.implicits._
-
- scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
- peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
- 1)通过手动确定转换
- scala> peopleRDD.map{x=>val para = x.split(",");(para(0),para(1).trim.toInt)}.toDF("name","age")
- res1: org.apache.spark.sql.DataFrame = [name: string, age: int]
- 2)通过反射确定(需要用到样例类)
- (1)创建一个样例类
- scala> case class People(name:String, age:Int)
- (2)根据样例类将RDD转换为DataFrame
- scala> peopleRDD.map{ x => val para = x.split(",");People(para(0),para(1).trim.toInt)}.toDF
- res2: org.apache.spark.sql.DataFrame = [name: string, age: int]
- 3)通过编程的方式(了解)
- (1)导入所需的类型
- scala> import org.apache.spark.sql.types._
- import org.apache.spark.sql.types._
- (2)创建Schema
- scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
- structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
- (3)导入所需的类型
- scala> import org.apache.spark.sql.Row
- import org.apache.spark.sql.Row
- (4)根据给定的类型创建二元组RDD
- scala> val data = peopleRDD.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}
- data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:33
- (5)根据数据及给定的schema创建DataFrame
- scala> val dataFrame = spark.createDataFrame(data, structType)
- dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]
- 直接调用rdd即可
- 1)创建一个DataFrame
- scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
- df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 2)将DataFrame转换为RDD
- scala> val dfToRDD = df.rdd
- dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[19] at rdd at <console>:29
- 3)打印RDD
- scala> dfToRDD.collect
- res13: Array[org.apache.spark.sql.Row] = Array([Michael, 29], [Andy, 30], [Justin, 19])
DataSet是具有强类型的数据集合,需要提供对应的类型信息。
- 1)创建一个样例类
- scala> case class Person(name: String, age: Long)
- defined class Person
- 2)创建DataSet
- scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
- caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
SparkSQL能够自动将包含有case类的RDD转换成DataFrame,case类定义了table的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seqs或者Array等复杂的结构。
- 1)创建一个RDD
- scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
- peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
- 2)创建一个样例类
- scala> case class Person(name: String, age: Long)
- defined class Person
- 3)将RDD转化为DataSet
- scala> peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS
- res8: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
- 调用rdd方法即可。
- 1)创建一个DataSet
- scala> val DS = Seq(Person("Andy", 32)).toDS()
- DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
- 2)将DataSet转换为RDD
- scala> DS.rdd
- res11: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[15] at rdd at <console>:28
- 1)创建一个DateFrame
- scala> val df = spark.read.json("examples/src/main/resources/people.json")
- df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 2)创建一个样例类
- scala> case class Person(name: String, age: Long)
- defined class Person
- 3)将DateFrame转化为DataSet
- scala> df.as[Person]
- res14: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
- 1)创建一个样例类
- scala> case class Person(name: String, age: Long)
- defined class Person
- 2)创建DataSet
- scala> val ds = Seq(Person("Andy", 32)).toDS()
- ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
- 3)将DataSet转化为DataFrame
- scala> val df = ds.toDF
- df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
- 4)展示
- scala> df.show
- +----+---+
- |name|age|
- +----+---+
- |Andy| 32|
- +----+---+
这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用。
在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)
如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的Spark版本中,DataSet有可能会逐步取代RDD和DataFrame成为唯一的API接口。
(1)RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;
(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算;
(3)三者有许多共同的函数,如filter,排序等;
(4)在对DataFrame和Dataset进行操作许多操作都需要这个包:import spark.implicits._(在创建好SparkSession对象后尽量直接导入)
|
- 1)添加依赖
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
- <version>2.1.1</version>
- </dependency>
- 2)代码实现
- package com.atguigu.test
-
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.{SparkConf, SparkContext}
-
- object HelloWorld {
-
- def main(args: Array[String]) {
-
- //创建SparkConf()并设置App名称
- val spark = SparkSession
- .builder()
- .master("local[*]")
- .appName("HelloWorld")
- //.config("spark.some.config.option", "some-value")
- .getOrCreate()
-
- //导入隐式转换
- import spark.implicits._
-
- //读取本地文件,创建DataFrame
- val df = spark.read.json("examples/src/main/resources/people.json")
-
- //打印
- df.show()
-
- //DSL风格:查询年龄在21岁以上的
- df.filter($"age" > 21).show()
-
- //创建临时表
- df.createOrReplaceTempView("persons")
-
- //SQL风格:查询年龄在21岁以上的
- spark.sql("SELECT * FROM persons where age > 21").show()
-
- //关闭连接
- spark.stop()
-
- }
-
- }
在Shell窗口中可以通过spark.udf功能用户可以自定义函数。
- 1)创建DataFrame
- scala> val df = spark.read.json("examples/src/main/resources/people.json")
- df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 2)打印数据
- scala> df.show()
- +----+-------+
- | age| name|
- +----+-------+
- |null|Michael|
- | 30| Andy|
- | 19| Justin|
- +----+-------+
- 3)注册UDF,功能为在数据前添加字符串
- scala> spark.udf.register("addName", (x:String)=> "Name:"+x)
- res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
- 4)创建临时表
- scala> df.createOrReplaceTempView("people")
- 5)应用UDF
- scala> spark.sql("Select addName(name), age from people").show()
- +-----------------+----+
- |UDF:addName(name)| age|
- +-----------------+----+
- | Name:Michael|null|
- | Name:Andy| 30|
- | Name:Justin| 19|
- +-----------------+----+
- 1)read直接加载数据
- scala> spark.read.
- csv jdbc json orc parquet textFile… …
- 注意:加载数据的相关参数需写到上述方法中。如:textFile需传入加载数据的路径,jdbc需传入JDBC相关参数。
- 2)format指定加载数据类型
- scala> spark.read.format("…")[.option("…")].load("…")
用法详解:
(1)format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
(2)load("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径。
(3)option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
- 1)write直接保存数据
- scala> spark.write.
- csv jdbc json orc parquet textFile… …
- 注意:保存数据的相关参数需写到上述方法中。如:textFile需传入加载数据的路径,jdbc需传入JDBC相关参数。
- 2)format指定保存数据类型
- scala> spark.write.format("…")[.option("…")].save("…")
2)format指定保存数据类型
scala> spark.write.format("…")[.option("…")].save("…")
用法详解:
(1)format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
(2)save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径。
(3)option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
3)文件保存选项
可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。SaveMode是一个枚举类,其中的常量包括:
(1)Append:当保存路径或者表已存在时,追加内容;
(2)Overwrite: 当保存路径或者表已存在时,覆写内容;
(3)ErrorIfExists:当保存路径或者表已存在时,报错;
(4)Ignore:当保存路径或者表已存在时,忽略当前的保存操作。
使用详解:
df.write.mode(SaveMode.Append).save("… …")
Spark SQL 能够自动推测 JSON数据集的结构,并将它加载为一个Dataset[Row]. 可以通过SparkSession.read.json()去加载一个 一个JSON 文件。
注意:这个JSON文件不是一个传统的JSON文件,每一行都得是一个JSON串。格式如下:
- {"name":"Michael"}
- {"name":"Andy", "age":30}
- {"name":"Justin", "age":19}
- 1)导入隐式转换
- import spark.implicits._
- 2)加载JSON文件
- val path = "examples/src/main/resources/people.json"
- val peopleDF = spark.read.json(path)
- 3)创建临时表
- peopleDF.createOrReplaceTempView("people")
- 4)数据查询
- val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
- teenagerNamesDF.show()
- +------+
- | name|
- +------+
- |Justin|
Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。
可在启动shell时指定相关的数据库驱动路径,或者将相关的数据库驱动放到spark的类路径下。
两种方式:1)使用read.jdbc加载数据 2)使用format形式加载数据
- 1)启动spark-shell
- bin/spark-shell --master spark://hadoop102:7077 [--jars mysql-connector-java-5.1.27-bin.jar]
- 2)定义JDBC相关参数配置信息
- val prop = new java.util.Properties()
- prop.put("user", "root")
- prop.put("password", "123456")
- 3)使用read.jdbc加载数据
- val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop102:3306/rdd", "emp", prop)
- 4)使用format形式加载数据
- val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/rdd").option("dbtable", " emp").option("user", "root").option("password", "123456").load()
- 5)使用write.jdbc保存数据
- jdbcDF2.write.jdbc("jdbc:mysql://hadoop102:3306/mysql", "db", prop)
- 6)使用format形式保存数据
- jdbcDF.write
- .format("jdbc")
- .option("url", "jdbc:mysql://hadoop102:3306/rdd")
- .option("dbtable", "rddtable2")
- .option("user", "root")
- .option("password", "123456")
- .save()
Apache Hive是Hadoop上的SQL引擎,Spark SQL编译时可以包含Hive支持,也可以不包含。包含Hive支持的Spark SQL可以支持Hive表访问、UDF(用户自定义函数)以及Hive查询语言(HQL)等。spark-shell默认是Hive支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。
如果要使用内嵌的Hive,什么都不用做,直接用就可以了。
可以修改其数据仓库地址,参数为:--conf spark.sql.warehouse.dir=./wear
注意:如果你使用的是内部的Hive,在Spark2.0之后,spark.sql.warehouse.dir用于指定数据仓库的地址,如果你需要是用HDFS作为路径,那么需要将core-site.xml和hdfs-site.xml 加入到Spark conf目录,否则只会创建master节点上的warehouse目录,查询时会出现文件找不到的问题,这是需要使用HDFS,则需要将metastore删除,重启集群。
如果想连接外部已经部署好的Hive,需要通过以下几个步骤。
1)将Hive中的hive-site.xml拷贝或者软连接到Spark安装目录下的conf目录下。
2)打开spark shell,注意带上访问Hive元数据库的JDBC客户端
bin/spark-shell --master spark://hadoop102:7077 --jars mysql-connector-java-5.1.27-bin.jar
注意:启动时指定JDBC jar包路径很麻烦,我们可以选择将JDBC的驱动包放置在spark的lib目录下,一劳永逸。
Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。在Spark目录下执行如下命令启动Spark SQL CLI,直接执行SQL语句,类似一Hive窗口。
/bin/spark-sql
- 1)添加依赖
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.11</artifactId>
- <version>2.1.1</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <version>1.2.1</version>
- </dependency>
- 2)代码实现
- //创建SparkSession
- val spark: SparkSession = SparkSession
- .builder()
- .enableHiveSupport()
- .master("local[*]")
- .appName("SQLTest")
- .getOrCreate()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。