当前位置:   article > 正文

spark学习笔记(十)——sparkSQL核心编程-自定义函数UDF、UDAF/读取保存数据/五大数据类型_java spark udf自定义 aggregator

java spark udf自定义 aggregator

目录

用到的全部依赖

用户自定义函数

UDF 

UDAF

弱类型

强类型

数据的读取与保存

通用的方式

数据类型

(1)JSON

(2)CSV

(3)Parquet

(4)MySQL

(5)hive 


用到的全部依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.spark</groupId>
  4. <artifactId>spark-core_2.12</artifactId>
  5. <version>3.0.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.spark</groupId>
  9. <artifactId>spark-sql_2.12</artifactId>
  10. <version>3.0.0</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>mysql</groupId>
  14. <artifactId>mysql-connector-java</artifactId>
  15. <version>5.1.27</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.spark</groupId>
  19. <artifactId>spark-hive_2.12</artifactId>
  20. <version>3.0.0</version>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.apache.hive</groupId>
  24. <artifactId>hive-exec</artifactId>
  25. <version>1.2.1</version>
  26. </dependency>
  27. </dependencies>
  28. <build>
  29. <plugins>
  30. <!--该插件用于把Scala代码编译成为class文件-->
  31. <plugin>
  32. <groupId>net.alchim31.maven</groupId>
  33. <artifactId>scala-maven-plugin</artifactId>
  34. <version>3.2.2</version>
  35. <executions>
  36. <execution>
  37. <!--声明绑定到maven的compile阶段-->
  38. <goals>
  39. <goal>testCompile</goal>
  40. </goals>
  41. </execution>
  42. </executions>
  43. </plugin>
  44. <plugin>
  45. <groupId>org.apache.maven.plugins</groupId>
  46. <artifactId>maven-assembly-plugin</artifactId>
  47. <version>3.1.0</version>
  48. <configuration>
  49. <descriptorRefs>
  50. <descriptorRef>jar-with-dependencies</descriptorRef>
  51. </descriptorRefs>
  52. </configuration>
  53. <executions>
  54. <execution>
  55. <id>make-assembly</id>
  56. <phase>package</phase>
  57. <goals>
  58. <goal>single</goal>
  59. </goals>
  60. </execution>
  61. </executions>
  62. </plugin>
  63. </plugins>
  64. </build>

用户自定义函数

用户可以通过spark.udf功能添加自定义函数,实现自定义功能。

UDF 

(1)创建DataFrame

在spark的bin目录下创建input文件夹,在input里创建user.json文件,user.json内容如下:

{"username":"zj","age":20}
{"username":"xx","age":21}
{"username":"yy","age":22}

val df = spark.read.json("input/user.json")

(2)注册UDF

 spark.udf.register("addName",(x:String)=> "Name:"+x)

(3)创建临时表

 df.createOrReplaceTempView("people")

(4)应用UDF

spark.sql("Select addName(username),age from people").show()

UDAF

强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数max()min(),count(),avg()等等。

用户可以设定自定义聚合函数,通过继承UserDefinedAggregateFunction来实现用户自定义弱类型聚合函数。Spark3.0版本推荐使用强类型聚合函数Aggregator。

在datas目录下新建user.json文件,内容为:

{"username": "zj","age": 25}
{"username": "qq","age": 32}
{"username": "ww","age": 43}

