当前位置:   article > 正文

SparkSQL编程_2、构建样例类,包含用户id,用户地址,字段格式根据需要定义,将上述rdd转换为对应da

2、构建样例类,包含用户id,用户地址,字段格式根据需要定义,将上述rdd转换为对应da

第1章 Spark SQL概述

1.1 什么是Spark SQL

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduc的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

1.2 什么是DataFrame

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

DataFrame也是懒执行的,性能上比RDD,主要原因:

优化的执行计划,即查询计划通过Spark catalyst optimiser进行优化。

1.3 什么是DataSet

DataSet是分布式数据集合。DataSet是Spark 1.6中添加的一个新抽象,是DataFrame的一个扩展。它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。DataSet也可以使用功能性的转换(操作map,flatMap,filter等等)。

1)是DataFrame API的一个扩展,是SparkSQL最新的数据抽象;

2)用户友好的API风格,既具有类型安全检查也具有DataFrame的查询优化特性;

3)用样例类来对DataSet中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称;

4)DataSet是强类型的。比如可以有DataSet[Car],DataSet[Person]。

第2章 Spark SQL编程

2.1 SparkSession新的起始点

在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

2.2 DataFrame

2.2.1 创

在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从Hive Table进行查询返回。

  1. 1)查看Spark数据源进行创建的文件格式
  2. scala> spark.read.
  3. csv format jdbc json load option options orc parquet schema table text textFile
  4. 2)读取json文件创建DataFrame
  5. scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
  6. df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  7. 3)展示结果
  8. scala> df.show
  9. +----+-------+
  10. | age| name|
  11. +----+-------+
  12. |null|Michael|
  13. | 30| Andy|
  14. | 19| Justin|
  15. +----+-------+

2.2.2 SQL风格语法(主要)

  1. 1)创建一个DataFrame
  2. scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
  3. df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  4. 2)对DataFrame创建一个临时表
  5. scala> df.createOrReplaceTempView("people")
  6. 3)通过SQL语句实现查询全表
  7. scala> val sqlDF = spark.sql("SELECT * FROM people")
  8. sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  9. 4)结果展示
  10. scala> sqlDF.show
  11. +----+-------+
  12. | age| name|
  13. +----+-------+
  14. |null|Michael|
  15. | 30| Andy|
  16. | 19| Justin|
  17. +----+-------+

注意:普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表需要全路径访问,如:global_temp.people

  1. 5)对于DataFrame创建一个全局表
  2. scala> df.createGlobalTempView("people")
  3. 6)通过SQL语句实现查询全表
  4. scala> spark.sql("SELECT * FROM global_temp.people").show()
  5. +----+-------+
  6. | age| name|
  7. +----+-------+
  8. |null|Michael|
  9. | 30| Andy|
  10. | 19| Justin|
  11. scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
  12. +----+-------+
  13. | age| name|
  14. +----+-------+
  15. |null|Michael|
  16. | 30| Andy|
  17. | 19| Justin|
  18. +----+-------+

2.2.3 DSL风格语法(次要)

  1. 1)创建一个DataFrame
  2. scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
  3. df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  4. 2)查看DataFrame的Schema信息
  5. scala> df.printSchema
  6. root
  7. |-- age: long (nullable = true)
  8. |-- name: string (nullable = true)
  9. 3)只查看”name”列数据
  10. scala> df.select("name").show()
  11. +-------+
  12. | name|
  13. +-------+
  14. |Michael|
  15. | Andy|
  16. | Justin|
  17. +-------+
  18. 4)查看”name”列数据以及”age+1”数据
  19. scala> df.select($"name", $"age" + 1).show()
  20. +-------+---------+
  21. | name|(age + 1)|
  22. +-------+---------+
  23. |Michael| null|
  24. | Andy| 31|
  25. | Justin| 20|
  26. +-------+---------+
  27. 5)查看”age”大于”21”的数据
  28. scala> df.filter($"age" > 21).show()
  29. +---+----+
  30. |age|name|
  31. +---+----+
  32. | 30|Andy|
  33. +---+----+
  34. 6)按照”age”分组,查看数据条数
  35. scala> df.groupBy("age").count().show()
  36. +----+-----+
  37. | age|count|
  38. +----+-----+
  39. | 19| 1|
  40. |null| 1|
  41. | 30| 1|
  42. +----+-----+

2.2.4 RDD转换为DateFrame

