当前位置:   article > 正文

大数据编程实验三:Spark SQL_请将数据复制保存到linux系统中,命名为employee.txt,实现从rdd转换得到datafr

请将数据复制保存到linux系统中,命名为employee.txt,实现从rdd转换得到dataframe,

一、目的与要求

1、通过实验掌握Spark SQL的基本编程方法;

2、熟悉RDD到DataFrame的转化方法;

3、熟悉利用Spark SQL管理来自不同数据源的数据。

二、实验内容

1.Spark SQL基本操作

将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:

  1. 查询所有数据;
  2. 查询所有数据,并去除重复的数据;
  3. 查询所有数据,打印时去除id字段;
  4. 筛选出age>30的记录;
  5. 将数据按age分组;
  6. 将数据按name升序排列;
  7. 取出前3行数据;
  8. 查询所有记录的name列,并为其取别名为username;
  9. 查询年龄age的平均值;
  10. 查询年龄age的最小值。

2.编程实现将RDD转换为DataFrame

源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

       请将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

3. 编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表5-2所示的两行数据。

表5-2 employee表原有数据

id

name

gender

age

1

Alice

F

22

2

John

M

25

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表5-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

表5-3 employee表新增数据

id

name

gender

age

3

Mary

F

26

4

Tom

M

23

5

你的名字

你的性别

你的年龄

三、实验步骤(实验过程)

1.(1)>>> sp=SparkSession.builder.getOrCreate()

>>> df=sp.read.json("file:///home/deeszechyi/employee.json")

>>> df.show()

(2)>>> df.distinct().show()

(3)>>> df.drop("id").show()

(4)>>> df.filter(df['age']>30).show()

(5)>>> df.groupBy(df['age']).count().show()

(6)>>> df.sort(df["name"]).show()

(7)>>> df.take(3)

(8)>>> df.select(df['name'].alias("username")).show()

(9)>>> df.agg({"age":"mean"}).show()

(10)>>> df.agg({"age":"min"}).sh

2.

from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
if __name__ == "__main__":
        sc = SparkContext("local","Simple App")
        spark=SparkSession(sc)
        peopleRDD = spark.sparkContext.textFile("file:///home/deeszechyi/emloyee.txt")
        rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()
        rowRDD.createOrReplaceTempView("employee")
        personsDF = spark.sql("select * from employee")
   personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

3.

mysql> create database sparktest

mysql> use sparktest;

mysql> create table employee(id int(4),name char(20),gender char(4),age int(4));

mysql> insert into employee values(1,'Alice','F',22);

mysql> insert into employee values(2,'John','M',25);

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
if __name__ == "__main__":

    sc = SparkContext( 'local', 'test')
    spark=SQLContext(sc)
    jdbcDF=spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://localhost:3306/sparktest?useSSL=false").option("dbtable","employee").option("user", "root").option("password", "MYsql123!").load()
    jdbcDF.filter(jdbcDF.age>20).collect()      # 检测是否连接成功
    studentRDD = sc.parallelize(["3 Mary F 26","4 Tom M 23"]).map(lambda line : line.split(" "))
    schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
    rowRDD = studentRDD.map(lambda p : Row(int(p[0]),p[1].strip(), p[2].strip(),int(p[3])))
    employeeDF = spark.createDataFrame(rowRDD, schema)
    prop = {}
    prop['user'] = 'root'
    prop['password'] = 'MYsql123!'
    prop['driver'] = "com.mysql.jdbc.Driver"
    employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest?useSSL=false",'employee','append', prop)
    jdbcDF.collect()
    jdbcDF.agg({"age": "max"}).show()
    jdbcDF.agg({"age": "sum"}).show()

四、实验结果

1.

(1)

(2).

(3).

(4).

(5).

(6).

(7).

(8).

(9).

(10).

2.

3.


 

五、结果分析与实验体会(列出遇到的问题和解决办法,列出没有解决的问题;写下本次实验的体会和感受)

  

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/498777
推荐阅读
相关标签
  

闽ICP备14008679号