赞
踩
理解SPARK工作流程;
掌握SPARK基础编程方法;
操作系统:Linux(建议Ubuntu16.04);
Hadoop版本:2.7.1;
JDK版本:1.7或以上版本;
Java IDE:IDEA
spark版本:3.1.2
scala版本:2.12.10
对于两个输入文件A.txt 和 B.txt, 编写Spark独立应用程序(推荐使用Scala语言),对两个文件进行合并,并剔除其中重复的内容,得到一个新的文件C.txt。
输入文件 A.txt 的样例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z
输入文件 B.txt 的样例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根据输入的文件 A.txt 和 B.txt 合并得到的输出文件 C.txt 的样例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z
(1)建立 input1 文件夹存放所要处理的文件。
(2)将 task1.scala 文件所在 project_one 项目打包成 jar,并将其复制到相关文件夹中,运行 project_one.jar,将结果存放在新文件夹 output1 中,如下图所示:
(3)查看运行结果,如下图所示:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object task1 {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("task2_1")
val sc = new SparkContext(conf)
val dataFile = "file:///home/XXX/input1/A.txt,file:///home/XXX/input1/B.txt" //XXX为本人用户名
val res = sc.textFile(dataFile,2) .filter(_.trim().length>0).map(line=>(line.trim,"")).partitionBy(new
HashPartitioner(1)).groupByKey().sortByKey().keys
res.saveAsTextFile("file:///home/XXX/output1/result") //XXX为本人用户名
}
}
每个输入文件表示班级学生某个学科的成绩,每行内筒由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序(推荐使用Scala语言)求出所有学生的平均成绩,并输入到一个新文件中。
输入文件如下:
Algorithm.txt:
小明 92
小红 87
小新 82
小丽 90
Database.txt:
小明 95
小红 81
小新 89
小丽 85
Python.txt:
小明 82
小红 83
小新 94
小丽 91
输出平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)
import org.apache.spark.SparkConf import org.apache.spark.HashPartitioner import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object task2 { def main(args: Array[String]) { val conf = new SparkConf().setAppName("AvgScore") val sc = new SparkContext(conf) val dataFile = "file:///home/XXX/input2/Algorithm.txt,file:///home/XXX/input2/Database.txt,file:///home/XXX/input2/Python.txt" //XXX为自己用户名 val data = sc.textFile(dataFile,3) val res = data.filter(_.trim().length>0).map(line=>(line.split(" ")(0).trim(),line.split(" ")(1).trim().toInt)).partitionBy(new HashPartitioner(1)).groupByKey().map(x => { var n = 0 var sum = 0.0 for(i <- x._2){ sum = sum + i n = n +1 } val avg = sum/n val format = f"$avg%1.2f".toDouble(x._1,format) }) res.saveAsTextFile("file:///home/XXX/output2/result") //XXX为自己用户名 } }
形式如下:
0 F 168
1 F 141
2 M 184
3 F 186…
编写Spark应用程序,计算得到男性总数、女性总数、男性最高身高、女性最高身高、男性最低身高、女性最低身高、男性平均升高、女性平均身高。
import org.apache.spark.HashPartitioner import org.apache.spark.{SparkConf, SparkContext} object task3 { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName(s"${task3.getClass.getSimpleName}") .setMaster("local[*]") val sc = new SparkContext(conf) val lines = sc.textFile("file:///home/XXX/input3/people.txt") //XXX表示本人用户名 val peopleInfo = lines.map(line => { val res = line.split("\\s+") if (res == null || res.length != 3) { null } else { val gender = res(1) val height = res(2).toDouble //返回性别与身高,过滤null值 (gender, height) } }).filter(t => t != null) peopleInfo.map { case (gender, height) => { (gender, Result(gender, 1, height, height)) } //比较两个对象的身高 }.reduceByKey((res1, res2) => { val maxHeight = if (res1.maxHeight > res2.maxHeight) { res1.maxHeight } else res2.maxHeight val minHeight = if (res1.minHeight < res2.minHeight) { res1.minHeight } else res2.minHeight Result(res1.gender, res1.total + res2.total, maxHeight, minHeight) }).foreach(println) sc.stop() } } //根据需求定义一个模式匹配类 case class Result(gender: String, total: Int, maxHeight: Double, minHeight: Double) { }
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object task4 { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("task4")) val dataFile = sc.textFile("file:///home/XXX/input4/relationship.txt") //被关注 val rdd1 = dataFile.map(x =>x.replaceAll("<",",").split(",")).filter(_.length > 0) val nag = rdd1.map(x=>{ val person= x(0) val count1 = x.size - 1 (person,count1) }) //关注 val rdd2 = dataFile.map(x =>x.split("<")(1)).filter(_.length > 0) val pos = rdd2.flatMap(line => line.split(",")) .map(x => (x,1)) .reduceByKey(_+_)//对key值相同的数据进行压缩 nag.saveAsTextFile("file:///home/XXX/output4/result1") //XXX代表本人用户名 pos.saveAsTextFile("file:///home/XXX/output4/result2") } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。