当前位置:   article > 正文

7 | 史上最全大数据笔记-Hive函数

7 | 史上最全大数据笔记-Hive函数

第八章 Hive函数

在Hive中,函数主要分两大类型,一种是内置函数,一种是用户自定义函数

8.1 Hive内置函数

8.1.1 函数查看
 show functions;
 desc function functionName;
8.1.2 日期函数

1)当前系统时间函数:current_date()、current_timestamp()、unix_timestamp()

 -- 函数1:current_date();                
         当前系统日期                格式:"yyyy-MM-dd"
 -- 函数2:current_timestamp();        
         当前系统时间戳:        格式:"yyyy-MM-dd HH:mm:ss.ms"
 -- 函数3:unix_timestamp();        
         当前系统时间戳        格式:距离1970年1月1日0点的秒数。

2)日期转时间戳函数:unix_timestamp()

 格式:unix_timestamp([date[,pattern]])
 案例:
 select unix_timestamp('1970-01-01 0:0:0'); -- 传入的日期时间是东八区的时间, 返回值是相对于子午线的时间来说的
 select unix_timestamp('1970-01-01 8:0:0'); 
 select unix_timestamp('0:0:0 1970-01-01',"HH:mm:ss yyyy-MM-dd"); 
 select unix_timestamp(current_date());

3)时间戳转日期函数:from_unixtime

 语法:from_unixtime(unix_time[,pattern]) 
 案例:
 select from_unixtime(1574092800); 
 select from_unixtime(1574096401,'yyyyMMdd'); 
 select from_unixtime(1574096401,'yyyy-MM-dd HH:mm:ss'); 
 select from_unixtime(0,'yyyy-MM-dd HH:mm:ss');
 select from_unixtime(-28800,'yyyy-MM-dd HH:mm:ss');

4)计算时间差函数:datediff()、months_between()

 格式:datediff(date1, date2) - Returns the number of days between date1 and date2
 select datediff('2019-11-20','2019-11-01');
 格式:months_between(date1, date2) - returns number of months between dates date1 and date2
 select months_between('2019-11-20','2019-11-01');
 select months_between('2019-10-30','2019-11-30');
 select months_between('2019-10-31','2019-11-30');
 select months_between('2019-11-00','2019-11-30');

5)日期时间分量函数:year()、month()、day()、hour()、minute()、second()

 案例:
 select year(current_date);
 select month(current_date);
 select day(current_date);
 select year(current_timestamp);
 select month(current_timestamp);
 select day(current_timestamp);
 select hour(current_timestamp);
 select minute(current_timestamp);
 select second(current_timestamp);
 ​
 select dayofmonth(current_date);
 select weekofyear(current_date)

6)日期定位函数:last_day()、next_day()

 --月末:
 select  last_day(current_date)
 --下周
 select next_day(current_date,'thursday');

7)日期加减函数:date_add()、date_sub()、add_months()

 格式:
 date_add(start_date, num_days)
 date_sub(start_date, num_days)
 案例:
 select date_add(current_date,1);
 select date_sub(current_date,90);
 select add_months(current_date,1);

定位案例:

 --当月第1天: 
 select date_sub(current_date,dayofmonth(current_date)-1)
 --下个月第1天:
 select  add_months(date_sub(current_date,dayofmonth(current_date)-1),1)

8) 字符串转日期:to_date()

 (字符串必须为:yyyy-MM-dd格式)
 ​
 select to_date('2017-01-01 12:12:12');

9)日期转字符串(格式化)函数:date_format

 select date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss');
 select date_format(current_date(),'yyyyMMdd');
 select date_format('2017-01-01','yyyy-MM-dd HH:mm:ss');  
8.1.3 字符串函数
 lower--(转小写)
 select lower('ABC');
 ​
 upper--(转大写)
 select upper('abc');
 ​
 length--(字符串长度,字符数)
 select length('abc');
 ​
 concat--(字符串拼接)
 select concat("A", 'B');
 ​
 concat_ws --(指定分隔符)
 select concat_ws('-','a' ,'b','c');
 ​
 substr--(求子串)
 select substr('abcde',3);
 ​
 split(str,regex)--切分字符串,返回数组。
 select split("a-b-c-d-e-f","-");
