赞
踩
这篇文章从 org.apache.flink.table.examples.java.StreamSQLExample 这个简单的例子分析 Flink SQL 的一个简单的执行流程,也算 Flink SQL 执行流程初步的入门,我们先从整体的执行框架了解一个整体流程,方便之后我们有机会对 Flink SQL 进行修改先有一个简单认识
说到 Flink SQL 我们就不能不提 Apache Calcite 这个项目,它是一个通用的动态数据管理框架,可以以统一的 SQL 查询语言进行数据的管理,目前很多大数据软件例如:Hive,Drill等都基于 Calcite 做 SQL 解析等工作。和大部分SQL执行流程一样,主要可以分成四个阶段:
Parse:语法解析阶段,将用户传入的 SQL 语句转化为 SqlNode
Validate:语法校验阶段,通过类似 Catelog 等进行元数据的合法验证,此时还是 SqlNode
Optimize:执行计划优化阶段,将 SqlNode 转化为 RelNode,并利用规则优化器进行等价转化等操作,最后得到优化过后的执行计划
Execute:将逻辑查询计划,生成可执行的代码,提交运行
通过上面,我们可以大致的有个概念,就是 SQL 的解析到最后的执行,可以大致的分为四个阶段,接下来我们这四个阶段是怎么和映射到 Flink 上面的,我们先看下简单的 StreamSQLExample
// union the two tables
Table result = tEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " +
"SELECT * FROM OrderB WHERE amount < 2");
tEnv.toAppendStream(result, Order.class).print();
对于 Flink(这篇文章基于1.10.0,BlinkPlanner)来说,目前分为两个 Planner,默认情况如果不显示设置默认使用的是 OldPlanner,如果需要使用BlinkPlanner,需要显示的调用 useBlinkPlanner,BlinkPlanner 里有很多优化的新特性,在将来社区将会把 BlinkPlanner 替换 OldPlanner
上述的 SQL 语句为:
SELECT * FROM UnnamedTable$0 WHERE amount > 2
UNION ALL
SELECT * FROM OrderB WHERE amount < 2
我们可以简单的先看下调用栈:
tEnv.sqlQuery
-> ParserImpl.parse
-> CalciteParser.parse(statement)
-> SqlToOperationConverter.convert(planner, catalogManager, parsed)
通过调用栈可以看出,CalciteParser 对我们输入的 statement 进行了解析,对于 CalciteParser 来说,它主要是 Calcite 的 SqlParser 的一个封装类,并且在创建的时候,指定 Calcite 的 ParserFactory 为 FlinkSqlParserImpl.FACTORY,告诉 Calcite 采用 FlinkSqlParserImpl 实现,而这个类是通过 JavaCC 自动生成,我们可以简单的浏览下 flink-sql-parser 这个模块,这里有实现 Flink 自定义解析的一些文件
我们可以在这里简单的说一下它的一个简单的过程,我们都知道,maven在执行的时候是分阶段的,我只说明下面几个:
我们可以通过 flink-sql-parser 模块的maven文件,简单的可以总结一下流程:
简单的说完它的如何生成之后,其实它最终会调用 FlinkSqlParserImpl.parseSqlStmtEof 做最终的 SQL 解析,将 SQL 转化为 SqlNode,SqlNode 是所有解析节点的父类,一个 SqlNode 就是一个 SQL 解析树,我们看下一下的类图关系
我们通过一个简单里的例子去理解,例如:
select id, cast(score as int), 'hello' from T where id < ?
在上面的SQL中,id、score、T 等为 SqlIdentifier,cast() 为 SqlCall,int 为SqlDataTypeSpec,’hello’ 为 SqlLiteral,’?’ 为SqlDynamicParam,所有的操作都是一个 SqlCall, 例如查询是一个 SqlSelect、删除是一个 SqlDelete 等等,下面对上述类型做简要的总结:
了解完上述的基本知识之后,我们可以简单的看下上诉 SQL 转换为 SqlNode 的一个简单结构
经过上面的解析分析之后,需要通过 catalogManager 进行元数据上面的校验,这里通过 FlinkCalciteSqlValidator.validate 进行语法的合法解析,
public static Optional<Operation> convert(
FlinkPlannerImpl flinkPlanner,
CatalogManager catalogManager,
SqlNode sqlNode) {
// validate the query
final SqlNode validated = flinkPlanner.validate(sqlNode);
SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner, catalogManager);
...
}
经过合法性验证,补充了相应的元数据
SELECT `UnnamedTable$0`.`user`, `UnnamedTable$0`.`product`, `UnnamedTable$0`.`amount`
FROM `default_catalog`.`default_database`.`UnnamedTable$0` AS `UnnamedTable$0`
WHERE `UnnamedTable$0`.`amount` > 2
UNION ALL
SELECT `OrderB`.`user`, `OrderB`.`product`, `OrderB`.`amount`
FROM `default_catalog`.`default_database`.`OrderB` AS `OrderB`
WHERE `OrderB`.`amount` < 2
并调用 FlinkPlannerImpl.rel 将合法的 SqlNode 转变成 RelNode 同时,对于行表达式 (SqlOperator) 将转换为 RexNode
最后分装成 Operation,在这里是 PlannerQueryOperation 返回, 对于 Operation 来说,它涵盖所有种类的表操作,例如查询(DQL)、修改(DML)、定义(DDL)、或控制动作(DCL),我们可以简单的看下他的类图结构
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。