当前位置:   article > 正文

编写独立应用程序实现数据去重及求平均值_2、编写独立应用程序实现数据去重 对于两个输入文件a和b,编写spark独立应用程序,

2、编写独立应用程序实现数据去重 对于两个输入文件a和b,编写spark独立应用程序,

(1)编写独立应用程序实现数据去重

对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。

输入文件A的样例如下:

20170101    x

20170102    y

20170103    x

20170104    y

20170105    z

20170106    z

输入文件B的样例如下:

20170101    y

20170102    y

20170103    x

20170104    z

20170105    y

根据输入的文件A和B合并得到的输出文件C的样例如下:

20170101    x

20170101    y

20170102    y

20170103    x

20170104    y

20170104    z

20170105    y

20170105    z

20170106    z

  1. vim ans1.py
  2. from pyspark import SparkConf, SparkContext
  3. conf = SparkConf().setAppName("MergeDeduplicationSort")
  4. sc = SparkContext(conf=conf)
  5. inputA = sc.textFile("file:///opt/module/spark-3.0.3-bin-without-hadoop/mycode/fileA")
  6. inputB = sc.textFile("file:///opt/module/spark-3.0.3-bin-without-hadoop/mycode/fileB")
  7. merged = inputA.union(inputB)
  8. deduplicated = merged.distinct()
  9. sortedResult = deduplicated.sortBy(lambda x: x)
  10. sortedResult.repartition(1).saveAsTextFile("file:///opt/module/spark-3.0.3-bin-without-hadoop/mycode/fileC")
  11. sc.stop()
  12. python3 ans1.py
  13. cd fileC
  14. cat part-00000

(2)编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。

Algorithm成绩:

小明 92

小红 87

小新 82

小丽 90

Database成绩:

小明 95

小红 81

小新 89

小丽 85

Python成绩:

小明 82

小红 83

小新 94

小丽 91

平均成绩如下:

    (小红,83.67)

    (小新,88.33)

    (小明,89.67)

(小丽,88.67)

  1. vim ans2.py
  2. from pyspark import SparkConf, SparkContext
  3. conf = SparkConf().setAppName("CalculateAverageScore")
  4. sc = SparkContext(conf=conf)
  5. lines1 = sc.textFile(“file:///opt/module/spark-3.0.3-bin-without-hadoop/mycode/Javaweb”)
  6. lines2 = sc.textFile(“file:///opt/module/spark-3.0.3-bin-without-hadoop/mycode/C++”)
  7. lines3 = sc.textFile(“file:///opt/module/spark-3.0.3-bin-without-hadoop/mycode/Spark”)
  8. lines = lines1.union(lines2).union(lines3)
  9. data = lines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))
  10. res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
  11. result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
  12. result.repartition(1).saveAsTextFile(“file:///opt/module/spark-3.0.3-bin-without-hadoop/mycode/fileD”)
  13. sc.stop()
  14. python3 ans2.py
  15. cd fileD
  16. cat part-00000

 总结

  1.  通过编写独立应用程序实现一些功能了解到了在使用Spark进行数据处理时,可以通过创建SparkConf和SparkContext对象来配置和初始化Spark应用程序。对于需要对RDD进行转换的操作,可以使用各种转换函数,如union、distinct和sortBy等、最终结果可以通过将RDD保存到一个或多个文件中来进行持久化。在保存结果之前,可以对结果进行分区、排序或其他处理,以获得更好的性能和可读性。在完成Spark应用程序后,需要停止SparkContext对象,以释放资源和关闭应用程序。
  2.  编程中也遇到很多问题,如:将结果写入文件中时,应该加入repartition(1),作用是让结果合并到一个文件中,不加的话会结果写入到多个文件中。不加的话结果会分布式地保存在多个文件中,好处是可以更好地利用分布式文件系统的性能。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/592843
推荐阅读
相关标签
  

闽ICP备14008679号