当前位置:   article > 正文

Spark | SparkSQL架构_producedattributes

producedattributes

 

目录

SparkSQL

DataFrame API

DataFrame & DataSet & RDD 三者区别

SparkSQL 组成

SparkSQL Catalyst Optimizer

Tree

TreeNode

QueryPlan

Expression

Rule

RuleExecutor

Catalyst大致流程

References


spark.version = 2.4.4

站在上帝角度学习下SparkSQL架构相关内容

SparkSQL

SparkSQL 是一个用于处理结构化数据的Spark组件,结构化数据既可以来自外部结构化数据源,也可以通过向已有RDD增加Schema方式得到。

DataFrame API

通俗点理解也可以说SparkSQL主要完成SQL解析相关工作,将一个SQL语句解析为DataFrame或者RDD任务。

如下图可以看到实际上Spark的Dataframe API底层也是基于Spark的RDD。
 

DataFrame 是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。它也支持各种关系操作优化执行,与RDD不同的是,DataFrame带有Schema元数据,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。由于无法知道RDD数据集内部数据结构类型,Spark作业执行只能在调度阶段层面进行简单通用的优化,而对于DataFrame带有数据集内部的结构,可以根据这些信息进行针对性的优化,最终实现优化运行效率。通俗的说SparkSQL能自动优化任务执行过程。


DataFrame & DataSet & RDD 三者区别

RDD 是一个不可变的分布式对象集合,是 Spark 对数据的核心抽象。每个 RDD 都被分为多个分区,每个分区就是一个数据集片段,这些分区运行在集群中的不同节点上。RDD 提供了一种高度受限的内存共享模型,即 RDD 是只读的,只能基于稳定的物理储存中的数据集来创建 RDD 或对已有的 RDD 进行转换操作来得到新的 RDD。

DataFrame 是用在 Spark SQL 中的一种存放 Row 对象的特殊 RDD,是 Spark SQL中的数据抽象,它是一种结构化的数据集,每一条数据都由几个命名字段组成(类似与传统数据库中的表),DataFrame 能够利用结构信息更加高效的存储数据。同时, SparkSQL 为 DataFrame 提供了非常好用的 API,而且还能注册成表使用 SQL 来操作。DataFrame 可以从外部数据源创建,也可以从查询结果或已有的 RDD 创建。

Dataset 是 Spark1.6 开始提供的 API,是 Spark SQL 最新的数据抽象。它把 RDD 的优势(强类型,可以使用 lambda 表达式函数)和 Spark SQL 的优化执行引擎结合到了一起。Dataset 可以从 JVM 对象创建得到,而且可以像 DataFrame 一样使用 API 或 sql 来操作。

三者的关系:RDD + Schema = DataFrame = Dataset[Row]
注:RDD 是 Spark 的核心,DataFrame/Dataset 是 Spark SQL 的核心,RDD 不支持 SQL 操作。

接下来通过简单示例&源码看下三者怎样转换的?

  1. val df:DataFrame = spark.sql(sqlText)
  2. df.printSchema()
  3. val rdd = df.rdd.map(...)
  4. import spark.implicits._
  5. val a = rdd.toDF()
  1. // 首先执行spark.sql()将返回一个DataFrame
  2. def sql(sqlText: String): DataFrame = {
  3. Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
  4. }
  5. // 将DataFrame转换为RDD
  6. lazy val rdd: RDD[T] = {
  7. val objectType = exprEnc.deserializer.dataType
  8. rddQueryExecution.toRdd.mapPartitions { rows =>
  9. rows.map(_.get(0, objectType).asInstanceOf[T])
  10. }
  11. }
  12. // 将RDD转换为DataFrame
  13. def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema))
  14. package object sql {
  15. /**
  16. * Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting
  17. * with the query planner and is not designed to be stable across spark releases. Developers
  18. * writing libraries should instead consider using the stable APIs provided in
  19. * [[org.apache.spark.sql.sources]]
  20. */
  21. @DeveloperApi
  22. @InterfaceStability.Unstable
  23. type Strategy = SparkStrategy
  24. type DataFrame = Dataset[Row]
  25. }

Spa

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/656936
推荐阅读
相关标签
  

闽ICP备14008679号