当前位置:   article > 正文

分布式计算高手之路8-Spark SQL结构化数据文件处理_sql实现的数据处理用spark sql来实现

sql实现的数据处理用spark sql来实现

基础知识

SQL:结构化查询语言,增删改查

1. SparkSQL简介

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象结构叫做DataFrame的数据模型(即带有Schema信息的RDD),Spark SQL作为分布式SQL查询引擎,让用户可以通过SQL、DataFrames API和Datasets API三种方式实现对结构化数据的处理。

功能

  • Spark SQL可从各种结构化数据源中读取数据,进行数据分析
  • Spark SQL包含行业标准的JDBC和ODBC连接方式,因此它不局限于在Spark程序内使用SQL语句进行查询

jdbc:java连接数据库比如mysql的一组组件(jar包)

odbc:微软定义的连接数据库的一组组件

  • Spark SQL可以无缝地将SQL查询与Spark程序进行结合,它能够将结构化数据作为Spark中的分布式数据集(RDD)进行查询。

2. SparkSql的架构

DataFrame

DataFrame的创建

1. 准备好数据文件

 

启动hadoop

上传hdfs

2. 启动spark-shell

3. 通过读取文件创建DataFrame

4. RDD转换DataFrame

 

DataFram的常用操作

 

1. DSL风格操作

show:查看DF的内容信息

printSchema:查看DF的结构信息

select:选取部分列的信息

直接选择一列

对列进行重命名

filer:实现条件过滤

groupBy:按指定字段对记录进行分组

 

sort:按指定的字段进行排序

 

样例类:给每一列重命名

case class

 

2. SQL风格操作

把DF注册为一个临时表(视图View)

说明:DF.createOrReplaceTempView("视图名")

 

通过sql语句进行操作

Dataset的基础知识

1. RDD->DataFrame->Dataset

Dataset提供了特定域对象中的强类型集合,也就是在RDD的每行数据中添加了类型约束条件,只有约束条件的数据类型才能正常运行。Dataset结合了RDD和DataFrame的优点,并且可以调用封装的方法以并行方式进行转换等操作。

三者的区别

2.理解:

  • DataFrame = RDD + Schema
  • Dataset = DataFrame + 泛型
  • Dataset = RDD + Schema + 方便的SQL操作+优化
  • Dataset是特殊的DataFrame,DataFrame是特殊的RDD
  • Dataset是一个分布式的表

3.Dataset的创建

可以读取数据直接创建

通过RDD和DataFrame转换

RDD、DataFrame和Dataset转换

1.RDD转换为DataFrame和Dataset

RDD转换为DataFrame时,定义Schema

RDD转换为Dataset时,不仅需要Schema信息,还需要RDD数据类型为CaseClass类型

2.DataFrame或Dataset转换为RDD

由于Dataset或者DataFrame的底层就是RDD,所以直接调用RDD的函数可转换

dataframe.rdd或者dataset.rdd

3.DataFrame和Dataset之间的转换

由于Dataset是DataFrame特例,所以Dataset直接调用toDF函数转换为DataFrame

当把DataFrame转换为Dataset时,使用函数as[Type],指定CaseClass类型

RDD转换为DataFrame

编程实现

1.反射机制推断Schema

启动Idea,新建maven项目,添加SparkSQl及相关的依赖并导入

  1. <dependencies>
  2. <!--hadoop-->
  3. <dependency>
  4. <groupId>org.apache.hadoop</groupId>
  5. <artifactId>hadoop-client</artifactId>
  6. <version>3.2.2</version>
  7. </dependency>
  8. <!--spark sql-->
  9. <dependency>
  10. <groupId>org.apache.spark</groupId>
  11. <artifactId>spark-sql_2.13</artifactId>
  12. <version>3.2.4</version>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.apache.spark</groupId>
  16. <artifactId>spark-core_2.13</artifactId>
  17. <version>3.2.4</version>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.apache.commons</groupId>
  21. <artifactId>commons-lang3</artifactId>
  22. <version>3.12.0</version>
  23. </dependency>
  24. <dependency>
  25. <groupId>com.fasterxml.jackson.core</groupId>
  26. <artifactId>jackson-databind</artifactId>
  27. <version>2.12.6</version>
  28. </dependency>
  29. <dependency>
  30. <groupId>com.fasterxml.jackson.core</groupId>
  31. <artifactId>jackson-core</artifactId>
  32. <version>2.12.6</version>
  33. </dependency>
  34. <dependency>
  35. <groupId>com.fasterxml.jackson.core</groupId>
  36. <artifactId>jackson-annotations</artifactId>
  37. <version>2.12.6</version>
  38. </dependency>
  39. </dependencies>

