当前位置:   article > 正文

Spark学习(六):Spark SQL一_org.apache.spark.sql.{dataframe, sqlcontext}

org.apache.spark.sql.{dataframe, sqlcontext}

目录

1 Spark SQL

1.1 Spark SQL是什么

1.2 Spark的优点

1.3 RDD vs DataFrame vs Dataset

1.3.1 RDD

1.3.2 DataFrame

1.3.3 Dataset

1.3.4 三者的共性

1.3.5 三者的区别

2 Spark SQL编程

2.1 spark-shell编程

2.2 IDEA创建Spark SQL 程序

3 Spark SQL解析

3.1 新的起始点SparkSession

3.2 创建DataFrame

3.3 DataFrame的常用操作

3.3.1 DSL(Domain Specific Language)风格语法

3.3.2 SQL风格语法

3.4 创建Dataset

3.5 Dataset和RDD互操作

3.5.1 通过反射获取Scheam

3.5.2 通过编程设置schema

3.6 类型之间的转换总结

3.6.1 DataFrame/Dataset转RDD:

3.6.2 RDD转DataFrame:

3.6.3 RDD转Dataset:

3.6.4 Dataset转DataFrame:

3.6.5 DataFrame转Dataset:

3.7 用户自定义函数

3.7.1 用户自定义UDF函数

3.7.2 用戶自定义聚合函数


1 Spark SQL

1.1 Spark SQL是什么

官网:http://spark.apache.org/sql/

Spark SQL是Apache Spark用来处理结构化数据的一个模块,它提供了一个最核心的编程抽象叫做DataFrame,并且作为分布式SQL查询引擎的作用。

1.2 Spark的优点

  • 易整合,Spark SQL DataFrame API 多种语言都支持
  • 统一的数据访问形式,都是以read()方法进行访问
  • 兼容Hive,之前Hive查询底层是解析成MapReduce运行,现在在hive查询被解析为spark程序,执行效率非常快!
  • 标准的数连接,通过JDBC或ODBC连接

SparkSQL可以看做是一个转换层,向下对接各种不同的结构化数据源,向上提供不同的数据访问方式。

1.3 RDD vs DataFrame vs Dataset

 

 

Sql

DataFram

Dataset

Syntax errors  

Run time

Compile time

Compile time

Analysis errors

Run time

Run time

Compile time

Dataframe的劣势在于在编译期缺少类型安全检查,导致运行时出错,而Dataset是一个强数据类型,在编译时期就会检查数据类型

在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:

RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6测试)—>Dataset(Spark2.x之后)

spark1.6—>2.x改变:本文用的版本是spark2.2.0

  1. 由SparkSession作为spark程序的入口,取代了之前的SparkContext和HiveContext;保留是为了向后兼容
  2. 数据集API和Dataframe API是统一的,在Scala中DataFrame成为类型别名Dataset[Row],相当于DateFrame=Dateset[Row];而Java API中必须替换为Dateset<Row>
  3. 一些数据集和DataFrame API 的优化

1.3.1 RDD

RDD是一个懒执行的不可变的可以支持Lambda表达式的并行数据集合。

RDD的最大好处就是简单,API的人性化程度很高。

RDD的劣势是性能限制,它是一个JVM驻内存对象,这也就决定了存在GC的限制和数据增加时Java序列化成本的升高。

1.3.2 DataFrame

与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。

左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schema。RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。

DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待

DataFrame也是懒执行的。

性能上比RDD要高,主要有两方面原因: 

  1. 定制化内存管理
  2. 数据以二进制的方式存在于非堆内存,节省了大量空间之外,还摆脱了GC的限制。

 

图中构造了两个DataFrame,将它们join之后又做了一次filter操作。如果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。

1.3.3 Dataset

  1. 是Dataframe API的一个扩展,是Spark最新的数据抽象
  2. 用户友好的API风格,既具有类型安全检查也具有Dataframe的查询优化特性。
  3. Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。
  4. 样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称。
  5. Dataframe是Dataset的特例,DataFrame=Dataset[Row] ,所以可以通过as方法将Dataframe转换为Dataset。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息我都用Row来表示。
  6. DataSet是强类型的。比如可以有Dataset[Car],Dataset[Person].
  7. DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。就跟JSON对象和类对象之间的类比。

