赞
踩
1. 掌握 Spark SQL 的基本编程方法;
2. 熟悉 RDD 到 DataFrame 的转化方法;
3. 熟悉利用 Spark SQL 管理来自不同数据源的数据。
1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。
{ "id":1, "name":"Ella", "age":36 }
{ "id":2, "name":"Bob", "age":29 }
{ "id":3, "name":"Jack", "age":29 }
{ "id":4, "name":"Jim", "age":28 }
{ "id":4, "name":"Jim", "age":28 }
{ "id":5, "name":"Damon" }
{ "id":5, "name":"Damon" }
为 employee.json 创建 DataFrame,并写出 Python 语句完成下列操作:
(1)查询所有数据;
(2)查询所有数据,并去除重复的数据;
(3)查询所有数据,打印时去除 id 字段;
(4)筛选出 age>30 的记录;
(5)将数据按 age 分组;
(6)将数据按 name 升序排列;
(7)取出前 3 行数据;
(8)查询所有记录的 name 列,并为其取别名为 username;
(9)查询年龄 age 的平均值;
(10)查询年龄 age 的最小值。
创建json文件
- echo '{ "id":1, "name":"Ella", "age":36 }' > employee.json
- echo '{ "id":2, "name":"Bob", "age":29 }' >> employee.json
- echo '{ "id":3, "name":"Jack", "age":29 }' >> employee.json
- echo '{ "id":4, "name":"Jim", "age":28 }' >> employee.json
- echo '{ "id":4, "name":"Jim", "age":28 }' >> employee.json
- echo '{ "id":5, "name":"Damon" }' >> employee.json
- echo '{ "id":5, "name":"Damon" }' >> employee.json
参考代码:关键代码如下。
- #导入
- ....
- //创建sprak对象
- ....
-
- df = spark.read.json("employee.json")
-
-
- df.show()
-
-
- df_distinct = df.distinct()
- df_distinct.show()
-
-
- df_without_id = df.select("name", "age")
- df_without_id.show()
-
- df_age_gt_30 = df.filter(col("age") > 30)
- df_age_gt_30.show()
-
- df_grouped_by_age = df.groupBy("age").count()
- df_grouped_by_age.show()
-
- df_sorted_by_name = df.orderBy("name")
- df_sorted_by_name.show()
- df_top_3 = df.limit(3)
- df_top_3.show()
-
- df_with_username = df.select(col("name").alias("username"))
- df_with_username.show()
-
-
- print(df.select(avg("age")).first()[0])
-
-
- print(df.select(min("age")).first()[-3] )
-
- //关闭
- ...
2.编程实现将 RDD 转换为 DataFrame 源文件内容如下(包含 id,name,age):
1,Ella,36
2,Bob,29
3,Jack,29
请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。
请写出 程序代码。
关键代码如下:
- # 创建SparkSession
- .......
-
- # 定义源文件的结构类型
-
- .......
- # 读取源文件并创建RDD
- ....
-
- # 将RDD转换为DataFrame
- df = spark.createDataFrame(rdd, schema)
-
- # 打印DataFrame的所有数据
- df.show(truncate=False)
-
- # 将DataFrame的数据按指定格式打印出来
- df_string = df.rdd \
- .map(lambda row: f"id:{row['id']},name:{row['name']},age:{row['age']}") \
- .collect()
-
- for data in df_string:
- print(data)
-
- .......
3. 编程实现利用 DataFrame 读写 MySQL
(1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 1 所示 的两行数据
表 1 employee
(2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 2 所 示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。
表 2 employee
sql语句:
- CREATE DATABASE sparktest;
-
- USE sparktest;
-
- CREATE TABLE employee (
- id INT PRIMARY KEY,
- name VARCHAR(50),
- gender CHAR(1),
- age INT
- );
-
- INSERT INTO employee (id, name, gender, age) VALUES
- (1, 'Alice', 'F', 22),
- (2, 'John', 'M', 25);
python代码
- from pyspark.sql import SparkSession
-
- # 创建SparkSession
- spark = SparkSession.builder \
- .appName("MySQL Example") \
- .getOrCreate()
-
- # 定义MySQL连接信息
- mysql_host = "localhost"
- mysql_port = "3306"
- mysql_database = "sparktest"
- mysql_table = "employee"
- mysql_username = "root"
-
- mysql_password = "root"
-
- # 创建DataFrame
- data = [("3", "Mary", "F", 26), ("4", "Tom", "M", 23)]
- columns = ["id", "name", "gender", "age"]
- df = spark.createDataFrame(data, columns)
-
- # 写入MySQL数据库
- df.write.format("jdbc") \
- .option("url", f"jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}&useSSL=false") \
- .option("dbtable", mysql_table) \
- .option("user", mysql_username) \
- .option("password", mysql_password) \
- .mode("append") \
- .save()
-
- # 从MySQL数据库读取数据
- df_mysql = spark.read.format("jdbc") \
- .option("url", f"jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}") \
- .option("dbtable", mysql_table) \
- .option("user", mysql_username) \
- .option("password", mysql_password) \
- .load()
-
- # 计算age的最大值和总和
- max_age = df_mysql.selectExpr("max(age)").collect()[0][0]
- sum_age = df_mysql.selectExpr("sum(age)").collect()[0][0]
-
- # 打印结果
- print("Max Age:", max_age)
- print("Sum of Age:", sum_age)
-
- # 关闭SparkSession
- spark.stop()
https://download.csdn.net/download/weixin_41957626/87780630
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。