当前位置:   article > 正文

flink1.12 kafka升级记录

flink1.12 kafka升级记录

在1.12之前的做法

在1.12之前 flinksql是不支持kafka 多个topic的设置, 所以之前使用的时候简单对官方kafka连接器中kafkaDynamicSource进行封装,让其支持多个topic的设置

对应文章是1.11.0 flinksql自定义kafka源(支持多个topic)

现在1.12版本中, kafka做了一些改动,如下

1.12多topic的支持(源码调试查看)

比如KafkaOptions中TOPIC的类型是List<String>

分隔符是什么呢?因为tablesql中的配置是字符串类型,所以支持多topic肯定是会有一定的规定.

查看KafkaDynamicTableFactory的createDynamicTableSource中创建kafka源,构造方法中需要传入topic

进入 KafkaOptions.getSourceTopics(tableOptions)

进入 getOptional(TOPIC)

调试可以发现进入 ConfigurationUtils.convertToList(v, clazz),再进入就能看到他是用分号切分的

1.12 多topic的支持(官网查看)

kafka官网配置

跟我们查看源码时对应上了

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/blog/article/detail/44115
推荐阅读
相关标签
  

闽ICP备14008679号