当前位置:   article > 正文

spark sql自定义UDF函数-java语言_spark java udf自定义函数

spark java udf自定义函数

背景说明

基于spark sql开发过程中,需要一些类似与官网提供的 int()、from_json()等自定函数处理数据。下属将简单讲解通过java如何实现spark sql自定义函数

官方UDF接口说明

官方提供了0-22 UDF接口,UDF0代表无参数输入只有返回参数,UDF1接口表示有一个入参,含义以此类推
在这里插入图片描述
UDF1官方接口说明

package org.apache.spark.sql.api.java;

import java.io.Serializable;
import org.apache.spark.annotation.InterfaceStability.Stable;

@Stable
public interface UDF2<T1, T2, R> extends Serializable {
    R call(T1 var1, T2 var2) throws Exception;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

UDF2<T1, T2, R> :T1代表第一个输入参数类型,T2代表第二个输入参数类型,R代表返回参数类型

实现案例

将下属数据在spark sql里面实现 logs.age=age_01 的数据找到一条返回name名称。
在spark sql里面的虚拟表名称为 logs_view

{
	"age_01": 20,
	"logs": [
		{
			"age": 20,
			"name": "悠悠球"
		},
		{
			"age": 21,
			"name": "棒棒冰"
		}
	]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

实现代码

import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import scala.collection.mutable.WrappedArray;

public class GetLogAge implements UDF2<WrappedArray, Integer, String> {

    @Override
    public Long call(WrappedArray logs, Integer age_01) throws Exception {

        for(int z=0; z<logs.size(); z++){
            GenericRowWithSchema log = (GenericRowWithSchema)dependUnits.apply(z);
            long age_ = log.getAs("age");
           
            if(age_ == age_01){
                return log.getAs("name");
            }
        }
        return "";
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

注册函数

sqlContext.udf().register("get_logs_name", new GetLogAge(),DataTypes.StringType);
  • 1

调用

sqlContext.sql("select age_01,logs, get_logs_name(logs, age_01) as name from logs_view ").show()
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/603751
推荐阅读
相关标签
  

闽ICP备14008679号