赞
踩
目录
在很多情况下,开发工程师并不了解Scala语言,也不了解Spark常用API,但又非常想要使用Spark框架提供的强大的数据分析能力。Spark的开发工程师们考虑到了这个问题,利用SQL语言的语法简洁、学习门槛低以及在编程语言普及程度和流行程度高等诸多优势,从而开发了Spark SQL模块,通过Spark SQL,开发人员能够通过使用SQL语句,实现对结构化数据的处理。
Spark SQL架构与Hive架构相比,把底层的MapReduce执行引擎更改为Spark,还修改了Catalyst优化器,Spark SQL快速的计算效率得益于Catalyst优化器。从HiveQL被解析成语法抽象树起,执行计划生成和优化的工作全部交给Spark SQL的Catalyst优化器进行负责和管理。
val res = spark.sql( "SELECT * FROM student")
Hive
、Avro
、Parquet
、ORC
、JSON
、JDBC
等。// 读取JSON文件
val userScoreDF = spark.read.json("hdfs://master:9000/users.json")
// 创建临时视图user_score
userScoreDF.createTempView("user_score")
// 根据name关联查询
val resDF = spark.sql("SELECT i.age, i.name, c.score FROM user_info i INNER JOIN user_score c ON i.name = c.name")
HiveQL
语法以及Hive SerDes
和UDF
(用户自定义函数),允许访问现有的Hive仓库。DataFrame在RDD的基础上添加了数据描述信息(Schema,模式,即元信息),因此看起来更像是一张数据库表。
一个RDD中有5
行数据
使用DataFrame API结合SQL处理结构化数据比RDD更加容易,而且通过DataFrame API或SQL处理数据,Spark优化器会自动对其优化,即使写的程序或SQL不高效,也可以运行得很快。
DataFrame
所代表的是一个元素类型为Row
的Dataset
,即DataFrame
只是Dataset[Row]
的一个类型别名。sc
的SparkContext
的实例外,还创建了一个名为spark
的SparkSession
实例,该spark
变量可以在Spark Shell
中直接使用。1,郑秀芸,女,20
2,王志峰,男,18
3,陈燕文,女,21
4,郑国栋,男,19
5,肖雨涵,男,20
vim student.txt
,创建student.txt
文件student.txt
上传到HDFS的/input
目录(如果目录不存在,就创建起来)spark-shell --master spark://master:7077
调用SparkSession对象的read.textFile()
可以读取指定路径中的文件内容,并加载为一个Dataset
执行命令:val ds = spark.read.textFile("hdfs://master:9000/input/student.txt")
从变量ds的类型可以看出,textFile()方法将读取的数据转为了Dataset。除了使用textFile()方法读取文本内容外,还可以使用csv()、jdbc()、json()等方法读取CSV文件、JDBC数据源、JSON文件等数据。(csv: comma separated value)
ds.show()
Dataset
将文件中的每一行看作一个元素,并且所有元素组成了一列,列名默认为value
。ds.printSchema()
Student
,用于存放数据描述信息(Schema
)case class Student(id: Int, name: String, gender: String, age: Int)
new
关键字,只需要传入相应参数即可创建对象
隐式转换
,以便后续可以使用Dataset的算子import spark.implicits._
(_
表示implicits包里所有的类,类似于Java里的*
)
map()
算子将每一个元素拆分并存入Student
样例对象:paste
进入粘贴模式,然后执行红框类的命令val studentDS = ds.map(line => {
val fields = line.split(",")
val id = fields(0).toInt
val name = fields(1)
val gender = fields(2)
val age = fields(3).toInt
Student(id, name, gender, age)
})
studentDS.show()
studentDS
中的数据类似于一张关系型数据库的表。studentDS.printSchema()
(3)对数据集进行投影操作
studentDS.select("name", "age").show()
(4)对数据集进行过滤操作
studentDS.filter("gender == '女'").show()
studentDS.filter("age >= 19 and age <= 20").show()
(5)对数据集进行统计操作
studentDS.groupBy("gender").sum("age").show()
studentDS.groupBy("gender").avg("age").show()
studentDS.groupBy("gender").max("age").show()
studentDS.groupBy("gender").min("age").show()
(6)对数据集进行排序操作
studentDS.sort("age").show()
studentDS.sort("gender", studentDS("age").desc).show()
studentDS.sort(studentDS("gender"), studentDS("age").desc).show()
(7)重命名数据集字段
执行命令:studentDS.select(studentDS("id").as("学号"), studentDS("name").as("姓名"), studentDS("gender").as("性别"), studentDS("age").as("年龄")).show()
toDF()
方法,将存有元数据的Dataset转为DataFrame。val studentDF = studentDS.toDF()
studentDF.show()
(2)显示数据帧模式信息
studentDF.printSchema()
(3)对数据帧进行投影操作
studentDF.select(studentDF("name"), studentDF("age") + 1).show()
(4)对数据帧进行过滤操作
studentDF.filter(studentDF("age") > 19).show()
studentDF.filter("age > 20 and gender == '女'").show()
(5)对数据帧进行统计操作
studentDF.count()
studentDF.groupBy("gender").sum("age").show()
,
studentDF.groupBy("gender").avg("age").show()
,
studentDF.groupBy("gender").max("age").show()
,
studentDF.groupBy("gender").min("age").show()
studentDF.groupBy("gender").count().show()
(6)对数据帧进行排序操作
studentDF.sort("age").show()
studentDF.sort(studentDF("age").desc).show()
studentDF.sort(studentDF("gender"), studentDF("age").desc).show()
(7)重命名数据帧字段
studentDF.select(studentDF("id").as("学号"), studentDF("name").as("姓名"), studentDF("gender").as("性别"), studentDF("age").as("年龄")).show()
studentDF
,创建一个临时视图student
,就可以对student
视图进行SQL操作studentDF.createTempView("student")
studentDF.createOrReplaceTempView("student")
spark
的SparkSession
对象(1)查询全部表记录
spark.sql("select * from student").show()
(2)显示数据表结构
spark.sql("describe student").show()
(3)对表进行投影操作
spark.sql("select name, age + 1 from student").show()
(4)对表进行选择操作
spark.sql("select * from student where age > 19").show()
spark.sql("select * from student where age > 20 and gender = '女'").show()
(5)对表进行统计操作
查询学生表总记录数,执行命令:spark.sql("select count(*) count from student").show()
spark.sql("select gender, sum(age) from student group by gender").show()
spark.sql("select gender, avg(age) from student group by gender").show()
spark.sql("select gender, max(age) from student group by gender").show()
spark.sql("select gender, min(age) from student group by gender").show()
spark.sql("select gender, count(id) from student group by gender").show()
(6)对表进行排序操作
spark.sql("select * from student order by age").show()
spark.sql("select * from student order by age desc").show()
spark.sql("select * from student order by gender asc, age desc").show()
(7)重命名数据表字段
spark.sql("select id stu_id, name stu_name, gender stu_gender, age stu_age from student").show()
spark.sql("select id 学号, name 姓名, gender 性别, age 年龄 from student").show()
,无法解析中文别名
姓名 | 语文 | 数学 | 英语 |
---|---|---|---|
张钦林 | 78 | 90 | 76 |
陈燕文 | 95 | 88 | 98 |
卢志刚 | 78 | 80 | 60 |
/input/scores.txt
文件
scores.txt
文件,创建scoreDF
数据帧scoreDF
数据帧进行下列操作(1)显示数据帧内容
(2)显示数据帧模式信息
(3)对数据帧进行投影操作
(4)对数据帧进行过滤操作
(5)对数据帧进行统计操作
(6)对数据帧进行排序操作
(7)重命名数据帧字段
scoreDF
数据帧创建临时表score
score
数据表进行下列操作Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。