Preface
This article takes a look at Flink's TableFunction.
Example
```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.TableFunction;

// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
// Flink instantiates UDFs reflectively, so the class must be public (and static if it is nested).
public class Split extends TableFunction<Tuple2<String, Integer>> {
    private String separator = " ";

    public Split(String separator) {
        this.separator = separator;
    }

    public void eval(String str) {
        for (String s : str.split(separator)) {
            // use collect(...) to emit a row
            collect(new Tuple2<String, Integer>(s, s.length()));
        }
    }
}

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ... // table schema: [a: String]
// The SQL queries below refer to the table by name, so it has to be registered.
tableEnv.registerTable("MyTable", myTable);

// Register the function.
tableEnv.registerFunction("split", new Split("#"));

// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.join(new Table(tableEnv, "split(a) as (word, length)"))
    .select("a, word, length");
myTable.leftOuterJoin(new Table(tableEnv, "split(a) as (word, length)"))
    .select("a, word, length");

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
```
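- This example defines a custom Split function, registers it with TableEnvironment.registerFunction, and then uses it either through the Table API (join / leftOuterJoin) or through TableEnvironment.sqlQuery. Split declares a public eval method, which emits rows by calling collect. Note that the example uses a BatchTableEnvironment, whereas the runtime classes traced below (ProcessOperator and CRowCorrelateProcessRunner) belong to the streaming (DataStream) execution path, which is what runs when the function is used through a StreamTableEnvironment.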
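The two query styles in the example differ only in how they treat an input row for which the table function emits nothing. The plain-Java sketch below models that difference without Flink. `splitEval` mirrors `Split.eval`, and `lateralJoin` is a hypothetical helper that appends each emitted (word, length) pair to its input value. The input `"#"` yields no tokens under `String.split`, so the cross join drops it, while the left join keeps it with NULL padding. The sketch only illustrates the semantics; it says nothing about how Flink actually executes the join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class LateralJoinSketch {

    // Plain-Java stand-in for Split.eval: emits one (word, length) pair per token.
    static void splitEval(String str, String separator, Consumer<Object[]> out) {
        for (String s : str.split(separator)) {
            out.accept(new Object[] {s, s.length()});
        }
    }

    // leftOuter == false models "FROM MyTable, LATERAL TABLE(split(a))" (cross join);
    // leftOuter == true models "LEFT JOIN LATERAL TABLE(split(a)) ... ON TRUE".
    static List<List<Object>> lateralJoin(List<String> column, boolean leftOuter) {
        List<List<Object>> result = new ArrayList<>();
        for (String a : column) {
            List<List<Object>> emitted = new ArrayList<>();
            // Each emitted (word, length) pair is appended to the input value a.
            splitEval(a, "#", row -> emitted.add(Arrays.<Object>asList(a, row[0], row[1])));
            if (emitted.isEmpty() && leftOuter) {
                // The table function produced nothing: keep the input row, pad with NULLs.
                result.add(Arrays.<Object>asList(a, null, null));
            } else {
                result.addAll(emitted);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> myTable = Arrays.asList("hi#flink", "#");
        // [[hi#flink, hi, 2], [hi#flink, flink, 5]]
        System.out.println(lateralJoin(myTable, false));
        // [[hi#flink, hi, 2], [hi#flink, flink, 5], [#, null, null]]
        System.out.println(lateralJoin(myTable, true));
    }
}
```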
UserDefinedFunction
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/UserDefinedFunction.scala
```scala
abstract class UserDefinedFunction extends Serializable {
  /**
    * Setup method for user-defined function. It can be used for initialization work.
    *
    * By default, this method does nothing.
    */
  @throws(classOf[Exception])
  def open(context: FunctionContext): Unit = {}

  /**
    * Tear-down method for user-defined function. It can be used for clean up work.
    *
    * By default, this method does nothing.
    */
  @throws(classOf[Exception])
  def close(): Unit = {}

  /**
    * @return true if and only if a call to this function is guaranteed to always return
    *         the same result given the same parameters; true is assumed by default
    *         if user's function is not pure functional, like random(), date(), now()...
    *         isDeterministic must return false
    */
  def isDeterministic: Boolean = true

  final def functionIdentifier: String = {
    val md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)))
    getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
  }

  /**
    * Returns the name of the UDF that is used for plan explain and logging.
    */
  override def toString: String = getClass.getSimpleName

}
```
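- UserDefinedFunction defines the open and close lifecycle methods (both no-ops by default), isDeterministic (true by default), and functionIdentifier, which combines the class's canonical name with an MD5 hash of the encoded function instance.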
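functionIdentifier is the least obvious of these methods. It appends a digest of the encoded function instance to the class name, so two instances of the same class with different state, such as `new Split("#")` and `new Split(",")`, get different identifiers. The plain-Java sketch below approximates that idea. Its `Split` class and `functionIdentifier` helper are hypothetical. It also assumes plain Java serialization followed by MD5 over the raw bytes. That is not necessarily what `EncodingUtils` does, so the digests will not match Flink's.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class FunctionIdentifierSketch {

    // Hypothetical stand-in for a UDF: Serializable, with configuration held as instance state.
    public static class Split implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String separator;

        public Split(String separator) {
            this.separator = separator;
        }
    }

    // Approximates functionIdentifier: canonical class name with '.' replaced by '$',
    // then "$" and an MD5 hex digest. Assumption: the digest is taken over plain Java
    // serialization bytes, which differs from Flink's EncodingUtils-based encoding.
    static String functionIdentifier(Object function) throws IOException, NoSuchAlgorithmException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(function);
        }
        byte[] md5 = MessageDigest.getInstance("MD5").digest(bytes.toByteArray());
        StringBuilder hex = new StringBuilder();
        for (byte b : md5) {
            hex.append(String.format("%02x", b));
        }
        return function.getClass().getCanonicalName().replace('.', '$').concat("$").concat(hex.toString());
    }

    public static void main(String[] args) throws Exception {
        // Same class, different state: the suffixes differ.
        System.out.println(functionIdentifier(new Split("#")));
        System.out.println(functionIdentifier(new Split(",")));
    }
}
```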
TableFunction
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/TableFunction.scala
```scala
abstract class TableFunction[T] extends UserDefinedFunction {

  // ----------------------------------------------------------------------------------------------

  /**
    * Emit an output row.
    *
    * @param row the output row
    */
  protected def collect(row: T): Unit = {
    collector.collect(row)
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * The code generated collector used to emit row.
    */
  private var collector: Collector[T] = _

  /**
    * Internal use. Sets the current collector.
    */
  private[flink] final def setCollector(collector: Collector[T]): Unit = {
    this.collector = collector
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Returns the result type of the evaluation method with a given signature.
    *
    * This method needs to be overridden in case Flink's type extraction facilities are not
    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
    * method. Flink's type extraction facilities can handle basic types or
    * simple POJOs but might be wrong for more complex, custom, or composite types.
    *
    * @return [[TypeInformation]] of result type or null if Flink should determine the type
    */
  def getResultType: TypeInformation[T] = null

  /**
    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
    * signature.
    *
    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
    * By default Flink's type extraction facilities are used for this but might be wrong for
    * more complex, custom, or composite types.
    *
    * @param signature signature of the method the operand types need to be determined
    * @return [[TypeInformation]] of operand types
    */
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
    signature.map { c =>
      try {
        TypeExtractor.getForClass(c)
      } catch {
        case ite: InvalidTypesException =>
          throw new ValidationException(
            s"Parameter types of table function '${this.getClass.getCanonicalName}' cannot be " +
            s"automatically determined. Please provide type information manually.")
      }
    }
  }

}
```
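- TableFunction extends UserDefinedFunction and defines the collect, setCollector, getResultType, and getParameterTypes methods. collect forwards each row to the code-generated collector that the runtime installs through setCollector. getResultType returns null by default, which tells Flink to determine the result type itself.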
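TableFunction never decides where its rows go. collect only forwards to whatever collector was injected through setCollector before eval runs. The sketch below is a plain-Java analogue of that pattern. `MiniTableFunction` is a hypothetical class, and a `java.util.function.Consumer` stands in for Flink's `Collector`. getResultType and getParameterTypes are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CollectorSketch {

    // Hypothetical simplified analogue of TableFunction<T>: the runtime injects the
    // collector, and user code only ever calls collect(...).
    public abstract static class MiniTableFunction<T> {
        private Consumer<T> collector;

        // Mirrors TableFunction.setCollector (internal, called by the runtime).
        final void setCollector(Consumer<T> collector) {
            this.collector = collector;
        }

        // Mirrors TableFunction.collect: forwards a row to whatever collector is set.
        protected void collect(T row) {
            collector.accept(row);
        }
    }

    // Same eval logic as the Split example, emitting "word:length" strings for brevity.
    public static class Split extends MiniTableFunction<String> {
        private final String separator;

        public Split(String separator) {
            this.separator = separator;
        }

        public void eval(String str) {
            for (String s : str.split(separator)) {
                collect(s + ":" + s.length());
            }
        }
    }

    public static void main(String[] args) {
        Split split = new Split("#");
        List<String> rows = new ArrayList<>();
        split.setCollector(rows::add);   // what the runtime does before calling eval
        split.eval("a#bb#ccc");
        System.out.println(rows);        // [a:1, bb:2, ccc:3]
    }
}
```

Keeping the collector out of eval's signature lets user code stay a plain method call. The runtime can redirect output per input record simply by swapping the collector.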
ProcessOperator
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/operators/ProcessOperator.java
```java
@Internal
public class ProcessOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    private transient TimestampedCollector<OUT> collector;

    private transient ContextImpl context;

    /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
    private long currentWatermark = Long.MIN_VALUE;

    public ProcessOperator(ProcessFunction<IN, OUT> function) {
        super(function);

        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);

        context = new ContextImpl(userFunction, getProcessingTimeService());
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement(element.getValue(), context, collector);
        context.element = null;
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        this.currentWatermark = mark.getTimestamp();
    }

    //......
}
```
- ProcessOperator.processElement sets the input record's timestamp on its TimestampedCollector and then calls userFunction.processElement. Here, userFunction is a CRowCorrelateProcessRunner.
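ProcessOperator itself contains no Table-specific logic. It keeps one TimestampedCollector, re-stamps it with each input record's timestamp, and passes the value plus that collector to the user function. The plain-Java model below shows the consequence: every value the user function emits inherits the timestamp of the record being processed. `StampedValue` and `MiniProcessOperator` are hypothetical types, and a `BiConsumer` stands in for `ProcessFunction`. The real operator's context, timers, and watermark tracking are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class ProcessOperatorSketch {

    // Hypothetical element type: a value plus a timestamp, loosely like StreamRecord.
    static final class StampedValue<T> {
        final T value;
        final long timestamp;

        StampedValue(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return value + "@" + timestamp;
        }
    }

    // Minimal analogue of ProcessOperator: one shared collector, re-stamped per element,
    // and a user function that receives each value together with that collector.
    static final class MiniProcessOperator<IN, OUT> {
        private final BiConsumer<IN, Consumer<OUT>> userFunction;
        private final List<StampedValue<OUT>> output = new ArrayList<>();
        private long currentTimestamp;
        // Like TimestampedCollector: every emitted value inherits the current timestamp.
        private final Consumer<OUT> collector =
                value -> output.add(new StampedValue<>(value, currentTimestamp));

        MiniProcessOperator(BiConsumer<IN, Consumer<OUT>> userFunction) {
            this.userFunction = userFunction;
        }

        void processElement(StampedValue<IN> element) {
            currentTimestamp = element.timestamp;           // ~ collector.setTimestamp(element)
            userFunction.accept(element.value, collector);  // ~ userFunction.processElement(...)
        }

        List<StampedValue<OUT>> output() {
            return output;
        }
    }

    public static void main(String[] args) {
        MiniProcessOperator<String, String> operator = new MiniProcessOperator<String, String>(
                (line, out) -> {
                    for (String word : line.split("#")) {
                        out.accept(word);
                    }
                });
        operator.processElement(new StampedValue<>("hi#flink", 100L));
        operator.processElement(new StampedValue<>("x", 200L));
        System.out.println(operator.output()); // [hi@100, flink@100, x@200]
    }
}
```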
CRowCorrelateProcessRunner
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
```scala
class CRowCorrelateProcessRunner(
    processName: String,
    processCode: String,
    collectorName: String,
    collectorCode: String,
    @transient var returnType: TypeInformation[CRow])
  extends ProcessFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[Any]
  with Logging {

  private var function: ProcessFunction[Row, Row] = _
  private var collector: TableFunctionCollector[_] = _
  private var cRowWrapper: CRowWrappingCollector = _

  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
    LOG.debug("Instantiating TableFunctionCollector.")
    collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
    this.cRowWrapper = new CRowWrappingCollector()

    LOG.debug(s"Compiling ProcessFunction: $processName \n\n Code:\n$processCode")
    val processClazz = compile(getRuntimeContext.getUserCodeClassLoader, processName, processCode)
    val constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]])
    LOG.debug("Instantiating ProcessFunction.")
    function = constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
    FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext)
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(collector, parameters)
    FunctionUtils.openFunction(function, parameters)
  }

  override def processElement(
      in: CRow,
      ctx: ProcessFunction[CRow, CRow]#Context,
      out: Collector[CRow])
    : Unit = {

    cRowWrapper.out = out
    cRowWrapper.setChange(in.change)

    collector.setCollector(cRowWrapper)
    collector.setInput(in.row)
    collector.reset()

    function.processElement(
      in.row,
      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
      cRowWrapper)
  }

  override def getProducedType: TypeInformation[CRow] = returnType

  override def close(): Unit = {
    FunctionUtils.closeFunction(collector)
    FunctionUtils.closeFunction(function)
  }
}
```
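- In open(), CRowCorrelateProcessRunner compiles two pieces of generated code: a TableFunctionCollector (from collectorCode) and a ProcessFunction (from processCode) that receives that collector through its constructor. For each record, processElement does the following in order:
  - points cRowWrapper at the downstream collector and copies the input CRow's change flag onto it;
  - installs cRowWrapper on the TableFunctionCollector;
  - sets the input row and resets the collector;
  - calls function.processElement, and that generated function in turn invokes Split's eval method.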
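Per record, the runner only rewires state and then delegates. The input CRow's change flag is copied onto cRowWrapper, so rows emitted while processing a retraction are themselves retractions. The TableFunctionCollector is also given the current input row. The plain-Java model below follows that call order with hypothetical `CRow`, `CRowWrapper` and `JoinCollector` classes. It replaces the generated ProcessFunction with an inline loop that mimics `Split("#").eval`. Because the TableFunctionCollector source is not shown here, the model makes two assumptions. First, the generated collector appends each emitted row to the input row. Second, `reset()` clears a per-input flag recording whether anything was emitted, which is the kind of information a LEFT JOIN needs to decide on NULL padding.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CorrelateRunnerSketch {

    // Hypothetical analogue of CRow: a row plus a change flag (true = accumulate, false = retract).
    static final class CRow {
        final List<Object> row;
        final boolean change;

        CRow(List<Object> row, boolean change) {
            this.row = row;
            this.change = change;
        }

        @Override
        public String toString() {
            return (change ? "+" : "-") + row;
        }
    }

    // Analogue of CRowWrappingCollector: wraps plain rows into CRows with the current change flag.
    static final class CRowWrapper {
        final List<CRow> out = new ArrayList<>();
        private boolean change;

        void setChange(boolean change) { this.change = change; }

        void collect(List<Object> row) { out.add(new CRow(row, change)); }
    }

    // Stand-in for the generated TableFunctionCollector (assumed behavior, see lead-in).
    static final class JoinCollector {
        private List<Object> input;
        private CRowWrapper downstream;
        private boolean collected;

        void setCollector(CRowWrapper downstream) { this.downstream = downstream; }
        void setInput(List<Object> input) { this.input = input; }
        void reset() { collected = false; }
        boolean isCollected() { return collected; }

        void collect(String word, int length) {
            collected = true;
            List<Object> joined = new ArrayList<>(input);  // input row first ...
            joined.add(word);                               // ... then the table function's fields
            joined.add(length);
            downstream.collect(joined);
        }
    }

    private final CRowWrapper cRowWrapper = new CRowWrapper();
    private final JoinCollector collector = new JoinCollector();

    // Same call order as CRowCorrelateProcessRunner.processElement.
    void processElement(CRow in) {
        cRowWrapper.setChange(in.change);
        collector.setCollector(cRowWrapper);
        collector.setInput(in.row);
        collector.reset();
        String a = (String) in.row.get(0);
        for (String s : a.split("#")) {  // stands in for the generated function calling eval
            collector.collect(s, s.length());
        }
    }

    boolean lastInputEmitted() { return collector.isCollected(); }

    List<CRow> output() { return cRowWrapper.out; }

    public static void main(String[] args) {
        CorrelateRunnerSketch runner = new CorrelateRunnerSketch();
        runner.processElement(new CRow(Arrays.<Object>asList("hi#flink"), true));
        runner.processElement(new CRow(Arrays.<Object>asList("hi#flink"), false));
        // [+[hi#flink, hi, 2], +[hi#flink, flink, 5], -[hi#flink, hi, 2], -[hi#flink, flink, 5]]
        System.out.println(runner.output());
    }
}
```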
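Summary
- TableFunction extends UserDefinedFunction and defines the collect, setCollector, getResultType, and getParameterTypes methods. UserDefinedFunction defines the open, close, isDeterministic, and functionIdentifier methods.
- A custom TableFunction must extend the TableFunction class and also define a public eval method. The eval parameter types depend on how the function is called. In this example, split is called with the table's field a, which is a String, so eval takes a String parameter.
- In the streaming runtime, ProcessOperator.processElement calls userFunction.processElement, and here userFunction is a CRowCorrelateProcessRunner. CRowCorrelateProcessRunner.processElement in turn calls function.processElement, and that generated function invokes Split's eval method.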
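The eval signature rule in the summary follows from how the function is bound. A public method named eval has to be found whose parameter types accept the argument types at the call site. The sketch below is a deliberately simplified, reflection-based version of such a lookup. `findEval` is a hypothetical helper. Flink's actual resolution handles more cases and is not reproduced here.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class EvalLookupSketch {

    // Hypothetical UDTF-like class with one String-based eval, as in the Split example.
    public static class Split {
        final List<String> emitted = new ArrayList<>();

        public void eval(String str) {
            for (String s : str.split("#")) {
                emitted.add(s);
            }
        }
    }

    // Simplified lookup: getMethods() returns only public methods (hence "eval must be public");
    // pick the first "eval" whose parameters accept the given argument types.
    static Method findEval(Class<?> clazz, Class<?>... argTypes) {
        for (Method m : clazz.getMethods()) {
            if (!m.getName().equals("eval")) continue;
            Class<?>[] params = m.getParameterTypes();
            if (params.length != argTypes.length) continue;
            boolean matches = true;
            for (int i = 0; i < params.length; i++) {
                if (!params[i].isAssignableFrom(argTypes[i])) {
                    matches = false;
                    break;
                }
            }
            if (matches) return m;
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        // Column a is a String, so the String-typed eval is found ...
        Method eval = findEval(Split.class, String.class);
        Split split = new Split();
        eval.invoke(split, "hi#flink");
        System.out.println(split.emitted);                         // [hi, flink]
        // ... whereas an Integer argument finds no matching eval.
        System.out.println(findEval(Split.class, Integer.class));  // null
    }
}
```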