当前位置:   article > 正文

Flink开发-滚动窗口TumblingWindows_tumblingeventtimewindows.of

tumblingeventtimewindows.of

Flink开发-滚动窗口TumblingWindows

滚动窗口是按照时间划分的窗口,其Assinger会将输入的每一条数据按照时间分配到固定长度的窗口内,并且按照这个固定的时间进行滚动,窗口和窗口之间没有数据重叠。

1.Non-Keyed Tumbling Windows

我们在创建TimeWindowAll的时候可以通过timeWindowAll()方法来直接创建,但要注意该方法目前已经过时,在使用的过程中需要为程序指定当前所使用的时间类型。目前建议使用windowAll()来创建窗口,可以直接传入需要的事件类型和窗口类型,与timeWindowAll()相比更加直观。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = socketStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] fields = s.split(" ");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });
        //老API
//        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOne.timeWindowAll(Time.seconds(5)).sum(1);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOne.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1);
        sum.print();
        env.execute("");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

输入内容:

C:\Users\zhibai>nc -lp 8888
spark 1
flink 1
hadoop 1
hadoop 1      --5s
flink 1
flink 1
flink 1
hadoop 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

输出结果:

6> (spark,4)
7> (flink,4)
  • 1
  • 2

2.Keyed Tumbling Windows

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = socketStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] fields = s.split(" ");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> s) throws Exception {
                return s.f0;
            }
        });
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> processingwindow = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        processingwindow.sum(1).print();
        env.execute("");

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

输入内容:

C:\Users\zhibai>nc -lp 8888
hadoop 1
hadoop 1
hadoop 1
flink 1
flink 1
flink 1
flink 1
spark 1
spark 1
spark 1
hadoop 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

输出结果:

8> (hadoop,1)
8> (hadoop,2)
7> (flink,1)
7> (flink,3)
1> (spark,1)
8> (hadoop,1)
1> (spark,2)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

窗口的时间间隔也可以指定其他类型,例如:Time.milliseconds(x)、Time.seconds(x)、Time.minutes(x)、Time.hours(x)、Time.days(x)。除此之外还可以调用Time的of方法,传入数字和时间单位TimeUnit,例如Time.of(1, TimeUnit.HOURS)。

TumblingWindows的of方法如果指定一个参数,就会按照指定的时间周期性的滚动形成新的窗口,例如TumblingProcessingTimeWindows.of(Time.days(1)),那么窗口的起始时间是以当前系统的ProcessingTime的整点开始以小时为单位对齐。例如[1:00:00.000, 1:59:59.999]对应一个窗口,[2:00:00.000, 2:59:59.999]会对应下一个窗口,并且会不断的生成窗口。(为了方便描述,才使用1:00:00.000这种格式,窗口的时间其实是timestamp格式)。

TumblingWindows的of方法还可以传入2个参数,第二个参数的作用是将时间调整成指定时区的时间。在UTC-0以外的时区,就需要指定一个偏移量进行调整。例如,在中国就必须指定Time.hours(-8)的偏移量。

//窗口长度为1天,同时将数据转换成对应的时区的时间
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> eventTimewindow = keyedStream.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)));
  • 1
  • 2
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号