
Hive source code: the explode function


Contents

I. Function overview

II. Usage example

III. Source code analysis

IV. Summary


I. Function overview

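A UDTF (User-Defined Table-Generating Function) takes one input row and produces multiple output rows; Hive's built-in explode() and posexplode() are examples. explode() splits the elements of an array into multiple rows, or splits the entries of a map into multiple rows with two columns (key and value). It is a function that comes up constantly in day-to-day work, so this article walks through its source code.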
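To make the array and map cases concrete before reading any Hive code, here is a minimal plain-Java sketch of the row shapes explode() produces. It uses no Hive classes; the class and method names (`ExplodeSemantics`, `explodeList`, `explodeMap`) are hypothetical, and the NULL-input behavior mirrors the `list == null` / `map == null` checks in the source analyzed later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Hive-free sketch of explode() semantics.
final class ExplodeSemantics {

    // Array input: one output row per element, in a single column ("col" in Hive).
    static List<Object[]> explodeList(List<?> list) {
        List<Object[]> rows = new ArrayList<>();
        if (list == null) {
            return rows; // a NULL array produces no rows
        }
        for (Object element : list) {
            rows.add(new Object[] {element});
        }
        return rows;
    }

    // Map input: one output row per entry, with two columns ("key" and "value" in Hive).
    static List<Object[]> explodeMap(Map<?, ?> map) {
        List<Object[]> rows = new ArrayList<>();
        if (map == null) {
            return rows; // a NULL map produces no rows
        }
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            rows.add(new Object[] {entry.getKey(), entry.getValue()});
        }
        return rows;
    }

    public static void main(String[] args) {
        // Roughly mirrors: select explode(array(1,2,3));
        for (Object[] row : explodeList(Arrays.asList(1, 2, 3))) {
            System.out.println(Arrays.toString(row));
        }
        // LinkedHashMap keeps insertion order so the output is predictable.
        Map<String, Integer> m = new LinkedHashMap<>();
        m.put("a", 1);
        m.put("b", 2);
        for (Object[] row : explodeMap(m)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

Running `main` prints `[1]`, `[2]`, `[3]`, then `[a, 1]` and `[b, 2]`: one column per row for an array, two columns per row for a map.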

II. Usage example

The query:

```sql
select explode(array(1,2,3));
```

returns three rows in a single column named `col`: 1, 2 and 3.

III. Source code analysis

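A custom UDTF extends the abstract class GenericUDTF and implements initialize(), process() and close(). (In the source below, close() is declared abstract, so it must be implemented, but its body may be empty, as explode's is.) Hive calls initialize() to tell the UDTF what types of arguments it will receive, and the UDTF must return an object inspector describing its output rows. Once initialize() has been called, Hive passes rows to the UDTF one at a time through process(). Inside process(), the UDTF hands rows to downstream operators by calling forward(). Finally, after all rows have been processed, Hive calls close().
Since every UDTF extends GenericUDTF, let's first look at the source of that abstract class: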
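The lifecycle described above (initialize once, process per row, forward any number of times, close once) can be sketched without Hive. Everything here is hypothetical: `MiniUDTF` is a simplified stand-in for GenericUDTF, `MiniDriver` plays the role of the calling operator, and a `Consumer<Object[]>` replaces Hive's collector. It illustrates the call order only; it does not reproduce Hive's operator code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Hive-free stand-in for GenericUDTF that mirrors only the call order.
abstract class MiniUDTF {
    private Consumer<Object[]> collector;

    // Called once; returns output column names (Hive returns a StructObjectInspector instead).
    abstract List<String> initialize(int argCount);

    // Called once per input row; may call forward() zero or more times.
    abstract void process(Object[] args);

    // Called once after the last row; may clean up or forward remaining rows.
    abstract void close();

    final void setCollector(Consumer<Object[]> collector) {
        this.collector = collector;
    }

    protected final void forward(Object[] row) {
        collector.accept(row);
    }
}

// Hypothetical driver: initialize once, process each row, close once.
final class MiniDriver {
    static List<Object[]> run(MiniUDTF udtf, List<Object[]> inputRows) {
        List<Object[]> output = new ArrayList<>();
        // Copy each row because a UDTF may reuse the same array across forward() calls.
        udtf.setCollector(row -> output.add(row.clone()));
        udtf.initialize(inputRows.isEmpty() ? 0 : inputRows.get(0).length);
        for (Object[] row : inputRows) {
            udtf.process(row);
        }
        udtf.close();
        return output;
    }
}

// Example UDTF that records each lifecycle call and emits every argument as its own row.
final class TracingUDTF extends MiniUDTF {
    final List<String> calls = new ArrayList<>();

    @Override
    List<String> initialize(int argCount) {
        calls.add("initialize");
        return List.of("col");
    }

    @Override
    void process(Object[] args) {
        calls.add("process");
        for (Object arg : args) {
            forward(new Object[] {arg});
        }
    }

    @Override
    void close() {
        calls.add("close");
    }
}
```

Feeding two input rows through `MiniDriver.run` leaves `calls` as `initialize, process, process, close`, which is the ordering the paragraph above describes.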

```java
package org.apache.hadoop.hive.ql.udf.generic;

import java.util.List;

import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

/**
 * A Generic User-defined Table Generating Function (UDTF)
 *
 * Generates a variable number of output rows for a single input row. Useful for
 * explode(array)...
 */
public abstract class GenericUDTF {
  Collector collector = null;

  /**
   * Additionally setup GenericUDTF with MapredContext before initializing.
   * This is only called in runtime of MapRedTask.
   *
   * @param context context
   */
  public void configure(MapredContext mapredContext) {
  }

  // Unpacks the field inspectors of the input struct into an array
  // and delegates to initialize(ObjectInspector[]).
  public StructObjectInspector initialize(StructObjectInspector argOIs)
      throws UDFArgumentException {
    List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
    ObjectInspector[] udtfInputOIs = new ObjectInspector[inputFields.size()];
    for (int i = 0; i < inputFields.size(); i++) {
      udtfInputOIs[i] = inputFields.get(i).getFieldObjectInspector();
    }
    return initialize(udtfInputOIs);
  }

  /**
   * Initialize this GenericUDTF. This will be called only once per instance.
   *
   * @param argOIs
   *          An array of ObjectInspectors for the arguments
   * @return A StructObjectInspector for output. The output struct represents a
   *         row of the table where the fields of the stuct are the columns. The
   *         field names are unimportant as they will be overridden by user
   *         supplied column aliases.
   */
  @Deprecated
  public StructObjectInspector initialize(ObjectInspector[] argOIs)
      throws UDFArgumentException {
    throw new IllegalStateException("Should not be called directly");
  }

  /**
   * Give a set of arguments for the UDTF to process.
   *
   * @param args
   *          object array of arguments
   */
  public abstract void process(Object[] args) throws HiveException;

  /**
   * Called to notify the UDTF that there are no more rows to process.
   * Clean up code or additional forward() calls can be made here.
   */
  public abstract void close() throws HiveException;

  /**
   * Associates a collector with this UDTF. Can't be specified in the
   * constructor as the UDTF may be initialized before the collector has been
   * constructed.
   *
   * @param collector
   */
  public final void setCollector(Collector collector) {
    this.collector = collector;
  }

  /**
   * Passes an output row to the collector.
   *
   * @param o
   * @throws HiveException
   */
  protected final void forward(Object o) throws HiveException {
    collector.collect(o);
  }
}
```
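1. initialize: Hive calls initialize() first. It returns a description of the rows the UDTF will output (the number of columns and their types and names). In practice, the object inspectors produced by the upstream operator are passed to the UDTF; because an object inspector carries the data type, the UDTF can check its inputs and declare its outputs from them. The non-deprecated initialize(StructObjectInspector) simply unpacks the struct's field inspectors into an array and delegates to initialize(ObjectInspector[]). initialize() is called only once per UDTF instance (i.e. per task), not once per row.
2. process: after initialization, process() is called once for every input row. It processes the arguments and writes results out with forward(), which may be called any number of times for a single input row. This closely resembles the map() method in MapReduce: data arrives one row at a time, and one input row can yield several output rows.
3. close: finally, close() is called once for the whole task to perform any cleanup; per its javadoc, it may also make additional forward() calls.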
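The "one row in, many rows out" contract in item 2 is essentially a flatMap over the input rows. The sketch below expresses it with Java streams; it is an analogy only, with hypothetical names (`FlatMapSketch`, `process`, `explodeAll`) and no Hive classes. Each element of `inputRows` stands for the array argument of one input row.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch: explode over a whole table expressed as a flatMap.
final class FlatMapSketch {

    // Stand-in for process(): one input row (an array value) becomes zero or more output rows.
    static Stream<Integer> process(List<Integer> arrayValue) {
        return arrayValue == null ? Stream.empty() : arrayValue.stream();
    }

    // Stand-in for the operator loop: process() is invoked once per input row.
    static List<Integer> explodeAll(List<List<Integer>> inputRows) {
        return inputRows.stream()
                .flatMap(FlatMapSketch::process)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Three input rows; the NULL row contributes no output rows.
        System.out.println(explodeAll(Arrays.asList(Arrays.asList(1, 2), null, Arrays.asList(3))));
    }
}
```

`main` prints `[1, 2, 3]`: the total number of output rows is the sum of the array lengths, and a NULL row simply disappears.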

With GenericUDTF covered, let's look at the source of the explode function.

```java
package org.apache.hadoop.hive.ql.udf.generic;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.TaskExecutionException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

/**
 * GenericUDTFExplode.
 *
 */
@Description(name = "explode",
    value = "_FUNC_(a) - separates the elements of array a into multiple rows,"
      + " or the elements of a map into multiple rows and columns ")
public class GenericUDTFExplode extends GenericUDTF {

  private transient ObjectInspector inputOI = null;

  @Override
  public void close() throws HiveException {
    // Nothing to clean up.
  }

  @Override
  public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    // explode() accepts exactly one argument.
    if (args.length != 1) {
      throw new UDFArgumentException("explode() takes only one argument");
    }

    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

    switch (args[0].getCategory()) {
    case LIST:
      // Array: one output column named "col", typed by the element inspector.
      inputOI = args[0];
      fieldNames.add("col");
      fieldOIs.add(((ListObjectInspector)inputOI).getListElementObjectInspector());
      break;
    case MAP:
      // Map: two output columns, "key" and "value".
      inputOI = args[0];
      fieldNames.add("key");
      fieldNames.add("value");
      fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector());
      fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector());
      break;
    default:
      throw new UDFArgumentException("explode() takes an array or a map as a parameter");
    }

    // The returned struct inspector describes one output row.
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
        fieldOIs);
  }

  // Output buffers reused for every forward() call: one slot for arrays, two for maps.
  private transient final Object[] forwardListObj = new Object[1];
  private transient final Object[] forwardMapObj = new Object[2];

  @Override
  public void process(Object[] o) throws HiveException {
    switch (inputOI.getCategory()) {
    case LIST:
      ListObjectInspector listOI = (ListObjectInspector)inputOI;
      List<?> list = listOI.getList(o[0]);
      if (list == null) {
        return; // NULL array: no output rows
      }
      for (Object r : list) {
        forwardListObj[0] = r;
        forward(forwardListObj); // one output row per element
      }
      break;
    case MAP:
      MapObjectInspector mapOI = (MapObjectInspector)inputOI;
      Map<?,?> map = mapOI.getMap(o[0]);
      if (map == null) {
        return; // NULL map: no output rows
      }
      for (Entry<?,?> r : map.entrySet()) {
        forwardMapObj[0] = r.getKey();
        forwardMapObj[1] = r.getValue();
        forward(forwardMapObj); // one output row per entry
      }
      break;
    default:
      throw new TaskExecutionException("explode() can only operate on an array or a map");
    }
  }

  @Override
  public String toString() {
    return "explode";
  }
}
```
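One detail worth noticing in process(): explode writes every output row into the same `forwardListObj` (or `forwardMapObj`) array and forwards that array again and again. This is only safe if whoever receives the row uses or copies it before forward() returns. The plain-Java sketch below, with a hypothetical nested `Collector` interface (not Hive's own Collector), shows what goes wrong when a consumer keeps the reference instead of copying it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Hive-free sketch of the reused-buffer pattern in GenericUDTFExplode.
final class BufferReuseSketch {
    // Hypothetical collector interface for this sketch only.
    interface Collector {
        void collect(Object[] row);
    }

    // Reused buffer, like forwardListObj.
    private final Object[] forwardObj = new Object[1];

    void process(List<?> list, Collector collector) {
        for (Object element : list) {
            forwardObj[0] = element;       // overwrite the shared buffer
            collector.collect(forwardObj); // consumer must use it before the next iteration
        }
    }

    // Keeps references: every stored row is the same array, holding the last value.
    static List<Object[]> collectByReference(List<?> input) {
        List<Object[]> out = new ArrayList<>();
        new BufferReuseSketch().process(input, out::add);
        return out;
    }

    // Copies each row before the buffer is overwritten: every value survives.
    static List<Object[]> collectByCopy(List<?> input) {
        List<Object[]> out = new ArrayList<>();
        new BufferReuseSketch().process(input, row -> out.add(row.clone()));
        return out;
    }

    public static void main(String[] args) {
        for (Object[] row : collectByReference(Arrays.asList(1, 2, 3))) {
            System.out.println("by reference: " + Arrays.toString(row));
        }
        for (Object[] row : collectByCopy(Arrays.asList(1, 2, 3))) {
            System.out.println("by copy: " + Arrays.toString(row));
        }
    }
}
```

The by-reference version prints `[3]` three times while the copying version prints `[1]`, `[2]`, `[3]`. Reusing one array avoids allocating a new Object[] per output row; the cost is that the consumer side must not hold on to the forwarded array.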

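Once GenericUDTF is understood, the explode source is easy to follow. Because explode turns an array into multiple rows, or a map into multiple rows with two columns, it checks the category of its single argument (LIST or MAP) in both initialize() and process() and handles each case separately; any other type is rejected, and a NULL input simply produces no rows. With this background, you can try writing a custom UDTF of your own.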
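As a starting point for writing your own UDTF, here is a self-contained sketch that follows the same structure as explode: an argument check in initialize(), a NULL check and a loop with a reused buffer in process(), and an empty close(). It is modeled loosely on posexplode(), emitting each array element together with its position. `SketchUDTF` is a hypothetical stand-in for GenericUDTF so the example runs without Hive on the classpath; a real UDTF would extend GenericUDTF and work with object inspectors instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal base class standing in for GenericUDTF (no Hive dependency).
abstract class SketchUDTF {
    private Consumer<Object[]> collector;

    final void setCollector(Consumer<Object[]> collector) {
        this.collector = collector;
    }

    protected final void forward(Object[] row) {
        collector.accept(row);
    }

    abstract List<String> initialize(int argCount);

    abstract void process(Object[] args);

    abstract void close();
}

// Hypothetical UDTF modeled loosely on posexplode(): emits (position, element) per array element.
final class PosExplodeSketch extends SketchUDTF {
    private final Object[] forwardObj = new Object[2]; // reused buffer, as in explode

    @Override
    List<String> initialize(int argCount) {
        // Same argument check as explode(): exactly one argument.
        if (argCount != 1) {
            throw new IllegalArgumentException("posexplode sketch takes only one argument");
        }
        return Arrays.asList("pos", "val");
    }

    @Override
    void process(Object[] args) {
        List<?> list = (List<?>) args[0];
        if (list == null) {
            return; // NULL input: no rows, like explode()
        }
        for (int i = 0; i < list.size(); i++) {
            forwardObj[0] = i;
            forwardObj[1] = list.get(i);
            forward(forwardObj);
        }
    }

    @Override
    void close() {
        // Nothing to clean up.
    }

    // Usage helper: runs the lifecycle for one input row, copying each forwarded row.
    static List<Object[]> run(List<?> array) {
        PosExplodeSketch udtf = new PosExplodeSketch();
        List<Object[]> out = new ArrayList<>();
        udtf.setCollector(row -> out.add(row.clone()));
        udtf.initialize(1);
        udtf.process(new Object[] {array});
        udtf.close();
        return out;
    }
}
```

For example, `PosExplodeSketch.run(Arrays.asList("a", "b"))` yields the rows `(0, a)` and `(1, b)`.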

IV. Summary

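Both Hive and MapReduce are large, complex systems, so when learning them it is fine to go only as deep as you need. For example, when extending this class we only need to care about what it can do and what business logic we have to implement, without digging into exactly why it works the way it does.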

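Disclaimer: this article was contributed by a user and does not represent the position of the wpsshop blog; copyright belongs to the original author, and the site assumes no legal responsibility. If you find infringing content, please contact us. When reposting, please credit the source: https://www.wpsshop.cn/w/你好赵伟/article/detail/426574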