赞
踩
本次记录flinksql的官方记录的socket自定义连接器的实现。
如图:
接触一个完整的connector的总共大概分为三个主要步骤
这里其实可以从这里先接入,我们需要在运行时提供出真正的SourceFunction来真正的获取数据源头,而相应的,我们的数据源头的数据源如何解析也就是自定义的反序列化器的实现
这里其实真正是启到承上启下的作用,这里实际调用我们运行时提供的数据源以及真正的自定义反序列化器的实现,无非注意这里还有模式的实现
这里是最原始的工厂,这里我们相当于对catalog里面定义的表拿到后我们从中解析出我们的参数以及操作后,实际调用的是planning计划里面的,但是最本质的其实还是运行时的数据源提供以及自定义的反序列化器。
测试案例
可以看到这里首先类加载器获取到八个类加载工厂。可以看到我们自定义的类工厂了。
可以看到这里发现factory找到我们自定义的factory
这里看到我们拿到我们catalog里面的定义的表
tableSchema
resolvedSchema
分别加载两个options的对应的key
获取校验帮助类
这里是找到我们的ChangelogCsvFormatFactory的类。
这里还是和之前一样,两次加载options
都加载完了
跳到这个类,这里就是动态加载到runntimedecoder。
跳到这里
跳到这里就看到我们的running中的provider里面拿到我们的源端数据和反序列化器。
之后这里就是拿到我们的changelogmode的模式,
这里就是对sql语句进行转换的操作
后面就是解析sql的具体操作了。
以上就是自定义的connector Socket的简单的一个思路!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。