当前位置:   article > 正文

《SparkSQL内核剖析》【物理计划篇】_框架里的物理计划是什么

框架里的物理计划是什么

一、概览

物理计划是将Spark SQL生成的逻辑算子树映射成物理算子树,并将逻辑计划的信息映射到Spark Core模型中的RDD、Transformation、Action的过程。生成物理计划后,一条SQL语句就变成了可以执行的Spark任务。

物理计划的定义在org.apache.spark.sql.catalyst.plans.QueryPlan中,从定义可以看出,物理计划是一个抽象语法树,树节点的主要组成部分包括:子树节点、出现过的表达式、出现过的子查询;

abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] = {
	// 子树
	override protected def innerChildren: Seq[QueryPlan[_]] = subqueries 
	// 表达式
	final def expressions: Seq[Expression] = {}
	// 子查询
	def subqueries: Seq[PlanType] = {}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

一个样例物理计划如下所示:

 == Physical Plan ==
   *(5) SortMergeJoin [x#3L], [y#9L], Inner
   :- *(2) Sort [x#3L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#3L, 200)
   :     +- *(1) Project [(id#0L % 2) AS x#3L]
   :        +- *(1) Filter isnotnull((id#0L % 2))
   :           +- *(1) Range (0, 5, step=1, splits=8)
   +- *(4) Sort [y#9L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(y#9L, 200)
         +- *(3) Project [(id#6L % 2) AS y#9L]
            +- *(3) Filter isnotnull((id#6L % 2))
               +- *(3) Range (0, 5, step=1, splits=8)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

上面打印出的是一个物理计划的treeString表示,其中(5) SortMergeJoin(3) Range 这些树节点是Spark查询的表达式,表达式开头的数字(5)(3)代表DFS遍历物理计划表达式树的顺序(见org.apache.spark.sql.catalyst.trees.TreeNode函数generateTreeString),简化版的表达式遍历顺序如下:

== Physical Plan ==
*(5) SortMergeJoin
   :- *(2) Sort 
   :     +- *(1) Project 
   :        +- *(1) Filter
   :           +- *(1) Range 
   +- *(4) Sort 
         +- *(3) Project 
            +- *(3) Filter
               +- *(3) Range
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

将逻辑计划转换成物理计划的抽象类叫做QueryPlanner,它定义了转换的框架:首先得到一系列候选物理计划、然后自底向上替换算子树节点的物理计划、最后化简物理计划。

QueryPlanner源代码

abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
   
  def strategies: Seq[GenericStrategy[PhysicalPlan]]

  def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
   
    val candidates = strategies.iterator.flatMap(_(plan))
    val plans = candidates.flatMap {
    candidate =>
      val placeholders = collectPlaceholders(candidate)

      if (placeholders.isEmpty) {
   
        Iterator(candidate)
      } else {
   
        placeholders.iterator.foldLeft(Iterator(candidate)) {
   
          case (candidatesWithPlaceholders, (placeholder, logicalPlan)) 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/588899
推荐阅读
相关标签
  

闽ICP备14008679号