当前位置:   article > 正文

Flink - Table API UDF --- 用户自定义函数_flink numeric

flink numeric

函数 (Functions)

  • Flink Table API 和 SQL 为用户提供了一组用于数据转换的内置函数

  • SQL中支持的很多函数,Table API和SQL 都已经做了实现

  • 比较函数

    • SQL:
      • value1 = value2
      • value1 > value2
    • Table API
      • ANY1 === ANY2
      • ANY1 > ANY2
  • 逻辑函数

    • SQL:
      • boolean1 OR boolean2
      • boolean IS FALSE
      • NOT boolean
    • Table API
      • BOOLEAN1 || BOOLEAN2
      • BOOLEAN.isFalse
      • IBOOLEAN
  • 算数函数

    • SQL:
      • numeric1 + numeric2
      • POWER(numeric1,numeric2)
    • Table API:
      • NUMERIC1 + NUMERIC2
      • NUMERIC1,pow(NUMERIC2)
  • 字符串函数

    • SQL:
      • string1 || string2
      • UPPER(string)
      • CHAR_LENGTH(string)
    • Table API:
      • STRING1 + STRING2
      • STRING.upperCase()
      • STRING.charLength()
  • 时间函数

    • SQL:
      • DATE string
      • TIMESTAMP string
      • CURRENT_TIME
      • INTERVAL string range
    • Table API:
      • STRING.toDate
      • STRING.toTimestamp
      • currentTime()
      • NUMERIC_days
      • NUMERIC_minutes
  • 聚合函数

    • SQL:
      • COUNT(*)
      • SUM(expression)
      • RANK()
      • ROW_NUMBER()
    • Table API:
      • FIELD.count
      • FIELD.sum()

用户自定义函数(UDF)

  • 用户定义函数(User-defined Functions,UDF)是一个重要的特性,它们是显著地扩展了查询的表达能力
  • 在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用
  • 函数通过调用registerFunction()方法在TableEnvironment的函数目录中,这样Table API或SQL 解析器就可以识别并正确地解释它

标量函数(Scalar Function)

  • 用户定义的标量函数,可以将0,1或多个标量值,映射到新的标量值

  • 为了定义标量函数,必须在org.apache.flink.table.functions中扩展基类Scalar Function, 并实现(一个或多个)求值(eval)方法

  • 标量函数的行为由求值方法决定,求值的方法必须公开声明为eval

    public static class HashCode extends ScalarFunction{
    	private int factor = 13;
    	public HashCoded(int factor){
    		this.factor = factor;
    	}
    	public int eval(String s){
    		return s.hashCode() * factor;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
  • 案例

    public static void main(String[] args) throws Exception {
         
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism(1);
    
            DataStream<String> fileDataStream = env.readTextFile("data/temps.txt");
    
    
            DataStream<TempInfo> dataStream = fileDataStream.map(new MapFunction<String, TempInfo>() {
         
                @Override
                public TempInfo map(String value) throws Exception {
         
                    String[
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/761841
推荐阅读
相关标签
  

闽ICP备14008679号