赞
踩
如果用户需要同时流计算、批处理的场景下,用户需要维护两套业务代码,开发人员也要维护两套技术栈,非常不方便。
Flink 社区很早就设想过将批数据看作一个有界流数据,将批处理看作流计算的一个特例,从而实现流批统一,Flink 社区的开发人员在多轮讨论后,基本敲定了Flink 未来的技术架构
Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。
Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。
Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。这两种 API 中的查询对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。
Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。你可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先用 CEP 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 Gelly 图算法 来处理已经预处理好的数据。
注意:Table API 和 SQL 现在还处于活跃开发阶段,还没有完全实现所有的特性。不是所有的 [Table API,SQL] 和 [流,批] 的组合都是支持的。
Flink 的 Table API 和 SQL 是流批统一的 API。 这意味着 Table API & SQL 在无论有限的批式输入还是无限的流式输入下,都具有相同的语义。 因为传统的关系代数以及 SQL 最开始都是为了批式处理而设计的, 关系型查询在流式场景下不如在批式场景下容易理解.
动态表(Dynamic Tables)
是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。
与表示批处理数据的静态表不同,动态表是随时间变化的
。可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query)
。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。
需要注意的是,连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。
为了使用关系查询处理流,必须将其转换成 Table。从概念上讲,流的每条记录都被解释为对结果表的 INSERT
操作。
假设有如下格式的数据:
[
user: VARCHAR, // 用户名
cTime: TIMESTAMP, // 访问 URL 的时间
url: VARCHAR // 用户访问的 URL
]
下图显示了单击事件流(左侧)如何转换为表(右侧)。当插入更多的单击流记录时,结果表的数据将不断增长
。
连续查询
在动态表上计算一个连续查询,并生成一个新的动态表。与批处理查询不同,连续查询从不终止,并根据其输入表上的更新更新其结果表
。
在任何时候,连续查询的结果在语义上与以批处理模式在输入表快照上执行的相同查询的结果相同。
说明:
第一行数据
被插入到 clicks 表时,查询开始计算结果表。第一行数据 [Mary,./home] 插入后,结果表(右侧,上部)由一行 [Mary, 1] 组成。<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency>
import com.atguigu.flink.java.chapter_5.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import static org.apache.flink.table.api.Expressions.$; public class Flink01_TableApi_BasicUse { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); // 1. 创建表的执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 2. 创建表: 将流转换成动态表. 表的字段名从pojo的属性名自动抽取 Table table = tableEnv.fromDataStream(waterSensorStream); // 3. 对动态表进行查询 Table resultTable = table .where($("id").isEqual("sensor_1")) .select($("id"), $("ts"), $("vc")); // 4. 把动态表转换成流 DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class); resultStream.print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } }
// 3. 对动态表进行查询
Table resultTable = table
.where($("vc").isGreaterOrEqual(20))
.groupBy($("id"))
.aggregate($("vc").sum().as("vc_sum"))
.select($("id"), $("vc_sum"));
// 4. 把动态表转换成流 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
动态表可以像普通数据库表一样通过 INSERT、UPDATE 和 DELETE 来不断修改。它可能是一个只有一行、不断更新的表,也可能是一个 insert-only 的表,没有 UPDATE 和 DELETE 修改,或者介于两者之间的其他表。
在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink的 Table API 和 SQL 支持三种方式来编码一个动态表的变化:
Append-only 流
仅通过 INSERT 操作修改的动态表可以通过输出插入的行转换为流。
Retract 流
retract 流包含两种类型的 message: add messages 和 retract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。下图显示了将动态表转换为 retract 流的过程。
Upsert 流
upsert 流包含两种类型的 message: upsert messages 和delete messages。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将 INSERT 和 UPDATE 操作编码为 upsert message,将 DELETE 操作编码为 delete message ,将具有唯一键的动态表转换为流。消费流的算子需要知道唯一键的属性,以便正确地应用 message。与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高。下图显示了将动态表转换为 upsert 流的过程。
请注意,在将动态表转换为 DataStream 时,只支持 append 流和 retract 流。
前面是先得到流, 再转成动态表, 其实动态表也可以直接连接到数据
// 2. 创建表 // 2.1 表的元数据信息 Schema schema = new Schema() .field("id", DataTypes.STRING()) .field("ts", DataTypes.BIGINT()) .field("vc", DataTypes.INT()); // 2.2 连接文件, 并创建一个临时表, 其实就是一个动态表 tableEnv.connect(new FileSystem().path("input/sensor.txt")) .withFormat(new Csv().fieldDelimiter(',').lineDelimiter("\n")) .withSchema(schema) .createTemporaryTable("sensor"); // 3. 做成表对象, 然后对动态表进行查询 Table sensorTable = tableEnv.from("sensor"); Table resultTable = sensorTable .groupBy($("id")) .select($("id"), $("id").count().as("cnt")); // 4. 把动态表转换成流. 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记 DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class); resultStream.print();
// 2. 创建表 // 2.1 表的元数据信息 Schema schema = new Schema() .field("id", DataTypes.STRING()) .field("ts", DataTypes.BIGINT()) .field("vc", DataTypes.INT()); // 2.2 连接文件, 并创建一个临时表, 其实就是一个动态表 tableEnv .connect(new Kafka() .version("universal") .topic("sensor") .startFromLatest() .property("group.id", "bigdata") .property("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092")) .withFormat(new Json()) .withSchema(schema) .createTemporaryTable("sensor"); // 3. 对动态表进行查询 Table sensorTable = tableEnv.from("sensor"); Table resultTable = sensorTable .groupBy($("id")) .select($("id"), $("id").count().as("cnt")); // 4. 把动态表转换成流. 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记 DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class); resultStream.print();
import com.atguigu.flink.java.chapter_5.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.Schema; import static org.apache.flink.table.api.Expressions.$; public class Flink02_TableApi_ToFileSystem { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); // 1. 创建表的执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table sensorTable = tableEnv.fromDataStream(waterSensorStream); Table resultTable = sensorTable .where($("id").isEqual("sensor_1") ) .select($("id"), $("ts"), $("vc")); // 创建输出表 Schema schema = new Schema() .field("id", DataTypes.STRING()) .field("ts", DataTypes.BIGINT()) .field("vc", DataTypes.INT()); tableEnv .connect(new FileSystem().path("output/sensor_id.txt")) .withFormat(new Csv().fieldDelimiter('|')) .withSchema(schema) .createTemporaryTable("sensor"); // 把数据写入到输出表中 resultTable.executeInsert("sensor"); } }
import com.atguigu.flink.java.chapter_5.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema; import static org.apache.flink.table.api.Expressions.$; public class Flink03_TableApi_ToKafka { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); // 1. 创建表的执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table sensorTable = tableEnv.fromDataStream(waterSensorStream); Table resultTable = sensorTable .where($("id").isEqual("sensor_1") ) .select($("id"), $("ts"), $("vc")); // 创建输出表 Schema schema = new Schema() .field("id", DataTypes.STRING()) .field("ts", DataTypes.BIGINT()) .field("vc", DataTypes.INT()); tableEnv .connect(new Kafka() .version("universal") .topic("sink_sensor") .sinkPartitionerRoundRobin() .property("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092")) .withFormat(new Json()) .withSchema(schema) .createTemporaryTable("sensor"); // 把数据写入到输出表中 resultTable.executeInsert("sensor"); } }
参考官方文档: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connect.html
import com.atguigu.flink.java.chapter_5.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; public class Flink05_SQL_BaseUse { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 使用sql查询未注册的表 Table inputTable = tableEnv.fromDataStream(waterSensorStream); Table resultTable = tableEnv.sqlQuery("select * from " + inputTable + " where id='sensor_1'"); tableEnv.toAppendStream(resultTable, Row.class).print(); env.execute(); } }
import com.atguigu.flink.java.chapter_5.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; public class Flink05_SQL_BaseUse_2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 使用sql查询一个已注册的表 // 1. 从流得到一个表 Table inputTable = tableEnv.fromDataStream(waterSensorStream); // 2. 把注册为一个临时视图 tableEnv.createTemporaryView("sensor", inputTable); // 3. 在临时视图查询数据, 并得到一个新表 Table resultTable = tableEnv.sqlQuery("select * from sensor where id='sensor_1'"); // 4. 显示resultTable的数据 tableEnv.toAppendStream(resultTable, Row.class).print(); env.execute(); } }
使用sql从Kafka读数据, 并写入到Kafka中
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class Flink05_SQL_Kafka2Kafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 1. 注册SourceTable: source_sensor tableEnv.executeSql("create table source_sensor (id string, ts bigint, vc int) with(" + "'connector' = 'kafka'," + "'topic' = 'topic_source_sensor'," + "'properties.bootstrap.servers' = 'hadoop162:9029,hadoop163:9092,hadoop164:9092'," + "'properties.group.id' = 'atguigu'," + "'scan.startup.mode' = 'latest-offset'," + "'format' = 'json'" + ")"); // 2. 注册SinkTable: sink_sensor tableEnv.executeSql("create table sink_sensor(id string, ts bigint, vc int) with(" + "'connector' = 'kafka'," + "'topic' = 'topic_sink_sensor'," + "'properties.bootstrap.servers' = 'hadoop162:9029,hadoop163:9092,hadoop164:9092'," + "'format' = 'json'" + ")"); // 3. 从SourceTable 查询数据, 并写入到 SinkTable tableEnv.executeSql("insert into sink_sensor select * from source_sensor where id='sensor_1'"); } }
像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。因此,Table API 中的表就需要提供逻辑时间属性来表示时间,以及支持时间相关的操作。
处理时间属性可以在 schema 定义的时候用 .proctime
后缀来定义。时间属性一定不能定义在一个已有字段上,所以它只能定义在 schema 定义的最后
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); // 1. 创建表的执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 声明一个额外的字段来作为处理时间字段 Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime()); sensorTable.print(); env.execute();
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class Flink06_TableApi_ProcessTime { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. 创建表的执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 创建表, 声明一个额外的列作为处理时间 tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with(" + "'connector' = 'filesystem'," + "'path' = 'input/sensor.txt'," + "'format' = 'csv'" + ")"); TableResult result = tableEnv.executeSql("select * from sensor"); result.print(); } }
事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。
除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。
为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。
事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。
在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:
不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。
import com.atguigu.flink.java.chapter_5.WaterSensor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.time.Duration; import static org.apache.flink.table.api.Expressions.$; public class Flink07_TableApi_EventTime { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> waterSensorStream = env .fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((element, recordTimestamp) -> element.getTs()) ); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table table = tableEnv // 用一个额外的字段作为事件时间属性 .fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("et").rowtime()); table.execute().print(); env.execute(); } } // 使用已有的字段作为时间属性 .fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));
事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class Flink07_TableApi_EventTime_2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 作为事件时间的字段必须是 timestamp 类型, 所以根据 long 类型的 ts 计算出来一个 t tEnv.executeSql("create table sensor(" + "id string," + "ts bigint," + "vc int, " + "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," + "watermark for t as t - interval '5' second)" + "with(" + "'connector' = 'filesystem'," + "'path' = 'input/sensor.txt'," + "'format' = 'csv'" + ")"); tEnv.sqlQuery("select * from sensor").execute().print(); } }
说明:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。