弱类型

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  3. import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
  4. import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession, types}
  5. object sparkSQL_UDAF {
  6. def main(args: Array[String]): Unit = {
  7. //TODO 创建sparkSQL运行环境
  8. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
  9. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  10. //TODO 执行逻辑操作 计算平均年龄
  11. //创建DataFrame
  12. val df = spark.read.json("datas/user.json")
  13. //创建临时表
  14. df.createOrReplaceTempView("user")
  15. //自定义
  16. spark.udf.register("ageAvg",new AvgUDAF())
  17. spark.sql("select ageAvg(age) from user").show()
  18. //TODO 关闭环境
  19. spark.stop()
  20. }
  21. /*
  22. 自定义函数:计算平均值
  23. 1.继承
  24. 2.重写方法
  25. */
  26. class AvgUDAF extends UserDefinedAggregateFunction{
  27. //输入数据的结构
  28. override def inputSchema: StructType = {
  29. StructType(
  30. Array(
  31. StructField("age",LongType)
  32. )
  33. )
  34. }
  35. //缓冲区数据的结构
  36. override def bufferSchema: StructType = {
  37. StructType(
  38. Array(
  39. StructField("total",LongType),
  40. StructField("count",LongType)
  41. )
  42. )
  43. }
  44. //函数计算结果数据类型
  45. override def dataType: DataType = LongType
  46. //函数稳定性
  47. override def deterministic: Boolean = true
  48. //缓冲区初始化
  49. override def initialize(buffer: MutableAggregationBuffer): Unit = {
  50. buffer.update(0,0L)
  51. buffer.update(1,0L)
  52. }
  53. //根据输入的值更新缓冲区数据
  54. override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  55. buffer.update(0,buffer.getLong(0)+input.getLong(0))
  56. buffer.update(1,buffer.getLong(1)+1)
  57. }
  58. //缓冲区数据合并
  59. override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  60. buffer1.update(0,buffer1.getLong(0) + buffer2.getLong(0))
  61. buffer1.update(1,buffer1.getLong(1) + buffer2.getLong(1))
  62. }
  63. //计算平均值
  64. override def evaluate(buffer: Row): Any = {
  65. buffer.getLong(0)/buffer.getLong(1)
  66. }
  67. }
  68. }

结果

强类型

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.sql.expressions.Aggregator
  3. import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession, functions}
  4. object sparkSQL_UDAF02 {
  5. def main(args: Array[String]): Unit = {
  6. //TODO 创建sparkSQL运行环境
  7. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
  8. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  9. //TODO 执行逻辑操作 计算平均年龄
  10. //创建DataFrame
  11. val df = spark.read.json("datas/user.json")
  12. //创建临时表
  13. df.createOrReplaceTempView("user")
  14. //自定义操作
  15. spark.udf.register("ageAvg",functions.udaf(new AvgUDAF()))
  16. spark.sql("select ageAvg(age) from user").show()
  17. //TODO 关闭环境
  18. spark.stop()
  19. }
  20. /*
  21. 自定义函数:计算平均值
  22. 1.继承Aggregator
  23. IN:输入的数据类型Long
  24. BUF:缓冲区的数据类型Buff
  25. OUT:输出的数据类型Long
  26. 2.重写方法
  27. */
  28. case class Buff(var total:Long,var count:Long)
  29. class AvgUDAF extends Aggregator[Long,Buff,Long]{
  30. //初始值/零值 缓冲区的初始化
  31. override def zero: Buff = {
  32. Buff(0L,0L)
  33. }
  34. //根据输入的数据更新缓冲区的数据
  35. override def reduce(buff: Buff, in: Long): Buff = {
  36. buff.total = buff.total + in
  37. buff.count = buff.count + 1
  38. buff
  39. }
  40. //合并缓冲区
  41. override def merge(buff1: Buff, buff2: Buff): Buff = {
  42. buff1.total = buff1.total + buff2.total
  43. buff1.count = buff1.count + buff2.count
  44. buff1
  45. }
  46. //计算结果
  47. override def finish(buff: Buff): Long = {
  48. buff.total / buff.count
  49. }
  50. //缓冲区的编码操作
  51. override def bufferEncoder: Encoder[Buff] = Encoders.product
  52. //输出的编码操作
  53. override def outputEncoder: Encoder[Long] = Encoders.scalaLong
  54. }
  55. }

结果

数据的读取与保存

通用的方式

SparkSQL提供了通用的保存数据和读取数据的方式;通用指的是使用相同的API根据不同的参数读取和保存不同格式的数据,SparkSQL默认读取和保存的文件格式是parquet。

(1)读取数据

读取数据的通用方法:spark.read.load

数据类型:csv、format、jdbc、json、load、option、options、orc、parquet、schema、table、text、textFile

读取不同格式的数据要对不同的数据格式进行设定:

spark.read.format("…")[.option("…")].load("…")

format("…"):指定加载的数据类型:csv、jdbc。json、orc、parquet、textFile;

load("…"):在csv、jdbc、json、orc、parquet、textFile格式下传入数据的路径;

option("…"):在jdbc格式下需要传入JDBC相应参数:url、user、password、dbtable;

