学习文档:Flink 官方文档 - DataStream API - 用户自定义 Functions



用户可以通过实现接口来完成自定义 Functions。


class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
data.map(new MyMapFunction());
  • 4


data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
  • 3

使用 Lambda 表达式实现(Java 8)样例:

data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
Rick Functions

所有的 Flink 函数类都有其 Rich 版本,在 Rick function 中,可以获取运行状态的上下文,而且拥有一些生命周期方法,可以实现更复杂的功能。

所有需要用户自定义的 function 的转化操作都可以转化为 rich function,只需要继承 Rich 版本的 Function 即可。

例如:可以将以下 MapFunction 代码

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
在使用自定义的 rich function 时,与非 Rich 版本一样,传给 map 算子即可:

data.map(new MyMapFunction());
  • 1

Rich Function 也可以使用匿名类的方式定义。例如:

data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
累加器可以使用 Accumulator.add(V value) 方法将其递增;在作业结束时,Flink 会汇总(merge)所有部分的结果并将其发送给客户端。


Step 1|在需要使用累加器的用户自定义 function 中创建一个累加器对象(此处以计数器为例)

private IntCounter numLines = new IntCounter();
Step 2|在 rich function 的 open() 方法中注册累加器对象,也可以在次数定义名称

getRuntimeContext().addAccumulator("num-lines", this.numLines);
Step 3|在 function 的任何地方(包括 open()close() 方法中)使用累加器

Step 4|最终整体结果会存储在由执行环境的 execute() 方法返回的 JobExecutionResult 对象中。

单个作业的所有累加器共享一个命名空间,因此可以在不同操作 function 里使用同一个累加器,Flink 会在内部将所有具有相同名称的累加器合并起来。



计数器 IntCounter 类的源码如下:

// flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java

public class IntCounter implements SimpleAccumulator<Integer> {

    private static final long serialVersionUID = 1L;

    private int localValue = 0;

    public IntCounter() {}

    public IntCounter(int value) {
        this.localValue = value;

    // ------------------------------------------------------------------------
    //  Accumulator
    // ------------------------------------------------------------------------

    /** Consider using {@link #add(int)} instead for primitive int values */
    public void add(Integer value) {
        localValue += value;

    public Integer getLocalValue() {
        return localValue;

    public void merge(Accumulator<Integer, Integer> other) {
        this.localValue += other.getLocalValue();

    public void resetLocal() {
        this.localValue = 0;

    public IntCounter clone() {
        IntCounter result = new IntCounter();
        result.localValue = localValue;
        return result;

    // ------------------------------------------------------------------------
    //  Primitive Specializations
    // ------------------------------------------------------------------------

    public void add(int value) {
        localValue += value;

    public int getLocalValuePrimitive() {
        return this.localValue;

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    public String toString() {
        return "IntCounter " + this.localValue;
  • 使用 localValue 属性来存储当前 function 中的累加器的值
  • add 方法:实现累加操作
  • merge 方法:实现多个累加器的合并
  • clone() 方法:实现累加器的复制

计数器 IntCounter 类实现了 SimpleAccumulator 接口,SimpleAccumulator 接口的源码如下:

// flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java

public interface SimpleAccumulator<T extends Serializable> extends Accumulator<T, T> {}
  • add 方法:实现累加器的累加操作
  • getLocalValue() 方法:读取当前 UDF 中累加器的值
  • resetLocal() 方法:重置当前 UDF 中累加器的值
  • merge 方法:在 Flink 合并不同 UDF 的结果时使用
  • clone() 方法:复制累加器对象,实现了 Clonable 接口

Accumulator 接口中,定义了添加的值的类型 V 和最终结果的类型 R。适用于实现更灵活的操作,例如直方图,添加的值为数字,最终结果的直方图。

SimpleAccumulator 接口中,要求添加的值的类型和最终结果的类型相同。适用于相对简单的操作,例如计数器。


如果需要自定义累加器,只需要实现 Accumulator 接口或 SimpleAccumulator 接口即可。

