当前位置:   article > 正文

大数据开发之SparkSQL_spark sql

spark sql

第 1 章:spark sql概述

1.1 什么是spark sql

1、spark sql是spark用于结构化数据处理的spark模块
1)半结构化数据(日志数据)
在这里插入图片描述

2)结构化数据(数据库数据)
在这里插入图片描述

1.2 为什么要有sparksql

在这里插入图片描述

hive on spark:hive既作为存储元数据又负责sql的解析优化,语法是hql语法,执行引擎编程了spark,spark负责采用rdd执行。
在这里插入图片描述

spark on hive:hive只作为存储元数据,spark负责sql解析优化,语法是spark sql语法,spark底层采用优化后的df或者ds执行。

1.3 spark sql原理

spark sql它提供了2个编程抽象,dataframe、dataset(类似spark core中的rdd)
在这里插入图片描述

1.3.1 什么是dataframe

1、dataframe是一种类似rdd的分布式数据集,类似于传统数据库中的二维表格。
2、dataframe与rdd的主要区别在于,dataframe带有schema元信息,即dataframe所表示的二维表数据集的每一列都带有名称和类型。
在这里插入图片描述

左侧的rdd[person]虽然person为类型参数,但spark框架本身不了解person类的内部结构。而右侧的dataframe却提供了详细的结构信息,使得spark sql可以清楚的指导这些数据集中包含哪些列,每列的名称和类型各是什么。
3、spark sql性能上比rdd要高。因为spark sql了解数据内部结构,从而对藏于dataframe背后的数据源以及作用域dataframe之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观rdd,由于无从得知所存数据元素的具体内部结构,spark core只能在stage层面进行简单、通用的流水线优化。
在这里插入图片描述
在这里插入图片描述

1.3.2 什么是dataset

dataset是分布式数据集。
dataset是强类型的。比如可以有dataset[car],dataset[user]。具有类型安全检查。
dataframe是dataset的特例,type dataframe=dataset[row],row是一个类型,跟car、user这些的类型一样,所有的表结构信息都用row来表示。

1.3.3 rdd、dataframe和dataset之间关系

1、发展历史
在这里插入图片描述

如果同样的数据都给到这三种数据结构,他们分别计算之后,都会给出相同的结果。不同的是他们的执行效率和执行方式。在后期的spark版本中,dataset有可能会逐步取代rdd和dataframe成为唯一的api接口。
2、三者的共性
1)rdd、dataframe、dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利。
2)三者都是惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到action行动算子如foreach时,三者才会开始遍历运算
3)三者有许多共同的函数,如filter,排序等
4)三者都会根据spark的内存情况自动缓存运算
5)三者都有分区概念

1.4 spark sql的特点

1、易整合
无缝的整合了sql查询和spark编程。
在这里插入图片描述

2、统一的数据访问方式
使用相同的方式连接不同的数据源
在这里插入图片描述

3、兼容hive
在已有的仓库上直接运行sql或者hql
在这里插入图片描述

4、标准的数据连接
通过jdbc或者odbc来连接
在这里插入图片描述

第2 章:spark sql编程

本章重点学习如何使用dataframe和dataset进行编程,以及他们之间的关系和转换,关于具体的sql书写不是本章的重点。

2.1 sparksession新的起始点

在老的版本中,sparksql提供两种sql查询起始点:
1、一个是sqlcontext,用于spark自己提供的sql查询
2、一个叫hivecontext,用于连接hive的查询
sparksession是spark最新的sql查询起始点,实质上是sqlcontext和hivecontext的组合,所以在sqlcontext和hivecontext上可用的api在sparksession上同样是可用使用的。
sparksession内部封装了sparkcontext,所以计算实际上是由sparkcontext完成的。当我们使用spark-shell的时候,spark框架会自动地创建一个名称叫做spark的sparksession,就像我们以前可以自动获取到一个sc来表示sparkcontext。

[atguigu@hadoop102 spark-local]$ bin/spark-shell

20/09/12 11:16:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop102:4040
Spark context available as 'sc' (master = local[*], app id = local-1599880621394).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/
         
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

2.2 dataframe

dataframe是一种类似于rdd的分布式数据集,类似于传统数据库中的二维表格

