当前位置:   article > 正文

Spark编程基础(Python版)实验三RDD编程_运用rdd编程在jupyter里面运行三、实验任务 “学生成绩.txt”文件存储了学生考试

运用rdd编程在jupyter里面运行三、实验任务 “学生成绩.txt”文件存储了学生考试

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

一、实验环境

           Ubuntu18.04

           Spark 2.4.0

           Python 3.6.5

二、实验流程

1.PySpark交互式编程

在 spark下创建文件夹sparksqldata,将data01.txt上传到sparksqldata下:

  1. cd /usr/local/spark
  2. mkdir sparksqldata
  3. cd /bin
  4. ./pyspark

(1)统计学生人数(即文件的行数)

  1. lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
  2. res = lines.map(lambda x:x.split(",")).map(lambda x: x[0]) //获取每行数据的第1
  3. distinct_res = res.distinct() //去重操作
  4. distinct_res.count()//取元素总个数

(2)统计开设课程总数

  1. lines = sc.textFile("file:///usr/local/spark/sparksqldata/data01.txt")
  2. df = lines.map(lambda x:x.split(",")).map(lambda x:x[1])
  3. df1 = df.distinct()
  4. df1.count()

 

(3)计算Tom所有课程的平均分

  1. lines = sc.textFile("file:///usr/local/spark/sparksqldata/data01.txt")
  2. res = lines.map(lambda x:x.split(",")).filter(lambda x:x[0]=="Tom")
  3. res.foreach(print)
  4. score = res.map(lambda x:int(x[2]))
  5. num = res.count()
  6. sum_score = score.reduce(lambda x,y:x+y)
  7. avg = sum_score/num
  8. print(avg)

(4)计算每一个人的选课总数

  1. lines = sc.textFile("file:///usr/local/spark/sparksqldata/data01.txt")
  2. res = lines.map(lambda x:x.split(",")).map(lambda x:(x[0],1))
  3. each_res = res.reduceByKey(lambda x,y: x+y)
  4. each_res.foreach(print)

(5)计算DataBase的选修人数

  1. lines = sc.textFile("file:///usr/local/spark/sparksqldata/data01.txt")
  2. res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")
  3. res.count()

(6)计算每门课程的平均分

  1. lines = sc.textFile("file:///usr/local/spark/sparksqldata/data01.txt")
  2. res = lines.map(lambda x:x.split(",")).map(lambda x:(x[1],(int(x[2]),1)))
  3. temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
  4. avg = temp.map(lambda x:(x[0], round(x[1][0]/x[1][1],2)))
  5. avg.foreach(print)

(7) 使用累加器计算共有多少人选了DataBase这门课。

  1. lines = sc.textFile("file:///usr/local/spark/sparksqldata/data01.txt")
  2. res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")
  3. accum = sc.accumulator(0)
  4. res.foreach(lambda x:accum.add(1))
  5. accum.value

 

2.编写独立应用程序

退出Pyspark交互模式:exit()

(1)在spark目录下创建文件A.txt,B.txt:

  1. cd /usr/local/spark
  2. vim A.txt
  3. A.txt写入以下内容(一定要是竖着的):
  4. 20170101 x
  5. 20170102 y
  6. 20170103 x
  7. 20170104 y
  8. 20170105 z
  9. 20170106 z
  10. vim B.txt
  11. B.txt写入以下内容
  12. 20170101 y
  13. 20170102 y
  14. 20170103 x
  15. 20170104 z
  16. 20170105 y

 然后创建C.py文件:

  1. vim C.py
  2. from pyspark import SparkContext
  3. #初始化SparkContext
  4. sc = SparkContext('local','remdup')
  5. #加载两个文件A和B
  6. lines1 = sc.textFile("file:///usr/local/spark/A.txt")
  7. lines2 = sc.textFile("file:///usr/local/spark/B.txt")
  8. #合并两个文件的内容
  9. lines = lines1.union(lines2)
  10. #去重操作
  11. distinct_lines = lines.distinct()
  12. #排序操作
  13. res = distinct_lines.sortBy(lambda x:x)
  14. #将结果写入result文件中,repartition(1)的作用是让结果合并到一个文件中,不加的话会结果写入到两个文件
  15. res.repartition(1).saveAsTextFile("file:///usr/local/spark/result")
  16. Python3 C.py

 然后执行C.py文件:python3 C.py,运行结果如下:

然后我们要查看去重的结果:

  1. cd /usr/local/spark
  2. cd result
  3. ls
  4. cat part-00000 _SUCCESS

(2)在spark目录下创建 Algorithm.txt、Database.txt、Python.txt

  1. cd /usr/local/spark
  2. vim Algorithm.txt:
  3. 小明 92
  4. 小红 87
  5. 小新 82
  6. 小丽 90
  7. vim Database.txt:
  8. 小明 95
  9. 小红 81
  10. 小新 89
  11. 小丽 85
  12. vim Python.txt:
  13. 小明 82
  14. 小红 83
  15. 小新 94
  16. 小丽 91

 然后编写python程序:

  1. vim score.py
  2. from pyspark import SparkContext
  3. sc = SparkContext('local',' avgscore')
  4. lines1 = sc.textFile("file:///usr/local/spark/Algorithm.txt")
  5. lines2 = sc.textFile("file:///usr/local/spark/Database.txt")
  6. lines3 = sc.textFile("file:///usr/local/spark/Python.txt")
  7. lines = lines1.union(lines2).union(lines3)
  8. data = lines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))
  9. res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
  10. result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
  11. result.repartition(1).saveAsTextFile("file:///usr/local/spark/result1")

 然后执行Py文件:

python3 score.py

然后查看我们统计的结果:

  1. cd /usr/local/spark
  2. cd result1
  3. ls
  4. cat part-00000 _SUCCESS

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/592830
推荐阅读
相关标签
  

闽ICP备14008679号