SQL: Structured Query Language, used for insert, delete, update, and query operations.
Spark SQL is the Spark module for processing structured data. It provides a programming abstraction called DataFrame (essentially an RDD with Schema information). As a distributed SQL query engine, Spark SQL lets users process structured data in three ways: SQL, the DataFrame API, and the Dataset API.
Functionality:
JDBC: a set of Java components (JAR packages) for connecting to databases such as MySQL
ODBC: a set of database connectivity components defined by Microsoft
DataFrame
Select a column directly
Rename a column
case class
Note: DF.createOrReplaceTempView("viewName") registers a DataFrame as a temporary view (see the sketch below)
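A minimal sketch of these DataFrame operations in DSL and SQL style; it assumes an existing SparkSession named spark and a DataFrame personDF with columns name and age (names chosen for illustration):
- // Assumes an existing SparkSession `spark` and a DataFrame `personDF` with columns name and age
- import spark.implicits._
- // Select a column directly
- personDF.select("name").show()
- personDF.select($"name", $"age" + 1).show()
- // Rename a column
- personDF.withColumnRenamed("age", "person_age").show()
- personDF.select($"name".alias("person_name")).show()
- // Register a temporary view and query it with SQL
- personDF.createOrReplaceTempView("v_person")
- spark.sql("select name, age from v_person").show()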
A Dataset provides a strongly typed collection of domain-specific objects; in effect, a type constraint is added to each row of the RDD, and only data that satisfies that type can be processed. Dataset combines the advantages of RDD and DataFrame, and its built-in methods can be used to perform transformations and other operations in parallel.
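A minimal sketch of building a typed Dataset directly from a case class (runnable in spark-shell, where a SparkSession named spark already exists; in a compiled program the case class must be defined outside main):
- import spark.implicits._
- case class Person(name: String, age: Int)
- val personDS = Seq(Person("zhangsan", 20), Person("lisi", 30)).toDS()
- // Typed transformations are checked at compile time
- personDS.filter(p => p.age > 25).map(p => p.name).show()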
Differences among the three
When converting an RDD to a DataFrame, a Schema is defined
When converting an RDD to a Dataset, Schema information is not enough; the RDD's element type must also be a case class
Since a Dataset or DataFrame is backed by an RDD underneath, the underlying RDD can be obtained directly:
dataframe.rdd or dataset.rdd
Since a DataFrame is simply Dataset[Row], a Dataset can be converted to a DataFrame by calling toDF directly
When converting a DataFrame to a Dataset, use as[Type] with the case class type (see the sketch after this list)
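A minimal sketch of these conversions, assuming a spark-shell session (SparkSession spark), the Person case class above, and the person.txt file used later in this section:
- import org.apache.spark.rdd.RDD
- import org.apache.spark.sql.{DataFrame, Dataset, Row}
- import spark.implicits._
-
- val rdd: RDD[Person] = spark.sparkContext
-   .textFile("d:/mydata/person.txt")            // assumed path, as in the programs below
-   .map(_.split(" "))
-   .map(x => Person(x(0), x(1).toInt))
-
- val df: DataFrame = rdd.toDF()                 // RDD -> DataFrame (schema inferred from the case class)
- val ds: Dataset[Person] = rdd.toDS()           // RDD -> Dataset (requires a case class)
- val backToRDD: RDD[Row] = df.rdd               // DataFrame -> RDD[Row]
- val dsAsDF: DataFrame = ds.toDF()              // Dataset -> DataFrame
- val dfAsDS: Dataset[Person] = df.as[Person]    // DataFrame -> Dataset via as[Type]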
Programming implementation
- <dependencies>
- <!--hadoop-->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>3.2.2</version>
- </dependency>
- <!--spark sql-->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.13</artifactId>
- <version>3.2.4</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.13</artifactId>
- <version>3.2.4</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>3.12.0</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>2.12.6</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- <version>2.12.6</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- <version>2.12.6</version>
- </dependency>
- </dependencies>
- package cn.edu.hgu
- import org.apache.spark.SparkContext
- import org.apache.spark.rdd.RDD
- import org.apache.spark.sql.{DataFrame,SparkSession}
- /**
-  * description: Infer the Schema via reflection to convert an RDD to a DataFrame;
-  *              the case class parameter names are picked up by reflection and used as column names
-  * author: 不知名小元
-  * date: 2023-10-28
-  */
- // Define the case class
- case class Person(name: String, age: Int)
- // Main object
- object CaseClassSchema {
-   def main(args: Array[String]): Unit = {
-     // 1. Build the SparkSession with the builder pattern
-     val spark: SparkSession = SparkSession
-       .builder()
-       .appName("CaseClassSchema")
-       .master("local[*]")
-       .getOrCreate() // singleton
-     // 2. Get the SparkContext
-     val sc: SparkContext = spark.sparkContext
-     // 3. Load the file
-     val data: RDD[Array[String]] = sc
-       .textFile("d:/mydata/person.txt")
-       .map(_.split(" "))
-     // 4. Associate the RDD with the case class
-     val personRDD: RDD[Person] = data.map(x => Person(x(0), x(1).toInt))
-     // 5. Create the DataFrame
-     // Import the implicit conversions manually
-     import spark.implicits._
-     val personDF: DataFrame = personRDD.toDF()
-     // 6. Operate on the DF in DSL style
-     // 6.1 Print the schema
-     personDF.printSchema()
-     // 6.2 Show all rows
-     personDF.show()
-     // 6.3 Filter and count
-     println(personDF.filter($"age" > 25).count())
-     // 7. SQL-style operations
-     // 7.1 Register the DF as a temporary view
-     personDF.createOrReplaceTempView("v_person")
-     spark.sql("select * from v_person where age > 25").show()
-     // 8. Release resources (close in reverse order of creation)
-     sc.stop()
-     spark.stop()
-   }
- }
Each piece of data in a DataFrame is wrapped in a Row; a Row represents one row of data
- # Access data through a Row object
- scala> val personDF = spark.read.text("hdfs://master:8020/mydata/person.txt")
- val personDF: org.apache.spark.sql.DataFrame = [value: string]
-
- scala> val r1 = personDF.first()
- val r1: org.apache.spark.sql.Row = [zhangsan 20]
-
- scala> val v1 = r1(0)
- val v1: Any = zhangsan 20
-
- scala> case class Person(name:String,age:Int)
- class Person
-
- scala> val personRDD = sc.textFile("hdfs://master:8020/mydata/person.txt")
- val personRDD: org.apache.spark.rdd.RDD[String] = hdfs://master:8020/mydata/person.txt MapPartitionsRDD[4] at textFile at <console>:1
-
- scala> val lineRDD = personRDD.map(_.split(" "))
- val lineRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:1
-
- scala> val personSQLRDD = lineRDD.map(x => Person(x(0),x(1).toInt))
- val personSQLRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[6] at map at <console>:1
-
- scala> val personSQLDF = personSQLRDD.toDF()
- val personSQLDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
-
- scala> personSQLDF.printSchema()
- root
- |-- name: string (nullable = true)
- |-- age: integer (nullable = false)
-
-
- scala> personDF.printSchema()
- root
- |-- value: string (nullable = true)
-
-
- scala> val r2 = personSQLDF.first()
- val r2: org.apache.spark.sql.Row = [zhangsan,20]
-
- scala> val name = r2(0)
- val name: Any = zhangsan
-
- scala> val age = r2(1)
- val age: Any = 20
Row(v1,v2,v3,...)
Row.fromSeq(Seq(v1,v2,v3,...))
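A minimal sketch of building Rows and reading fields back; the values are illustrative, and named access via getAs only works on Rows that carry a schema (e.g. rows coming out of a DataFrame):
- import org.apache.spark.sql.Row
-
- val row: Row = Row("zhangsan", 20)            // positional construction
- val row2: Row = Row.fromSeq(Seq("lisi", 30))  // construction from a Seq
-
- val anyValue: Any = row(0)                    // untyped access returns Any
- val name: String = row.getString(0)           // typed access by position
- val age: Int = row.getInt(1)
- // For rows obtained from a DataFrame (which carry a schema), access by field name also works:
- // val n = dfRow.getAs[String]("name")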
- package cn.edu.hgu
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
- import org.apache.spark.sql.{DataFrame, Row, SparkSession, types}
-
- /**
-  * description: Convert an RDD to a DataFrame with a user-defined Schema
-  * author: 不知名小元
-  * date: 2023-10-28
-  */
- object SparkRDDSchema {
-   def main(args: Array[String]): Unit = {
-     // 1. Create the SparkSession instance
-     val spark: SparkSession = SparkSession
-       .builder() // builder pattern
-       .appName(this.getClass.getSimpleName.stripSuffix("$"))
-       .master("local[*]")
-       .getOrCreate()
-     // 2. Import the implicit conversions
-     import spark.implicits._
-     // 3. Load the data
-     val dataRDD: RDD[String] = spark.sparkContext
-       .textFile("d:/mydata/person.txt")
-     println(dataRDD.count())
-     // 4. Split each line
-     val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
-     // 5. Load the data into Row objects
-     val personRDD: RDD[Row] = dataArrayRDD.map(x => Row(x(0), x(1).toInt))
-     // 6. Create the Schema
-     val schema: StructType = StructType(Seq(
-       StructField("name", StringType, false),
-       StructField("age", IntegerType, false)
-     ))
-     // 7. Create the DataFrame from personRDD and the Schema
-     val personDF: DataFrame = spark.createDataFrame(personRDD, schema)
-     // 8. Operate on the DF
-     // 8.1 DSL style
-     personDF.show()
-     // 8.2 SQL style
-     // 8.2.1 Register the DF as a temporary view
-     personDF.createOrReplaceTempView("v_person")
-     // 8.2.2 Query with SQL
-     spark.sql("select * from v_person where age > 25").show()
-     // 9. Release resources
-     spark.stop()
-   }
- }
Assignment: implement word count using both DSL-style and SQL-style programming
- package cn.edu.hgu
-
- import org.apache.spark.sql.{Dataset, SparkSession}
- /**
-  * description: Implement word count using both DSL-style and SQL-style programming
-  * author: 不知名小元
-  * date: 2023/10/29 13:44
-  */
- object WordCountSparkSQLAndDSL {
-   def main(args: Array[String]): Unit = {
-     // 1. Create the SparkSession instance
-     val spark: SparkSession = SparkSession
-       .builder() // builder pattern
-       .appName(this.getClass.getSimpleName.stripSuffix("$"))
-       .master("local[*]")
-       .getOrCreate()
-     // 2. Import the implicit conversions
-     import spark.implicits._
-     // 3. Load the data
-     val fileDS: Dataset[String] = spark.read.textFile("d:/mydata/a.txt")
-     // 4. Split into words ("\\s+" is a regex that matches any run of whitespace)
-     val wordDS: Dataset[String] = fileDS.flatMap(_.split("\\s+"))
-     // 5. Word count, DSL style
-     wordDS.printSchema()
-     wordDS.show()
-     wordDS.groupBy("value").count().orderBy($"count".desc).show()
-     // 6. Word count, SQL style
-     // Register as a temporary view
-     wordDS.createOrReplaceTempView("v_wordcount")
-     // Write the SQL statement
-     val sql: String =
-       """
-         |select value, count(value) as count_value
-         |from v_wordcount
-         |group by value
-         |order by count_value desc
-         |""".stripMargin
-     spark.sql(sql).show()
-
-     // SQL variant 2: createOrReplaceGlobalTempView (for reference)
-     wordDS.createOrReplaceGlobalTempView("global_table_view")
-     spark.sql("select * from global_temp.global_table_view").show()
-     // A global temp view is visible from any session of the same application
-     spark.newSession().sql("select * from global_temp.global_table_view").show()
-     spark.newSession().sql("select value, count(value) as count_value from global_temp.global_table_view group by value order by count_value desc").show()
-     // Close the SparkSession
-     spark.stop()
-   }
- }
Assignment code reference: https://blog.csdn.net/m0_49834705/article/details/112801436
Run results:
Spark SQL provides a complete API for conveniently reading and writing external data sources such as CSV, JDBC, JSON, ORC, Parquet and text; since version 2.4 it also supports an Image data source and an Avro data source.
Data can be structured, unstructured or semi-structured; Spark SQL provides load, save and related APIs for all of these.
Load JSON data
Upload it to the Hadoop cluster
Start spark-shell
- # 2. Load and save data
- scala> val peopleDF = spark.read.format("json").load("hdfs://master:8020/mydata/people.json")
- val peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
-
- scala> peopleDF.count()
- val res0: Long = 3
-
- scala> peopleDF.show()
- +----+-------+
- | age| name|
- +----+-------+
- |null|Michael|
- | 30| Andy|
- | 19| Justin|
- +----+-------+
-
-
- scala> peopleDF.select("name","age").show()
- +-------+----+
- | name| age|
- +-------+----+
- |Michael|null|
- | Andy| 30|
- | Justin| 19|
- +-------+----+
-
-
- scala> val resultDF = peopleDF.select("name","age")
- val resultDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
-
- scala> resultDF.show
- warning: 1 deprecation (since 2.13.3); for details, enable `:setting -deprecation` or `:replay -deprecation`
- +-------+----+
- | name| age|
- +-------+----+
- |Michael|null|
- | Andy| 30|
- | Justin| 19|
- +-------+----+
-
-
- scala> resultDF.write.format("parquet").save("hdfs://master:8020/mydata/people.parquet")
-
- scala> val peopleParqueDF = spark.read.format("parquet").load("hdfs://master:8020/mydata/people.parquet")
- |
- val peopleParqueDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
-
- scala> peopleParqueDF.show()
- +-------+----+
- | name| age|
- +-------+----+
- |Michael|null|
- | Andy| 30|
- | Justin| 19|
- +-------+----+
-
-
- scala> val peopleParqueDF1 = spark.read.parquet("hdfs://master:8020/mydata/people.parquet")
- val peopleParqueDF1: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
-
- scala> peopleParqueDF1.show()
- +-------+----+
- | name| age|
- +-------+----+
- |Michael|null|
- | Andy| 30|
- | Justin| 19|
- +-------+----+
- scala> val peopleCSVDF = spark.read.csv("hdfs://master:8020/mydata/people.csv")
- val peopleCSVDF: org.apache.spark.sql.DataFrame = [_c0: string]
-
- scala> peopleCSVDF.show()
- +------------------+
- | _c0|
- +------------------+
- | name;age;job|
- |Jorge;30;Developer|
- | Bob;32;Developer|
- +------------------+
-
-
- scala> val peopleCSVDF1 = spark.read.option("sep",";").option("header","true").csv("hdfs://master:8020/mydata/people.csv")
- val peopleCSVDF1: org.apache.spark.sql.DataFrame = [name: string, age: string ... 1 more field]
-
- scala> peopleCSVDF1.show()
- +-----+---+---------+
- | name|age| job|
- +-----+---+---------+
- |Jorge| 30|Developer|
- | Bob| 32|Developer|
- +-----+---+---------+
-
- scala> peopleCSVDF1.select("name","age").show()
- +-----+---+
- | name|age|
- +-----+---+
- |Jorge| 30|
- | Bob| 32|
- +-----+---+
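Writing uses the same API in the opposite direction; a minimal sketch assuming the peopleCSVDF1 DataFrame above and illustrative output paths:
- import org.apache.spark.sql.SaveMode
-
- // Write as CSV with a header and a custom separator; overwrite an existing directory
- peopleCSVDF1.write
-   .mode(SaveMode.Overwrite)
-   .option("sep", ";")
-   .option("header", "true")
-   .csv("hdfs://master:8020/mydata/people_out.csv")      // assumed output path
-
- // The generic form works for any built-in format
- peopleCSVDF1.write
-   .mode(SaveMode.Overwrite)
-   .format("json")
-   .save("hdfs://master:8020/mydata/people_out.json")    // assumed output path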
Relational database management systems include MySQL, SQL Server, Oracle, and DB2; MySQL is used as the example here.
- package cn.edu.hgu
- import org.apache.spark.sql.{DataFrame,SparkSession}
- import java.util.Properties
-
- /**
-  * description: Read data from MySQL with Spark
-  * author: 不知名小元
-  * date: 2023/10/29 20:22
-  */
- object SparkDatafromMysql {
-   def main(args: Array[String]): Unit = {
-     // 1. Create the SparkSession instance
-     val spark: SparkSession = SparkSession.builder()
-       .master("local[*]")
-       .appName("SparkDatafromMysql")
-       .getOrCreate()
-
-     // 2. Import the implicit conversions
-     import spark.implicits._
-     // 3. Connection settings: url, table name and properties (driver, user, password)
-     val url: String = "jdbc:mysql://localhost:3307/edu"
-     val tableName: String = "students"
-     val properties: Properties = new Properties()
-     properties.setProperty("driver", "com.mysql.jdbc.Driver")
-     properties.setProperty("user", "root")
-     properties.setProperty("password", "root")
-
-     // 4. Read the MySQL table
-     val studentDF: DataFrame = spark.read.jdbc(url, tableName, properties)
-     // 5. Operate on the DataFrame
-     // 5.1 DSL style
-     studentDF.printSchema()
-     studentDF.show()
-     // 5.2 SQL style
-     // Register the DF as a temporary view
-     studentDF.createOrReplaceTempView("v_student")
-     spark.sql("select * from v_student where age > 19").show()
-     // 6. Release resources
-     spark.stop()
-   }
- }
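Writing back to MySQL goes through the same connection settings; a minimal sketch that reuses url, properties and studentDF from the program above and assumes a hypothetical target table students_copy:
- import org.apache.spark.sql.SaveMode
-
- // Append the rows of studentDF into the (assumed) target table students_copy;
- // SaveMode.Overwrite would drop and recreate the table instead
- studentDF.write
-   .mode(SaveMode.Append)
-   .jdbc(url, "students_copy", properties)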
start-yarn.sh
- <property>
- <name>dfs.webhdfs.enabled</name>
- <value>true</value>
- </property>
-
- # Distribute the updated config files to the workers, then restart the Hadoop cluster
- [root@master hadoop]# scp hdfs-site.xml slave1:$PWD
- hdfs-site.xml 100% 1125 347.4KB/s 00:00
- [root@master hadoop]# scp hdfs-site.xml slave2:$PWD
- hdfs-site.xml 100% 1125 710.1KB/s 00:00
- [root@master hadoop]# scp core-site.xml slave1:$PWD
- core-site.xml 100% 1472 809.8KB/s 00:00
- [root@master hadoop]# scp core-site.xml slave2:$PWD
- core-site.xml 100% 1472 803.6KB/s 00:00
-
-
- sbin/stop-dfs.sh
- sbin/stop-yarn.sh
-
- sbin/start-dfs.sh
- sbin/start-yarn.sh
-
- Start the hiveserver2 service
- Foreground:
- cd /export/servers/apache-hive-2.1.1-bin/
- bin/hive --service hiveserver2
- Background:
- nohup bin/hive --service hiveserver2 > /dev/null 2>&1 &
-
- Connect to hiveserver2 with the beeline client
- bin/beeline
- beeline> !connect jdbc:hive2://master:10000
1. Modify Hadoop's hdfs-site.xml file
2. Modify Hadoop's core-site.xml file
# Restart the Hadoop cluster
sbin/stop-dfs.sh
sbin/stop-yarn.sh
sbin/start-dfs.sh
sbin/start-yarn.sh
Start beeline
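With hiveserver2 and beeline working, Spark SQL itself can also query Hive tables if the SparkSession is created with Hive support; a minimal sketch, assuming hive-site.xml is on Spark's classpath and a hypothetical table default.student:
- import org.apache.spark.sql.SparkSession
-
- val spark: SparkSession = SparkSession.builder()
-   .appName("SparkSQLOnHive")
-   .master("local[*]")
-   .enableHiveSupport()          // requires hive-site.xml on the classpath (assumption)
-   .getOrCreate()
-
- spark.sql("show databases").show()
- spark.sql("select * from default.student").show()   // hypothetical table
- spark.stop()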