2.2.1 创建dataframe

在spark sql中sparksession是创建dataframe和执行sql的入口,创建dataframe有三种方式:
通过spark的数据源进行创建;
从一个存在的rdd进行转换;
还可以从hive table进行查询返回;
1、从spark数据源进行创建
1)数据准备,在/opt/module/spark-local目录下创建一个user.json文件

{"age":20,"name":"qiaofeng"}
{"age":19,"name":"xuzhu"}
{"age":18,"name":"duanyu"}

  • 1
  • 2
  • 3
  • 4

2)查看spark支持创建文件的数据源格式,使用tab键查看

scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

  • 1
  • 2
  • 3

3)读取json文件创建dataframe

scala> val df = spark.read.json("/opt/module/spark-local/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

  • 1
  • 2
  • 3

注意:如果从内存种获取数据,spark可以指导数据类型具体是什么,如果是数字,默认作为int处理;但是从文件种读取的数字,不能确定是什么类型,所以用bigint接收,可以和long类型转换,但是和int不能进行转换。
4)查看dataframe算子

scala> df.
  • 1

5)展示结果

scala> df.show
+---+--------+
|age|    name|
+---+--------+
| 20|qiaofeng|
| 19|   xuzhu|
| 18|  duanyu|
+---+--------+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2、从rdd进行转换
3、hive table进行查询返回

2.2.2 sql风格语法

sql语法风格是指我们查询数据的时候使用sql语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。
视图:对特定表的数据的查询结果重复使用。view只能查询,不能修改和插入。

1、临时视图
1)创建一个dataframe

scala> val df = spark.read.json("/opt/module/spark-local/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

  • 1
  • 2
  • 3

2)对dataframe创建一个临时视图

scala> df.createOrReplaceTempView("user")
  • 1

3)通过sql语句实现查询全表

scala> val sqlDF = spark.sql("SELECT * FROM user")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]4)结果展示

  • 1
  • 2
  • 3
  • 4

4)结果展示

scala> sqlDF.show
+---+--------+
|age|    name|
+---+--------+
| 20|qiaofeng|
| 19|   xuzhu|
| 18|  duanyu|
+---+--------+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

5)求年龄的平均值

scala> val sqlDF = spark.sql("SELECT avg(age) from user")
sqlDF: org.apache.spark.sql.DataFrame = [avg(age): double]

  • 1
  • 2
  • 3

6)结果展示

scala> sqlDF.show
+--------+                                                                      
|avg(age)|
+--------+
|    19.0|
+--------+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

7)创建一个新会话再执行,发现视图找不到

scala> spark.newSession().sql("SELECT avg(age) from user ").show()
org.apache.spark.sql.AnalysisException: Table or view not found: user; line 1 pos 14;

  • 1
  • 2
  • 3

注意:普通临时视图是session范围内的,如果向全局有效,可以创建全局临时视图。
2、全局视图
1)对于dataframe创建一个全局视图

scala> df.createOrReplaceGlobalTempView ("user2")
  • 1

2)通过sql语句查询全表

scala> spark.sql("SELECT * FROM global_temp.user2").show()
+---+--------+
|age|    name|
+---+--------+
| 20|qiaofeng|
| 19|   xuzhu|
| 18|  duanyu|
+---+--------+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

3)新建session,通过sql语句实现查询全表

scala> spark.newSession().sql("SELECT * FROM global_temp.user2").show()
+---+--------+
|age|    name|
+---+--------+
| 20|qiaofeng|
| 19|   xuzhu|
| 18|  duanyu|
+---+--------+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2.2.3 dsl风格语法

dataframe提供一个特定领域语言去管理格式化的数据,可以在scala,java,python和r种使用dsl,使用dsl语法风格不必去创建临时视图了。
1、创建一个dataframe

