赞
踩
窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。
对窗口的正确理解:
我们将窗口理解为一个一个的水桶,数据流(stream)就像水流,每个数据都会分发到对应的桶中,当达到结束时间时,对每个桶中收集的数据进行计算处理
注:
Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口
以时间来定义窗口的开始和结束,获取某一段时间内的数据(类比于我们的定时发车)
计数窗口是基于元素的个数来获取窗口,达到固定个数时就计算并关闭窗口。(类比于我们的人齐才发车)
窗口之间没有重叠,也不会有间隔的首尾相撞状态,这样,每个数据都会被分到一个窗口,而且只会属于一个窗口。
滚动窗口的应用非常广泛,它可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。
DataStream<T> input = ...; // 滚动 event-time 窗口 input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // 滚动 processing-time 窗口 input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。 input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>);
滑动窗口大小也是固定的,但是窗口之间并不是首尾相接的,而是重叠的。
DataStream<T> input = ...; // 滑动 event-time 窗口 input .keyBy(<key selector>) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>); // 滑动 processing-time 窗口 input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>); // 滑动 processing-time 窗口,偏移量为 -8 小时 input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) .<windowed transformation>(<window function>);
会话窗口,是基于“会话”(session)来对数据进行分组的,会话窗口只能基于时间来定义。
DataStream<T> input = ...; // 设置了固定间隔的 event-time 会话窗口 input .keyBy(<key selector>) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>); // 设置了动态间隔的 event-time 会话窗口 input .keyBy(<key selector>) .window(EventTimeSessionWindows.withDynamicGap((element) -> { // 决定并返回会话间隔 })) .<windowed transformation>(<window function>); // 设置了固定间隔的 processing-time session 窗口 input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>); // 设置了动态间隔的 processing-time 会话窗口 input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> { // 决定并返回会话间隔 })) .<windowed transformation>(<window function>);
这种窗口对全局有效,会把相同的key的所有数据分配到同一个窗口中,这种窗口没有结束时间,默认不会触发计算,如果希望对数据进行处理,需要自定义“触发器”。
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink为我们提供了非常方便的接口:直接调用.countWindow()方法
滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小。
stream.keyBy(...)
.countWindow(10)
与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。
stream.keyBy(...)
.countWindow(10,3)
定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据, 这就是 window function 的职责了
窗口函数有三种:ReduceFunction、AggregateFunction 或 ProcessWindowFunction。
ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。 Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
//v1 和v2是 2个相同类型的输入参数
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
ReduceFunction 是 AggregateFunction 的特殊情况。 AggregateFunction 接收三个类型:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。
/** * The accumulator is used to keep a running sum and a count. The {@code getResult} method * computes the average. */ private static class AverageAggregate implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> { @Override public Tuple2<Long, Long> createAccumulator() { return new Tuple2<>(0L, 0L); } @Override public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) { return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L); } @Override public Double getResult(Tuple2<Long, Long> accumulator) { return ((double) accumulator.f0) / accumulator.f1; } @Override public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) { return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1); } } DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .aggregate(new AverageAggregate());
接口中有四个方法:
可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。
ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable, 以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。 ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据。
public class WindowProcessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("127.0.0.1", 7777).map(new WaterSensorMapFunction()); KeyedStream<WaterSensor, String> keyedStream = sensorDS.keyBy(WaterSensor::getId); WindowedStream<WaterSensor, String, TimeWindow> sensorWS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))); sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) { // 上下文可以拿到window对象,还有其他东西:侧输出流 等等 long startTs = context.window().getStart(); long endTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS"); String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS"); long count = elements.spliterator().estimateSize(); out.collect("key=" + key + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements); } }).print(); env.execute(); } }
在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。
我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。
// ReduceFunction与WindowFunction结合 public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction,WindowFunction<T,R,K,W> function) // ReduceFunction与ProcessWindowFunction结合 public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction,ProcessWindowFunction<T,R,K,W> function) // AggregateFunction与WindowFunction结合 public <ACC,V,R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T,ACC,V> aggFunction,WindowFunction<V,R,K,W> windowFunction) // AggregateFunction与ProcessWindowFunction结合 public <ACC,V,R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T,ACC,V> aggFunction, ProcessWindowFunction<V,R,K,W> windowFunction)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。