赞
踩
定义了 window assigner 之后,需要指定当窗口触发之后,如何计算每个窗口中的数据, 即 window function。
窗口函数有三种:ReduceFunction
、AggregateFunction
或 ProcessWindowFunction
;
ProcessWindowFunction
会得到能够遍历当前窗口内所有数据的 Iterable
,以及关于这个窗口的 meta-information。使用 ProcessWindowFunction
的窗口转换操作没有其它两种函数高效,因为 Flink 在窗口触发前必须缓存里面的所有数据; ProcessWindowFunction
可以与 ReduceFunction
或 AggregateFunction
合并来提高效率,既可以增量聚合窗口内的数据,又可以从 ProcessWindowFunction
接收窗口的 metadata。
ReduceFunction
指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。
Flink 使用 ReduceFunction
对窗口中的数据进行增量聚合。
示例:对窗口内元组的第二个属性求和。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
ReduceFunction
是 AggregateFunction
的特殊情况; AggregateFunction
接收三个参数:输入数据的类型(IN
)、累加器的类型(ACC
)和输出数据的类型(OUT
)。
输入数据的类型是输入流的元素类型,AggregateFunction
接口有如下几个方法: 把每一条元素加进累加器、创建初始累加器、合并两个累加器、从累加器中提取输出(OUT
类型)。
与 ReduceFunction
相同,Flink 会在输入数据到达窗口时直接进行增量聚合。
示例:计算窗口内所有元素第二个属性的平均值。
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
ProcessWindowFunction 具备 Iterable 能获取窗口内所有的元素 ,以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活;ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据。
ProcessWindowFunction
的函数签名如下:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
* Deletes any state in the {@code Context} when the Window expires (the watermark passes its
* {@code maxTimestamp} + {@code allowedLateness}).
*
* @param context The context to which the window is being evaluated
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public void clear(Context context) throws Exception {}
/**
* The context holding window metadata.
*/
public abstract class Context implements java.io.Serializable {
/**
* Returns the window that is being evaluated.
*/
public abstract W window();
/** Returns the current processing time. */
public abstract long currentProcessingTime();
/** Returns the current event-time watermark. */
public abstract long currentWatermark();
/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
* by implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();
/**
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
}
}
key
参数由 keyBy()
中指定的 KeySelector
选出;如果是给出 key 在 tuple 中的 index 或用属性名的字符串形式指定 key,这个 key 的类型将总是 Tuple
, 并且需要手动将它转换为正确大小的 tuple 才能提取 key。
示例:使用 ProcessWindowFunction
对窗口中的元素计数,并且将窗口本身的信息一同输出。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new MyProcessWindowFunction());
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}
ProcessWindowFunction
可以与 ReduceFunction
或 AggregateFunction
搭配使用, 使其能够在数据到达窗口的时候进行增量聚合,当窗口关闭时,ProcessWindowFunction
将会得到聚合的结果;即实现了增量聚合窗口的元素并且从 ProcessWindowFunction 中获得窗口的元数据。
使用 ReduceFunction 增量聚合
示例:将 ReduceFunction
与 ProcessWindowFunction
组合,返回窗口中的最小元素和窗口的开始时间。
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
使用 AggregateFunction 增量聚合
示例:将 AggregateFunction 与 ProcessWindowFunction 组合,计算平均值并与窗口对应的 key 一同输出。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction());
// Function definitions
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Double> averages,
Collector<Tuple2<String, Double>> out) {
Double average = averages.iterator().next();
out.collect(new Tuple2<>(key, average));
}
}
除了访问 keyed state,ProcessWindowFunction
还可以使用作用域仅为“当前正在处理的窗口”的 keyed state;
per-window 中的 window 对应某个 key 的窗口实例:比如 以 user-id xyz 为 key,从 12:00 到 13:00 的时间窗口,具体情况取决于窗口的定义,根据具体的 key 和时间段会产生诸多不同的窗口实例。
Per-window state 如果处理有 1000 种不同 key 的事件,并且目前所有事件都处于 [12:00, 13:00) 时间窗口内,那么将会得到 1000 个窗口实例, 且每个实例都有自己的 keyed per-window state。
process()
接收到的 Context
对象中有两个方法允许访问以下两种 state:
globalState()
,访问全局的 keyed statewindowState()
, 访问作用域仅限于当前窗口的 keyed state如果可能将一个 window 触发多次(比如当迟到数据会再次触发窗口计算, 或自定义了根据推测提前触发窗口的 trigger),那么这个功能将非常有用,这时可能需要在 per-window state 中储存关于之前触发的信息或触发的总次数。
当使用窗口状态时,一定记得在删除窗口时清除这些状态,应该定义在 clear()
方法中。
WindowFunction(已过时)
在某些可以使用 ProcessWindowFunction
的地方,也可以使用 WindowFunction
;它是旧版的 ProcessWindowFunction
,只能提供更少的环境信息且缺少一些高级的功能,比如 per-window state。
WindowFunction
的函数签名如下:
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
可以像下例这样使用:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。