注:直接在文件上进行查询:文件格式 ‘文件路径’

spark.sql("select * from json.`datas/user.json`").show

(2)保存数据

保存数据的通用方法:df.write.save

保存不同格式的数据,可以对不同的数据格式进行设定:

df.write.format("…")[.option("…")].save("…")

format("…")指定保存的数据类型:csvjdbc、jsonorcparquet、textFile;

save ("…"):在csvorcparquet、textFile格式下保存数据的路径;

option("…"):在jdbc格式下需要传入JDBC相应参数:urluserpassword、dbtable;

保存操作可以使用SaveMode用来指明如何处理数据,使用mode()方法来设置;有一点很重要这些 SaveMode都是没有加锁的, 也不是原子操作。

Scala/Javaany languagemeaning
SaveMode ErrorIfExists(默认)“error”如果文件已经存在则异常
SaveMode Append“append”如果文件已经存在则追加
SaveMode Overwrite“overwrite”

如果文件已经存在则覆盖

SaveMode Ignore“ignore”如果文件已经存在则忽略
df.write.mode("append").json("/data/output")

数据类型

(1)JSON

SparkSQL能够自动推测JSON数据集的结构,并将它加载为一个Dataset[Row],可以通过 SparkSession.read.json()去加载JSON文件,且读取的JSON文件不是传统的JSON文件,每一行都应该是一个JSON串。

如:

{"username": "zj","age": 25}
{"username": "qq","age": 32}
{"username": "ww","age": 43}
  1. //导入隐式转换
  2. import spark.implicits._
  3. //加载JSON文件
  4. val path = "input/user.json"
  5. val df = spark.read.json(path)
  6. //创建临时表
  7. df.createOrReplaceTempView("user")
  8. //数据查询
  9. val userDF = spark.sql("select name from user where age between 20 and 40")
  10. userDF.show()

(2)CSV

SparkSQL可以配置CSV文件的列表信息、读取CSV文件;CSV文件的第一行设置为数据列。

spark.read.format("csv").option("sep", ";").option("inferSchema","true").option("header", "true").load("input/user.csv").show

(3)Parquet

SparkSQL的默认数据源为Parquet格式;

Parquet是一种能够有效存储嵌套数据的列式存储格式;

数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format;

修改配置项spark.sql.sources.default可修改默认数据源格式。

  1. //加载数据
  2. val df = spark.read.load("/input/users.parquet")
  3. df.show
  4. //保存数据
  5. df.write.mode("append").save("/output")

(4)MySQL

SparkSQL可以通过JDBC从关系型数据库中以读取数据的方式创建DataFrame,通过对 DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。

使用spark-shell操作可在启动shell时指定相关的数据库驱动路径或者将相关的数据库驱动放到spark的类路径下。

1)导入依赖

  1. <dependency>
  2. <groupId>mysql</groupId>
  3. <artifactId>mysql-connector-java</artifactId>
  4. <version>5.1.27</version>
  5. </dependency>

2)读取数据/写入数据

创建数据库

  1. create database spaek;
  2. use spark;

创建表

  1. CREATE TABLE IF NOT EXISTS Produce
  2. (
  3. id int NOT NULL,
  4. name varchar(45) NOT NULL,
  5. age INT NULL,
  6. PRIMARY KEY (id)
  7. )
  8. ENGINE = innodb;

添加mysql数据

insert into user values (1,'zj',24),(2,'zjj',34),(7,'zjjj',42);

