Copy the following JSON data to /usr/local/spark on your Ubuntu system and save it as employee.json.
- { "id":1 ,"name":" Ella","age":36 }
- { "id":2,"name":"Bob","age":29 }
- { "id":3 ,"name":"Jack","age":29 }
First create a DataFrame for employee.json, then write Python statements that complete the operations below.
Create the DataFrame
Answer:
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.read.json("file:///usr/local/spark/employee.json")
Show all the data. Answer: >>> df.show()
Show the data with duplicate rows removed. Answer: >>> df.distinct().show()
Show the data with the id column dropped. Answer: >>> df.drop("id").show()
Show the records where age is greater than 30. Answer: >>> df.filter(df.age > 30).show()
Group the data by name and count the records in each group. Answer: >>> df.groupBy("name").count().show()
Sort the data by name in ascending order. Answer: >>> df.sort(df.name.asc()).show()
Take the first 3 rows. Answer: >>> df.take(3) or >>> df.head(3)
Show the name column with the alias username. Answer: >>> df.select(df.name.alias("username")).show()
Compute the average age. Answer: >>> df.agg({"age": "mean"}).show()
Compute the maximum age. Answer: >>> df.agg({"age": "max"}).show()
Method 1: use reflection to infer the schema of an RDD that contains objects of a specific type; this is suitable when the structure of the RDD's data is already known.
Method 2: use the programmatic interface to construct a schema and apply it to an existing RDD.
Write a standalone application that converts an RDD to a DataFrame, using both the reflection mechanism and the programmatic approach (a sketch of the programmatic version follows the reflection answer below), and prints the records:
1,Ella,36
2,Bob,29
3,Jack,29
# /usr/local/spark
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row

if __name__ == "__main__":
    sc = SparkContext("local", "Simple App")
    spark = SparkSession(sc)
    # Reflection approach: Row objects with named fields let Spark infer the schema
    peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/employee.txt")
    df = peopleRDD.map(lambda line: line.split(",")) \
        .map(lambda attributes: Row(id=int(attributes[0]), name=attributes[1], age=int(attributes[2]))) \
        .toDF()
    df.createOrReplaceTempView("employee")
    personsDF = spark.sql("select * from employee")
    personsDF.rdd.map(lambda t: "id:" + str(t["id"]) + ",Name:" + t["name"] + ",age:" + str(t["age"])).foreach(print)
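The script above is the reflection answer. A minimal sketch of the programmatic (explicit schema) version, assuming the same employee.txt file, could look like this:

# Programmatic approach: build a StructType schema explicitly and apply it to the RDD
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/employee.txt")
# Parse each line into a (id, name, age) tuple that matches the schema
rowRDD = peopleRDD.map(lambda line: line.split(",")) \
    .map(lambda x: (int(x[0]), x[1], int(x[2])))
df = spark.createDataFrame(rowRDD, schema)
df.createOrReplaceTempView("employee")
results = spark.sql("select * from employee")
results.rdd.map(lambda t: "id:" + str(t["id"]) + ",Name:" + t["name"] + ",age:" + str(t["age"])).foreach(print)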
Write a standalone application that configures Spark to connect to the MySQL database through JDBC. Use a DataFrame to insert the data below into MySQL, then print the inserted records, the maximum value of age, and the sum of age (the data has 3 rows; the third row, shown in red in the original, is your real student ID, name, gender, and age).
New data for the employee table:
id | name | gender | age
1  | Mary | F      | 26
2  | Tom  | M      | 23
id | name | M      | 10
create database sparktest;
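If the employee table does not already exist in the sparktest database, a matching table can be created first; this is only a sketch, and the MySQL column types below are assumptions chosen to match the StructType fields used in the script:

use sparktest;
create table employee (id bigint, name varchar(20), gender varchar(4), age int);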
# /usr/local/spark/mysqltest.py
from pyspark import SparkConf
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType

spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Replace the third entry with your real student ID, name, gender, and age before running;
# the "id name M 10" placeholder cannot be converted to the schema as it stands.
employeeRDD = spark.sparkContext.parallelize(["1 Mary F 26", "2 Tom M 23", "id name M 10"]).map(lambda x: x.split(" "))
rowRDD = employeeRDD.map(lambda p: Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))

employeeDF = spark.createDataFrame(rowRDD, schema)

prop = {}
prop['user'] = 'root'
prop['password'] = '123456'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest?useSSL=false", 'employee', 'append', prop)

employeeDF.show()
employeeDF.agg({"age": "max"}).show()
employeeDF.agg({"age": "sum"}).show()