8.1.4 类型转换函数
 cast(value as type) -- 类型转换
 select cast('123' as int)+1;
8.1.5 数学函数
 round --四舍五入((42.3 =>42))
 select round(42.3);
 ​
 ceil --向上取整(42.3 =>43)
 select ceil(42.3);
 ​
 floor --向下取整(42.3 =>42)
 select floor(42.3);
8.1.6 其他常用函数
 nvl(value,default value):如果value为null,则使用default value,否则使用本身value.
 ​
 isnull()
 isnotnull()
 ​
 case when  then ....when ...then.. else... end
 if(p1,p2,p3)
 coalesce(col1,col2,col3...)返回第一个不为空的

8.2 窗口函数(重点)

8.2.1 窗口函数over简介

先来看一下这个需求:求每个部门的员工信息以及部门的平均工资。在mysql中如何实现呢

  1.  SELECT emp.*, avg_sal
  2.  FROM emp
  3.         JOIN (
  4.                 SELECT deptno
  5.                         , round(AVG(ifnull(sal, 0))) AS avg_sal
  6.                 FROM emp
  7.                 GROUP BY deptno
  8.         ) t
  9.         ON emp.deptno = t.deptno
  10.  ORDER BY deptno;
  11.  ​
  12.  ​
  13.  select emp.*,(select avg(ifnull(sal,0)) from emp B where B.deptno = A.deptno )
  14.  from emp A;

通过这个需求我们可以看到,如果要查询详细记录和聚合数据,必须要经过两次查询,比较麻烦。

这个时候,我们使用窗口函数,会方便很多。那么窗口函数是什么呢?

 -1) 窗口函数又名开窗函数,属于分析函数的一种。
 -2) 是一种用于解决复杂报表统计需求的函数。
 -3) 窗口函数常用于计算基于组的某种值,它和聚合函数的不同之处是:对于每个组返回多行,而聚合函数对于每个组只返回一行。
         简单的说窗口函数对每条详细记录开一个窗口,进行聚合统计的查询
 -4) 开窗函数指定了分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变化而变化。
 -5) 窗口函数一般不单独使用
 -6) 窗口函数内也可以分组和排序

注意:默认mysql老版本没有支持,在最新的8.0版本中支持, Oracle和Hive中都支持窗口函数

8.2.2 基本案例演示

数据准备(order.txt)

 姓名,购买日期,购买数量
 -1. 创建order表:
  1.  create table if not exists t_order
  2.  (
  3.     name     string,
  4.     orderdate string,
  5.     cost      int
  6.  )  row format delimited fields terminated by ',';
  7.  -2. 加载数据:
  8.  load data local inpath "./data/order.txt" into table t_order;

需求:查询每个订单的信息,以及订单的总数

-- 1.不使用窗口函数

 -- 查询所有明细
 select * from t_order;
 # 查询总量
 select count(*) from t_order;

-- 2.使用窗口函数:通常格式为 ==可用函数+over()函数==

 select *, count(*) over() from t_order;

注意:

窗口函数是针对每一行数据的.

如果over中没有指定参数,默认窗口大小为全部结果集

需求:查询在2018年1月份购买过的顾客购买明细及总人数

  1.  select *,count(*) over()
  2.  from t_order
  3.  where substring(orderdate,1,7) = '2024-01';
8.2.3 distribute by子句

在over窗口中进行分组,对某一字段进行分组统计,窗口大小就是同一个组的所有记录

 语法:
 over(distribute by colname[,colname.....])

