当前位置:   article > 正文

Hive之系统内置函数&自定义函数(UDF、UDAF、UDTF)介绍和案例(附带完整代码)、IDEA运行Hive_hive udf

hive udf

1.1 系统内置函数

1.查看系统自带的函数

hive (default)> show functions;

2.显示自带的函数的用法

hive (default)> desc function upper;

3.详细显示自带的函数的用法

hive (default)> desc function extended upper;

1.2 自定义函数

1)Hive 自带了一些函数,比如:max/min 等,但是数量有限(大概二、三百个),自己可以通过自定义 UDF来方便的扩展。

2)当 Hive 提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function)。

3)根据用户自定义函数类别分为以下三种:

(1)UDF(User-Defined-Function)

一进一出

(2)UDAF(User-Defined Aggregation Function)

聚集函数,多进一出

类似于:count/max/min

(3)UDTF(User-Defined Table-Generating Functions)

一进多出

如 lateral view explore()

4)官方文档地址

HivePlugins - Apache Hive - Apache Software Foundation

5)编程步骤:

(1)继承 org.apache.hadoop.hive.ql.UDF

(2)需要实现 evaluate 函数;evaluate 函数支持重载;

(3)在 hive 的命令行窗口创建函数

a)添加 jar :add jar linux_jar_path

b)创建 function,

create [temporary] function [dbname.]function_name AS class_name;

(4)在 hive 的命令行窗口删除函数

Drop [temporary] function [if exists] [dbname.]function_name;

6)注意事项

 UDF 必须要有返回类型,可以返回 null,但是返回类型不能为 void;

1.3 自定义 UDF 函数

案例一:大写字母变成小写字母

1.创建一个 Maven 工程 Hive

2.导入依赖

  1. <dependencies>
  2. <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
  3. <dependency>
  4. <groupId>org.apache.hive</groupId>
  5. <artifactId>hive-exec</artifactId>
  6. <version>2.1.0</version>
  7. </dependency>
  8. </dependencies>

3.创建一个类

  1. package com.allen.hive;
  2. import org.apache.hadoop.hive.ql.exec.UDF;
  3. public class Lower extends UDF {
  4. public String evaluate (final String s) {
  5. if (s == null) {
  6. return null;
  7. }
  8. return s.toLowerCase();
  9. }
  10. }

4.打成 jar 包上传到服务器/opt/jar/udf.jar

使用rz命令或者winscp等其他工具上传到你想上传的目录即可

5.将 jar 包添加到 hive 的 classpath

hive (default)> add jar /opt/jar/udf.jar;

6.创建临时函数与开发好的 java class 关联

hive (default)> create temporary function mylower as "com.allen.hive.Lower";

7.即可在 hql 中使用自定义的函数 strip

hive (default)> select ename, mylower(ename) lowername from emp;

下面的案例就不再一一截图了,提供一下代码,有兴趣的可以自己实践。

案例二:修改数据类型使之成为想要的类型

  1. package com.allen.hive;
  2. import java.text.ParseException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.Date;
  5. import java.util.Locale;
  6. import org.apache.hadoop.hive.ql.exec.UDF;
  7. //1.定义一个类继承UDF,然后添加一个方法:evaluate,这个方法的参数和返回类型和函数的输入输出一致
  8. //2.把项目打成jar包,然后放到hive的classPath下,或者在hive里面:add jar /opt/jar/myudf.jar
  9. //3.在hive里面新建一个function然后指定到我们新建的类型:create function mydateparse as 'com.allen.hive.MyDateParser';
  10. //4.使用方法:select mydateparser(time) from apache-log limit 10;
  11. public class MyDataParser extends UDF{
  12.      //hive自定义函数,继承UDF类之后,还需要定义一个
  13.      //evaluate方法,这个方法的参数和hive函数接受的参数个数和数据类型一致
  14.      //方法的返回值和hive函数的返回值类型一致
  15.      //这里接受的参数,[29/April/2016:17:38:20 +0800]
  16.      //返回的结果:2016-4-28 20:40:39
  17.      public String evaluate(String s){
  18.           SimpleDateFormat format=new SimpleDateFormat("dd/MMMMM/yyyy:HH:mm:ss Z",Locale.ENGLISH);
  19.           if(s.indexOf("[")>-1){
  20.               s=s.replace("[", "");
  21.           }if(s.indexOf("]")>-1){
  22.               s=s.replace("]", "");
  23.           }
  24.           try {
  25.               //将输入的string转换成date数据类型
  26.               Date date=format.parse(s);
  27.               SimpleDateFormat rformat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  28.               return rformat.format(date);
  29.           } catch (ParseException e) {
  30.               // TODO Auto-generated catch block
  31.               e.printStackTrace();
  32.               return "";
  33.           }
  34.      }
  35. }

