赞
踩
Flink使用指南:Flink设置全局变量,并在函数中获取,让你的代码更加优雅!
Flink使用指南:Checkpoint机制,完全搞懂了,你就是大佬!
Flink使用指南: 面试必问内存管理模型,进大厂一定要知道!
最近的工作主要是在向实时计算平台方向迁移,之前的实时计算任务都是用Flink DataStream API开发的,对于DBA或者分析人员来说,可能开发代码能难度太大,所以我们打算把API封装好做成Flink SQL对外提供服务。那么其中就要涉及到一些是社区不提供的某些功能,比如与业务紧密结合的自定义函数,比如一些Source和Sink的连接器。
下面我就给大家讲一讲 如何基于Flink1.11.0版本之后开发一个自定义的Flink SQL Connector。
自定义Factory继承DynamicTableSinkFactory,DynamicTableSourceFactory接口,支持读取和写入两种功能。
拿DynamicTableSinkFactory接口来说,需要实现以下几种方法:
- @Override
- public DynamicTableSink createDynamicTableSink(Context context) {
- return null;
- }
-
- @Override
- public String factoryIdentifier() {
- return null;
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- return null;
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- return null;
- }
factoryIdentifier():返回connecotor的名字
requiredOptions():必填参数 (比如URL,TABLE_NAME,USERNAME,PASSWORD)
optionalOptions(): 可选参数(比如提交方式,提交批次等)
createDynamicTableSink(): 创建TableSink对象,返回一个 DynamicTableSink
继承DynamicTableSink接口,实现以下方法:
- @Override
- public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
- return null;
- }
-
- @Override
- public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
- return null;
- }
-
- @Override
- public DynamicTableSink copy() {
- return null;
- }
-
- @Override
- public String asSummaryString() {
- return null;
- }
getChangelogMode(): 写入方式默认INSERT_ONLY,里面实现了一个static静态类初始化:
INSERT_ONLY = newBuilder().addContainedKind(RowKind.INSERT).build();
具体实现,看下源码就知道了....
getSinkRuntimeProvider(): Sink端输入的主要提供者,这里可以实现一个自定义的OutputFormat继承,当然也有人直接继承RichSinkFunction..这个和dataStream API实现写入方式就很像了。
RichOutputFormat<RowData>主要是写入逻辑的实现。
copy():返回一个DynamicTableSink
asSummaryString(): 接口功能总结,可以概述下这个spi主要是xxxx Sink
这里着重提一下Source端的自定义Connector,它是需要继承LookupTableSource接口,实现Lookup功能才能当做Source端使用。
- @Override
- public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
- return null;
- }
-
- @Override
- public DynamicTableSource copy() {
- return null;
- }
-
- @Override
- public String asSummaryString() {
- return null;
- }
getLookupRuntimeProvider(): 从社区已经提供的Connector中可以看到如果想实现一个Lookup功能,需要继承TableFunction<RowData>接口.
Flink SQL的自定义Connector主要是用Java的SPI方式可以加载到应用服务当中,什么是SPI机制能,这里做个知识点记录下:
1.什么是SPI
SPI全称Service Provider Interface,是Java提供的一套用来被第三方实现或者扩展的接口,它可以用来启用框架扩展和替换组件。 SPI的作用就是为这些被扩展的API寻找服务实现。
2.SPI和API的使用场景
API (
Application Programming Interface)在
大多数情况下,都是实现方
制定接口并完成对接口的实现,调用方
仅仅依赖接口调用,且无权选择不同实现。 从使用人员上来说,API 直接被应用开发人员使用。SPI (
Service Provider Interface)
是调用方
来制定接口规范,提供给外部来实现,调用方在调用时则
选择自己需要的外部实现。 从使用人员上来说,SPI 被框架扩展人员使用。
所以根据SPI机制的使用机制,需要在resources目录下新建META-INF.services新建文件
org.apache.flink.table.factories.Factory
- # Licensed to the Apache Software Foundation (ASF) under one or more
- # contributor license agreements. See the NOTICE file distributed with
- # this work for additional information regarding copyright ownership.
- # The ASF licenses this file to You under the Apache License, Version 2.0
- # (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
-
- org.apache.flink.connector.jdbc.table.AdbpgDynamicTableFactory
文件中添加工厂类的路径就行。
以上文章讲了下 Flink SQL 自定义Connector实现方式,这样我们就可以用原生的Flink SQL语法进行业务逻辑开发,用自定义Connector实现入库。这种开发方式是不是很方便??
大家还想学什么,欢迎留言,关注我,不迷路,大家一起学习大数据。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。