当前位置:   article > 正文

Flink自定义connector官方例子记录

flink自定义connector

Flink自定义connector官方例子Socket


前言

本次记录flinksql的官方记录的socket自定义连接器的实现。

具体步骤

1.整体概括

在这里插入图片描述
如图:
接触一个完整的connector的总共大概分为三个主要步骤

运行时

这里其实可以从这里先接入,我们需要在运行时提供出真正的SourceFunction来真正的获取数据源头,而相应的,我们的数据源头的数据源如何解析也就是自定义的反序列化器的实现

Planning 计划阶段

这里其实真正是启到承上启下的作用,这里实际调用我们运行时提供的数据源以及真正的自定义反序列化器的实现,无非注意这里还有模式的实现

Factory阶段

这里是最原始的工厂,这里我们相当于对catalog里面定义的表拿到后我们从中解析出我们的参数以及操作后,实际调用的是planning计划里面的,但是最本质的其实还是运行时的数据源提供以及自定义的反序列化器。

2.以Test例子Debug记录

测试案例
在这里插入图片描述

2.0

在这里插入图片描述
可以看到这里首先类加载器获取到八个类加载工厂。可以看到我们自定义的类工厂了。

2.1

在这里插入图片描述

2.2

在这里插入图片描述
可以看到这里发现factory找到我们自定义的factory

2.4

在这里插入图片描述
这里看到我们拿到我们catalog里面的定义的表
tableSchema
在这里插入图片描述
resolvedSchema
在这里插入图片描述

2.5

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
分别加载两个options的对应的key
在这里插入图片描述

2.6

在这里插入图片描述
获取校验帮助类

2.7

这里是找到我们的ChangelogCsvFormatFactory的类。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
这里还是和之前一样,两次加载options

2.8

在这里插入图片描述

在这里插入图片描述
都加载完了
在这里插入图片描述
跳到这个类,这里就是动态加载到runntimedecoder。

在这里插入图片描述

在这里插入图片描述
跳到这里
在这里插入图片描述
在这里插入图片描述
跳到这里就看到我们的running中的provider里面拿到我们的源端数据和反序列化器。
在这里插入图片描述
之后这里就是拿到我们的changelogmode的模式,

2.9


这里就是对sql语句进行转换的操作
在这里插入图片描述
后面就是解析sql的具体操作了。

总结

以上就是自定义的connector Socket的简单的一个思路!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/886372
推荐阅读
相关标签
  

闽ICP备14008679号