赞
踩
Spark SQL 是 Apache Spark 的一个模块,专门用于处理结构化数据。它集成了 SQL 查询和 Spark 编程的强大功能,使得处理大数据变得更加高效和简便。通过 Spark SQL,用户可以直接在 Spark 中使用 SQL 查询,或者使用 DataFrame 和 DataSet API 进行数据操作。
Spark SQL 的架构主要由以下几个组件组成:
查询解析(Query Parsing):将 SQL 查询解析成抽象语法树(AST)。
逻辑计划生成(Logical Plan Generation):将 AST 转换为未优化的逻辑计划。
逻辑计划优化(Logical Plan Optimization):使用 Catalyst 优化器对逻辑计划进行一系列规则优化。
物理计划生成(Physical Plan Generation):将优化后的逻辑计划转换为一个或多个物理计划,并选择最优的物理计划。
执行(Execution):将物理计划转换为 RDD,并在集群上并行执行。
SparkContext:SparkContext 是 Spark 应用程序的主入口点,负责连接到 Spark 集群,管理资源和任务调度。在 Spark 2.0 之后,推荐使用 SparkSession 取代 SparkContext。
SQLContext:SQLContext 是 Spark SQL 的编程入口点,允许用户通过 SQL 查询或 DataFrame API 进行数据处理。它提供了基本的 Spark SQL 功能。
HiveContext:HiveContext 是 SQLContext 的子集,增加了对 Hive 的集成支持,可以直接访问 Hive 中的数据和元数据,使用 Hive 的 UDF 和 UDAF。
SparkSession:SparkSession 是 Spark 2.0 引入的新概念,合并了 SQLContext 和 HiveContext 的功能,提供了统一的编程接口。SparkSession 是 Spark SQL 的建议入口点,支持使用 DataFrame 和 Dataset API 进行数据处理。
创建 SparkContext 和 SparkSession 的注意事项:如果同时需要创建 SparkContext 和 SparkSession,必须先创建 SparkContext,再创建 SparkSession。如果先创建 SparkSession,再创建 SparkContext,会导致异常,因为在同一个 JVM 中只能运行一个 SparkContext。
<properties>
<spark.version>3.1.2</spark.version>
<spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${spark.scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
在 Spark SQL 中,数据集主要分为以下几种类型:DataFrame 和 Dataset。它们是处理和操作结构化和半结构化数据的核心抽象。
Dataset 是在 Spark 2.0 中引入的新的抽象数据结构,它是强类型的,可以存储 JVM 对象。Dataset API 结合了 DataFrame 的操作简便性和类型安全性,适用于需要更高级别数据类型控制和面向对象编程风格的场景。具体特点如下:
struct
、map
和 array
。Dataset 是在 Spark 2.0 中引入的新的抽象数据结构,它是强类型的,可以存储 JVM 对象。Dataset API 结合了 DataFrame 的操作简便性和类型安全性,适用于需要更高级别数据类型控制和面向对象编程风格的场景。具体特点如下:
DataFrame = Dataset[Row]
。import org.apache.spark.sql.SparkSession object SparkSqlContext { def main(args: Array[String]): Unit = { // 创建 SparkConf 对象,设置应用程序的配置 val conf: SparkConf = new SparkConf() .setMaster("local[4]") // 设置本地运行模式,使用 4 个线程 .setAppName("spark sql") // 设置应用程序名称为 "spark sql" // 创建 SparkSession 对象,用于 Spark SQL 的编程入口 val spark: SparkSession = SparkSession.builder() .config(conf) // 将 SparkConf 配置应用于 SparkSession .getOrCreate() // 获取现有的 SparkSession,或者新建一个 // 获取 SparkContext 对象,可以直接从 SparkSession 中获取 val sc: SparkContext = spark.sparkContext // 导入 SparkSession 的隐式转换,可以使用 DataFrame API 的方法 import spark.implicits._ // 在这里可以编写数据处理代码,例如创建 DataFrame 和 Dataset,进行数据操作等... // 停止 SparkSession,释放资源 spark.stop() } }
1、从集合创建
case class Person(name: String, age: Int) // 下同
val data1 = Seq(Person("Alice", 25), Person("Bob", 30))
val ds: Dataset[Person] = spark.createDataset(data) // 这里的spark是SparkSession对象(如上代码),下同
val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
1、从文件系统读取
val schema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
))
val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]
val dfCsv: DataFrame = spark.read
// 使用.schema方法指定CSV文件的模式(schema)其定义了DataFrame的列名和类型。
// 这是一个可选步骤,但如果CSV文件没有头部行,或者你想覆盖文件中的头部行,则必须指定。
.schema(schema)
// 这里设置"header"为"true",表示CSV文件的第一行是列名,不需要Spark从文件中自动推断。
.option("header", "true")
.csv("/path/to/csv/file")
3、从关系型数据库读取
val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")
val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]
val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
4、从非结构化数据源读取
val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]
val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
5、手动创建 Dataset
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))
val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]
val dfManual: DataFrame = spark.createDataFrame(
spark.sparkContext.parallelize(data), schema
)
语法示例一
模拟数据(1000条):
id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子异,男,48,广州市
3,孟秀英,女,46,上海市
4,金嘉伦,男,8,北京市
...
需求:哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。
// 导入SparkSession的隐式转换,这样可以使用DataFrame的便捷方法(例如下面的'$'符号) import spark.implicits._ // 定义了一个DataFrame的schema,但在这个例子中,使用了CSV的header来自动推断schema val schema = StructType(Seq( StructField("id", LongType), StructField("name", StringType), StructField("gender", StringType), StructField("age", IntegerType), StructField("city", StringType), )) // 定义WindowSpec,用于后续的窗口函数操作,按gender分区,按avg_age降序排序,(复用使用此) val WindowSpec: WindowSpec = Window .partitionBy($"gender") .orderBy($"avg_age".desc) // 从CSV文件中读取数据,使用header作为列名,然后选择特定的列,进行分组和聚合操作 // 哪些城市和性别组合在人口较多(ID数量>50)的情况下具有最高的平均年龄,以及这些组合在各自性别中的排名。 spark.read // .schema(schema) // 应用我们定义的schema .option("header", "true") // 使用CSV的header作为列名 .csv("D:\\projects\\sparkSql\\people.csv") // DataFrame .select($"id", $"name", $"age", $"city", $"gender") // 选择需要的列(不写默认就是全选) .groupBy($"city", $"gender") // 按城市和性别分组 .agg( // 多重聚合 count($"id").as("count"), // 计算每个组的ID数量 round(avg($"age"), 2).as("avg_age") // 计算每个组的平均年龄,并保留两位小数 ) .where($"count".gt(50)) // 过滤出ID数量大于(可以使用>)50的组 .orderBy($"avg_age".desc) // 按平均年龄降序排序 .select($"city", $"gender", $"avg_age", dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank")) .show() // 显示结果
结果:
+------+------+-------+-------------------+
| city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市| 男| 41.05| 1|
| 东莞| 男| 42.81| 2|
|上海市| 男| 43.92| 3|
|成都市| 男| 45.89| 4|
| 中山| 男| 47.08| 5|
|广州市| 男| 47.47| 6|
| 深圳| 男| 48.36| 7|
|上海市| 女| 46.02| 1|
| 中山| 女| 49.55| 2|
+------+------+-------+-------------------+
语法示例二:视图,sql
// 读取CSV文件到DataFrame,使用header作为列名 val dfPeople: DataFrame = spark.read .option("header", "true") // 使用CSV的header作为列名 .csv("D:\\projects\\sparkSql\\people.csv") // 将DataFrame注册为临时视图 dfPeople.createOrReplaceTempView("people_view") // 可以使用Spark SQL来查询这个视图了 // 例如,查询所有人的姓名和年龄 spark.sql("SELECT name, age FROM people_view").show() // 二 spark.sql( """ |select * from people_view |where gender = '男' |""".stripMargin ).show()
语法示例三:join
case class Student(name: String, classId: Int) case class Class(classId: Int, className: String) val frmStu = spark.createDataFrame( Seq( Student("张三", 1), Student("李四", 1), Student("王五", 2), Student("赵六", 2), Student("李明", 2), Student("王刚", 4), Student("王朋", 5), ) ) val frmClass = spark.createDataFrame( Seq( Class(1, "name1"), Class(2, "name2"), Class(3, "name3"), Class(4, "name4") ) )
left
左连接,rignt
右连接, full
全外连接,anti
左差集,semi
左交集
// 别名 + inner 内连接 frmStu.as("S") .join(frmClass.as("C"), $"S.classId" === $"C.classId") // joinType 默认 inner内连接 .show() // 使用左外连接将df和frmClass根据classId合并 frmStu .join(frmClass, Seq("classId"), "left") .show() // 左差集 frmStu .join(frmClass, Seq("classId"), "anti") .show() // 左交集 frmStu .join(frmClass, Seq("classId"), "semi") .show()
结果
别名 + inner 内连接 +----+-------+-------+---------+ |name|classId|classId|className| +----+-------+-------+---------+ |张三| 1| 1| name1| |李四| 1| 1| name1| |王五| 2| 2| name2| |赵六| 2| 2| name2| |李明| 2| 2| name2| |王刚| 4| 4| name4| +----+-------+-------+---------+ 使用左外连接将df和frmClass根据classId合并 +-------+----+---------+ |classId|name|className| +-------+----+---------+ | 1|张三| name1| | 1|李四| name1| | 2|王五| name2| | 2|赵六| name2| | 2|李明| name2| | 4|王刚| name4| | 5|王朋| null| +-------+----+---------+ 左差集 +-------+----+ |classId|name| +-------+----+ | 5|王朋| +-------+----+ 左交集 +-------+----+ |classId|name| +-------+----+ | 1|张三| | 1|李四| | 2|王五| | 2|赵六| | 2|李明| | 4|王刚| +-------+----+
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。