添加scala支持

添加包并添加scala类,并输入代码

 

  1. package cn.edu.hgu
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.sql.{DataFrame,SparkSession}
  5. /**
  6. * description:通过反射机制推断Schema把RDD转换为DataFrame,样例类参数名称会被反射机制利用作为列名
  7. * author:不知名小元
  8. * data:2023-10-28
  9. */
  10. //定义样例类
  11. case class Person(name:String,age:Int)
  12. //主类
  13. object CaseClassSchema{
  14. def main(args: Array[String]):Unit = {
  15. //1.使用构造者模式构建SparkSession
  16. val spark:SparkSession = SparkSession
  17. .builder()
  18. .appName("CaseClassSchema")
  19. .master("local[*]")
  20. .getOrCreate()//单例模式
  21. //2.获取SparkContext
  22. val sc: SparkContext = spark.sparkContext
  23. //3.获取文件
  24. val data:RDD[Array[String]] = sc
  25. .textFile("d:/mydata/person.txt")
  26. .map(_.split(" "))
  27. //4.将RDD和样例类关联
  28. val personRDD: RDD[Person] = data.map(x => Person(x(0),x(1).toInt))
  29. //5.获取DataFrame
  30. //手动导入隐式转换
  31. import spark.implicits._
  32. val personDF:DataFrame = personRDD.toDF
  33. //6.DSL风格操作DF
  34. //6.1打印Schema
  35. personDF.printSchema()
  36. //6.2显示全部数据
  37. personDF.show()
  38. //6.3过滤和统计操作
  39. println(personDF.filter($"age">25).count())
  40. //7.SQL风格操作
  41. //7.1将DF注册为一个临时视图
  42. personDF.createOrReplaceTempView("v_person")
  43. spark.sql("select * from v_person where age >25").show()
  44. //7.关闭资源 ,谁先创建谁后关闭
  45. sc.stop()
  46. spark.stop()
  47. }
  48. }

 

运行,查看结果

 

2.编程方式定义Schema

(1)Row对象 

DataFrame中的每条数据封装在Row中,Row表示每行数据

  1. #通过row对象获取数据
  2. scala> val personDF = spark.read.text("hdfs://master:8020/mydata/person.txt")
  3. val personDF: org.apache.spark.sql.DataFrame = [value: string]
  4. scala> val r1 = personDF.first()
  5. val r1: org.apache.spark.sql.Row = [zhangsan 20]
  6. scala> val v1 = r1(0)
  7. val v1: Any = zhangsan 20
  8. scala> case class Person(name:String,age:Int)
  9. class Person
  10. scala> val personRDD = sc.textFile("hdfs://master:8020/mydata/person.txt")
  11. val personRDD: org.apache.spark.rdd.RDD[String] = hdfs://master:8020/mydata/person.txt MapPartitionsRDD[4] at textFile at <console>:1
  12. scala> val lineRDD = personRDD.map(_.split(" "))
  13. val lineRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:1
  14. scala> val personSQLRDD = lineRDD.map(x => Person(x(0),x(1).toInt))
  15. val personSQLRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[6] at map at <console>:1
  16. scala> val personSQLDF = personSQLRDD.toDF()
  17. val personSQLDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
  18. scala> personSQLDF.printSchema()
  19. root
  20. |-- name: string (nullable = true)
  21. |-- age: integer (nullable = false)
  22. scala> personDF.printSchema()
  23. root
  24. |-- value: string (nullable = true)
  25. scala> val r2 = personSQLDF.first()
  26. val r2: org.apache.spark.sql.Row = [zhangsan,20]
  27. scala> val name = r2(0)
  28. val name: Any = zhangsan
  29. scala> val age = r2(1)
  30. val age: Any = 20

