赞
踩
在实际业务场景中,二次排序真的非常重要,并且经常遇到,下面来模拟一下以下的场景,实现对电影评分数据进行二次排序,以TimeStamp和Rating两个维度降序排列,值得一提的是,java版本的二次排序非常繁琐,而使用scala实现就简捷,首先我们需要一个继承Ordered和Serializable的类。
数据:
ratings.txt (UserID::MovieID::Rating::TimeStamp)
- 001::01::3::1046454590
- 002::02::3::1046454533
- 001::03::5::1046454523
- 003::02::4::1046454576
- 002::07::3::1046454545
- 001::06::3::1046454509
class SecondarySortKey(val first:Double,val second:Double) extends Ordered[SecondarySortKey] with Serializable{ //这个类中重写了compare方法 override def compare(that: SecondarySortKey): Int = { //既然是二次排序,就先判断第一个字段是否相等,如果不相等,就直接排序 if(this.first-that.first!=0){ (this.first-that.first).toInt }else{ //如果第一个字段相等,就比较第二个字段,若想要实现多次排序,也按照这个模式继续比较下去 if(this.second-that.second>0){ //ceil向上取整 Math.ceil(this.second-that.second).toInt }else if(this.second-that.second<0){ Math.floor(this.second-that.second).toInt }else{ (this.second-that.second).toInt } } } }
然后再加载数据为RDD,再把RDD的每条记录,里面想要的字段封装到上面定义的类中作为key,把该条记录整体作为value
结果:
object movies { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.WARN) val sparkConf = new SparkConf().setAppName("movies").setMaster("local[*]") val sc = new SparkContext(sparkConf) val ratingsRDD = sc.textFile("C:\\Users\\xxx\\Desktop\\ratings.txt") val moviesRDD = sc.textFile("C:\\Users\\xxx\\Desktop\\movies.txt") val usersRDD = sc.textFile("C:\\Users\\xxx\\Desktop\\users.txt") val pairWithSortKey=ratingsRDD.map(line=>{ val splited=line.split("::") (new SecondarySortKey(splited(3).toDouble,splited(2).toDouble),line) }) //直接调用sortBykey,此时会按照之前实现的compare方法排序 val sorted=pairWithSortKey.sortByKey(false) val sortResult=sorted.map(sortedline=>sortedline._2).take(10) sortResult.foreach(println(_)) sc.stop() } }
- 001::01::3::1046454590
- 003::02::4::1046454576
- 002::07::3::1046454545
- 002::02::3::1046454533
- 001::03::5::1046454523
- 001::06::3::1046454509
取出排序后的RDD的value,此时这些记录已经是按照时间戳和评分排好序的,从上面结果可以看出已经按照timestamp和评分排序排列了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。