当前位置:   article > 正文

spark实验三 Spark SQL编程初级实践

spark sql编程初级实践

Spark SQL基本操作

将下列json数据复制到你的ubuntu系统/usr/local/spark下,并保存命名为employee.json。

  1. { "id":1 ,"name":" Ella","age":36 }
  2. { "id":2,"name":"Bob","age":29 }
  3. { "id":3 ,"name":"Jack","age":29 }

首先为employee.json创建DataFrame,并写出Python语句完成下列操作:

创建DataFrame

答案:

>>> spark=SparkSession.builder().getOrCreate()

>>> df = spark.read.json("file:///usr/local/spark/employee.json")

  1. 查询DataFrame的所有数据

答案:>>> df.show()

  1. 查询所有数据,并去除重复的数据

答案:>>> df.distinct().show()

  1. 查询所有数据,打印时去除id字段

答案:>>> df.drop("id").show()

  1. 筛选age>20的记录

答案:>>> df.filter(df.age > 30 ).show()

  1. 将数据按name分组

答案:>>> df.groupBy("name").count().show()

  1. 将数据按name升序排列

答案:>>> df.sort(df.name.asc()).show()

取出前3行数据

答案:>>> df.take(3) 或python> df.head(3)

查询所有记录的name列,并为其取别名为username

答案:>>> df.select(df.name.alias("username")).show()

查询年龄age的平均值

答案:>>> df.agg({"age": "mean"}).show()

查询年龄age的最大值

答案:>>> df.agg({"age": "max"}).show()

编程实现将RDD转换为DataFrame

方法一:利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;

方法二:使用编程接口,构造一个schema并将其应用在已知的RDD上。

练习

编写独立应用程序实现,将RDD转换为DataFrame,分别使用反射机制和编程方式,打印出记录的信息

1,Ella,36

2,Bob,29

3,Jack,29

  1. #/usr/local/spark
  2. from pyspark.conf import SparkConf
  3. from pyspark.sql.session import SparkSession
  4. from pyspark import SparkContext
  5. from pyspark.sql.types import Row
  6. from pyspark.sql import SQLContext
  7. if __name__ == "__main__":
  8. sc = SparkContext("local","Simple App")
  9. spark=SparkSession(sc)
  10. peopleRDD = spark.sparkContext.textFile("file:/usr/local/spark/employee.txt")
  11. rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()
  12. rowRDD.createOrReplaceTempView("employee")
  13. personsDF = spark.sql("select * from employee")
  14. personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

编写独立应用程序实现,配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入下列数据到MySQL,打印出记录的信息、age的最大值和age的总和(数据记录共3行,红色字体第三行为你的真实学号、姓名、姓别、年纪)。

   employee表新增数据

id

name

gender

age

1

Mary

F

26

2

Tom

M

23

id

name

M

10

 

create database sparktest;

  1. #/usr/local/spark/mysqltest.py
  2. from pyspark.sql import Row
  3. from pyspark.sql.types import *
  4. from pyspark import SparkContext,SparkConf
  5. from pyspark.sql import SparkSession
  6. spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
  7. schema = StructType([StructField("id",LongType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
  8. employeeRDD = spark.sparkContext.parallelize(["3 Mary F 26","4 Tom M 23","id name M 10"]).map(lambda x:x.split(" "))
  9. rowRDD = employeeRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
  10. employeeDF = spark.createDataFrame(rowRDD, schema)
  11. prop = {}
  12. prop['user'] = 'root'
  13. prop['password'] = '123456'
  14. prop['driver'] = "com.mysql.jdbc.Driver"
  15. employeeDF.write.jdbc("jdbc:mysql://localhost:3306/employee?useSSL=false",'employee','append', prop)
  16. employeeDF.collect()
  17. employeeDF.agg({"age": "max"}).show()
  18. employeeDF.agg({"age": "sum"}).show()

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/589163
推荐阅读
相关标签
  

闽ICP备14008679号