The DataStream API offers a lower-level, more primitive programming model for stream processing, exposing time, state, and dataflow management directly, while the Table API abstracts many of these internals away and provides a structured, declarative API on top of it.
Both APIs can work on bounded and unbounded streams. For efficiency, each provides an optimized batch execution mode; however, since batch is just a special case of streaming, a bounded stream can also be run in the regular streaming execution mode.
The two APIs do not depend on each other; either one can define a complete data pipeline on its own. In practice, however, it is often convenient to mix them:
Compared with the DataStream API, the Table ecosystem makes it easier to access catalogs and external systems;
Stateless normalization and cleansing of data is often easier to express in SQL;
If you need timers or other low-level primitives, you can switch to the DataStream API.
Converting between the two APIs incurs some overhead, but it is usually negligible.
To integrate with the DataStream API, Flink provides a dedicated StreamTableEnvironment. It extends the regular TableEnvironment with additional methods and takes the StreamExecutionEnvironment used by the DataStream API as a parameter.
Below is a simple table-to-stream conversion example. The table's column names and types are derived from the DataStream's TypeInformation. Since the DataStream API does not natively support changelog (update) streams, only append-only/insert-only semantics are assumed during stream-to-table and table-to-stream conversion.
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

val inputStream: DataStream[String] = env.fromElements("Alice", "Bob", "John")

// by default the view's field names (f0, f1, ...) and field types are derived from the source DataStream
tableEnv.createTemporaryView("InputTable", inputStream)

// field names and types can also be customized with an explicit schema, e.g.:
val schema = Schema.newBuilder().column("name", DataTypes.STRING()).build()
// tableEnv.createTemporaryView("InputTable2", inputStream, schema) // how should this schema be applied?

val resultTable: Table = tableEnv.sqlQuery("select * from InputTable")

// a temporary view can also be created from an existing Table
tableEnv.createTemporaryView("InputTable3", resultTable)
val resultTable3: Table = tableEnv.sqlQuery("select * from InputTable3")

val resultStream: DataStream[Row] = tableEnv.toDataStream(resultTable)
val resultStream3: DataStream[Row] = tableEnv.toDataStream(resultTable3)

resultStream.print()
resultStream3.print()
env.execute()
In the printed output, the +I flag marks an insert operation:
4> +I[John]
4> +I[Bob]
3> +I[Alice]
3> +I[Bob]
1> +I[John]
2> +I[Alice]
Below is an example of converting an updating (changelog) table into a stream; each result row carries a change flag that can be retrieved with row.getKind().
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

val dataStream: DataStream[Row] = env.fromElements(
  Row.of("Alice", Int.box(12)),
  Row.of("Bob", Int.box(10)),
  Row.of("Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))

// as(...) renames the fields
val inputTable: Table = tableEnv.fromDataStream(dataStream).as("name", "score")
tableEnv.createTemporaryView("InputTable", inputTable)

val resultTable: Table = tableEnv.sqlQuery(
  "select name, sum(score) as totalScore from InputTable group by name")
resultTable.printSchema()

val resultStream: DataStream[Row] = tableEnv.toChangelogStream(resultTable)
resultStream.print()
env.execute()
Output:
3> +I[Alice, 12]
3> +I[Bob, 10]
3> -U[Alice, 12]
3> +U[Alice, 112]
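The example above prints the change flags but never calls row.getKind() itself. A minimal sketch of inspecting the flag, assuming the resultStream: DataStream[Row] from the example above and Flink's RowKind enum, could look like this:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.RowKind

// map each changelog row to a readable label based on its change flag
resultStream
  .map { row =>
    row.getKind match {
      case RowKind.INSERT        => "insert:        " + row
      case RowKind.UPDATE_BEFORE => "update-before: " + row
      case RowKind.UPDATE_AFTER  => "update-after:  " + row
      case RowKind.DELETE        => "delete:        " + row
    }
  }
  .print()

The short prefixes +I, -U, +U, and -D seen in the output correspond to these RowKind values.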
For batch processing, all input can be read first and processed in one pass to produce the final result, and the planner applies additional optimizations internally in this mode. The following example shows how to set the corresponding option to choose the execution mode.
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// batch execution mode (officially it is recommended to set this on the command line when
// deploying, so the same program can be deployed flexibly in either mode)
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
// streaming execution mode
// env.setRuntimeMode(RuntimeExecutionMode.STREAMING)

val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

val dataStream: DataStream[Row] = env.fromElements(
  Row.of("Alice", Int.box(12)),
  Row.of("Bob", Int.box(10)),
  Row.of("Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))

// as(...) renames the fields
val inputTable: Table = tableEnv.fromDataStream(dataStream).as("name", "score")
tableEnv.createTemporaryView("InputTable", inputTable)

val resultTable: Table = tableEnv.sqlQuery(
  "select name, sum(score) as totalScore from InputTable group by name")
resultTable.printSchema()

val resultStream: DataStream[Row] = tableEnv.toChangelogStream(resultTable)
resultStream.print()
env.execute()
Output:
Batch mode:
3> +I[Alice, 112]
2> +I[Bob, 10]
Streaming mode:
3> +I[Alice, 12]
3> +I[Bob, 10]
3> -U[Alice, 12]
3> +U[Alice, 112]
The StreamTableEnvironment adopts all configuration options from the StreamExecutionEnvironment passed to it, but it is not guaranteed that further changes made to the StreamExecutionEnvironment after instantiation are propagated to the StreamTableEnvironment. Propagation of options from the Table API to the DataStream API happens during planning.
The official recommendation is therefore to set all configuration options before switching from the DataStream API to the Table API.
import java.time.ZoneId
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.table.api.bridge.scala._

// create Scala DataStream API
val env = StreamExecutionEnvironment.getExecutionEnvironment

// set various configuration early
env.setMaxParallelism(256)
env.getConfig.addDefaultKryoSerializer(classOf[MyCustomType], classOf[CustomKryoSerializer])
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// then switch to Scala Table API
val tableEnv = StreamTableEnvironment.create(env)

// set configuration early
tableEnv.getConfig.setLocalTimeZone(ZoneId.of("Europe/Berlin"))

// start defining your pipelines in both APIs...
Programs in both APIs are only submitted to the cluster when their respective execute methods are called, but their execution behavior differs slightly.
The DataStream API's StreamExecutionEnvironment uses a builder pattern to construct the pipeline, which may contain multiple branches, and a branch does not necessarily end in a sink. The StreamExecutionEnvironment buffers all branches until the job is submitted.
StreamExecutionEnvironment.execute() submits the entire constructed pipeline and clears the builder afterwards. In short, a DataStream API program ends with a call to StreamExecutionEnvironment.execute(), or with DataStream.executeAndCollect(), which implicitly defines a sink that streams results to the local client.
In the Table API, branching pipelines must declare their final sinks in a StatementSet. Unlike a DataStream API program, there is no single generic execute() call that submits the whole program with all its branches; instead, every source-to-sink pipeline is executed through its own execute() method.
// execute with explicit sink
tableEnv.from("InputTable").executeInsert("OutputTable")

tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")

tableEnv.createStatementSet()
  .addInsert("OutputTable", tableEnv.from("InputTable"))
  .addInsert("OutputTable2", tableEnv.from("InputTable"))
  .execute()

tableEnv.createStatementSet()
  .addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
  .addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
  .execute()

// execute with implicit local sink
tableEnv.from("InputTable").execute().print()

tableEnv.executeSql("SELECT * FROM InputTable").print()
When mixing the two APIs, every call to StreamTableEnvironment.toDataStream or StreamTableEnvironment.toChangelogStream compiles the Table API sub-pipeline and merges it into the DataStream API pipeline builder. This means the program is only submitted to the cluster once StreamExecutionEnvironment.execute() or DataStream.executeAndCollect() is eventually called.
// (1)
// converts the Table into a stream with a printing sink and adds that branch to the
// StreamExecutionEnvironment; nothing is submitted to the cluster until env.execute() is called
tableEnv.toDataStream(table).print()
// (2)
// executes a Table API end-to-end pipeline as a Flink job and prints locally;
// this branch runs as soon as this line is reached, while (1) has still not been executed
table.execute().print()
// finally submits the DataStream API pipeline with the sink defined in (1) as a
// Flink job; (2) was already running before
env.execute()
For bounded (batch) data and unbounded streams, the Table API & SQL planner provides different optimization rules and runtime operators. Currently, the runtime mode is not derived automatically from the sources, so it must either be set explicitly or be adopted from the StreamExecutionEnvironment when the StreamTableEnvironment is instantiated.
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.EnvironmentSettings
// adopt mode from StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
val tableEnv = StreamTableEnvironment.create(env)
// or
// set mode explicitly for StreamTableEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode)
The following example shows how to use Flink's built-in DataGen table source:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

val table: Table = tableEnv.from(
  TableDescriptor.forConnector("datagen")
    .schema(
      Schema.newBuilder()
        .column("uid", DataTypes.TINYINT())
        .column("payload", DataTypes.STRING())
        .build()
    )
    // .option("number-of-rows", "10")     // total number of rows; unbounded by default, bounded if set
    .option("rows-per-second", "10")       // how many rows are generated per second
    // .option("fields.uid.min", "0")      // minimum value of uid; not allowed once uid is declared as a sequence
    .option("fields.uid.kind", "sequence") // generate uid as a sequence
    .option("fields.uid.start", "0")       // start of the uid sequence
    .option("fields.uid.end", "100")       // end of the uid sequence
    .build()
)
table.printSchema()

tableEnv.toDataStream(table)
  .keyBy(r => r.getFieldAs[String]("payload"))
  .map(r => "My custom operator: " + r.getFieldAs[String]("payload") + "||" + r.getFieldAs[Int]("uid"))
  // the mapped value must be a String, and the "" has to come first; placing it last fails with
  // a "cannot cast Byte to Integer" error
  // .map(r => "" + r.getFieldAs[Int]("uid"))
  .executeAndCollect()
  .foreach(println)
The following example shows not only how the two APIs can be combined, but also how the resulting update stream is handled consistently: the join is expressed in SQL and its result is post-processed with a stateful KeyedProcessFunction in the DataStream API:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.time.LocalDateTime;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

SingleOutputStreamOperator<Row> userStream = env
    .fromElements(
        Row.of(LocalDateTime.parse("2021-08-21T13:00:00"), 1, "Alice"),
        Row.of(LocalDateTime.parse("2021-08-21T13:05:00"), 2, "Bob"),
        Row.of(LocalDateTime.parse("2021-08-21T13:10:00"), 2, "Bob"))
    .returns(
        Types.ROW_NAMED(
            new String[]{"ts", "uid", "name"},
            Types.LOCAL_DATE_TIME, Types.INT, Types.STRING));

SingleOutputStreamOperator<Row> orderStream = env
    .fromElements(
        Row.of(LocalDateTime.parse("2021-08-21T13:02:00"), 1, 122),
        Row.of(LocalDateTime.parse("2021-08-21T13:07:00"), 2, 239),
        Row.of(LocalDateTime.parse("2021-08-21T13:11:00"), 2, 999),
        Row.of(LocalDateTime.parse("2021-08-21T13:15:01"), 2, 999))
    .returns(
        Types.ROW_NAMED(
            new String[]{"ts", "uid", "amount"},
            Types.LOCAL_DATE_TIME, Types.INT, Types.INT));

tableEnv.createTemporaryView(
    "UserTable",
    userStream,
    Schema.newBuilder()
        .column("ts", DataTypes.TIMESTAMP())
        .column("uid", DataTypes.INT())
        .column("name", DataTypes.STRING())
        .build());

tableEnv.createTemporaryView(
    "OrderTable",
    orderStream,
    Schema.newBuilder()
        .column("ts", DataTypes.TIMESTAMP())
        .column("uid", DataTypes.INT())
        .column("amount", DataTypes.INT())
        .build());

// interval join: rows match when U.ts <= O.ts <= U.ts + 5 minutes
Table joinedTable = tableEnv.sqlQuery(
    "select U.name, U.ts as Uts, O.amount, O.ts as Ots "
        + "from UserTable U, OrderTable O "
        + "where U.uid = O.uid and O.ts between U.ts and U.ts + interval '5' minutes");

DataStream<Row> joinedStream = tableEnv.toDataStream(joinedTable);
joinedStream.print("joinedStream: ");

joinedStream
    .keyBy(r -> r.<String>getFieldAs("name"))
    .process(
        new KeyedProcessFunction<String, Row, String>() {

            ValueState<String> seen;

            @Override
            public void open(Configuration parameters) throws Exception {
                seen = getRuntimeContext().getState(
                    new ValueStateDescriptor<String>("seen", String.class));
            }

            @Override
            public void processElement(Row row, Context context, Collector<String> out)
                    throws Exception {
                String name = row.getFieldAs("name");
                if (seen.value() == null) {
                    // only the first joined order per user is emitted
                    seen.update(name);
                    out.collect(row.toString());
                }
            }
        })
    .print("process: ");

env.execute();
Output:
Only rows that actually join appear in the join result stream:
joinedStream: :3> +I[Alice, 2021-08-21T13:00, 122, 2021-08-21T13:02]
joinedStream: :4> +I[Bob, 2021-08-21T13:05, 239, 2021-08-21T13:07]
joinedStream: :4> +I[Bob, 2021-08-21T13:10, 999, 2021-08-21T13:11]
Only the first join result per key is emitted:
process: :3> +I[Alice, 2021-08-21T13:00, 122, 2021-08-21T13:02]
process: :4> +I[Bob, 2021-08-21T13:05, 239, 2021-08-21T13:07]
StreamTableEnvironment provides the following stream-to-table and table-to-stream methods:
fromDataStream(DataStream): interprets the stream as a table; event-time and watermarks are not propagated;
fromDataStream(DataStream, Schema): the optional Schema can enrich column data types and add time attributes, watermark strategies, other computed columns, and a primary key;
createTemporaryView(String, DataStream): registers a view on the stream so it can be queried with SQL; it is a shortcut for createTemporaryView(String, fromDataStream(DataStream));
createTemporaryView(String, DataStream, Schema): as above, a shortcut for createTemporaryView(String, fromDataStream(DataStream, Schema));
toDataStream(Table): the default record type is org.apache.flink.types.Row; the rowtime time attribute is written back into the stream record, and watermarks keep propagating;
toDataStream(Table, AbstractDataType): accepts a data type describing the desired stream record type; the planner may insert implicit casts and reorder columns to map them onto the (possibly nested) fields of that data type;
toDataStream(Table, Class): a shortcut for toDataStream(Table, DataTypes.of(Class)) to quickly obtain the desired type (see the sketch below).
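The last two variants do not appear in the examples of this post. Below is a minimal sketch, not from the original post, that reuses the input pattern of the earlier aggregation examples and assumes a hypothetical target case class User whose fields match the table columns (name, score):

import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

// hypothetical target type; java.lang.Integer is used because the derived column type is a nullable INT
case class User(name: String, score: java.lang.Integer)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

val dataStream: DataStream[Row] = env.fromElements(
  Row.of("Alice", Int.box(12)),
  Row.of("Bob", Int.box(10))
)(Types.ROW(Types.STRING, Types.INT))

val table = tableEnv.fromDataStream(dataStream).as("name", "score")

// toDataStream(Table, Class): shortcut for toDataStream(Table, DataTypes.of(Class));
// the planner maps (and may implicitly cast or reorder) columns onto the fields of User
val userStream: DataStream[User] = tableEnv.toDataStream(table, classOf[User])

// toDataStream(Table, AbstractDataType): the same conversion with an explicitly given data type
val userStream2: DataStream[User] = tableEnv.toDataStream(table, DataTypes.of(classOf[User]))

userStream.print()
env.execute()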
From the Table API's point of view, converting between Table and DataStream is similar to reading from or writing to a table connector defined with a CREATE TABLE DDL statement in SQL.
The schema part of CREATE TABLE name (schema) WITH (options) can be derived automatically from the DataStream's type information; to add extra settings or define it completely by hand, org.apache.flink.table.api.Schema can be used.
This virtual DataStream table connector exposes the rowtime metadata column for every row, i.e. the stream record's timestamp.
The DataStream table source also implements SupportsSourceWatermark, so the built-in SOURCE_WATERMARK() function can be used as a watermark strategy to adopt the watermarks of the DataStream API.
import java.time.{Duration, Instant}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Schema, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

case class User(name: String, score: Integer, event_time: Instant)

object FromDataStreamTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // setting the local time zone had no visible effect here; the output was always 8 hours behind
    // tableEnv.getConfig.setLocalTimeZone(ZoneId.of("UTC"))
    // tableEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
    // tableEnv.getConfig.setLocalTimeZone(ZoneOffset.ofHours(8))

    val dataStream: DataStream[User] = env.fromElements(
      User("Alice", 4, Instant.ofEpochMilli(1000)),
      User("Bob", 6, Instant.ofEpochMilli(1001)),
      User("Alice", 10, Instant.ofEpochMilli(1002))
    )

    // this watermark assignment exists only so that example (4) can adopt the watermark
    // configuration of the source DataStream
    // val dataStream2 = dataStream.assignTimestampsAndWatermarks(
    //   new BoundedOutOfOrdernessTimestampExtractor[User](Time.seconds(2)) {
    //     override def extractTimestamp(element: User): Long = element.event_time.toEpochMilli
    //   })
    val dataStream2 = dataStream.assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forBoundedOutOfOrderness[User](Duration.ofSeconds(2)) // allow 2 seconds of out-of-orderness
        .withTimestampAssigner(new SerializableTimestampAssigner[User] {
          // extract the event-time field
          override def extractTimestamp(element: User, recordTimestamp: Long): Long =
            element.event_time.toEpochMilli
        }))

    // example (1): derive all physical columns from the DataStream
    val table: Table = tableEnv.fromDataStream(dataStream)
    table.printSchema()
    /*
    (
      `name` STRING,
      `score` INT NOT NULL,
      `event_time` TIMESTAMP_LTZ(9)
    )
    */

    // example (2): derive all physical columns and add a computed column,
    // here a processing-time attribute
    val table2 = tableEnv.fromDataStream(
      dataStream,
      Schema.newBuilder()
        .columnByExpression("proc_time", "PROCTIME()")
        .build()
    )
    table2.printSchema()
    tableEnv.toDataStream(table2).print()
    /*
    (
      `name` STRING,
      `score` INT NOT NULL,
      `event_time` TIMESTAMP_LTZ(9),
      `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
    )
    */

    // example (3): derive all physical columns and add a computed event-time attribute column
    // together with a custom watermark strategy
    val table3 = tableEnv.fromDataStream(
      dataStream,
      Schema.newBuilder()
        .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
        .watermark("rowtime", "rowtime - INTERVAL '10' SECOND ")
        .build()
    )
    table3.printSchema()
    tableEnv.toDataStream(table3).print("table3")
    /*
    (
      `name` STRING,
      `score` INT NOT NULL,
      `event_time` TIMESTAMP_LTZ(9),
      `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
      WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
    )
    */

    // example (4): derive all physical columns, create a rowtime attribute column from the
    // stream record's timestamp, and adopt the watermark strategy already defined on dataStream2
    val table4: Table = tableEnv.fromDataStream(
      dataStream2,
      Schema.newBuilder()
        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build()
    )
    table4.printSchema()
    tableEnv.toDataStream(table4).print("table4")
    /*
    (
      `name` STRING,
      `score` INT NOT NULL,
      `event_time` TIMESTAMP_LTZ(9),
      `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
      WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
    )
    */

    // example (5): fully custom schema that lowers the timestamp precision from 9 to 3;
    // the official example also moves event_time to the front, but that produced an error here
    val table5: Table = tableEnv.fromDataStream(
      dataStream,
      Schema.newBuilder()
        .column("name", "STRING")
        .column("score", "INT")
        .column("event_time", "TIMESTAMP_LTZ(3)")
        .watermark("event_time", "SOURCE_WATERMARK()")
        .build()
    )
    table5.printSchema()
    /*
    (
      `name` STRING,
      `score` INT,
      `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
      WATERMARK FOR `event_time`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
    )
    */

    env.execute()
  }
}