当前位置:   article > 正文

Hive UDF UDTF UDAF 自定义函数详解_hive udaf函数编写

hive udaf函数编写

Hive笔记05 – Hive UDF UDTF UDAF

UDF

UDF在Hive中的实现

在这里插入图片描述

UDF的创建与配置
类名定义规则
示例:com.ybg.hive.ql.func.udf.UDFDateDiffByUnit
规则:反向域名+模块名+功能分类(ql.func.udf:hive查询语言中的UDF函数)+具体功能|类名
  • 1
  • 2
基本配置
New Project - Maven模板
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
UDF核心

1.参数类型和参数值分开管理
2.将共性的校验写在接口中

UDF示例:

主类:UDFDateDiffByUnit extends GenericUDF implements UDFCom,DateCom
目的:计算两个日期之间的差异,可以按年、季、月、周或日计算。
方法:
initialize:参数验证并定义UDF的返回类型

@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    validateArgs(arguments,3);
    validateAllPrimitiveArgs(arguments,3);
    return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

evaluate: 具体的数据校验
核心计算方法,计算两个日期之间的差异。

@Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        validateArgs(arguments,3);
        final String strDateSmall = arguments[0].get().toString();
        final String strDateBig = arguments[1].get().toString();
        validateDateFormat(strDateSmall,strDateBig);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        Calendar dateSmall = Calendar.getInstance();
        Calendar dateBig = Calendar.getInstance();
        try {
            dateSmall.setTime(sdf.parse(strDateSmall));
            dateBig.setTime(sdf.parse(strDateBig));
        } catch (ParseException e) {
            throw new HiveException(e);
        }
        if(dateSmall.after(dateBig)){
            throw new HiveException("dateSmall by arg1 > dateBig by arg2");
        }
        final String unit = arguments[2].get().toString().toLowerCase();
        int intUnit = 0;
        switch(unit){
            case "y":
                intUnit = Calendar.YEAR;
                break;
            case "q": case "m":
                intUnit = Calendar.MONTH;
                break;
            case "w": case "d":
                intUnit = Calendar.DATE;
                break;
            default:
                throw new HiveException("Unsupported unit by arg3 :"+unit);
        }
        int diff = -1;
        while(true){
            diff++;
            dateSmall.add(intUnit,1);
            if(dateSmall.after(dateBig)){
                break;
            }
        }
        switch(unit){
            case "q":
                diff/=3;
                break;
            case "w":
                diff/=7;
                break;
        }
        return diff;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

getDisplayString:提供函数及其参数的描述(并不重要)

@Override
public String getDisplayString(String[] children) {
    return Objects.isNull(children) || children.length == 0 || null == children[0] ? null : children[0];
}
  • 1
  • 2
  • 3
  • 4

eg:对于"两数相加的UDF"