1.3.4 三者的共性

1、RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利

2、三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在Action中使用对应的结果,在执行时会被直接跳过。

val sparkconf = new SparkConf().setMaster("local").setAppName("test")

val spark = SparkSession.builder().config(sparkconf).getOrCreate()

val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

// map不运行

rdd.map{line=>

  println("运行")

  line._1

}

3、三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出

4、三者都有partition的概念

5、三者有许多共同的函数,如filter,排序等

6、在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持

import spark.implicits._

7、DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型

DataFrame:

testDF.map{

      case Row(col1:String,col2:Int)=>{

        println(col1);println(col2)

        col1}

      case _=>  ""

    }

Dataset:

case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型

    testDS.map{

      case Coltest(col1:String,col2:Int)=>

        println(col1);println(col2)

        col1

      case _=> ""

    }

1.3.5 三者的区别

RDD:

1、RDD一般和spark mlib同时使用

2、RDD不支持sparksql操作

DataFrame:

1、与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值,每一列的值没法直接访问,如

testDF.foreach{ line =>

    val col1=line.getAs[String]("col1")

    val col2=line.getAs[String]("col2")

}

2、DataFrame与Dataset一般不与spark ml同时使用

3、DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作,如

dataDF.createOrReplaceTempView("tmp")

spark.sql("select  ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)

4、DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然

//保存

val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://master01:9000/test")

datawDF.write.format("com.bigdata.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()

//读取

val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://master01:9000/test")

val datarDF= spark.read.options(options).format("com.bigdata.spark.csv").load()

利用这样的保存方式,可以方便的获得字段名和列的对应,而且分隔符(delimiter)可以自由指定。

Dataset:

Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。

DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段

而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息

case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型

/**

 rdd("a", 1)("b", 1)("a", 1)

**/

val test: Dataset[Coltest]=rdd.map{line=>

      Coltest(line._1,line._2)

    }.toDS

test.map{ line=>

        println(line.col1)

        println(line.col2)

    }

可以看出,Dataset在需要访问列中的某个字段时是非常方便的,然而,如果要写一些适配性很强的函数时,如果使用Dataset,行的类型又不确定,可能是各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较好的解决问题

2 Spark SQL编程

2.1 spark-shell编程

例子:查询年龄大于20岁的用户,创建如下JSON格式的文件

  1. {"name":"jack", "age":22}
  2. {"name":"rose", "age":21}
  3. {"name":"mike", "age":19}

2.2 IDEA创建Spark SQL 程序

IDEA中程序的打包和运行都和SparkCore类似,在Maven依赖中导入新的依赖项:

  1. <!--导入spark sql的依赖jar包-->
  2. <dependency>
  3. <groupId>org.apache.spark</groupId>
  4. <artifactId>spark-sql_2.11</artifactId>
  5. <version>2.2.0</version>
  6. </dependency>

程序如下:

  1. import org.apache.spark.sql.{DataFrame, SQLContext}
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object SaprkSqlFirstDemo {
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf()
  6. .setMaster("local")
  7. .setAppName(this.getClass.getSimpleName)
  8. val sc = new SparkContext(conf)
  9. //操作sql的专用实例
  10. val sQLContext = new SQLContext(sc)
  11. //导入sql实例上的隐式转换
  12. import sQLContext.implicits._
  13. val df: DataFrame = sQLContext.read.json("e://student.json")
  14. df.show()
  15. df.filter("age > 20").show()
  16. df.createTempView("student")
  17. sQLContext.sql("select name,age from student where age > 20").show()
  18. sc.stop()
  19. }
  20. }

3 Spark SQL解析

3.1 新的起始点SparkSession

在老的版本中,SparkSQL提供两种SQL查询起始点,一个叫SQLContext,用于Spark自己提供的SQL查询,一个叫HiveContext,用于连接Hive的查询,SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

  1. val spark = SparkSession.builder()
  2. .master("local[*]")
  3. .appName(this.getClass.getSimpleName)
  4. .getOrCreate()
  5. import spark.implicits._

SparkSession.builder 用于创建一个SparkSession。

import spark.implicits._的引入是用于将DataFrames隐式转换成RDD,使df能够使用RDD中的方法。

如果需要Hive支持的话,则需要创建以下语句

  1. val spark = SparkSession.builder()
  2. .master("local[*]")
  3. .appName(this.getClass.getSimpleName)
  4. .enableHiveSupport() //开启spark对hive的支持
  5. .getOrCreate()
  6. import spark.implicits._

3.2 创建DataFrame

 在Spark SQL中SparkSession是创建DataFrames和执行SQL的入口,创建DataFrames有三种方式,一种是可以从一个存在的RDD进行转换,还可以从Hive Table进行查询返回,或者通过Spark的数据源进行创建。

从已有的RDD进行转换

  1. /*
  2. 1001 zhangshan male
  3. 1002 lishi female
  4. 1003 zhaoliu male
  5. */
  6. val file: RDD[String] = sc.textFile("spark_day01/student.txt")
  7. val df: DataFrame = file.map(line => {
  8. val fiels: Array[String] = line.split(" ")
  9. (fiels(0).toInt, fiels(1), fiels(2))
  10. }).toDF("id", "name", "sex")
  11. df.show()

其他两种方式我们在后面的数据源介绍

3.3 DataFrame的常用操作

3.3.1 DSL(Domain Specific Language)风格语法

people.txt

tom 18 87

jack 19 88

rose 20 95

  1. import org.apache.log4j.{Level, Logger}
  2. import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
  3. object DatasetDSLDemo {
  4. // 配置日志的显示级别
  5. Logger.getLogger("org").setLevel(Level.ERROR)
  6. def main(args: Array[String]): Unit = {
  7. val spark: SparkSession = SparkSession.builder().master("local[*]")
  8. .appName(this.getClass.getSimpleName)
  9. .getOrCreate()
  10. // 导入隐式转换
  11. import spark.implicits._
  12. val file: Dataset[String] = spark.read.textFile("hdfs://hadoop101:9000/people.txt")
  13. val tpDs: Dataset[(String, Int, Int)] = file.map(t => {
  14. val split = t.split(" ")
  15. (split(0), split(1).toInt, split(2).toInt)
  16. })
  17. val pdf: DataFrame = tpDs.toDF("name", "age", "fv")
  18. // ("select age from v_person where age > 23 ")
  19. // DSL 对应着sql中的一些操作
  20. // select 选择
  21. pdf.select("age","name").show()
  22. // where 是sql中的写法 filter where 调用的就是filter
  23. pdf.where("age > 18").show()
  24. // 排序 orderBy sort 同一个API 默认是升序
  25. pdf.orderBy($"age" desc).show()
  26. // 分组聚合
  27. // 统计次数 count(*)
  28. pdf.groupBy("age").count().show()
  29. // 导入函数
  30. import org.apache.spark.sql.functions._
  31. pdf.groupBy("age").agg(count("*") as "cnts").show()
  32. val sumfv = pdf.groupBy("age").agg(sum("fv"))
  33. sumfv.show()
  34. pdf.groupBy("age").agg(min("fv") as "minfv").orderBy("minfv").show()
  35. sumfv.printSchema()
  36. // 如何修改字段的名称
  37. sumfv.withColumnRenamed("sum(fv)","sumFV").show()
  38. //实际 优先使用SQl 语法风格,一些简单的可以用DSL
  39. spark.stop()
  40. }
  41. }

3.3.2 SQL风格语法

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
  3. object DatasetSQLDemo {
  4. def main(args: Array[String]): Unit = {
  5. //两种方式获取sparkSession实例
  6. val spark = SparkSession.builder().master("local").appName(this.getClass.getSimpleName).getOrCreate()
  7. /*val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
  8. val spark = SparkSession.builder().config(conf).getOrCreate()*/
  9. //在sparkSession中,已经创建好了sqlContext和sparkContext
  10. val sqlContext = spark.sqlContext
  11. val sc = spark.sparkContext
  12. import spark.implicits._
  13. /* 当没有隐式转换时出现下面的错误:缺少import spark.implicits._的支持
  14. Error:(24, 53) Unable to find encoder for type stored in a Dataset.
  15. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._
  16. Support for serializing other types will be added in future releases.
  17. val tpDS: Dataset[(String, Int, Int)] = file.map(t => {
  18. */
  19. //读取外部文件获取到Dataset,在Dataset中自带了schema信息
  20. val file: Dataset[String] = spark.read.textFile("hdfs://hadoop101:9000/people.txt")
  21. file.printSchema()
  22. val tpDS: Dataset[(String, Int, Int)] = file.map(t => {
  23. val strings = t.split(" ")
  24. (strings(0), strings(1).toInt, strings(2).toInt)
  25. })
  26. tpDS.printSchema()
  27. val pdf: DataFrame = tpDS.toDF("name","age","fv")
  28. pdf.printSchema()
  29. //注册临时试图,使用范围为当前session,session退出后,表就失效了。
  30. //可以注册为全局表,但是当使用全局表时,需要全路径访问,如global_temp.people
  31. pdf.createTempView("v_people")
  32.   pdf.createGlobalTempView("people")
  33.   //写sql
  34. val result = spark.sql("select name, age, fv from v_people where age > 18")
  35.   val result1 = spark.sql("select name, age, fv from global_temp.people where age > 18"
  36. result.show()
  37.   result1.show()
  38. spark.close()
  39. }
  40. }

3.4 创建Dataset

Dataset是强类型的数据集合,需要提供对应的数据类型信息

  1. import org.apache.spark.sql.{Dataset, SparkSession}
  2. object CreateDataset {
  3. def main(args: Array[String]): Unit = {
  4. val spark = SparkSession.builder()
  5. .master("local[*]")
  6. .appName(this.getClass.getSimpleName)
  7. .getOrCreate()
  8. import spark.implicits._
  9. val caseClassDS: Dataset[Person] = Seq(Person("jack","18")).toDS()
  10. caseClassDS.show()
  11. val primitiveDS: Dataset[Int] = Seq(1, 2, 3).toDS()
  12. primitiveDS.map(_ + 1).collect().foreach(println(_))
  13. val peopleDS = spark.read.json("hdfs://hadoop101:9000/WordCount/student.json").as[Person]
  14. peopleDS.show()
  15. }
  16. }
  17. case class Person(name: String, age: String)

3.5 Dataset和RDD互操作

Spark SQL支持通过两种方式将存在的RDD转换为Dataset,转换的过程中需要让Dataset获取RDD中的Schema信息,主要有两种方式,一种是通过反射来获取RDD中的Schema信息。这种方式适合于列名已知的情况下。第二种是通过编程接口的方式将Schema信息应用于RDD,这种方式可以处理那种在运行时才能知道列的方式。

3.5.1 通过反射获取Scheam

SparkSQL能够自动将包含有case类的RDD转换成DataFrame,case类定义了table的结构,之前我们都是使用case类的方式,将case类属性通过反射变成了表的列名。Case类可以包含诸如Seqs或者Array等复杂的结构。

3.5.2 通过编程设置schema

如果case类不能够提前定义,可以通过下面三个步骤定义一个DataFrame

创建一个多行结构的RDD;

创建用StructType来表示的行结构信息。

通过SparkSession提供的createDataFrame方法来应用Schema 。

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.sql.SparkSession
  3. import org.apache.spark.sql.types._
  4. object CreateBySetSchema {
  5. def main(args: Array[String]): Unit = {
  6. val spark: SparkSession = SparkSession.builder()
  7. .master("local[*]")
  8. .appName("CreateBySetSchema")
  9. .getOrCreate()
  10. val peopleRDD: RDD[String] = spark.sparkContext.textFile("hdfs://hadoop101:9000/people.txt")
  11. //创建schema信息
  12. val schemaDemo = StructType(
  13. Seq(
  14. StructField("name", StringType),
  15. StructField("age", IntegerType),
  16. StructField("fv", IntegerType)
  17. ))
  18. import org.apache.spark.sql._
  19. //将读取到的数据切分好字段,转换为Row
  20. val rowRDD = peopleRDD
  21. .map(_.split(" "))
  22. .map(attributes => Row(attributes(0), attributes(1).toInt,attributes(2).toInt))
  23. //将创建的schema信息和Row传入即可创建DF
  24. val peopleDF: DataFrame = spark.createDataFrame(rowRDD,schemaDemo)
  25. peopleDF.createTempView("people")
  26. val result: DataFrame = spark.sql("select * from people")
  27. result.show()
  28. }
  29. }

3.6 类型之间的转换总结

RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换

3.6.1 DataFrame/Dataset转RDD:

这个转换很简单

val rdd1=testDF.rdd

val rdd2=testDS.rdd

3.6.2 RDD转DataFrame:

import spark.implicits._

val testDF = rdd.map { line => (line._1,line._2) }.toDF("col1","col2")

一般用元组把一行的数据写在一起,然后在toDF中指定字段名

3.6.3 RDD转Dataset:

import spark.implicits._

case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型

val testDS = rdd.map {line=>

      Coltest(line._1,line._2)

    }.toDS

可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可

3.6.4 Dataset转DataFrame

这个也很简单,因为只是把case class封装成Row

import spark.implicits._

val testDF = testDS.toDF

3.6.5 DataFrame转Dataset:

import spark.implicits._

case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型

val testDS = testDF.as[Coltest]

这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。

在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用

3.7 用户自定义函数

3.7.1 用户自定义UDF函数

  1. import org.apache.spark.sql.{DataFrame, SparkSession}
  2. object DefFunctionUDF {
  3. def main(args: Array[String]): Unit = {
  4. val spark = SparkSession.builder()
  5. .master("local[*]")
  6. .appName(this.getClass.getSimpleName)
  7. .getOrCreate()
  8. import spark.implicits._
  9. val df: DataFrame = spark.read.json("hdfs://hadoop101:9000/WordCount/student.json")
  10. df.show()
  11. //自定义函数并注册
  12. spark.udf.register("addName",(x: String) => "Name:" + x)
  13. df.createTempView("student")
  14. //使用自定函数
  15. spark.sql("select addName(name),age from student").show()
  16. /*
  17. +-----------------+---+
  18. |UDF:addName(name)|age|
  19. +-----------------+---+
  20. | Name:jack| 22|
  21. | Name:rose| 21|
  22. | Name:mike| 19|
  23. +-----------------+---+
  24. */
  25. }
  26. }

3.7.2 用戶自定义聚合函数

强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。

1. 弱类型用户自定义聚合函数:通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数。下面展示一个求平均工资的自定义聚合函数。

salary.json

{"name":"jack", "salary":1500}
{"name":"rose", "salary":1300}
{"name":"mike", "salary":1800}
{"name":"jone", "salary":1600}

  1. import org.apache.spark.sql.expressions.MutableAggregationBuffer
  2. import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
  3. import org.apache.spark.sql.types._
  4. import org.apache.spark.sql.Row
  5. import org.apache.spark.sql.SparkSession
  6. object MyAverage extends UserDefinedAggregateFunction {
  7. // 聚合函数输入参数的数据类型
  8. def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  9. // 聚合缓冲区中值得数据类型
  10. def bufferSchema: StructType = {
  11. StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  12. }
  13. // 返回值的数据类型
  14. def dataType: DataType = DoubleType
  15. // 对于相同的输入是否一直返回相同的输出。
  16. def deterministic: Boolean = true
  17. // 初始化
  18. def initialize(buffer: MutableAggregationBuffer): Unit = {
  19. // 存工资的总额
  20. buffer(0) = 0L
  21. // 存工资的个数
  22. buffer(1) = 0L
  23. }
  24. // 相同Execute间的数据合并。
  25. def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  26. if (!input.isNullAt(0)) {
  27. buffer(0) = buffer.getLong(0) + input.getLong(0)
  28. buffer(1) = buffer.getLong(1) + 1
  29. }
  30. }
  31. // 不同Execute间的数据合并
  32. def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  33. buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  34. buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  35. }
  36. // 计算最终结果
  37. def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
  38. }
  1. import org.apache.spark.sql.SparkSession
  2. object DefFunctionText {
  3. def main(args: Array[String]): Unit = {
  4. val spark = SparkSession.builder()
  5. .master("local[*]")
  6. .appName(this.getClass.getSimpleName)
  7. .getOrCreate()
  8. import spark.implicits._
  9. // 注册函数
  10. spark.udf.register("myAverage", MyAverage)
  11. val df = spark.read.json("e://salary.json")
  12. df.createOrReplaceTempView("employees")
  13. df.show()
  14. // +----+------+
  15. // |name|salary|
  16. // +----+------+
  17. // |jack| 1500|
  18. // |rose| 1300|
  19. // |mike| 1800|
  20. // |jone| 1600|
  21. // +----+------+
  22. val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
  23. result.show()
  24. // +--------------+
  25. // |average_salary|
  26. // +--------------+
  27. // | 1550.0|
  28. // +--------------+
  29. }
  30. }

