赞
踩
在spark2.0之后,SparkSession 封装了 SparkContext,SqlContext,通过SparkSession可以获取到SparkConetxt,SqlContext对象。
(1)在本地创建一个文件,有三列,分别是id、name、age,用空格分隔,然后上传到hdfs上。
vim person.txt
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 wangwang 33
6 hahaha 55
然后上传到hdfs:hdfs dfs -put person.txt /
(2)执行以下命令,读取数据,将每一行的数据使用列分隔符分割
启动spark-shell
spark-shell --master local[2]
val line=sc.textFile("/person.txt").map(_.split(" "))
(3)定义case class(相当于表的schema)
case class Person(id:Int,name:String,age:Int)
(4)将RDD和case class关联
val personRDD=line.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
(5)通过RDD转换成DataFrame:将personRDD转换成personDF
val personDF=personRDD.toDF
(6)展示DataFrame
查看DataFrame表格数据
personDF.show
打印personDF的Schema信息
personDF.printSchema
注意:"/person.txt"这种目录格式可以使用,是在spark已经整合了HDFS的情况下方可。
https://blog.csdn.net/Fenggms/article/details/82860519
也可以通过SparkSession构建DataFrame
使用spark-shell中已经初始化好的SparkSession对象spark生成DataFrame
可以调用spark.read的方法,主要共有以下一些情况:
val df = spark.read.text("/person.txt")
//打印schema
df.printSchema
//展示数据
df.show
使用spark中的examples文件,上传文件到hdfs
hdfs dfs -put /export/servers/spark/examples/src/main/resources/people.json /
回到spark shell窗口,进行读取json文件
val jsonDF=spark.read.json("/people.json")
数据展示:
jsonDF.show
jsonDF.printSchema
其他类型文件,也可如此进行读取。如parquet,orc,table等。体现了Spark SQL统一的数据访问方式的特点。
SparkSession.read.文件格式(该格式的文件路径)
DataFrame自身提供了一个领域特定语言(DSL)来操作结构化数据。可以通过dataFrame自身提供的这套API操作DataFrame
查看DataFrame中的内容,通过调用show方法,同上面演示过的内容
查看DataFrame部分列中的内容,通过select方法
personDF.select("name").show
personDF.select("name","age").show
查询所有的name和age,并将age+1
personDF.select(col("id"), col("name"), col("age") + 1).show
//或者通过$来进行
personDF.select($"name",$"age",$"age"+1).show
查询所有age大于30的,通过filter方法
personDF.filter($"age">30).show
按年龄进行分组并统计相同年龄的人数,通过先分组再统计的方法。
personDF.groupBy("age").count().show
可以将DataFrame看作是一个关系型数据表。然后使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。
步骤一:先将DataFrame注册成一个表
personDF.registerTempTable("person")
步骤二:通过sparkSession.sql(sql语句)进行各种查询操作
spark.sql("select * from person").show
spark.sql("select name from person").show
spark.sql("select * from person where age >30").show
spark.sql("select count(*) from person where age >30").show
spark.sql("select * from person where id =1").show
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。