row对象的创建

Row(v1,v2,v3,...)

Row.fromSeq(Seq(v1,v2,v3,...))

(2)步骤

  • 创建一个Row对象结构的RDD
  • 根据Row中的数据定义Schema,其类型为StructType
  • 通过SparkSession中的方法createDataFrame将定义的Schema应用到RDD中

(3)添加类,输入代码

  1. package cn.edu.hgu
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  4. import org.apache.spark.sql.{DataFrame, Row, SparkSession, types}
  5. /**
  6. * description:通过自定义Schema把RDD转换为DataFrame
  7. * author:不知名小元
  8. * data:2023-10-28
  9. */
  10. object SparkRDDSchema {
  11. def main(args: Array[String]): Unit = {
  12. //1.创建SparkSession实例对象
  13. val spark: SparkSession = SparkSession
  14. .builder()//使用建造者模式构建对象
  15. .appName(this.getClass.getSimpleName.stripSuffix("$"))
  16. .master("local[*]")
  17. .getOrCreate()
  18. //2.导入隐含的转换
  19. import spark.implicits._
  20. //3.加载数据
  21. val dataRDD :RDD[String] = spark.sparkContext
  22. .textFile("d:/mydata/person.txt")
  23. println(dataRDD.count())
  24. //4.切分每一行
  25. val dataArrayRDD: RDD[Array[String]] =dataRDD.map(_.split(" "))
  26. //5.加载数据到Row对象中
  27. val personRDD:RDD[Row] =dataArrayRDD.map(x => Row(x(0).toString,x(1).toInt))
  28. //6.创建Schema
  29. val schema: StructType = StructType(Seq(
  30. StructField("name",StringType,false),
  31. StructField("age",IntegerType,false)
  32. ))
  33. //7.利用personRDD与Schema创建DataFrame
  34. val personDF:DataFrame =spark.createDataFrame(personRDD,schema)
  35. //8.操作DF
  36. //8.1DSL风格
  37. personDF.show()
  38. //8.2SQL风格
  39. //8.2.1将DF注册为临时视图
  40. personDF.createOrReplaceTempView("v_person")
  41. //8.2.2sql语句操作
  42. spark.sql("select * from v_person where age > 25").show()
  43. //关闭资源
  44. spark.stop()
  45. }
  46. }

 

(4)运行查看结果

作业:分别利用基于DSL编程和SQL编程实现单词计数

  1. package cn.edu.hgu
  2. import org.apache.spark.sql.{Dataset, SparkSession}
  3. /**
  4. * description:分别利用基于DSL编程和SQL编程实现单词计数
  5. * author:不知名小元
  6. * date:2023/10/29 13:44
  7. */
  8. object WordCountSparkSQLAndDSL {
  9. def main(args: Array[String]): Unit = {
  10. //1.创建SparkSession实例对象
  11. val spark: SparkSession = SparkSession
  12. .builder() //使用建造者模式构建对象
  13. .appName(this.getClass.getSimpleName.stripSuffix("$"))
  14. .master("local[*]")
  15. .getOrCreate()
  16. //2.导入隐含的转换
  17. import spark.implicits._
  18. //3.加载数据
  19. val fileDS: Dataset[String] = spark.read.textFile("d:/mydata/a.txt")
  20. //4.切分
  21. val wordDS:Dataset[String] = fileDS.flatMap(_.split("\\s+"))// \s+:正则表达式"\\s+" - 匹配任意空白字符
  22. //5.使用DSL风格完成单词统计计数
  23. wordDS.printSchema()
  24. wordDS.show()
  25. wordDS.groupBy("value").count().orderBy($"count".desc).show()
  26. //6.使用SQL风格完成单词统计计数
  27. //注册为临时视图
  28. wordDS.createOrReplaceTempView("v_wordcount")
  29. //写sql语句
  30. val sql: String =
  31. """
  32. |select value,count(value) as count_value
  33. |from v_wordcount
  34. |group by value
  35. |order by count_value desc
  36. |""".stripMargin
  37. spark.sql(sql).show
  38. //SQL的方式2 createOrReplaceGlobalTempView (了解)
  39. wordDS.createOrReplaceGlobalTempView("global_table_view")
  40. spark.sql("select * from global_temp.global_table_view").show()
  41. //全局session可访问
  42. spark.newSession().sql("select * from global_temp.global_table_view").show()
  43. spark.newSession().sql("select value,count(value) as count_value from global_temp.global_table_view group by value order by count_value desc").show()
  44. //关闭spqrkSession
  45. spark.stop()
  46. }
  47. }


