当前位置:   article > 正文

37、Flink 的窗口函数(Window Functions)详解_flink windowfunction

flink windowfunction
窗口函数(Window Functions)
a)概述

定义了 window assigner 之后,需要指定当窗口触发之后,如何计算每个窗口中的数据, 即 window function。

窗口函数有三种:ReduceFunctionAggregateFunctionProcessWindowFunction

  • 前两者执行更高效,因为 Flink 可以在每条数据到达窗口后进行增量聚合(incrementally aggregate);
  • ProcessWindowFunction 会得到能够遍历当前窗口内所有数据的 Iterable,以及关于这个窗口的 meta-information。

使用 ProcessWindowFunction 的窗口转换操作没有其它两种函数高效,因为 Flink 在窗口触发前必须缓存里面的所有数据; ProcessWindowFunction 可以与 ReduceFunctionAggregateFunction 合并来提高效率,既可以增量聚合窗口内的数据,又可以从 ProcessWindowFunction 接收窗口的 metadata。

b)ReduceFunction

ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同

Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。

示例:对窗口内元组的第二个属性求和。

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
c)AggregateFunction

ReduceFunctionAggregateFunction 的特殊情况; AggregateFunction 接收三个参数:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。

输入数据的类型是输入流的元素类型,AggregateFunction 接口有如下几个方法: 把每一条元素加进累加器、创建初始累加器、合并两个累加器、从累加器中提取输出(OUT 类型)。

ReduceFunction 相同,Flink 会在输入数据到达窗口时直接进行增量聚合。

示例:计算窗口内所有元素第二个属性的平均值。

private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
d)ProcessWindowFunction

ProcessWindowFunction 具备 Iterable 能获取窗口内所有的元素 ,以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活;ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据。

ProcessWindowFunction 的函数签名如下:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;

    /**
     * Deletes any state in the {@code Context} when the Window expires (the watermark passes its
     * {@code maxTimestamp} + {@code allowedLateness}).
     *
     * @param context The context to which the window is being evaluated
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public void clear(Context context) throws Exception {}

    /**
     * The context holding window metadata.
     */
    public abstract class Context implements java.io.Serializable {
        /**
         * Returns the window that is being evaluated.
         */
        public abstract W window();

        /** Returns the current processing time. */
        public abstract long currentProcessingTime();

        /** Returns the current event-time watermark. */
        public abstract long currentWatermark();

        /**
         * State accessor for per-key and per-window state.
         *
         * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
         * by implementing {@link ProcessWindowFunction#clear(Context)}.
         */
        public abstract KeyedStateStore windowState();

        /**
         * State accessor for per-key global state.
         */
        public abstract KeyedStateStore globalState();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

key 参数由 keyBy() 中指定的 KeySelector 选出;如果是给出 key 在 tuple 中的 index 或用属性名的字符串形式指定 key,这个 key 的类型将总是 Tuple, 并且需要手动将它转换为正确大小的 tuple 才能提取 key。

示例:使用 ProcessWindowFunction 对窗口中的元素计数,并且将窗口本身的信息一同输出。

DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(t -> t.f0)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction());

public class MyProcessWindowFunction 
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + "count: " + count);
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
e)增量聚合的 ProcessWindowFunction

ProcessWindowFunction 可以与 ReduceFunctionAggregateFunction 搭配使用, 使其能够在数据到达窗口的时候进行增量聚合,当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果;即实现了增量聚合窗口的元素并且从 ProcessWindowFunction 中获得窗口的元数据。

使用 ReduceFunction 增量聚合

示例:将 ReduceFunctionProcessWindowFunction 组合,返回窗口中的最小元素和窗口的开始时间。

DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {

  public SensorReading reduce(SensorReading r1, SensorReading r2) {
      return r1.value() > r2.value() ? r2 : r1;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<SensorReading> minReadings,
                    Collector<Tuple2<Long, SensorReading>> out) {
      SensorReading min = minReadings.iterator().next();
      out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

使用 AggregateFunction 增量聚合

示例:将 AggregateFunction 与 ProcessWindowFunction 组合,计算平均值并与窗口对应的 key 一同输出。

DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Function definitions

private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<Double> averages,
                    Collector<Tuple2<String, Double>> out) {
      Double average = averages.iterator().next();
      out.collect(new Tuple2<>(key, average));
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
f)在 ProcessWindowFunction 中使用 per-window state

除了访问 keyed state,ProcessWindowFunction 还可以使用作用域仅为“当前正在处理的窗口”的 keyed state

per-window 中的 window 对应某个 key 的窗口实例:比如 以 user-id xyz 为 key,从 12:00 到 13:00 的时间窗口,具体情况取决于窗口的定义,根据具体的 key 和时间段会产生诸多不同的窗口实例。

Per-window state 如果处理有 1000 种不同 key 的事件,并且目前所有事件都处于 [12:00, 13:00) 时间窗口内,那么将会得到 1000 个窗口实例, 且每个实例都有自己的 keyed per-window state。

process() 接收到的 Context 对象中有两个方法允许访问以下两种 state:

  • globalState(),访问全局的 keyed state
  • windowState(), 访问作用域仅限于当前窗口的 keyed state

如果可能将一个 window 触发多次(比如当迟到数据会再次触发窗口计算, 或自定义了根据推测提前触发窗口的 trigger),那么这个功能将非常有用,这时可能需要在 per-window state 中储存关于之前触发的信息或触发的总次数。

当使用窗口状态时,一定记得在删除窗口时清除这些状态,应该定义在 clear() 方法中

WindowFunction(已过时)

在某些可以使用 ProcessWindowFunction 的地方,也可以使用 WindowFunction;它是旧版的 ProcessWindowFunction,只能提供更少的环境信息且缺少一些高级的功能,比如 per-window state。

WindowFunction 的函数签名如下:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

  /**
   * Evaluates the window and outputs none or several elements.
   *
   * @param key The key for which this window is evaluated.
   * @param window The window that is being evaluated.
   * @param input The elements in the window being evaluated.
   * @param out A collector for emitting elements.
   *
   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
   */
  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

可以像下例这样使用:

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/625415
推荐阅读
相关标签
  

闽ICP备14008679号