赞
踩
基于spark sql开发过程中,需要一些类似与官网提供的 int()、from_json()等自定函数处理数据。下属将简单讲解通过java如何实现spark sql自定义函数
官方提供了0-22 UDF接口,UDF0代表无参数输入只有返回参数,UDF1接口表示有一个入参,含义以此类推
UDF1官方接口说明
package org.apache.spark.sql.api.java;
import java.io.Serializable;
import org.apache.spark.annotation.InterfaceStability.Stable;
@Stable
public interface UDF2<T1, T2, R> extends Serializable {
R call(T1 var1, T2 var2) throws Exception;
}
UDF2<T1, T2, R> :T1代表第一个输入参数类型,T2代表第二个输入参数类型,R代表返回参数类型
将下属数据在spark sql里面实现 logs.age=age_01 的数据找到一条返回name名称。
在spark sql里面的虚拟表名称为 logs_view
{
"age_01": 20,
"logs": [
{
"age": 20,
"name": "悠悠球"
},
{
"age": 21,
"name": "棒棒冰"
}
]
}
实现代码
import org.apache.spark.sql.api.java.UDF2; import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; import scala.collection.mutable.WrappedArray; public class GetLogAge implements UDF2<WrappedArray, Integer, String> { @Override public Long call(WrappedArray logs, Integer age_01) throws Exception { for(int z=0; z<logs.size(); z++){ GenericRowWithSchema log = (GenericRowWithSchema)dependUnits.apply(z); long age_ = log.getAs("age"); if(age_ == age_01){ return log.getAs("name"); } } return ""; } }
注册函数
sqlContext.udf().register("get_logs_name", new GetLogAge(),DataTypes.StringType);
调用
sqlContext.sql("select age_01,logs, get_logs_name(logs, age_01) as name from logs_view ").show()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。