当前位置:   article > 正文

Flink实战之自定义flink sql connector

Flink实战之自定义flink sql connector

背景

最近工作中需要自定义开发一些flink sql的connector,因为官方提供的connector毕竟有限,在我们工作中可能会用到各种各样的中间件。所以官方没有提供的就需要我们自定义开发。
就是如:
CREATE TABLE XXX(
A STRING,
B BIGINT)
WITH(
‘connect.type’ = ‘kafka’,

)
所以开发一个自己的connector需要做哪些,本文就来总结一下开发的主要步骤,以及我遇到的问题怎么解决的。

开发

  1. 自定义Factory,根据需要实现StreamTableSourceFactory和StreamTableSinkFactory

  2. 根据需要继承ConnectorDescriptorValidator,定义自己的connector参数(with 后面跟的那些)

  3. Factory中的requiredContext、supportedProperties都比较重要,框架中对Factory的过滤和检查需要他们

  4. 需要自定义个TableSink,根据你需要连接的中间件选择是AppendStreamTableSink、Upsert、Retract

重写consumeDataStream方法

  1. 自定义一个SinkFunction,在invoke方法中实现将数据写入到外部中间件。

以上5步基本上可以写一个简单的sql-connector了

问题

  1. 异常信息:

    org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in
    the classpath.
    
    Reason: Required context properties mismatch.
    
    The following properties are requested:
    connector.address=localhost:9091
    connector.job=testJob
    connector.metrics=testMetrics
    connector.type=xxxx
    schema.0.data-type=ROW<`val` DOUBLE>
    schema.0.name=value
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    有两个问题都导致上面的报错

    1. discoverFactory时找不到我自定义的Factory

    解决方法:

    添加如下目录及文件

在这里插入图片描述

  1. 根据properties过滤时过滤掉了我的Factory,代码在TableFactoryService#filterBySupportedProperties

​ 解决方法:在自定义Factory#supportedProperties方法中添加schema的配置

		// schema
		properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
		properties.add(SCHEMA + ".#." + SCHEMA_NAME);
  • 1
  • 2
  • 3

2.序列化问题

异常信息:

xxx类不能被序列化

原因:开始在我的sinkFunction中有个TableSchema属性,该属性不能被序列化,TableSchema我是用来获取字段信息的,后来直接改成了fieldName数组,从TableSchema.getFieldNames()获取。

改完后又报了我使用的Util类不能序列化,我把util实现了Serializable接口解决

3.sink类型不匹配问题

异常如下:

org.apache.flink.table.api.TableException: The StreamTableSink#consumeDataStream(DataStream) must be implemented and return the sink transformation DataStreamSink. However, org.apache.flink.connector.prometheus.xxxTableSink doesn't implement this method.

	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:142)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48
  • 1
  • 2
  • 3
  • 4
  • 5
声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号