当前位置:   article > 正文

大数据编程实验:RDD编程_rdd编程实践

rdd编程实践

一、目的与要求

1、熟悉Spark的RDD基本操作及键值对操作;

2、熟悉使用RDD编程解决实际具体问题的方法。

二、实验内容

1给定数据集 data1.txt,包含了某大学计算机系的成绩,数据格式如下所示:

Tom,DataBase,80

Tom,Algorithm,50

Tom,DataStructure,60

Jim,DataBase,90

Jim,Algorithm,60

Jim,DataStructure,80

……

请根据给定的实验数据,在pyspark中通过编程来计算以下内容:

(1)该系总共有多少学生;

先获取每行的姓名字段,再将其用字典统计汇总,最后统计出几个键值对即为学生数量

 >>> lines=sc.textFile("file:///home/deeszechyi/data1.txt")

>>> lines.foreach(print)

>>> namecount=lines.map(lambda line:(line.split(",")[0],1))

>>> namecount.foreach(print)

>>> namecount=namecount.reduceByKey(lambda x,y:(x+y))

>>> namecount.foreach(print)

>>> namecount.count()

(2)该系共开设了多少门课程;

可以考虑先使用map函数映射获取课程字段,再用字典统计,方法与第一小问类似

>>> coursecount=lines.map(lambda x:x.split(",")[1])

>>> coursecount.foreach(print)

>>> coursecount=coursecount.map(lambda x:(x,1))

>>> coursecount.foreach(print)

>>> coursecount=coursecount.reduceByKey(lambda x,y:x+y)

>>> coursecount.count()

(3)Tom同学的总成绩平均分是多少;

本题可以考虑使用filter方法过滤姓名字段为Tom的记录,再映射其课程分数

>>> filtered_rdd = score.filter(lambda x: x[0] == "Tom").map(lambda x: int(x[1]))

>>> tom_sum=filtered_rdd.reduce(lambda a,b:a+b)

>>> print(tom_sum)

>>> tom_ave=tom_sum/filtered_rdd.count()

>>> print(tom_ave)

(4)求每名同学的选修的课程门数;

该题可直接映射获取姓名字段,并用字典统计每个姓名出现次数,该次数即代表该同学所选修的课程数。

>>>stu=lines.map(lambda x:x.split(“,”)[0])

>>>stu.foreach(print)

>>>stu=stu.map(lambda x:(x,1)).reduceByKey(lambda a,b:(a+b))

>>>stu.foreach(print)

(5)该系DataBase课程共有多少人选修;

本题对数据集直接映射过滤课程为DataBase的课程

>>> course=lines.map(lambda x:x.split(",")[1])

>>> course=course.filter(lambda x:x=="DataBase")

>>> course.count()

(6)各门课程的平均分是多少;

针对问题(6),考虑使用嵌套形式的数据结构来存储,从该数据集中映射出课程名称和分数,对课程出现次数用字典进行统计:(课程名称, (分数, 1))使用reduceByKey方法将分数和方法加,得到新的数据:(课程名称,(总分数,总人数))

 >>> cave=lines.map(lambda x:(x.split(",")[1],(x.split(",")[2],1)))

>>> cave.foreach(print)

>>> cave=cave.reduceByKey(lambda x,y:(int(x[0])+int(y[0]),x[1]+y[1]))

>>> cave.foreach(print)

>>> cave=cave.map(lambda x:(x[0],x[1][0]/x[1][1]))

>>> cave.foreach(print)

7)使用累加器计算共有多少人选了DataBase这门课。

本题使用map方法映射课程字段并用字典对其进行统计,统计结果使用filter过滤即可

 >>>course=lines.map(lambda x:x.split(“,”)[1])

>>>course=course.map(lambda x:(x,1))

>>>course=course.reduceByKey(lambda x,y:(x+y))

>>>course.foreach(print)

>>>DB=course.filter(lambda x:x[0]==’DataBase’).map(lambda x:x[1])

>>>DB.foreach(print)

2.编写独立应用程序实现数据去重

于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。

输入文件A的样例如下:

20170101    x

20170102    y

20170103    x

20170104    y

20170105    z

20170106    z

输入文件B的样例如下:

20170101    y

20170102    y

20170103    x

20170104    z

20170105    y

根据输入的文件A和B合并得到的输出文件C的样例如下:

20170101    x

20170101    y

20170102    y

20170103    x

20170104    y

20170104    z

20170105    y

20170105    z

20170106    z

将数据写入文件A和B

创建unique.py文件,代码如下:

  1. from pyspark import SparkContext, SparkConf
  2. conf = SparkConf().setMaster("local").setAppName("Sparkunique")
  3. sc = SparkContext(conf=conf)
  4. linesA = sc.textFile("file:///home/deeszechyi/A.txt")
  5. linesB = sc.textFile("file:///home/deeszechyi/B.txt")
  6. lines = linesA.union(linesB)
  7. lines = lines.distinct()
  8. lines = lines.sortBy(lambda x: x)
  9. lines.repartition(1).saveAsTextFile("file:///home/deeszechyi/C.txt")

该段代码读取A和B文件,将两个文件内容合并去重并按照第一个字段排序,保存到C.txt中

3.编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。

Algorithm成绩:

明 92

小红 87

小新 82

小丽 90

Database成绩:

小明 95

小红 81

小新 89

小丽 85

Python成绩:

小明 82

小红 83

小新 94

小丽 91

平均成绩如下:

    (小红,83.67)

    (小新,88.33)

    (小明,89.67)

(小丽,88.67)

创建三个文本文件和一个.py文件

代码如下:

  1. from pyspark import SparkContext,SparkConf
  2. conf=SparkConf().setMaster("local").setAppName("avescore")
  3. sc=SparkContext(conf=conf)
  4. linesA=sc.textFile("file:///home/deeszechyi/Algorithm.txt")
  5. linesB=sc.textFile("file:///home/deeszechyi/Database.txt")
  6. linesC=sc.textFile("file:///home/deeszechyi/Python.txt")
  7. lines=linesA.union(linesB).union(linesC)
  8. uniquelines=lines.distinct()
  9. ave=uniquelines.sortBy(lambda x:x).filter(bool)
  10. ave=ave.map(lambda x:x.split())
  11. ave=ave.map(lambda x:(x[0],(int(x[1]),1)))
  12. ave=ave.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
  13. ave=ave.map(lambda x:(x[0],x[1][0]/x[1][1]))
  14. ave.repartition(1).saveAsTextFile("file:///home/deeszechyi/ave.txt")

该段代码构造了一个复合型数据结构:(姓名,(成绩,1)),与第一题第(6)问相同。

4、运行教材P86第四节中的三个综合实例,对每个Python程序要给出适当的注释。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/466622
推荐阅读
相关标签
  

闽ICP备14008679号