赞
踩
最近工作中需要自定义开发一些flink sql的connector,因为官方提供的connector毕竟有限,在我们工作中可能会用到各种各样的中间件。所以官方没有提供的就需要我们自定义开发。
就是如:
CREATE TABLE XXX(
A STRING,
B BIGINT)
WITH(
‘connect.type’ = ‘kafka’,
…
)
所以开发一个自己的connector需要做哪些,本文就来总结一下开发的主要步骤,以及我遇到的问题怎么解决的。
自定义Factory,根据需要实现StreamTableSourceFactory和StreamTableSinkFactory
根据需要继承ConnectorDescriptorValidator,定义自己的connector参数(with 后面跟的那些)
Factory中的requiredContext、supportedProperties都比较重要,框架中对Factory的过滤和检查需要他们
需要自定义个TableSink,根据你需要连接的中间件选择是AppendStreamTableSink、Upsert、Retract
重写consumeDataStream方法
以上5步基本上可以写一个简单的sql-connector了
异常信息:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
connector.address=localhost:9091
connector.job=testJob
connector.metrics=testMetrics
connector.type=xxxx
schema.0.data-type=ROW<`val` DOUBLE>
schema.0.name=value
有两个问题都导致上面的报错
解决方法:
添加如下目录及文件
解决方法:在自定义Factory#supportedProperties方法中添加schema的配置
// schema
properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
2.序列化问题
异常信息:
xxx类不能被序列化
原因:开始在我的sinkFunction中有个TableSchema属性,该属性不能被序列化,TableSchema我是用来获取字段信息的,后来直接改成了fieldName数组,从TableSchema.getFieldNames()获取。
改完后又报了我使用的Util类不能序列化,我把util实现了Serializable接口解决
3.sink类型不匹配问题
异常如下:
org.apache.flink.table.api.TableException: The StreamTableSink#consumeDataStream(DataStream) must be implemented and return the sink transformation DataStreamSink. However, org.apache.flink.connector.prometheus.xxxTableSink doesn't implement this method.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:142)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。