当前位置:   article > 正文

Spark SQL 结构化数据文件处理 详解_spark sql——结构化数据文件处理总结

spark sql——结构化数据文件处理总结

Spark SQL简介

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象结构叫做DataFrame的数据模型(即带有Schema信息的RDD),Spark SQL作为分布式SQL查询引擎,让用户可以通过SQL、DataFrames API和Datasets API三种方式实现对结构化数据的处理。

Spark SQL主要提供了以下三个功能:

  1. Spark SQL可从各种结构化数据源中读取数据,进行数据分析。
  2. Spark SQL包含行业标准的JDBC和ODBC连接方式,因此它不局限于在Spark程序内使用SQL语句进行查询。
  3. Spark SQL可以无缝地将SQL查询与Spark程序进行结合,它能够将结构化数据作为Spark中的分布式数据集(RDD)进行查询。

Spark SQL架构

Spark SQL架构与Hive架构相比,把底层的MapReduce执行引擎更改为Spark,还修改了Catalyst优化器,Spark SQL快速的计算效率得益于Catalyst优化器。从HiveQL被解析成语法抽象树起,执行计划生成和优化的工作全部交给Spark SQL的Catalyst优化器进行负责和管理。
在这里插入图片描述
Spark要想很好地支持SQL,需要完成解析(Parser)、优化(Optimizer)、执行(Execution)三大过程。
在这里插入图片描述
Catalyst优化器在执行计划生成和优化的工作时,离不开内部的五大组件。

  • SqlParse:完成SQL语法解析功能,目前只提供了一个简单的SQL解析器。
  • Analyze:主要完成绑定工作,将不同来源的Unresolved LogicalPlan和元数据进行绑定,生成Resolved LogicalPlan。
  • Optimizer:对Resolved Lo;gicalPlan进行优化,生成OptimizedLogicalPlan。
  • Planner:将LogicalPlan转换成PhysicalPlan。
  • CostModel:主要根据过去的性能统计数据,选择最佳的物理执行计划。

Spark SQL工作流程:

  1. 下在解析SQL语句之前,会创建SparkSession,涉及到表名、字段名称和字段类型的元数据都将保存在SessionCatalog中;
  2. 当调用SparkSession的sql()方法时就会使用SparkSqlParser进行解析SQL语句,解析过程中使用的ANTLR进行词法解析和语法解析;
  3. 使用Analyzer分析器绑定逻辑计划,在该阶段,Analyzer会使用Analyzer Rules,并结合SessionCatalog,对未绑定的逻辑计划进行解析,生成已绑定的逻辑计划;
  4. 使用Optimizer优化器优化逻辑计划,该优化器同样定义了一套规则(Rules),利用这些规则对逻辑计划和语句进行迭代处理;
  5. 使用SparkPlanner对优化后的逻辑计划进行转换,生成可以执行的物理计划SparkPlan;
  6. 使用QueryExecution执行物理计划,此时则调用SparkPlan的execute()方法,返回RDDs。

DataFrame简介

Spark SQL使用的数据抽象并非是RDD,而是DataFrame。在Spark 1.3.0版本之前,DataFrame被称为SchemaRDD。DataFrame使Spark具备处理大规模结构化数据的能力。在Spark中,DataFrame是一种以RDD为基础的分布式数据集。DataFrame的结构类似传统数据库的二维表格,可以从很多数据源中创建,如结构化文件、外部数据库、Hive表等数据源。

DataFrame可以看作是分布式的Row对象的集合,在二维表数据集的每一列都带有名称和类型,这就是Schema元信息,这使得Spark框架可获取更多数据结构信息,从而对在DataFrame背后的数据源以及作用于DataFrame之上数据变换进行针对性的优化,最终达到提升计算效率。

DataFrame创建

创建DataFrame的两种基本方式:

  • 已存在的RDD调用toDF()方法转换得到DataFrame。
  • 通过Spark读取数据源直接创建DataFrame。

直接创建DataFarme对象

若使用SparkSession方式创建DataFrame,可以使用spark.read从不同类型的文件中加载数据创建DataFrame。spark.read的具体操作,如下所示。

方法名描述
spark.read.text(“people.txt”)读取txt格式文件,创建DataFrame
spark.read.csv (“people.csv”)读取csv格式文件,创建DataFrame
spark.read.text(“people.json”)读取json格式文件,创建DataFrame
spark.read.text(“people.parquet”)读取parquet格式文件,创建DataFrame

