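A UDTF (User-Defined Table-Generating Function) takes one row in and produces multiple rows out. Hive's explode() and posexplode() are examples. explode() splits the elements of an array into multiple rows, or splits the entries of a map into multiple rows with two columns (key and value). It comes up all the time in day-to-day work, so this post walks through the source code of explode().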
The query and its result:

```sql
select explode(array(1,2,3));
```

The query returns three rows, 1, 2 and 3, in a single column named col.
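A custom UDTF must extend the abstract class GenericUDTF and implement initialize(), process() and close(). (close() is declared abstract in GenericUDTF, so it must be implemented, although its body may be empty.) Hive first calls initialize() to tell the UDTF the types of the arguments it will receive, and the UDTF must return an object inspector (a StructObjectInspector) that describes the rows it will output. After initialize() has been called, Hive passes rows to the UDTF one at a time through process(). Inside process(), the UDTF hands output rows to the downstream operators by calling forward(). Finally, once all rows have been processed, Hive calls close().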
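To make the call order concrete, here is a minimal plain-Java model of that lifecycle. MiniUdtf, runUdtf and PassThroughWithTotal are hypothetical names, not Hive classes. Argument types are plain strings instead of ObjectInspectors, and a Consumer<Object[]> stands in for Hive's Collector. The example also shows that close() may still call forward(), which the GenericUDTF Javadoc explicitly allows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of the UDTF lifecycle; not Hive's API.
public class UdtfLifecycle {

    static abstract class MiniUdtf {
        private Consumer<Object[]> collector;

        // Set by the driver before any row is processed (cf. setCollector()).
        final void setCollector(Consumer<Object[]> collector) {
            this.collector = collector;
        }

        // Called once: receives the argument types, returns the output column names.
        abstract List<String> initialize(List<String> argTypes);

        // Called once per input row; may call forward() zero or more times.
        abstract void process(Object[] args);

        // Called once after the last row; may still call forward().
        abstract void close();

        // Hands one output row to whatever consumes this UDTF's output.
        protected final void forward(Object[] row) {
            collector.accept(row);
        }
    }

    // Drives a UDTF in the order described: initialize, process per row, close.
    static List<Object[]> runUdtf(MiniUdtf udtf, List<String> argTypes, List<Object[]> inputRows) {
        List<Object[]> output = new ArrayList<>();
        udtf.setCollector(output::add);
        udtf.initialize(argTypes);
        for (Object[] row : inputRows) {
            udtf.process(row);
        }
        udtf.close();
        return output;
    }

    // Example UDTF: passes each value through, then forwards a summary row in close().
    static class PassThroughWithTotal extends MiniUdtf {
        final List<String> calls = new ArrayList<>();
        private int seen = 0;

        @Override
        List<String> initialize(List<String> argTypes) {
            calls.add("initialize");
            if (argTypes.size() != 1) {
                throw new IllegalArgumentException("takes exactly one argument");
            }
            return List.of("value");
        }

        @Override
        void process(Object[] args) {
            calls.add("process");
            seen++;
            forward(new Object[] {args[0]});
        }

        @Override
        void close() {
            calls.add("close");
            forward(new Object[] {"total=" + seen});
        }
    }

    public static void main(String[] args) {
        PassThroughWithTotal udtf = new PassThroughWithTotal();
        List<Object[]> rows = runUdtf(udtf, List.of("string"),
                List.of(new Object[] {"x"}, new Object[] {"y"}));
        System.out.println(udtf.calls); // [initialize, process, process, close]
        System.out.println(rows.get(rows.size() - 1)[0]); // total=2
    }
}
```

The driver calls initialize() once, process() once per input row and close() once, so two input rows yield three output rows: x, y and the summary row.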
Since every UDTF has to extend GenericUDTF, let's first look at the source of this abstract class.
```java
package org.apache.hadoop.hive.ql.udf.generic;

import java.util.List;

import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

/**
 * A Generic User-defined Table Generating Function (UDTF)
 *
 * Generates a variable number of output rows for a single input row. Useful for
 * explode(array)...
 */
public abstract class GenericUDTF {
  // Set through setCollector(); forward() writes output rows to it.
  Collector collector = null;

  /**
   * Additionally setup GenericUDTF with MapredContext before initializing.
   * This is only called in runtime of MapRedTask.
   *
   * @param mapredContext context
   */
  public void configure(MapredContext mapredContext) {
  }

  // Extracts the ObjectInspector of every argument column and delegates to
  // initialize(ObjectInspector[]), the overload that subclasses such as
  // GenericUDTFExplode override.
  public StructObjectInspector initialize(StructObjectInspector argOIs)
      throws UDFArgumentException {
    List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
    ObjectInspector[] udtfInputOIs = new ObjectInspector[inputFields.size()];
    for (int i = 0; i < inputFields.size(); i++) {
      udtfInputOIs[i] = inputFields.get(i).getFieldObjectInspector();
    }
    return initialize(udtfInputOIs);
  }

  /**
   * Initialize this GenericUDTF. This will be called only once per instance.
   *
   * @param argOIs
   *          An array of ObjectInspectors for the arguments
   * @return A StructObjectInspector for output. The output struct represents a
   *         row of the table where the fields of the struct are the columns. The
   *         field names are unimportant as they will be overridden by user
   *         supplied column aliases.
   */
  @Deprecated
  public StructObjectInspector initialize(ObjectInspector[] argOIs)
      throws UDFArgumentException {
    throw new IllegalStateException("Should not be called directly");
  }

  /**
   * Give a set of arguments for the UDTF to process.
   *
   * @param args
   *          object array of arguments
   */
  public abstract void process(Object[] args) throws HiveException;

  /**
   * Called to notify the UDTF that there are no more rows to process.
   * Clean up code or additional forward() calls can be made here.
   */
  public abstract void close() throws HiveException;

  /**
   * Associates a collector with this UDTF. Can't be specified in the
   * constructor as the UDTF may be initialized before the collector has been
   * constructed.
   *
   * @param collector
   */
  public final void setCollector(Collector collector) {
    this.collector = collector;
  }

  /**
   * Passes an output row to the collector.
   *
   * @param o
   * @throws HiveException
   */
  protected final void forward(Object o) throws HiveException {
    collector.collect(o);
  }

}
```
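One detail in this class is easy to miss. The struct-based initialize(StructObjectInspector) only collects each argument's ObjectInspector into an array and delegates to the deprecated initialize(ObjectInspector[]), which throws unless a subclass overrides it. That is why a UDTF that overrides only the array form, as GenericUDTFExplode does, still initializes correctly. The sketch below models this bridge in plain Java. Field, BaseUdtf, ArrayOnlyUdtf and NoInitUdtf are hypothetical stand-ins for StructField, GenericUDTF and real UDTFs, with types modeled as strings.

```java
import java.util.List;

// Plain-Java sketch of the initialize() bridge in GenericUDTF; not Hive code.
public class InitializeBridge {

    // Stand-in for StructField; the bridge reads only the type, as GenericUDTF
    // reads only the field's ObjectInspector.
    static final class Field {
        final String name;
        final String type;

        Field(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    static abstract class BaseUdtf {
        // New-style entry point: collect each argument's type into an array and
        // delegate, like GenericUDTF.initialize(StructObjectInspector).
        List<String> initialize(List<Field> argFields) {
            String[] argTypes = new String[argFields.size()];
            for (int i = 0; i < argFields.size(); i++) {
                argTypes[i] = argFields.get(i).type;
            }
            return initialize(argTypes);
        }

        // Old-style entry point: if a subclass overrides neither method,
        // the call ends up here and fails.
        @Deprecated
        List<String> initialize(String[] argTypes) {
            throw new IllegalStateException("Should not be called directly");
        }
    }

    // Overrides only the array form, the way GenericUDTFExplode does.
    static class ArrayOnlyUdtf extends BaseUdtf {
        @Override
        List<String> initialize(String[] argTypes) {
            return List.of("col");
        }
    }

    // Overrides neither form.
    static class NoInitUdtf extends BaseUdtf {
    }

    public static void main(String[] args) {
        List<Field> fields = List.of(new Field("arg0", "array<int>"));
        System.out.println(new ArrayOnlyUdtf().initialize(fields)); // [col]
        try {
            new NoInitUdtf().initialize(fields);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Should not be called directly
        }
    }
}
```

Because the inner call initialize(argTypes) is dispatched virtually, the subclass override wins whenever it exists.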
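The method to focus on is process(). It receives one input row per call and can call forward() several times to write out several rows. This is much like the map() method in MapReduce: data is passed in one row at a time, one input row can produce multiple output rows, and process() is called once for each input row. With GenericUDTF understood, let's look at the source of explode.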
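Before turning to explode, the one-row-in, many-rows-out contract of process() can be pictured with Java's Stream.flatMap, in the spirit of a word-count mapper. This is an analogy only: processRow and runAll are hypothetical helpers, and Hive does not use streams here. Each call to processRow plays the role of one process() call, and each element it emits plays the role of one forward() call.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustration only: process() behaves like a per-row flatMap.
public class ProcessAsFlatMap {

    // Hypothetical per-row function: one input line in, zero or more words out.
    static Stream<String> processRow(String line) {
        if (line == null || line.isBlank()) {
            return Stream.empty(); // a row may produce no output at all
        }
        return Arrays.stream(line.trim().split("\\s+"));
    }

    // processRow is invoked once per input row; all outputs are concatenated in order.
    static List<String> runAll(List<String> lines) {
        return lines.stream()
                .flatMap(ProcessAsFlatMap::processRow)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(runAll(Arrays.asList("hello hive", "", "explode array map")));
        // [hello, hive, explode, array, map]
    }
}
```

Three input rows become five output rows, and the empty row contributes nothing, just as explode() forwards nothing for an empty array.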
```java
package org.apache.hadoop.hive.ql.udf.generic;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.TaskExecutionException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

/**
 * GenericUDTFExplode.
 *
 */
@Description(name = "explode",
    value = "_FUNC_(a) - separates the elements of array a into multiple rows,"
      + " or the elements of a map into multiple rows and columns ")
public class GenericUDTFExplode extends GenericUDTF {

  // Inspector of the single argument; process() uses it to read each value.
  private transient ObjectInspector inputOI = null;

  @Override
  public void close() throws HiveException {
    // Nothing is buffered, so there is nothing to clean up or flush.
  }

  @Override
  public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    if (args.length != 1) {
      throw new UDFArgumentException("explode() takes only one argument");
    }

    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

    switch (args[0].getCategory()) {
    case LIST:
      // array<T> -> one output column "col" of type T
      inputOI = args[0];
      fieldNames.add("col");
      fieldOIs.add(((ListObjectInspector)inputOI).getListElementObjectInspector());
      break;
    case MAP:
      // map<K,V> -> two output columns, "key" of type K and "value" of type V
      inputOI = args[0];
      fieldNames.add("key");
      fieldNames.add("value");
      fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector());
      fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector());
      break;
    default:
      throw new UDFArgumentException("explode() takes an array or a map as a parameter");
    }

    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
        fieldOIs);
  }

  // Output row arrays are reused across calls; forward() passes each one
  // to the collector immediately.
  private transient final Object[] forwardListObj = new Object[1];
  private transient final Object[] forwardMapObj = new Object[2];

  @Override
  public void process(Object[] o) throws HiveException {
    switch (inputOI.getCategory()) {
    case LIST:
      ListObjectInspector listOI = (ListObjectInspector)inputOI;
      List<?> list = listOI.getList(o[0]);
      if (list == null) {
        return; // a NULL array produces no rows
      }
      for (Object r : list) {
        forwardListObj[0] = r;
        forward(forwardListObj); // one output row per element
      }
      break;
    case MAP:
      MapObjectInspector mapOI = (MapObjectInspector)inputOI;
      Map<?,?> map = mapOI.getMap(o[0]);
      if (map == null) {
        return; // a NULL map produces no rows
      }
      for (Entry<?,?> r : map.entrySet()) {
        forwardMapObj[0] = r.getKey();
        forwardMapObj[1] = r.getValue();
        forward(forwardMapObj); // one output row per entry
      }
      break;
    default:
      throw new TaskExecutionException("explode() can only operate on an array or a map");
    }
  }

  @Override
  public String toString() {
    return "explode";
  }
}
```
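The type dispatch in this class can be modeled in plain Java to see its rules in isolation: exactly one argument, LIST yields the single column col, MAP yields key and value, anything else is rejected, and a null array or map yields no rows. Category, outputColumns and process here are simplified, hypothetical stand-ins for ObjectInspector.Category and the two Hive methods. Unlike the Hive source, which reuses forwardListObj and forwardMapObj, this model allocates a fresh array for each row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of the type dispatch in GenericUDTFExplode; not Hive code.
public class ExplodeDispatch {

    // Same constant names as ObjectInspector.Category; only LIST and MAP are accepted.
    enum Category { PRIMITIVE, LIST, MAP, STRUCT, UNION }

    // Mirrors initialize(): one argument, whose category fixes the output columns.
    static List<String> outputColumns(List<Category> args) {
        if (args.size() != 1) {
            throw new IllegalArgumentException("explode() takes only one argument");
        }
        switch (args.get(0)) {
            case LIST:
                return List.of("col");
            case MAP:
                return List.of("key", "value");
            default:
                throw new IllegalArgumentException("explode() takes an array or a map as a parameter");
        }
    }

    // Mirrors process(): one row per element or entry; a null value yields no rows.
    static void process(Category category, Object value, Consumer<Object[]> forward) {
        switch (category) {
            case LIST:
                if (value == null) {
                    return;
                }
                for (Object element : (List<?>) value) {
                    forward.accept(new Object[] {element});
                }
                break;
            case MAP:
                if (value == null) {
                    return;
                }
                for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                    forward.accept(new Object[] {e.getKey(), e.getValue()});
                }
                break;
            default:
                throw new IllegalStateException("explode() can only operate on an array or a map");
        }
    }

    public static void main(String[] args) {
        System.out.println(outputColumns(List.of(Category.MAP))); // [key, value]
        List<Object[]> rows = new ArrayList<>();
        process(Category.LIST, List.of(1, 2, 3), rows::add);
        process(Category.LIST, null, rows::add);
        System.out.println(rows.size()); // 3
    }
}
```

The second process() call contributes nothing, which matches how explode() treats a NULL array instead of failing the query.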
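Once GenericUDTF is understood, the explode source is easy to follow. Because explode can turn an array into multiple rows, or a map into multiple rows and columns, it checks the category of its argument (in both initialize() and process()) and handles each case separately; a NULL array or map simply produces no rows. With this source code as a reference, you can try writing a custom UDTF of your own.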
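As a starting point for that exercise, here is a sketch of a hypothetical custom UDTF, split_kv, that turns a string such as "a:1,b:2" into one (key, value) row per pair. To keep it runnable without Hive on the classpath, it uses a simplified stand-in rather than GenericUDTF itself: argument checking is reduced to an argument count, and a Consumer<Object[]> plays the role of the collector behind forward(). A real implementation would extend GenericUDTF, validate ObjectInspectors in initialize() and return a StructObjectInspector, as explode does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical custom UDTF "split_kv" on a simplified stand-in for GenericUDTF.
public class SplitKvSketch {

    static final class SplitKv {
        private Consumer<Object[]> collector; // stands in for GenericUDTF's Collector

        // Mirrors setCollector(): wired up after construction.
        void setCollector(Consumer<Object[]> collector) {
            this.collector = collector;
        }

        // Mirrors initialize(): validate the arguments, declare the output columns.
        List<String> initialize(int argCount) {
            if (argCount != 1) {
                throw new IllegalArgumentException("split_kv() takes only one argument");
            }
            return List.of("key", "value");
        }

        // Mirrors process(): "k1:v1,k2:v2" becomes two rows; pairs without ':' are skipped.
        void process(Object[] args) {
            if (args[0] == null) {
                return; // like explode, a null input produces no rows
            }
            for (String pair : args[0].toString().split(",")) {
                String[] kv = pair.split(":", 2);
                if (kv.length == 2) {
                    collector.accept(new Object[] {kv[0], kv[1]}); // plays the role of forward()
                }
            }
        }

        // Mirrors close(): nothing is buffered, so there is nothing to flush.
        void close() {
        }
    }

    public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        SplitKv udtf = new SplitKv();
        udtf.setCollector(rows::add);
        System.out.println(udtf.initialize(1)); // [key, value]
        udtf.process(new Object[] {"a:1,b:2,broken"});
        udtf.close();
        for (Object[] row : rows) {
            System.out.println(row[0] + " -> " + row[1]);
        }
        // a -> 1
        // b -> 2
    }
}
```

Skipping a malformed pair such as "broken" instead of raising an error is a design choice of this sketch, not a Hive rule; a stricter UDTF could throw an exception from process() instead.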
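Hive and MapReduce are both large, complex systems, so when learning them it pays to stop at a practical depth. For example, when extending GenericUDTF we only need to know what it lets us do and which business logic we have to implement, without digging into exactly how it makes that possible.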
Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.