当前位置:   article > 正文

Hive的UDAF与UDTF函数_udaf和udtf

udaf和udtf

Hive的UDAF与UDTF函数

hive 的 UDF函数说明

1、用户定义函数 UDF (一进一出)
2、用户定义聚集函数 UDAF (多进一出)
3、用户定义表生成函数UDTF (一进多出)

UDF函数作用于单个数据行,并且产生一个数据行作为输出,大多数的函数都属于UDF,比如数学函数、日期函数、字符函数等。
UDAF函数作用于多个输入行,并且产生一个输出数据行。比如group by后的count、max就是聚合函数。
UDTF函数 作用于单个输入行,并且产生多个输出行,这个多可能是0个也可能是多个。比如split、explode等函数。

官网链接:UDAFUDTF

UDAF函数

简介函数名返回类型函数描述
聚合函数count 、sum、avg、min、maxT需要group by后使用的聚合函数
列去重collect_set(col)array返回一组消除了重复元素的对象
列不去重collect_list(col)array返回具有重复项的对象列表
ntile(INTEGER x)INTEGER用于将分组数据按照顺序切分成n片,返回当前记录所在的切片值。
variance(col), var_pop(col)DOUBLE返回该列数字的方差
var_samp(col)DOUBLE返回组中数字列的无偏方差
stddev_pop(col)DOUBLE返回该数值列的标准差
stddev_samp(col)DOUBLE返回该数值列的无偏方差
covar_pop(col1, col2)DOUBLE返回一对数值列的总体协方差
covar_samp(col1, col2)DOUBLE返回一对数值列的样本协方差
corr(col1, col2)DOUBLE返回一对数值列的相关性系数
percentile(BIGINT col, p)DOUBLE返回一对数值列(整数)的第P百分位数(不适用与浮点类型)。P必须在0和1之间。注意: 只能为整数值计算真正的百分位数,如果输入的是非整数,请使用PERCENTILE_APPROX。
percentile(BIGINT col, array(p1 [, p2]…))array返回一对数值列(整数)的百分位数 p 1、 p 2、…(不适用与浮点类型)。P必须在0和1之间。注意: 只能为整数值计算真正的百分位数,如果输入的是非整数,请使用PERCENTILE_APPROX。
percentile_approx(DOUBLE col, p [, B])DOUBLE返回数值列(包括浮点类型)的近似第p百分位。B参数以内存为代价控制近似精度,较高的值会产生更好的近似值,默认值为10.000.当 col 中不同值的数量小于 B 时,这给出了精确的百分位值。
percentile_approx(DOUBLE col, array(p1 [, p2]…) [, B])array与上面相同,但接受并返回百分位值数组而不是单个值。
regr_avgx(independent, dependent)double相当于 avg(dependent)。从Hive 2.2.0 开始
regr_avgy(independent, dependent)double相当于 avg(independent)。从Hive 2.2.0 开始
regr_count(independent, dependent)double返回用于拟合线性回归线的非空对的数量。从Hive 2.2.0 开始
regr_intercept(independent, dependent)double返回线性回归线的 y 截距,即等式中 b 的值依赖 = a * independent+ b。从Hive 2.2.0 开始
regr_r2(independent, dependent)double返回回归的决定系数。从Hive 2.2.0 开始
regr_slope(independent, dependent)double返回线性回归线的斜率,即方程dependent = a * Independent + b 中a 的值。从Hive 2.2.0 开始
regr_sxx(independent, dependent)double相当于 regr_count(independent,dependent) * var_pop(dependent)。从Hive 2.2.0 开始
regr_sxy(independent, dependent)double相当于 regr_count(independent,dependent) * covar_pop(independent,dependent)。从Hive 2.2.0 开始
regr_syy(independent, dependent)double相当于 regr_count(independent,dependent) * var_pop(independent)。从Hive 2.2.0 开始
histogram_numeric(col, b)array<struct {‘x’,‘y’}>使用 b 个非均匀间隔的 容器计算组中数字列的直方图。输出是一个大小为b的双值(x,y)坐标数组,表示箱子中心和高度。

