赞
踩
首先自定义一个类,继承 TableFunction 类;返回值类型由 TableFunction 的泛型参数决定,在 eval 方法中调用 protected 的 collect 方法来输出结果数据。
在Table API中,Table函数需要与.joinLateral或.leftOuterJoinLateral一起使用。
在 SQL 中,则需要与 lateral table 一起使用。
使用效果类似于 Hive 的 lateral view。
- /**
- * @program: flink2021
- * @description: ${description}
- * @author: Mr.G
- * @create: 2021-09-27 14:23
- **/
- package com.ct.day08
-
- import com.ct.day01.SensorSource
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.table.api._
- import org.apache.flink.table.api.scala._
- import org.apache.flink.table.functions.TableFunction
- import org.apache.flink.types.Row
-
-
/**
 * Demonstrates a user-defined table function used from both the Table API
 * and SQL.
 *
 * Each input string is split on a separator, and every resulting token is
 * emitted as a (word, length) pair that is laterally joined to its source row.
 */
object TableFunctionExample {

  /**
   * Builds the example pipeline and runs it.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.fromElements(
      "hello#world",
      "hi#flink"
    )

    val settings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .useBlinkPlanner()
      .build()

    val tEnv = StreamTableEnvironment.create(env, settings)

    // Expose the stream's single string column under the name `s`.
    val table = tEnv.fromDataStream(stream, 's)

    val mySplit = new split("#")

    // Table API variant. Its print() is left commented out, so only the SQL
    // variant below writes to stdout.
    table
      .leftOuterJoinLateral(mySplit('s) as ('word, 'length))
      // joinLateral is the alternative form of the same lateral join; both
      // resemble Hive's LATERAL VIEW.
      // .joinLateral(mySplit('s) as ('word, 'length))
      .select('s, 'word, 'length)
      .toAppendStream[Row]
      // .print()

    // SQL variant: the table and the function must be registered by name
    // before the query text can refer to them.
    tEnv.createTemporaryView("test", table)

    tEnv.registerFunction("mySplit", mySplit)

    tEnv
      .sqlQuery("select s,word,length from test,lateral table(mySplit(s)) as T(word,length)")
      .toAppendStream[Row]
      .print()

    env.execute()
  }

  /**
   * Table function that splits a string and emits one (token, token length)
   * pair per token. The type parameter of TableFunction is the type of the
   * values emitted through collect.
   *
   * @param sep separator handed to String.split, which interprets it as a
   *            regular expression
   */
  class split(sep: String) extends TableFunction[(String, Int)] {

    /**
     * Emits one (token, length) pair for each piece of `s`.
     *
     * @param s the string to split; a null value produces no output
     */
    def eval(s: String): Unit = {
      // Guard against a NULL column value: calling split on it would throw a
      // NullPointerException, so such a row simply yields nothing.
      Option(s).foreach { value =>
        // Results are emitted through collect rather than returned.
        value.split(sep).foreach(token => collect((token, token.length)))
      }
    }
  }

}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。