当前位置:   article > 正文

flink读取kafka实时数据sink到mysql(scala版)_scala+flink 读取kafka数据打印到控制台

scala+flink 读取kafka数据打印到控制台


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">











        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->









            <!-- 该插件用于将Scala代码编译成class文件 -->
                        <!-- 声明绑定到maven的compile阶段 -->

package com.mo.flinkTable

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors._

object kafkaTomysql {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val blinkStreamSettings = EnvironmentSettings.newInstance()
    val tableEnv = StreamTableEnvironment.create(env, blinkStreamSettings)

//    val tableEnv = StreamTableEnvironment.create(env)

        tableEnv.connect(new Kafka()
          .property("zookeeper.connect", "hadoop102:2181")
          .property("zookeeper.connect", "hadoop103:2181")
          .property("zookeeper.connect", "hadoop104:2181")
          .property("bootstrap.servers", "hadoop102:9092")
          .property("bootstrap.servers", "hadoop103:9092")
          .property("bootstrap.servers", "hadoop104:9092")
          .withFormat(new Csv())
          .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("timestamp", DataTypes.BIGINT())
            .field("Temp", DataTypes.DOUBLE())

    val Result : Table = tableEnv.from("kafkaInputTable")
    val kafkaEndResult = Result.select("id , Temp")
      .filter("id == '1' ")

    val sinkDDL : String =
        |create table kafkaOutputTable (
        | id varchar ,
        | temp double
        |) with (
        | 'connector.type' = 'jdbc',
        | 'connector.url' = 'jdbc:mysql://localhost:3306/test',
        | 'connector.table' = 'thermometer',
        | 'connector.driver' = 'com.mysql.jdbc.Driver',
        | 'connector.username' = 'root',
        | 'connector.password' = '123456'

    tableEnv.sqlUpdate(sinkDDL) //执行DDL创建kafkaOutputTable表
    env.execute("kafka to mysql test")


