赞
踩
JAVA实现Flink Table基于事件时间的滑动窗口代码样例
- package org.fenghua.example.table.windos;
-
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.api.common.typeinfo.Types;
- import org.apache.flink.streaming.api.TimeCharacteristic;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.Table;
- import org.apache.flink.table.api.TableEnvironment;
- import org.apache.flink.table.api.java.Slide;
- import org.apache.flink.table.api.java.StreamTableEnvironment;
- import org.apache.flink.table.descriptors.Json;
- import org.apache.flink.table.descriptors.Kafka;
- import org.apache.flink.table.descriptors.Rowtime;
- import org.apache.flink.table.descriptors.Schema;
- import org.apache.flink.types.Row;
-
- /**
- * 滑动窗口
- * {"id":2,"product":"test","amount":66,"createTime":"1637215294000"}
- *
- * @author: fenghua
- * @date: 2021/11/17
- */
- public class KafkaSlidingEventTimeWindos {
- private final static String SOURCE_TOPIC = "topic3";
- private final static String ZOOKEEPER_CONNECT = "127.0.0.1:2181";
- private final static String METADATA_BROKER_LIST = "127.0.0.1:9092";
-
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
- tEnv.connect(
- new Kafka()
- .version("0.10")
- .topic(SOURCE_TOPIC)
- .startFromLatest()
- .property("zookeeper.connect", ZOOKEEPER_CONNECT)
- .property("bootstrap.servers", METADATA_BROKER_LIST)
- )
- .withFormat(
- new Json()
- .schema(
- org.apache.flink.table.api.Types.ROW(
- new String[]{"id", "product", "amount","createTime"},
- new TypeInformation[]{
- Types.LONG,
- Types.STRING,
- Types.INT,
- Types.SQL_TIMESTAMP,
- }))
- .failOnMissingField(true)
- )
- .withSchema(
- new Schema()
- .field("id", Types.LONG)
- .field("product", Types.STRING)
- .field("amount", Types.INT)
- .field("rowTime", Types.SQL_TIMESTAMP)
- .rowtime(new Rowtime()
- .timestampsFromField("createTime")
- .watermarksPeriodicBounded(1000))
- )
- .inAppendMode()
- .registerTableSource("sourceTable");
-
- Table table1 = tEnv.sqlQuery("select id, rowTime from sourceTable");
- Table table2 = table1
- .window(Slide.over("20.second").every("10.second").on("rowTime").as("w"))
- .groupBy("id,w")
- .select("id,id.count");
- tEnv.toRetractStream(table2, Row.class).print();
- env.execute(" test ");
-
- }
-
- }
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。