当前位置:   article > 正文

Flink--Table API UDF函数之表函数_lateral table

lateral table

首先自定义一个类,继承TableFunction类,返回值类型由TableFunction的泛型决定,由 protected collect 来发送结果数据。

在Table API中,Table函数需要与.joinLateral或.leftOuterJoinLateral一起使用。

在SQL 中,则需要 lateral table 来一起使用

使用起来的结果有点类似 hive 的 lateral view 

  1. /**
  2. * @program: flink2021
  3. * @description: ${description}
  4. * @author: Mr.G
  5. * @create: 2021-09-27 14:23
  6. **/
  7. package com.ct.day08
  8. import com.ct.day01.SensorSource
  9. import org.apache.flink.streaming.api.scala._
  10. import org.apache.flink.table.api._
  11. import org.apache.flink.table.api.scala._
  12. import org.apache.flink.table.functions.TableFunction
  13. import org.apache.flink.types.Row
  14. /**
  15. * @ClassName: TableFunctionExample
  16. * @Description: ${description}
  17. * @Author Mr.G
  18. * @Date 2021/9/27
  19. * @Version 1.0
  20. *
  21. */
  22. object TableFunctionExample {
  23. def main(args: Array[String]): Unit = {
  24. val env = StreamExecutionEnvironment.getExecutionEnvironment
  25. env.setParallelism(1)
  26. val stream = env.fromElements(
  27. "hello#world",
  28. "hi#flink"
  29. )
  30. val settings = EnvironmentSettings.newInstance()
  31. .inStreamingMode()
  32. .useBlinkPlanner()
  33. .build()
  34. val tEnv = StreamTableEnvironment.create(env,settings)
  35. val table = tEnv.fromDataStream(stream,'s)
  36. val mySplit = new split("#")
  37. table
  38. .leftOuterJoinLateral(mySplit('s) as ('word,'length))
  39. //上下两种写法等价 类似 hive的 lateral view
  40. // .joinLateral(mySplit('s) as ('word,'length))
  41. .select('s,'word,'length)
  42. .toAppendStream[Row]
  43. // .print()
  44. tEnv.createTemporaryView("test",table)
  45. tEnv.registerFunction("mySplit",mySplit)
  46. tEnv.sqlQuery("select s,word,length from test,lateral table(mySplit(s)) as T(word,length)"
  47. )
  48. .toAppendStream[Row]
  49. .print()
  50. env.execute()
  51. }
  52. //TableFunction后边的泛型是返回值类型
  53. class split(sep : String) extends TableFunction[(String,Int)]{
  54. def eval(s : String) : Unit={
  55. //使用 collect 输出结果
  56. s.split(sep).foreach(x=>collect((x,x.length)))
  57. }
  58. }
  59. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/761793
推荐阅读
相关标签
  

闽ICP备14008679号