注意:如果需要RDDDF或者DS之间操作,那么都需要引入 import spark.implicits._  spark不是包名,而是sparkSession对象的名称】

  1. 前置条件:导入隐式转换并创建一个RDD
  2. scala> import spark.implicits._
  3. import spark.implicits._
  4. scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
  5. peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
  6. 1)通过手动确定转换
  7. scala> peopleRDD.map{x=>val para = x.split(",");(para(0),para(1).trim.toInt)}.toDF("name","age")
  8. res1: org.apache.spark.sql.DataFrame = [name: string, age: int]
  9. 2)通过反射确定(需要用到样例类)
  10. 1)创建一个样例类
  11. scala> case class People(name:String, age:Int)
  12. 2)根据样例类将RDD转换为DataFrame
  13. scala> peopleRDD.map{ x => val para = x.split(",");People(para(0),para(1).trim.toInt)}.toDF
  14. res2: org.apache.spark.sql.DataFrame = [name: string, age: int]
  15. 3)通过编程的方式(了解)
  16. 1)导入所需的类型
  17. scala> import org.apache.spark.sql.types._
  18. import org.apache.spark.sql.types._
  19. 2)创建Schema
  20. scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
  21. structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
  22. 3)导入所需的类型
  23. scala> import org.apache.spark.sql.Row
  24. import org.apache.spark.sql.Row
  25. 4)根据给定的类型创建二元组RDD
  26. scala> val data = peopleRDD.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}
  27. data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:33
  28. 5)根据数据及给定的schema创建DataFrame
  29. scala> val dataFrame = spark.createDataFrame(data, structType)
  30. dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]

2.2.5 DateFrame转换为RDD

  1. 直接调用rdd即可
  2. 1)创建一个DataFrame
  3. scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
  4. df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  5. 2)将DataFrame转换为RDD
  6. scala> val dfToRDD = df.rdd
  7. dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[19] at rdd at <console>:29
  8. 3)打印RDD
  9. scala> dfToRDD.collect
  10. res13: Array[org.apache.spark.sql.Row] = Array([Michael, 29], [Andy, 30], [Justin, 19])

2.3 DataSet

DataSet是具有强类型的数据集合,需要提供对应的类型信息。

2.3.1 创建

  1. 1)创建一个样例类
  2. scala> case class Person(name: String, age: Long)
  3. defined class Person
  4. 2)创建DataSet
  5. scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
  6. caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

2.3.2 RDD转换为DataSet

SparkSQL能够自动将包含有case类的RDD转换成DataFrame,case类定义了table的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seqs或者Array等复杂的结构。

  1. 1)创建一个RDD
  2. scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
  3. peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
  4. 2)创建一个样例类
  5. scala> case class Person(name: String, age: Long)
  6. defined class Person
  7. 3)将RDD转化为DataSet
  8. scala> peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS
  9. res8: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

2.3.3 DataSet转换为RDD

  1. 调用rdd方法即可。
  2. 1)创建一个DataSet
  3. scala> val DS = Seq(Person("Andy", 32)).toDS()
  4. DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
  5. 2)将DataSet转换为RDD
  6. scala> DS.rdd
  7. res11: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[15] at rdd at <console>:28

2.4 DataFrame与DataSet的互操作

2.4.1 DataFrame转Dataset

  1. 1)创建一个DateFrame
  2. scala> val df = spark.read.json("examples/src/main/resources/people.json")
  3. df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  4. 2)创建一个样例类
  5. scala> case class Person(name: String, age: Long)
  6. defined class Person
  7. 3)将DateFrame转化为DataSet
  8. scala> df.as[Person]
  9. res14: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

2.4.2 Dataset转DataFrame

  1. 1)创建一个样例类
  2. scala> case class Person(name: String, age: Long)
  3. defined class Person
  4. 2)创建DataSet
  5. scala> val ds = Seq(Person("Andy", 32)).toDS()
  6. ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
  7. 3)将DataSet转化为DataFrame
  8. scala> val df = ds.toDF
  9. df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
  10. 4)展示
  11. scala> df.show
  12. +----+---+
  13. |name|age|
  14. +----+---+
  15. |Andy| 32|
  16. +----+---+

这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用。

2.5 RDD、DataFrame、DataSet

在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:

RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)

如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的Spark版本中,DataSet有可能会逐步取代RDD和DataFrame成为唯一的API接口

2.5.1 三者的共性

(1)RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;

