赞
踩
Spark SQL 是 Spark 用于结构化数据(structured data)处理的 Spark 模块。
Spark SQL 的前身是Shark。Shark是基于 Hive 所开发的工具,它修改了下图所示的右下角的内存管理、物理计划、执行三个模块,并使之能运行在 Spark 引擎上。下图是 Hive和Shark的区别。
但是,Spark在不断发展,但又太过依赖Hive,有了不少限制。于是,Spark团队提出了Spark SQL。
SparkSQL抛弃原有 Shark 的代码,汲取了 Shark 的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,重新开发了SparkSQL代码;由于摆脱了对Hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便。
后来,Shark停止开发,发展出了两个分支:SparkSQL 和 Hive on Spark。
其中 SparkSQL 作为 Spark 生态的一员继续发展,而不再受限于 Hive,只是兼容 Hive;而Hive on Spark 是一个 Hive 的发展计划,该计划将 Spark 作为 Hive 的底层引擎之一,也就是说,Hive 将不再受限于一个引擎,可以采用 Map-Reduce、Tez、Spark 等引擎。
Spark SQL 为了简化 RDD 的开发,提高开发效率,在RDD之上提供了 2 个编程抽象,分别是DataFrame和DataSet。
DataFrame是一种以RDD为基础的分布式数据集。DataFrame和RDD的主要区别就是,RDD本身并不具备数据的结构信息,而DataFrame类似于一个二维表格,带有schema元信息,它的二维数据每一条都具有列名称和类型。
简单来说,RDD只有每条数据的类型信息,而DataFrame的每条数据有列的结构划分以及类型信息。如下图所示:
同样,DataFrame也是懒执行的,但是它在查询时通过Spark catalyst optimiser进行了优化,查询性能比RDD更高。DataFrame 也支持嵌套数据类型(struct、array 和 map)。
DataSet也是一种分布式数据集,是Spark1.6新增的新抽象,为DataFrame的一个扩展。它提供了 RDD 的优势(强类型,使用强大的 lambda 函数的能力)以及 Spark SQL 优化执行引擎的优点。DataSet 也可以使用功能性的转换(操作 map,flatMap,filter等等)。
DataSet是一种强类型的分布式数据集。什么是强类型?指的是DataSet并不只数据的列有类型和结构,DataSet的每一行数据也形成一个结构,并且具有数据类型。最简单的例子,就是某个样例对象作为数据行的类型。比如可以有DataSet[Car],DataSet[Person],每个Car或者Person有它的属性,DataSet也同样会记录它们的列属性。
DataFrame其实是一种特殊的DataSet,它每行数据的行信息其实是Row对象,即DataFrame=DataSet[Row]。
简单来说,RDD只会记录一条条相同类型的数据,DataFrame会记录结构化数据中的列信息,DataSet会记录结构化数据中的行与列的信息。
在Spark Core中,如果我们要使用Spark的程序,首先就是创建一个SparkContext。Spark SQL 其实可以理解为对 Spark Core 的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。
在老版本中,SparkSQL 提供两种 SQL 查询起始点:一个叫 SQLContext,用于 Spark自己提供的 SQL 查询;一个叫 HiveContext,用于连接 Hive 的查询。
在新版本中,SparkSession 是 Spark 的 SQL 查询起始点,实质上是 SQLContext 和 HiveContext的组合,所以在 SQLContext 和 HiveContext 上可用的 API 在 SparkSession 上同样是可以使用的。SparkSession 内部封装了 SparkContext,所以计算实际上是由 sparkContext 完成的。
// 构造spark
val spark = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local[*]")
.getOrCreate()
//json形式
val df = spark.read.json("data/input.json")
//jdbc形式 (url, table, property)
val df = spark.read.jdbc("jdbc:mysql://localhost:3306/spark","dict",props)
df.createOrReplaceTempView("dict")
val dictDF = spark.sql("SELECT * FROM dict")
dictDF.show()
/*
结果:
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
+---+--------+
*/
普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people。
spark.sql("SELECT * FROM global_temp.people").show()
DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。它的好处是不需要使用createOrReplaceTempView这类函数创建临时视图。
df.select("username").show()
//涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名。
df.select($"username",$"age" + 1).show
df.select('username, 'age + 1).show()
df.filter($"age">30).show()
df.groupBy("age").count.show()
在 IDEA 中开发程序时,如果需要 RDD 与 DF 或者 DS 之间互相操作,那么需要引入import spark.implicits._。
这里的 spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明,因为 Scala 只支持val 修饰的对象的引入。
RDD转DataFrame:
val idRDD = sc.makeRDD(List(1,2,3,4)) idRDD.toDF("id").show() /* +---+ | id| +---+ | 1| | 2| | 3| | 4| +---+ */ //实际开发中,一般通过样例类将 RDD 转换为 DataFrame。 case class User(name:String, age:Int) val idRDD = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)) idRDD.toDF().show() /* +--------+---+ | name|age| +--------+---+ |zhangsan| 30| | lisi| 40| +--------+---+ */
DataFrame转RDD:
val rdd = df.rdd()
//此时RDD类型:RDD[org.apache.spark.sql.Row]
RDD转DataSet:
//利用样例类转换
case class Person(name: String, age: Long)
val caseClassDS = sc.makeRDD(Person("zhangsan",2)).toDS()
//caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]
caseClassDS.show()
/*
+---------+---+
| name|age|
+---------+---+
| zhangsan| 2|
+---------+---+
*/
DataSet转RDD:
val rdd = res11.rdd()
DataFrame 其实是 DataSet 的特例,所以它们之间是可以互相转换的。
DataFrame转DataSet:
case class User(name:String, age:Int)
val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")
val ds = df.as[User]
DataSet转DataFrame:
val df = ds.toDF()
参考:尚硅谷
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。