In the previous three sections we dissected the execution logic of a Spark program from the perspective of the underlying RDDs, together with some of the principles behind it. You can of course program directly against the Spark Core API, but on top of it Spark also provides a SQL-based programming interface: Spark SQL. This article analyzes Spark SQL from several angles.
Spark SQL is one of Spark's core components and the most active one in the community. It supports running SQL or HiveQL relational queries inside Spark. Its columnar, RDD-like data types (Dataset/DataFrame) and its SQL support make it easy to pick up, while its strengths in data extraction and cleansing have made it widely used for ETL and even machine learning. As a result, Spark SQL has attracted more users than any other Spark component.
Whether you issue SQL statements directly or use the DataFrame API, a query goes through the following steps before it is turned into a DAG of operations on RDDs:

- Parser: the SQL text is tokenized and parsed (via ANTLR) into an unresolved logical plan
- Analyzer: relations and attributes are resolved against the catalog, yielding an analyzed logical plan
- Optimizer: Catalyst applies rule-based rewrites (constant folding, predicate pushdown, column pruning, ...) to produce an optimized logical plan
- Planner: the optimized logical plan is translated into a physical plan (SparkPlan), whose execution is what finally builds the RDD DAG

Each of these stages can be inspected from the API, as the sketch after this list shows.
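A minimal sketch of how the stages map onto the API (the table name `student` is an assumption here; any registered table works): every Dataset carries a QueryExecution object that exposes the plan at each stage.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming a registered table `student` exists.
val spark = SparkSession.builder().appName("plan-stages").enableHiveSupport().getOrCreate()

val qe = spark.sql("SELECT id FROM student WHERE age > 10").queryExecution

println(qe.logical)       // parsed (unresolved) logical plan
println(qe.analyzed)      // after the Analyzer has resolved relations and attributes
println(qe.optimizedPlan) // after the Catalyst optimizer
println(qe.sparkPlan)     // physical plan produced by the planner
```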
Below we walk through the source code to trace how a SQL statement is executed.
The parser first turns the SQL text into a syntax tree (figure of the full tree omitted here). Take the following SQL as an example:
```sql
SELECT score.id,
       100 + 80 + score.math_score + score.english_score AS v
FROM student
JOIN score
  ON student.id = score.id
 AND student.age > 10
```
The execution plans at the four stages are:
```
2.1 == Parsed Logical Plan ==
'Project ['score.id, (((100 + 80) + 'score.math_score) + 'score.english_score) AS v#0]
+- 'Join Inner, (('student.id = 'score.id) && ('student.age > 10))
   :- 'UnresolvedRelation `student`
   +- 'UnresolvedRelation `score`

2.2 == Analyzed Logical Plan ==
id: int, v: int
Project [id#13, (((100 + 80) + math_score#14) + english_score#15) AS v#0]
+- Join Inner, ((id#10 = id#13) && (age#11 > 10))
   :- SubqueryAlias student
   :  +- HiveTableRelation `db_yqs_b_505`.`student`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#10, age#11, name#12]
   +- SubqueryAlias score
      +- HiveTableRelation `db_yqs_b_505`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#13, math_score#14, english_score#15]

2.3 == Optimized Logical Plan ==
Project [id#13, ((180 + math_score#14) + english_score#15) AS v#0]
+- Join Inner, (id#10 = id#13)
   :- Project [id#10]
   :  +- Filter ((isnotnull(age#11) && (age#11 > 10)) && isnotnull(id#10))
   :     +- HiveTableRelation `db_yqs_b_505`.`student`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#10, age#11, name#12]
   +- Filter isnotnull(id#13)
      +- HiveTableRelation `db_yqs_b_505`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#13, math_score#14, english_score#15]

2.4 == Physical Plan ==
*(5) Project [id#13, ((180 + math_score#14) + english_score#15) AS v#0]
+- *(5) SortMergeJoin [id#10], [id#13], Inner
   :- *(2) Sort [id#10 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#10, 200)
   :     +- *(1) Project [id#10]
   :        +- *(1) Filter ((isnotnull(age#11) && (age#11 > 10)) && isnotnull(id#10))
   :           +- HiveTableScan [age#11, id#10], HiveTableRelation `db_yqs_b_505`.`student`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#10, age#11, name#12]
   +- *(4) Sort [id#13 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#13, 200)
         +- *(3) Filter isnotnull(id#13)
            +- HiveTableScan [id#13, math_score#14, english_score#15], HiveTableRelation `db_yqs_b_505`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#13, math_score#14, english_score#15]
```
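Note what the optimizer did between 2.2 and 2.3: it folded the constant expression 100 + 80 into 180, pushed the age > 10 predicate below the join, pruned the unused name#12 column, and added isnotnull filters on the join keys. The dump itself is what Dataset.explain(true) prints (apart from the 2.x labels); a minimal sketch to reproduce it, assuming the two Hive tables from the example exist:

```scala
// `spark` is the SparkSession from the earlier sketch; the tables
// student and score are assumed to be registered in the Hive metastore.
val df = spark.sql(
  """SELECT score.id,
    |       100 + 80 + score.math_score + score.english_score AS v
    |FROM student
    |JOIN score
    |  ON student.id = score.id
    | AND student.age > 10""".stripMargin)

df.explain(true)  // prints parsed, analyzed, optimized and physical plans
```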
2.1 Spark SQL uses ANTLR for lexical and syntactic analysis, producing an unresolved plan. When a user submits a statement via SparkSession.sql(sqlText: String), SparkSession ultimately calls SparkSqlParser's parsePlan method:
```scala
def sql(sqlText: String): DataFrame = {
  Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}
```
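Dataset.ofRows wraps the still-unresolved LogicalPlan returned by parsePlan in a QueryExecution and eagerly runs analysis on it (assertAnalyzed); optimization and physical planning stay lazy until an action is triggered.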
parsePlan in turn delegates to the generic parse method:
```scala
protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
  logDebug(s"Parsing command: $command")

  val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
  lexer.removeErrorListeners()
  lexer.addErrorListener(ParseErrorListener)

  val tokenStream = new CommonTokenStream(lexer)
  val parser = new SqlBaseParser(tokenStream)
  parser.addParseListener(PostProcessor)
  parser.removeErrorListeners()
  parser.addErrorListener(ParseErrorListener)

  try {
    try {
      // first, try parsing with potentially faster SLL mode
      parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
      toResult(parser)
    }
    catch {
      case e: ParseCancellationException =>
        // if we fail, parse with LL mode
        tokenStream.seek(0) // rewind input stream
        parser.reset()

        // Try Again.
        parser.getInterpreter.setPredictionMode(PredictionMode.LL)
        toResult(parser)
    }
  }
  catch {
    // errors from either attempt are rethrown as ParseExceptions
    // carrying the original SQL text and position
    case e: ParseException if e.command.isDefined =>
      throw e
    case e: ParseException =>
      throw e.withCommand(command)
    case e: AnalysisException =>
      val position = Origin(e.line, e.startPosition)
      throw new ParseException(Option(command), e.message, position, position)
  }
}
```
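The nested try here implements ANTLR's standard two-stage parsing strategy: SLL prediction mode is considerably faster and handles the vast majority of statements, and only when it bails out with a ParseCancellationException does the parser rewind the token stream and retry with full LL prediction, which is slower but complete.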