赞
踩
目录
(2)在/usr/local/spark/mycode/AvgScore目录下新建simple.sbt,复制如下代码:
(3)在/usr/local/spark/mycode/AvgScore目录下执行如下命令打包程序:
(4)在/usr/local/spark/mycode/AvgScore 目录下执行如下命令提交程序:
(5)在/usr/local/spark/mycode/AvgScore/result 目录下即可得到结果文件。
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例供参考。
小明 | 92 |
小红 | 87 |
小新 | 82 |
小丽 | 90 |
小明 | 95 |
小红 | 81 |
小新 | 89 |
小丽 | 85 |
小明 | 83 |
小红 | 82 |
小新 | 94 |
小丽 | 91 |
小明 | 89.67 |
小红 | 83.67 |
小新 | 88.33 |
小丽 | 88.67 |
1.进入到mycode目录,新建RemDup目录(没有mycode目录可以新建一个)
再进入到RemDup目录中去
2.新建datas目录,写入文件algorithm、database、python:
写入文件:
文件内容为:
↑这个内容很好看,但是很愚蠢,不知道的话这里错误会浪费几个小时(肯定不是我)
注意这里 algorithm、database 和 python 文件内容不能有多余的换行符或者空格!
所以内容复制粘贴以下内容 :
Algorithm
- 小明 92
- 小红 87
- 小新 82
- 小丽 90
Database
- 小明 95
- 小红 81
- 小新 89
- 小丽 85
Python
- 小明 82
- 小红 83
- 小新 94
- 小丽 91
书上是 avgscore ->AvgScore
//书上的源码
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object AvgScore {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("AvgScore")
val sc = new SparkContext(conf)
val dataFile = "file:///home/hadoop/data"
val data = sc.textFile(dataFile,3)
val res = data.filter(_.trim().length>0).map(line=>(line.split(" ")(0).trim(),line.split(" ")(1).trim().toInt)).partitionBy(new HashPartitioner(1)).groupByKey().map(x => {
var n = 0
var sum = 0.0
for(i <- x._2){
sum = sum + i
n = n +1
}
val avg = sum/n
val format = f"$avg%1.2f".toDouble
(x._1,format)
})
res.saveAsTextFile("result")
}
}
- //实际运行代码
- //导入必要的 Spark 库:
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- import org.apache.spark.SparkConf
- import org.apache.spark.HashPartitioner
-
- //定义应用程序对象和入口点:
- object AvgScore {
- def main(args: Array[String]) {
-
- //设置 Spark 应用程序的配置:
- val conf = new SparkConf().setAppName("AvgScore")
- val sc = new SparkContext(conf)
-
- //定义数据文件的路径并加载数据:
- val dataFile = "file:///usr/local/spark/mycode/AvgScore/datas"
-
- /*
- 这里使用 sc.textFile() 方法加载一个文本文件,
- 3 参数指定每个分区包含的行数。
- */
- val data = sc.textFile(dataFile,3)
-
- //定义一个计算平均分的函数,并应用到数据集上:
- val res = data
-
- //filter() 方法通过 trim() 函数过滤空行
- .filter(_.trim().length > 0)
-
- /*
- map() 方法转换数据集,每行数据被转换成一个元组,包含学生姓名和成绩。
- split() 方法用于将行拆分为数组
- trim() 方法用于去除多余的空格,并将成绩转换为整数。
- */
- .map(line => (line.split(" ")(0).trim(), line.split(" ")(1).trim().toInt))
-
- //partitionBy() 方法将结果分区。
- .partitionBy(new HashPartitioner(1))
-
- //groupByKey() 方法将数据集按键(学生姓名)分组。
- .groupByKey()
-
- /*map() 方法遍历分组,计算平均成绩,并格式化为两位小数。
- 函数返回一个新的元组,包含学生姓名和平均成绩。*/
- .map(x => {
- var n = 0
- var sum = 0.0
- for (i <- x._2) {
- sum = sum + i
- n = n + 1
- }
- val avg = sum / n
- val format = f"$avg%1.2f".toDouble
- (x._1, format)
- })
-
- /*
- 将结果保存到输出文件:
- 这里使用 saveAsTextFile() 方法将结果保存到一个文本文件。
- Spark 会自动将数据保存在多个分区中。
- */
- res.saveAsTextFile("file:///usr/local/spark/mycode/AvgScore/result")
- }
- }
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
这里我先查看scala 和spark版本并修改
scala 2.11.12 => 2.11.6
spark 2.4.0 => 2.4.0
修改好内容如下:
sudo /usr/local/sbt/sbt package
打包过程很长
结果如下:
/usr/local/spark/bin/spark-submit --class "AvgScore" /usr/local/spark/mycode/avgscore/target/scala-2.11/simple-project_2.11-1.0.jar
scala运行版本和查看版本不一致,让人思考simple.sbt底下用哪个版本合适
实验结果过后表明,两个版本都可以
scala2.11.12 底下的运行结果
查看结果
据此编写独立应用程序实现求平均值问题完成
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。