当前位置:   article > 正文

Flink使用指南: 教你Flink SQL自定义Connector开发,使用SQL入库更方便!_flink自定义connector

flink自定义connector

系列文章目录

Flink使用指南:Flink设置全局变量,并在函数中获取,让你的代码更加优雅!

Flink使用指南:Checkpoint机制,完全搞懂了,你就是大佬!

Flink使用指南: 面试必问内存管理模型,进大厂一定要知道!

Flink使用指南: Kafka流表关联HBase维度表

Flink使用指南: Watermark新版本使用

Flink使用指南: Flink SQL自定义函数
 


前言

最近的工作主要是在向实时计算平台方向迁移,之前的实时计算任务都是用Flink DataStream API开发的,对于DBA或者分析人员来说,可能开发代码能难度太大,所以我们打算把API封装好做成Flink SQL对外提供服务。那么其中就要涉及到一些是社区不提供的某些功能,比如与业务紧密结合的自定义函数,比如一些Source和Sink的连接器。

下面我就给大家讲一讲 如何基于Flink1.11.0版本之后开发一个自定义的Flink SQL Connector。

一. 定义动态工厂类

自定义Factory继承DynamicTableSinkFactory,DynamicTableSourceFactory接口,支持读取和写入两种功能。

拿DynamicTableSinkFactory接口来说,需要实现以下几种方法:

  1. @Override
  2. public DynamicTableSink createDynamicTableSink(Context context) {
  3. return null;
  4. }
  5. @Override
  6. public String factoryIdentifier() {
  7. return null;
  8. }
  9. @Override
  10. public Set<ConfigOption<?>> requiredOptions() {
  11. return null;
  12. }
  13. @Override
  14. public Set<ConfigOption<?>> optionalOptions() {
  15. return null;
  16. }

factoryIdentifier():返回connecotor的名字
requiredOptions():必填参数 (比如URL,TABLE_NAME,USERNAME,PASSWORD)
optionalOptions(): 可选参数(比如提交方式,提交批次等)
createDynamicTableSink(): 创建TableSink对象,返回一个 DynamicTableSink

二.定义Connector Sink类

继承DynamicTableSink接口,实现以下方法:

  1. @Override
  2. public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
  3. return null;
  4. }
  5. @Override
  6. public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
  7. return null;
  8. }
  9. @Override
  10. public DynamicTableSink copy() {
  11. return null;
  12. }
  13. @Override
  14. public String asSummaryString() {
  15. return null;
  16. }
getChangelogMode(): 写入方式默认INSERT_ONLY,里面实现了一个static静态类初始化:
INSERT_ONLY = newBuilder().addContainedKind(RowKind.INSERT).build();
具体实现,看下源码就知道了....
getSinkRuntimeProvider(): Sink端输入的主要提供者,这里可以实现一个自定义的OutputFormat继承,当然也有人直接继承RichSinkFunction..这个和dataStream API实现写入方式就很像了。
RichOutputFormat<RowData>主要是写入逻辑的实现。
copy():返回一个DynamicTableSink
asSummaryString(): 接口功能总结,可以概述下这个spi主要是xxxx Sink

三.定义Connector Source类

这里着重提一下Source端的自定义Connector,它是需要继承LookupTableSource接口,实现Lookup功能才能当做Source端使用。

  1. @Override
  2. public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
  3. return null;
  4. }
  5. @Override
  6. public DynamicTableSource copy() {
  7. return null;
  8. }
  9. @Override
  10. public String asSummaryString() {
  11. return null;
  12. }
getLookupRuntimeProvider(): 从社区已经提供的Connector中可以看到如果想实现一个Lookup功能,需要继承TableFunction<RowData>接口.

四.加载方式

Flink SQL的自定义Connector主要是用Java的SPI方式可以加载到应用服务当中,什么是SPI机制能,这里做个知识点记录下:

1.什么是SPI

     SPI全称Service Provider Interface,是Java提供的一套用来被第三方实现或者扩展的接口,它可以用来启用框架扩展和替换组件。 SPI的作用就是为这些被扩展的API寻找服务实现。

2.SPI和API的使用场景

    API (Application Programming Interface)在大多数情况下,都是实现方制定接口并完成对接口的实现,调用方仅仅依赖接口调用,且无权选择不同实现。 从使用人员上来说,API 直接被应用开发人员使用。

    SPI (Service Provider Interface)调用方来制定接口规范,提供给外部来实现,调用方在调用时则选择自己需要的外部实现。  从使用人员上来说,SPI 被框架扩展人员使用。

 所以根据SPI机制的使用机制,需要在resources目录下新建META-INF.services新建文件

org.apache.flink.table.factories.Factory

  1. # Licensed to the Apache Software Foundation (ASF) under one or more
  2. # contributor license agreements. See the NOTICE file distributed with
  3. # this work for additional information regarding copyright ownership.
  4. # The ASF licenses this file to You under the Apache License, Version 2.0
  5. # (the "License"); you may not use this file except in compliance with
  6. # the License. You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. org.apache.flink.connector.jdbc.table.AdbpgDynamicTableFactory

文件中添加工厂类的路径就行。 


总结

以上文章讲了下 Flink SQL 自定义Connector实现方式,这样我们就可以用原生的Flink SQL语法进行业务逻辑开发,用自定义Connector实现入库。这种开发方式是不是很方便??

大家还想学什么,欢迎留言,关注我,不迷路,大家一起学习大数据。

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/886375
推荐阅读
相关标签
  

闽ICP备14008679号