赞
踩
滚动窗口是按照时间划分的窗口,其Assinger会将输入的每一条数据按照时间分配到固定长度的窗口内,并且按照这个固定的时间进行滚动,窗口和窗口之间没有数据重叠。
我们在创建TimeWindowAll的时候可以通过timeWindowAll()方法来直接创建,但要注意该方法目前已经过时,在使用的过程中需要为程序指定当前所使用的时间类型。目前建议使用windowAll()来创建窗口,可以直接传入需要的事件类型和窗口类型,与timeWindowAll()相比更加直观。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = socketStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
String[] fields = s.split(" ");
return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
}
});
//老API
// env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOne.timeWindowAll(Time.seconds(5)).sum(1);
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOne.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1);
sum.print();
env.execute("");
}
输入内容:
C:\Users\zhibai>nc -lp 8888
spark 1
flink 1
hadoop 1
hadoop 1 --5s
flink 1
flink 1
flink 1
hadoop 1
输出结果:
6> (spark,4)
7> (flink,4)
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = socketStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
String[] fields = s.split(" ");
return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
}
});
KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> s) throws Exception {
return s.f0;
}
});
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> processingwindow = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
processingwindow.sum(1).print();
env.execute("");
}
输入内容:
C:\Users\zhibai>nc -lp 8888
hadoop 1
hadoop 1
hadoop 1
flink 1
flink 1
flink 1
flink 1
spark 1
spark 1
spark 1
hadoop 1
输出结果:
8> (hadoop,1)
8> (hadoop,2)
7> (flink,1)
7> (flink,3)
1> (spark,1)
8> (hadoop,1)
1> (spark,2)
窗口的时间间隔也可以指定其他类型,例如:Time.milliseconds(x)、Time.seconds(x)、Time.minutes(x)、Time.hours(x)、Time.days(x)。除此之外还可以调用Time的of方法,传入数字和时间单位TimeUnit,例如Time.of(1, TimeUnit.HOURS)。
TumblingWindows的of方法如果指定一个参数,就会按照指定的时间周期性的滚动形成新的窗口,例如TumblingProcessingTimeWindows.of(Time.days(1)),那么窗口的起始时间是以当前系统的ProcessingTime的整点开始以小时为单位对齐。例如[1:00:00.000, 1:59:59.999]对应一个窗口,[2:00:00.000, 2:59:59.999]会对应下一个窗口,并且会不断的生成窗口。(为了方便描述,才使用1:00:00.000这种格式,窗口的时间其实是timestamp格式)。
TumblingWindows的of方法还可以传入2个参数,第二个参数的作用是将时间调整成指定时区的时间。在UTC-0以外的时区,就需要指定一个偏移量进行调整。例如,在中国就必须指定Time.hours(-8)的偏移量。
//窗口长度为1天,同时将数据转换成对应的时区的时间
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> eventTimewindow = keyedStream.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)));
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。