赞
踩
Apache Flink提供了两种顶层的关系型API,分别为Table API和SQL,Flink通过Table API&SQL实现了批流统一。其中Table API是用于Scala和Java的语言集成查询API,它允许以非常直观的方式组合关系运算符(例如select,where和join)的查询。Flink SQL基于Apache Calcite 实现了标准的SQL,用户可以使用标准的SQL处理数据集。Table API和SQL与Flink的DataStream和DataSet API紧密集成在一起,用户可以实现相互转化,比如可以将DataStream或者DataSet注册为table进行操作数据。值得注意的是,Table API and SQL目前尚未完全完善,还在积极的开发中,所以并不是所有的算子操作都可以通过其实现。
从Flink1.9开始,Flink为Table & SQL API提供了两种planner,分别为Blink planner和old planner,其中old planner是在Flink1.9之前的版本使用。主要区别如下:
尖叫提示:对于生产环境,目前推荐使用old planner.
flink-table-common
: 通用模块,包含 Flink Planner 和 Blink Planner 一些共用的代码flink-table-api-java
: java语言的Table & SQL API,仅针对table(处于早期的开发阶段,不推荐使用)flink-table-api-scala
: scala语言的Table & SQL API,仅针对table(处于早期的开发阶段,不推荐使用)flink-table-api-java-bridge
: java语言的Table & SQL API,支持DataStream/DataSet API(推荐使用)flink-table-api-scala-bridge
: scala语言的Table & SQL API,支持DataStream/DataSet API(推荐使用)flink-table-planner
:planner 和runtime. planner为Flink1,9之前的old planner(推荐使用)flink-table-planner-blink
: 新的Blink planner.flink-table-runtime-blink
: 新的Blink runtime.flink-table-uber
: 将上述的API模块及old planner打成一个jar包,形如flink-table-*.jar,位与/lib目录下flink-table-uber-blink
:将上述的API模块及Blink 模块打成一个jar包,形如fflink-table-blink-*.jar,位与/lib目录下Blink planner和old planner有许多不同的特点,具体列举如下:
BatchTableSource
,使用的是有界的StreamTableSource。Catalog
,不支持ExternalCatalog
(已过时)。PlannerExpression
(未来会被移除),而Blink planner 会谓词下推到 Expression
(表示一个产生计算结果的逻辑树)。根据使用的语言不同,可以选择下面的依赖,包括scala版和java版,如下:
<!-- java版 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
<!-- scala版 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
除此之外,如果需要在本地的IDE中运行Table API & SQL的程序,则需要添加下面的pom依赖:
<!-- Flink 1.9之前的old planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
<!-- 新的Blink planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
另外,如果需要实现自定义的格式(比如和kafka交互)或者用户自定义函数,需要添加如下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
所有的Table API&SQL的程序(无论是批处理还是流处理)都有着相同的形式,下面将给出通用的编程结构形式:
// 创建一个TableEnvironment对象,指定planner、处理模式(batch、streaming)
TableEnvironment tableEnv = ...;
// 创建一个表
tableEnv.connect(...).createTemporaryTable("table1");
// 注册一个外部的表
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通过Table API的查询创建一个Table 对象
Table tapiResult = tableEnv.from("table1").select(...);
// 通过SQL查询的查询创建一个Table 对象
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// 将结果写入TableSink
tapiResult.insertInto("outputTable");
// 执行
tableEnv.execute("java_job");
注意:Table API & SQL的查询可以相互集成,另外还可以在DataStream或者DataSet中使用Table API & SQL的API,实现DataStreams、 DataSet与Table之间的相互转换。
TableEnvironment是Table API & SQL程序的一个入口,主要包括如下的功能:
DataStream
、DataSet
与Table之间的相互转换ExecutionEnvironment
、StreamExecutionEnvironment
的引用一个Table必定属于一个具体的TableEnvironment,不可以将不同TableEnvironment的表放在一起使用(比如join,union等操作)。
TableEnvironment是通过调用 Bat
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。