需求:查看顾客的购买明细及月购买总额

 select name, orderdate, cost, sum(cost) over (distribute by month(orderdate))
 from t_order;
 ​
 saml        2018-01-01        10        205
 saml        2018-01-08        55        205
 tony        2018-01-07        50        205
 saml        2018-01-05        46        205
 tony        2018-01-04        29        205
 tony        2018-01-02        15        205
 saml        2018-02-03        23        23
 mart        2018-04-13        94        341
 saml        2018-04-06        42        341
 mart        2018-04-11        75        341
 mart        2018-04-09        68        341
 mart        2018-04-08        62        341
 neil        2018-05-10        12        12
 neil        2018-06-12        80        80

需求:查看顾客的购买明细及每个顾客的月购买总额

 select name, orderdate, cost, sum(cost) over (distribute by name, month(orderdate))
 from t_order;
 ​
 mart    2018-04-13      94      299
 mart    2018-04-11      75      299
 mart    2018-04-09      68      299
 mart    2018-04-08      62      299
 neil    2018-05-10      12      12
 neil    2018-06-12      80      80
 saml    2018-01-01      10      111
 saml    2018-01-08      55      111
 saml    2018-01-05      46      111
 saml    2018-02-03      23      23
 saml    2018-04-06      42      42
 tony    2018-01-07      50      94
 tony    2018-01-04      29      94
 tony    2018-01-02      15      94
8.2.4 sort by子句

sort by子句会让输入的数据强制排序 (强调:当使用排序时,窗口会在组内逐行变大)

 语法:  over([distribute by colname] [sort by colname [desc|asc]])

需求:查看顾客的购买明细及每个顾客的月购买总额,并且按照日期降序排序

  1.  select name, orderdate, cost,
  2.  sum(cost) over (distribute by name, month(orderdate) sort by orderdate desc)
  3.  from t_order;

注意:可以使用partition by + order by 组合来代替distribute by+sort by组合

  1.  select name, orderdate, cost,
  2.  sum(cost) over (partition by name, month(orderdate) order by orderdate desc)
  3.  from t_order;

注意:也可以在窗口函数中,只写排序,窗口大小是全表记录。

 select name, orderdate, cost, 
 sum(cost) over (order by orderdate desc)
 from t_order;
 ​
 neil    2018-06-12      80      80                                -统计信息会逐行增加
 neil    2018-05-10      12      92
 mart    2018-04-13      94      186
 mart    2018-04-11      75      261
 mart    2018-04-09      68      329
 mart    2018-04-08      62      391
 saml    2018-04-06      42      433
 saml    2018-02-03      23      456
 saml    2018-01-08      55      511
 tony    2018-01-07      50      561
 saml    2018-01-05      46      607
 tony    2018-01-04      29      636
 tony    2018-01-02      15      651
 saml    2018-01-01      10      661
8.2.5 Window子句

如果要对窗口的结果做更细粒度的划分,那么就使用window子句,常见的有下面几个

 PRECEDING:往前 
 FOLLOWING:往后 
 CURRENT ROW:当前行 
 UNBOUNDED:起点,
 UNBOUNDED PRECEDING:表示从前面的起点, 
 UNBOUNDED FOLLOWING:表示到后面的终点 

一般window子句都是==rows==开头

案例:

  1.  select name,orderdate,cost,
  2.         sum(cost) over() as sample1,--所有行相加
  3.        
  4.         sum(cost) over(partition by name) as sample2,-- 按name分组,组内数据相加
  5.        
  6.         sum(cost) over(partition by name order by orderdate) as sample3,-- 按name分组,组内数据累加
  7.        
  8.         sum(cost) over(partition by name order by orderdate rows between UNBOUNDED PRECEDING and current row )  as sample4 ,-- 与sample3一样,由起点到当前行的聚合
  9.        
  10.         sum(cost) over(partition by name order by orderdate rows between 1 PRECEDING   and current row) as sample5, -- 当前行和前面一行做聚合
  11.        
  12.         sum(cost) over(partition by name order by orderdate rows between 1 PRECEDING   AND 1 FOLLOWING  ) as sample6,-- 当前行和前边一行及后面一行
  13.        
  14.         sum(cost) over(partition by name order by orderdate rows between current row and UNBOUNDED FOLLOWING ) as sample7 -- 当前行及后面所有行
  15.        
  16.  from t_order;

