赞
踩
现在有一组不同学科,不同老师的访问数据,需求是求出每个学科中排名前三的老师。
数据样例:http://bigdata.edu360.cn/laozhang
接下来用三种方法来计算:
工具:hadoop集群,zookeeper集群,spark集群
思路:
1.对数据进行切分,留下学科和对应的老师
2.将学科和老师当作key,和1组合在一起,形成((学科,老师),1)的元组
3.把学科和老师都相同的value进行聚合
4.按学科进行分组,形成(学科,((学科,老师),XX))的形式
5.把每一组的数据进行排序,取出前三名
6.收集结果
代码:
package cn.edu360.spark import java.net.URL import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object GroupFavTeacher1 { def main(args: Array[String]): Unit = { //用户自定义求前多少名 val topN = args(1).toInt val conf = new SparkConf().setAppName("GroupFavTeacher1").setMaster("local[4]") val sc = new SparkContext(conf) //指定以后从哪里读取数据 val lines: RDD[String] = sc.textFile(args(0)) //整理数据 val subjectTeacherAndOne: RDD[((String, String),Int)] = lines.map(line => { val index = line.lastIndexOf("/") val teacher = line.substring(index + 1) val httpHost = line.substring(0, index) val subject = new URL(httpHost).getHost.split("[.]")(0) ((subject, teacher),1) }) //和1组合在一起(不好,调用了两次map方法) // val unitmap: RDD[((String, String), Int)] = subjectAndTeacher.map((_,1)) //聚合,将老师和学科联合起来当作key val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(_+_) //分组排序(按学科进行分组) //[学科,该学科对应的老师的数据] val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1) //经过分组后,一个分区内可能有多个学科的数据,一个学科就是一个迭代器 //将每一个组拿出来进行操作 //为什么可以调用Scala的sortby方法呢? //因为一个学科的数据已经在一台机器上的一个Scala集合里面了 //toList是要把迭代器中指向的数据存放到磁盘中(本地列表) //所以soutBy是Scala中的方法 //reverse是要按照从大到小的顺序排 val sorted: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(topN)) //收集结果 //方便展示数据,实际开发中最好不要使用collect方法。 val r: Array[(String, List[((String, String), Int)])] = sorted.collect() //打印 println(r.toBuffer) //释放资源 sc.stop() } }
缺点:
由于在对每一组的数据进行排序时,执行了toList操作,这样是把这个组中的数据保存到本地磁盘了,在实际应用中,有可能一个组中的数据量很大,就会导致本地磁盘存不下的情况。
思路:
前三步同上
4.定义一个for循环,首先筛选出来一个学科的数据,然后只将这一个学科的数据排序,这里的排序用的是RDD上的sortBy,内存+磁盘的方式
代码:
package cn.edu360.spark import java.net.URL import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object GroupFavTeacher2 { def main(args: Array[String]): Unit = { val topN = args(1).toInt //提前将数据中的所有学科读出来 val subjects = Array("bigdata","javaee","php") val conf = new SparkConf().setAppName("GroupFavTeacher2").setMaster("local[4]") val sc = new SparkContext(conf) //指定以后从哪里读取数据 val lines: RDD[String] = sc.textFile(args(0)) //整理数据 val subjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => { val index = line.lastIndexOf("/") val teacher = line.substring(index + 1) val httpHost = line.substring(0, index) val subject = new URL(httpHost).getHost.split("[.]")(0) ((subject, teacher), 1) }) //聚合,将学科和老师联合当作key val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(_+_) //scala的集合排序是在内存中进行的,但是内存可能不够用 //可以调用RDD的sortBy方法,内存+磁盘进行排序 for (sb <- subjects){ //该RDD中对应的数据仅有一个学科的数据(因为过滤过了) val filtered: RDD[((String, String), Int)] = reduced.filter(_._1._1 == sb) //现在调用的是RDD上的sortBy方法,是内存+磁盘的方式 //这个take是一个Action,会触发任务提交,但是是在集群中提前下发指令,取出前几个数据后,再发送到driver端 val favTeacher: Array[((String, String), Int)] = filtered.sortBy(_._2,false).take(topN) //打印 println(favTeacher.toBuffer) } //释放资源 sc.stop() } }
优点:
这种方式就弥补了第一种方法的缺陷,使用的数据量比较大的情况,因为这种方式触发了多次Action,有多少个学科就触发几次Action,避免了某个学科数据量特别大的情况。
思路:
前三步同上
4.自定义一个分区器,将一个学科的数据放到一个学科中,一个分区只放一个学科的信息
5.再将每个分区拿出来进行排序
代码:
package cn.edu360.spark import java.net.URL import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable object GroupFavTeacher3 { def main(args: Array[String]): Unit = { val topN = args(1).toInt val conf = new SparkConf().setAppName("GroupFavTeacher3").setMaster("local[4]") val sc = new SparkContext(conf) //指定以后从哪里读取数据 val lines: RDD[String] = sc.textFile(args(0)) //整理数据 val subjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => { val index = line.lastIndexOf("/") val teacher = line.substring(index + 1) val httpHost = line.substring(0, index) val subject = new URL(httpHost).getHost.split("[.]")(0) ((subject, teacher), 1) }) //聚合,将学科和老师联合当作key val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(_+_) //计算有多少学科 val subjects: Array[String] = reduced.map(_._1._1).distinct().collect() //自定义一个分区器,并且按照指定的分区器进行分区 val sbPartitioner = new SubjectPartitioner(subjects) //partitionBy按照指定的分区规则进行分区 //调用partitionBy的时候RDD的key是(String,String) val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(sbPartitioner) //如果一次拿出一个分区(可以操作一个分区的数据) val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => { //将迭代器转换成list,然后排序,再转换成迭代器返回 it.toList.sortBy(_._2).reverse.take(topN).iterator }) //收集结果 val r: Array[((String, String), Int)] = sorted.collect() println(r.toBuffer) //释放资源 sc.stop() } } //自定义分区器 class SubjectPartitioner(sbs:Array[String]) extends org.apache.spark.Partitioner { //相当于主构造器(new的时候会执行一次) //用于存放规则的一个map val rules = new mutable.HashMap[String,Int]() var i = 0 for(sb <- sbs){ //rules(sb) = i rules.put(sb,i) i += 1 } //返回分区的数量(下一个RDD有多少分区) override def numPartitions: Int = sbs.length //根据传入的key计算分区标号 //key是一个元组(String,String) override def getPartition(key: Any): Int = { //获取学科名称 val subject = key.asInstanceOf[(String,String)]._1 //根据规则计算分区编号 //这里是直接调用了apply方法,返回的是value值,就是分区编号 rules(subject) } }
优点:
更加清楚,一个分区中只放一个学科的信息
缺点:
1.和第一种方法相同,如果一个学科中的信息非常多,就会导致一台机器存放不下的情况。
2.shuffle次数太多,浪费资源
reduceByKey是一次shuffle过程(因为相同key的数据会聚合在一起)
partitionBy是一次shuffle过程(会根据规则将数据进行分区)
思路:
不用partitionBy,在reduceByKey中直接传入自定义分区器
先来看一下reduceByKey源码:
/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
可以看出,reduceByKey可以直接传入两个参数,第一个参数可以是一个分区器,第二个参数是一个函数。
代码:
package cn.edu360.spark import java.net.URL import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable object GroupFavTeacher4 { def main(args: Array[String]): Unit = { val topN = args(1).toInt val conf = new SparkConf().setAppName("GroupFavTeacher4").setMaster("local[4]") val sc = new SparkContext(conf) //指定以后从哪里读取数据 val lines: RDD[String] = sc.textFile(args(0)) //整理数据 val subjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => { val index = line.lastIndexOf("/") val teacher = line.substring(index + 1) val httpHost = line.substring(0, index) val subject = new URL(httpHost).getHost.split("[.]")(0) ((subject, teacher), 1) }) //计算有多少学科 val subjects: Array[String] = subjectTeacherAndOne.map(_._1._1).distinct().collect() //自定义一个分区器,并且按照指定的分区器进行分区 val sbPartitioner = new SubjectPartitioner2(subjects) //聚合,聚合是就按照指定的分区器进行分区 //该RDD一个分区内仅有一个学科的数据 val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(sbPartitioner,_+_) //如果一次拿出一个分区(可以操作一个分区的数据) val sorted: RDD[((String, String), Int)] = reduced.mapPartitions(it => { //将迭代器转换成list,然后排序,再转换成迭代器返回 it.toList.sortBy(_._2).reverse.take(topN).iterator }) //收集结果 // val r: Array[((String, String), Int)] = sorted.collect() // println(r.toBuffer) sorted.saveAsTextFile("/home/hadoop/local-file") //释放资源 sc.stop() } } //自定义分区器 class SubjectPartitioner2(sbs:Array[String]) extends org.apache.spark.Partitioner { //相当于主构造器(new的时候会执行一次) //用于存放规则的一个map val rules = new mutable.HashMap[String,Int]() var i = 0 for(sb <- sbs){ //rules(sb) = i rules.put(sb,i) i += 1 } //返回分区的数量(下一个RDD有多少分区) override def numPartitions: Int = sbs.length //根据传入的key计算分区标号 //key是一个元组(String,String) override def getPartition(key: Any): Int = { //获取学科名称 val subject = key.asInstanceOf[(String,String)]._1 //根据规则计算分区编号 rules(subject) } }
第一种分组排序方法是最基础的,适用于数据量少的处理;
第二种过滤之后再分多次提交的方法适用于数据量很大的处理;
第三种方法很简洁,使用了自定义分区器,以及一次可以拿出一个分区进行处理。
总之,三种方法各有各个使用场景,相同的是在写程序时,要尽量减少shuffle过程,节省资源,这样计算速度会更快。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。