(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算;

(3)三者有许多共同的函数,如filter,排序等;

(4)在对DataFrame和Dataset进行操作许多操作都需要这个包:import spark.implicits._(在创建好SparkSession对象后尽量直接导入)

2.5.2 三者的互相转化 

 

2.6 IDEA创建SparkSQL程序

  1. 1)添加依赖
  2. <dependency>
  3. <groupId>org.apache.spark</groupId>
  4. <artifactId>spark-sql_2.11</artifactId>
  5. <version>2.1.1</version>
  6. </dependency>
  7. 2)代码实现
  8. package com.atguigu.test
  9. import org.apache.spark.sql.SparkSession
  10. import org.apache.spark.{SparkConf, SparkContext}
  11. object HelloWorld {
  12. def main(args: Array[String]) {
  13. //创建SparkConf()并设置App名称
  14. val spark = SparkSession
  15. .builder()
  16. .master("local[*]")
  17. .appName("HelloWorld")
  18. //.config("spark.some.config.option", "some-value")
  19. .getOrCreate()
  20. //导入隐式转换
  21. import spark.implicits._
  22. //读取本地文件,创建DataFrame
  23. val df = spark.read.json("examples/src/main/resources/people.json")
  24. //打印
  25. df.show()
  26. //DSL风格:查询年龄在21岁以上的
  27. df.filter($"age" > 21).show()
  28. //创建临时表
  29. df.createOrReplaceTempView("persons")
  30. //SQL风格:查询年龄在21岁以上的
  31. spark.sql("SELECT * FROM persons where age > 21").show()
  32. //关闭连接
  33. spark.stop()
  34. }
  35. }

2.7 用户自定义函数

在Shell窗口中可以通过spark.udf功能用户可以自定义函数。

2.7.1 用户自定义UDF函数

  1. 1)创建DataFrame
  2. scala> val df = spark.read.json("examples/src/main/resources/people.json")
  3. df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  4. 2)打印数据
  5. scala> df.show()
  6. +----+-------+
  7. | age| name|
  8. +----+-------+
  9. |null|Michael|
  10. | 30| Andy|
  11. | 19| Justin|
  12. +----+-------+
  13. 3)注册UDF,功能为在数据前添加字符串
  14. scala> spark.udf.register("addName", (x:String)=> "Name:"+x)
  15. res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
  16. 4)创建临时表
  17. scala> df.createOrReplaceTempView("people")
  18. 5)应用UDF
  19. scala> spark.sql("Select addName(name), age from people").show()
  20. +-----------------+----+
  21. |UDF:addName(name)| age|
  22. +-----------------+----+
  23. | Name:Michael|null|
  24. | Name:Andy| 30|
  25. | Name:Justin| 19|
  26. +-----------------+----+

3章 Spark SQL数据的加载与保存

3.1 通用加载/保存方法

3.1.1 加载数据

  1. 1read直接加载数据
  2. scala> spark.read.
  3. csv jdbc json orc parquet textFile… …
  4. 注意:加载数据的相关参数需写到上述方法中。如:textFile需传入加载数据的路径,jdbc需传入JDBC相关参数。
  5. 2format指定加载数据类型
  6. scala> spark.read.format("…")[.option("…")].load("…")

用法详解:

(1)format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。

(2)load("…"):在"csv""orc""parquet""textFile"格式下需要传入加载数据的路径。

(3)option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable

3.1.2 保存数据

  1. 1write直接保存数据
  2. scala> spark.write.
  3. csv jdbc json orc parquet textFile… …
  4. 注意:保存数据的相关参数需写到上述方法中。如:textFile需传入加载数据的路径,jdbc需传入JDBC相关参数。
  5. 2format指定保存数据类型
  6. scala> spark.write.format("…")[.option("…")].save("…")

2)format指定保存数据类型

scala> spark.write.format("…")[.option("…")].save("…")

用法详解:

(1)format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。

(2)save ("…"):在"csv""orc""parquet""textFile"格式下需要传入保存数据的路径。

(3)option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable

3)文件保存选项

可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。SaveMode是一个枚举类,其中的常量包括:

(1)Append:当保存路径或者表已存在时,追加内容;

(2)Overwrite: 当保存路径或者表已存在时,覆写内容;

(3)ErrorIfExists:当保存路径或者表已存在时,报错;

(4)Ignore:当保存路径或者表已存在时,忽略当前的保存操作。

使用详解:

df.write.mode(SaveMode.Append).save("… …")

3.2 JSON文件

