当前位置:   article > 正文

实验三 Spark SQL基础编程_1.spark sql基本操作 将下列json格式数据复制到linux系统中,并保存命名为emplo

1.spark sql基本操作 将下列json格式数据复制到linux系统中,并保存命名为employee

实验三 Spark SQL基础编程

1.实验目的

1. 掌握 Spark SQL 的基本编程方法;

2. 熟悉 RDD 到 DataFrame 的转化方法;

3. 熟悉利用 Spark SQL 管理来自不同数据源的数据。

2.实验内容

1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。

{ "id":1, "name":"Ella", "age":36 }

{ "id":2, "name":"Bob", "age":29 }

{ "id":3, "name":"Jack", "age":29 }

{ "id":4, "name":"Jim", "age":28 }

{ "id":4, "name":"Jim", "age":28 }

{ "id":5, "name":"Damon" }

{ "id":5, "name":"Damon" }

为 employee.json 创建 DataFrame,并写出 Python 语句完成下列操作:

(1)查询所有数据;

(2)查询所有数据,并去除重复的数据;

(3)查询所有数据,打印时去除 id 字段;

(4)筛选出 age>30 的记录;

(5)将数据按 age 分组;

(6)将数据按 name 升序排列

(7)取出前 3 行数据;

(8)查询所有记录的 name 列,并为其取别名为 username;

(9)查询年龄 age 的平均值;

(10)查询年龄 age 的最小值。

创建json文件

  1. echo '{ "id":1, "name":"Ella", "age":36 }' > employee.json
  2. echo '{ "id":2, "name":"Bob", "age":29 }' >> employee.json
  3. echo '{ "id":3, "name":"Jack", "age":29 }' >> employee.json
  4. echo '{ "id":4, "name":"Jim", "age":28 }' >> employee.json
  5. echo '{ "id":4, "name":"Jim", "age":28 }' >> employee.json
  6. echo '{ "id":5, "name":"Damon" }' >> employee.json
  7. echo '{ "id":5, "name":"Damon" }' >> employee.json

参考代码:关键代码如下。

  1. #导入
  2. ....
  3. //创建sprak对象
  4. ....
  5. df = spark.read.json("employee.json")
  6. df.show()
  7. df_distinct = df.distinct()
  8. df_distinct.show()
  9. df_without_id = df.select("name", "age")
  10. df_without_id.show()
  11. df_age_gt_30 = df.filter(col("age") > 30)
  12. df_age_gt_30.show()
  13. df_grouped_by_age = df.groupBy("age").count()
  14. df_grouped_by_age.show()
  15. df_sorted_by_name = df.orderBy("name")
  16. df_sorted_by_name.show()
  17. df_top_3 = df.limit(3)
  18. df_top_3.show()
  19. df_with_username = df.select(col("name").alias("username"))
  20. df_with_username.show()
  21. print(df.select(avg("age")).first()[0])
  22. print(df.select(min("age")).first()[-3] )
  23. //关闭
  24. ...

 

2.编程实现将 RDD 转换为 DataFrame 源文件内容如下(包含 id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。

请写出 程序代码。

关键代码如下:

  1. # 创建SparkSession
  2. .......
  3. # 定义源文件的结构类型
  4. .......
  5. # 读取源文件并创建RDD
  6. ....
  7. # 将RDD转换为DataFrame
  8. df = spark.createDataFrame(rdd, schema)
  9. # 打印DataFrame的所有数据
  10. df.show(truncate=False)
  11. # 将DataFrame的数据按指定格式打印出来
  12. df_string = df.rdd \
  13. .map(lambda row: f"id:{row['id']},name:{row['name']},age:{row['age']}") \
  14. .collect()
  15. for data in df_string:
  16. print(data)
  17. .......

 

 

3. 编程实现利用 DataFrame 读写 MySQL 

(1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 1 所示 的两行数据

表 1 employee

 

(2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 2 所 示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。

表 2 employee

sql语句:

  1. CREATE DATABASE sparktest;
  2. USE sparktest;
  3. CREATE TABLE employee (
  4. id INT PRIMARY KEY,
  5. name VARCHAR(50),
  6. gender CHAR(1),
  7. age INT
  8. );
  9. INSERT INTO employee (id, name, gender, age) VALUES
  10. (1, 'Alice', 'F', 22),
  11. (2, 'John', 'M', 25);

 python代码

  1. from pyspark.sql import SparkSession
  2. # 创建SparkSession
  3. spark = SparkSession.builder \
  4. .appName("MySQL Example") \
  5. .getOrCreate()
  6. # 定义MySQL连接信息
  7. mysql_host = "localhost"
  8. mysql_port = "3306"
  9. mysql_database = "sparktest"
  10. mysql_table = "employee"
  11. mysql_username = "root"
  12. mysql_password = "root"
  13. # 创建DataFrame
  14. data = [("3", "Mary", "F", 26), ("4", "Tom", "M", 23)]
  15. columns = ["id", "name", "gender", "age"]
  16. df = spark.createDataFrame(data, columns)
  17. # 写入MySQL数据库
  18. df.write.format("jdbc") \
  19. .option("url", f"jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}&useSSL=false") \
  20. .option("dbtable", mysql_table) \
  21. .option("user", mysql_username) \
  22. .option("password", mysql_password) \
  23. .mode("append") \
  24. .save()
  25. # 从MySQL数据库读取数据
  26. df_mysql = spark.read.format("jdbc") \
  27. .option("url", f"jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}") \
  28. .option("dbtable", mysql_table) \
  29. .option("user", mysql_username) \
  30. .option("password", mysql_password) \
  31. .load()
  32. # 计算age的最大值和总和
  33. max_age = df_mysql.selectExpr("max(age)").collect()[0][0]
  34. sum_age = df_mysql.selectExpr("sum(age)").collect()[0][0]
  35. # 打印结果
  36. print("Max Age:", max_age)
  37. print("Sum of Age:", sum_age)
  38. # 关闭SparkSession
  39. spark.stop()

 

 3.参考代码

https://download.csdn.net/download/weixin_41957626/87780630

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/663309
推荐阅读
相关标签
  

闽ICP备14008679号