6(好)、两个文件合并的问题:给定a、b两个文件,各存放50亿个url,每个url各占用64字节,内存限制是4G,如何找出a、b文件共同的url?
1)主要的思想是把文件分开进行计算,在对每个文件进行对比,得出相同的URL,因为以上说是含有相同的URL所以不用考虑数据倾斜的问题。详细的解题思路如下:
a、可以估计每个文件的大小为5G*64=300G,远大于4G。所以不可能将其完全加载到内存中处理。考虑采取分而治之的方法。
b、遍历文件a,对每个url求取hash(url)%1000,然后根据所得值将url分别存储到1000个小文件(设为a0,a1,...a999)当中。这样每个小文件的大小约为300M。
b、遍历文件b,采取和a相同的方法将url分别存储到1000个小文件(b0,b1....b999)中。这样处理后,所有可能相同的url都在对应的小文件(a0 vs b0, a1 vs b1....a999 vs b999)当中,不对应的小文件(比如a0 vs b99)不可能有相同的url。然后我们只要求出1000对小文件中相同的url即可。
c、比如对于a0 vs b0,我们可以遍历a0,将其中的url存储到hash_map当中。然后遍历b0,如果url在hash_map中,则说明此url在a和b中同时存在,保存到文件中即可。
d、如果分成的小文件不均匀,导致有些小文件太大(比如大于2G),可以考虑将这些太大的小文件再按类似的方法分成小小文件即可
//各个文件的ip数
object Demo2 {
val cachedThreadPool = Executors.newCachedThreadPool()
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("demo2").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
val hdpConf: Configuration = new Configuration
val fs: FileSystem = FileSystem.get(hdpConf)
val listFiles: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path("f://txt/2/"), true)
while (listFiles.hasNext) {
val fileStatus = listFiles.next
val pathName = fileStatus.getPath.getName
cachedThreadPool.execute(new Runnable() {
override def run(): Unit = {
println("=======================" + pathName)
analyseData(pathName, sc)
}
})
}
}
def analyseData(pathName: String, sc: SparkContext): Unit = {
val data: RDD[String] = sc.textFile("f://txt/2/" + pathName)
val dataArr: RDD[Array[String]] = data.map(_.split(" "))
val ipAndOne: RDD[(String, Int)] = dataArr.map(x => {
val ip = x(0)
(ip, 1)
})
val counts: RDD[(String, Int)] = ipAndOne.reduceByKey(_ + _)
val sortedSort: RDD[(String, Int)] = counts.sortBy(_._2, false)
sortedSort.saveAsTextFile("f://txt/3/" + pathName)
}
}
2)出现在b.txt而没有出现在a.txt的ip
package cn.bigdata
import java.util.concurrent.Executors
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
/*
* 出现在b.txt而没有出现在a.txt的ip
*/
object Demo3 {
val cachedThreadPool = Executors.newCachedThreadPool()
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Demo3").setMaster("local")
val sc = new SparkContext(conf)
val data_a = sc.textFile("f://txt/2/a.txt")
val data_b = sc.textFile("f://txt/2/b.txt")
val splitArr_a = data_a.map(_.split(" "))
val ip_a: RDD[String] = splitArr_a.map(x => x(0))
val splitArr_b = data_b.map(_.split(" "))
val ip_b: RDD[String] = splitArr_b.map(x => x(0))
val subRdd: RDD[String] = ip_b.subtract(ip_a)
subRdd.saveAsTextFile("f://txt/4/")
}
}
3)
package cn.bigdata
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.collection.mutable.Set
/*
* 每个user出现的次数以及每个user对应的ip数
*/
object Demo4 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Demo4").setMaster("local")
val sc = new SparkContext(conf)
val data: RDD[String] = sc.textFile("f://txt/5/")
val lines = data.map(_.split(" "))
val userIpOne = lines.map(x => {
val ip = x(0)
val user = x(1)
(user, (ip, 1))
})