Spark SQL 能够自动推测 JSON数据集的结构,并将它加载为一个Dataset[Row]. 可以通过SparkSession.read.json()去加载一个 一个JSON 文件。

注意:这个JSON文件不是一个传统的JSON文件,每一行都得是一个JSON串。格式如下:

  1. {"name":"Michael"}
  2. {"name":"Andy", "age":30}
  3. {"name":"Justin", "age":19}
  1. 1)导入隐式转换
  2. import spark.implicits._
  3. 2)加载JSON文件
  4. val path = "examples/src/main/resources/people.json"
  5. val peopleDF = spark.read.json(path)
  6. 3)创建临时表
  7. peopleDF.createOrReplaceTempView("people")
  8. 4)数据查询
  9. val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
  10. teenagerNamesDF.show()
  11. +------+
  12. | name|
  13. +------+
  14. |Justin|

3.3 MySQL

Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。

可在启动shell时指定相关的数据库驱动路径,或者将相关的数据库驱动放到spark的类路径下。

两种方式:1)使用read.jdbc加载数据 2)使用format形式加载数据

  1. 1)启动spark-shell
  2. bin/spark-shell --master spark://hadoop102:7077 [--jars mysql-connector-java-5.1.27-bin.jar]
  3. 2)定义JDBC相关参数配置信息
  4. val prop = new java.util.Properties()
  5. prop.put("user", "root")
  6. prop.put("password", "123456")
  7. 3)使用read.jdbc加载数据
  8. val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop102:3306/rdd", "emp", prop)
  9. 4)使用format形式加载数据
  10. val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/rdd").option("dbtable", " emp").option("user", "root").option("password", "123456").load()
  11. 5)使用write.jdbc保存数据
  12. jdbcDF2.write.jdbc("jdbc:mysql://hadoop102:3306/mysql", "db", prop)
  13. 6)使用format形式保存数据
  14. jdbcDF.write
  15. .format("jdbc")
  16. .option("url", "jdbc:mysql://hadoop102:3306/rdd")
  17. .option("dbtable", "rddtable2")
  18. .option("user", "root")
  19. .option("password", "123456")
  20. .save()

3.4 Hive

Apache Hive是Hadoop上的SQL引擎,Spark SQL编译时可以包含Hive支持,也可以不包含。包含Hive支持的Spark SQL可以支持Hive表访问、UDF(用户自定义函数)以及Hive查询语言(HQL)等。spark-shell默认是Hive支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)

3.4.1 内嵌Hive应用(不是很懂)

如果要使用内嵌的Hive,什么都不用做,直接用就可以了。

可以修改其数据仓库地址,参数为:--conf spark.sql.warehouse.dir=./wear

注意如果你使用的是内部的Hive在Spark2.0之后spark.sql.warehouse.dir用于指定数据仓库的地址如果你需要是用HDFS作为路径那么需要将core-site.xml和hdfs-site.xml 加入到Spark conf目录否则只会创建master节点上的warehouse目录查询时会出现文件找不到的问题这是需要使用HDFS则需要将metastore删除重启集群

3.4.2 外部Hive应用

如果想连接外部已经部署好的Hive,需要通过以下几个步骤。

1)将Hive中的hive-site.xml拷贝或者软连接到Spark安装目录下的conf目录下。

2)打开spark shell,注意带上访问Hive元数据库的JDBC客户端

bin/spark-shell --master spark://hadoop102:7077 --jars mysql-connector-java-5.1.27-bin.jar

注意:启动时指定JDBC jar包路径很麻烦,我们可以选择将JDBC的驱动包放置在spark的lib目录下,一劳永逸。

3.4.3 运行Spark SQL CLI

Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。在Spark目录下执行如下命令启动Spark SQL CLI,直接执行SQL语句,类似一Hive窗口。

/bin/spark-sql

3.4.4 代码中操作Hive

  1. 1)添加依赖
  2. <dependency>
  3. <groupId>org.apache.spark</groupId>
  4. <artifactId>spark-hive_2.11</artifactId>
  5. <version>2.1.1</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.hive</groupId>
  9. <artifactId>hive-exec</artifactId>
  10. <version>1.2.1</version>
  11. </dependency>
  12. 2)代码实现
  13. //创建SparkSession
  14. val spark: SparkSession = SparkSession
  15. .builder()
  16. .enableHiveSupport()
  17. .master("local[*]")
  18. .appName("SQLTest")
  19. .getOrCreate()
声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号