赞
踩
使用Scala语言编写Spark应用程序实现数据去重(需要使用编译打包工具Maven或sbt进行编译打包)。
对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
20180901 x
20180902 y
20180903 x
20180904 y
20180905 z
20180906 z
输入文件B的样例如下:
20180901 y
20180902 y
20180903 x
20180904 z
20180905 y
根据输入的文件A和B合并得到的输出文件C的样例如下:
20180901 x
20180901 y
20180902 y
20180903 x
20180904 y
20180904 z
20180905 y
20180905 z
20180906 z
可以在根目录文件夹下以此创建目录及文件,形成最后的目录树
mkdir sparkapp4
cd ~/sparkapp4
mkdir data
将题目要求去重的两个文本文件放到data文件夹下
vim data/A.txt
vim data/B.txt
mkdir -p src/main/scala
vim src/main/scala/De_duplication.scala
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.HashPartitioner object De_duplication { def main(args: Array[String]) { val conf = new SparkConf().setAppName("De_duplication") val sc = new SparkContext(conf) val dataFile = "file:///home/hadoop/sparkapp4/data" val data = sc.textFile(dataFile,2) val res = data.filter(_.trim().length>0).map(line=>(line.trim,"")).partitionBy(new HashPartitioner(1)).groupByKey().sortByKey().keys res.collect().foreach(println) //这句是运行是为了用grep筛选结果,可删,不影响结果 res.saveAsTextFile("file:///home/hadoop/sparkapp4/result") } }
这里需要注意的点有:
vim simple.sbt
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
注意scalaversion和libraryDependencies中的版本号,不清楚的可以运行一下spark-shell查看
cd /usr/local/spark
/bin/spark-shell
最后检查目录树上面的结构即可
在 sparkapp4 文件夹下
/usr/local/sbt/sbt package
/usr/local/spark/bin/spark-submit --class "De_duplication" ~/sparkapp4/target/scala-2.11/simple-project_2.11-1.0.jar 2>&1 | grep "2018"
注意:
结果正确!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。