赞
踩
## 一些术语约定(下文中有不理解再来看)
整体执行流程图
大概经过下面这几个阶段:
- 逻辑计划生成:
1. spark.sql() 这种方式的话会涉及到sql的解析,解析之后就会生成逻辑计划
2. 如果是直接在DataFrame的api上直接操作的话,使用的api将直接生成逻辑计划
- 分析:生成的逻辑计划将会通过解析器结合元数据信息(catalog)进行分析,比如识别到你读的表是什么文件放在哪儿。
- 优化:解析后的逻辑计划会经过优化器进行优化。 例如谓词下推,比如将你的过滤条件下推到下一层,这样就让上一层少算点数据了。
- 生成物理计划:将优化后的逻辑计划转化成能执行的物理计划,也就是转化成rdd的操作。比如你的inner join到底是用broadcatJoin还是用shuffleJoin。这里传说会使用基于代价的优化(RBO)对来进行优化,但是源码中只看到了收集统计信息。
- 在spark里面是如何用代码来表示逻辑的呢 答案就是用树结构来表示。也就是你写的sql或者dataframe的操作,spark都会转化成一颗逻辑树,来看两个例子。
## 例子
###### 首先我们先伪造两张表 people score还有对映的两个DataFrame变量
- # 首先我们先伪造两张表 people score还有对映的两个DataFrame变量
- from pyspark.sql.functions import sum
- people = spark.range(100).selectExpr("id","id+100 age")
- score = spark.range(100).selectExpr("id","id+1 math_score","id+2 english_score")
- score.cache().count()
- people.cache().count()
- spark.sql("use tmp")
- people.write.saveAsTable("people")
- score.write.saveAsTable("score")
###### DataFrame的操作
- people = spark.table("people")
- score = spark.table("score")
- tmp = people.join(score,score.id == people.id ).filter(people.age > 10)
- tmp2 = tmp.select(score.id, (score.math_score+100+80+score.english_score).alias("v") )
- res = tmp2.select(sum(tmp2.v))
###### sql语句的操作
- res = spark.sql("""
- select sum(v)
- from (
- select score.id
- ,100+80+score.math_score+score.english_score as v
- from people
- join score
- on people.id=score.id
- where people.age>10
- )tmp""")
###### 解释:
以上两种虽然是不同的代码写出来的但是表达的逻辑其实都是一样的,在spark里面就长下图这样。这样一来就将我们通过代码写出来的逻辑转化成了一棵逻辑树,每一个节点都是一步操作。
解析之后的计划
## 逻辑树在 spark 中的实现
上面的例子我们用图完成了数据操作逻辑的表示,树中的每个节点都相当于一步操作,只要我们获取了最上层的那个源节点我们就能遍历整棵树了。也可以在这个源节点上层增加各种操作,形成更大的一棵树。
spark里面逻辑节点都是TreeNode 主要用在三个地方:
逻辑计划和物理计划都继承自QueryPlan,节点有三种类型:
表达式可能会有三个的节点。
抽象语法树
分析和优化spark里都是使用的规则来进行的,所有抽象出了一个规则执行器的类(RuleExecutor)然后分析器(Analyzer)与优化器(Optmizer)都是它的子类。看图:
- Rule : 具体的规则,把一个LogicalPlan转化成另一个LogicalPlan,实现的过程就大量利用了scala模式匹配的优势。比如PrushDownPredicate(谓词下推)。
- Batch: 一批规则,有些规则需要结合起来使用的,所以规则都统一封装成Batch。
- Strategy:每个Batch都有自己的执行策略,比如有的只执行一次,有个可能需要一直迭代执行到结果不在改变为止。
- 规则执行器 (RuleExecutor):这个类里面就包含了很多Batch,用于使用这些Batch
规则执行器
上文说到物理计划在spark里面使用SparkPlan这个类进行表示,它使用rdd来完成各项操作,是可执行的计划。那么LogicalPlan 是如何转化成SparkPlan来进行执行的呢。
GenericStrategy: 这个类就是用来把LogicalPlan 转化成SparkPlan的。他也是基于规则的,子类就是不同规则的实现,比如DataSourceStrategy就是用来处理一些数据源相关的LogicalPlan变成SparkPlan的。
QueryPlanner:计划的执行者,手上拥有很多LogicalPlan转SparkPlan的策略集合。具体的实现是SparPlanner去干的。
转成物理计划
pass 大家自己去源码探索吧 有问题联系我
加我信微 Zeal-Zeng 费免拉你进 知识星球、大数据社群、众公号(曾二爷) 和优秀的人一起学习
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。