当前位置:   article > 正文

Flink SQL CDC 读取 Kafka JSON 格式数据

flinkcdc 读取kafka
  • Flink CDC 读取kafka json格式数据
package com.com.cdc
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

/**
 * Reads JSON records from the Kafka topic `test` through the Flink SQL Kafka
 * connector, filters on the nested `common.uid` field, and prints the result
 * as an append-only stream.
 *
 * Expected record shape (see the sample data below this listing):
 *   {"ts": 1607003021, "common": {"uid": 1}, ...}
 */
object KafkaJson {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Blink planner in streaming mode (matches the flink-table-planner-blink dependency).
    val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val sTableEnv = StreamTableEnvironment.create(env, bsSettings)

    // DDL for the Kafka-backed source table.
    // `uid` is declared INT because the sample JSON carries it as a number
    // ("uid": 1); declaring it STRING would make the `> 0` comparison below
    // fail Flink SQL type checking.
    val kfkSql =
      s"""
         |CREATE TABLE kafka_table (
         | common ROW<uid INT>,
         | ts BIGINT
         |) WITH (
         | 'connector' = 'kafka',
         | 'topic' = 'test',
         | 'properties.bootstrap.servers' = 'jeff200:9092',
         | 'format' = 'json',
         | 'scan.startup.mode' = 'earliest-offset'
         |)
         |""".stripMargin
    // Register the source table.
    sTableEnv.executeSql(kfkSql)

    // FIX: `uid` is nested inside the `common` ROW column, so it must be
    // addressed as `common.uid` — a bare `uid` is not a column of kafka_table
    // and the original query failed validation.
    val filterSql =
      s"""
         |SELECT common.uid AS uid, ts
         |FROM kafka_table
         |WHERE common.uid > 0
       """.stripMargin
    // Run the continuous query.
    val table: Table = sTableEnv.sqlQuery(filterSql)
    // The query is a simple projection/filter, so an append-only stream is valid.
    val kafkaStream: DataStream[Row] = sTableEnv.toAppendStream(table)
    kafkaStream.print()

    env.execute("kafka cdc")
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 核心依赖
<!-- Flink JSON format: required so the Kafka connector can decode records with 'format' = 'json' -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>
<!-- Flink Table API (Scala bridge, Scala 2.11) plus the Blink planner used by the example -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • Kafka 主题 test 的测试数据和运行结果
{"ts": 1607003021, "common": {"uid": 1}, "displays": [{"item": 1, "type": "goods"},{"item": 2, "type": "goods"}]}
  • 1

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/836384
推荐阅读
相关标签
  

闽ICP备14008679号