当前位置:   article > 正文

Flink|《Flink 官方文档 - DataStream API - 用户自定义 Functions》学习笔记 + 源码分析_flink api文档

flink api文档

学习文档:Flink 官方文档 - DataStream API - 用户自定义 Functions

学习笔记如下:


接口实现样例

用户可以通过实现接口来完成自定义 Functions。

实现接口并使用的样例:

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
}
data.map(new MyMapFunction());
  • 1
  • 2
  • 3
  • 4

使用匿名类实现的样例:

data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
});
  • 1
  • 2
  • 3

使用 Lambda 表达式实现(Java 8)样例:

data.filter(s -> s.startsWith("http://"));
  • 1
data.reduce((i1,i2) -> i1 + i2);
  • 1

Rick Functions

所有的 Flink 函数类都有其 Rich 版本,在 Rick function 中,可以获取运行状态的上下文,而且拥有一些生命周期方法,可以实现更复杂的功能。

所有需要用户自定义的 function 的转化操作都可以转化为 rich function,只需要继承 Rich 版本的 Function 即可。

例如:可以将以下 MapFunction 代码

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
}
  • 1
  • 2
  • 3

替换为

class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
}
  • 1
  • 2
  • 3

在使用自定义的 rich function 时,与非 Rich 版本一样,传给 map 算子即可:

data.map(new MyMapFunction());
  • 1

Rich Function 也可以使用匿名类的方式定义。例如:

data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});
  • 1
  • 2
  • 3

累加器使用样例

累加器可以使用 Accumulator.add(V value) 方法将其递增;在作业结束时,Flink 会汇总(merge)所有部分的结果并将其发送给客户端。

使用计数器的方法如下:

Step 1|在需要使用累加器的用户自定义 function 中创建一个累加器对象(此处以计数器为例)

private IntCounter numLines = new IntCounter();
  • 1

Step 2|在 rich function 的 open() 方法中注册累加器对象,也可以在次数定义名称

getRuntimeContext().addAccumulator("num-lines", this.numLines);
  • 1

Step 3|在 function 的任何地方(包括 open()close() 方法中)使用累加器

this.numLines.add(1);
  • 1

Step 4|最终整体结果会存储在由执行环境的 execute() 方法返回的 JobExecutionResult 对象中。

myJobExecutionResult.getAccumulatorResult("num-lines");
  • 1

单个作业的所有累加器共享一个命名空间,因此可以在不同操作 function 里使用同一个累加器,Flink 会在内部将所有具有相同名称的累加器合并起来。

需要注意的是,当前累加器的结果只有在整个作业结束后才可用。

计数器和累加器源码

计数器 IntCounter 类的源码如下:

// flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java

@PublicEvolving
public class IntCounter implements SimpleAccumulator<Integer> {

    private static final long serialVersionUID = 1L;

    private int localValue = 0;

    public IntCounter() {}

    public IntCounter(int value) {
        this.localValue = value;
    }

    // ------------------------------------------------------------------------
    //  Accumulator
    // ------------------------------------------------------------------------

    /** Consider using {@link #add(int)} instead for primitive int values */
    @Override
    public void add(Integer value) {
        localValue += value;
    }

    @Override
    public Integer getLocalValue() {
        return localValue;
    }

    @Override
    public void merge(Accumulator<Integer, Integer> other) {
        this.localValue += other.getLocalValue();
    }

    @Override
    public void resetLocal() {
        this.localValue = 0;
    }

    @Override
    public IntCounter clone() {
        IntCounter result = new IntCounter();
        result.localValue = localValue;
        return result;
    }

    // ------------------------------------------------------------------------
    //  Primitive Specializations
    // ------------------------------------------------------------------------

    public void add(int value) {
        localValue += value;
    }

    public int getLocalValuePrimitive() {
        return this.localValue;
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "IntCounter " + this.localValue;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 使用 localValue 属性来存储当前 function 中的累加器的值
  • add 方法:实现累加操作
  • merge 方法:实现多个累加器的合并
  • clone() 方法:实现累加器的复制

计数器 IntCounter 类实现了 SimpleAccumulator 接口,SimpleAccumulator 接口的源码如下:

// flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java

@Public
public interface SimpleAccumulator<T extends Serializable> extends Accumulator<T, T> {}
  • 1
  • 2
  • 3
  • 4

SimpleAccumulator 接口继承了 Accumulator 接口,并定义添加的值的类型与最终结果的类型相同;Accumulator 接口的源码如下:

@Public
public interface Accumulator<V, R extends Serializable> extends Serializable, Cloneable {
    /** @param value The value to add to the accumulator object */
    void add(V value);

    /** @return local The local value from the current UDF context */
    R getLocalValue();

    /** Reset the local value. This only affects the current UDF context. */
    void resetLocal();

    /**
     * Used by system internally to merge the collected parts of an accumulator at the end of the
     * job.
     *
     * @param other Reference to accumulator to merge in.
     */
    void merge(Accumulator<V, R> other);

    /**
     * Duplicates the accumulator. All subclasses need to properly implement cloning and cannot
     * throw a {@link java.lang.CloneNotSupportedException}
     *
     * @return The duplicated accumulator.
     */
    Accumulator<V, R> clone();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • add 方法:实现累加器的累加操作
  • getLocalValue() 方法:读取当前 UDF 中累加器的值
  • resetLocal() 方法:重置当前 UDF 中累加器的值
  • merge 方法:在 Flink 合并不同 UDF 的结果时使用
  • clone() 方法:复制累加器对象,实现了 Clonable 接口

Accumulator 接口中,定义了添加的值的类型 V 和最终结果的类型 R。适用于实现更灵活的操作,例如直方图,添加的值为数字,最终结果的直方图。

SimpleAccumulator 接口中,要求添加的值的类型和最终结果的类型相同。适用于相对简单的操作,例如计数器。

自定义累加器

如果需要自定义累加器,只需要实现 Accumulator 接口或 SimpleAccumulator 接口即可。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/1019110
推荐阅读
相关标签
  

闽ICP备14008679号