在本地创建一个txt文本文档,用于读取:运行spark-shell:
使用以下代码进行读取:

val PDF = spark.read.text("f:/person.txt")
  • 1

在这里插入图片描述
把RDD对象转换成DataFarme对象

依旧在spark-shell脚本下运行:

	//读取文本文档,这时第一个RDD的数据类型,按空格分割开来
    val lineRDD = sc.textFile("f:/person.txt").map(_.split(" "))
    //使用一个样式类
    case class Person(id:Int,name:String,age:Int)
    //按照样式类对RDD数据进行分割成map
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    //把RDD数据类型转成DataFarme
    val personDF = personRDD.toDF()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

相同的:最后的personDF是一个DataFarme的数据类型
在这里插入图片描述
DataFrame的常用操作

DataFrame提供了两种语法风格,即DSL风格语法和SQL风格语法。二者在功能上并无区别,仅仅是根据用户习惯,自定义选择操作方式。

  • DSL风格:DataFrame提供了一个领域特定语言(DSL)以方便操作结构化数据。
  • SQL风格:在程序中直接使用spark.sql()方式执行SQL查询,结果将作为一个DataFrame返回,使用SQL风格操作的前提是将DataFrame注册成一个临时表。

DSL的风格

DSL风格操作DataFrame的常用方法,具体如下表如示。

方法名称方法说明
show()查看DataFrame中的具体内容信息
printSchema()查看DataFrame的Schema信息
select()查看DataFrame中选取部分列的数据及进行重命名
filter()实现条件查询,过滤出想要的结果
groupBy()对记录进行分组
sort()对特定字段进行排序操作

演示几种方法:

// 查看这个表
personDF.show()  
//查看Schema数据
personDF.printSchema()
//查看列
personDF.select(personDF.col("name")).show
//过滤年龄小于25的
personDF.filter(col("age") >= 25).show
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在这里插入图片描述
SQL的风格

//注册一个临时表
personDF.registerTempTable("t_person")
//查询信息
sparkSession.sql("select * from t_person").show()
  • 1
  • 2
  • 3
  • 4

DataSet简介

Dataset是从Spark1.6 Alpha版本中引入的一个新的数据抽象结构,最终在Spark2.0版本被定义成Spark新特性。Dataset提供了特定域对象中的强类型集合,也就是在RDD的每行数据中添加了类型约束条件,只有约束条件的数据类型才能正常运行。Dataset结合了RDD和DataFrame的优点,并且可以调用封装的方法以并行方式进行转换等操作。

RDD数据的表现形式,此时RDD数据没有数据类型和元数据信息。
DataFrame数据的表现形式,此时DataFrame数据中添加Schema元数据信息(列名和数据类型,如ID:String),DataFrame每行类型固定为Row类型,每列的值无法直接访问,只有通过解析才能获取各个字段的值。
Dataset数据的表现形式,其中前者是在RDD每行数据的基础之上,添加一个数据类型(value:String)作为Schema元数据信息。而后者每行数据添加People强数据类型,在Dataset[Person]中里存放了3个字段和属性,Dataset每行数据类型可自定义,一旦定义后,就具有错误检查机制。
在这里插入图片描述
通过SparkSession中的createDataset来创建Dataset

val personDS =spark.createDataset(sc.textFile("f:/person.txt"))
personDS.show()
  • 1
  • 2

如下图所示:
在这里插入图片描述
通过“as[ElementType]”方法转换得到Dataset

    val DS=spark.read.text("f:/person.txt").as[String]
    DS.toDF()
  • 1
  • 2

运行结果如下:获取到DataFarme
在这里插入图片描述
RDD转DataFarme

Spark官方提供了两种方法实现从RDD转换得到DataFrame。

  • 第一种方法是利用反射机制来推断包含特定类型对象的Schema,这种方式适用于对已知数据结构的RDD转换
  • 第二种方法通过编程接口构造一个Schema,并将其应用在已知的RDD数据中。

反射机制推断Schema

Windows系统开发Scala代码,可使用本地环境测试(需要先准备本地数据文件)。我们可以很容易的分析出当前数据文件中字段的信息,但计算机无法直观感受字段的实际含义,因此需要通过反射机制来推断包含特定类型对象的Schema信息,实现将RDD转换成DataFrame。

