赞
踩
在Hive中,函数主要分两大类型,一种是内置函数,一种是用户自定义函数。
show functions; desc function functionName;
1)当前系统时间函数:current_date()、current_timestamp()、unix_timestamp()
-- 函数1:current_date(); 当前系统日期 格式:"yyyy-MM-dd" -- 函数2:current_timestamp(); 当前系统时间戳: 格式:"yyyy-MM-dd HH:mm:ss.ms" -- 函数3:unix_timestamp(); 当前系统时间戳 格式:距离1970年1月1日0点的秒数。
2)日期转时间戳函数:unix_timestamp()
格式:unix_timestamp([date[,pattern]]) 案例: select unix_timestamp('1970-01-01 0:0:0'); -- 传入的日期时间是东八区的时间, 返回值是相对于子午线的时间来说的 select unix_timestamp('1970-01-01 8:0:0'); select unix_timestamp('0:0:0 1970-01-01',"HH:mm:ss yyyy-MM-dd"); select unix_timestamp(current_date());
3)时间戳转日期函数:from_unixtime
语法:from_unixtime(unix_time[,pattern]) 案例: select from_unixtime(1574092800); select from_unixtime(1574096401,'yyyyMMdd'); select from_unixtime(1574096401,'yyyy-MM-dd HH:mm:ss'); select from_unixtime(0,'yyyy-MM-dd HH:mm:ss'); select from_unixtime(-28800,'yyyy-MM-dd HH:mm:ss');
4)计算时间差函数:datediff()、months_between()
格式:datediff(date1, date2) - Returns the number of days between date1 and date2 select datediff('2019-11-20','2019-11-01'); 格式:months_between(date1, date2) - returns number of months between dates date1 and date2 select months_between('2019-11-20','2019-11-01'); select months_between('2019-10-30','2019-11-30'); select months_between('2019-10-31','2019-11-30'); select months_between('2019-11-00','2019-11-30');
5)日期时间分量函数:year()、month()、day()、hour()、minute()、second()
案例: select year(current_date); select month(current_date); select day(current_date); select year(current_timestamp); select month(current_timestamp); select day(current_timestamp); select hour(current_timestamp); select minute(current_timestamp); select second(current_timestamp); select dayofmonth(current_date); select weekofyear(current_date)
6)日期定位函数:last_day()、next_day()
--月末: select last_day(current_date) --下周 select next_day(current_date,'thursday');
7)日期加减函数:date_add()、date_sub()、add_months()
格式: date_add(start_date, num_days) date_sub(start_date, num_days) 案例: select date_add(current_date,1); select date_sub(current_date,90); select add_months(current_date,1);
定位案例:
--当月第1天: select date_sub(current_date,dayofmonth(current_date)-1) --下个月第1天: select add_months(date_sub(current_date,dayofmonth(current_date)-1),1)
8) 字符串转日期:to_date()
(字符串必须为:yyyy-MM-dd格式) select to_date('2017-01-01 12:12:12');
9)日期转字符串(格式化)函数:date_format
select date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss'); select date_format(current_date(),'yyyyMMdd'); select date_format('2017-01-01','yyyy-MM-dd HH:mm:ss');
lower--(转小写) select lower('ABC'); upper--(转大写) select upper('abc'); length--(字符串长度,字符数) select length('abc'); concat--(字符串拼接) select concat("A", 'B'); concat_ws --(指定分隔符) select concat_ws('-','a' ,'b','c'); substr--(求子串) select substr('abcde',3); split(str,regex)--切分字符串,返回数组。 select split("a-b-c-d-e-f","-");
cast(value as type) -- 类型转换 select cast('123' as int)+1;
round --四舍五入((42.3 =>42)) select round(42.3); ceil --向上取整(42.3 =>43) select ceil(42.3); floor --向下取整(42.3 =>42) select floor(42.3);
nvl(value,default value):如果value为null,则使用default value,否则使用本身value. isnull() isnotnull() case when then ....when ...then.. else... end if(p1,p2,p3) coalesce(col1,col2,col3...)返回第一个不为空的
先来看一下这个需求:求每个部门的员工信息以及部门的平均工资。在mysql中如何实现呢
- SELECT emp.*, avg_sal
- FROM emp
- JOIN (
- SELECT deptno
- , round(AVG(ifnull(sal, 0))) AS avg_sal
- FROM emp
- GROUP BY deptno
- ) t
- ON emp.deptno = t.deptno
- ORDER BY deptno;
-
-
- select emp.*,(select avg(ifnull(sal,0)) from emp B where B.deptno = A.deptno )
- from emp A;
通过这个需求我们可以看到,如果要查询详细记录和聚合数据,必须要经过两次查询,比较麻烦。
这个时候,我们使用窗口函数,会方便很多。那么窗口函数是什么呢?
-1) 窗口函数又名开窗函数,属于分析函数的一种。 -2) 是一种用于解决复杂报表统计需求的函数。 -3) 窗口函数常用于计算基于组的某种值,它和聚合函数的不同之处是:对于每个组返回多行,而聚合函数对于每个组只返回一行。 简单的说窗口函数对每条详细记录开一个窗口,进行聚合统计的查询 -4) 开窗函数指定了分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变化而变化。 -5) 窗口函数一般不单独使用 -6) 窗口函数内也可以分组和排序
注意:默认mysql老版本没有支持,在最新的8.0版本中支持, Oracle和Hive中都支持窗口函数
数据准备(order.txt)
姓名,购买日期,购买数量 -1. 创建order表:
- create table if not exists t_order
- (
- name string,
- orderdate string,
- cost int
- ) row format delimited fields terminated by ',';
- -2. 加载数据:
- load data local inpath "./data/order.txt" into table t_order;
需求:查询每个订单的信息,以及订单的总数
-- 1.不使用窗口函数
-- 查询所有明细 select * from t_order; # 查询总量 select count(*) from t_order;
-- 2.使用窗口函数:通常格式为 ==可用函数+over()函数==
select *, count(*) over() from t_order;
注意:
窗口函数是针对每一行数据的.
如果over中没有指定参数,默认窗口大小为全部结果集
需求:查询在2018年1月份购买过的顾客购买明细及总人数
- select *,count(*) over()
- from t_order
- where substring(orderdate,1,7) = '2024-01';
在over窗口中进行分组,对某一字段进行分组统计,窗口大小就是同一个组的所有记录
语法: over(distribute by colname[,colname.....])
需求:查看顾客的购买明细及月购买总额
select name, orderdate, cost, sum(cost) over (distribute by month(orderdate)) from t_order; saml 2018-01-01 10 205 saml 2018-01-08 55 205 tony 2018-01-07 50 205 saml 2018-01-05 46 205 tony 2018-01-04 29 205 tony 2018-01-02 15 205 saml 2018-02-03 23 23 mart 2018-04-13 94 341 saml 2018-04-06 42 341 mart 2018-04-11 75 341 mart 2018-04-09 68 341 mart 2018-04-08 62 341 neil 2018-05-10 12 12 neil 2018-06-12 80 80
需求:查看顾客的购买明细及每个顾客的月购买总额
select name, orderdate, cost, sum(cost) over (distribute by name, month(orderdate)) from t_order; mart 2018-04-13 94 299 mart 2018-04-11 75 299 mart 2018-04-09 68 299 mart 2018-04-08 62 299 neil 2018-05-10 12 12 neil 2018-06-12 80 80 saml 2018-01-01 10 111 saml 2018-01-08 55 111 saml 2018-01-05 46 111 saml 2018-02-03 23 23 saml 2018-04-06 42 42 tony 2018-01-07 50 94 tony 2018-01-04 29 94 tony 2018-01-02 15 94
sort by子句会让输入的数据强制排序 (强调:当使用排序时,窗口会在组内逐行变大)
语法: over([distribute by colname] [sort by colname [desc|asc]])
需求:查看顾客的购买明细及每个顾客的月购买总额,并且按照日期降序排序
- select name, orderdate, cost,
- sum(cost) over (distribute by name, month(orderdate) sort by orderdate desc)
- from t_order;
注意:可以使用partition by + order by 组合来代替distribute by+sort by组合
- select name, orderdate, cost,
- sum(cost) over (partition by name, month(orderdate) order by orderdate desc)
- from t_order;
注意:也可以在窗口函数中,只写排序,窗口大小是全表记录。
select name, orderdate, cost, sum(cost) over (order by orderdate desc) from t_order; neil 2018-06-12 80 80 -统计信息会逐行增加 neil 2018-05-10 12 92 mart 2018-04-13 94 186 mart 2018-04-11 75 261 mart 2018-04-09 68 329 mart 2018-04-08 62 391 saml 2018-04-06 42 433 saml 2018-02-03 23 456 saml 2018-01-08 55 511 tony 2018-01-07 50 561 saml 2018-01-05 46 607 tony 2018-01-04 29 636 tony 2018-01-02 15 651 saml 2018-01-01 10 661
如果要对窗口的结果做更细粒度的划分,那么就使用window子句,常见的有下面几个
PRECEDING:往前 FOLLOWING:往后 CURRENT ROW:当前行 UNBOUNDED:起点, UNBOUNDED PRECEDING:表示从前面的起点, UNBOUNDED FOLLOWING:表示到后面的终点
一般window子句都是==rows==开头
案例:
- select name,orderdate,cost,
- sum(cost) over() as sample1,--所有行相加
-
- sum(cost) over(partition by name) as sample2,-- 按name分组,组内数据相加
-
- sum(cost) over(partition by name order by orderdate) as sample3,-- 按name分组,组内数据累加
-
- sum(cost) over(partition by name order by orderdate rows between UNBOUNDED PRECEDING and current row ) as sample4 ,-- 与sample3一样,由起点到当前行的聚合
-
- sum(cost) over(partition by name order by orderdate rows between 1 PRECEDING and current row) as sample5, -- 当前行和前面一行做聚合
-
- sum(cost) over(partition by name order by orderdate rows between 1 PRECEDING AND 1 FOLLOWING ) as sample6,-- 当前行和前边一行及后面一行
-
- sum(cost) over(partition by name order by orderdate rows between current row and UNBOUNDED FOLLOWING ) as sample7 -- 当前行及后面所有行
-
- from t_order;
需求:查看顾客到目前为止的购买总额
select name, t_order.orderdate, cost, sum(cost) over (partition by name order by orderdate rows between UNBOUNDED PRECEDING and current row ) as allCount from t_order; mart 2018-04-08 62 62 mart 2018-04-09 68 130 mart 2018-04-11 75 205 mart 2018-04-13 94 299 neil 2018-05-10 12 12 neil 2018-06-12 80 92 saml 2018-01-01 10 10 saml 2018-01-05 46 56 saml 2018-01-08 55 111 saml 2018-02-03 23 134 saml 2018-04-06 42 176 tony 2018-01-02 15 15 tony 2018-01-04 29 44 tony 2018-01-07 50 94
需求:求每个顾客最近三次的消费总额
select name,orderdate,cost, sum(cost) over(partition by name order by orderdate rows between 2 preceding and current row) from t_order;
ntile 是Hive很强大的一个分析函数。可以看成是:它把有序的数据集合 平均分配 到 指定的数量(num)个桶中, 将桶号分配给每一行。如果不能平均分配,则优先分配较小编号的桶,并且各个桶中能放的行数最多相差1。
例子:
select name,orderdate,cost, ntile(3) over(partition by name), # 按照name进行分组,在分组内将数据切成3份 from t_order; mart 2018-04-13 94 1 mart 2018-04-11 75 1 mart 2018-04-09 68 2 mart 2018-04-08 62 3 neil 2018-06-12 80 1 neil 2018-05-10 12 2 saml 2018-01-01 10 1 saml 2018-01-08 55 1 saml 2018-04-06 42 2 saml 2018-01-05 46 2 saml 2018-02-03 23 3 tony 2018-01-07 50 1 tony 2018-01-02 15 2 tony 2018-01-04 29 3
lag返回当前数据行的前第n行的数据
lead返回当前数据行的后第n行的数据
需求:查询顾客上次购买的时间
select name,orderdate,cost,lag(orderdate,1) over(partition by name order by orderdate) as time1 from t_order; lag(colName,n[,default value]): 取字段的前第n个值。如果为null,显示默认值 select name,orderdate,cost,lag(orderdate,1,'1990-01-01') over(partition by name order by orderdate ) as time1 from t_order;
取得顾客下次购买的时间
select name,orderdate,cost, lead(orderdate,1) over(partition by name order by orderdate ) as time1 from t_order;
first_value 取分组内排序后,截止到当前行,第一个值
last_value 分组内排序后,截止到当前行,最后一个值
案例:
- select name,orderdate,cost,
- first_value(orderdate) over(partition by name order by orderdate) as time1,
- last_value(orderdate) over(partition by name order by orderdate) as time2
- from t_order;
-
-
- select name,orderdate,cost,
- first_value(orderdate) over(partition by name order by orderdate) as time1,
- first_value(orderdate) over(partition by name order by orderdate desc) as time2
- from t_order;
第一种函数:row_number从1开始,按照顺序,生成分组内记录的序列,row_number()的值不会存在重复,当排序的值相同时,按照表中记录的顺序进行排列
效果如下: 98 1 97 2 97 3 96 4 95 5 95 6 没有并列名次情况,顺序递增
第二种函数:RANK() 生成数据项在分组中的排名,排名相等会在名次中留下空位
效果如下: 98 1 97 2 97 2 96 4 95 5 95 5 94 7 有并列名次情况,顺序跳跃递增
第三种函数:DENSE_RANK() 生成数据项在分组中的排名,排名相等会在名次中不会留下空位
效果如下: 98 1 97 2 97 2 96 3 95 4 95 4 94 5 有并列名次情况,顺序递增
准备数据
userid classno score 1 gp1808 80 2 gp1808 92 3 gp1808 84 4 gp1808 86 5 gp1808 88 6 gp1808 70 7 gp1808 98 8 gp1808 84 9 gp1808 86 10 gp1807 90 11 gp1807 92 12 gp1807 84 13 gp1807 86 14 gp1807 88 15 gp1807 80 16 gp1807 92 17 gp1807 84 18 gp1807 86 19 gp1805 80 20 gp1805 92 21 gp1805 94 22 gp1805 86 23 gp1805 88 24 gp1805 80 25 gp1805 92 26 gp1805 94 27 gp1805 86 create table if not exists stu_score( userid int, classno string, score int ) row format delimited fields terminated by ' '; load data local inpath './data/stu_score.txt' overwrite into table stu_score;
需求1:对每次考试按照考试成绩倒序
- select *,
- row_number() over(partition by classno order by score desc) rn1
- from stu_score;
-
- select *,
- rank() over(partition by classno order by score desc) rn2
- from stu_score;
-
- select *,
- dense_rank() over(distribute by classno sort by score desc) rn3
- from stu_score;
-
- select *,
- dense_rank() over(order by score desc) `全年级排名`
- from stu_score;
需求2:获取每次考试的排名情况
- select *,
- -- 没有并列,相同名次依顺序排
- row_number() over(distribute by classno sort by score desc) rn1,
- -- rank():有并列,相同名次空位
- rank() over(distribute by classno sort by score desc) rn2,
- -- dense_rank():有并列,相同名次不空位
- dense_rank() over(distribute by classno sort by score desc) rn3
- from stu_score;
需求3:求每个班级的前三名
- select *
- from
- (
- select *,
- row_number() over(partition by classno order by score desc) rn1
- from stu_score
- ) A
- where rn1 < 4;
hive的内置函数满足不了所有的业务需求。hive提供很多的模块可以自定义功能,比如:自定义函数、serde、输入输出格式等。而自定义函数可以分为以下三类:
1)UDF:user defined function
用户自定义函数,一对一的输入输出 (最常用的)。
2)UDAF:user defined aggregation function
用户自定义聚合函数,多对一的输入输出,比如:count sum max。
3)UDTF:user defined table-generate function
用户自定义表生产函数 一对多的 输入输出,比如:lateral view explode
在pom.xml,加入以下maven的依赖包 请查看code/pom.xml
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <version>3.1.2</version>
- </dependency>
定义UDF函数要注意下面几点:
继承org.apache.hadoop.hive.ql.exec.UDF
重写evaluate
(),这个方法不是由接口定义的,因为它可接受的参数的个数,数据类型都是不确定的。Hive会检查UDF,看能否找到和函数调用相匹配的evaluate()方法
自定义函数第一个案例
- public class FirstUDF extends UDF {
- public String evaluate(String str){
- //关于默认输出值是null,还是"",这个要看需求具体定义,在这里先默认定义为null,
- String result = null;
- //1、检查输入参数
- if (!StringUtils.isEmpty(str)){
- result = str.toUpperCase();
- }
- return result;
- }
-
- //调试自定义函数
- public static void main(String[] args){
- System.out.println(new FirstUDF().evaluate("hadoopedu"));
- }
- }
在Hive3.0中UDF已经过时了,官方建议我们使用GenericUDF
- public class FirstGenericUDF extends GenericUDF {
- /**
- * Serde实现数据序列化和反序列化以及提供一个辅助类ObjectInspector帮助使用者访问需要序列化或者反序列化的对象。
- *
- * Serde层构建在数据存储和执行引擎之间,实现数据存储+中间数据存储和执行引擎的解耦。
- */
- StringObjectInspector word;
- // 只调用一次,在任何evaluate()调用之前,你可以接收到一个可以表示函数输入参数类型的object inspectors数组
- // 这是你用来验证该函数是否接收正确的参数类型和参数个数的地方
- @Override
- public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
- if (objectInspectors.length != 1){
- throw new UDFArgumentLengthException("FirstGenericUDF take only one arguments :String");
- }
- ObjectInspector a = objectInspectors[0];
- if (!(a instanceof StringObjectInspector)){
- throw new UDFArgumentException("argument must be String");
- }
- this.word = (StringObjectInspector) a;
- return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
- }
-
- // 这个类似于简单API的evaluat方法,它可以读取输入数据和返回结果
- //DeferredObject:Hive内部封装的一个公共的,稳定的接口,对对象进行包装,有一个基本的子类DeferredJavaObject
- @Override
- public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
- String result = null;
-
- String str = this.word.getPrimitiveJavaObject(deferredObjects[0].get());
- System.out.println("category:"+this.word.getCategory());
-
- if (!StringUtils.isEmpty(str)){
- result = str.toUpperCase();
- }
- return result;
- }
-
- // 该方法无关紧要,我们可以返回任何东西,但应当是描述该方法的字符串
- @Override
- public String getDisplayString(String[] strings) {
- return "FirstGenericUDF";
- }
-
- public static void main(String[] args) throws HiveException {
- //获取当前的实例对象
- FirstGenericUDF udf = new FirstGenericUDF();
- //获取字符串的序列化格式
- ObjectInspector stringio = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
- JavaStringObjectInspector resultInspector = (JavaStringObjectInspector) udf.initialize(new ObjectInspector[]{stringio});
- //对数据进行处理
- Object result = udf.evaluate(new DeferredObject[]{new DeferredJavaObject("hehe")});
- //得到具体类型的数据
- System.out.println(resultInspector.getPrimitiveJavaObject(result));
- }
- }
临时加载
# 将编写的udf的jar包上传到服务器上. # 并且将jar包添加到Hive的class path中 # 进入到Hive客户端,执行下面命令 hive> add jar /opt/jar/udf.jar
-- 创建一个临时函数名,要跟上面Hive在同一个session里面: hive> create temporary function toUP as 'com.qf.hive.FirstUDF'; -- 检查函数是否创建成功 hive> show functions; -- 测试功能 hive> select toUp('abcdef'); -- 删除函数 hive> drop temporary function if exists toUP;
配置文件加载
通过配置文件方式这种只要用Hive命令行启动都会加载函数
# 1、将编写的udf的jar包上传到服务器上 # 2、在Hive的安装目录的bin目录下创建一个配置文件,文件名:.hiverc [root@hadoop01 hive]# vi ./bin/.hiverc add jar /hivedata/udf.jar; create temporary function toup as 'com.qf.hive.FirstUDF'; 3、启动Hive [root@hadoop01 hive]# hive
UDTF是一对多的输入输出,实现UDTF需要完成下面步骤
继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
重写initlizer()
、process()
、close()
。
执行流程如下:
UDTF首先会调用initialize
方法,此方法返回UDTF的返回行的信息(返回个数,类型)。
初始化完成后,会调用process
方法,真正的处理过程在process
函数中,在process
中,每一次forward()
调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。
最后close()
方法调用,对需要清理的方法进行清理。
把"k1:v1;k2:v2;k3:v3"类似的的字符串解析成每一行多行,每一行按照key:value格式输出
自定义函数如下:
- package com.qf.hive;
-
- /**
- * 定义一个UDTF的Hive自定义函数(一对多),默认要继承与GenericUDTF
- */
- public class ParseMapUDTF extends GenericUDTF {
- //在initializez中初始化要输出字段的名称和类型
- @Override
- public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
- //定义要输出列的名字的List,并且添加要输出的列名
- List<String> structFieldNames = new ArrayList<>();
- structFieldNames.add("key");
- structFieldNames.add("value");
-
- // 定义要输出列的类型的List,并且添加要输出列的类型
- List<ObjectInspector> objectInspectorList = new ArrayList<>();
- objectInspectorList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
- objectInspectorList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
-
- return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, objectInspectorList);
- }
-
- // process方法用来处理输入的每行数据,每行数据处理调用一次process,类似于Mapper中的map方法
- @Override
- public void process(Object[] objects) throws HiveException {
- // 得到第一个参数,转化为字符串,类似于-> name:zhang;age:30;address:shenzhen
- String insputString = objects[0].toString();
-
- // 把上述例子字符串按照分号;切分为数组
- String[] split = insputString.split(";");
-
- // s=name:zhang
- for (String s : split) {
- // 把每个切分后的key value分开
- String[] kvArray = s.split(":");
- // 如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。
- forward(kvArray);
- }
- }
-
- @Override
- public void close() throws HiveException {
-
- }
- }
对上述命令源文件打包为 udtf.jar,拷贝到服务器的 /opt/jar/ 目录
在Hive客户端把udtf.jar加入到Hive中,如下:
hive> add jar /opt/jar/udtf.jar;
在Hive客户端创建函数:
# 创建一个临时函数parseMap hive> create temporary function parseMap as 'com.qf.hive.ParseMapUDTF'; # 查看函数是否加入 hive> show functions ;
hive> select parseMap("name:zhang;age:30;address:shenzhen");
结果如下:
#map key name zhang age 30 address shenzhen
用户自定义聚合函数。user defined aggregate function。多对一的输入输出 count sum max。定义一个UDAF需要如下步骤:
UDF自定义函数必须是org.apache.hadoop.hive.ql.exec.UDAF的子类,并且包含一个火哥多个嵌套的的实现了org.apache.hadoop.hive.ql.exec.UDAFEvaluator的静态类。
函数类需要继承UDAF类,内部类Evaluator实UDAFEvaluator接口。
Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数
这几个函数作用如下:
函数 | 说明 |
---|---|
init | 实现接口UDAFEvaluator的init函数 |
iterate | 每次对一个新值进行聚集计算都会调用,计算函数要根据计算的结果更新其内部状态,,map端调用 |
terminatePartial | 无参数,其为iterate函数轮转结束后,返回轮转数据,,map端调用 |
merge | 接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean。 |
terminate | 返回最终的聚集函数结果。 在reducer端调用 |
计算一组整数的最大值
- package com.qf.hive;
-
- /**
- * 定义一个UDAF自定义函数类,默认要继承于UDAF类
- */
- //给当前函数添加描述信息,方便在desc function方法时查看
- @Description(name="maxInt",value = "Find Max Value" ,extended = "Extended:Find Max Value for all Col")
- public class MaxValueUDAF extends UDAF {
- //UDAF要求 并且包含一个或者多个嵌套的的实现类
- // org.apache.hadoop.hive.ql.exec.UDAFEvaluator的静态类。
- public static class MaxnumIntUDAFEvaluator implements UDAFEvaluator {
- //在静态类内部定义一个返回值,作为当前UDAF最后的唯一返回值,因为返回值要在Hive调用,所以必须要使用序列化类型
- private IntWritable result;
-
- /**
- * 在初始化是把返回值设为null,避免和上次调用时混淆
- */
- @Override
- public void init() {
- result=null;
- }
-
- //定义一个函数iterate用来处理遍历多行时,每行值传进来是调用的函数
- public boolean iterate(IntWritable value) {
- // 把遍历每行的值value传入,和result比较,如果比result大,那么result就设置为value,否则result不变
- if (value == null) {
- return true;
- }
-
- //如果是第一行数据,那么直接给result赋值为第一行数据
- if (result == null) {
- result = new IntWritable(value.get());
- } else {
- // 给result赋值result和value之间的最大值
- result.set(Math.max(result.get(),value.get()));
- }
- return true;
- }
-
- /**
- * 在map端进行并行执行后的结果
- * @return
- */
- public IntWritable terminatePartial() {
- return result;
- }
-
- /**
- * 接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean。
- * @param other
- * @return
- */
- public boolean merge(IntWritable other) {
- return iterate( other );
- }
-
- /**
- * 将最终的结果返回为Hive
- * @return
- */
- public IntWritable terminate() {
- return result;
- }
- }
- }
-
注意:如果你要给自己写的函数加上desc function后的说明,可以在自定义函数类上面加上下面的注解:
@Description(name = "maxValue",value = "Find Max Value",extended = "Extended:Find Max Value for all Col") hive> desc function maxInt; hive> desc function extended maxInt;
对上述命令源文件打包为udaf.jar,拷贝到服务器的/opt/jar/目录
在Hive客户端把udf.jar加入到Hive中,如下:
hive> add jar /opt/jar/udaf.jar;
在Hive客户端创建函数:
hive> create temporary function maxInt as 'com.qf.hive.MaxValueUDAF'; # 查看函数是否加入 hive> show functions ;
# 使用前面的任意一个有int类型字段的表进行测试 hive> select maxInt(id) from dy_part1;
结果如下:
#结果 4
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。