需求:查看顾客到目前为止的购买总额

 select name,
        t_order.orderdate,
        cost,
        sum(cost)
            over (partition by name order by orderdate rows between UNBOUNDED PRECEDING and current row ) as allCount
 from t_order;
 mart        2018-04-08        62        62
 mart        2018-04-09        68        130
 mart        2018-04-11        75        205
 mart        2018-04-13        94        299
 neil        2018-05-10        12        12
 neil        2018-06-12        80        92
 saml        2018-01-01        10        10
 saml        2018-01-05        46        56
 saml        2018-01-08        55        111
 saml        2018-02-03        23        134
 saml        2018-04-06        42        176
 tony        2018-01-02        15        15
 tony        2018-01-04        29        44
 tony        2018-01-07        50        94

需求:求每个顾客最近三次的消费总额

 select name,orderdate,cost,
 sum(cost) over(partition by name order by orderdate rows between 2 preceding and current row)
 from t_order;

8.3 序列函数

8.3.1 NTILE

ntile 是Hive很强大的一个分析函数。可以看成是:它把有序的数据集合 平均分配指定的数量(num)个桶中, 将桶号分配给每一行。如果不能平均分配,则优先分配较小编号的桶,并且各个桶中能放的行数最多相差1。

例子:

 select name,orderdate,cost,
        ntile(3) over(partition by name), # 按照name进行分组,在分组内将数据切成3份
 from t_order;
 mart        2018-04-13        94        1
 mart        2018-04-11        75        1
 mart        2018-04-09        68        2
 mart        2018-04-08        62        3
 neil        2018-06-12        80        1
 neil        2018-05-10        12        2
 saml        2018-01-01        10        1
 saml        2018-01-08        55        1
 saml        2018-04-06        42        2
 saml        2018-01-05        46        2
 saml        2018-02-03        23        3
 tony        2018-01-07        50        1
 tony        2018-01-02        15        2
 tony        2018-01-04        29        3
8.3.2 LAG和LEAD函数
  • lag返回当前数据行的前第n行的数据

  • lead返回当前数据行的后第n行的数据

需求:查询顾客上次购买的时间

 select name,orderdate,cost,lag(orderdate,1) over(partition by name order by orderdate) as time1 from t_order;
 ​
 lag(colName,n[,default value]): 取字段的前第n个值。如果为null,显示默认值
 ​
 select name,orderdate,cost,lag(orderdate,1,'1990-01-01') over(partition by name order by orderdate ) as time1 from t_order;

取得顾客下次购买的时间

 select name,orderdate,cost,
        lead(orderdate,1) over(partition by name order by orderdate ) as time1
 from t_order;
8.3.3 first_value和last_value
  • first_value 取分组内排序后,截止到当前行,第一个值

  • last_value 分组内排序后,截止到当前行,最后一个值

案例:

  1.  select name,orderdate,cost,
  2.     first_value(orderdate) over(partition by name order by orderdate) as time1,
  3.     last_value(orderdate) over(partition by name order by orderdate) as time2
  4.  from t_order;
  5.  ​
  6.  ​
  7.  select name,orderdate,cost,
  8.     first_value(orderdate) over(partition by name order by orderdate) as time1,
  9.     first_value(orderdate) over(partition by name order by orderdate desc) as time2
  10.  from t_order;

8.4 排名函数

第一种函数:row_number从1开始,按照顺序,生成分组内记录的序列,row_number()的值不会存在重复,当排序的值相同时,按照表中记录的顺序进行排列

 效果如下:
 98                1
 97                2
 97                3
 96                4
 95                5
 95                6
 ​
 没有并列名次情况,顺序递增

