当前位置:   article > 正文

Flink专题五:Flink 中自定义Sink_flink 自定义sink

flink 自定义sink

由于工作需要最近学习flink
现记录下Flink介绍和实际使用过程
这是flink系列的第五篇文章

Sink介绍

flink的sink是flink三大逻辑结构之一(source,transform,sink),功能就是负责把flink处理后的数据输出到外部系统中。
在编写代码的过程中,我们可以使用flink已经提供的sink,如kafka,es等。

连接器是否提供Source支持是否提供Sink支持
Apache Kafka
Apache Cassandra
Apache Kinesis Data Streams
Elasticsearch
HDFS
RabbitMQ
Apache NiFi
Twitter Streaming API

当然我们也可以通过自定义的方式,来实现我们自己的sink,自定义可以通过两种方式:

  1. 实现SinkFunction接口
  2. 继承RichSinkFunction类

SinkFunction接口介绍

首先看下Flink中提供的SinkFunction接口,实现了SinkFunction接口就可以实现自定义Sink。
这是SinkFunction接口的源码,我们只需要实现invoke方法即可实现自定义sink:

public interface SinkFunction<IN> extends Function, Serializable {
    /** @deprecated */
    @Deprecated
    default void invoke(IN value) throws Exception {
    }

    default void invoke(IN value, SinkFunction.Context context) throws Exception {
        this.invoke(value);
    }

    @Public
    public interface Context<T> {
        long currentProcessingTime();

        long currentWatermark();

        Long timestamp();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

RichSinkFunction类介绍

既然SinkFunction接口即可满足要求,那么为什么要通过继承RichSinkFunction类来实现呢。
这是RichSinkFunction类的源码:

public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
    private static final long serialVersionUID = 1L;

    public RichSinkFunction() {
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

可以看到RichSinkFunction除了实现了SinkFunction接口,还继承了AbstractRichFunction 类。
这是AbstractRichFunction 类的源码:

public abstract class AbstractRichFunction implements RichFunction, Serializable {
    private static final long serialVersionUID = 1L;
    private transient RuntimeContext runtimeContext;

    public AbstractRichFunction() {
    }

    public void setRuntimeContext(RuntimeContext t) {
        this.runtimeContext = t;
    }

    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
    }

    public IterationRuntimeContext getIterationRuntimeContext() {
        if (this.runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        } else if (this.runtimeContext instanceof IterationRuntimeContext) {
            return (IterationRuntimeContext)this.runtimeContext;
        } else {
            throw new IllegalStateException("This stub is not part of an iteration step function.");
        }
    }

    public void open(Configuration parameters) throws Exception {
    }

    public void close() throws Exception {
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

这里可以看到open和close方法,我们可以通过重写这两个方法,在open方法中连接资源,在close方法中关闭资源、释放资源。

当然我们还可以在open中做更多的操作,比如我们的sink是将数据加入到clickhouse中,加入clickhouse时数据最好是批量加入(不清楚百度哈),所以我们要在open中初始化定时任务去定时处理数据,每10000条加入一次。当然单线程不一定来得及,还要配合上线程池使用,所以open方法中改成初始化线程池去执行定时任务。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/606527
推荐阅读
相关标签
  

闽ICP备14008679号