当前位置:   article > 正文

spark sql 原理以及解析_sparksql提交的原理

sparksql提交的原理

前三节,我们从spark底层的RDD角度去剖析了整个Spark 程序的执行逻辑,以及一些原理性的东西,当然我们在使用的时候要是直接使用Spark Core的编程语法也可以,在此基础上Spark 还提供了基于SQL的编程语法,也就是Spark-Sql,本文章从以下几个方面去分析Spark-Sql

  1.  Spark Sql  简介
  2.  Spark Sql 执行原理( Catalyst整体执行流程介绍)

 1. Spark Sql  简介

Spark sql是spark内部最核心,也是社区最活跃的组件。Spark SQL支持在Spark中执行SQL,或者HiveQL的关系查询表达式。列式存储的类RDD(DataSet/DataFrame)数据类型以及对sql语句的支持使它更容易上手,同时,它对数据的抽取、清洗的特性,使它广泛的用于etl,甚至是机器学习领域。因此,saprk sql较其他spark组件,获得了更多的使用者。

 2.Spark Sql 执行原理 (Catalyst整体执行流程介绍)

                如下图:

     

    无论是直接使用 SQL 语句还是使用 DataFrame,都会经过如下步骤转换成 DAG 对 RDD 的操作,主要流程包括:

  • Parser 解析 SQL,生成 Unresolved Logical Plan(语法树,此棵树称为Unresolved Logical Plan)
  • 由 Analyzer 结合 Catalog 信息生成 Resolved Logical Plan(逻辑计划)
  • Optimizer根据预先定义好的规则对 Resolved Logical Plan 进行优化并生成 Optimized Logical Plan(优化后的逻辑计划)
  • Query Planner 将 Optimized Logical Plan 转换成多个 Physical Plan(物理计划)
  • CBO 根据 Cost Model 算出每个 Physical Plan 的代价并选取代价最小的 Physical Plan 作为最终的 Physical Plan(优化后的物理计划)
  • Spark 以 DAG 的方法执行上述 Physical Plan
  • 在执行 DAG 的过程中,Adaptive Execution 根据运行时信息动态调整执行计划从而提高执行效率

下面我们结合源码来分析下整个sql的执行过程:

         生成的整个语法树如下:

 针对如下sql:

  1. SELECT score.id,
  2. 100 + 80 + score.math_score + score.english_score AS v
  3. FROM student
  4. JOIN score
  5. ON student.id = score.id
  6. AND student.age > 10

          执行计划为:

  1. 2.1 == Parsed Logical Plan ==
  2. 'Project ['score.id, (((100 + 80) + 'score.math_score) + 'score.english_score) AS v#0]
  3. +- 'Join Inner, (('student.id = 'score.id) && ('student.age > 10))
  4. :- 'UnresolvedRelation `student`
  5. +- 'UnresolvedRelation `score`
  6. 2.2 == Analyzed Logical Plan ==
  7. id: int, v: int
  8. Project [id#13, (((100 + 80) + math_score#14) + english_score#15) AS v#0]
  9. +- Join Inner, ((id#10 = id#13) && (age#11 > 10))
  10. :- SubqueryAlias student
  11. : +- HiveTableRelation `db_yqs_b_505`.`student`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#10, age#11, name#12]
  12. +- SubqueryAlias score
  13. +- HiveTableRelation `db_yqs_b_505`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#13, math_score#14, english_score#15]
  14. 2.3 == Optimized Logical Plan ==
  15. Project [id#13, ((180 + math_score#14) + english_score#15) AS v#0]
  16. +- Join Inner, (id#10 = id#13)
  17. :- Project [id#10]
  18. : +- Filter ((isnotnull(age#11) && (age#11 > 10)) && isnotnull(id#10))
  19. : +- HiveTableRelation `db_yqs_b_505`.`student`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#10, age#11, name#12]
  20. +- Filter isnotnull(id#13)
  21. +- HiveTableRelation `db_yqs_b_505`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#13, math_score#14, english_score#15]
  22. 2.4 == Physical Plan ==
  23. *(5) Project [id#13, ((180 + math_score#14) + english_score#15) AS v#0]
  24. +- *(5) SortMergeJoin [id#10], [id#13], Inner
  25. :- *(2) Sort [id#10 ASC NULLS FIRST], false, 0
  26. : +- Exchange hashpartitioning(id#10, 200)
  27. : +- *(1) Project [id#10]
  28. : +- *(1) Filter ((isnotnull(age#11) && (age#11 > 10)) && isnotnull(id#10))
  29. : +- HiveTableScan [age#11, id#10], HiveTableRelation `db_yqs_b_505`.`student`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#10, age#11, name#12]
  30. +- *(4) Sort [id#13 ASC NULLS FIRST], false, 0
  31. +- Exchange hashpartitioning(id#13, 200)
  32. +- *(3) Filter isnotnull(id#13)
  33. +- HiveTableScan [id#13, math_score#14, english_score#15], HiveTableRelation `db_yqs_b_505`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#13, math_score#14, english_score#15]

 

  2.1 Spark SQL 使用 Antlr 进行记法和语法解析,并生成 UnresolvedPlan,当用户使用 SparkSession.sql(sqlText : String) 提交 SQL 时,SparkSession 最终会调用 SparkSqlParser 的 parsePlan 方法,如下:

  1. def sql(sqlText: String): DataFrame = {
  2. Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
  3. }

 

  • 使用 Antlr 生成的 SqlBaseLexer 对 SQL 进行词法分析,生成 CommonTokenStream
  • 使用 Antlr 生成的 SqlBaseParser 进行语法分析,得到 LogicalPlan

如下:

  1. protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
  2. logDebug(s"Parsing command: $command")
  3. val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
  4. lexer.removeErrorListeners()
  5. lexer.addErrorListener(ParseErrorListener)
  6. val tokenStream = new CommonTokenStream(lexer)
  7. val parser = new SqlBaseParser(tokenStream)
  8. parser.addParseListener(PostProcessor)
  9. parser.removeErrorListeners()
  10. parser.addErrorListener(ParseErrorListener)
  11. try {
  12. try {
  13. // first, try parsing with potentially faster SLL mode
  14. parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
  15. toResult(parser)
  16. }
  17. catch {
  18. case e: ParseCancellationException =>
  19. // if we fail, parse with LL mode
  20. tokenStream.seek(0) // rewind input stream
  21. parser.reset()
  22. // Try Again.
  23. parser.getInterpreter.
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/389391
推荐阅读
相关标签
  

闽ICP备14008679号