第二种函数:RANK() 生成数据项在分组中的排名,排名相等会在名次中留下空位

 效果如下:
 98                1
 97                2
 97                2
 96                4
 95                5
 95                5
 94                7
 ​
 有并列名次情况,顺序跳跃递增

第三种函数:DENSE_RANK() 生成数据项在分组中的排名,排名相等会在名次中不会留下空位

 效果如下:
 98                1
 97                2
 97                2
 96                3
 95                4
 95                4
 94                5
 ​
 有并列名次情况,顺序递增

准备数据

 userid        classno        score
 1 gp1808 80   
 2 gp1808 92
 3 gp1808 84
 4 gp1808 86
 5 gp1808 88
 6 gp1808 70
 7 gp1808 98
 8 gp1808 84
 9 gp1808 86
 10 gp1807 90
 11 gp1807 92
 12 gp1807 84
 13 gp1807 86
 14 gp1807 88
 15 gp1807 80
 16 gp1807 92
 17 gp1807 84
 18 gp1807 86
 19 gp1805 80
 20 gp1805 92
 21 gp1805 94
 22 gp1805 86
 23 gp1805 88
 24 gp1805 80
 25 gp1805 92
 26 gp1805 94
 27 gp1805 86
 create table if not exists stu_score(
 userid int,
 classno string,
 score int
 )
 row format delimited 
 fields terminated by ' ';
 ​
 load data local inpath './data/stu_score.txt' overwrite into table stu_score;

需求1:对每次考试按照考试成绩倒序

  1.  select *,
  2.  row_number() over(partition by classno order by score desc) rn1
  3.  from stu_score;
  4.  ​
  5.  select *,
  6.  rank() over(partition by classno order by score desc) rn2
  7.  from stu_score;
  8.  ​
  9.  select *,
  10.  dense_rank() over(distribute by classno sort by score desc) rn3
  11.  from stu_score;
  12.  ​
  13.  select *,
  14.  dense_rank() over(order by score desc) `全年级排名`
  15.  from stu_score;

需求2:获取每次考试的排名情况

  1.  select *,
  2.  -- 没有并列,相同名次依顺序排
  3.  row_number() over(distribute by classno sort by score desc) rn1,
  4.  -- rank():有并列,相同名次空位
  5.  rank() over(distribute by classno sort by score desc) rn2,
  6.  -- dense_rank():有并列,相同名次不空位
  7.  dense_rank() over(distribute by classno sort by score desc) rn3
  8.  from stu_score;

需求3:求每个班级的前三名

  1.  select *
  2.  from
  3.  (
  4.  select *,
  5.  row_number() over(partition by classno order by score desc) rn1
  6.  from stu_score
  7.  ) A
  8.  where rn1 < 4;

8.5 自定义函数

8.5.1 自定义函数介绍

hive的内置函数满足不了所有的业务需求。hive提供很多的模块可以自定义功能,比如:自定义函数、serde、输入输出格式等。而自定义函数可以分为以下三类:

1)UDF:user defined function

 用户自定义函数,一对一的输入输出 (最常用的)。

2)UDAF:user defined aggregation function

 用户自定义聚合函数,多对一的输入输出,比如:count sum max。

3)UDTF:user defined table-generate function

 用户自定义表生产函数 一对多的
 输入输出,比如:lateral view explode
8.5.2 自定义函数实现
8.5.2.1 UDF格式

在pom.xml,加入以下maven的依赖包 请查看code/pom.xml

  1. <dependency>
  2.      <groupId>org.apache.hive</groupId>
  3.      <artifactId>hive-exec</artifactId>
  4.      <version>3.1.2</version>
  5. </dependency>

定义UDF函数要注意下面几点:

  1. 继承org.apache.hadoop.hive.ql.exec.UDF

  2. 重写evaluate(),这个方法不是由接口定义的,因为它可接受的参数的个数,数据类型都是不确定的。Hive会检查UDF,看能否找到和函数调用相匹配的evaluate()方法

自定义函数第一个案例

  1. public class FirstUDF extends UDF {
  2. public String evaluate(String str){
  3. //关于默认输出值是null,还是"",这个要看需求具体定义,在这里先默认定义为null,
  4. String result = null;
  5. //1、检查输入参数
  6. if (!StringUtils.isEmpty(str)){
  7. result = str.toUpperCase();
  8. }
  9. return result;
  10. }
  11. //调试自定义函数
  12. public static void main(String[] args){
  13. System.out.println(new FirstUDF().evaluate("hadoopedu"));
  14. }
  15. }

在Hive3.0中UDF已经过时了,官方建议我们使用GenericUDF

  1.  public class FirstGenericUDF extends GenericUDF {
  2.      /**
  3.       * Serde实现数据序列化和反序列化以及提供一个辅助类ObjectInspector帮助使用者访问需要序列化或者反序列化的对象。
  4.       *
  5.       * Serde层构建在数据存储和执行引擎之间,实现数据存储+中间数据存储和执行引擎的解耦。
  6.       */
  7.      StringObjectInspector word;
  8.      // 只调用一次,在任何evaluate()调用之前,你可以接收到一个可以表示函数输入参数类型的object inspectors数组
  9.      // 这是你用来验证该函数是否接收正确的参数类型和参数个数的地方
  10.      @Override
  11.      public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
  12.          if (objectInspectors.length != 1){
  13.              throw new UDFArgumentLengthException("FirstGenericUDF take only one arguments :String");
  14.         }
  15.          ObjectInspector a = objectInspectors[0];
  16.          if (!(a instanceof StringObjectInspector)){
  17.              throw new UDFArgumentException("argument must be String");
  18.         }
  19.          this.word = (StringObjectInspector) a;
  20.          return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
  21.     }
  22.  ​
  23.      // 这个类似于简单API的evaluat方法,它可以读取输入数据和返回结果
  24.      //DeferredObject:Hive内部封装的一个公共的,稳定的接口,对对象进行包装,有一个基本的子类DeferredJavaObject
  25.      @Override
  26.      public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
  27.          String result = null;
  28.  ​
  29.          String str = this.word.getPrimitiveJavaObject(deferredObjects[0].get());
  30.          System.out.println("category:"+this.word.getCategory());
  31.  ​
  32.          if (!StringUtils.isEmpty(str)){
  33.              result = str.toUpperCase();
  34.         }
  35.          return result;
  36.     }
  37.  ​
  38.      // 该方法无关紧要,我们可以返回任何东西,但应当是描述该方法的字符串
  39.      @Override
  40.      public String getDisplayString(String[] strings) {
  41.          return "FirstGenericUDF";
  42.     }
  43.  ​
  44.      public static void main(String[] args) throws HiveException {
  45.          //获取当前的实例对象
  46.          FirstGenericUDF udf = new FirstGenericUDF();
  47.          //获取字符串的序列化格式
  48.          ObjectInspector stringio = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
  49.          JavaStringObjectInspector resultInspector = (JavaStringObjectInspector) udf.initialize(new ObjectInspector[]{stringio});
  50.          //对数据进行处理
  51.          Object result = udf.evaluate(new DeferredObject[]{new DeferredJavaObject("hehe")});
  52.          //得到具体类型的数据
  53.          System.out.println(resultInspector.getPrimitiveJavaObject(result));
  54.     }
  55.  }
 
8.5.2.2 函数加载方式

临时加载

 # 将编写的udf的jar包上传到服务器上.
 # 并且将jar包添加到Hive的class path中
 # 进入到Hive客户端,执行下面命令
  hive> add jar /opt/jar/udf.jar
 -- 创建一个临时函数名,要跟上面Hive在同一个session里面:
 hive> create temporary function toUP as 'com.qf.hive.FirstUDF';
 ​
 -- 检查函数是否创建成功
 hive> show functions;
 ​
 -- 测试功能
 hive> select toUp('abcdef');
 ​
 -- 删除函数 
 hive> drop temporary function if exists toUP;

配置文件加载

