当前位置:   article > 正文

SparkSQL中的自定义函数-UDF&UDAF_spark 注册udf

spark 注册udf

一、UDF(User-Defined-Function)

用户自定义函数

1、注册UDF

udf对象 = spark.udf.register( 参数1, 参数2, 参数3)

参数1:UDF名称,可用于SQL风格

参数2:被注册成UDF的方法名

参数3:声明UDF的返回值类型

udf 对象:返回值对象,是一个 UDF对象,可用于DSL风格

  1. //获取系统时间
  2. val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  3. private val startTimeID: UserDefinedFunction = spark.udf.register("STARTTIMEID", (s: String) => {
  4. s + " time:"+df.format(new Date())
  5. })

2、 DSL使用UDF函数

只能通过DataFrame进行DSL查询

ruleFrame.select(startTimeID($"API_CODE"),$"API_ID").show()

3、SparkSQL使用UDF

需要先注册成表,在进行sql查询

  1. ruleFrame.createOrReplaceTempView("AAAS_RULE_20220316")
  2. spark.sql("select STARTTIMEID(API_CODE),API_CODE from AAAS_RULE_20220316").show()

二、UDAF 用户自定义聚合函数

1、自定义UDAF函数类--传入类型为样例类

自定义聚合函数类:计算年龄的平均值

继承org.apache.spark.sql.expressions.Aggregator, 定义泛型

IN : 输入的数据类型 Long

BUF : 缓冲区的数据类型 Buff ->样例类

OUT : 输出的数据类型 Long

重写方法(6个)

  1. package Util
  2. import bean.{UDAFAgeFunction, UserInfo}
  3. import org.apache.spark.sql.expressions.Aggregator
  4. import org.apache.spark.sql.{Encoder, Encoders}
  5. /**
  6. * 自定义聚合函数类:计算年龄的平均值
  7. * 1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
  8. * IN : 输入的数据类型 Long
  9. * BUF : 缓冲区的数据类型 Buff
  10. * OUT : 输出的数据类型 Long
  11. * 2. 重写方法(6个)
  12. */
  13. class MyAvgUDAF extends Aggregator[UserInfo,UDAFAgeFunction,Long]{
  14. // z & zero : 初始值或零值
  15. // 缓冲区的初始化,需要更改入参:缓冲区UDAFAgeBuffer样例类,给个空值
  16. override def zero: UDAFAgeFunction = {
  17. UDAFAgeFunction(0L,0L)
  18. }
  19. //根据输入的数据更新缓冲区的数据,缓冲区的计算方式
  20. //入参:缓冲区UDAFAgeBuffer,输入数据UserInfo
  21. override def reduce(buffer: UDAFAgeFunction, userInfo: UserInfo): UDAFAgeFunction = {
  22. buffer.ageSum=buffer.ageSum+userInfo.age
  23. buffer.cnt=buffer.cnt+1
  24. buffer
  25. }
  26. //合并缓冲区,sumAge+sumAge,cnt+cnt,入参是不同缓冲区的数据
  27. override def merge(buffer1: UDAFAgeFunction, buffer2: UDAFAgeFunction): UDAFAgeFunction = {
  28. UDAFAgeFunction(buffer1.ageSum+buffer2.ageSum,buffer1.cnt+buffer2.cnt)
  29. }
  30. //计算结果缓冲区合并后对缓冲区数据类型做计算,入参是缓冲区数据,返回值是计算结果,这里是平均年龄
  31. override def finish(buffer: UDAFAgeFunction): Long = {
  32. buffer.ageSum/buffer.cnt
  33. }
  34. // 缓冲区的编码操作,变更入参为缓冲区,如果是样例类就是Encoders.product,否则就是对应的scalaLon\scalaInt等类型
  35. override def bufferEncoder: Encoder[UDAFAgeFunction] = Encoders.product
  36. // 输出的编码操作,入参为计算结果(Long),如果是样例类就是Encoders.product,否则就是对应的scalaLon\scalaInt等类型
  37. override def outputEncoder: Encoder[Long] = Encoders.scalaLong
  38. }

