应用举例及流程概述
- //初始化SparkSession
- val spark = SparkSession.builder().appName("example").master("local").getOrCreate()
- val person = spark.read.text("examples/src/main/resources/people.txt").map(_.split(" ")).map(p => Person(p(0), p(1).trim.toInt))
- person.registerAsTable("person")
- val teenagers = sql("SELECT name, age FROM person WHERE age >= 13 and age <= 19")
- teenagers.map(t => "name:" + t(0)).collect().foreach(println)
对Spark SQL系统,从SQL到Spark中RDD的执行需要经过两个大的阶段,分别是逻辑计划(LogicalPlan)和物理计划(PhysicalPlan)。
逻辑计划阶段会将用户所写的SQL语句转换成树型数据结构(逻辑算子树),SQL语句中蕴含的逻辑映射到逻辑算子树的不同节点。逻辑计划阶段生成的逻辑算子树并不会直接提交执行,仅作为中间阶段。最终逻辑算子树的生成过程经历3个子阶段:未解析的逻辑算子树(Unresolved LogicalPlan,仅仅是数据结构,不包含任何数据信息等);解析后的逻辑算子树(Analyzed LogicalPlan,节点中绑定各种信息);优化后的逻辑算子树(Optimized LogicalPlan,应用各种优化规则对一些低效的逻辑计划进行转换)。
逻辑计划将上一步逻辑计划阶段生成的逻辑算子树进一步转换生成物理算子树。物理算子树的节点会直接生成RDD或对RDD进行transformation操作(每个物理计划节点中都实现了对RDD进行转换的execute方法)。物理计划阶段也报含3个子阶段:首先,根据逻辑算子树,生成物理算子树的列表Iterator[PhysicalPlan](同样的逻辑算子树可能对应多个物理算子树);然后,从列表中按照一定的策略选取最优的物理算子树(SparkPlan);最后,对选取的物理算子树进行提交前的准备工作,例如:确保分区操作正确、物理算子树节点重用、执行代码生成等,得到准备后的物理算子树(Prepared SparkPlan)。之后,物理算子树生成的RDD执行action操作(如栗子中的show()),即可提交执行。
从 SQL语句的解析一直到提交之前,上述整个转换过程都在 Spark集群的 Driver端进行,不涉及分布式环境 。SparkSession 类的 sql方法调用 SessionState 中的各种对象 ,包括上述不同阶段对应的 SparkSqlParser类、 Analyzer类、 Optimizer类和 SparkPlanner类等 ,最后封装成一个 QueryExecution对象。 因此,在进行 SparkSQL开发时,可以很方便地将每一步生成的计划单独剥离出来分析 。
SQL通用执行过程
下图对简单Sql语句进行标注,SELECT表示是一种具体的操作,即查询数据;f1,f2,f3表示返回的结果;tableX是数据源;condition部分是查询条件。
由上面可以看出,SQL表达式中的顺序与常见的RDD处理逻辑其在表达的顺序上有差异。
SQL语句在分析执行过程中会经历下图所示的几个步骤:
1、语法解析(SqlParse)
语法解析之后,会形成一棵语法树,如下图所示。树中的每个节点是执行的rule,整棵树称之为执行策略。
这棵语法树的叶结点是我们查的两张表:Product和Inventory,可以看到实际上是将关联查询解析为两个子查询;再往上一层,是对两个子查询做Join,并按照匹配条件过滤,最后按照所要得聚合条件做聚合投影。
- 首先先通过SparkSqlParser生成语法树。
- Spark1.x版本使用的是scala原生的parser语法解析器,从2.x后改用的是第三方语法解析工具ANTLR4,只需要定制好语法,可以通过插件自动生成对应的解析代码。
- 然后通过AstBuilder配合antlr的visitor模式自主控制遍历Tree,将antlr里面的节点都替换成catalyst(优化器系统)里面的类型,所有的类型都继承了TreeNode特质,TreeNode又有子节点children: Seq[BaseType],便有了树的结构。
- 此过程解析完后形成的AST(抽象语法树)为 Unresolved LogicalPlan。
2、操作绑定(Analyzer)
- 上个步骤还只是把sql字符串通过antlr4拆分并由SparkSqlParser解析成各种LogicalPlan(TreeNode的子类),每个LogicalPlan究竟是什么意思还不知道。
- 接下来就需要通过Analyzer去把不确定的属性和关系,通过catalog和一些适配器方法确定下来,比如要从Catalog中解析出表名user,是临时表、临时view,hive table还是hive view,schema又是怎么样的等都需要确定下来。
- 将各种Rule应用到Tree之上的真正执行者都是RuleExecutor,包括后面的Optimizer 也继承了RuleExecutor, 解析的套路是递归的遍历,将新解析出来的LogicalPlan来替换原来的LogicalPlan。
- 此过程解析完后形成的AST为 Resolved LogicalPlan。若没有action操作,后续的优化,物理计划等都不会执行。
3、策略优化(Optimizer)
形成上述的执行策略树还只是第一步,因为这个执行策略可以进行优化,所谓的优化就是对树中节点进行合并或是进行顺序上的调整。
- 这个步骤就是根据大佬们多年的SQL优化经验来对SQL进行优化,比如谓词下推、列值裁剪、常量累加等。
- Optimizer 也继承了RuleExecutor,并定义了一批规则,和Analyzer 一样对输入的plan进行递归处理,此过程解析完后形成的AST为 Optimized LogicalPlan。
4、交付执行(SparkPlanner,prepareForExecution,execute)
SparkPlanner
通过优化后的LogicalPlan还只是逻辑上的,接下来需要通过SparkPlanner 将optimized LogicalPlan应用到一系列特定的Strategies上,即转化为可以直接操作真实数据的操作及数据和RDD的绑定等,此过程解析完后形成的AST为 PhysicalPlan。
prepareForExecution
此模块将 physical plan 转化为 executable physical plan,主要是插入 shuffle 操作和 internal row 的格式转换。
execute
最后调用SparkPlan的execute()执行计算。每个SparkPlan里面都有execute的实现,一般都会递归调用children的execute()方法,最后便会触发整个Tree的计算。
SQL在Spark中的实现
整个SQL部分的代码,其大致分类如下图所示
- SqlParser生成LogicPlan Tree
- Analyzer和Optimizer将各种rule作用于LogicalPlan Tree
- 最终优化生成的LogicalPlan生成Spark RDD
- 最后将生成的RDD交由Spark执行
阶段1:生成LogicalPlan
sql中引入了一种新的RDD,即SchemaRDD
且看SchemaRDD的构造函数:
- class SchemaRDD(
- @transient val sqlContext: SQLContext,
- @transient protected[spark] val logicalPlan: LogicalPlan)
构造函数中总共两入参一为SparkContext,另一个LogicalPlan。LogicalPlan又是如何生成的呢?
要回答这个问题,不得不回到整个问题的入口点sql函数,sql函数的定义如下:
- def sql(sqlText: String): SchemaRDD = {
- val result = new SchemaRDD(this, parseSql(sqlText))
-
- result.queryExecution.toRdd
- result
- }
parseSql(sqlText)负责生成LogicalPlan,parseSql就是SqlParser的一个实例。
SqlParser这一部分的代码要理解起来关键是要搞清楚StandardTokenParsers的调用规则。
由于apply函数可以不被显示调用,所以parseSql(sqlText)一句其实会隐式的调用SqlParser中的apply函数:
- def apply(input: String): LogicalPlan = {
- phrase(query)(new lexical.Scanner(input)) match {
- case Success(r, x) => r
- case x => sys.error(x.toString)
- }
- }
phrase(query)(new lexical.Scanner(input))表示如果输入的input字符串符合Lexical中定义的规则,则继续使用query处理。
看一下query的定义是什么:
- protected lazy val query: Parser[LogicalPlan] =
- select * (
- UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
- UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
- ) | insert
到了这里终于看到有LogicalPlan了,也就是说将普通的string转换成LogicalPlan在这里发生了。
query这段代码同时说明,在目前的spark sql中仅支持select和insert两种操作,至于delete, update暂不支持。
【注】详细参考4
阶段2:QueryExecution
第一阶段,将string转换成为logicalplan tree,第二阶段将各种规则作用于LogicalPlan。
在第一阶段中展示的代码,哪一句会触发优化规则呢?是sql函数中的"result.queryExecution.toRdd",此处的queryExecution就是QueryExecution。这里又涉及到scala的一个语法糖问题。QueryExecution是一个抽象类,但却看到了下述的代码
- protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
- new this.QueryExecution { val logical = plan }
怎么可以创建抽象类的实例?我的世界坍塌了,呵呵。不要紧张,这在scala的世界是允许的,只不过scala是隐含的创建了一个QueryExecution的子类并初始化而已,java里的原则还是对的,人家背后有猫腻。
Ok,轮到阶段2中最重要的角色QueryExecution闪亮登场了
- protected abstract class QueryExecution {
- def logical: LogicalPlan
-
- lazy val analyzed = analyzer(logical)
- lazy val optimizedPlan = optimizer(analyzed)
- lazy val sparkPlan = planner(optimizedPlan).next()
- lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
-
- /** Internal version of the RDD. Avoids copies and has no schema */
- lazy val toRdd: RDD[Row] = executedPlan.execute()
-
- protected def stringOrError[A](f: => A): String =
- try f.toString catch { case e: Throwable => e.toString }
-
- def simpleString: String = stringOrError(executedPlan)
-
- override def toString: String =
- s"""== Logical Plan ==
- |${stringOrError(analyzed)}
- |== Optimized Logical Plan ==
- |${stringOrError(optimizedPlan)}
- |== Physical Plan ==
- |${stringOrError(executedPlan)}
- """.stripMargin.trim
-
- def debugExec() = DebugQuery(executedPlan).execute().collect()
- }
三大步
- lazy val analyzed = analyzer(logical)
- lazy val optimizedPlan = optimizer(analyzed)
- lazy val sparkPlan = planner(optimizedPlan).next()
无论analyzer还是optimizer,它们都是RuleExecutor的子类,
RuleExecutor的默认处理函数是apply,对所有的子类都是一样的,RuleExecutor的apply函数定义如下,
- def apply(plan: TreeType): TreeType = {
- var curPlan = plan
-
- batches.foreach { batch =>
- val batchStartPlan = curPlan
- var iteration = 1
- var lastPlan = curPlan
- var continue = true
-
- // Run until fix point (or the max number of iterations as specified in the strategy.
- while (continue) {
- curPlan = batch.rules.foldLeft(curPlan) {
- case (plan, rule) =>
- val result = rule(plan)
- if (!result.fastEquals(plan)) {
- logger.trace(
- s"""
- |=== Applying Rule ${rule.ruleName} ===
- |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
- """.stripMargin)
- }
-
- result
- }
- iteration += 1
- if (iteration > batch.strategy.maxIterations) {
- logger.info(s"Max iterations ($iteration) reached for batch ${batch.name}")
- continue = false
- }
-
- if (curPlan.fastEquals(lastPlan)) {
- logger.trace(s"Fixed point reached for batch ${batch.name} after $iteration iterations.")
- continue = false
- }
- lastPlan = curPlan
- }
-
- if (!batchStartPlan.fastEquals(curPlan)) {
- logger.debug(
- s"""
- |=== Result of Batch ${batch.name} ===
- |${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")}
- """.stripMargin)
- } else {
- logger.trace(s"Batch ${batch.name} has no effect.")
- }
- }
-
- curPlan
- }
对于RuleExecutor的子类来说,最主要的是定义自己的batches,来看analyzer中的batches是如何定义的
- val batches: Seq[Batch] = Seq(
- Batch("MultiInstanceRelations", Once,
- NewRelationInstances),
- Batch("CaseInsensitiveAttributeReferences", Once,
- (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
- Batch("Resolution", fixedPoint,
- ResolveReferences ::
- ResolveRelations ::
- NewRelationInstances ::
- ImplicitGenerate ::
- StarExpansion ::
- ResolveFunctions ::
- GlobalAggregates ::
- typeCoercionRules :_*),
- Batch("AnalysisOperators", fixedPoint,
- EliminateAnalysisOperators)
- )
batch中定义了一系列的规则,这里再次出现语法糖问题。“如何理解::这个操作符”? ::表示cons的意思,即连接生成一个list.
Batch构造函数中需要指定一系列的Rule,像ResolveReferences就是Rule,有关Rule的代码就不一一分析了。
阶段3:LogicalPlan转换成Physical Plan
在阶段3最主要的代码就两行
- lazy val executePlan: SparkPlan = prepareForExecution(sparkPlan)
- lazy val toRdd: RDD[Row] = executedPlan.execute()
与LogicalPlan不同,SparkPlan最重要的区别就是有execute函数
针对Sparkplan的具体实现,又要分成UnaryNode, LeafNode和BinaryNode,简要来说即单目运算符操作,叶子结点,双目运算符操作。每个子类的具体实现可以自行参考源码。
阶段4: 触发RDD执行
RDD被触发真正执行的过程在看了前面几篇文章之后想来难不住你来,所有的所有都在这一行代码。
teenagers.map(p => "name:"+p(0)).foreach(println)
【注】其实感觉有很多细节不明白比如转换为Physical Plan,以及Physical Plan的具体形式,之后看吧。
参考资料: