赞
踩
一步一个脚印,一天一道大数据面试题
博主希望能够得到大家的点赞收,藏支持!非常感谢~
点赞,收藏是情分,不点是本分。祝你身体健康,事事顺心!
我们来看看 Flink SQL
大概流程和样例:
1.创建 流处理环境 StreamExecutionEnvironment env
2.创建 表环境 StreamTableEnvironment.create(env);
3.创建 source
表,sink
表
4.用 table API 编写查询 SQL(返回 Table
对象)
5.执行 sink executeInsert("sink")
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$; public class SqlDemo2 { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 1.创建表环境 // 1.1 方法 1 // EnvironmentSettings settings = EnvironmentSettings.newInstance() // .inStreamingMode() // .build(); // TableEnvironment tableEnv = TableEnvironment.create(settings); // 1.2 方法 2 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 创建表 // 用 datagen 生成随机数据作为 source tableEnv.executeSql("CREATE TABLE source (\n" + " id INT\n" + " ,ts BIGINT\n" + " ,vc INT\n" + ") WITH (\n" + " 'connector' = 'datagen'\n" + " ,'rows-per-second'='1'\n" + " ,'fields.id.kind'='random'\n" + " ,'fields.id.min'='1'\n" + " ,'fields.id.max'='10'\n" + " ,'fields.ts.kind'='sequence'\n" + " ,'fields.ts.min'='1'\n" + " ,'fields.ts.max'='1000000'\n" + " ,'fields.vc.kind'='random'\n" + " ,'fields.vc.min'='1'\n" + " ,'fields.vc.max'='100'\n" + ");\n"); tableEnv.executeSql("CREATE TABLE sink(\n" + " id INT,\n" + " sumVC INT,\n" + ") WITH (\n" + "'connector'='print'\n" + ");\n"); // 执行查询 Table source = tableEnv.from("source"); Table select = source.where($("id").isGreater(5)) .groupBy($("id")) .aggregate($("vc").sum().as("sumVC")) .select($("id"), $("sumVC")); // 执行 sink select.executeInsert("sink"); } }
我是近未来,祝你变得更强!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。