赞
踩
在1.12之前 flinksql是不支持kafka 多个topic的设置, 所以之前使用的时候简单对官方kafka连接器中kafkaDynamicSource进行封装,让其支持多个topic的设置
对应文章是1.11.0 flinksql自定义kafka源(支持多个topic)
现在1.12版本中, kafka做了一些改动,如下
比如KafkaOptions中TOPIC的类型是List<String>
分隔符是什么呢?因为tablesql中的配置是字符串类型,所以支持多topic肯定是会有一定的规定.
查看KafkaDynamicTableFactory的createDynamicTableSource中创建kafka源,构造方法中需要传入topic
进入 KafkaOptions.getSourceTopics(tableOptions)
进入 getOptional(TOPIC)
调试可以发现进入 ConfigurationUtils.convertToList(v, clazz),再进入就能看到他是用分号切分的
跟我们查看源码时对应上了
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。