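Flink has three built-in window types
Count windows use the number of events as the basis for windowing. They come in two kinds, tumbling and sliding, and are defined with keyedStream.countWindow.
- /** Every 3 events, compute over the data in the window */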
-
- keyedStream.countWindow(3);
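The window above is computed only once 3 events have arrived; with fewer than 3, nothing is computed.
- /** Every 3 events, compute over the most recent 4 events */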
-
- keyedStream.countWindow(4,3);
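To make the firing rule concrete, here is a minimal plain-Java simulation of the two count windows above for a single key. It does not call Flink; the class and method names (`CountWindowSketch`, `slidingCountWindows`, `tumblingCountWindows`) are made up for illustration, and the sketch assumes that a sliding count window fires every `slide` events over at most the last `size` events.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Plain-Java simulation of count-window firing; it does not use Flink itself. */
public class CountWindowSketch {

    /**
     * Simulates countWindow(size, slide) for one key: after every `slide` events,
     * emit a snapshot of the most recent `size` events (fewer if not enough have arrived).
     */
    static List<List<Integer>> slidingCountWindows(List<Integer> events, int size, int slide) {
        List<List<Integer>> fired = new ArrayList<>();
        Deque<Integer> buffer = new ArrayDeque<>();
        int sinceLastFire = 0;
        for (int event : events) {
            buffer.addLast(event);
            if (buffer.size() > size) {
                buffer.removeFirst(); // keep only the most recent `size` events
            }
            sinceLastFire++;
            if (sinceLastFire == slide) {
                fired.add(new ArrayList<>(buffer));
                sinceLastFire = 0;
            }
        }
        return fired;
    }

    /** countWindow(size) yields the same output as a slide equal to the size. */
    static List<List<Integer>> tumblingCountWindows(List<Integer> events, int size) {
        return slidingCountWindows(events, size, size);
    }

    public static void main(String[] args) {
        List<Integer> events = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(tumblingCountWindows(events, 3));   // [[1, 2, 3], [4, 5, 6]]
        System.out.println(slidingCountWindows(events, 4, 3)); // [[1, 2, 3], [3, 4, 5, 6]]
    }
}
```

Note that event 7 never appears in the tumbling output: the third window has only one event, so it does not fire.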
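Time windows use time as the basis for windowing. They come in two kinds, tumbling and sliding, and are defined with keyedStream.timeWindow (deprecated since Flink 1.12 in favor of keyedStream.window with an explicit window assigner).
- /** Every 1 minute, compute over the data in the window */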
-
- keyedStream.timeWindow( Time.minutes(1));
- /** Every 30 seconds, compute over the most recent 1 minute of data */
-
- keyedStream.timeWindow( Time.minutes(1), Time.seconds(30));
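The difference between the two time windows is easiest to see by asking which windows a single event falls into. The following plain-Java sketch does not use Flink; `TimeWindowSketch`, `windowStart`, and `slidingWindowStarts` are hypothetical names, and the sketch assumes windows are aligned to the epoch with no offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of time-window assignment; it does not use Flink itself. */
public class TimeWindowSketch {

    /** Start of the epoch-aligned window of length `size` (ms) that contains `timestamp`. */
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Start times of all sliding windows [start, start + size) that contain `timestamp`. */
    static List<Long> slidingWindowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = windowStart(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 95_000L; // an event 95 seconds after the epoch
        System.out.println(windowStart(ts, 60_000L));                  // 60000
        System.out.println(slidingWindowStarts(ts, 60_000L, 30_000L)); // [90000, 60000]
    }
}
```

With a 1-minute tumbling window the event belongs to exactly one window; with a 1-minute window sliding every 30 seconds it belongs to two overlapping windows.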
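Reference example: Case 1
Requirement: compute statistics over the data in two kinds of interval, each day (00:00:00-23:59:59) and each hour (00:00-59:59).
A sliding time window does not meet this requirement. After some research, it turned out that TumblingEventTimeWindows provides exactly this.
Counting the data for each hour: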
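-
- // Author's own helper classes (not shown in this post)
- import static flink.GetTime.dateToTimestamp;
- import static stranger.PropertyLoader.getPropertiesConfig;
-
- // Flink imports (package names as of the Flink 1.x releases that ship FlinkKafkaConsumer011)
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.common.serialization.SimpleStringSchema;
- import org.apache.flink.api.java.tuple.Tuple4;
- import org.apache.flink.api.java.tuple.Tuple5;
- import org.apache.flink.streaming.api.TimeCharacteristic;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
- import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
- import org.apache.flink.streaming.api.watermark.Watermark;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
- import org.apache.flink.util.Collector;
-
- import javax.annotation.Nullable;
- import java.util.Properties;
-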
- /**
- * @author
- * @description Count the data for each hour
- * @date 2019/6/6
- */
- public class EventTimeStreamWindowAll {
- public static void main(String[] args) {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- final String configPath = "config.properties";
- final Properties pro = getPropertiesConfig(configPath);
- final String topic = "stranger";
- final String groupId = "mainStranger";
- String bootstrapServers = pro.getProperty("bootstrap.servers");
- Properties properties = new Properties();
- properties.setProperty("bootstrap.servers", bootstrapServers); // IP or hostname of the Kafka brokers; separate multiple entries with commas
- properties.setProperty("group.id", groupId); // group.id of the Flink Kafka consumer
- FlinkKafkaConsumer011<String> kafkaSource = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), properties);
- kafkaSource.setStartFromLatest();
- SingleOutputStreamOperator<String> mainStream = env.addSource(kafkaSource).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
- @Nullable
- @Override
- public Watermark getCurrentWatermark() {
- return new Watermark(System.currentTimeMillis() - 5000);
- }
-
- @Override
- public long extractTimestamp(String s, long l) {
- String[] split = s.split("\\t");
- long timestamp = dateToTimestamp(split[0]);
- return timestamp;
- }
- });
-
- // Sample record, shown comma-separated here although the code splits on tabs (进入 = "enter"):
- // 2019-06-05 15:13:32,people6,进入,place3
- DataStream<Tuple5<String, String, String, String, Long>> mainCount = mainStream.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
- @Override
- public Tuple4<String, String, String, String> map(String s) throws Exception {
- String[] split = s.split("\\t");
- return new Tuple4<>(split[0], split[1], split[2], split[3]);
- }
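- // Note: windowAll collects all records into one non-parallel window, so this keyBy should not change the result
- }).keyBy(2)
-
- // Hourly windows aligned to the top of the hour; the offset defaults to 0
- .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
- // Daily windows instead; the -8 hour offset aligns them to midnight in UTC+8
- // .windowAll(TumblingEventTimeWindows.of(Time.days(1),Time.hours(-8)))
- // Fire every 5 seconds of event time
- .trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)))
- // Alternatively, fire after a given number of records
- // .trigger(CountTrigger.of(1))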
- .process(new ProcessAllWindowFunction<Tuple4<String, String, String, String>, Tuple5<String, String, String, String, Long>, TimeWindow>() {
- @Override
- public void process(Context context, Iterable<Tuple4<String, String, String, String>> iterables, Collector<Tuple5<String, String, String, String, Long>> collector) throws Exception {
- Long sum = 0L;
- String time = null;
- String people = null;
- String behavior = null;
- String place = null;
- for (Tuple4<String, String, String, String> iterable : iterables) {
- sum += 1;
- time = iterable.f0;
- people = iterable.f1;
- behavior = iterable.f2;
- place = iterable.f3;
- }
- collector.collect(new Tuple5<>(time, people, behavior, place, sum));
- }
- });
-
- mainCount.print();
- try {
- env.execute("test count");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
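The commented-out daily window in the job above passes Time.hours(-8) as an offset. The plain-Java sketch below illustrates why: tumbling windows are aligned to the epoch in UTC, so without an offset a "day" starts at 08:00 local time in UTC+8. The arithmetic mirrors, as far as I understand it, the way Flink computes a window start from timestamp, offset, and size; `DailyWindowOffsetSketch` and `windowStart` are hypothetical names and no Flink classes are used.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

/** Plain-Java sketch of a tumbling-window start with an offset; it does not use Flink itself. */
public class DailyWindowOffsetSketch {

    /** Start of the tumbling window of length `size` (ms), shifted by `offset` (ms), containing `timestamp`. */
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        long day = 24 * 60 * 60 * 1000L;
        long eightHours = 8 * 60 * 60 * 1000L;
        ZoneOffset utcPlus8 = ZoneOffset.ofHours(8);

        // The sample record's time, 2019-06-05 15:13:32, interpreted as UTC+8.
        long ts = ZonedDateTime.of(2019, 6, 5, 15, 13, 32, 0, utcPlus8).toInstant().toEpochMilli();

        long utcAligned = windowStart(ts, 0L, day);            // day boundary at UTC midnight
        long localAligned = windowStart(ts, -eightHours, day); // day boundary at UTC+8 midnight

        System.out.println(Instant.ofEpochMilli(utcAligned).atOffset(utcPlus8));   // 2019-06-05T08:00+08:00
        System.out.println(Instant.ofEpochMilli(localAligned).atOffset(utcPlus8)); // 2019-06-05T00:00+08:00
    }
}
```

The hourly window needs no offset because UTC+8 differs from UTC by a whole number of hours, so hour boundaries coincide.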
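Session windows use the length of a session as the basis for windowing. You set a session gap; if no new event arrives within that gap, the session is considered to have ended. Example: whenever a stock has had no trade event for more than 2 seconds, compute the total traded amount in its window. In the figure below, "message A" and "message B" represent two different stocks.
- /** Session gap of 2 seconds: when no event arrives for more than 2 seconds, the session is considered ended */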
-
- keyedStream.window( ProcessingTimeSessionWindows.withGap( Time.seconds(2)));
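As a rough illustration of the stock example, the plain-Java sketch below groups one stock's trades into sessions and totals each session. It does not use Flink; `SessionWindowSketch`, `Trade`, and `sessionTotals` are hypothetical names, the trades are assumed to arrive sorted by timestamp, and a session is assumed to close only when the pause is strictly longer than the gap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of session-gap grouping for one key; it does not use Flink itself. */
public class SessionWindowSketch {

    /** One trade of a single stock: timestamp in milliseconds and traded amount. */
    static final class Trade {
        final long timestamp;
        final long amount;

        Trade(long timestamp, long amount) {
            this.timestamp = timestamp;
            this.amount = amount;
        }
    }

    /**
     * Returns the total amount per session. Trades must be sorted by timestamp.
     * A new session starts when more than `gapMs` passes without a trade.
     */
    static List<Long> sessionTotals(List<Trade> trades, long gapMs) {
        List<Long> totals = new ArrayList<>();
        long running = 0L;
        Long previous = null; // timestamp of the previous trade, if any
        for (Trade trade : trades) {
            if (previous != null && trade.timestamp - previous > gapMs) {
                totals.add(running); // the pause exceeded the gap: close the session
                running = 0L;
            }
            running += trade.amount;
            previous = trade.timestamp;
        }
        if (previous != null) {
            totals.add(running); // close the last open session
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Trade> trades = Arrays.asList(
                new Trade(0L, 100L), new Trade(1_000L, 200L), new Trade(2_500L, 50L),
                new Trade(6_000L, 300L), new Trade(6_500L, 10L));
        System.out.println(sessionTotals(trades, 2_000L)); // [350, 310]
    }
}
```

The 3.5-second pause between the third and fourth trade is the only one longer than 2 seconds, so the five trades form two sessions.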
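Window aggregation falls into two categories: incremental aggregation and full-window aggregation.
Incremental aggregation updates the result each time a record enters the window:
Common incremental aggregation functions:
reduce(reduceFunction)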
aggregate(aggregateFunction)
sum()
min()
max()
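Full-window aggregation computes once over all the data in the window when the window fires:
Common full-window aggregation functions: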
apply(windowFunction)
process(processWindowFunction)
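The practical difference between the two categories is what the window has to store. The plain-Java sketch below contrasts them without using Flink; `AggregationSketch`, `IncrementalWindow`, `FullWindow`, and `median` are hypothetical names chosen for illustration. A sum can be kept as a single running value, while a median needs every record, which is the kind of computation that calls for a full-window function.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Plain-Java contrast of incremental and full-window aggregation; it does not use Flink itself. */
public class AggregationSketch {

    /** Incremental: only one accumulated value is stored and it is updated per record. */
    static final class IncrementalWindow<T> {
        private final BinaryOperator<T> reducer;
        private T accumulator; // null until the first record arrives

        IncrementalWindow(BinaryOperator<T> reducer) {
            this.reducer = reducer;
        }

        void add(T record) {
            accumulator = (accumulator == null) ? record : reducer.apply(accumulator, record);
        }

        T fire() {
            return accumulator;
        }
    }

    /** Full: every record is buffered, and the function runs once over all of them on firing. */
    static final class FullWindow<T, R> {
        private final Function<List<T>, R> windowFunction;
        private final List<T> buffer = new ArrayList<>();

        FullWindow(Function<List<T>, R> windowFunction) {
            this.windowFunction = windowFunction;
        }

        void add(T record) {
            buffer.add(record);
        }

        R fire() {
            return windowFunction.apply(buffer);
        }
    }

    /** Median of a non-empty list; it needs all values, so it cannot be a simple running reduce. */
    static double median(List<Integer> values) {
        List<Integer> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int n = sorted.size();
        return n % 2 == 1 ? sorted.get(n / 2) : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }

    public static void main(String[] args) {
        IncrementalWindow<Integer> sum = new IncrementalWindow<>(Integer::sum);
        FullWindow<Integer, Double> med = new FullWindow<>(AggregationSketch::median);
        for (int value : new int[] {5, 1, 4}) {
            sum.add(value);
            med.add(value);
        }
        System.out.println(sum.fire()); // 10
        System.out.println(med.fire()); // 4.0
    }
}
```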