通过配置文件方式这种只要用Hive命令行启动都会加载函数

# 1、将编写的udf的jar包上传到服务器上
# 2、在Hive的安装目录的bin目录下创建一个配置文件,文件名:.hiverc
[root@hadoop01 hive]#  vi ./bin/.hiverc
add jar /hivedata/udf.jar;
create temporary function toup as 'com.qf.hive.FirstUDF';
3、启动Hive
 [root@hadoop01 hive]# hive
8.5.2.3 UDTF格式

UDTF是一对多的输入输出,实现UDTF需要完成下面步骤

  1. 继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF

  2. 重写initlizer()process()close()

执行流程如下:

  • UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型)。

  • 初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。

  • 最后close()方法调用,对需要清理的方法进行清理。

8.5.2.3.1 需求
 把"k1:v1;k2:v2;k3:v3"类似的的字符串解析成每一行多行,每一行按照key:value格式输出
8.5.2.3.2 源码

自定义函数如下:

  1.  package com.qf.hive;
  2.  ​
  3.   /**
  4.   * 定义一个UDTF的Hive自定义函数(一对多),默认要继承与GenericUDTF
  5.   */
  6.  public class ParseMapUDTF extends GenericUDTF {
  7.      //在initializez中初始化要输出字段的名称和类型
  8.      @Override
  9.      public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
  10.         //定义要输出列的名字的List,并且添加要输出的列名
  11.          List<String> structFieldNames = new ArrayList<>();
  12.          structFieldNames.add("key");
  13.          structFieldNames.add("value");
  14.  ​
  15.  //     定义要输出列的类型的List,并且添加要输出列的类型
  16.          List<ObjectInspector> objectInspectorList = new ArrayList<>();
  17.          objectInspectorList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
  18.          objectInspectorList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
  19.  ​
  20.          return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, objectInspectorList);
  21.     }
  22.  ​
  23.      // process方法用来处理输入的每行数据,每行数据处理调用一次process,类似于Mapper中的map方法
  24.      @Override
  25.      public void process(Object[] objects) throws HiveException {
  26.  //       得到第一个参数,转化为字符串,类似于-> name:zhang;age:30;address:shenzhen
  27.          String insputString = objects[0].toString();
  28.  ​
  29.  //       把上述例子字符串按照分号;切分为数组
  30.          String[] split = insputString.split(";");
  31.  ​
  32.  //           s=name:zhang
  33.          for (String s : split) {
  34.  //           把每个切分后的key value分开
  35.              String[] kvArray = s.split(":");
  36.  //           如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。
  37.              forward(kvArray);
  38.         }
  39.     }
  40.  ​
  41.      @Override
  42.      public void close() throws HiveException {
  43.  ​
  44.     }
  45.  }
8.5.2.3.3 打包加载

对上述命令源文件打包为 udtf.jar,拷贝到服务器的 /opt/jar/ 目录

在Hive客户端把udtf.jar加入到Hive中,如下:

 hive> add jar /opt/jar/udtf.jar;
8.5.2.3.4 创建临时函数

在Hive客户端创建函数:

 # 创建一个临时函数parseMap
 hive> create temporary function parseMap as 'com.qf.hive.ParseMapUDTF'; 
 ​
 # 查看函数是否加入
 hive> show functions ;
8.5.2.3.5 测试临时函数
 hive> select parseMap("name:zhang;age:30;address:shenzhen");

结果如下:

 #map  key  
 name    zhang
 age 30
 address shenzhen
8.5.2.4 UDAF格式

用户自定义聚合函数。user defined aggregate function。多对一的输入输出 count sum max。定义一个UDAF需要如下步骤:

  1. UDF自定义函数必须是org.apache.hadoop.hive.ql.exec.UDAF的子类,并且包含一个火哥多个嵌套的的实现了org.apache.hadoop.hive.ql.exec.UDAFEvaluator的静态类。

  2. 函数类需要继承UDAF类,内部类Evaluator实UDAFEvaluator接口。

  3. Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数