2. 强类型用户自定义聚合函数:通过继承Aggregator来实现强类型自定义聚合函数,同样是求平均工资

  1. import org.apache.spark.sql.expressions.Aggregator
  2. import org.apache.spark.sql.Encoder
  3. import org.apache.spark.sql.Encoders
  4. import org.apache.spark.sql.SparkSession
  5. // 既然是强类型,可能有case类
  6. case class Employee(name: String, salary: Long)
  7. case class Average(var sum: Long, var count: Long)
  8. object MyAverage1 extends Aggregator[Employee, Average, Double] {
  9. // 定义一个数据结构,保存工资总数和工资总个数,初始都为0
  10. def zero: Average = Average(0L, 0L)
  11. // Combine two values to produce a new value. For performance, the function may modify `buffer`
  12. // and return it instead of constructing a new object
  13. def reduce(buffer: Average, employee: Employee): Average = {
  14. buffer.sum += employee.salary
  15. buffer.count += 1
  16. buffer
  17. }
  18. // 聚合不同execute的结果
  19. def merge(b1: Average, b2: Average): Average = {
  20. b1.sum += b2.sum
  21. b1.count += b2.count
  22. b1
  23. }
  24. // 计算输出
  25. def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  26. // 设定之间值类型的编码器,要转换成case类
  27. // Encoders.product是进行scala元组和case类转换的编码器
  28. def bufferEncoder: Encoder[Average] = Encoders.product
  29. // 设定最终输出值的编码器
  30. def outputEncoder: Encoder[Double] = Encoders.scalaDouble
  31. }
  1. import org.apache.spark.sql.{Dataset, SparkSession, TypedColumn}
  2. object DefFunctionText2 {
  3. def main(args: Array[String]): Unit = {
  4. val spark = SparkSession.builder()
  5. .master("local[*]")
  6. .appName(this.getClass.getSimpleName)
  7. .getOrCreate()
  8. //导入隐式转换
  9. import spark.implicits._
  10. //创建Dataset
  11. val ds: Dataset[Employee] = spark.read.json("e://salary.json").as[Employee]
  12. ds.createOrReplaceTempView("employees")
  13. ds.show()
  14. // +----+------+
  15. // |name|salary|
  16. // +----+------+
  17. // |jack| 1500|
  18. // |rose| 1300|
  19. // |mike| 1800|
  20. // |jone| 1600|
  21. // +----+------+
  22. val averageSalary: TypedColumn[Employee, Double] = MyAverage1.toColumn.name("average_salary")
  23. val result = ds.select(averageSalary)
  24. result.show()
  25. // +--------------+
  26. // |average_salary|
  27. // +--------------+
  28. // | 1550.0|
  29. // +--------------+
  30. }
  31. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/785609
推荐阅读
相关标签
  

闽ICP备14008679号