步骤同案例一

案例三:把一个字段拆分成多个字段

  1. package com.allen.hive;
  2. import java.util.ArrayList;
  3. import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  4. import org.apache.hadoop.hive.ql.metadata.HiveException;
  5. import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
  6. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  7. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
  8. import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
  9. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  10. public class MyRequestParser extends GenericUDTF{
  11.         @Override
  12.           public StructObjectInspector initialize(ObjectInspector[] argIOs) throws UDFArgumentException {
  13.               if(argIOs.length!=1){
  14.               throw new UDFArgumentException("参数不正确");
  15.           }
  16.           ArrayList<String> filedNames=new ArrayList<String>();
  17.           ArrayList<ObjectInspector> fieldOIs=new ArrayList<ObjectInspector>();
  18.           filedNames.add("rool1");
  19.           fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
  20.           filedNames.add("rool2");
  21.           fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
  22.           filedNames.add("rool3");
  23.           fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
  24.           //将返回字段设置到该UDTF的返回值类型中
  25.           return ObjectInspectorFactory.getStandardStructObjectInspector(filedNames, fieldOIs);
  26.      }
  27.      @Override
  28.      public void close() throws HiveException {
  29.      }
  30.      //process方法是我们处理函数的输入并且输出结果的过程定义方法
  31.      @Override
  32.      public void process(Object[] args) throws HiveException {
  33.         String input =args[0].toString();
  34.         //去掉两头的“"”,\是转义字符。即两头的“"”,用空来代替“”
  35.           input=input.replace("\"", "");
  36.           String[] result=input.split(" ");
  37.           //如果解析错误或失败,则返回三个字段的内容是“--”
  38.           if(result.length!=3){
  39.               result[0]="--";
  40.               result[1]="--";
  41.               result[2]="--";
  42.           }
  43.               forward(result);
  44.      }
  45. }

步骤同案例一

案例四:求和函数

  1. package com.allen.hive;
  2. import org.apache.hadoop.hive.ql.exec.UDAF;
  3. import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
  4. import org.apache.hadoop.io.IntWritable;
  5. public class MaxFlowUDAF extends UDAF {
  6.      public static class MaxNumberUDAFEvaluator implements UDAFEvaluator{
  7.           private IntWritable result;
  8.           public void init(){
  9.               result=null;
  10.           }
  11.           //聚合的多行中每行的被聚合的值都会被调用一次iterate方法,所以在这个方法里面我们来定义聚合规则
  12.           public boolean iterate(IntWritable value){
  13.               if(value==null){
  14.                    return false;
  15.               }if(result==null){
  16.                    result=new IntWritable(value.get());
  17.               }else{
  18.                    //需求是求出流量最大值,在这里进行流量值的比较,将最大值放入result
  19.                    result.set(Math.max(result.get(), value.get()));
  20.               }
  21.               return true;
  22.           }
  23.           //hive需要部分聚合结果时会调用该方法,返回当前的result作为hive取部分聚合值得结果
  24.           public IntWritable terminatePartial(){
  25.               return result;
  26.           }
  27.           //聚合值,新行未被处理的值会调用merge加入聚合,在这里直接调用上面定义的聚合规则方法iterate
  28.           public boolean merge(IntWritable other){
  29.               return iterate(other);
  30.           }
  31.           //hive需要最终聚合结果时调用的方法,返回最终结果
  32.           public IntWritable terminate(){
  33.               return result;
  34.           }
  35.      }
  36. }

步骤同案例一