spark代码:

  1. import java.util.Properties
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
  4. object sparkSQL_JDBC {
  5. def main(args: Array[String]): Unit = {
  6. //TODO 创建sparkSQL运行环境
  7. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
  8. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  9. import spark.implicits._
  10. //TODO 执行逻辑操作
  11. //读取MySQL数据 方式一
  12. val df = spark.read
  13. .format("jdbc")
  14. .option("url","jdbc:mysql://hadoop01:3306/spark")
  15. .option("driver","com.mysql.jdbc.Driver")
  16. .option("user","root")
  17. .option("password","123456")
  18. .option("dbtable","user")
  19. .load()
  20. df.show()
  21. //读取MySQL数据 方式二
  22. spark.read.format("jdbc")
  23. .options(Map("url"->"jdbc:mysql://hadoop01:3306/spark?user=root&password=123456","dbtable"->"user","driver"->"com.mysql.jdbc.Driver")).load().show
  24. //读取MySQL数据 方式三
  25. val props: Properties = new Properties()
  26. props.setProperty("user", "root")
  27. props.setProperty("password", "123456")
  28. val dff: DataFrame = spark.read.jdbc("jdbc:mysql://hadoop01:3306/spark",
  29. "user", props)
  30. dff.show
  31. //保存数据
  32. df.write
  33. .format("jdbc")
  34. .option("url","jdbc:mysql://hadoop01:3306/spark")
  35. .option("driver","com.mysql.jdbc.Driver")
  36. .option("user","root")
  37. .option("password","123456")
  38. .option("dbtable","user1")
  39. .mode(SaveMode.Append)
  40. .save()
  41. //TODO 关闭环境
  42. spark.stop()
  43. }
  44. }

 运行结果:

  

(5)hive 

1)Apache Hive是Hadoop上的SQL引擎,SparkSQL包含Hive支持,支持Hive表访问、UDF、Hive查询语言HQL等。

2)在Spark SQL中包含Hive库,并不需要事先安装Hive。最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了。如果下载的是二进制版本的Spark,应该已经在编译时添加了Hive支持。

3)若要把Spark SQL连接到一个部署好的Hive上,必须把hive-site.xml复制到Spark的配置文件目录中($SPARK_HOME/conf)

4)没有部署好HiveSpark SQL也可以运行。 需要注意的是,如果没有部署好HiveSpark SQL会在当前的工作目录中创建出自己的Hive元数据仓库metastore_db。尝试使用HQL中的CREATE TABLE语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse目录中

注:如果classpath中有配好的hdfs-site.xml,默认的文件系统就是HDFS,否则就是本地文件系统,spark-shell默认Hive支持;代码中默认不支持,需要手动指定)。

5)连接外部已经部署好的Hive,需要几个步骤:

Spark要接管Hive需要把hive-site.xml拷贝到conf/目录下;

Mysql的驱动拷贝jars/目录下;

如果访问不到hdfs,则需要把core-site.xmlhdfs-site.xml拷贝到conf/目录下;

重启spark-shell。

6)Spark SQL CLI 可以在本地运行Hive元数据服务以及从命令行执行查询任务。在Spark目录下执行命令bin/spark-sql启动Spark SQL CLI,直接执行SQL语句,类似Hive窗口。

7)Spark Thrift Server是Spark基于HiveServer2实现的一个Thrift服务,无缝兼容HiveServer2;Spark Thrift Server的接口和协议都和HiveServer2完全一致,因此我们部署好Spark Thrift Server后,可以直接使用hivebeeline访问Spark Thrift Server执行相关语句。Spark Thrift Server的目的只是取代HiveServer2,它依旧可以和Hive Metastore进行交互,获取到 hive 的元数据。

连接Thrift Server,需要几个步骤:

Spark要接管Hive需要把hive-site.xml拷贝到conf/目录下;

Mysql的驱动拷贝jars/目录下;

如果访问不到hdfs,则需要把core-site.xmlhdfs-site.xml拷贝到conf/目录下;

启动Thrift Server。

注:sbin/start-thriftserver.sh

使用beeline连接Thrift Server:bin/beeline -u jdbc:hive2://hadoop01:10000 -n root

8)导入依赖

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-hive_2.12</artifactId>
  4. <version>3.0.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.hive</groupId>
  8. <artifactId>hive-exec</artifactId>
  9. <version>1.2.1</version>
  10. </dependency>

9)将hive-site.xml文件拷贝到项目的resources目录中,代码实现:

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.sql.{SparkSession}
  3. object sparkSQL_hive {
  4. def main(args: Array[String]): Unit = {
  5. //TODO 创建sparkSQL运行环境
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
  7. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  8. //TODO 执行逻辑操作
  9. //使用sparkSQL连接hive
  10. //1.拷贝hive-site.xml文件到classpath下
  11. //2.启用hive支持
  12. //3.增加依赖
  13. spark.sql("show tables").show()
  14. //TODO 关闭环境
  15. spark.stop()
  16. }
  17. }

本文仅仅是学习笔记的记录!!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/603682
推荐阅读
相关标签
  

闽ICP备14008679号