实现思路: 添加Spark SQL依赖。定义case class样例类、字段和属性,样例类的参数名会被利用反射机制作为列名。通过sc对象读取文件生成一个RDD,将RDD 与样例类匹配,调用toDF()方法将RDD转换为DataFrame。

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.2.3.1.0.0-78</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

在Maven projects当中进行查看。会有新添加的sql依赖。
在这里插入图片描述
编程方式定义Schema

当Case类不能提前定义Schema时,就需要采用编程方式定义Schema信息,实现RDD转换DataFrame的功能。
实现思路: 通过SparkSession提供的createDataFrame()方法来拼接Schema,来实现RDD转换成DataFrame
代码实现如下:

package SparkSQL

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}

case class Person(id:Int,name:String,age:Int)

object SparkSQL {

  def main(args: Array[String]): Unit = {
    //第一步:获取sparkSession对象
    val sparkSession: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[2]").getOrCreate()
    //第二步:获取sparkContext
    val sparkContext: SparkContext = sparkSession.sparkContext
    sparkContext.setLogLevel("WARN")
    //读取文件,生成RDD
    val rddFile: RDD[String] = sparkContext.textFile("file:///F:\\person.txt")
    //将每一行的内容按照空格切分,生成一个单词数组
    val wordArray: RDD[Array[String]] = rddFile.map(_.split(" "))
    val personRDD: RDD[Person] = wordArray.map(x => Person(x(0).toInt,x(1),x(2).toInt))

    //导入隐式转换的类,我们的RDD才可以转换成DataFrame
    import sparkSession.implicits._
    //将我们的RDD转换成DataFrame
    val personDF: DataFrame = personRDD.toDF()
	println("sparkSql DSL风格语法 开始")
    personDF.printSchema()

    personDF.show()

    personDF.show(2)

    println(personDF.head())
    //查询name字段的所有值
    personDF.select("name").show()
    personDF.select($"name").show()
    personDF.select(personDF.col("name")).show()
    personDF.select(new Column("name")).show()
    //将年龄的值进行 + 1
    personDF.select($"id",$"name",$"age",$"age"+1).show()
    //按照年龄进行分组
    personDF.groupBy($"age").count().show()
    println("*************sparkSql DSL风格语法 结束*************")
    println()
    println("*************sparkSql sql风格语法 开始*************")
    val personTable: Unit = personDF.registerTempTable("t_person")
    val personView: Unit = personDF.createTempView("t_person_view")
    //查询表当中所有的数据
    println("查询表中所有信息")
    sparkSession.sql("select * from t_person").show()
    println("查询表中所有信息 => t_person_view")
    sparkSession.sql("select * from t_person_view").show()
    println("查询名字为zhangsan的数据")
    sparkSession.sql("select * from t_person_view where name = 'zhangsan'").show()
    println("***************sparkSql sql风格语法 结束*********************")
    sparkContext.stop()
    sparkSession.close()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

Spark SQL操作MySQL数据库

在使用连接数据库之前定义一个用于Spark测试的简单数据库

CREATE DATABASE spark

USE spark

CREATE TABLE person(
id INT NOT NULL,
NAME CHAR(10) NOT NULL,
age INT NOT NULL
)ENGINE=INNODB DEFAULT CHARSET=utf8mb4;

INSERT INTO person VALUES(1,"zhangsan",28)
INSERT INTO person VALUES(2,"lisi",38)

SELECT * FROM person
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

使用以下语句进行连接数据库

val mysqlDF : DataFrame =spark.read.jdbc ("jdbc:mysql://192.168.121.134:3306/spark","person",properties)
  • 1

完整代码如下:

package SparkSQL
import java.util.Properties
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
object test {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder().appName("SparkSQLmysql").master("local[2]").getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    sparkContext.setLogLevel("WARN")
    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","root")
    val mysqlDF  =sparkSession.read.jdbc ("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC","person",properties)

    mysqlDF.printSchema()
    mysqlDF.show()

    sparkContext.stop()
    sparkSession.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

运行项目查看结果:
在这里插入图片描述
需要注意的是在连接数据库的时候要加上jdbc的驱动包,添加到环境当中即可
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/572495
推荐阅读
相关标签
  

闽ICP备14008679号