当前位置:   article > 正文

SparkSQL编程初级实践

sparksql编程初级实践

一、目的与要求
1、通过实验掌握Spark SQL的基本编程方法;
2、熟悉RDD到DataFrame的转化方法;
3、熟悉利用Spark SQL管理来自不同数据源的数据。
二、实验内容
1.Spark SQL基本操作
将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。
{ “id”:1 , “name”:“Ella” , “age”:36 }
{ “id”:2, “name”:“Bob”,“age”:29 }
{ “id”:3 , “name”:“Jack”,“age”:29 }
{ “id”:4 , “name”:“Jim”,“age”:28 }
{ “id”:4 , “name”:“Jim”,“age”:28 }
{ “id”:5 , “name”:“Damon” }
{ “id”:5 , “name”:“Damon” }
为employee.json创建DataFrame,并写出Python语句完成下列操作:
(1) 查询所有数据;
(2) 查询所有数据,并去除重复的数据;
(3) 查询所有数据,打印时去除id字段;
(4) 筛选出age>30的记录;
(5) 将数据按age分组;
(6) 将数据按name升序排列;
(7) 取出前3行数据;
(8) 查询所有记录的name列,并为其取别名为username;
(9) 查询年龄age的平均值;
(10) 查询年龄age的最小值。
2.编程实现将RDD转换为DataFrame
源文件内容如下(包含id,name,age):
1,Ella,36
2,Bob,29
3,Jack,29
请将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。
3. 编程实现利用DataFrame读写MySQL的数据
(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表5-2所示的两行数据。
表5-2 employee表原有数据
id name gender age
1 Alice F 22
2 John M 25
(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表5-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。
表5-3 employee表新增数据
id name gender age
3 Mary F 26
4 Tom M 23
5 你的名字 你的性别 你的年龄

三、实验步骤(实验过程)
1.Spark SQL基本操作
(1) 查询所有数据;

df=spark.read.json("file:///home/master/mydemo/demo_05/employee.json")
df.show()
  • 1
  • 2


(2) 查询所有数据,并去除重复的数据;

df.distinct().show()
  • 1


(3) 查询所有数据,打印时去除id字段;

df.select(df["age"],df["name"]).show()
  • 1


(4) 筛选出age>30的记录;

df.filter(df["age"]>30).show()
  • 1


(5) 将数据按age分组;

df.groupBy("age").count().show()
  • 1


(6) 将数据按name升序排列;

df.sort("name").show()
  • 1


(7) 取出前3行数据;

df.take(3)
  • 1


(8) 查询所有记录的name列,并为其取别名为username;

df.select(df.name.alias("username")).show()
  • 1


(9) 查询年龄age的平均值;

df.agg({"age":"mean"}).show()
  • 1


(11) 查询年龄age的最小值。

df.agg({"age":"min"}).show()
  • 1

2.编程实现将RDD转换为DataFrame

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
#下面设置模式信息
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age", IntegerType(), True)])
#下面设置两条数据,表示两个学生的信息
studentRDD = spark.sparkContext.parallelize(["3 Mary F 26","4 Tom M 23"]).map(lambda x:x.split(" "))

#下面创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = studentRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))

#建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
studentDF = spark.createDataFrame(rowRDD, schema)

#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = 'zcw0210ZCW30!'
prop['driver'] = "com.mysql.jdbc.Driver"
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark?useSSL=false",'student','append', prop)

jdbcDF = spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://localhost:3306/spark?useSSL=false")\
.option("dbtable","student").option("user","root")\
.option("password","zcw0210ZCW30!").load()

jdbcDF.collect()
jdbcDF.distinct().show()
jdbcDF.agg({"age":"max"}).show()
jdbcDF.agg({"age":"sum"}).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

3.编程实现利用DataFrame读写MySQL的数据
sql配置见另一篇文章:Spark SQL末尾部分,有详细的mysql安装布置

systemctl start mysqld.service
mysql -u root -p
  • 1
  • 2
mysql> create database sparktest;
mysql> use sparktest;
mysql> create table employee (id int(4),name char(20),gender char(4),age int(4));
mysql> insert into employee values(1,"Alice","F",22);
mysql> insert into employee values(2,"John","M",25);
  • 1
  • 2
  • 3
  • 4
  • 5
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
#下面设置模式信息
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age", IntegerType(), True)])
#下面设置两条数据,表示两个学生的信息
studentRDD = spark.sparkContext.parallelize(["3 Mary F 26","4 Tom M 23","5 ZCW M 21"]).map(lambda x:x.split(" "))

#下面创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = studentRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))

#建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
studentDF = spark.createDataFrame(rowRDD, schema)

#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = 'zcw0210ZCW30!'
prop['driver'] = "com.mysql.jdbc.Driver"
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark?useSSL=false",'student','append', prop)

jdbcDF = spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://localhost:3306/spark?useSSL=false")
.option("dbtable","student").option("user","root")
.option("password","zcw0210ZCW30!").load()

jdbcDF.collect()
jdbcDF.distinct().show()
jdbcDF.agg({"age":"max"}).show()
jdbcDF.agg({"age":"sum"}).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

实验结果:
1.Spark SQL基本操作
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
2.编程实现将RDD转换为DataFrame在这里插入图片描述
在这里插入图片描述
3.编程实现利用DataFrame读写MySQL的数据
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/589183
推荐阅读
相关标签
  

闽ICP备14008679号