当前位置:   article > 正文

Spark SQL 基本操作_将下列json格式数据复制到linux系统中,并保存命名为employee.json。 { "id"

将下列json格式数据复制到linux系统中,并保存命名为employee.json。 { "id":1 ,

将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":" Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:

  1. 查询所有数据;
  2. 查询所有数据,并去除重复的数据;
  3. 查询所有数据,打印时去除id字段;
  4. 筛选出age>30的记录;
  5. 将数据按age分组;
  6. 将数据按name升序排列;
  7. 取出前3行数据;
  8. 查询所有记录的name列,并为其取别名为username;
  9. 查询年龄age的平均值;
  10. 查询年龄age的最小值。
  1. from pyspark.sql import SparkSession
  2. # 创建SparkSession对象
  3. spark = SparkSession.builder.appName("Employee").getOrCreate()
  4. # 读取JSON文件并创建DataFrame
  5. df = spark.read.json("file:///opt/module/spark-3.0.3-bin-without-hadoop/mycode/employee.json")
  6. # (1) 查询所有数据
  7. df.show()
  8. # (2) 查询所有数据,并去除重复的数据
  9. df_drop_duplicates = df.dropDuplicates()
  10. df_drop_duplicates.show()
  11. # (3) 查询所有数据,打印时去除id字段
  12. df_no_id = df.select([c for c in df.columns if c != "id"])
  13. df_no_id.show()
  14. # (4) 筛选出age>30的记录
  15. df_age_gt_30 = df.filter(df.age > 30)
  16. df_age_gt_30.show()
  17. # (5) 将数据按age分组
  18. df_grouped_by_age = df.groupBy("age").count().show()
  19. # (6) 将数据按name升序排列
  20. df_sorted_by_name = df.orderBy("name")
  21. df_sorted_by_name.show()
  22. # (7) 取出前3行数据
  23. df_first_3_rows = df.limit(3)
  24. df_first_3_rows.show()
  25. # (8) 查询所有记录的name列,并为其取别名为username
  26. df_username = df.select(df["name"].alias("username"))
  27. df_username.show()
  28. # (9) 查询年龄age的平均值
  29. mean_age = df.agg({"age": "avg"}).collect()[0][0]
  30. print(mean_age)
  31. # (10) 查询年龄age的最小值
  32. min_age = df.agg({"age": "min"}).collect()[0][0]
  33. print(min_age)

python3 ans3.py

(1) 

(2)  

(3)  

(4)  

(5)  

(6)  

(7)  

(8)  

(9)  

 (10) 

 总结

  1.  通过查阅博客了解了agg方法可以接收多个聚合函数作为参数,也可以使用字典或多个键值对来指定要聚合的列以及聚合函数。它返回一个DataFrame,其中包含所有指定列的聚合结果。在例如计算平均值时,可以使用agg方法。第二个方法是使用groupBy方法对DataFrame进行分组,然后使用avg方法计算分组后每组age列的平均值,最后使用select方法选择要返回的列,并使用collect方法获取计算结果并转换为一个列表。由于只有一个分组并且只有一个聚合函数,因此列表中只有一个元素。使用索引[0]获取这个元素,然后使用asDict方法将其转换为字典。最后,使用字典的键'avg(age)'获取平均值聚合结果。
  2.  编程中也遇到很多问题,如:在数据需要去重时。可以使用Spark提供的dropDuplicates函数进行去重。distinct也是用来去重的,区别是distinct是根据每一条数据进行完整的比对和去重,dropDuplicates可以根据指定的字段进行去重。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/589152
推荐阅读
相关标签
  

闽ICP备14008679号