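1. Learn the basic programming methods of Spark SQL through hands-on experiments;
2. Become familiar with converting an RDD to a DataFrame;
Complete the experiment according to the lab requirements.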
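1. Basic Spark SQL operations
After a Windows 10 update, the Hadoop environment on my Linux system stopped working, so Databricks is used here instead.
Copy the following JSON data to the Linux system and save it as employee.json.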
{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2 , "name":"Bob" , "age":29 }
{ "id":3 , "name":"Jack" , "age":29 }
{ "id":4 , "name":"Jim" , "age":28 }
{ "id":4 , "name":"Jim" , "age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }
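By default, Spark's JSON reader expects one complete JSON object per line, which is the layout used above. As a minimal sketch, assuming Python's standard json module and a writable working directory, the file could be generated rather than pasted, which avoids the curly quotes that copy and paste can introduce. The helper name write_employee_json is illustrative only and is not part of Spark or Databricks.

```python
import json

# The seven records from the exercise. The duplicates are kept on purpose
# so that the de-duplication query has something to remove.
EMPLOYEES = [
    {"id": 1, "name": " Ella", "age": 36},
    {"id": 2, "name": "Bob", "age": 29},
    {"id": 3, "name": "Jack", "age": 29},
    {"id": 4, "name": "Jim", "age": 28},
    {"id": 4, "name": "Jim", "age": 28},
    {"id": 5, "name": "Damon"},
    {"id": 5, "name": "Damon"},
]


def write_employee_json(path="employee.json"):
    """Write one JSON object per line (hypothetical helper, not a Spark API)."""
    with open(path, "w", encoding="utf-8") as f:
        for record in EMPLOYEES:
            f.write(json.dumps(record) + "\n")
    return path

# Usage: write_employee_json() creates employee.json in the current directory.
```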
# Read the JSON file
# File location and type
file_location = "/FileStore/tables/employee.json"
file_type = "json"
# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","
# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)
display(df)  # Display the file contents
# Create the temporary view "employee" (registerTempTable is deprecated)
df.createOrReplaceTempView("employee")
sqlDF_01 = spark.sql("SELECT * FROM employee")
sqlDF_01.show()
2. Query all the data and remove duplicates
sqlDF_02 = spark.sql("SELECT DISTINCT * FROM employee")
sqlDF_02.show()
3. Query all the data, leaving out the id field when printing (only name and age are selected here)
sqlDF_03 = spark.sql("SELECT name, age FROM employee")
sqlDF_03.show()
4. Query the rows with age > 30
sqlDF_04 = spark.sql("SELECT * FROM employee WHERE age > 30")
sqlDF_04.show()
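5. Group by age
# SELECT * cannot be combined with GROUP BY age, so count the rows in each age group instead
sqlDF_05 = spark.sql("SELECT age, COUNT(*) AS cnt FROM employee GROUP BY age")
sqlDF_05.show()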
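Spark SQL requires every selected column in a grouped query to be either a grouping column or wrapped in an aggregate function, so GROUP BY is normally paired with something like COUNT(*). As a rough sanity check, assuming a per-age row count is what the exercise wants, the expected groups can be worked out in plain Python from the sample data. The records without an age form their own null group.

```python
from collections import Counter

# Ages from employee.json in file order; None marks the records with no "age" key.
ages = [36, 29, 29, 28, 28, None, None]

# Similar in spirit to: SELECT age, COUNT(*) FROM employee GROUP BY age
counts = Counter(ages)

for age, n in counts.items():
    print(age, n)
```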
6. Sort by name in ascending order
sqlDF_06 = spark.sql("SELECT * FROM employee ORDER BY name")
sqlDF_06.show()
7. Show the first three rows
sqlDF_07 = spark.sql("SELECT * FROM employee")
sqlDF_07.show(3)
8. Rename a column: name becomes username
sqlDF_08 = spark.sql("SELECT name AS username FROM employee")
sqlDF_08.show()
9. Query the average age
sqlDF_09 = spark.sql("SELECT AVG(age) AS ageAverage FROM employee")
sqlDF_09.show()
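AVG, like the other SQL aggregate functions, skips null values, so the two records without an age do not affect the average. A quick worked check in plain Python, using the ages from the sample file (duplicates included, since the temporary view is not de-duplicated):

```python
# Ages from employee.json in file order; None marks the records with no "age" key.
ages = [36, 29, 29, 28, 28, None, None]

# Drop the missing values first, as AVG does with nulls.
known = [a for a in ages if a is not None]

age_average = sum(known) / len(known)  # (36 + 29 + 29 + 28 + 28) / 5
print(age_average)  # 30.0
```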
10. Query the minimum age
sqlDF_10 = spark.sql("SELECT MIN(age) AS ageMin FROM employee")
sqlDF_10.show()
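Queries 2 through 10 can also be written with the DataFrame API instead of SQL strings. The following is a sketch, assuming df is the DataFrame loaded from employee.json earlier; the function name employee_queries and the dictionary keys are illustrative. One difference to keep in mind: df.drop("id") returns the remaining columns in the DataFrame's own schema order, which may not match the name, age order of the SQL version.

```python
def employee_queries(df):
    """Build DataFrame-API counterparts of queries 2-10 (illustrative helper)."""
    return {
        "distinct": df.distinct(),                        # 2. remove duplicate rows
        "without_id": df.drop("id"),                      # 3. every column except id
        "older_than_30": df.filter("age > 30"),           # 4. age > 30
        "count_by_age": df.groupBy("age").count(),        # 5. rows per age group
        "sorted_by_name": df.orderBy("name"),             # 6. ascending by name
        "first_three": df.limit(3),                       # 7. first three rows
        "renamed": df.selectExpr("name AS username"),     # 8. name -> username
        "avg_age": df.agg({"age": "avg"}),                # 9. average age
        "min_age": df.agg({"age": "min"}),                # 10. minimum age
    }

# In a notebook where `df` already exists:
# for label, result in employee_queries(df).items():
#     print(label)
#     result.show()
```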
2. Programmatically convert an RDD to a DataFrame (Scala)
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._ // Enables the implicit conversion from an RDD to a DataFrame
// Define a case class that describes one record
case class students_data(id:Int, name:String, gender:String, age:Int, course_id:Int, score:Double, classes:String)
// Read the text file, split each line on commas, map the fields onto the case class, then convert to a DataFrame
val stuDF = spark.sparkContext
  .textFile("/FileStore/tables/students_data-7.txt")
  .map(_.split(","))
  .map(t => students_data(t(0).trim.toInt, t(1), t(2), t(3).trim.toInt, t(4).trim.toInt, t(5).trim.toDouble, t(6)))
  .toDF()
stuDF.createOrReplaceTempView("stu") // Must be registered as a temporary view before it can be queried below
val stuRDD = spark.sql("select * from stu") // The result is a DataFrame, despite the variable name
stuRDD.show()
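For comparison with the Python cells in the first part, here is a sketch of the same RDD-to-DataFrame conversion in PySpark. It assumes the comma-separated, seven-field layout that the Scala code implies. The sample line in the final comment is made up for illustration, and parse_student and students_to_df are hypothetical helper names.

```python
# Column names mirroring the fields of the Scala case class.
COLUMNS = ["id", "name", "gender", "age", "course_id", "score", "classes"]


def parse_student(line):
    """Turn one comma-separated line into a typed tuple matching COLUMNS."""
    t = line.split(",")
    return (
        int(t[0].strip()),
        t[1],
        t[2],
        int(t[3].strip()),
        int(t[4].strip()),
        float(t[5].strip()),
        t[6],
    )


def students_to_df(spark, path="/FileStore/tables/students_data-7.txt"):
    """Build a DataFrame from the text file; column types are inferred from the tuples."""
    rdd = spark.sparkContext.textFile(path).map(parse_student)
    return spark.createDataFrame(rdd, COLUMNS)

# In a notebook where `spark` already exists:
# stu_df = students_to_df(spark)
# stu_df.createOrReplaceTempView("stu")
# spark.sql("select * from stu").show()
#
# Made-up sample line, only to show the parsing step:
# parse_student("1,Tom,M,20,101,88.5,ClassA")
```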