当前位置:   article > 正文

实验二、SPARK基础编程方法_2、编写独立应用程序实现数据去重 对于两个输入文件a和b,编写spark独立应用程序,

2、编写独立应用程序实现数据去重 对于两个输入文件a和b,编写spark独立应用程序,

一、实验目的

理解SPARK工作流程;
掌握SPARK基础编程方法;

二、实验平台

操作系统:Linux(建议Ubuntu16.04);
Hadoop版本:2.7.1;
JDK版本:1.7或以上版本;
Java IDE:IDEA
spark版本:3.1.2
scala版本:2.12.10

三、实验内容

(一)、编写独立应用程序实现数据去重

对于两个输入文件A.txt 和 B.txt, 编写Spark独立应用程序(推荐使用Scala语言),对两个文件进行合并,并剔除其中重复的内容,得到一个新的文件C.txt。
输入文件 A.txt 的样例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z

输入文件 B.txt 的样例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y

根据输入的文件 A.txt 和 B.txt 合并得到的输出文件 C.txt 的样例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z

(1)建立 input1 文件夹存放所要处理的文件。
(2)将 task1.scala 文件所在 project_one 项目打包成 jar,并将其复制到相关文件夹中,运行 project_one.jar,将结果存放在新文件夹 output1 中,如下图所示:
在这里插入图片描述
(3)查看运行结果,如下图所示:
在这里插入图片描述

程序代码:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object task1 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("task2_1")
    val sc = new SparkContext(conf)
    val dataFile = "file:///home/XXX/input1/A.txt,file:///home/XXX/input1/B.txt" //XXX为本人用户名
    val res = sc.textFile(dataFile,2) .filter(_.trim().length>0).map(line=>(line.trim,"")).partitionBy(new
HashPartitioner(1)).groupByKey().sortByKey().keys
res.saveAsTextFile("file:///home/XXX/output1/result") //XXX为本人用户名
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
(二)、编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内筒由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序(推荐使用Scala语言)求出所有学生的平均成绩,并输入到一个新文件中。

输入文件如下:
Algorithm.txt:
小明 92
小红 87
小新 82
小丽 90

Database.txt:
小明 95
小红 81
小新 89
小丽 85

Python.txt:
小明 82
小红 83
小新 94
小丽 91

输出平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)

基本步骤同上
程序代码:
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._

object task2 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("AvgScore")
    val sc = new SparkContext(conf)
    val dataFile = "file:///home/XXX/input2/Algorithm.txt,file:///home/XXX/input2/Database.txt,file:///home/XXX/input2/Python.txt"  //XXX为自己用户名
    val data = sc.textFile(dataFile,3)
    val res = data.filter(_.trim().length>0).map(line=>(line.split(" ")(0).trim(),line.split(" ")(1).trim().toInt)).partitionBy(new HashPartitioner(1)).groupByKey().map(x => {
      var n = 0
      var sum = 0.0
      for(i <- x._2){
      sum = sum + i
       n = n +1
      }
    val avg = sum/n
    val format = f"$avg%1.2f".toDouble(x._1,format)
    })
    res.saveAsTextFile("file:///home/XXX/output2/result")  //XXX为自己用户名
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
运行结果:

在这里插入图片描述

(三)、对于文件people.txt, 该文件包含了序号、性别和身高三个列。

形式如下:
0 F 168
1 F 141
2 M 184
3 F 186…
编写Spark应用程序,计算得到男性总数、女性总数、男性最高身高、女性最高身高、男性最低身高、女性最低身高、男性平均升高、女性平均身高。

基本步骤同上
程序代码:
import org.apache.spark.HashPartitioner
import org.apache.spark.{SparkConf, SparkContext}

object task3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(s"${task3.getClass.getSimpleName}")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///home/XXX/input3/people.txt") //XXX表示本人用户名
    val peopleInfo = lines.map(line => {
      val res = line.split("\\s+")
      if (res == null || res.length != 3) {
        null
      } else {
        val gender = res(1)
        val height = res(2).toDouble
        //返回性别与身高,过滤null值
        (gender, height)
      }
    }).filter(t => t != null)

    peopleInfo.map { case (gender, height) => {
      (gender, Result(gender, 1, height, height))
    }
      //比较两个对象的身高
    }.reduceByKey((res1, res2) => {
      val maxHeight = if (res1.maxHeight > res2.maxHeight) {
        res1.maxHeight
      } else res2.maxHeight
      val minHeight = if (res1.minHeight < res2.minHeight) {
        res1.minHeight
      } else res2.minHeight
      Result(res1.gender, res1.total + res2.total, maxHeight, minHeight)
    }).foreach(println)
    sc.stop()
  }
}

//根据需求定义一个模式匹配类
case class Result(gender: String, total: Int, maxHeight: Double, minHeight: Double) {
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
运行结果:

在这里插入图片描述

(四)、对于文件relationship.txt,数据形式示例如下:A<B,C,D,F,E,O,表示用户B,C,D,F,E,O关注了A,现要求分别计算每个用户被关注的数量以及每个用户关注的数量。
基本步骤同上
程序代码:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object task4 {
    def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("task4"))
        val dataFile = sc.textFile("file:///home/XXX/input4/relationship.txt")
        //被关注        
        val rdd1 = dataFile.map(x =>x.replaceAll("<",",").split(",")).filter(_.length > 0)
        val nag = rdd1.map(x=>{
                val person= x(0)
                val count1 = x.size - 1
                (person,count1)
        })
        //关注
        val rdd2 = dataFile.map(x =>x.split("<")(1)).filter(_.length > 0)
        val pos = rdd2.flatMap(line => line.split(","))
                        .map(x => (x,1))
                        .reduceByKey(_+_)//对key值相同的数据进行压缩

        nag.saveAsTextFile("file:///home/XXX/output4/result1") //XXX代表本人用户名
        pos.saveAsTextFile("file:///home/XXX/output4/result2")

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
运行结果:

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/592834
推荐阅读
相关标签
  

闽ICP备14008679号