赞
踩
由于工作需要最近学习flink
现记录下Flink介绍和实际使用过程
这是flink系列的第五篇文章
flink的sink是flink三大逻辑结构之一(source,transform,sink),功能就是负责把flink处理后的数据输出到外部系统中。
在编写代码的过程中,我们可以使用flink已经提供的sink,如kafka,es等。
连接器 | 是否提供Source支持 | 是否提供Sink支持 |
Apache Kafka | 是 | 是 |
Apache Cassandra | 否 | 是 |
Apache Kinesis Data Streams | 是 | 是 |
Elasticsearch | 否 | 是 |
HDFS | 否 | 是 |
RabbitMQ | 是 | 是 |
Apache NiFi | 是 | 是 |
Twitter Streaming API | 是 | 是 |
当然我们也可以通过自定义的方式,来实现我们自己的sink,自定义可以通过两种方式:
首先看下Flink中提供的SinkFunction接口,实现了SinkFunction接口就可以实现自定义Sink。
这是SinkFunction接口的源码,我们只需要实现invoke方法即可实现自定义sink:
public interface SinkFunction<IN> extends Function, Serializable {
/** @deprecated */
@Deprecated
default void invoke(IN value) throws Exception {
}
default void invoke(IN value, SinkFunction.Context context) throws Exception {
this.invoke(value);
}
@Public
public interface Context<T> {
long currentProcessingTime();
long currentWatermark();
Long timestamp();
}
}
既然SinkFunction接口即可满足要求,那么为什么要通过继承RichSinkFunction类来实现呢。
这是RichSinkFunction类的源码:
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
private static final long serialVersionUID = 1L;
public RichSinkFunction() {
}
}
可以看到RichSinkFunction除了实现了SinkFunction接口,还继承了AbstractRichFunction 类。
这是AbstractRichFunction 类的源码:
public abstract class AbstractRichFunction implements RichFunction, Serializable {
private static final long serialVersionUID = 1L;
private transient RuntimeContext runtimeContext;
public AbstractRichFunction() {
}
public void setRuntimeContext(RuntimeContext t) {
this.runtimeContext = t;
}
public RuntimeContext getRuntimeContext() {
if (this.runtimeContext != null) {
return this.runtimeContext;
} else {
throw new IllegalStateException("The runtime context has not been initialized.");
}
}
public IterationRuntimeContext getIterationRuntimeContext() {
if (this.runtimeContext == null) {
throw new IllegalStateException("The runtime context has not been initialized.");
} else if (this.runtimeContext instanceof IterationRuntimeContext) {
return (IterationRuntimeContext)this.runtimeContext;
} else {
throw new IllegalStateException("This stub is not part of an iteration step function.");
}
}
public void open(Configuration parameters) throws Exception {
}
public void close() throws Exception {
}
}
这里可以看到open和close方法,我们可以通过重写这两个方法,在open方法中连接资源,在close方法中关闭资源、释放资源。
当然我们还可以在open中做更多的操作,比如我们的sink是将数据加入到clickhouse中,加入clickhouse时数据最好是批量加入(不清楚百度哈),所以我们要在open中初始化定时任务去定时处理数据,每10000条加入一次。当然单线程不一定来得及,还要配合上线程池使用,所以open方法中改成初始化线程池去执行定时任务。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。