当前位置:   article > 正文

Spark SQL Catalyst源码分析之Optimizer_spark case class literal (value: any, datatype: da

spark case class literal (value: any, datatype: datatype)

 /** Spark SQL源码分析系列文章*/

  前几篇文章介绍了Spark SQL的Catalyst的核心运行流程SqlParser,和Analyzer 以及核心类库TreeNode,本文将详细讲解Spark SQL的Optimizer的优化思想以及Optimizer在Catalyst里的表现方式,并加上自己的实践,对Optimizer有一个直观的认识。

  Optimizer的主要职责是将Analyzer给Resolved的Logical Plan根据不同的优化策略Batch,来对语法树进行优化,优化逻辑计划节点(Logical Plan)以及表达式(Expression),也是转换成物理执行计划的前置。如下图:

  

一、Optimizer

  Optimizer这个类是在catalyst里的optimizer包下的唯一一个类,Optimizer的工作方式其实类似Analyzer,因为它们都继承自RuleExecutor[LogicalPlan],都是执行一系列的Batch操作:

  

  Optimizer里的batches包含了3类优化策略:1、Combine Limits 合并Limits  2、ConstantFolding 常量合并 3、Filter Pushdown 过滤器下推,每个Batch里定义的优化伴随对象都定义在Optimizer里了:

 

 
  1. object Optimizer extends RuleExecutor[LogicalPlan] {

  2. val batches =

  3. Batch("Combine Limits", FixedPoint(100),

  4. CombineLimits) ::

  5. Batch("ConstantFolding", FixedPoint(100),

  6. NullPropagation,

  7. ConstantFolding,

  8. BooleanSimplification,

  9. SimplifyFilters,

  10. SimplifyCasts,

  11. SimplifyCaseConversionExpressions) ::

  12. Batch("Filter Pushdown", FixedPoint(100),

  13. CombineFilters,

  14. PushPredicateThroughProject,

  15. PushPredicateThroughJoin,

  16. ColumnPruning) :: Nil

  17. }

  另外提一点,Optimizer里不但对Logical Plan进行了优化,而且对Logical Plan中的Expression也进行了优化,所以有必要了解一下Expression相关类,主要是用到了references和outputSet,references主要是Logical Plan或Expression节点的所依赖的那些Expressions,而outputSet是Logical Plan所有的Attribute的输出:

  如:Aggregate是一个Logical Plan, 它的references就是group by的表达式 和 aggreagate的表达式的并集去重。

 
  1. case class Aggregate(

  2. groupingExpressions: Seq[Expression],

  3. aggregateExpressions: Seq[NamedExpression],

  4. child: LogicalPlan)

  5. extends UnaryNode {

  6.  
  7. override def output = aggregateExpressions.map(_.toAttribute)

  8. override def references =

  9. (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet

  10. }

 

  

二、优化策略详解

  Optimizer的优化策略不仅有对plan进行transform的,也有对expression进行transform的,究其原理就是遍历树,然后应用优化的Rule,但是注意一点,对Logical Plantransfrom的是先序遍历(pre-order),而对Expression transfrom的时候是后序遍历(post-order):

2.1、Batch: Combine Limits

如果出现了2个Limit,则将2个Limit合并为一个,这个要求一个Limit是另一个Limit的grandChild。

 

 
  1. /**

  2. * Combines two adjacent [[Limit]] operators into one, merging the

  3. * expressions into one single expression.

  4. */

  5. object CombineLimits extends Rule[LogicalPlan] {

  6. def apply(plan: LogicalPlan): LogicalPlan = plan transform {

  7. case ll @ Limit(le, nl @ Limit(ne, grandChild)) => //ll为当前Limit,le为其expression, nl是ll的grandChild,ne是nl的expression

  8. Limit(If(LessThan(ne, le), ne, le), grandChild) //expression比较,如果ne比le小则表达式为ne,否则为le

  9. }

  10. }

给定SQL:val query = sql("select * from (select * from temp_shengli limit 100)a limit 10 ") 

 
  1. scala> query.queryExecution.analyzed

  2. res12: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =

  3. Limit 10

  4. Project [key#13,value#14]

  5. Limit 100

  6. Project [key#13,value#14]

  7. MetastoreRelation default, temp_shengli, None

 

子查询里limit100,外层查询limit10,这里我们当然可以在子查询里不必查那么多,因为外层只需要10个,所以这里会合并Limit10,和Limit100 为 Limit 10。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/679797
推荐阅读
相关标签
  

闽ICP备14008679号