scala> val df = spark.read.json("/opt/module/spark-local/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

  • 1
  • 2
  • 3

2、查看dataframe的schema信息

scala> df.printSchema
root
 |-- age: Long (nullable = true)
 |-- name: string (nullable = true)

  • 1
  • 2
  • 3
  • 4
  • 5

3、只查看“name”列数据
注意:列名要用双括号引起来,如果是单引号的话,只能在前面加一个单引号

scala> df.select("name").show()
+--------+
|  name|
+--------+
|qiaofeng|
|  xuzhu|
| duanyu|
+--------+

scala> df.select('name).show
+--------+
|  name|
+--------+
|qiaofeng|
|  xuzhu|
| duanyu|
+--------+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

4、查看年龄和姓名,且年龄大于18

scala> df.select("age","name").where("age>18").show
+---+--------+
|age|  name|
+---+--------+
| 20|qiaofeng|
| 19|  xuzhu|
+---+--------+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

5、查看所有列

scala> df.select("*").show
+---+--------+
|age|  name|
+---+--------+
| 20| qiaofeng|
| 19|   xuzhu|
| 18|  duanyu|
+---+--------+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

6、查看"name"列数据以及“age+1”数据
注意:涉及到运算的时候,每列都必须使用$,或者采用单引号表达式:单引号+字段名

scala> df.select($"name",$"age" + 1).show
scala> df.select('name, 'age + 1).show()
scala> df.select('name, 'age + 1 as "newage").show()

+--------+---------+
| name  |(age + 1)|
+--------+---------+
|qiaofeng|    21|
|  xuzhu|    20|
| duanyu|    19|
+--------+---------+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

7、查看”age“大于”19“的数据

scala> df.filter("age>19").show
+---+--------+
|age |  name|
+---+--------+
| 20|qiaofeng|
+---+--------+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

8、按照”age“分组,查看数据条数

scala> df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 19|    1|
| 18|    1|
| 20|    1|
+---+-----+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

9、求平均年龄avg(age)

scala> df.agg(avg("age")).show
+--------+
|avg(age)|
+--------+
|   19.0|
+--------+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

10、求年龄总和sum(age)

scala> df.agg(max("age")).show
+--------+
|max(age)|
+--------+
|     20|
+--------+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2.3 dataset

dataset是具有强类型的数据集合,需要提供对应的类型信息。

2.3.1 创建dataset(基本数据类型)

使用基本类型的序列创建dataset。
1、将集合转换为dataset

scala> val ds = Seq(1,2,3,4,5,6).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

  • 1
  • 2
  • 3

2、查看dataset的值

scala> ds.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
+-----+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2.3.2 创建dataset(样例类序列)

使用样例类序列创建dataset。
1、创建一个user的样例类

scala> case class User(name: String, age: Long)
defined class User

  • 1
  • 2
  • 3

2、将集合转换为dataset

scala> val caseClassDS = Seq(User("wangyuyan",18)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[User] = [name: string, age: bigint]

  • 1
  • 2
  • 3

3、查看dataset的值

scala> caseClassDS.show
+---------+---+
|     name|age|
+---------+---+
|wangyuyan|  18|
+---------+---+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

注意:在实际开发的时候,很少会把序列转换成dataset,更多是通过rdd和dataframe转换来得到dataset

2.4 rdd、dataframe、dataset相互转换

在这里插入图片描述

2.4.1 idea创建sparksql工程

1、创建一个maven工程sparksqltest
2、在项目sparksqltest上点击右键,add framework support->勾选scala
3、在main下创建scala文件夹,并右键mark directory as sources root->在Scala下创建包名com.atguigu.sparksql
4、输入文件夹准备:在新建的sparksqltest项目上右键->新建input文件夹->在input文件夹上右键->新建user.json。并输入如下内容:

{"age":20,"name":"qiaofeng"}
{"age":19,"name":"xuzhu"}
{"age":18,"name":"duanyu"}

  • 1
  • 2
  • 3
  • 4

5、在pom.xml文件中添加spark-sql的依赖和scala的编译插件

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

<build>
<finalName>SparkSQLTest</finalName>
<plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

6、代码实现

package com.atguigu.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSQL01_input {

    def main(args: Array[String]): Unit = {

        // 1 创建上下文环境配置对象
        val conf: SparkConf = new SparkConf().setAppName("SparkSQLTest").setMaster("local[*]")

        // 2 创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

        // 3 读取数据
        val df: DataFrame = spark.read.json("input/user.json")

        // 4 可视化
        df.show()

        // 5 释放资源
        spark.stop()
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2.4.2 rdd与dataframe相互转换

1、rdd转换为dataframe
手动转换:rdd.todf(“列名1”,“列名2”)
通过样例类反射转换:userrdd.map{x->user(x._1,x._2)}.todf()
2、dataframe转换为rdd
dataframe.rdd
3、在Input/目录下准备user.txt

qiaofeng,20
xuzhu,19
duanyu,18 

  • 1
  • 2
  • 3
  • 4

4、代码实现

package com.atguigu.sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

object SparkSQL02_RDDAndDataFrame {
  def main(args: Array[String]): Unit = {
    //TODO 1 创建SparkConf配置文件,并设置App名称
    val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    //TODO 2 利用SparkConf创建sc对象
    val sc = new SparkContext(conf)
    val lineRDD: RDD[String] = sc.textFile("input\\user.txt")
    //普通rdd,数据只有类型,没有列名(缺少元数据)
    val rdd: RDD[(String, Long)] = lineRDD.map {
      line => {
        val fileds: Array[String] = line.split(",")
        (fileds(0), fileds(1).toLong)
      }
    }
    //TODO 3 利用SparkConf创建sparksession对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    //RDD和DF、DS转换必须要导的包(隐式转换),spark指的是上面的sparkSession
    import spark.implicits._

    //TODO RDD=>DF
    //普通rdd转换成DF,需要手动为每一列补上列名(补充元数据)
    val df: DataFrame = rdd.toDF("name", "age")

    df.show()

    //样例类RDD,数据是一个个的样例类,有类型,有属性名(列名),不缺元数据
    val userRDD: RDD[User] = rdd.map {
      t => {
        User(t._1, t._2)
      }
    }
    //样例类RDD转换DF,直接toDF转换即可,不需要补充元数据
    val userDF: DataFrame = userRDD.toDF()
    userDF.show()

    //TODO DF=>RDD
    //DF转换成RDD,直接.rdd即可,但是要注意转换出来的rdd数据类型会变成Row
    val rdd1: RDD[Row] = df.rdd
    val userRDD2: RDD[Row] = userDF.rdd
    rdd1.collect().foreach(println)
    userRDD2.collect().foreach(println)

    //如果想获取到row里面的数据,直接row.get(索引)即可
    val rdd2: RDD[(String, Long)] = rdd1.map {
      row => {
        (row.getString(0), row.getLong(1))
      }
    }

    rdd2.collect().foreach(println)

    //TODO 4 关闭资源
    sc.stop()
  }
}
case class User(name:String,age:Long)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

2.4.3 rdd与dataset相互转换

1、rdd转换为dataset
rdd.map{x->user(x._1,x._2)},tods()
sparksql能够自动将包含有样例类的rdd转换成dataset,样例类定义了table的结构,样例类属性通过反射编程了表的列名。样例类可以包含诸如seq或者array等复杂的结构。
2、dataset转换为rdd
ds.rdd
3、代码实现

package com.atguigu.sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

object SparkSQL03_RDDAndDataSet {
  def main(args: Array[String]): Unit = {
    //TODO 1 创建SparkConf配置文件,并设置App名称
    val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    //TODO 2 利用SparkConf创建sc对象
    val sc = new SparkContext(conf)
    val lineRDD: RDD[String] = sc.textFile("input\\user.txt")
    //普通rdd,数据只有类型,没有列名(缺少元数据)
    val rdd: RDD[(String, Long)] = lineRDD.map {
      line => {
        val fileds: Array[String] = line.split(",")
        (fileds(0), fileds(1).toLong)
      }
    }
    //TODO 3 利用SparkConf创建sparksession对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    //RDD和DF、DS转换必须要导的包(隐式转换),spark指的是上面的sparkSession
    import spark.implicits._

    //TODO RDD=>DS
    //普通rdd转DS,没办法补充元数据,因此一般不用
    val ds: Dataset[(String, Long)] = rdd.toDS()
    ds.show()

    //样例类RDD,数据是一个个的样例类,有类型,有属性名(列名),不缺元数据
    val userRDD: RDD[User] = rdd.map {
      t => {
        User(t._1, t._2)
      }
    }
    //样例类RDD转换DS,直接toDS转换即可,不需要补充元数据,因此转DS一定要用样例类RDD
    val userDs: Dataset[User] = userRDD.toDS()
    userDs.show()

    //TODO DS=>RDD
    //ds转成rdd,直接.rdd即可,并且ds不会改变rdd里面的数据类型
    val rdd1: RDD[(String, Long)] = ds.rdd
    val userRDD2: RDD[User] = userDs.rdd
    
    //TODO 4 关闭资源
    sc.stop()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

2.4.4 dataframe与dataset相互转换

1、dataframe转为dataset
df.as[user]
2、dataset转换为dataframe
ds.todf
3、代码实现

package com.atguigu.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object SparkSQL04_DataFrameAndDataSet {

    def main(args: Array[String]): Unit = {

        // 1 创建上下文环境配置对象
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")

        // 2 创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

        // 3 读取数据
        val df: DataFrame = spark.read.json("input/user.json")

        //4.1 RDD和DataFrame、DataSet转换必须要导的包
        import spark.implicits._

        // 4.2 DataFrame 转换为DataSet
        val userDataSet: Dataset[User] = df.as[User]
        userDataSet.show()

        // 4.3 DataSet转换为DataFrame
        val userDataFrame: DataFrame = userDataSet.toDF()
        userDataFrame.show()

        // 5 释放资源
        spark.stop()
    }
}

case class User(name: String,age: Long)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

2.5 用户自定义函数

2.5.1 udf

1、udf:一行进入,一行出
2、代码实现

package com.atguigu.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSQL05_UDF{

    def main(args: Array[String]): Unit = {

        // 1 创建上下文环境配置对象
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")

        // 2 创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

        // 3 读取数据
        val df: DataFrame = spark.read.json("input/user.json")

        // 4 创建DataFrame临时视图
        df.createOrReplaceTempView("user")
        
        // 5 注册UDF函数。功能:在数据前添加字符串“Name:”
        spark.udf.register("addName", (x:String) => "Name:"+ x)

        // 6 调用自定义UDF函数
        spark.sql("select addName(name), age from user").show()

        // 7 释放资源
        spark.stop()
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

2.5.2 udaf

1、udaf:输入多行,返回一行
2、spark3.x推荐使用extends aggregator自定义udaf,属于强类型的dataset方式
3、spark2.x使用extends userdefinedaggregatefunction,数以弱类型的dataframe
4、案例:
需求:实现求平均年龄,自定义udaf,myavg(age)
1)自定义聚合函数实现-强类型

package com.atguigu.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}

object SparkSQL06_UDAF {

    def main(args: Array[String]): Unit = {

        // 1 创建上下文环境配置对象
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")

        // 2 创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

        // 3 读取数据
        val df: DataFrame = spark.read.json("input/user.json")

        // 4 创建DataFrame临时视图
        df.createOrReplaceTempView("user")
        
        // 5 注册UDAF
        spark.udf.register("myAvg", functions.udaf(new MyAvgUDAF()))

        // 6 调用自定义UDAF函数
        spark.sql("select myAvg(age) from user").show()

        // 7 释放资源
        spark.stop()
    }
}

//输入数据类型
case class Buff(var sum: Long, var count: Long)

/**
 * 1,20岁; 2,19岁; 3,18岁
 * IN:聚合函数的输入类型:Long
 * Buff : sum = (18+19+20)  count = 1+1+1
 * OUT:聚合函数的输出类型:Double  (18+19+20) / 3
 */
class MyAvgUDAF extends Aggregator[Long, Buff, Double] {

    // 初始化缓冲区
    override def zero: Buff = Buff(0L, 0L)

    // 将输入的年龄和缓冲区的数据进行聚合
    override def reduce(buff: Buff, age: Long): Buff = {
        buff.sum = buff.sum + age
        buff.count = buff.count + 1
        buff
    }

    // 多个缓冲区数据合并
    override def merge(buff1: Buff, buff2: Buff): Buff = {
        buff1.sum = buff1.sum + buff2.sum
        buff1.count = buff1.count + buff2.count
        buff1
    }

    // 完成聚合操作,获取最终结果
    override def finish(buff: Buff): Double = {
        buff.sum.toDouble / buff.count
    }

    // SparkSQL对传递的对象的序列化操作(编码)
    // 自定义类型就是product   自带类型根据类型选择
    override def bufferEncoder: Encoder[Buff] = Encoders.product

    override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

第 3 章:sparksql数据的加载和保存

3.1 加载数据

1、加载数据通用方法
spark.read.load是加载数据的通用方式
2、代码实现

package com.atguigu.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object SparkSQL08_Load{

    def main(args: Array[String]): Unit = {

        // 1 创建上下文环境配置对象
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")

        // 2 创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

        // 3.1 spark.read直接读取数据:csv   format   jdbc   json   load   option
        // options   orc   parquet   schema   table   text   textFile
        // 注意:加载数据的相关参数需写到上述方法中,
        // 如:textFile需传入加载数据的路径,jdbc需传入JDBC相关参数。
        spark.read.json("input/user.json").show()

        // 3.2 format指定加载数据类型
        // spark.read.format("…")[.option("…")].load("…")
         // format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"text"
         // load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"text"格式下需要传入加载数据路径
         // option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
        spark.read.format("json").load ("input/user.json").show

        // 4 释放资源
        spark.stop()
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

3.2 保存数据

1、保存数据通用方法
df.write.save是保存数据的通用方法
2、代码实现

package com.atguigu.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql._

object SparkSQL09_Save{

    def main(args: Array[String]): Unit = {

        // 1 创建上下文环境配置对象
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")

        // 2 创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

        // 3 获取数据
        val df: DataFrame = spark.read.json("input/user.json")

        // 4.1 df.write.保存数据:csv  jdbc   json  orc   parquet  text
        // 注意:保存数据的相关参数需写到上述方法中。如:text需传入加载数据的路径,JDBC需传入JDBC相关参数。
        // 默认保存为parquet文件(可以修改conf.set("spark.sql.sources.default","json"))
        df.write.save("output")

        // 默认读取文件parquet
        spark.read.load("output").show()

        // 4.2 format指定保存数据类型
        // df.write.format("…")[.option("…")].save("…")
         // format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"text"。
         // save ("…"):在"csv"、"orc"、"parquet"和"text"(单列DF)格式下需要传入保存数据的路径。
         // option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
        df.write.format("json").save("output2")

        // 4.3 可以指定为保存格式,直接保存,不需要再调用save了
        df.write.json("output1")

        // 4.4 如果文件已经存在则追加
        df.write.mode("append").json("output2")

        // 如果文件已经存在则忽略(文件存在不报错,也不执行;文件不存在,创建文件)
        df.write.mode("ignore").json("output2")

        // 如果文件已经存在则覆盖
        df.write.mode("overwrite").json("output2")

        // 默认default:如果文件已经存在则抛出异常
        // path file:/E:/ideaProject2/SparkSQLTest/output2 already exists.;
        df.write.mode("error").json("output2")

        // 5 释放资源
        spark.stop()
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

3.3 与mysql交互

1、导入依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2、从mysql读数据

package com.atguigu.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql._

object SparkSQL10_MySQL_Read{

    def main(args: Array[String]): Unit = {

        // 1 创建上下文环境配置对象
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")

        // 2 创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

        // 3.1 通用的load方法读取mysql的表数据
        val df: DataFrame = spark.read.format("jdbc")
            .option("url", "jdbc:mysql://hadoop102:3306/gmall")
            .option("driver", "com.mysql.jdbc.Driver")
            .option("user", "root")
            .option("password", "000000")
            .option("dbtable", "user_info")
            .load()

        // 3.2 创建视图
        df.createOrReplaceTempView("user")

        // 3.3 查询想要的数据
        spark.sql("select id, name from user").show()

        // 4 释放资源
        spark.stop()
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

3、向mysql写数据

package com.atguigu.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

object SparkSQL11_MySQL_Write {

    def main(args: Array[String]): Unit = {

        // 1 创建上下文环境配置对象
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")

        // 2 创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

        // 3 准备数据
        // 注意:id是主键,不能和MySQL数据库中的id重复
        val rdd: RDD[User] = spark.sparkContext.makeRDD(List(User(3000, "zhangsan"), User(3001, "lisi")))

        val ds: Dataset[User] = rdd.toDS

        // 4 向MySQL中写入数据
        ds.write
            .format("jdbc")
            .option("url", "jdbc:mysql://hadoop102:3306/gmall")
			.option("driver", "com.mysql.jdbc.Driver")
            .option("user", "root")
            .option("password", "000000")
            .option("dbtable", "user_info")
            .mode(SaveMode.Append)
            .save()

        // 5 释放资源
        spark.stop()
    }

    case class User(id: Int, name: String)
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

3.4 与hive交互

sparksql可以采用内嵌hive,也可以采用外部hive。企业开发中,通常采用外部hive。

3.4.1 内嵌hive应用

内嵌hive,元数据存储在derby数据库
1、如果使用spark内嵌的hive,则什么都不用做,直接使用即可。

[atguigu@hadoop102 spark-local]$ bin/spark-shell

scala> spark.sql("show tables").show

  • 1
  • 2
  • 3
  • 4

注意:执行完后,发现多了$spark_home/metastore_db和derby.log,用于存储元数据。
2、创建一个表

scala> spark.sql("create table user(id int, name string)")
  • 1

注意:执行完后,发现多了$spark_home/spark-warehouse/user,用于存储数据库数据。
3、查看数据库

scala> spark.sql("show tables").show
  • 1

4、向表中插入数据

scala> spark.sql("insert into user values(1,'zs')")
  • 1

5、查询数据

scala> spark.sql("select * from user").show
  • 1

注意:然而在实际使用中,几乎没有任何人会使用内置的hive,因为元数据存储在derby数据库,不支持多客户端访问。

3.4.2 外部hive应用

如果spark要接管hive外部已经部署好的hive,需要通过一下几个步骤。
1、为了说明内嵌hive和外部hive区别:删除内嵌hive的metastore_db和spark-warehouse

[atguigu@hadoop102 spark-local]$ rm -rf metastore_db/ spark-warehouse/
  • 1

2、确定原有hive是正常工作的

[atguigu@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh

[atguigu@hadoop102 hive]$ bin/hive

  • 1
  • 2
  • 3
  • 4
  • 5

3、需要把hive-site.xml拷贝到spark的conf/目录下

[atguigu@hadoop102 conf]$ cp hive-site.xml /opt/module/spark-local/conf/
  • 1

4、如果以前hive-site.xml文件中,配置过tez相关信息,注释掉(不是必须)
5、把mysql的驱动copy到spark的jars/目录下

[atguigu@hadoop102 software]$ cp mysql-connector-java-5.1.48.jar /opt/module/spark-local/jars/
  • 1

6、需要提前启动hive服务,/opt/module/hive/bin/hiveservices.sh start(不是必须)
7、如果访问不到hdfs,则需把core-site.xml和hdfs-site.xml拷贝到conf/目录(不是必须)
8、启动spark-shell

[atguigu@hadoop102 spark-local]$ bin/spark-shell
  • 1

9、查询表

scala> spark.sql("show tables").show
  • 1

10、创建一个表

scala> spark.sql("create table student(id int, name string)")
  • 1

11、向表中插入数据

scala> spark.sql("insert into student values(1,'zs')")
  • 1

12、查询数据

scala> spark.sql("select * from student").show
  • 1

3.4.3 运行spark sql cli

spark sql cli可以方便的在本地下运行hive元数据服务以及从命令行执行查询任务。在spark目录下执行如下命令启动spark sql cli,直接执行sql语句,类型hive窗口。

[atguigu@hadoop102 spark-local]$ bin/spark-sql

spark-sql (default)> show tables;

  • 1
  • 2
  • 3
  • 4

3.4.4 idea操作外部hive

1、添加依赖

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

2、拷贝hive-site.xml到resources目录(如果需要操作hadoop,需要拷贝hdfs-site.xml、core-site.xml、yarn-site.xml)
3、代码实现

package com.atguigu.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql._

object SparkSQL12_Hive {

    def main(args: Array[String]): Unit = {

        System.setProperty("HADOOP_USER_NAME","atguigu")

        // 1 创建上下文环境配置对象
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLTest")
        // 2 创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()

        // 3 连接外部Hive,并进行操作
        spark.sql("show tables").show()
        spark.sql("create table user3(id int, name string)")
        spark.sql("insert into user3 values(1,'zs')")
        spark.sql("select * from user3").show

        // 4 释放资源
        spark.stop()
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号