赞
踩
时间语义需要配合窗口操作才能发挥作用。
分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。主要分为滚动窗口、滑动窗口和会话窗口
public class TestGroupWindow { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //开启事件时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStream<String> inputStream = env.readTextFile("./sensor.txt"); DataStream<SensorReading> dataStream = inputStream.map(line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); }) //提取事件时间戳和设置watermark .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) { @Override public long extractTimestamp(SensorReading element) { return element.getTimestamp() * 1000L; } }); Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp, rt.rowtime"); tableEnv.createTemporaryView("sensor", sensorTable); //分组窗口操作 //1. Table API //1.1 定义滚动窗口(时间或计数) Table resultTable = sensorTable.window(Tumble.over("10.minutes").on("rt").as("tw")) .groupBy("id, tw") .select("id, id.count as cnt, temp.avg as avgTemp, tw.start, tw.end"); //sensorTable.window(Tumble.over("10.rows").on("rt").as("trw")); //滚动计数窗口 //1.2 定义滑动窗口(时间或计数) //sensorTable.window(Slide.over("10.minutes").every("5.minutes").on("rt").as("sw")); //sensorTable.window(Slide.over("10.rows").every("5.rows").on("rt").as("srw")); //1.3 定义会话窗口 //sensorTable.window(Session.withGap("10.minutes").on("rt").as("sew")); //2. 
SQL //2.1 定义滚动窗口(时间或计数) /* TUMBLE(time_attr, interval):定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度 */ String sql = "select id, count(id) as cnt, avg(temp) as avgTemp, tumble_end(rt, interval '10' second) as wEnd from sensor group by id, tumble(rt, interval '10' second)"; Table resultSqlTable = tableEnv.sqlQuery(sql); //2.2 定义滑动窗口(时间或计数) /* HOP(time_attr, interval, interval):定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度 */ //String sql = "select id, count(id) as cnt, avg(temp) as avgTemp, hop_end(rt, interval '2' second, interval '10' second) as wEnd from sensor group by id, hop(rt, interval '2' second, interval '10' second)"; //Table resultSqlTable = tableEnv.sqlQuery(sql); //2.3 定义会话窗口 /* SESSION(time_attr, interval):定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔 */ //String sql = "select id, count(id) as cnt, avg(temp) as avgTemp, session_end(rt, interval '10' second) as wEnd from sensor group by id, session(rt, interval '10' second)"; //Table resultSqlTable = tableEnv.sqlQuery(sql); //基于窗口的集合统计可以转换为追加流输出 tableEnv.toAppendStream(resultTable, Row.class).print("result"); tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql"); env.execute(); } }
开窗窗口(Over Window)是标准 SQL 中已有的(over 子句),可以在查询的 SELECT 子句中定义,会针对每个输入行,计算相邻行范围内的聚合
public class TestOverWindow { public static void main(String[] args) throw Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStream<String> inputStream = env.readTextFile("./sensor.txt"); DataStream<SensorReading> dataStream = InputStream.map(line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) { @Override public long extractTimestamp(SensorReading element) { return element.getTimestamp() * 1000L; } }); Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp, rt.rowtime"); tableEnv.createTemporaryView("sensor", dataTable); //开窗窗口 //1. TableAPI /* 基本语法:table.window(Over[.partitionBy("field")].orderBy("rowtime/proctime").preceding(...).as("alias")) 窗口范围的指定:preceding(...) 1.无界窗口:preceding(UNBOUNDED_RANGE|UNBOUNDED_ROW),表示从最开始的时间|行到当前数据 2.有界窗口:preceding("1.minutes|10.rows"),表示从当前时间的前一分钟到当前时间数据|从当前行的前10行到当前数据 */ Table overTable = sensorTable.window(Over.partitionBy("id").orderBy("rt").preceding("2.rows").as("ow")).select("id, rt, id.count over ow, temp.avg over ow"); //2. SQL /* 基本语法: 1. count()/sum()... over(partition by field order by rowtime|proctime range|rows between interval '1' minute|2 preceding and current row) from table 2. count() over ow/sum() over ow... 
from table window ow as (partition by field order by rowtime|proctime range|rows between interval '1' minute|2 preceding and current row) */ Table overSqlTable = tableEnv.sqlQuery("select id, rt, count(id) over ow, avg(temp) over ow from sensor window ow as (partition by id order by rt rows between 2 preceding and current row)"); tableEnv.toAppendStream(overTable, Row.class).print("result"); tableEnv.toRetractStream(overSqlTable, Row.class).print("sql"); env.execute(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。