案例五:排序TopN

  1. package com.allen.hive;
  2. import java.util.ArrayList;
  3. import java.util.Collections;
  4. import java.util.Comparator;
  5. import java.util.List;
  6. import org.apache.hadoop.hive.ql.exec.UDAF;
  7. import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
  8. public class TopnUDAF extends UDAF{
  9.      public static class State{
  10.           ArrayList<Double> a;//保存topn的结果
  11.           int n;//调用该函数的topn的n
  12.      }
  13.      public static class Evaluator implements UDAFEvaluator{
  14.           private State state;
  15.           public Evaluator() {
  16.               init();
  17.           }
  18.           //初始化Evaluator对象
  19.           public void init() {
  20.               if(state==null){
  21.                    state = new State();
  22.               }
  23.               state.a = new ArrayList<Double>();
  24.               state.n = 0;
  25.           }
  26.           /**
  27.            *map任务每行的值都会被调用一次iterate方法,iterate接收的参数正是调用函数时传入的参数
  28.            * @param o 聚合的字段值
  29.            * @param n   topn的n
  30.            * @return
  31.            */
  32.           public boolean iterate(Double o,int n){
  33.               //升降序topn表示,false表示最大值topn,true表示最小值topn
  34.               boolean ascending = false;
  35.               state.n = n;
  36.               if(o!=null){
  37.                    //是否插入标志
  38.                    boolean doInsert = state.a.size()<n;
  39.                    //如果当前的state.a的元素数量大于或者等于n则需要插入操作
  40.                    if(!doInsert){
  41.                         Double last = state.a.get(state.a.size()-1);
  42.                         if(ascending){
  43.                              doInsert = o<last;
  44.                         }else{
  45.                              doInsert = o>last;
  46.                         }
  47.                    }
  48.                    if(doInsert){
  49.                         //有顺序的插入o的值
  50.                         binaryInsert(state.a,o,ascending);
  51.                         if(state.a.size()>n){
  52.                              state.a.remove(state.a.size()-1);
  53.                         }
  54.                    }
  55.               }
  56.               return true;
  57.           }
  58.           //将value的值按照ascending的顺序插入到List中相应的位置处
  59.           static <T extends Comparable<T>> void binaryInsert(List<T> list,T value,boolean ascending){
  60.               //根据顺序获取value在list中的位置
  61.               int position = Collections.binarySearch(list, value,getComparator(ascending,(T)null));//!!!!
  62.               if(position<0){
  63.                    position = (-position) - 1;
  64.               }
  65.               list.add(position, value);
  66.           }
  67.           //比较器方法
  68.           static <T extends Comparable<T>> Comparator<T> getComparator(boolean ascending,T dummy){
  69.               Comparator<T> comp;
  70.               if(ascending){
  71.                    comp = new Comparator<T>(){
  72.                         public int compare(T o1,T o2){
  73.                              return o1.compareTo(o2);
  74.                         }
  75.                    };
  76.               }else{
  77.                    comp = new Comparator<T>(){
  78.                         public int compare(T o1,T o2){
  79.                              return o2.compareTo(o1);
  80.                         }
  81.                    };
  82.               }
  83.               return comp;
  84.           }
  85.           //一个map端执行结束后的输出值,这个值会被送到merge去合并
  86.           public State terminatePartial(){
  87.               if(state.a.size()>0){
  88.                    return state;
  89.               }else{
  90.                    return null;
  91.               }
  92.           }
  93.           /**
  94.            * reduce端,将map端的输出结果,即terminatePartial的返回值,进行合并操作
  95.            * 有多少个map端,reduce将会调用多少次merge方法
  96.            * @param o 本次merge合并需要处理的map端terminatePartial方法返回的state对象
  97.            * @return
  98.            */
  99.           public boolean merge(State o){
  100.               //升降序topn表示,false表示最大值topn,true表示最小值topn
  101.               boolean ascending = false;
  102.               if(o!=null){
  103.                    state.n = o.n;
  104.                    state.a = sortedMerge(o.a,state.a,ascending,o.n);
  105.               }
  106.               return true;
  107.           }
  108.           static <T extends Comparable<T>> ArrayList<T> sortedMerge(List<T> a1,List<T> a2,boolean ascending,int n){
  109.               Comparator<T> comparator = getComparator(ascending,(T)null);
  110.               int n1 = a1.size();
  111.               int n2 = a2.size();
  112.               int p1 = 0;//当前a1的元素
  113.               int p2 = 0;//当前a2的元素
  114.               //保存结果list,有n个元素
  115.               ArrayList<T> output = new ArrayList<T>(n);
  116.               //遍历并将a1和a2合并到output中,合并过程中保证output最多有n个元素
  117.               while(output.size()<n && (p1<n1 || p2<n2)){
  118.                    if(p1<n1){
  119.                         if(p2==n2||comparator.compare(a1.get(p1), a2.get(p2))<0){
  120.                              output.add(a1.get(p1++));
  121.                         }
  122.                    }
  123.                    if(output.size()==n){
  124.                         break;
  125.                    }
  126.                    if(p2<n2){
  127.                         if(p1==n1||comparator.compare(a2.get(p2), a1.get(p1))<0){
  128.                              output.add(a2.get(p2++));
  129.                         }
  130.                    }
  131.               }
  132.               return output;
  133.           }
  134.           public ArrayList<Double> terminate(){
  135.               if(state.a.size()>0){
  136.                            return state.a;
  137.               }else{
  138.                    return null;
  139.               }
  140.           }
  141.      }   
  142. }