UDTF函数

lateral view函数常与 UDTF函数一起使用,后续会介绍lateral view 用法。

简介函数名返回类型函数描述
炸开数组explode(ARRAY a)T将一个array结构拆分成多行。
炸开Mapexplode(MAP<Tkey,Tvalue> m)Tkey,Tvalue将一个map结构拆分成多行,一行对应输入的每个键值对。
炸开数组,并输出数据的posposexplode(ARRAY a)int,T使用int类型附加一列位置列(从0开始),将数组分解为多行。返回一个两列(pos, val)的行集,数组中的每个元素占一行
炸开结构数组inline(ARRAY<STRUCTf1:T1,…,fn:Tn> a)T1,…,Tn将结构数组分解成多行,返回一个包含N列的行集(N等于结构体中元素个数),数组中每个结构体一行。
将后面元素排成n/r列r行stack(int r,T1 V1,…,Tn/r Vn)T1,…,Tn/r将n 个值 V 1 ,…,V n分解为r行。每行将有n/r列。r必须是常数。
解析多个键json_tuple(string jsonStr,string k1,…,string kn)string1,…,stringn获取JSON字符串中多个键。接受JSON字符串和一组n个键(string),并返回n个值的元组。类似get_json_object()的更高效版本。
解析多个URL部分parse_url_tuple(string urlStr,string p1,…,string pn)string 1,…,stringn获取URL字符串多个URL部分,返回一个包含n个值的元组。类似parse_url()的高级版本。有效的部分名称为:HOST、PATH、QUERY、REF、PROTOCOL、AUTHORITY、FILE、USERINFO、QUERY:。
-- 案例
-- explode
select explode(array('A','B','C'));
select explode(array('A','B','C')) as col;
select tf.* from (select 0) t lateral view explode(array('A','B','C')) tf;
select tf.* from (select 0) t lateral view explode(array('A','B','C')) tf as col;
/* 结果
    A
    B
    C
*/

select explode(map('A',10,'B',20,'C',30));
select explode(map('A',10,'B',20,'C',30)) as (key,value);
select tf.* from (select 0) t lateral view explode(map('A',10,'B',20,'C',30)) tf;
select tf.* from (select 0) t lateral view explode(map('A',10,'B',20,'C',30)) tf as key,value;
/* 结果
    A,10
    B,20
    C,30
*/


-- posexplode
select posexplode(array('A','B','C'));
select posexplode(array('A','B','C')) as (pos,val);
select tf.* from (select 0) t lateral view posexplode(array('A','B','C')) tf;
select tf.* from (select 0) t lateral view posexplode(array('A','B','C')) tf as pos,val;
/* 结果
    0,A
    1,B
    2,C
*/


-- inline
select inline(array(struct('A',10,date '2015-01-01'),struct('B',20,date '2016-02-02')));
select inline(array(struct('A',10,date '2015-01-01'),struct('B',20,date '2016-02-02'))) as (col1,col2,col3);
select tf.* from (select 0) t lateral view inline(array(struct('A',10,date '2015-01-01'),struct('B',20,date '2016-02-02'))) tf;
select tf.* from (select 0) t lateral view inline(array(struct('A',10,date '2015-01-01'),struct('B',20,date '2016-02-02'))) tf as col1,col2,col3;
/* 结果
    A,10,2015-01-01
    B,20,2016-02-02
*/


-- stack
select stack(2,'A',10,date '2015-01-01','B',20,date '2016-01-01');
select stack(2,'A',10,date '2015-01-01','B',20,date '2016-01-01') as (col0,col1,col2);
select tf.* from (select 0) t lateral view stack(2,'A',10,date '2015-01-01','B',20,date '2016-01-01') tf;
select tf.* from (select 0) t lateral view stack(2,'A',10,date '2015-01-01','B',20,date '2016-01-01') tf as col0,col1,col2;
/* 结果
    A,10,2015-01-01
    B,20,2016-01-01
*/

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

