赞
踩
1.生成运行时env
2.生成表环境
3.接上数据流,数据流数据生成表
4.把数据库中sink保存数据的表,在flink中生成一遍(相当于把flink生成的表,绑定到数据库中的表),配上数据库连接信息,并执行,及注册
5.查询表,可以根据注册表名查询
6.插入表,可以根据生成的flink表进行数据插入
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.table.api.bridge.scala._
- import org.apache.flink.table.api._
- import org.apache.flink.table.api.{DataTypes, Table}
- import org.apache.flink.table.descriptors._
- object SqlReadMysql {
- def main(args: Array[String]): Unit = {
- // creat env
- val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
-
- //parallelism
- bsEnv.setParallelism(1)
-
- //set env
- val bsSetting = EnvironmentSettings
- .newInstance()
- .useBlinkPlanner()
- .inStreamingMode()
- .build()
-
- //create table env
- val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSetting)
-
- //create ds
- val dataStream = bsEnv.fromElements(Tuple2("01","lisi" ))
-
- val table1 = bsTableEnv.fromDataStream(dataStream)
-
- //create table
- val sinkDDL =
- """
- |create table student2_flink (
- |code varchar(20) null,
- |name varchar(20) null
- |)with(
- |'connector.type'='jdbc',
- |'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
- |'connector.table'='student2',
- |'connector.driver'='com.mysql.jdbc.Driver',
- |'connector.username'='root',
- |'connector.password'='root'
- |)
- |""".stripMargin
- println(sinkDDL)
- // execute the create table sql
- bsTableEnv.executeSql(sinkDDL)
-
- //register table
- val myStudent = bsTableEnv.from("student2_flink")
-
- //execute query
- val result = bsTableEnv.sqlQuery(s"select * from $myStudent")
-
- result.toRetractStream[(String, String)].print()
-
- //insert data
- table1.executeInsert("student2_flink")
-
- //execute
- bsEnv.execute()
-
-
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.sinopharm.gksk</groupId> <artifactId>gksk-bigdata</artifactId> <version>1.0-SNAPSHOT</version> <name>gksk-bigdata</name> <!-- FIXME change it to the project's website --> <url>http://www.example.com</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <log4j.version>1.2.17</log4j.version> <slf4j.version>1.7.25</slf4j.version> <slf4j.api.version>1.7.25</slf4j.api.version> </properties> <!--Flink项目核心依赖--> <dependencies> <!--Flink Java 项目核心依赖--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.14.4</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.14.4</version> </dependency> <!--Flink scala项目核心依赖--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.12</artifactId> <version>1.14.4</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.14.4</version> </dependency> <!--Flink Table API 核心依赖--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>1.14.4</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.14.4</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>1.12.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.14.4</version> </dependency> <!-- csv--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>1.14.4</version> </dependency> <!--以下用到什么引用什么--> <!--Flink Kafka依赖--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>1.14.4</version> </dependency> <!--Flink rocksdb状态后依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.12</artifactId> <version>1.14.4</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.12</artifactId> <version>1.14.4</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.16</version> </dependency> <!--本地测试核心依赖--> <!--Flink 本地测试客户端依赖--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.14.4</version> </dependency> <!--Flink 本地测试wei ui依赖 http://127.0.0.1:8081/ --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.12</artifactId> <version>1.14.4</version> <scope>runtime</scope> </dependency> <!--junit测试--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <!--日志输出--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> <scope>runtime</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> <scope>runtime</scope> </dependency> </dependencies> <build> <resources> <resource> <directory>src/main/java</directory> </resource> <resource> <directory>src/main/scala</directory> </resource> </resources> <plugins> <!--这里没引打包插件 需要的自己引用--> <!--Java compiler--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <!--Java Compiler--> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
原因:pom文件中缺少 planner
解决办法:添加
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.14.4</version> </dependency>
ps:注意有时候 配置两个planner也会报错
原因:缺少mysql的jar包
解决:pom文件添加:
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.16</version> </dependency>
原因:URL没有指定时区,jdbc 6.0以上都有这个问题
解决:在URL后边加时区
'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
原因:连接的URL写错了
解决:好好看看,字符 、格式
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。