作业代码参考原文链接:https://blog.csdn.net/m0_49834705/article/details/112801436 

运行结果: 

外部数据源

在SparkSQL中,提供一套完整的Api,用于方便的读写外部数据源,比如csv、jdbc、json、orc、parquet、text等,在2.4版本中还支持iamge Source(图像数据源)和Avro Source

1.数据源与格式

数据分为结构化数据、非结构化数据和半结构化数据,sparkSql对它们提供了加载、保存等api

2.加载、保存数据

加载json格式数据

上传hadoop集群

启动spark-shell

  1. #2. 加载、保存数据
  2. scala> val peopleDF = spark.read.format("json").load("hdfs://master:8020/mydata/people.json")
  3. val peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  4. scala> peopleDF.count()
  5. val res0: Long = 3
  6. scala> peopleDF.show()
  7. +----+-------+
  8. | age| name|
  9. +----+-------+
  10. |null|Michael|
  11. | 30| Andy|
  12. | 19| Justin|
  13. +----+-------+
  14. scala> peopleDF.select("name","age").show()
  15. +-------+----+
  16. | name| age|
  17. +-------+----+
  18. |Michael|null|
  19. | Andy| 30|
  20. | Justin| 19|
  21. +-------+----+
  22. scala> val resultDF = peopleDF.select("name","age")
  23. val resultDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
  24. scala> resultDF.show
  25. warning: 1 deprecation (since 2.13.3); for details, enable `:setting -deprecation` or `:replay -deprecation`
  26. +-------+----+
  27. | name| age|
  28. +-------+----+
  29. |Michael|null|
  30. | Andy| 30|
  31. | Justin| 19|
  32. +-------+----+
  33. scala> resultDF.write.format("parquet").save("hdfs://master:8020/mydata/people.parquet")
  34. scala> val peopleParqueDF = spark.read.format("parquet").load("hdfs://master:8020/mydata/people.parquet")
  35. |
  36. val peopleParqueDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
  37. scala> peopleParqueDF.show()
  38. +-------+----+
  39. | name| age|
  40. +-------+----+
  41. |Michael|null|
  42. | Andy| 30|
  43. | Justin| 19|
  44. +-------+----+
  45. scala> val peopleParqueDF1 = spark.read.parquet("hdfs://master:8020/mydata/people.parquet")
  46. val peopleParqueDF1: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
  47. scala> peopleParqueDF1.show()
  48. +-------+----+
  49. | name| age|
  50. +-------+----+
  51. |Michael|null|
  52. | Andy| 30|
  53. | Justin| 19|
  54. +-------+----+

3.CSV格式的数据:

  • 可以用Excel或者记事本打开
  • 首行是结构Schema
  • 数据用分隔符分隔
  1. scala> val peopleCSVDF = spark.read.csv("hdfs://master:8020/mydata/people.csv")
  2. val peopleCSVDF: org.apache.spark.sql.DataFrame = [_c0: string]
  3. scala> peopleCSVDF.show()
  4. +------------------+
  5. | _c0|
  6. +------------------+
  7. | name;age;job|
  8. |Jorge;30;Developer|
  9. | Bob;32;Developer|
  10. +------------------+
  11. scala> val peopleCSVDF1 = spark.read.option("sep",";").option("header","true").csv("hdfs://master:8020/mydata/people.csv")
  12. val peopleCSVDF1: org.apache.spark.sql.DataFrame = [name: string, age: string ... 1 more field]
  13. scala> peopleCSVDF1.show()
  14. +-----+---+---------+
  15. | name|age| job|
  16. +-----+---+---------+
  17. |Jorge| 30|Developer|
  18. | Bob| 32|Developer|
  19. +-----+---+---------+
  20. scala> peopleCSVDF1.select("name","age").show()
  21. +-----+---+
  22. | name|age|
  23. +-----+---+
  24. |Jorge| 30|
  25. | Bob| 32|
  26. +-----+---+