使用语法“SELECT udtf(col) AS colAlias…”有一些限制:

  • SELECT 中不允许使用其他表达式
    • SELECT pageid,explode(adid_list) AS myCol… 不受支持
  • UDTF 不能嵌套
    • SELECT 爆炸(爆炸(adid_list))作为 myCol… 不受支持
  • 不支持 GROUP BY / CLUSTER BY / DISTRIBUTE BY / SORT BY
    • SELECT expand(adid_list) AS myCol … GROUP BY myCol 不受支持

有关没有这些限制的替代语法,请参阅LanguageManual LateralView

行转列

相关函数说明

CONCAT(string A/col, string B/col…)**:**返回输入字符串连接后的结果,支持任意个输入字符串;

CONCAT_WS(separator, str1, str2,…):它是一个特殊形式的 CONCAT()。第一个参数剩余参数间的分隔符。分隔符可以是与剩余参数一样的字符串。如果分隔符是NULL,返回值也将为 NULL。这个函数会跳过分隔符参数后的任何NULL 和空字符串。分隔符将被加到被连接的字符串之间;

注意: CONCAT_WS must be "string or array

COLLECT_SET(col):函数只接受基本数据类型。它的主要作用是将某字段的值进行去重汇总,产生array类型字段。

  • 案例
    数据准备

    nameconstellationblood_type
    孙悟空白羊座A
    唐僧射手座A
    沙僧白羊座B
    猪八戒白羊座A
    如来射手座A
    菩萨白羊座B

    **需求:**把星座和血型一样的人归类到一起。结果如下:

    白羊座,A  孙悟空|猪八戒
    白羊座,B  沙僧|菩萨
    射手座,A  如来|唐僧
    
    
    • 1
    • 2
    • 3
    • 4

    参考答案: (答案不唯一)

    --创建表
    create table person_info
    (
        name          string,
        constellation string,
        blood_type    string
    )
        row format delimited fields terminated by ",";
    
    --插入数据
    insert overwrite table person_info
    values ('孙悟空', '白羊座', 'A'),
           ('唐僧', '射手座', 'A'),
           ('沙僧', '白羊座', 'B'),
           ('猪八戒', '白羊座', 'A'),
           ('如来', '射手座', 'A'),
           ('菩萨', '白羊座', 'B');
    
    --解答
    --第一种
    SELECT t1.c_b,
           CONCAT_WS("|", collect_set(t1.name))
    FROM (
             SELECT NAME,
                    CONCAT_WS(',', constellation, blood_type) c_b
             FROM person_info
         ) t1
    GROUP BY t1.c_b;
    
    --第二种
    select CONCAT_WS(',', constellation, blood_type),
           CONCAT_WS("|", collect_set(name))
    from person_info
    group by constellation, blood_type;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

列转行

相关函数说明

explode(col):将hive一列中复杂的array或者map结构拆分成多行。

split(stringstr, string pat): 按照pat字符串分割str,会返回分割后的字符串数组

-- explode
select explode(`array`('hello', 'hive', 'HQL'));
select explode(`map`("k1","v1","k2","v2","k3","v3"));
select explode(split('1,2,3', ',')) as myCol from (select 0) t1

-- split
select split('1,2,3', ',');

-- explode && split
select explode(split('1,2,3', ','));

-- 错误用法
-- select 1, explode(split('1,2,3', ','));
-- select explode(split('1,2,3', ',')), explode(split('1,2,3', ','));
-- select explode(explode(`array`(`array`(1,2,3), `array`(1,2,3))));
-- select explode(split('1,2,3', ',')) as myCol from (select 0) t1  group by myCol;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

lateral view

用法:LATERAL VIEW udtf(expression) tableAlias AS columnAlias
解释:用于和split, explode等UDTF一起使用,它能够将一列数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。

一个 FROM 子句可以有多个 LATERAL VIEW 子句。后续的 LATERAL VIEWS 可以引用出现在 LATERAL VIEW 左侧的任何表中的列。

