赞
踩
用户自定义函数
udf对象 = spark.udf.register( 参数1, 参数2, 参数3)
参数1:UDF名称,可用于SQL风格
参数2:被注册成UDF的方法名
参数3:声明UDF的返回值类型
udf 对象:返回值对象,是一个 UDF对象,可用于DSL风格
- //获取系统时间
- val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
- private val startTimeID: UserDefinedFunction = spark.udf.register("STARTTIMEID", (s: String) => {
- s + " time:"+df.format(new Date())
- })
只能通过DataFrame进行DSL查询
ruleFrame.select(startTimeID($"API_CODE"),$"API_ID").show()
需要先注册成表,在进行sql查询
- ruleFrame.createOrReplaceTempView("AAAS_RULE_20220316")
- spark.sql("select STARTTIMEID(API_CODE),API_CODE from AAAS_RULE_20220316").show()
自定义聚合函数类:计算年龄的平均值
继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
IN : 输入的数据类型 Long
BUF : 缓冲区的数据类型 Buff ->样例类
OUT : 输出的数据类型 Long
重写方法(6个)
- package Util
-
- import bean.{UDAFAgeFunction, UserInfo}
- import org.apache.spark.sql.expressions.Aggregator
- import org.apache.spark.sql.{Encoder, Encoders}
-
- /**
- * 自定义聚合函数类:计算年龄的平均值
- * 1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
- * IN : 输入的数据类型 Long
- * BUF : 缓冲区的数据类型 Buff
- * OUT : 输出的数据类型 Long
- * 2. 重写方法(6个)
- */
- class MyAvgUDAF extends Aggregator[UserInfo,UDAFAgeFunction,Long]{
- // z & zero : 初始值或零值
- // 缓冲区的初始化,需要更改入参:缓冲区UDAFAgeBuffer样例类,给个空值
- override def zero: UDAFAgeFunction = {
- UDAFAgeFunction(0L,0L)
- }
- //根据输入的数据更新缓冲区的数据,缓冲区的计算方式
- //入参:缓冲区UDAFAgeBuffer,输入数据UserInfo
- override def reduce(buffer: UDAFAgeFunction, userInfo: UserInfo): UDAFAgeFunction = {
- buffer.ageSum=buffer.ageSum+userInfo.age
- buffer.cnt=buffer.cnt+1
- buffer
- }
- //合并缓冲区,sumAge+sumAge,cnt+cnt,入参是不同缓冲区的数据
- override def merge(buffer1: UDAFAgeFunction, buffer2: UDAFAgeFunction): UDAFAgeFunction = {
- UDAFAgeFunction(buffer1.ageSum+buffer2.ageSum,buffer1.cnt+buffer2.cnt)
-
- }
- //计算结果缓冲区合并后对缓冲区数据类型做计算,入参是缓冲区数据,返回值是计算结果,这里是平均年龄
- override def finish(buffer: UDAFAgeFunction): Long = {
- buffer.ageSum/buffer.cnt
- }
- // 缓冲区的编码操作,变更入参为缓冲区,如果是样例类就是Encoders.product,否则就是对应的scalaLon\scalaInt等类型
- override def bufferEncoder: Encoder[UDAFAgeFunction] = Encoders.product
- // 输出的编码操作,入参为计算结果(Long),如果是样例类就是Encoders.product,否则就是对应的scalaLon\scalaInt等类型
- override def outputEncoder: Encoder[Long] = Encoders.scalaLong
- }
- package Util
- import bean.UDAFAgeFunction
- import org.apache.spark.sql.expressions.Aggregator
- import org.apache.spark.sql.{Encoder, Encoders}
-
- /**
- * 自定义聚合函数类:计算年龄的平均值
- * 1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
- * IN : 输入的数据类型 Long
- * BUF : 缓冲区的数据类型 Buff
- * OUT : 输出的数据类型 Long
- * 2. 重写方法(6个)
- */
- class MyAvgUDAFNew extends Aggregator[Int,UDAFAgeFunction,Long]{
- // z & zero : 初始值或零值
- // 缓冲区的初始化,需要更改入参:缓冲区UDAFAgeBuffer样例类,给个空值
- override def zero: UDAFAgeFunction = {
- UDAFAgeFunction(0L,0L)
- }
- //根据输入的数据更新缓冲区的数据,缓冲区的计算方式
- //入参:缓冲区UDAFAgeBuffer,输入数据UserInfo
- override def reduce(buffer: UDAFAgeFunction, age: Int): UDAFAgeFunction = {
- buffer.ageSum=buffer.ageSum+age
- buffer.cnt=buffer.cnt+1
- buffer
- }
- //合并缓冲区,sumAge+sumAge,cnt+cnt,入参是不同缓冲区的数据
- override def merge(buffer1: UDAFAgeFunction, buffer2: UDAFAgeFunction): UDAFAgeFunction = {
- UDAFAgeFunction(buffer1.ageSum+buffer2.ageSum,buffer1.cnt+buffer2.cnt)
-
- }
- //计算结果缓冲区合并后对缓冲区数据类型做计算,入参是缓冲区数据,返回值是计算结果,这里是平均年龄
- override def finish(buffer: UDAFAgeFunction): Long = {
- buffer.ageSum/buffer.cnt
- }
- // 缓冲区的编码操作,变更入参为缓冲区,如果是样例类就是Encoders.product,否则就是对应的scalaLon\scalaInt等类型
- override def bufferEncoder: Encoder[UDAFAgeFunction] = Encoders.product
- // 输出的编码操作,入参为计算结果(Long),如果是样例类就是Encoders.product,否则就是对应的scalaLon\scalaInt等类型
- override def outputEncoder: Encoder[Long] = Encoders.scalaLong
- }
- package SparkSQL.UDF
-
- import Util.{Env, MyAvgUDAF, MyAvgUDAFNew}
- import bean.UserInfo
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.SparkContext
- import org.apache.spark.sql.expressions.UserDefinedFunction
- import org.apache.spark.sql.{Dataset, SparkSession, TypedColumn, functions}
-
- object UDAFDemo extends App with Env{
- /*
- * 自定义聚合函数
- * 强类型聚合函数 Aggregator
- * */
- Logger.getLogger("org").setLevel(Level.ERROR)
-
- //准备spark环境
- private val sc: SparkContext = getSparkContext()
- //准备SparkSession环境,SparkSession 是 Spark 最新的 SQL 查询起始点
- private val spark: SparkSession = getSparkSession()
- import spark.implicits._
-
- //创建DataSet
- private val userInfoRdd = sc.textFile("src/data/userInfo.txt").map(data=>UserInfo(
- data.split(" ")(0),data.split(" ")(1).toInt)
- )
-
- private val userInfoSet: Dataset[UserInfo] = userInfoRdd.toDF().as[UserInfo]
- userInfoSet.createOrReplaceTempView("userInfo")
- println("############传参为有类型的UDAF###################")
- //因为myAvgUDAFFunction传入的是样例类UserInfo,所以不需要处理select的字段,而是把整个函数当成列数据传入,直接查询结果
- //不注册 将UDAF函数转换为查询的列对象
-
- private val myAvgUDAFCol: TypedColumn[UserInfo, Long] = new MyAvgUDAF().toColumn
- println(" DSl风格结果")
- userInfoSet.select(myAvgUDAFCol).show
- println(" SQl风格不支持查询列对象")
- println("############传参为无类型的UDAF###################")
-
- //替换UDAF的入参,UserInfo改为(Int)就可以通过select myAvgUDAFFunction(name,age) from table 查询
- //注册UDAF,方式和UDF一致
- private val myAvgUDAFNew = new MyAvgUDAFNew
- private val myAvgAgeNew: UserDefinedFunction = spark.udf.register("MYAvgAgeNew", functions.udaf(myAvgUDAFNew))
- println(" DSl风格结果")
- userInfoSet.select(myAvgAgeNew($"age")).show()
-
- println(" sql风格结果")
-
- spark.sql("select MYAvgAgeNew(age) from userInfo").show()
-
- sc.stop()
- spark.stop()
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。