赞
踩
在Hive的使用中,系统内置函数有时无法满足业务需求,这时就需要开发者自己编写函数来实现业务需求。
自定义函数,极大丰富了个性化定制的需要,使Hive得到了极大的拓展。
Hive有三种自定义函数,可以实现不同方面的需求。
常见的函数类型,可以操作单个数据行,且产生一个数据行作为输出,大多数函数为这一类。
用户自定义表生成函数,用于操作单个输入数据行,产生多个数据行(一张表)作为输出。
用户自定义聚集函数,接收多个输入数据行,产生一个输出数据行。
Pom文件导入
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
1)首先,需要Java代码继承UDF,但是编译器不会要求必须实现某个函数。需要开发者自己实现evaluate函数,该函数可以重载多个,设置不同的参数等。
package com; import org.apache.hadoop.hive.ql.exec.UDF; public class MyUDF extends UDF { public int evaluate(int data) { return data + 10; } //可以重载多个 public int evaluate(int data,String temp) { return data + 200; } }
2)然后进行打包,传入服务器中(可以是任意路径,但一般放入Hive安装目录的lib目录下)。
3)进入Hive命令行,开始操作。
## 把jar包加入进来
add jar /opt/hive/lib/Hive-UDF-CustomFunctions-1.0-SNAPSHOT.jar;
## 创建函数(create function [函数名] as [包名+类名])
create function ass_udf as 'com.MyUDF';
4)测试
## 可以自行创建一张表,有一个列为数字即可(示例中是把数字加10)
select id , name , score , add_udf(score) as add_score from udf_test;
1)java代码需要继承GenericUDTF,并实现必要函数。在示例中,写了一个将一行数据,根据传入的参数分割成多行的代码。
package com; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.serde2.objectinspector.*; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import java.util.*; public class MyUDTF extends GenericUDTF { private List<String> list = new ArrayList<String>(); //定义输出数据的列名和数据类型 @Override public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { //输出数据的列名 ArrayList<String> fieldNames = new ArrayList<String>(); //列名取名 fieldNames.add("word"); //输出数据的类型 ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); //根据将来的返回值确定返回的类型 fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); //返回定义的数据 return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } //对数据进行操作 @Override public void process(Object[] args) throws HiveException { //获取数据 String data = args[0].toString(); //获取分隔符 String splitKey = args[1].toString(); //切分数据 String[] words = data.split(splitKey); //遍历输出 for (String word : words) { //将数据放入集合 list.clear(); list.add(word); //使用自带方法写出 forward(list); } } @Override public void close() throws HiveException { } }
2)同样是进行打包传入、创建函数。
3)测试一
## 创建了一张表,包含id、name、score三列。
## 其他列随意,name列中,包含的数据需要存在分隔符数据的,比如‘Mike,Tom,John,Allen’。
## 测试函数
select split_udtf("aa,bb,cc",',');
## 进行表测试
select split_udtf(name,',') from udtf_test;
4)单独查询列没有问题,若这时需要进行多列的查询,则会报错:UDTF’s are not supported outside the SELECT clause, nor nested in expressions。这样直接查询是UDTF函数所不执行的,所以需要使用lateral view。
5) 测试二
## 使用lateral view
select id,names,score from udtf_test lateral view split_udtf(name,',') temp as names;
## 事实上,如果需要达到行转列的效果,Hive本身已经提供很好的支持
select id,names,score from udtf_test lateral view explode(split(name,',')) temp as names;
1) 在hive中创建一张简单的测试表并插入数据
-- 建表
create table udaf_test(id int,name string,subject string,grade string);
-- 插入数据
insert into udaf_test values (1,'Mike','math','B'),(1,'Mike','science','C'),(1,'Mike','lanuage','B') ,(2,'Tom','math','B'),(2,'Tom','science','C'),(2,'Tom','sports','A');
2)编写java代码
init函数初始化
iterate接收传入的参数,进行迭代。返回类型为boolean。
terminatePartial无参数,类似于 hadoop的Combiner。
merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean。
terminate返回最终的聚集函数结果。
package com; import org.apache.hadoop.hive.ql.exec.UDAF; import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; import java.util.HashMap; import java.util.Map; public class MyUDAF extends UDAF { public static class Evaluator implements UDAFEvaluator { private Map<String, String> map; //初始化 public Evaluator() { init(); } // 初始化函数间传递的中间变量 public void init() { map = new HashMap<String, String>(); } // map阶段,返回值为boolean类型,当为true则程序继续执行,当为false则程序退出 public boolean iterate(String course, String score) { map.put(course, score); return true; } // 类似于combiner,局部聚合 public Map<String, String> terminatePartial() { return map; } // reduce 阶段,用于逐个迭代处理map当中每个不同key对应的 terminatePartial的结果 public boolean merge(Map<String, String> output) { this.map.putAll(output); return true; } // 处理merge计算完成后的结果,即对merge完成后的结果做最后的业务处理 public String terminate() { return map.toString(); } } }
3)一样的流程,添加jar包,创建一个函数
## 添加jar包
add jar /opt/hive/lib/Hive-UDF-CustomFunctions-1.0-SNAPSHOT.jar;
## 创建一个临时的函数(加入temporary关键字,重启hive自动删除)
create temporary function tempmerge as 'com.MyUDAF';
4)查询测试
select id,name,tempmerge(subject,grade) from udaf_test group by id,name;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。