赞
踩
之前 Flink SQL 用得比较少,今天开始简单介绍一下。
首先导入依赖:
<!-- 引入 Flink Table 相关依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
然后定义数据类型 SensorReading。我们的数据源是从文本文件读取的,文件中的每一行都会被解析成一个 SensorReading 对象:
- package beans;
-
/**
 * Data type for a single temperature reading from a sensor.
 *
 * <p>A plain mutable bean: it exposes a public no-argument constructor plus a
 * getter/setter pair for each of its three attributes.
 */
public class SensorReading {

    /** Sensor identifier. */
    private String id;

    /** Timestamp of the reading. */
    private Long timestamp;

    /** Temperature value of the reading. */
    private Double temperature;

    /** Creates an empty reading; all attributes are {@code null} until set. */
    public SensorReading() {
    }

    /**
     * Creates a fully populated reading.
     *
     * @param id          sensor identifier
     * @param timestamp   timestamp of the reading
     * @param temperature temperature value
     */
    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    /** @return the sensor identifier */
    public String getId() {
        return this.id;
    }

    /** @param id the sensor identifier to store */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the timestamp of the reading */
    public Long getTimestamp() {
        return this.timestamp;
    }

    /** @param timestamp the timestamp to store */
    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    /** @return the temperature value */
    public Double getTemperature() {
        return this.temperature;
    }

    /** @param temperature the temperature value to store */
    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }

    /**
     * Renders the reading as {@code SensorReading{id='...', timestamp=..., temperature=...}}.
     *
     * @return a human-readable representation of all three attributes
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("SensorReading{");
        text.append("id='").append(id).append('\'');
        text.append(", timestamp=").append(timestamp);
        text.append(", temperature=").append(temperature);
        text.append('}');
        return text.toString();
    }
}
具体使用DEMO:
- package table;
-
- import beans.SensorReading;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.Table;
- import org.apache.flink.table.api.java.StreamTableEnvironment;
- import org.apache.flink.types.Row;
-
- public class Table1 {
- public static void main(String[] args) throws Exception{
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- DataStreamSource<String> inputStream = env.readTextFile("D:\\idle\\FlinkTest\\src\\main\\resources\\sensor.txt");
- DataStream<SensorReading> dataStream = inputStream.map(line -> {
- String[] fields = line.split(",");
- return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
- });
-
- //创建表的执行环境
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
- //创建表
- Table dataTable = tableEnv.fromDataStream(dataStream);
- //调用table API进行转换操作
- Table resultTable = dataTable.select("id, temperature")
- .where("id = 'sensor_1'");
- //执行sql
- //dataTable注册进环境
- tableEnv.createTemporaryView("sensor",dataTable);
- String sql = "select id, temperature from sensor where id='sensor_1'";
- Table resultSqlTable = tableEnv.sqlQuery(sql);
-
- //打印结果
- tableEnv.toAppendStream(resultTable, Row.class).print("result");
- tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
-
- env.execute();
-
-
- }
- }
查看结果
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。