赞
踩
物理计划是将Spark SQL生成的逻辑算子树映射成物理算子树,并将逻辑计划的信息映射到Spark Core模型中的RDD、Transformation、Action的过程。生成物理计划后,一条SQL语句就变成了可以执行的Spark任务。
物理计划的定义在org.apache.spark.sql.catalyst.plans.QueryPlan
中,从定义可以看出,物理计划是一个抽象语法树,树节点的主要组成部分包括:子树节点、出现过的表达式、出现过的子查询;
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] = {
// 子树
override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
// 表达式
final def expressions: Seq[Expression] = {}
// 子查询
def subqueries: Seq[PlanType] = {}
}
一个样例物理计划如下所示:
== Physical Plan ==
*(5) SortMergeJoin [x#3L], [y#9L], Inner
:- *(2) Sort [x#3L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(x#3L, 200)
: +- *(1) Project [(id#0L % 2) AS x#3L]
: +- *(1) Filter isnotnull((id#0L % 2))
: +- *(1) Range (0, 5, step=1, splits=8)
+- *(4) Sort [y#9L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(y#9L, 200)
+- *(3) Project [(id#6L % 2) AS y#9L]
+- *(3) Filter isnotnull((id#6L % 2))
+- *(3) Range (0, 5, step=1, splits=8)
上面打印出的是一个物理计划的treeString表示,其中(5) SortMergeJoin
和(3) Range
这些树节点是Spark查询的表达式,表达式开头的数字(5)
和(3)
代表DFS遍历物理计划表达式树的顺序(见org.apache.spark.sql.catalyst.trees.TreeNode
函数generateTreeString
),简化版的表达式遍历顺序如下:
== Physical Plan ==
*(5) SortMergeJoin
:- *(2) Sort
: +- *(1) Project
: +- *(1) Filter
: +- *(1) Range
+- *(4) Sort
+- *(3) Project
+- *(3) Filter
+- *(3) Range
将逻辑计划转换成物理计划的抽象类叫做QueryPlanner,它定义了转换的框架:首先得到一系列候选物理计划、然后自底向上替换算子树节点的物理计划、最后化简物理计划。
QueryPlanner源代码
abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { def strategies: Seq[GenericStrategy[PhysicalPlan]] def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = { val candidates = strategies.iterator.flatMap(_(plan)) val plans = candidates.flatMap { candidate => val placeholders = collectPlaceholders(candidate) if (placeholders.isEmpty) { Iterator(candidate) } else { placeholders.iterator.foldLeft(Iterator(candidate)) { case (candidatesWithPlaceholders, (placeholder, logicalPlan))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。