赞
踩
Flink各个版本之间的API有比较大的gap,笔者将程序从Flink 1.7升级到Flink 1.11时,中间遇到了很多小问题。这里,给出一个使用Flink 1.11版本SQL API使用demo,并对需要注意的点和编写过程进行详细说明。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Table API and SQL components -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>${flink.version}</version>
<type>pom</type>
</dependency>
上面两个是Flink API和Table&SQL开发基础依赖,没什么好说的,新建项目时引入就对了。
<!-- blink planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
Flink 1.9 到 Flink 1.11 版本中,Flink old Planner和Blink Planner并存,且Flink 1.11将Blink Planner设置为默认Planner。因此开发的时候,需要根据需要引入其一,或两个都引入也可以。
注意:从maven上直接拷贝下来的pom依赖的设置为provided,本地执行flink程序时,会报找不到相应planner。删掉<scope>provided</scope>即可。
Exception in thread “main” org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for ‘org.apache.flink.table.delegation.ExecutorFactory’ in
the classpath.
<!-- use the Table API & SQL for defining pipelines -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
使用Flink Table API和SQL API时需要适配不同语言的pipelines,因此需要根据自己实际开发语言引入bridge依赖,这里笔者使用java开发。当使用scala开发时,则引入相应scala-bridge。
<!-- kafka connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink-sql-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
实时开发时使用最多的数据源就是kafka,上面两个依赖分别为Flink API 和Flink SQL 使用kafka的connector。
注意:要注意flink与kafka版本适配,flink 1.11只支持kafka 10和11,否则程序运行时会报错。
<!-- flink-json -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
使用Flink的format格式解析文本时,需要引入相应format依赖。笔者程序中读取的数据为json格式,因此引入flink-json的依赖。
话不多说,先给代码
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。