赞
踩
一、目的与要求
1、通过实验掌握Spark SQL的基本编程方法;
2、熟悉RDD到DataFrame的转化方法;
3、熟悉利用Spark SQL管理来自不同数据源的数据。
1.Spark SQL基本操作
将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。
{ "id":1 , "name":"Ella" , "age":36 } { "id":2, "name":"Bob","age":29 } { "id":3 , "name":"Jack","age":29 } { "id":4 , "name":"Jim","age":28 } { "id":4 , "name":"Jim","age":28 } { "id":5 , "name":"Damon" } { "id":5 , "name":"Damon" } |
为employee.json创建DataFrame,并写出Python语句完成下列操作:
2.编程实现将RDD转换为DataFrame
源文件内容如下(包含id,name,age):
1,Ella,36 2,Bob,29 3,Jack,29 |
请将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。
3. 编程实现利用DataFrame读写MySQL的数据
(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表5-2所示的两行数据。
表5-2 employee表原有数据
id | name | gender | age |
1 | Alice | F | 22 |
2 | John | M | 25 |
(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表5-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。
表5-3 employee表新增数据
id | name | gender | age |
3 | Mary | F | 26 |
4 | Tom | M | 23 |
5 | 你的名字 | 你的性别 | 你的年龄 |
三、实验步骤(实验过程)
1.(1)>>> sp=SparkSession.builder.getOrCreate()
>>> df=sp.read.json("file:///home/deeszechyi/employee.json")
>>> df.show()
(2)>>> df.distinct().show()
(3)>>> df.drop("id").show()
(4)>>> df.filter(df['age']>30).show()
(5)>>> df.groupBy(df['age']).count().show()
(6)>>> df.sort(df["name"]).show()
(7)>>> df.take(3)
(8)>>> df.select(df['name'].alias("username")).show()
(9)>>> df.agg({"age":"mean"}).show()
(10)>>> df.agg({"age":"min"}).sh
2.
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
if __name__ == "__main__":
sc = SparkContext("local","Simple App")
spark=SparkSession(sc)
peopleRDD = spark.sparkContext.textFile("file:///home/deeszechyi/emloyee.txt")
rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()
rowRDD.createOrReplaceTempView("employee")
personsDF = spark.sql("select * from employee")
personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)
3.
mysql> create database sparktest
mysql> use sparktest;
mysql> create table employee(id int(4),name char(20),gender char(4),age int(4));
mysql> insert into employee values(1,'Alice','F',22);
mysql> insert into employee values(2,'John','M',25);
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
if __name__ == "__main__":
sc = SparkContext( 'local', 'test')
spark=SQLContext(sc)
jdbcDF=spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://localhost:3306/sparktest?useSSL=false").option("dbtable","employee").option("user", "root").option("password", "MYsql123!").load()
jdbcDF.filter(jdbcDF.age>20).collect() # 检测是否连接成功
studentRDD = sc.parallelize(["3 Mary F 26","4 Tom M 23"]).map(lambda line : line.split(" "))
schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
rowRDD = studentRDD.map(lambda p : Row(int(p[0]),p[1].strip(), p[2].strip(),int(p[3])))
employeeDF = spark.createDataFrame(rowRDD, schema)
prop = {}
prop['user'] = 'root'
prop['password'] = 'MYsql123!'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest?useSSL=false",'employee','append', prop)
jdbcDF.collect()
jdbcDF.agg({"age": "max"}).show()
jdbcDF.agg({"age": "sum"}).show()
四、实验结果
1.
(1)
(2).
(3).
(4).
(5).
(6).
(7).
(8).
(9).
(10).
2.
3.
五、结果分析与实验体会(列出遇到的问题和解决办法,列出没有解决的问题;写下本次实验的体会和感受)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。