A Look at Flink's TableFunction

This article takes a look at Flink's TableFunction, based on the Flink 1.7.1 sources.

Example

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;

    // The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
    public class Split extends TableFunction<Tuple2<String, Integer>> {
        private String separator = " ";

        public Split(String separator) {
            this.separator = separator;
        }

        // Called once per input row. Note that String.split treats the separator as a regular expression.
        public void eval(String str) {
            for (String s : str.split(separator)) {
                // use collect(...) to emit a row
                collect(new Tuple2<String, Integer>(s, s.length()));
            }
        }
    }

    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    Table myTable = ... // table schema: [a: String]

    // Register the function.
    tableEnv.registerFunction("split", new Split("#"));

    // Use the table function in the Java Table API. "as" specifies the field names of the table.
    myTable.join(new Table(tableEnv, "split(a) as (word, length)"))
        .select("a, word, length");
    myTable.leftOuterJoin(new Table(tableEnv, "split(a) as (word, length)"))
        .select("a, word, length");

    // Use the table function in SQL with LATERAL and TABLE keywords.
    // CROSS JOIN a table function (equivalent to "join" in Table API).
    tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
    // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
    tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
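  • This example defines a custom Split function, registers it through TableEnvironment.registerFunction, and then uses it either from the Table API or through TableEnvironment.sqlQuery. Split defines a public eval method, which emits the output rows.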
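
To make the difference between join and leftOuterJoin in the example concrete, here is a minimal plain-Java sketch that does not use Flink at all. SplitJoinSketch, split and correlate are hypothetical names introduced only for illustration. The sketch imitates what the two queries would return when the table function emits no rows for some input, which happens here because "#".split("#") yields an empty array in Java.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Flink-free model of "join" vs "leftOuterJoin" with a table function.
final class SplitJoinSketch {

    // Mirrors Split.eval: one (word, length) pair per token of the input string.
    static List<Object[]> split(String str, String separator) {
        List<Object[]> rows = new ArrayList<>();
        for (String s : str.split(separator)) {
            rows.add(new Object[] {s, s.length()});
        }
        return rows;
    }

    // leftOuter == false imitates join, leftOuter == true imitates leftOuterJoin.
    static List<String> correlate(List<String> table, String separator, boolean leftOuter) {
        List<String> result = new ArrayList<>();
        for (String a : table) {
            List<Object[]> emitted = split(a, separator);
            for (Object[] row : emitted) {
                result.add(a + "|" + row[0] + "|" + row[1]);
            }
            // A left outer join keeps the input row and pads the function's columns with null.
            if (emitted.isEmpty() && leftOuter) {
                result.add(a + "|null|null");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> table = Arrays.asList("a#bc", "#");
        System.out.println(correlate(table, "#", false)); // [a#bc|a|1, a#bc|bc|2]
        System.out.println(correlate(table, "#", true));  // [a#bc|a|1, a#bc|bc|2, #|null|null]
    }
}
```

The row "#" disappears from the inner join because the function emitted nothing for it, while the left outer join keeps it with null in the word and length columns.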

UserDefinedFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/UserDefinedFunction.scala

    abstract class UserDefinedFunction extends Serializable {
      /**
       * Setup method for user-defined function. It can be used for initialization work.
       *
       * By default, this method does nothing.
       */
      @throws(classOf[Exception])
      def open(context: FunctionContext): Unit = {}

      /**
       * Tear-down method for user-defined function. It can be used for clean up work.
       *
       * By default, this method does nothing.
       */
      @throws(classOf[Exception])
      def close(): Unit = {}

      /**
       * @return true if and only if a call to this function is guaranteed to always return
       *         the same result given the same parameters; true is assumed by default
       *         if user's function is not pure functional, like random(), date(), now()...
       *         isDeterministic must return false
       */
      def isDeterministic: Boolean = true

      final def functionIdentifier: String = {
        val md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)))
        getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
      }

      /**
       * Returns the name of the UDF that is used for plan explain and logging.
       */
      override def toString: String = getClass.getSimpleName
    }
  • UserDefinedFunction defines the open, close, and functionIdentifier methods, along with isDeterministic (true by default) and a toString override
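
functionIdentifier combines the class name with an MD5 hash of the serialized instance, so two instances of the same class with different constructor arguments get different identifiers. The following plain-Java sketch only approximates that idea. It assumes that the object is Java-serialized and Base64-encoded before hashing, which may differ from what EncodingUtils does internally, and FunctionIdentifierSketch and SplitLike are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical approximation of UserDefinedFunction.functionIdentifier, without Flink.
final class FunctionIdentifierSketch {

    // Stand-in for a UDF that carries constructor state, like Split's separator.
    static class SplitLike implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String separator;

        SplitLike(String separator) {
            this.separator = separator;
        }
    }

    static String identifier(Serializable udf) {
        try {
            // Assumption: serialize the instance and encode it as a string before hashing.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(udf);
            }
            String encoded = Base64.getEncoder().encodeToString(bytes.toByteArray());
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(encoded.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            // Same shape as the Scala code: canonical name with '.' replaced by '$', then "$" + md5.
            return udf.getClass().getCanonicalName().replace('.', '$') + "$" + hex;
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(identifier(new SplitLike("#")));
        System.out.println(identifier(new SplitLike(" ")));
    }
}
```

The two printed identifiers share the class-name prefix but end in different hashes, because the separator is part of the serialized state.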

TableFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/TableFunction.scala

    abstract class TableFunction[T] extends UserDefinedFunction {

      // ----------------------------------------------------------------------------------------------

      /**
       * Emit an output row.
       *
       * @param row the output row
       */
      protected def collect(row: T): Unit = {
        collector.collect(row)
      }

      // ----------------------------------------------------------------------------------------------

      /**
       * The code generated collector used to emit row.
       */
      private var collector: Collector[T] = _

      /**
       * Internal use. Sets the current collector.
       */
      private[flink] final def setCollector(collector: Collector[T]): Unit = {
        this.collector = collector
      }

      // ----------------------------------------------------------------------------------------------

      /**
       * Returns the result type of the evaluation method with a given signature.
       *
       * This method needs to be overridden in case Flink's type extraction facilities are not
       * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
       * method. Flink's type extraction facilities can handle basic types or
       * simple POJOs but might be wrong for more complex, custom, or composite types.
       *
       * @return [[TypeInformation]] of result type or null if Flink should determine the type
       */
      def getResultType: TypeInformation[T] = null

      /**
       * Returns [[TypeInformation]] about the operands of the evaluation method with a given
       * signature.
       *
       * In order to perform operand type inference in SQL (especially when NULL is used) it might be
       * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
       * By default Flink's type extraction facilities are used for this but might be wrong for
       * more complex, custom, or composite types.
       *
       * @param signature signature of the method the operand types need to be determined
       * @return [[TypeInformation]] of operand types
       */
      def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
        signature.map { c =>
          try {
            TypeExtractor.getForClass(c)
          } catch {
            case ite: InvalidTypesException =>
              throw new ValidationException(
                s"Parameter types of table function '${this.getClass.getCanonicalName}' cannot be " +
                  s"automatically determined. Please provide type information manually.")
          }
        }
      }
    }
  • TableFunction extends UserDefinedFunction and defines the collect, setCollector, getResultType, and getParameterTypes methods
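
The relationship between collect and setCollector can be sketched without Flink. In the simplified model below, MiniTableFunction and MiniSplit are hypothetical stand-ins (not Flink classes) and java.util.function.Consumer plays the role of Flink's Collector. It shows that eval never returns rows; it only pushes them into whatever collector the runtime injected beforehand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Flink-free model of TableFunction's collect/setCollector pair.
final class CollectorSketch {

    // Stand-in for TableFunction[T]: collect() forwards to the injected collector.
    abstract static class MiniTableFunction<T> {
        private Consumer<T> collector; // null until the runtime injects one

        final void setCollector(Consumer<T> collector) {
            this.collector = collector;
        }

        protected void collect(T row) {
            collector.accept(row);
        }
    }

    // Stand-in for Split: emits "word:length" strings instead of Tuple2 values.
    static class MiniSplit extends MiniTableFunction<String> {
        public void eval(String str) {
            for (String s : str.split("#")) {
                collect(s + ":" + s.length());
            }
        }
    }

    static List<String> run(String input) {
        List<String> out = new ArrayList<>();
        MiniSplit split = new MiniSplit();
        split.setCollector(out::add); // in Flink this is done by generated code, not by the user
        split.eval(input);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run("a#bc")); // [a:1, bc:2]
    }
}
```

In this model, calling eval before setCollector fails with a NullPointerException, which matches the way the collector field in the Scala source starts out unset.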

ProcessOperator

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/operators/ProcessOperator.java

    @Internal
    public class ProcessOperator<IN, OUT>
            extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
            implements OneInputStreamOperator<IN, OUT> {

        private static final long serialVersionUID = 1L;

        private transient TimestampedCollector<OUT> collector;

        private transient ContextImpl context;

        /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
        private long currentWatermark = Long.MIN_VALUE;

        public ProcessOperator(ProcessFunction<IN, OUT> function) {
            super(function);
            chainingStrategy = ChainingStrategy.ALWAYS;
        }

        @Override
        public void open() throws Exception {
            super.open();
            collector = new TimestampedCollector<>(output);
            context = new ContextImpl(userFunction, getProcessingTimeService());
        }

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            collector.setTimestamp(element);
            context.element = element;
            userFunction.processElement(element.getValue(), context, collector);
            context.element = null;
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            this.currentWatermark = mark.getTimestamp();
        }

        //......
    }
  • ProcessOperator's processElement method calls userFunction.processElement; in this case userFunction is a CRowCorrelateProcessRunner

CRowCorrelateProcessRunner

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala

    class CRowCorrelateProcessRunner(
        processName: String,
        processCode: String,
        collectorName: String,
        collectorCode: String,
        @transient var returnType: TypeInformation[CRow])
      extends ProcessFunction[CRow, CRow]
      with ResultTypeQueryable[CRow]
      with Compiler[Any]
      with Logging {

      private var function: ProcessFunction[Row, Row] = _
      private var collector: TableFunctionCollector[_] = _
      private var cRowWrapper: CRowWrappingCollector = _

      override def open(parameters: Configuration): Unit = {
        LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
        val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
        LOG.debug("Instantiating TableFunctionCollector.")
        collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
        this.cRowWrapper = new CRowWrappingCollector()

        LOG.debug(s"Compiling ProcessFunction: $processName \n\n Code:\n$processCode")
        val processClazz = compile(getRuntimeContext.getUserCodeClassLoader, processName, processCode)
        val constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]])
        LOG.debug("Instantiating ProcessFunction.")
        function = constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
        FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext)
        FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
        FunctionUtils.openFunction(collector, parameters)
        FunctionUtils.openFunction(function, parameters)
      }

      override def processElement(
          in: CRow,
          ctx: ProcessFunction[CRow, CRow]#Context,
          out: Collector[CRow])
        : Unit = {

        cRowWrapper.out = out
        cRowWrapper.setChange(in.change)

        collector.setCollector(cRowWrapper)
        collector.setInput(in.row)
        collector.reset()

        function.processElement(
          in.row,
          ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
          cRowWrapper)
      }

      override def getProducedType: TypeInformation[CRow] = returnType

      override def close(): Unit = {
        FunctionUtils.closeFunction(collector)
        FunctionUtils.closeFunction(function)
      }
    }
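  • CRowCorrelateProcessRunner's processElement method calls function.processElement; here function is the ProcessFunction compiled from the generated processCode, and it in turn calls Split's eval method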
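
The per-element steps in processElement can be modelled in a few lines of plain Java. This is a simplified sketch under stated assumptions: ChangeRow, WrappingCollector and Runner are hypothetical stand-ins for CRow, CRowWrappingCollector and the runner, and a lambda takes the place of the generated ProcessFunction. It shows how every row emitted for an input element inherits that element's change flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical, Flink-free model of the runner's per-element flow.
final class CorrelateRunnerSketch {

    // Stand-in for CRow: a row plus a change flag (true = accumulate, false = retract).
    static final class ChangeRow {
        final String row;
        final boolean change;

        ChangeRow(String row, boolean change) {
            this.row = row;
            this.change = change;
        }

        @Override
        public String toString() {
            return (change ? "+" : "-") + row;
        }
    }

    // Stand-in for CRowWrappingCollector: stamps each emitted row with the current flag.
    static final class WrappingCollector implements Consumer<String> {
        Consumer<ChangeRow> out;
        private boolean change;

        void setChange(boolean change) {
            this.change = change;
        }

        @Override
        public void accept(String row) {
            out.accept(new ChangeRow(row, change));
        }
    }

    // Stand-in for the runner; 'function' plays the role of the generated ProcessFunction.
    static final class Runner {
        private final BiConsumer<String, Consumer<String>> function;
        private final WrappingCollector wrapper = new WrappingCollector();

        Runner(BiConsumer<String, Consumer<String>> function) {
            this.function = function;
        }

        void processElement(ChangeRow in, Consumer<ChangeRow> out) {
            wrapper.out = out;             // point the wrapper at the downstream collector
            wrapper.setChange(in.change);  // remember the input's change flag
            function.accept(in.row, wrapper);
        }
    }

    static List<String> demo() {
        Runner runner = new Runner((row, collector) -> {
            for (String s : row.split("#")) {
                collector.accept(row + "|" + s);
            }
        });
        List<String> seen = new ArrayList<>();
        runner.processElement(new ChangeRow("a#b", true), r -> seen.add(r.toString()));
        runner.processElement(new ChangeRow("c", false), r -> seen.add(r.toString()));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [+a#b|a, +a#b|b, -c|c]
    }
}
```

The sketch leaves out the setInput and reset calls on the TableFunctionCollector, since their implementation is not part of the listing above.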

Summary

  • TableFunction extends UserDefinedFunction and defines the collect, setCollector, getResultType, and getParameterTypes methods; UserDefinedFunction defines the open, close, and functionIdentifier methods
  • A custom table function must extend the TableFunction class and also define a public eval method. The parameter types of eval depend on how the function is called: in this example split is called with column a of the table, which is a String, so eval takes a String parameter
  • ProcessOperator's processElement method calls userFunction.processElement, where userFunction is a CRowCorrelateProcessRunner; CRowCorrelateProcessRunner's processElement method calls function.processElement, and function in turn calls Split's eval method
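
The second point, that eval is matched by the types of the arguments passed at the call site, can be illustrated with plain Java reflection. The sketch below is not how Flink resolves eval methods internally; EvalLookupSketch, Overloaded and findEval are hypothetical names, and the matching rule is deliberately reduced to a single parameter checked with isAssignableFrom.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of picking an eval overload by argument type.
final class EvalLookupSketch {

    // Stand-in for a table function with two overloaded eval methods.
    public static class Overloaded {
        public final List<String> emitted = new ArrayList<>();

        public void eval(String s) {
            emitted.add("string:" + s);
        }

        public void eval(Integer i) {
            emitted.add("int:" + i);
        }
    }

    // Finds a public method named "eval" whose single parameter accepts the argument type.
    static Method findEval(Class<?> functionClass, Class<?> argumentType) {
        for (Method m : functionClass.getMethods()) { // getMethods returns public methods only
            if (m.getName().equals("eval")
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].isAssignableFrom(argumentType)) {
                return m;
            }
        }
        throw new IllegalArgumentException("No eval method accepts " + argumentType.getName());
    }

    static List<String> demo() {
        Overloaded f = new Overloaded();
        try {
            findEval(Overloaded.class, String.class).invoke(f, "a#b");
            findEval(Overloaded.class, Integer.class).invoke(f, 42);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
        return f.emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [string:a#b, int:42]
    }
}
```

Asking findEval for a type that no overload accepts, such as Double.class here, throws an IllegalArgumentException. This is analogous to the validation error one would expect if a table function is called with a column type that none of its eval methods can take.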