@Override
public String getDisplayString(String[] children){
	if(children == null || children.length<2){
		// 重述函数用法
		return "Usage:MyAddFunction(int,int)";	
	}
	// 查询解释
	return "MyAddFunction(" + children[0] + ", " + children[1] + ")";
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

接口一:DateCom
功能:提供日期相关的辅助方法。
validateDateFormat:验证日期格式是否符合 yyyy-MM-dd。

default void validateDateFormat(String...dateStrArr) throws HiveException {
    for (String dateStr : dateStrArr) {
        // dateStr.matches("\\d{4}-(0?[1-9]|1[0-2])-(0?[1-9]|[1-2][0-9]|3[0-1])")
        if (!dateStr.matches("\\d{4}-(0?[1-9]|1[0-2])-(0?[1-9]|[1-2][0-9]|3[0-1])")) {
            throw new HiveException("date format illegal : " + dateStr);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

接口二:UDFCom
功能:提供通用的参数验证方法。
validateArgs:检查传入的参数数量是否正确,以及是否有空参数。
validateAllPrimitiveArgs:确保所有参数都是原始类型。
原始类型指的是最基本的数据类型,直接包含了数据的值。(byte short int long float double char boolean string✔)

/**
 * @param args  实际参数数组
 * @param size  预期参数个数
 * @throws HiveException
 */
// 实现非具体的通用校验
default void validateArgs(Object[] args,int size) throws UDFArgumentException {
    if (size>0 && (Objects.isNull(args) || args.length < size)) {
        // 检验提供的参数数量是否满足size个
        throw new UDFArgumentException(size+" args must be provided.");
    }
    for (int i = 0; i < size; i++) {
        // 检测某一参数是否为空
        if (Objects.isNull(args[i])) {
            throw new UDFArgumentException("type of args["+i+"] null");
        }
    }
}

// 验证参数的类型是否为原始类型
default void validateAllPrimitiveArgs(Object[] args, int size) throws UDFArgumentException{
    for (int i = 0; i < size; i++) {
        if (((ObjectInspector)args[i]).getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("only support primitive type");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

打jar包上传至 HDFS 的两种方式:
– install => 打资源jar包,直接将jar包打入到 maven localRepository
– package => 打执行jar包,直接将jar包打入到 project target ✔

Hive UDF集成到Hive查询环境的四步骤:

  1. 打包(Package)
    package打架包
  2. 找包(Locate Package)
    架包位于target处,show in explorer显示物理路径
  3. 上传(Upload)
    复制架包路径并上传到HDFS上。
  4. 创建Hive UDF映射至HDFS上的JAR文件,并且指定了UDF实现的完整类名。
create function FUNC_NAME as 'com.ybg.hive.ql.func.udf.UDFDateDiffByUnit'(主类的全包路径)
using jar 'hdfs://single01:9000/hive_data/udf/hiveudf2-1.0-SNAPSHOT.jar';(hdfs://single01:9000+HDFS中架包存放路径)
  • 1
  • 2
问题与解决方法:

如果架包删除后重新上传会出现"UDF按照前一个架包方式继续运行"的情况
解决方式是:close project之后重新打开project(重新连接)。

UDTF

UDTF的创建与配置

与UDF相同

UDTF示例
package com.ybg.hive.ql.func.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class MyExplode extends GenericUDTF {
    private static Logger logger = LoggerFactory.getLogger(MyExplode.class);
    private ObjectInspector oi;
    private Object[] params;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        oi = argOIs[0];
        final ObjectInspector.Category category = oi.getCategory();
        // names 和 types分别存储数据的名称和类型
        List<String> names = new ArrayList<>(2);
        List<ObjectInspector> types = new ArrayList<>(2);
        switch (category){
            // 默认的名字
            case MAP:
                logger.info("receive explode category : Map");
                names.add("key");
                names.add("value");
                final MapObjectInspector moi = (MapObjectInspector) this.oi;
                types.add(moi.getMapKeyObjectInspector());
                types.add(moi.getMapValueObjectInspector());
                params = new Object[2];
                break;
            case LIST:
                logger.info("receive explode category : List");
                names.add("value");
                final ListObjectInspector loi = (ListObjectInspector) oi;
                types.add(loi.getListElementObjectInspector());
                params = new Object[1];
                break;
            default:
                throw new UDFArgumentException("not supported category for function explode : " + category);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(names,types);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        if (args.length != 1 || Objects.isNull(args[0])){
            throw new HiveException("Only 1 nonnull arg supported for function explode, but got " + args.length);
        }
        ObjectInspector.Category category = oi.getCategory();
        switch(category){
            case MAP:
                final Map<?, ?> map = ((MapObjectInspector) oi).getMap(args[0]);
			 // map.entrySet().forEach(entry -> {
			 //     params[0] = entry.getKey();
			 //     params[1] = entry.getValue();
			 //     try {
			 //         forward(params);
			 //     } catch (HiveException e) {
			 //         throw new RuntimeException(e);
			 //     }
			 // });
                final Iterator<? extends Map.Entry<?, ?>> it = map.entrySet().iterator();
                while(it.hasNext()){
                    final Map.Entry<?, ?> entry = it.next();
                    params[0] = entry.getKey();
                    params[1] = entry.getValue();
                    forward(params);
                }
                break;
            case LIST:
                final List<?> list = ((ListObjectInspector) oi).getList(args[0]);
                final Iterator<?> itl = list.iterator();
                while (itl.hasNext()) {
                    params[0] = itl.next();
                    forward(params);
                }
                break;
        }
    }

    @Override
    public void close() throws HiveException {
        oi = null;
        /**
         * 将数组置空
         * 1.数组中存储的是对象的引用。将数组中每个元素设置为null,断开数组和这些对象的链接。使这些对象没有引用指向它们,便于垃圾回收。
         * 2.将数组引用本身置空,告诉垃圾回收器数组本身可以被回收。
         */
        for (int i = 0; i < params.length; i++) {
            params[i] = null;
        }
        params = null;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
代码注意点
  1. public class MyExplode extends GenericUDTF 继承GenericUDTF抽象类之后,会自动重构两个方法process()close(),但是,我们需要手动重构另一个方法initialize(ObjectInspector[] argOIs)(ps:只能用这个过期方法才能处理结构化数据类型,用initialize(StructObjectInspector[] argOIs))无法实现。

  2. 根据处理异常的方式选择循环方式

// 1.发生异常后继续执行
//  map.entrySet().forEach(entry -> {
//      params[0] = entry.getKey();
//      params[1] = entry.getValue();
//      try {
//          forward(params);
//      } catch (HiveException e) {
//          ...
//      }
//  });
// 2.发生异常后终止while循环
  final Iterator<? extends Map.Entry<?, ?>> it = map.entrySet().iterator();
  while(it.hasNext()){
      final Map.Entry<?, ?> entry = it.next();
      params[0] = entry.getKey();
      params[1] = entry.getValue();
      forward(params);
  }
  break;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

注释部分的lambda表达式中foreach期望一个consumer接口,而该接口不允许抛出检查型异常,只能尝试在lambda内捕获异常并进行处理。
而改为iterator迭代则可以选择抛出异常。

  1. 将数组置空
    a.数组中存储的是对象的引用。将数组中每个元素设置为null,断开数组和这些对象的链接。使这些对象没有引用指向它们,便于垃圾回收。
    b.将数组引用本身置空,告诉垃圾回收器数组本身可以被回收。

UDAF

UDAF Mode
		PARTITIAL1
			-- 对原始数据进行部分聚合
			-- iterate() & teriminatePartitial() 会被调用
			-- Mapper
		
		PARTITIAL2
			-- 将部分聚合进行聚合
			-- merge() & teriminatePartitial() 会被调用
			-- Combiner
		
		FINAL
			-- 将所有的部分聚合进行完全聚合
			-- merge() & terminate() 会被调用
			-- Reducer
		
		COMPLETE
			-- 直接对原始数据进行全量聚合
			-- iterate() & terminate() 会被调用
			-- Mapper -> Reducer
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
代码
package cn.ybg.hive.ql.func.udaf;

import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class MySum extends AbstractGenericUDAFResolver {
    private static Logger logger = LoggerFactory.getLogger(MySum.class);

    // 检查参数类型是否非空且长度为1(是否传入参数都是同一类型)
    private static void checkParam(String content, Object...params) throws SemanticException{
        if(Objects.isNull(params) || params.length!=1 || Objects.isNull(params[0])){
            throw new SemanticException(content);
        }
    }

    // getEvaluator():根据输入参数的类型,选择并返回合适的UDAF计算器
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        // 检查参数info是否非空
        if(Objects.isNull(info)){
            throw new SemanticException("From YB12211 : MySum getEvaluator(info) : info NullPointerException");
        }
        // 获取参数的类型信息
        ObjectInspector[] params = info.getParameterObjectInspectors();

        // 子类向上类型转换:自动转换
        checkParam("From YB12211 : MySum getEvaluator(info) : only support one nonnull param",params);

        // 验证参数类型的种类是否为基本数据类型
        ObjectInspector.Category category = params[0].getCategory();
        if(category != ObjectInspector.Category.PRIMITIVE){
            throw new SemanticException("From YB12211 : MySum getEvaluator(info) : only support primitive type");
        }

        PrimitiveObjectInspector inputIO = (PrimitiveObjectInspector) params[0];
        AbstractSumEvaluator evaluator;
        // 根据参数类型选择相应的计算器
        switch (inputIO.getPrimitiveCategory()){
            case BYTE: case SHORT: case INT: case LONG:
                evaluator = new SumLong();
                break;
            case FLOAT: case DOUBLE:
                evaluator = new SumDouble();
                break;
            case DECIMAL:
                evaluator = new SumDecimal();
                break;
            default:
                throw new SemanticException("From YB12211 : MySum getEvaluator(info) : doesn't support type of "
                        +inputIO.getPrimitiveCategory());
        }

        // 根据参数设置计算器的”是否开窗“和"是否去重"
        evaluator.setWindowing(info.isWindowing());
        evaluator.setDistinct(info.isDistinct());
        return evaluator;
    }

    // 去除私有
    // AbstractSumEvaluator:实现UDAF中SUM函数的通用逻辑
    // T表示SUM的结果数据类型,通常是 DoubleWritable、LongWritable 或 HiveDecimalWritable。
    static abstract class AbstractSumEvaluator<T extends Writable> extends GenericUDAFEvaluator{
        // AbstractSumAgg是SUM函数的聚合缓冲区
        // E表示SUM的中间结果类型
        abstract class AbstractSumAgg<E> extends AbstractAggregationBuffer{
            // 标识聚合缓冲区是否为空
            boolean empty;
            E agg;
            // 类型差异,不一定是 E
            // 如果使用DISTINCT关键字进行聚合计算,会用它检测唯一性
            Set<Object> unique;

            // 去构造器
            /*public AbstractSumAgg() {
                reset();
            }*/

            public boolean isEmpty() {
                return empty;
            }

            // 添加类型差异
            boolean add(Object parameter){
                if(empty){
                    empty = false;
                }
                // 类型转换
                if (isWindowingAndDistinct()) {
                    // 将参数值parameter转化为java对象obj,便于后续进行唯一性检查
                    Object obj = parameter instanceof ObjectInspectorObject ?
                            (ObjectInspectorObject) parameter :
                            ObjectInspectorUtils.copyToStandardJavaObject(parameter,inputIO);
                    if(unique.contains(obj)){
                        return false;
                    }else{ // 忘了半个逻辑
                        unique.add(obj);
                    }
                }
                return true;
            }

            // 重置聚合缓冲区的状态,将`empty`置为`true`,并清空`unique`集合(如果使用DISTINCT关键字的话)
            void reset(){
                empty = true;
                if (isWindowingAndDistinct()) {
                    if(Objects.nonNull(unique)){
                        if(!unique.isEmpty()){
                            unique.clear();
                        }
                    }else{
                        unique = new HashSet<>();
                    }
                }
            }
        }

        // 属性迁移
        PrimitiveObjectInspector inputIO;
        PrimitiveObjectInspector outputIO;

        boolean isWindowing;
        boolean isDistinct;

        // 去除构造器
        /*public AbstractSumEvaluator(boolean isWindowing, boolean isDistinct) {
            this.isWindowing = isWindowing;
            this.isDistinct = isDistinct;
        }*/

        // 新增 setter
        void setWindowing(boolean windowing) {
            isWindowing = windowing;
        }

        void setDistinct(boolean distinct) {
            isDistinct = distinct;
        }

        // willInit 放回
        void checkParamsAndInit(ObjectInspector[] params,Mode mode, boolean willInit, String content)
                throws HiveException {
            checkParam(content,params);
            // 初始化UDAF计算器的模式和参数
            super.init(mode, params);
            // 根据需要初始化输入输出类型
            if (willInit) {
                inputIO = (PrimitiveObjectInspector) params[0];
                // 将输入的ObjectInspector转化为输出的标准Java ObjectInspector
                outputIO = (PrimitiveObjectInspector) ObjectInspectorUtils
                        .getStandardObjectInspector(inputIO, ObjectInspectorUtils.ObjectInspectorCopyOption.JAVA);
            }
        }

        boolean isWindowingAndDistinct(){
            return isWindowing && isDistinct;
        }

        // terminatePartial 方法用于计算部分聚合结果,但如果启用了DISTINCT属性,则会抛出异常,因为DISTINCT不支持部分聚合。
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            if (isWindowingAndDistinct()) {
                throw new HiveException("From YB12211 : distinct sum doesn't support terminatePartial(AggregationBuffer agg)");
            }
            return terminate(agg);
        }
    }

    static class SumDecimal extends AbstractSumEvaluator<HiveDecimalWritable>{
        class SumAggDecimal extends AbstractSumAgg<HiveDecimalWritable>{
            // 调用reset()将其重置为初始状态
            public SumAggDecimal() {
                // 变动
                reset();
            }

            // 父类负责对`待加数`进行校验,子类负责实现真正的添加
            @Override
            boolean add(Object parameter) {
                HiveDecimal value = PrimitiveObjectInspectorUtils.getHiveDecimal(parameter, inputIO);
                if (super.add(value)) {
                    agg.mutateAdd(value);
                    return true;
                }
                return false;
            }

            @Override
            void reset() {
                super.reset();
                // 空指针异常
                if(Objects.isNull(agg)){
                    agg = new HiveDecimalWritable(HiveDecimal.ZERO);
                }else{
                    agg.set(HiveDecimal.ZERO);
                }
            }
        }

        // 用于初始化计算器的状态
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            checkParamsAndInit(parameters,m,false,"From YB12211 : SumDecimal.init(Mode m, ObjectInspector[] parameters) parameters can't be NULL");

            // 考虑到取值范围:将长度放大
            inputIO = (PrimitiveObjectInspector) parameters[0];

            int precision = inputIO.precision();
            int scale = inputIO.scale();
            switch (m){
                // 在部分聚合(PARTIAL1)和最终聚合(COMPLETE)阶段,SUM 函数需要对输入的 DECIMAL 类型数据进行累加,并且可能会产生更大精度的结果。为了确保计算不会丢失精度,需要在这些阶段增加精度。
                case PARTIAL1: case COMPLETE:
                    precision = Math.min(precision+10, HiveDecimal.MAX_PRECISION);
                    break;
            }
            DecimalTypeInfo decimalTypeInfo = TypeInfoFactory.getDecimalTypeInfo(precision, scale);
            /**
             * PrimitiveObjectInspector
             *      AbstractPrimitiveObjectInspector
             *          AbstractPrimitiveWritableObjectInspector
             */
            outputIO = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(decimalTypeInfo);
            outputIO = (PrimitiveObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(outputIO);
            return inputIO;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new SumAggDecimal();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAggDecimal)agg).reset();
        }

        /*private HiveDecimalWritable toHiveDecimalWritable(Object...value) throws HiveException {
            checkParam(value,"From YB12211 : SumDecimal.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null");
            return new HiveDecimalWritable(PrimitiveObjectInspectorUtils.getHiveDecimal(value, inputIO));
        }*/

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            //((SumAggDecimal)agg).add(toHiveDecimalWritable(parameters[0]));
            checkParam("From YB12211 : SumDecimal.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null",parameters);
            ((SumAggDecimal)agg).add(parameters[0]);
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            //((SumAggDecimal)agg).add(toHiveDecimalWritable(partial));
            checkParam("From YB12211 : SumDecimal.merge(AggregationBuffer agg, Object partial) partial can't be null",partial);
            ((SumAggDecimal)agg).add(partial);
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            SumAggDecimal sumAgg = (SumAggDecimal) agg;
            if(sumAgg.isEmpty()){
                return null;
            }
            return sumAgg.agg;
        }
    }

    static class SumDouble extends AbstractSumEvaluator<DoubleWritable>{
        class SumAggDouble extends AbstractSumAgg<DoubleWritable>{
            public SumAggDouble() {
                // 变动
                reset();
            }

            @Override
            boolean add(Object parameter) {
                double value = PrimitiveObjectInspectorUtils.getDouble(parameter, inputIO);
                if (super.add(value)) {
                    agg.set(agg.get()+value);
                    return true;
                }
                return false;
            }

            @Override
            void reset() {
                super.reset();
                // 空指针异常
                if(Objects.isNull(agg)){
                    agg = new DoubleWritable(0.0);
                }else{
                    agg.set(0.0);
                }
            }
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            checkParamsAndInit(parameters,m,true,"From YB12211 : SumDouble.init(Mode m, ObjectInspector[] parameters) parameters can't be null");
            return inputIO;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new SumAggDouble();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAggDouble)agg).reset();
        }

        /*private DoubleWritable toDouble(Object...value) throws HiveException {
            checkParam(value,"From YB12211 : SumDouble.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null");
            return new DoubleWritable(PrimitiveObjectInspectorUtils.getDouble(value[0], inputIO));
        }*/

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            //((SumAggDouble)agg).add(toDouble(parameters[0]));
            checkParam("From YB12211 : SumDouble.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null",parameters);
            ((SumAggDouble)agg).add(parameters[0]);
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            //((SumAggDouble)agg).add(toDouble(partial));
            checkParam("From YB12211 : SumDouble.merge(AggregationBuffer agg, Object partial) partial can't be null",partial);
            ((SumAggDouble)agg).add(partial);
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            SumAggDouble sumAgg = (SumAggDouble) agg;
            if(sumAgg.isEmpty()){
                return null;
            }
            return sumAgg.agg;
        }
    }

    static class SumLong extends AbstractSumEvaluator<LongWritable>{
        class SumAggLong extends AbstractSumAgg<LongWritable>{
            public SumAggLong() {
                // 变动
                reset();
            }

            @Override
            boolean add(Object parameter) {
                long value = PrimitiveObjectInspectorUtils.getLong(parameter, inputIO);
                if (super.add(value)) {
                    agg.set(agg.get()+value);
                    return true;
                }
                return false;
            }

            @Override
            void reset() {
                super.reset();
                // 空指针异常
                if(Objects.isNull(agg)){
                    agg = new LongWritable(0);
                }else{
                    agg.set(0);
                }
            }
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            checkParamsAndInit(parameters,m,true,"From YB12211 : SumLong.init(Mode m, ObjectInspector[] parameters) parameters can't be null");
            return inputIO;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new SumAggLong();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAggLong)agg).reset();
        }

        /*private LongWritable toLong(Object...value) throws HiveException {
            checkParam(value,"From YB12211 : SumLong.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null");
            return new LongWritable(PrimitiveObjectInspectorUtils.getLong(value[0], inputIO));
        }*/

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            //((SumAggLong)agg).add(toLong(parameters[0]));
            checkParam("From YB12211 : SumLong.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null",parameters);
            ((SumAggLong)agg).add(parameters[0]);
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            //((SumAggLong)agg).add(toLong(partial));
            checkParam("From YB12211 : SumLong.merge(AggregationBuffer agg, Object partial) partial can't be null",partial);
            ((SumAggLong)agg).add(partial);
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            SumAggLong sumAgg = (SumAggLong) agg;
            if(sumAgg.isEmpty()){
                return null;
            }
            return sumAgg.agg;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
  • 361
  • 362
  • 363
  • 364
  • 365
  • 366
  • 367
  • 368
  • 369
  • 370
  • 371
  • 372
  • 373
  • 374
  • 375
  • 376
  • 377
  • 378
  • 379
  • 380
  • 381
  • 382
  • 383
  • 384
  • 385
  • 386
  • 387
  • 388
  • 389
  • 390
  • 391
  • 392
  • 393
  • 394
  • 395
  • 396
  • 397
  • 398
  • 399
  • 400
  • 401
  • 402
  • 403
  • 404
  • 405
  • 406
  • 407
  • 408
  • 409
  • 410
  • 411
  • 412
  • 413
  • 414
  • 415
  • 416
  • 417
  • 418
  • 419
  • 420
  • 421
  • 422
  • 423
  • 424
  • 425
  • 426
  • 427
  • 428
  • 429
  • 430
  • 431
  • 432
  • 433
  • 434
UDAF的创建与配置

同上

UDAF流程
  1. 创建UDAF的Resolver类(继承于AbstractGenericUDAFResolver)
  2. 在Resolver类中实现getEvaluator方法:返回合适类型的UDAF计算器
  3. 创建计算器类(Evaluator类,通常继承于GenericUDAFEvaluator)
    Evaluator类中实现具体的UDAF函数逻辑和其他和新方法(reset,interate,merge…)
  4. 创建Evaluator类的具体子类

UDF UDTF UDAF的区别

UDF:一进一出
特殊的多进一出(进的部分并列,如:concat)
UDTF:一进多出(列转行,如:explode)
UDAF:多进一出

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号