--例子
-- 案例1:使用单个lateral view
create table pageAds
(
    pageid    string,
    adid_list array<int>
);
insert overwrite table pageAds
values ('front_page', `array`(1, 2, 3)),
       ('contact_page', `array`(3, 4, 5));
select pageid, adid_list
from pageAds lateral view explode(adid_list) adTable AS adid;

select adid, count(1)
from pageAds lateral view explode(adid_list) adTable AS adid
group by adid
order by adid;

-- 案例2
--创建表
create table allPagesAds
(
    pagesid   array<string>,
    adid_list array<int>
);
--插入数据
insert overwrite table allPagesAds
values (`array`('a', 'b', 'c'), `array`(1, 2)),
       (`array`('d', 'e', 'f'), `array`(3, 4));

--SQL
select page, id
from allPagesAds
         lateral view explode(pagesid) pageTable as page
         lateral view explode(adid_list) adTable as id;

-- 案例3
--特别的,当使用UDTF分解的列为空时,就会导致源数据不会出现在结果集中。而OUTER可用于防止这种情况发生,并且将使用NULL为UDTF函数的结果生成行。
select * from pageAds lateral view explode(array()) tmp as t limit 10;  --结果为空集
select * from pageAds lateral view outer explode(array()) tmp as t limit 10; --UDTF生成结果都为null

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 案例
    数据准备

    moviecategory
    《疑犯追踪》悬疑,动作,科幻,剧情
    《Lie to me》悬疑,警匪,动作,心理,剧情
    《战狼2》战争,动作,灾难

    **需求:**把电影分类中的数组数据展开。结果如下:

    《疑犯追踪》      悬疑
    《疑犯追踪》      动作
    《疑犯追踪》      科幻
    《疑犯追踪》      剧情
    《Lie to me》   悬疑
    《Lie to me》   警匪
    《Lie to me》   动作
    《Lie to me》   心理
    《Lie to me》   剧情
    《战狼2》        战争
    《战狼2》        动作
    《战狼2》        灾难
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    **参考答案: **

    --创建表
    create table movie_info
    (
        movie    string,
        category string
    )
        row format delimited
            fields terminated by "\t";
    
    --插入数据
    insert overwrite table movie_info
    values ('《疑犯追踪》', '悬疑,动作,科幻,剧情'),
           ('《Lie to me》', '悬疑,警匪,动作,心理,剧情'),
           ('《战狼2》', '战争,动作,灾难');
    
    --解答
    SELECT movie, category_name
    FROM movie_info
             lateral VIEW
                 explode(split(category, ",")) movie_info_tmp AS category_name;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

扩展:reflect

reflect函数通过使用反射匹配参数来调用java的自带函数。从 hive 0.7.0开始使用,具体可查看 ReflectUDF 。在hive 0.9.0时 java_method() 是reflect() 的同义词,方法使用也一样。

select reflect("java.lang.String", "valueOf", 1),
       reflect("java.lang.String", "isEmpty"),
       reflect("java.lang.Math", "max", 2, 3),
       reflect("java.lang.Math", "min", 2, 3);
       
select reflect("org.apache.commons.lang.math.NumberUtils", "isNumber", '123');

-- 官方给的下面三个例子在运行时发生了精度问题
-- select reflect("java.lang.Math", "round", 2.5);
-- select reflect("java.lang.Math", "exp", 1.0);
-- select reflect("java.lang.Math", "floor", 1.9);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

请注意,反射 UDF 是不确定的,因为无法保证给定相同参数的特定方法将返回什么。所以在 WHERE 子句上使用 Reflect 时要小心,因为这可能会使 Predicate Pushdown 优化无效。

自定义UDF函数

官网描述:如何自定义UDF函数