2、自定义UDAF函数类--传入类型为数据类型Int等

  1. package Util
  2. import bean.UDAFAgeFunction
  3. import org.apache.spark.sql.expressions.Aggregator
  4. import org.apache.spark.sql.{Encoder, Encoders}
  5. /**
  6. * 自定义聚合函数类:计算年龄的平均值
  7. * 1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
  8. * IN : 输入的数据类型 Long
  9. * BUF : 缓冲区的数据类型 Buff
  10. * OUT : 输出的数据类型 Long
  11. * 2. 重写方法(6个)
  12. */
  13. class MyAvgUDAFNew extends Aggregator[Int,UDAFAgeFunction,Long]{
  14. // z & zero : 初始值或零值
  15. // 缓冲区的初始化,需要更改入参:缓冲区UDAFAgeBuffer样例类,给个空值
  16. override def zero: UDAFAgeFunction = {
  17. UDAFAgeFunction(0L,0L)
  18. }
  19. //根据输入的数据更新缓冲区的数据,缓冲区的计算方式
  20. //入参:缓冲区UDAFAgeBuffer,输入数据UserInfo
  21. override def reduce(buffer: UDAFAgeFunction, age: Int): UDAFAgeFunction = {
  22. buffer.ageSum=buffer.ageSum+age
  23. buffer.cnt=buffer.cnt+1
  24. buffer
  25. }
  26. //合并缓冲区,sumAge+sumAge,cnt+cnt,入参是不同缓冲区的数据
  27. override def merge(buffer1: UDAFAgeFunction, buffer2: UDAFAgeFunction): UDAFAgeFunction = {
  28. UDAFAgeFunction(buffer1.ageSum+buffer2.ageSum,buffer1.cnt+buffer2.cnt)
  29. }
  30. //计算结果缓冲区合并后对缓冲区数据类型做计算,入参是缓冲区数据,返回值是计算结果,这里是平均年龄
  31. override def finish(buffer: UDAFAgeFunction): Long = {
  32. buffer.ageSum/buffer.cnt
  33. }
  34. // 缓冲区的编码操作,变更入参为缓冲区,如果是样例类就是Encoders.product,否则就是对应的scalaLon\scalaInt等类型
  35. override def bufferEncoder: Encoder[UDAFAgeFunction] = Encoders.product
  36. // 输出的编码操作,入参为计算结果(Long),如果是样例类就是Encoders.product,否则就是对应的scalaLon\scalaInt等类型
  37. override def outputEncoder: Encoder[Long] = Encoders.scalaLong
  38. }

3、二者使用中的区别

  1. package SparkSQL.UDF
  2. import Util.{Env, MyAvgUDAF, MyAvgUDAFNew}
  3. import bean.UserInfo
  4. import org.apache.log4j.{Level, Logger}
  5. import org.apache.spark.SparkContext
  6. import org.apache.spark.sql.expressions.UserDefinedFunction
  7. import org.apache.spark.sql.{Dataset, SparkSession, TypedColumn, functions}
  8. object UDAFDemo extends App with Env{
  9. /*
  10. * 自定义聚合函数
  11. * 强类型聚合函数 Aggregator
  12. * */
  13. Logger.getLogger("org").setLevel(Level.ERROR)
  14. //准备spark环境
  15. private val sc: SparkContext = getSparkContext()
  16. //准备SparkSession环境,SparkSession 是 Spark 最新的 SQL 查询起始点
  17. private val spark: SparkSession = getSparkSession()
  18. import spark.implicits._
  19. //创建DataSet
  20. private val userInfoRdd = sc.textFile("src/data/userInfo.txt").map(data=>UserInfo(
  21. data.split(" ")(0),data.split(" ")(1).toInt)
  22. )
  23. private val userInfoSet: Dataset[UserInfo] = userInfoRdd.toDF().as[UserInfo]
  24. userInfoSet.createOrReplaceTempView("userInfo")
  25. println("############传参为有类型的UDAF###################")
  26. //因为myAvgUDAFFunction传入的是样例类UserInfo,所以不需要处理select的字段,而是把整个函数当成列数据传入,直接查询结果
  27. //不注册 将UDAF函数转换为查询的列对象
  28. private val myAvgUDAFCol: TypedColumn[UserInfo, Long] = new MyAvgUDAF().toColumn
  29. println(" DSl风格结果")
  30. userInfoSet.select(myAvgUDAFCol).show
  31. println(" SQl风格不支持查询列对象")
  32. println("############传参为无类型的UDAF###################")
  33. //替换UDAF的入参,UserInfo改为(Int)就可以通过select myAvgUDAFFunction(name,age) from table 查询
  34. //注册UDAF,方式和UDF一致
  35. private val myAvgUDAFNew = new MyAvgUDAFNew
  36. private val myAvgAgeNew: UserDefinedFunction = spark.udf.register("MYAvgAgeNew", functions.udaf(myAvgUDAFNew))
  37. println(" DSl风格结果")
  38. userInfoSet.select(myAvgAgeNew($"age")).show()
  39. println(" sql风格结果")
  40. spark.sql("select MYAvgAgeNew(age) from userInfo").show()
  41. sc.stop()
  42. spark.stop()
  43. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/603712
推荐阅读
相关标签
  

闽ICP备14008679号