这几个函数作用如下:

函数说明
init实现接口UDAFEvaluator的init函数
iterate每次对一个新值进行聚集计算都会调用,计算函数要根据计算的结果更新其内部状态,,map端调用
terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据,,map端调用
merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean。
terminate返回最终的聚集函数结果。 在reducer端调用
8.5.2.4.1 需求
 计算一组整数的最大值
8.5.2.4.2 代码
  1.  package com.qf.hive;
  2.  ​
  3.  /**
  4.   * 定义一个UDAF自定义函数类,默认要继承于UDAF类
  5.   */
  6.  //给当前函数添加描述信息,方便在desc function方法时查看
  7.  @Description(name="maxInt",value = "Find Max Value" ,extended = "Extended:Find Max Value for all Col")
  8.  public class MaxValueUDAF  extends UDAF {
  9.      //UDAF要求 并且包含一个或者多个嵌套的的实现类
  10.      // org.apache.hadoop.hive.ql.exec.UDAFEvaluator的静态类。
  11.      public static class MaxnumIntUDAFEvaluator implements UDAFEvaluator {
  12.          //在静态类内部定义一个返回值,作为当前UDAF最后的唯一返回值,因为返回值要在Hive调用,所以必须要使用序列化类型
  13.          private IntWritable result;
  14.  ​
  15.          /**
  16.           * 在初始化是把返回值设为null,避免和上次调用时混淆
  17.           */
  18.          @Override
  19.          public void init() {
  20.              result=null;
  21.         }
  22.  ​
  23.          //定义一个函数iterate用来处理遍历多行时,每行值传进来是调用的函数
  24.          public boolean iterate(IntWritable value) {
  25.  //           把遍历每行的值value传入,和result比较,如果比result大,那么result就设置为value,否则result不变
  26.              if (value == null) {
  27.                  return true;
  28.             }
  29.  ​
  30.              //如果是第一行数据,那么直接给result赋值为第一行数据
  31.              if (result == null) {
  32.                  result = new IntWritable(value.get());
  33.             } else {
  34.  //               给result赋值result和value之间的最大值
  35.                  result.set(Math.max(result.get(),value.get()));
  36.             }
  37.              return true;
  38.         }
  39.  ​
  40.          /**
  41.           * 在map端进行并行执行后的结果
  42.           * @return
  43.           */
  44.          public IntWritable terminatePartial() {
  45.              return result;
  46.         }
  47.  ​
  48.          /**
  49.           * 接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean。
  50.           * @param other
  51.           * @return
  52.           */
  53.          public boolean merge(IntWritable other) {
  54.              return iterate( other );
  55.         }
  56.  ​
  57.          /**
  58.           * 将最终的结果返回为Hive
  59.           * @return
  60.           */
  61.          public IntWritable terminate() {
  62.              return result;
  63.         }
  64.     }
  65.  }
  66.  ​

注意:如果你要给自己写的函数加上desc function后的说明,可以在自定义函数类上面加上下面的注解:

 @Description(name = "maxValue",value = "Find Max Value",extended = "Extended:Find Max Value for all Col")
 ​
 hive> desc function  maxInt;
 hive> desc function extended maxInt;
8.5.2.4.3 打包加载

对上述命令源文件打包为udaf.jar,拷贝到服务器的/opt/jar/目录

在Hive客户端把udf.jar加入到Hive中,如下:

 hive> add jar /opt/jar/udaf.jar;
8.5.2.4.4 创建临时函数

在Hive客户端创建函数:

 hive> create temporary function maxInt as 'com.qf.hive.MaxValueUDAF';
 ​
 # 查看函数是否加入
 hive> show functions ;
8.5.2.4.5 测试临时函数
 # 使用前面的任意一个有int类型字段的表进行测试
 hive> select maxInt(id)  from dy_part1;

结果如下:

#结果
4
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/821084
推荐阅读
相关标签
  

闽ICP备14008679号