案例: 自定义一个UDF实现计算给定字符串的长度

  • 实现步骤:
    (1) 创建一个maven工程
    (2) 导入依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
    </dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    (3) 创建一个类

    package com.hive.udf;
    
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    
    /**
     * 自定义UDF函数,需要继承GenericUDF类
     * 需求: 计算指定字符串的长度
     */
    public class MyStringLength extends GenericUDF {
        /**
         *
         * @param arguments 输入参数类型的鉴别器对象
         * @return 返回值类型的鉴别器对象
         * @throws UDFArgumentException
         */
        @Override
        public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
            // 判断输入参数的个数
            if(arguments.length !=1){
                throw new UDFArgumentLengthException("Input Args Length Error!!!");
            }
            // 判断输入参数的类型
            if(!arguments[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE)){
                throw new UDFArgumentTypeException(0,"Input Args Type Error!!!");
            }
            //函数本身返回值为int,需要返回int类型的鉴别器对象
            return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
        }
    
        /**
         * 函数的逻辑处理
         * @param arguments 输入的参数
         * @return 返回值
         * @throws HiveException
         */
        @Override
        public Object evaluate(DeferredObject[] arguments) throws HiveException {
           if(arguments[0].get() == null){
               return 0 ;
           }
           return arguments[0].get().toString().length();
        }
    
        @Override
        public String getDisplayString(String[] children) {
            return "";
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

(4) 打成jar包上传到服务器/opt/module/hive/datas/myudf.jar

(5) 将jar包添加到hive的classpath

hive (default)> add jar /opt/module/hive/datas/myudf.jar;
  • 1

(6) 创建临时函数与开发好的java class关联

hive (default)> create temporary function default.my_len as "com.hive.udf.MyStringLength";

-- 检查函数是否创建成功
desc function default.my_len;
desc function extended default.my_len;

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

(7) 即可在hql中使用自定义的函数

hive (default)> select ename,my_len(ename) ename_len from emp;
  • 1

需要注意的是: hive自定义的函数是有作用域限制的,如果创建函数的时候没有在函数名前加上库名,则会创建在当前库中。在使用的过程中,若是跨库使用函数也需要在函数名前加上表名,否则会报找不到该函数。

自定义UDAF函数

官网

https://cwiki.apache.org/confluence/display/Hive/GenericUDAFCaseStudy

翻译官方如何编写UDAF: 实现UDAF有两个部分,一是编写解析器类(解析器类继承AbstractGenericUDAFResolver),二是在解析器类中创建一个评估器[或计算器]内部类(评估器类继承GenericUDAFEvaluator)。解析器处理类型检查和运算符重载,并帮助hive为给定的参数类型找到正确的评估器类,评估器内部实现了UDAF逻辑。
关于方法的描述也可以查看org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRank

  • UDAF模板
    package com.hive.udaf;
    
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    
    public class MyUDAF extends AbstractGenericUDAFResolver {
    
        /**
         * 参数类型检查以及返回一个自定义的GenericUDAFEvaluator
         * @param info
         * @return
         * @throws SemanticException
         */
        @Override
        public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
            TypeInfo[] parameters = info.getParameters(); //获取参数的SQL类型相对应的类型信息对象数组
    
            // 验证参数类型的
    
            return new MyUDAFEvaluator();
        }
    
        // 自定义评估器[计算器]
        public static class MyUDAFEvaluator extends GenericUDAFEvaluator{
            /**
             * 初始化评估器
             * @param m  聚合模式
             * @param parameters   参数的ObjectInspector:在PARTIAL1和COMPLETE模式下,
             *          参数为原始数据; 在 PARTIAL2 和 FINAL 模式下,参数只是部分聚合(在这种情况下,数组将始终具有单个元素)
             */
            @Override
            public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
                return super.init(m, parameters);
            }
    
            /**
             * 获取一个新的缓冲区对象,用于存储临时聚合结果
             * @return
             * @throws HiveException
             */
            public AggregationBuffer getNewAggregationBuffer() throws HiveException {
                return null;
            }
    
            // 重置缓冲区
            public void reset(AggregationBuffer agg) throws HiveException {
    
            }
    
            /**
             * 首先检查聚合缓冲区中的对象是否初始化,如果没有问题则将读取每一行数据,累积到缓冲区中。
             * @param agg 缓冲区
             * @param parameters 输入的列的N行
             */
            public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
    
            }
    
            /**
             * 以可持久化的方式返回当前聚合的内容,返回值需要继承序列化的官方类。注意: 这里是返回的只是部分聚合内容,merge方法中有描述
             *      官方描述:
             *          返回值只能根据 Java 基本数据类型、数组、包装类(例如 Double)、Hadoop Writables、列表和映射来构建。
             *          不要使用你自己的类(即使它们实现了 java.io.Serializable
             *          ,否则你可能会得到奇怪的错误或(可能更糟)错误的结果
             */
            public Object terminatePartial(AggregationBuffer agg) throws HiveException {
                return null;
            }
    
            /**
             * 将terminatePartial返回的部分聚合合并到当前聚合中
             * @param agg
             * @param partial
             * @throws HiveException
             */
            public void merge(AggregationBuffer agg, Object partial) throws HiveException {
    
            }
    
            /**
             * 将聚合的最终结果返回
             * @param agg
             * @return
             * @throws HiveException
             */
            public Object terminate(AggregationBuffer agg) throws HiveException {
                return null;
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

自定义UDTF函数

https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF

翻译官方如何编写UDTF: 自定义UDTF可以通过扩展GenericUDTF抽象类,然后执行被创建initializeprocess以及可能的close方法。该initialize方法由 Hive 调用以通知 UDTF 期望的参数类型。然后,UDTF 必须返回与 UDTF 将生成的行对象相对应的对象检查器。一旦initialize()被调用,Hive 将使用该process()方法将行提供给 UDTF 。在 中process(),UDTF 可以通过调用 生成行并将行转发给其他运算符forward()。最后,close()当所有行都传递给 UDTF 时,Hive 将调用该方法。关于方法的描述也可以查看 **org.apache.hadoop.hive.ql.udf.generic.GenericUDTF**

案例: 自定义一个UDTF实现将一个任意分割符的字符串切割成独立的单词

  • 实现步骤:
    (1) 创建一个maven项目
    (2) 导入依赖
    XML <dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>3.1.2</version> </dependency> </dependencies>

  • (3) 创建一个类

    package com.hive.udtf;
    
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class MyUDTF extends GenericUDTF {
    
        private ArrayList<String> outList = new ArrayList<>();
    
        @Override
        public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
    
            //1.定义输出数据的列名和类型
            List<String> fieldNames = new ArrayList<>();
            List<ObjectInspector> fieldOIs = new ArrayList<>();
    
            //2.添加输出数据的列名和类型
            fieldNames.add("lineToWord");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    
            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        }
    
        @Override
        public void process(Object[] args) throws HiveException {
            
            //1.获取原始数据
            String arg = args[0].toString();
    
            //2.获取数据传入的第二个参数,此处为分隔符
            String splitKey = args[1].toString();
    
            //3.将原始数据按照传入的分隔符进行切分
            String[] fields = arg.split(splitKey);
    
            //4.遍历切分后的结果,并写出
            for (String field : fields) {
    
                //集合为复用的,首先清空集合
                outList.clear();
    
                //将每一个单词添加至集合
                outList.add(field);
    
                //将集合内容写出
                forward(outList);
            }
        }
    
        @Override
        public void close() throws HiveException {
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

(4) 打成jar包上传到服务器/opt/module/hive/datas/myudtf.jar

(5) 将jar包添加到hive的classpath

hive (default)> add jar /opt/module/hive/datas/myudtf.jar;
  • 1

(6) 创建临时函数与开发好的java class关联

hive (default)> create temporary function myudtf as "com.hive.udtf.MyUDTF";

-- 检查函数是否创建成功
desc function default.myudtf;
desc function extended default.myudtf;

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

(7) 使用自定义的函数

hive (default)> select myudtf("hello,world,hadoop,hive",",");
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/597267
推荐阅读
相关标签
  

闽ICP备14008679号