赞
踩
在上一篇文章中,我们对第一种用户定义函数(UDF)进行了基础介绍。接下来,本文将带您深入了解剩余的两种UDF函数类型。
第一种方式是 Simple(简单) 方式,即继承 org.apache.hadoop.hive.ql.exec.UDAF 类,并在派生类中以静态内部类的方式实现 org.apache.hadoop.hive.ql.exec.UDAFEvaluator 接口。这个计算类将负责执行具体的聚合逻辑,具体步骤如下:
a)初始化(init):首先,我们需要实现UDAFEvaluator接口的init方法,用于初始化聚合过程中所需的任何资源或状态。
b)迭代(iterate):接下来,iterate方法将被用来处理传入的数据。此方法将逐个接收数据项,并更新聚合状态。它返回一个布尔值,指示是否继续迭代或停止。
c)部分终止(terminatePartial):在迭代完成后,terminatePartial方法将被调用。它的作用类似于Hadoop中的Combiner,用于返回一个中间聚合结果,以便在多个任务之间进行合并。
d)合并(merge):merge方法用于接收来自terminatePartial的中间结果,并将其合并以形成更接近最终结果的聚合状态。此方法同样返回一个布尔值,指示合并操作是否成功。
e)最终终止(terminate):最后,terminate方法将被用来生成并返回聚合操作的最终结果。
import org.apache.hadoop.hive.ql.exec.UDAF; import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; import org.apache.hadoop.io.IntWritable; // 自定义的UDAF类,用于计算最大值 public class MyMaxUDAF extends UDAF { // 实现UDAFEvaluator接口的静态内部类 static public class MaxIntEvaluator implements UDAFEvaluator { // 存放当前聚合操作过程中的最大值 private int mMax; // 用于标记聚合数据集是否为空 private boolean mEmpty; // 构造方法,用于执行初始化操作 public MaxIntEvaluator() { super(); init(); } // 初始化方法,用于重置聚合状态 public void init() { // 初始化最大值为0 mMax = 0; // 初始化聚合数据集为空 mEmpty = true; } // 迭代处理每一行数据。每次调用处理一行记录 public boolean iterate(IntWritable o) { // 检查传入的数据是否为null if (o != null) { // 如果当前聚合数据集为空,则直接将当前值设置为最大值 if (mEmpty) { mMax = o.get(); mEmpty = false; // 更新状态,标记聚合数据集不再为空 } else { // 聚合数据集不为空时,用当前值和之前的最大值比较,保留较大的那个 mMax = Math.max(mMax, o.get()); } } return true; } // 输出Map阶段处理结果的方法,返回当前的最大值 public IntWritable terminatePartial() { // 如果聚合数据集为空,则返回null;否则,返回当前的最大值 return mEmpty ? null : new IntWritable(mMax); } // Combine/Reduce阶段,合并处理结果 public boolean merge(IntWritable o) { // 通过调用iterate方法进行合并操作 return iterate(o); } // 返回最终的聚集函数结果 public IntWritable terminate() { // 如果聚合数据集为空,则返回null;否则,返回最终的最大值 return mEmpty ? null : new IntWritable(mMax); } } }
编写简单的UDAF(用户定义聚合函数)相对容易,但这种方法由于依赖Java的反射机制,可能会牺牲一些性能,并且它不支持变长参数等高级特性。相比之下,通用UDAF(Generic UDAF)提供了这些高级特性的支持,虽然它的编写可能不如简单UDAF那样直接明了。
Hive社区推崇使用通用UDAF作为最佳实践,建议采用新的抽象类org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
来替代旧的UDAF接口,并推荐使用org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
抽象类来替换旧的UDAFEvaluator
接口。这种新方法不仅提升了性能,还增加了灵活性,使得UDAF的功能更加强大和多样化。
import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; import org.apache.hadoop.io.IntWritable; // 通过继承AbstractGenericUDAFResolver并使用Description注解来定义一个新的UDAF。 @Description(name = "max_int", value = "_FUNC_(int) - Returns the maximum value of the column") public class MyMaxUDAF2 extends AbstractGenericUDAFResolver { // 聚合函数的求值器内部类,继承自GenericUDAFEvaluator。 public static class MaxIntEvaluator extends GenericUDAFEvaluator { // 用于存储输入参数的ObjectInspector。 private PrimitiveObjectInspector inputOI; // 用于存储聚合结果。 private IntWritable result; // 初始化方法,用于设置聚合函数的参数和返回类型。 @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); // 确认参数是原始类型并初始化inputOI。 inputOI = (PrimitiveObjectInspector) parameters[0]; // 设置聚合函数的返回类型为可写的整型。 return PrimitiveObjectInspectorFactory.writableIntObjectInspector; } // 创建聚合缓冲区对象的方法。 @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { MaxAggBuffer buffer = new MaxAggBuffer(); reset(buffer); return buffer; } // 重置聚合缓冲区对象的方法。 @Override public void reset(AggregationBuffer agg) throws HiveException { ((MaxAggBuffer) agg).setValue(Integer.MIN_VALUE); } // 迭代方法,用于处理每一行数据。 @Override public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { if (parameters[0] != null) { MaxAggBuffer myagg = (MaxAggBuffer) agg; // 从参数中获取整数值并更新聚合缓冲区中的最大值。 int value = PrimitiveObjectInspectorUtils.getInt(parameters[0], inputOI); if (value > myagg.value) { myagg.setValue(value); } } } // 终止部分聚合的方法,通常返回最终聚合结果。 @Override public Object terminatePartial(AggregationBuffer agg) throws HiveException { return terminate(agg); } // 合并部分聚合结果的方法。 @Override public void merge(AggregationBuffer agg, Object partial) throws HiveException { if (partial != null) { MaxAggBuffer myagg = (MaxAggBuffer) agg; // 从部分聚合结果中获取整数值并更新聚合缓冲区中的最大值。 int partialValue = PrimitiveObjectInspectorUtils.getInt(partial, inputOI); if (partialValue > myagg.value) { myagg.setValue(partialValue); } } } // 终止方法,用于返回最终聚合结果。 @Override public Object terminate(AggregationBuffer agg) throws HiveException { MaxAggBuffer myagg = (MaxAggBuffer) agg; // 创建IntWritable对象并设置聚合结果,然后返回。 result = new IntWritable(myagg.value); return result; } // 聚合缓冲区对象的内部类定义,用于存储聚合过程中的中间状态。 static class MaxAggBuffer implements AggregationBuffer { int value; // 聚合缓冲区中的值 // 设置聚合缓冲区中的值 void setValue(int val) { value = val; } } } }
特性/UDAF类型 | 简单UDAF | 通用UDAF |
---|---|---|
性能 | 依赖反射,性能较低 | 不依赖反射,性能较高 |
参数灵活性 | 不支持变长参数 | 支持变长参数 |
易用性 | 编写简单直观 | 编写复杂,功能强大 |
推荐使用 | 适合简单聚合操作 | 适合复杂聚合逻辑和高性能需求 |
接口和抽象类 | 旧的UDAF接口和UDAFEvaluator | 新的AbstractGenericUDAFResolver 和GenericUDAFEvaluator |
功能特性 | 功能有限,实现常见聚合 | 支持复杂迭代逻辑和自定义终止逻辑 |
应用场景 | - 快速开发和原型设计 - 实现基本聚合操作,如求和、平均值 - 对性能要求不高的小型项目 | - 实现复杂的数据分析和处理 - 大数据量处理,需要高性能 - 需要变长参数支持的复杂查询 - 高级功能实现,如窗口函数、复杂的分组聚合 |
选择UDAF类型时应根据实际需求和上述特性来决定,以确保既能满足功能需求,又能获得较好的性能表现。
继承GenericUDTF类的步骤:
开发自定义的表生成函数(UDTF)时,首先需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF这个抽象类,它为UDTF提供了一个通用的实现框架。
实现initialize()、process()和close()
方法:
为了完成自定义UDTF的功能,需要实现三个核心方法:initialize()用于初始化UDTF,process()用于处理输入数据并生成输出,close()用于执行清理操作。
- initialize()
方法的调用与作用:在UDTF的执行过程中,initialize()方法是首先被调用的。它负责初始化UDTF的状态,并返回关于UDTF返回行的信息,包括返回行的个数和类型。
process()
方法的执行:initialize()方法执行完成后,接下来会调用process()方法。该方法是UDTF的核心,负责对输入参数进行处理。在process()方法中,可以通过调用forward()方法将处理结果逐行返回。close()
方法的清理作用:在UDTF的所有处理工作完成后,最终会调用close()方法。这个方法用于执行必要的清理工作,如释放资源或关闭文件句柄等,确保UDTF在结束时不会留下任何未处理的事务。import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import java.util.ArrayList; import java.util.List; /** * 自定义一个UDTF,实现将一个由任意分割符分隔的字符串切割成独立的单词。 **/ public class LineToWordUDTF extends GenericUDTF { // 用于存储输出单词的集合 private ArrayList<String> outList = new ArrayList<String>(); /** * initialize方法:当GenericUDTF函数初始化时被调用一次,用于执行一些初始化操作。 * 包括: * 1. 判断函数参数个数 * 2. 判断函数参数类型 * 3. 确定函数返回值类型 */ @Override public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { // 1. 定义输出数据的列名和类型 List<String> fieldNames = new ArrayList<String>(); List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); // 2. 添加输出数据的列名和类型 fieldNames.add("lineToWord"); // 输出列名 fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); // 输出列类型 // 返回输出数据的ObjectInspector return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } /** * process方法:自定义UDTF的核心逻辑实现方法。 * 代码实现步骤可以分为三部分: * 1. 参数接收 * 2. 自定义UDTF核心逻辑 * 3. 输出结果 */ @Override public void process(Object[] objects) throws HiveException { // 1. 获取原始数据 String arg = objects[0].toString(); // 假设第一个参数为要分割的字符串 // 2. 获取数据传入的第二个参数,此处为分隔符 String splitKey = objects[1].toString(); // 假设第二个参数为分隔符 // 3. 将原始数据按照传入的分隔符进行切分 String[] fields = arg.split(splitKey); // 分割字符串 // 4. 遍历切分后的结果,并写出 for(String field : fields) { // 集合为复用的,首先清空集合 outList.clear(); // 将每个单词添加至集合 outList.add(field); // 将集合内容通过forward方法写出,这里假设forward方法可以处理集合 forward(outList); } } /** * close方法:当没有其他输入行时,调用该函数。 * 可以进行一些资源关闭处理等最终处理。 */ @Override public void close() throws HiveException { // 资源清理逻辑,当前示例中无具体实现 } }
本文我们详细解析了UDAF和UDTF在Hive中的应用。通过实际代码示例,我们展示了UDAF如何帮助我们深入分析数据,以及UDTF如何简化复杂的数据转换任务。
感谢您的阅读和支持。如果您对UDAF、UDTF或Hive的其他高级功能有疑问,或者想要更深入地讨论,欢迎在文章下留言或直接联系我们。期待我们的下一次分享,一起在大数据的世界里探索新知。
再次感谢,希望您喜欢这次的分享。我们下次见!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。