步骤同案例一

附加:pom.xml配置

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <groupId>com.allen.hive</groupId>
  7.     <artifactId>Hive_Test</artifactId>
  8.     <version>1.0-SNAPSHOT</version>
  9.     <dependencies>
  10.         <dependency>
  11.             <groupId>junit</groupId>
  12.             <artifactId>junit</artifactId>
  13.             <version>3.8.1</version>
  14.             <scope>test</scope>
  15.         </dependency>
  16.         <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
  17.         <dependency>
  18.             <groupId>org.apache.hive</groupId>
  19.             <artifactId>hive-exec</artifactId>
  20.             <version>2.1.0</version>
  21.         </dependency>
  22.         <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-contrib -->
  23.         <dependency>
  24.             <groupId>org.apache.hive</groupId>
  25.             <artifactId>hive-contrib</artifactId>
  26.             <version>2.1.0</version>
  27.         </dependency>
  28.         <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
  29.         <dependency>
  30.             <groupId>org.apache.hive</groupId>
  31.             <artifactId>hive-jdbc</artifactId>
  32.             <version>2.1.0</version>
  33.         </dependency>
  34.     </dependencies>
  35. </project>

1.4 IDEA连接Hive,执行select简单测试

  1. package com.allen.hive;
  2. import java.sql.Connection;
  3. import java.sql.DriverManager;
  4. import java.sql.ResultSet;
  5. import java.sql.Statement;
  6. public class HiveTest {
  7.      public static void main(String[] args) throws Exception {
  8.           Class.forName("org.apache.hive.jdbc.HiveDriver");
  9.           Connection conn=DriverManager.getConnection("jdbc:hive2://node4:10000","root","123qwe");
  10.           try{
  11.               Statement st=conn.createStatement();
  12.               ResultSet ret=st.executeQuery("select count(*) from log_table");
  13.               if(ret.next()){
  14.                    System.out.println(ret.getInt(1));
  15.               }
  16.           }catch(Exception e){
  17.               e.printStackTrace();
  18.           }finally{
  19.               conn.close();
  20.           }
  21.      }
  22. }

因为使用的是hive2,所以要在CLI先使用命令hiveserver2启动10000端口,再执行程序,不然会报错:拒绝连接

结果如下:

与CLI执行结果一致:

执行程序时遇到的问题:

ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.

原因:log4j2的配置文件没有导入

解决办法:

尝试导入log4j.properties ,但并不行

需要导入log4j2.xml

在你项目的src下的resources下新建log4j2.xml,eclipse和IDEA会把其配置到WEB-INF的classes下

log4j2的配置

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <Configuration>
  3.     <Appenders>
  4.         <Console name="STDOUT" target="SYSTEM_OUT">
  5.             <PatternLayout pattern="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
  6.         </Console>
  7.         <RollingFile name="RollingFile" fileName="logs/strutslog1.log"
  8.                      filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
  9.             <PatternLayout>
  10.                 <Pattern>%d{MM-dd-yyyy} %p %c{1.} [%t] -%M-%L- %m%n</Pattern>
  11.             </PatternLayout>
  12.             <Policies>
  13.                 <TimeBasedTriggeringPolicy />
  14.                 <SizeBasedTriggeringPolicy size="1 KB"/>
  15.             </Policies>
  16.             <DefaultRolloverStrategy fileIndex="max" max="2"/>
  17.         </RollingFile>
  18.     </Appenders>
  19.     <Loggers>
  20.         <Logger name="com.opensymphony.xwork2" level="WAN"/>
  21.         <Logger name="org.apache.struts2" level="WAN"/>
  22.         <Root level="warn">
  23.             <AppenderRef ref="STDOUT"/>
  24.         </Root>
  25.     </Loggers>
  26. </Configuration>

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/753824
推荐阅读
相关标签
  

闽ICP备14008679号