当前位置:   article > 正文

SparkSQL(3)——Spark SQL DataFrame操作_sparksql中,使用sql()方法可以执行sql语句并返回dataframe

sparksql中,使用sql()方法可以执行sql语句并返回dataframe

读取数据源创建DataFrame

在spark2.0之后,SparkSession 封装了 SparkContext,SqlContext,通过SparkSession可以获取到SparkConetxt,SqlContext对象。

读取文本文件创建DataFrame

(1)在本地创建一个文件,有三列,分别是id、name、age,用空格分隔,然后上传到hdfs上。
vim person.txt

1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 wangwang 33
6 hahaha 55
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

然后上传到hdfs:hdfs dfs -put person.txt /
(2)执行以下命令,读取数据,将每一行的数据使用列分隔符分割
启动spark-shell
spark-shell --master local[2]
val line=sc.textFile("/person.txt").map(_.split(" "))

(3)定义case class(相当于表的schema)
case class Person(id:Int,name:String,age:Int)

在这里插入图片描述
(4)将RDD和case class关联

val personRDD=line.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
  • 1

(5)通过RDD转换成DataFrame:将personRDD转换成personDF

val personDF=personRDD.toDF
  • 1

(6)展示DataFrame
查看DataFrame表格数据

personDF.show
  • 1

在这里插入图片描述
打印personDF的Schema信息

personDF.printSchema
  • 1

在这里插入图片描述


注意:"/person.txt"这种目录格式可以使用,是在spark已经整合了HDFS的情况下方可。
https://blog.csdn.net/Fenggms/article/details/82860519


也可以通过SparkSession构建DataFrame
使用spark-shell中已经初始化好的SparkSession对象spark生成DataFrame
可以调用spark.read的方法,主要共有以下一些情况:
在这里插入图片描述

val df = spark.read.text("/person.txt")
//打印schema
df.printSchema
//展示数据
df.show
  • 1
  • 2
  • 3
  • 4
  • 5
读取json 文件创建DataFrame

使用spark中的examples文件,上传文件到hdfs

hdfs dfs -put /export/servers/spark/examples/src/main/resources/people.json /
  • 1

回到spark shell窗口,进行读取json文件

val jsonDF=spark.read.json("/people.json")
  • 1

数据展示:

jsonDF.show
jsonDF.printSchema
  • 1
  • 2

在这里插入图片描述

其他类型文件,也可如此进行读取。如parquet,orc,table等。体现了Spark SQL统一的数据访问方式的特点。
SparkSession.read.文件格式(该格式的文件路径)

DataFrame常用操作

DSL风格语法

DataFrame自身提供了一个领域特定语言(DSL)来操作结构化数据。可以通过dataFrame自身提供的这套API操作DataFrame
查看DataFrame中的内容,通过调用show方法,同上面演示过的内容
查看DataFrame部分列中的内容,通过select方法

personDF.select("name").show
personDF.select("name","age").show
  • 1
  • 2

查询所有的name和age,并将age+1

personDF.select(col("id"), col("name"), col("age") + 1).show
//或者通过$来进行
personDF.select($"name",$"age",$"age"+1).show
  • 1
  • 2
  • 3

查询所有age大于30的,通过filter方法

personDF.filter($"age">30).show
  • 1

在这里插入图片描述
按年龄进行分组并统计相同年龄的人数,通过先分组再统计的方法。

personDF.groupBy("age").count().show
  • 1
SQL风格语法

可以将DataFrame看作是一个关系型数据表。然后使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。
步骤一:先将DataFrame注册成一个表

personDF.registerTempTable("person")
  • 1

步骤二:通过sparkSession.sql(sql语句)进行各种查询操作

spark.sql("select * from person").show
spark.sql("select name from person").show
spark.sql("select * from person where age >30").show
spark.sql("select count(*) from person where age >30").show
spark.sql("select * from person where id =1").show
  • 1
  • 2
  • 3
  • 4
  • 5
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/593280
推荐阅读
相关标签
  

闽ICP备14008679号