groupByKey
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
groupByKey groups an RDD of (key, value) pairs by key, so that all values sharing the same key end up together, producing an RDD of (key, Iterable[value]) pairs. It is somewhat like GROUP BY in SQL; for example, it can be used much like MySQL's group_concat.
Scala version
package nj.zb.kb09

import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("scalaRDD")
    val sc = new SparkContext(conf)

    val groupRDD = sc.parallelize(List(
      ("zs", 43), ("zs", 64), ("ls", 64), ("ls", 63), ("zl", 98)
    )).groupByKey()

    groupRDD.collect().foreach(println)
    // (zl,CompactBuffer(98))
    // (ls,CompactBuffer(64, 63))
    // (zs,CompactBuffer(43, 64))

    groupRDD.collect().foreach(x => {
      val name = x._1
      val score = x._2
      score.foreach(score => println(name, score))
    })
    // (zl,98)
    // (ls,64)
    // (ls,63)
    // (zs,43)
    // (zs,64)
  }
}
Java version
package nj.zb.kb09;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class test {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Tuple2<String, Integer>> RDD = sc.parallelize(Arrays.asList(
                new Tuple2<String, Integer>("zs", 53),
                new Tuple2<String, Integer>("ls", 55),
                new Tuple2<String, Integer>("zl", 75),
                new Tuple2<String, Integer>("ww", 53),
                new Tuple2<String, Integer>("ls", 76)
        ));
        JavaPairRDD<String, Integer> mapRDD = JavaPairRDD.fromJavaRDD(RDD);

        Map<String, Iterable<Integer>> resultRDD = mapRDD.groupByKey().collectAsMap();
        Set<String> keySet = resultRDD.keySet();

        // Print each key together with its grouped values
        for (String s : keySet) {
            System.out.println(s + "," + resultRDD.get(s));
        }
        // zl,[75]
        // ww,[53]
        // zs,[53]
        // ls,[55, 76]

        System.out.println("**************************");

        // Enhanced for over keys and over each key's values
        for (String key : keySet) {
            Iterable<Integer> values = resultRDD.get(key);
            for (Integer value : values) {
                System.out.println(key + "," + value);
            }
        }
        // zl,75
        // ww,53
        // zs,53
        // ls,55
        // ls,76

        System.out.println("***************************");

        // Enhanced for over keys, explicit iterator over values
        for (String s : keySet) {
            Iterable<Integer> integers = resultRDD.get(s);
            Iterator<Integer> iterator = integers.iterator();
            while (iterator.hasNext()) {
                System.out.println(s + "," + iterator.next());
            }
        }
        // same output as the previous loop

        System.out.println("***************************");

        // Explicit iterators over both keys and values
        Iterator<String> iterator = keySet.iterator();
        while (iterator.hasNext()) {
            String key = iterator.next();
            Iterable<Integer> values = resultRDD.get(key);
            Iterator<Integer> iterator1 = values.iterator();
            while (iterator1.hasNext()) {
                System.out.println(key + "," + iterator1.next());
            }
        }
        // same output as the previous loop

        System.out.println("*************************");

        // Explicit iterator over keys, enhanced for over values
        Iterator<String> iterator1 = keySet.iterator();
        while (iterator1.hasNext()) {
            String key = iterator1.next();
            Iterable<Integer> integers = resultRDD.get(key);
            for (Integer i : integers) {
                System.out.println(key + "," + i);
            }
        }
        // same output as the previous loop
    }
}
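The examples above only use the no-argument groupByKey(); the other two signatures listed at the top control how the result is partitioned. Below is a minimal Scala sketch (hypothetical data and object name, not from the original examples) showing the numPartitions overload and using the grouped values to emulate MySQL's group_concat by joining them into a single string.

import org.apache.spark.{SparkConf, SparkContext}

object GroupByKeyVariants {
  def main(args: Array[String]): Unit = {
    // Same local setup as the examples above
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("groupByKeyVariants"))

    val scores = sc.parallelize(List(("zs", 43), ("zs", 64), ("ls", 64)))

    // groupByKey(numPartitions): same grouping, but the result has 4 partitions
    val grouped = scores.groupByKey(4)
    // Equivalent with an explicit partitioner:
    // val grouped2 = scores.groupByKey(new org.apache.spark.HashPartitioner(4))

    // Emulate MySQL group_concat: join each key's values into one comma-separated string
    grouped.mapValues(_.mkString(",")).collect().foreach(println)
    // expected output (order may vary):
    // (zs,43,64)
    // (ls,64)

    sc.stop()
  }
}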
cogroup
groupByKey groups the data of a single RDD. To group multiple RDDs that share the same key, there is also a function called cogroup().
For example, RDD1.cogroup(RDD2) groups RDD1 and RDD2 by their common keys, producing an RDD of the form (key, (Iterable[value1], Iterable[value2])) (a minimal two-RDD sketch appears after the Java version below).
cogroup can also group more than two RDDs at once. For example, RDD1.cogroup(RDD2, RDD3) produces results of the form (key, (Iterable[value1], Iterable[value2], Iterable[value3])); the RDD API provides cogroup overloads for up to three other RDDs.
Scala version
package nj.zb.kb09

import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("scalaRDD")
    val sc = new SparkContext(conf)

    val score1 = sc.parallelize(List(("zs", 43), ("zs", 64), ("ls", 64), ("ls", 63), ("zl", 98)))
    val score2 = sc.parallelize(List(("ww", 43), ("zs", 64), ("zl", 64), ("zs", 63), ("ww", 98)))
    val score3 = sc.parallelize(List(("ls", 43), ("ww", 64), ("zs", 64), ("zl", 63), ("ww", 98)))

    score1.cogroup(score2, score3).collect().foreach(println)
    // (zl,(CompactBuffer(98),CompactBuffer(64),CompactBuffer(63)))
    // (ww,(CompactBuffer(),CompactBuffer(43, 98),CompactBuffer(64, 98)))
    // (ls,(CompactBuffer(64, 63),CompactBuffer(),CompactBuffer(43)))
    // (zs,(CompactBuffer(43, 64),CompactBuffer(64, 63),CompactBuffer(64)))
  }
}
Java version
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import scala.Tuple3;

import java.util.Arrays;
import java.util.Map;

public class test {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Tuple2<String, Integer>> RDD1 = sc.parallelize(Arrays.asList(
                new Tuple2<String, Integer>("zs", 53),
                new Tuple2<String, Integer>("ls", 55),
                new Tuple2<String, Integer>("zl", 75),
                new Tuple2<String, Integer>("ww", 53),
                new Tuple2<String, Integer>("ls", 76)
        ));
        JavaRDD<Tuple2<String, Integer>> RDD2 = sc.parallelize(Arrays.asList(
                new Tuple2<String, Integer>("zs", 53),
                new Tuple2<String, Integer>("ls", 55),
                new Tuple2<String, Integer>("zl", 75),
                new Tuple2<String, Integer>("ww", 53),
                new Tuple2<String, Integer>("ls", 76)
        ));
        JavaRDD<Tuple2<String, Integer>> RDD3 = sc.parallelize(Arrays.asList(
                new Tuple2<String, Integer>("zs", 53),
                new Tuple2<String, Integer>("ls", 55),
                new Tuple2<String, Integer>("zl", 75),
                new Tuple2<String, Integer>("ww", 53),
                new Tuple2<String, Integer>("ls", 76)
        ));
        JavaPairRDD<String, Integer> mapRDD1 = JavaPairRDD.fromJavaRDD(RDD1);
        JavaPairRDD<String, Integer> mapRDD2 = JavaPairRDD.fromJavaRDD(RDD2);
        JavaPairRDD<String, Integer> mapRDD3 = JavaPairRDD.fromJavaRDD(RDD3);

        // cogroup the three RDDs: each key maps to a Tuple3 of value Iterables
        Map<String, Tuple3<Iterable<Integer>, Iterable<Integer>, Iterable<Integer>>> result =
                mapRDD1.cogroup(mapRDD2, mapRDD3).collectAsMap();

        for (String s : result.keySet()) {
            System.out.println(s + "," + result.get(s));
        }
        // zl,([75],[75],[75])
        // ww,([53],[53],[53])
        // zs,([53],[53],[53])
        // ls,([55, 76],[55, 76],[55, 76])
    }
}
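The cogroup examples above all group three RDDs. For the two-RDD form described earlier (RDD1.cogroup(RDD2)), here is a minimal Scala sketch with hypothetical data (not from the original post); note that a key missing from one side shows up as an empty Iterable.

import org.apache.spark.{SparkConf, SparkContext}

object CogroupTwoRdds {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("cogroupTwo"))

    val midterm = sc.parallelize(List(("zs", 43), ("ls", 64)))
    val finals = sc.parallelize(List(("zs", 90), ("ww", 70)))

    // Result type: RDD[(String, (Iterable[Int], Iterable[Int]))]
    midterm.cogroup(finals).collect().foreach(println)
    // expected output (order may vary):
    // (zs,(CompactBuffer(43),CompactBuffer(90)))
    // (ls,(CompactBuffer(64),CompactBuffer()))
    // (ww,(CompactBuffer(),CompactBuffer(70)))

    sc.stop()
  }
}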