赞
踩
package com.com.cdc

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

/**
 * Reads JSON events from a Kafka topic via a Flink SQL source table,
 * filters them, and prints the resulting append-only stream.
 *
 * Expected payload shape (per the sample data): {"common": {"uid": 1}, "ts": ...}
 */
object KafkaJson {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Create a streaming table environment backed by the Blink planner.
    val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val sTableEnv = StreamTableEnvironment.create(env, bsSettings)

    // Source DDL.
    // NOTE(review): uid is declared BIGINT (not STRING) so the `uid > 0`
    // filter below type-checks; the sample payload carries a numeric uid.
    val kfkSql =
      s"""
         |CREATE TABLE kafka_table (
         |  common ROW<uid BIGINT>,
         |  ts BIGINT
         |) WITH (
         |  'connector' = 'kafka',
         |  'topic' = 'test',
         |  'properties.bootstrap.servers' = 'jeff200:9092',
         |  'format' = 'json',
         |  'scan.startup.mode' = 'earliest-offset'
         |)
         |""".stripMargin

    // Register the virtual source table.
    sTableEnv.executeSql(kfkSql)

    // uid is nested inside the `common` row, so it must be addressed as
    // common.uid — a bare `SELECT uid` would fail to resolve.
    val filterSql =
      s"""
         |SELECT common.uid AS uid, ts
         |FROM kafka_table
         |WHERE common.uid > 0
         |""".stripMargin

    // Run the SQL query.
    val table: Table = sTableEnv.sqlQuery(filterSql)

    // The query is a simple projection/filter (append-only), so
    // toAppendStream is the correct conversion here.
    val kafkaStream: DataStream[Row] = sTableEnv.toAppendStream(table)
    kafkaStream.print()

    env.execute("kafka cdc")
  }
}
<!-- flink json --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> <scope>${scope}</scope> </dependency> <!-- flink table api --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>${flink.version}</version> <scope>${scope}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>${flink.version}</version> <scope>${scope}</scope> </dependency>
{"ts": 1607003021, "common": {"uid": 1}, "displays": [{"item": 1, "type": "goods"},{"item": 2, "type": "goods"}]}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。