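A ReduceFunction specifies how two elements from the input are combined to produce an output element of the same type. Flink uses a ReduceFunction to aggregate the elements of a window incrementally.
A ReduceFunction can be defined and used like this: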
val input: DataStream[(String, Long)] = ...
input
  .keyBy(<key selector>)
  .window(<window assigner>)
  // Combine two elements of the same key: keep the key, add the Long values
  .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
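The following is a minimal plain-Scala sketch, with no Flink dependency, of what the reduce lambda above computes inside a single window. It assumes all elements already share one key and one window; the object `ReduceSimulation` and the helper `incremental` are hypothetical names, and the fold only imitates the idea that the window keeps one running value instead of buffering every element. It is not Flink's actual state handling.

```scala
// Plain-Scala simulation (no Flink). ReduceSimulation and incremental are
// hypothetical names used only for illustration.
object ReduceSimulation {
  // Same combine logic as the reduce lambda: keep the key, sum the Long values.
  val combine: ((String, Long), (String, Long)) => (String, Long) =
    (v1, v2) => (v1._1, v1._2 + v2._2)

  // The "window state" is at most one value; each arriving element is folded
  // into it immediately, so elements never need to be buffered.
  def incremental(elements: Seq[(String, Long)]): Option[(String, Long)] =
    elements.foldLeft(Option.empty[(String, Long)]) {
      case (None, e)        => Some(e)
      case (Some(state), e) => Some(combine(state, e))
    }

  def main(args: Array[String]): Unit = {
    val window = Seq(("a", 1L), ("a", 2L), ("a", 3L))
    println(incremental(window)) // Some((a,6))
  }
}
```

Because the combined value has the same type as the inputs, the same lambda can be applied again to every new element. That same-type requirement is exactly what AggregateFunction relaxes.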
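An AggregateFunction is a generalized version of a ReduceFunction with three types: an input type (IN), an accumulator type (ACC), and an output type (OUT). The input type is the type of the elements in the input stream, and the AggregateFunction has a method for adding one input element to an accumulator. The interface also has methods for creating an initial accumulator, for merging two accumulators into one, and for extracting an output from an accumulator.
Like a ReduceFunction, an AggregateFunction lets Flink aggregate a window's input elements incrementally as they arrive.
An AggregateFunction can be defined and used like this: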
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._

/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  // Convert to Double first: Long / Long truncates and would not match the Double output type
  override def getResult(accumulator: (Long, Long)) =
    accumulator._1.toDouble / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

val input: DataStream[(String, Long)] = ...

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .aggregate(new AverageAggregate)
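To follow the accumulator lifecycle without a Flink cluster, here is a plain-Scala sketch that mirrors the AverageAggregate logic. `SimpleAggregate`, `Average`, `AggregateSimulation`, and `accumulate` are hypothetical names that only copy the method shape of Flink's `AggregateFunction`; the two partial accumulators stand in for state that would be combined through `merge`, for example when windows are merged.

```scala
// Hypothetical trait: mirrors only the four methods of Flink's AggregateFunction.
trait SimpleAggregate[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, accumulator: ACC): ACC
  def getResult(accumulator: ACC): OUT
  def merge(a: ACC, b: ACC): ACC
}

// Same logic as AverageAggregate: ACC = (running sum, count), OUT = average.
object Average extends SimpleAggregate[(String, Long), (Long, Long), Double] {
  def createAccumulator(): (Long, Long) = (0L, 0L)
  def add(value: (String, Long), acc: (Long, Long)): (Long, Long) =
    (acc._1 + value._2, acc._2 + 1L)
  // toDouble avoids integer division: 10 / 4 on Longs would yield 2
  def getResult(acc: (Long, Long)): Double = acc._1.toDouble / acc._2
  def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
    (a._1 + b._1, a._2 + b._2)
}

object AggregateSimulation {
  // Fold a batch of elements into a fresh accumulator, one element at a time.
  def accumulate[IN, ACC, OUT](f: SimpleAggregate[IN, ACC, OUT], xs: Seq[IN]): ACC =
    xs.foldLeft(f.createAccumulator())((acc, x) => f.add(x, acc))

  def main(args: Array[String]): Unit = {
    // Two partial accumulators, e.g. from two windows that are later merged.
    val left   = accumulate(Average, Seq(("k", 1L), ("k", 2L)))
    val right  = accumulate(Average, Seq(("k", 3L), ("k", 4L)))
    val merged = Average.merge(left, right)
    println(merged)                    // (10,4)
    println(Average.getResult(merged)) // 2.5
  }
}
```

Keeping the sum and the count separately in the accumulator is what makes `merge` correct: averaging two averages would give the wrong result whenever the partial counts differ.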