赞
踩
pom文件
<?xml version="1.0" encoding="UTF-8"?> <!-- Maven build descriptor: Flink 1.10.0 (Scala 2.11) dependencies, Scala compilation, and a jar-with-dependencies assembly produced in the package phase. --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.mo</groupId> <artifactId>Flink</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> <version>1.7.2</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>1.10.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> 
<groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>1.10.0</version> </dependency> </dependencies> <build> <plugins> <!-- This plugin compiles the Scala sources into class files --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.4.6</version> <executions> <execution> <!-- Bind the goal to Maven's compile phase --> <goals> <goal>compile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
需求:从数据流中筛选出ID为1的温度计及其对应的温度值
数据流 (id, timestamp, Temp) => (id, Temp)
package com.mo.flinkTable

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors._

/**
 * Streaming job that reads thermometer readings from Kafka, keeps only the
 * readings whose id is "1", prints them, and inserts them into a MySQL table
 * through the JDBC connector.
 *
 * Input: CSV records (id, timestamp, Temp) on the Kafka topic `thermometer`.
 * Output: rows (id, temp) written to the MySQL table `thermometer`.
 */
object kafkaTomysql {

  // Each connection property must be set exactly once with a comma-separated
  // host list: the Kafka descriptor stores properties by key, so calling
  // `.property` repeatedly with the same key keeps only the last value.
  private val ZookeeperQuorum: String =
    "hadoop102:2181,hadoop103:2181,hadoop104:2181"
  private val BootstrapServers: String =
    "hadoop102:9092,hadoop103:9092,hadoop104:9092"

  /**
   * Builds and runs the Kafka-to-MySQL pipeline.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Use the Blink planner in streaming mode.
    val blinkStreamSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, blinkStreamSettings)

    // Register the Kafka topic as a temporary source table with a CSV format.
    tableEnv
      .connect(
        new Kafka()
          .version("0.11")
          .topic("thermometer")
          .property("zookeeper.connect", ZookeeperQuorum)
          .property("bootstrap.servers", BootstrapServers)
      )
      .withFormat(new Csv())
      .withSchema(
        new Schema()
          .field("id", DataTypes.STRING())
          .field("timestamp", DataTypes.BIGINT())
          .field("Temp", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaInputTable")

    val inputTable: Table = tableEnv.from("kafkaInputTable")

    // Project (id, Temp) and keep only thermometer "1".
    val kafkaEndResult = inputTable
      .select("id , Temp")
      .filter("id == '1' ")

    // Print the filtered stream so the result can be checked on the console.
    kafkaEndResult.toAppendStream[(String, Double)].print("kafkaDataTest")

    // `thermometer` is the table that really exists in MySQL, while
    // `kafkaOutputTable` only exists inside the streaming environment as the
    // table mapped onto it.
    // NOTE(review): the JDBC credentials are hard-coded here; consider moving
    // them to external configuration.
    val sinkDDL: String =
      """
        |create table kafkaOutputTable (
        |  id varchar ,
        |  temp double
        |) with (
        |  'connector.type' = 'jdbc',
        |  'connector.url' = 'jdbc:mysql://localhost:3306/test',
        |  'connector.table' = 'thermometer',
        |  'connector.driver' = 'com.mysql.jdbc.Driver',
        |  'connector.username' = 'root',
        |  'connector.password' = '123456'
        |)
      """.stripMargin

    // Execute the DDL to create the kafkaOutputTable sink table.
    tableEnv.sqlUpdate(sinkDDL)

    kafkaEndResult.insertInto("kafkaOutputTable")

    env.execute("kafka to mysql test")
  }
}
在Linux上启动ZooKeeper和Kafka,并创建一个thermometer主题。注意要提前在MySQL中创建好代码中定义的thermometer表和相应字段。运行代码后,可以看到Kafka中的数据已经实时地写入了MySQL。
.w
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。