当前位置:   article > 正文

Flink SQL 执行流程简单介绍_flink执行sql

flink执行sql

在这里插入图片描述

这篇文章从 org.apache.flink.table.examples.java.StreamSQLExample 这个简单的例子分析 Flink SQL 的一个简单的执行流程,也算 Flink SQL 执行流程初步的入门,我们先从整体的执行框架了解一个整体流程,方便之后我们有机会对 Flink SQL 进行修改先有一个简单认识

1. Apache Calcite 是什么?

说到 Flink SQL 我们就不能不提 Apache Calcite 这个项目,它是一个通用的动态数据管理框架,可以以统一的 SQL 查询语言进行数据的管理,目前很多大数据软件例如:Hive,Drill等都基于 Calcite 做 SQL 解析等工作。和大部分SQL执行流程一样,主要可以分成四个阶段:

在这里插入图片描述

Parse:语法解析阶段,将用户传入的 SQL 语句转化为 SqlNode
Validate:语法校验阶段,通过类似 Catelog 等进行元数据的合法验证,此时还是 SqlNode
Optimize:执行计划优化阶段,将 SqlNode 转化为 RelNode,并利用规则优化器进行等价转化等操作,最后得到优化过后的执行计划
Execute:将逻辑查询计划,生成可执行的代码,提交运行
  • 1
  • 2
  • 3
  • 4

2. Flink SQL 的执行过程

通过上面,我们可以大致的有个概念,就是 SQL 的解析到最后的执行,可以大致的分为四个阶段,接下来我们这四个阶段是怎么和映射到 Flink 上面的,我们先看下简单的 StreamSQLExample

// union the two tables
Table result = tEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " +
                "SELECT * FROM OrderB WHERE amount < 2");

tEnv.toAppendStream(result, Order.class).print();
  • 1
  • 2
  • 3
  • 4
  • 5

对于 Flink(这篇文章基于1.10.0,BlinkPlanner)来说,目前分为两个 Planner,默认情况如果不显示设置默认使用的是 OldPlanner,如果需要使用BlinkPlanner,需要显示的调用 useBlinkPlanner,BlinkPlanner 里有很多优化的新特性,在将来社区将会把 BlinkPlanner 替换 OldPlanner

3. 语法解析

上述的 SQL 语句为:

SELECT * FROM UnnamedTable$0 WHERE amount > 2
UNION ALL
SELECT * FROM OrderB WHERE amount < 2
  • 1
  • 2
  • 3

我们可以简单的先看下调用栈:

tEnv.sqlQuery
    -> ParserImpl.parse
         -> CalciteParser.parse(statement)
         -> SqlToOperationConverter.convert(planner, catalogManager, parsed)
  • 1
  • 2
  • 3
  • 4

通过调用栈可以看出,CalciteParser 对我们输入的 statement 进行了解析,对于 CalciteParser 来说,它主要是 Calcite 的 SqlParser 的一个封装类,并且在创建的时候,指定 Calcite 的 ParserFactory 为 FlinkSqlParserImpl.FACTORY,告诉 Calcite 采用 FlinkSqlParserImpl 实现,而这个类是通过 JavaCC 自动生成,我们可以简单的浏览下 flink-sql-parser 这个模块,这里有实现 Flink 自定义解析的一些文件

在这里插入图片描述
我们可以在这里简单的说一下它的一个简单的过程,我们都知道,maven在执行的时候是分阶段的,我只说明下面几个:

  • validate: 用于验证项目的有效性和其项目所需要的内容是否具备
  • initialize:初始化操作,比如创建一些构建所需要的目录等。
  • generate-sources:用于生成一些源代码,这些源代码在compile phase中需要使用到
  • process-sources:对源代码进行一些操作,例如过滤一些源代码
  • generate-resources:生成资源文件(这些文件将被包含在最后的输入文件中)
  • process-resources:对资源文件进行处理
  • compile:对源代码进行编译
  • 等等

我们可以通过 flink-sql-parser 模块的maven文件,简单的可以总结一下流程:

  • unpack-parser-template (Maven initialize 阶段):对 calcite-core 的jar文件进行解压,拷贝**/Parser.jj到
  • copy-fmpp-resources (Maven initialize 阶段):拷贝src/main/codegen中的文件到target/codegen
  • generate-fmpp-sources (Maven generate-sources 阶段):fmpp 插件生成代码
  • javacc (Maven generate-sources 阶段):javacc 对整体代码进行生成,放置到target/generated-sources,之后继续打包

简单的说完它的如何生成之后,其实它最终会调用 FlinkSqlParserImpl.parseSqlStmtEof 做最终的 SQL 解析,将 SQL 转化为 SqlNode,SqlNode 是所有解析节点的父类,一个 SqlNode 就是一个 SQL 解析树,我们看下一下的类图关系

在这里插入图片描述
我们通过一个简单里的例子去理解,例如:

select id, cast(score as int), 'hello' from T where id < ?
  • 1

在上面的SQL中,id、score、T 等为 SqlIdentifier,cast() 为 SqlCall,int 为SqlDataTypeSpec,’hello’ 为 SqlLiteral,’?’ 为SqlDynamicParam,所有的操作都是一个 SqlCall, 例如查询是一个 SqlSelect、删除是一个 SqlDelete 等等,下面对上述类型做简要的总结:

  • SqlIdentifier:SQL中的Id标示符
  • SqlCall:是对操作符的调用. 操作符可以用来描述任何语法结构,SQL解析树中的每个非叶节点都是某种类型的SqlCall
  • SqlDataTypeSpec:SQL数据类型规范
  • SqlLiteral:SQL中的常量, 表示输入的常量
  • SqlDynamicParam:SQL语句中的动态参数标记
  • SqlKind:SqlNode类型
  • SqlOperator:SQL解析的节点类型,包括:函数,操作符(=),语法结构(case)等操作

了解完上述的基本知识之后,我们可以简单的看下上诉 SQL 转换为 SqlNode 的一个简单结构

在这里插入图片描述

3. 元数据校验

经过上面的解析分析之后,需要通过 catalogManager 进行元数据上面的校验,这里通过 FlinkCalciteSqlValidator.validate 进行语法的合法解析,

public static Optional<Operation> convert(
        FlinkPlannerImpl flinkPlanner,
        CatalogManager catalogManager,
        SqlNode sqlNode) {
    // validate the query
    final SqlNode validated = flinkPlanner.validate(sqlNode);
    SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner, catalogManager);
...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

经过合法性验证,补充了相应的元数据

SELECT `UnnamedTable$0`.`user`, `UnnamedTable$0`.`product`, `UnnamedTable$0`.`amount`
FROM `default_catalog`.`default_database`.`UnnamedTable$0` AS `UnnamedTable$0`
WHERE `UnnamedTable$0`.`amount` > 2
UNION ALL
SELECT `OrderB`.`user`, `OrderB`.`product`, `OrderB`.`amount`
FROM `default_catalog`.`default_database`.`OrderB` AS `OrderB`
WHERE `OrderB`.`amount` < 2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

并调用 FlinkPlannerImpl.rel 将合法的 SqlNode 转变成 RelNode 同时,对于行表达式 (SqlOperator) 将转换为 RexNode

在这里插入图片描述
最后分装成 Operation,在这里是 PlannerQueryOperation 返回, 对于 Operation 来说,它涵盖所有种类的表操作,例如查询(DQL)、修改(DML)、定义(DDL)、或控制动作(DCL),我们可以简单的看下他的类图结构

在这里插入图片描述

原文:[Flink] Flink SQL 原理解析 (1)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/684340
推荐阅读
相关标签
  

闽ICP备14008679号