4.rdbms数据

关系数据库管理系统:mysql、sqlserver、oracle、db2,以mysql为例

启动mysql

远程连接mysql,并创建数据库,添加表,录入数据

在项目中添加mysql数据库的java驱动的依赖

添加类,输入代码:

  1. package cn.edu.hgu
  2. import org.apache.spark.sql.{DataFrame,SparkSession}
  3. import java.util.Properties
  4. /**
  5. * description:使用spark从mysql中读取数据
  6. * author:不知名小元
  7. * date:2023/10/29 20:22
  8. */
  9. object SparkDatafromMysql {
  10. def main(args: Array[String]): Unit = {
  11. //1.创建sparksession实例对象
  12. val spark:SparkSession = SparkSession.builder()
  13. .master("local[*]")
  14. .appName("SparkDatafromMysql")
  15. .getOrCreate()
  16. //2.导入隐式转换
  17. import spark.implicits._
  18. //3.连接数据库,三要素:url,表名,属性(包括驱动、用户名,密码)
  19. val url:String ="jdbc:mysql://localhost:3307/edu"
  20. val tableName: String = "students"
  21. val properties :Properties =new Properties()
  22. properties.setProperty("dirver","com.mysql.jdbc.Driver")
  23. properties.setProperty("user","root")
  24. properties.setProperty("password","root")
  25. //4.读取mysql数据
  26. val studentDF:DataFrame = spark.read.jdbc(url,"students",properties)
  27. //5.操作DataFrame
  28. //5.1 DSL风格
  29. studentDF.printSchema()
  30. studentDF.show()
  31. //5.2 SQL风格
  32. //7.1将DF注册为一个临时视图
  33. studentDF.createOrReplaceTempView("v_student")
  34. spark.sql("select * from v_student where age >19").show()
  35. //6. 关闭资源
  36. spark.stop()
  37. }
  38. }

运行,查看结果

5.hive

启动yarn

Start-yarn.sh

java连接mysql的jar包放在hive的lib文件夹下

启动hive服务

使用beeline工具连接hive

  1. <property>
  2. <name>dfs.webhdfs.enabled</name>
  3. <value>true</value>
  4. </property>
  5. #重启Hadoop集群
  6. [root@master hadoop]# scp hdfs-site.xml slave1:$PWD
  7. hdfs-site.xml 100% 1125 347.4KB/s 00:00
  8. [root@master hadoop]# scp hdfs-site.xml slave2:$PWD
  9. hdfs-site.xml 100% 1125 710.1KB/s 00:00
  10. [root@master hadoop]# scp core-site.xml slave1:$PWD
  11. core-site.xml 100% 1472 809.8KB/s 00:00
  12. [root@master hadoop]# scp core-site.xml slave2:$PWD
  13. core-site.xml 100% 1472 803.6KB/s 00:00
  14. sbin/stop-dfs.sh
  15. sbin/stop-yarn.sh
  16. sbin/start-dfs.sh
  17. sbin/stast-yarn.sh
  18. 启动hiveserver2服务
  19. 前台启动
  20. cd /export/servers/apache-hive-2.1.1-bin/
  21. bin/hive --service hiveserver2
  22. 后台启动
  23. nohup bin/hive --service hiveserver2 > /dev/null 2>&1 &
  24. 使用beeline工具连接hiveserver2
  25. bin/beeline
  26. beeline> !connect jdbc:hive2://master:10000

1.修改Hadoop的hdfs-site.xml文件

2.修改Hadoop的core-site.xml文件

 

#重启Hadoop集群

sbin/stop-dfs.sh

sbin/stop-yarn.sh

sbin/start-dfs.sh

sbin/stast-yarn.sh

启动beeline

hive操作

 

启动hive的metastore服务

6.Idea操作Hive

添加hive依赖

添加类,输入代码

运行,查看结果

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/588906
推荐阅读
相关标签
  

闽ICP备14008679号