当前位置:   article > 正文

Flink 1.11 SQL 快速上手,内含Demo及详细分析和使用过程,亲测可行!_flink sql pom

flink sql pom

Flink各个版本之间的API有比较大的gap,笔者将程序从Flink 1.7升级到Flink 1.11时,中间遇到了很多小问题。这里,给出一个使用Flink 1.11版本SQL API使用demo,并对需要注意的点和编写过程进行详细说明。

1. 需引入的pom依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- Table API and SQL components -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <version>${flink.version}</version>
    <type>pom</type>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

上面两个是Flink API和Table&SQL开发基础依赖,没什么好说的,新建项目时引入就对了。

<!-- blink planner -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- flink planner -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Flink 1.9 到 Flink 1.11 版本中,Flink old Planner和Blink Planner并存,且Flink 1.11将Blink Planner设置为默认Planner。因此开发的时候,需要根据需要引入其一,或两个都引入也可以。

注意:从maven上直接拷贝下来的pom依赖的设置为provided,本地执行flink程序时,会报找不到相应planner。删掉<scope>provided</scope>即可。

Exception in thread “main” org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for ‘org.apache.flink.table.delegation.ExecutorFactory’ in
the classpath.

<!-- use the Table API & SQL for defining pipelines -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

使用Flink Table API和SQL API时需要适配不同语言的pipelines,因此需要根据自己实际开发语言引入bridge依赖,这里笔者使用java开发。当使用scala开发时,则引入相应scala-bridge。

<!-- kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- flink-sql-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

实时开发时使用最多的数据源就是kafka,上面两个依赖分别为Flink API 和Flink SQL 使用kafka的connector。

注意:要注意flink与kafka版本适配,flink 1.11只支持kafka 10和11,否则程序运行时会报错。

<!-- flink-json -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

使用Flink的format格式解析文本时,需要引入相应format依赖。笔者程序中读取的数据为json格式,因此引入flink-json的依赖。

2. Flink SQL批处理Demo

话不多说,先给代码

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/396810
推荐阅读
相关标签