当前位置:   article > 正文

Spark RDD算子键值对分组操作groupByKey,cogroup(scala、java版本)_scala groupbykey(10)

scala groupbykey(10)

groupByKey

def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
  • 1
  • 2
  • 3

groupByKey会将RDD[key,value] 按照相同的key进行分组,形成RDD[key,Iterable[value]]的形式, 有点类似于sql中的groupby,例如类似于mysql中的group_concat

scala版本

package nj.zb.kb09

import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("scalaRDD")
    val sc = new SparkContext(conf)

    val groupRDD = sc.parallelize(List(
      ("zs", 43), ("zs", 64), ("ls", 64), ("ls", 63), ("zl", 98)
    )).groupByKey()
    groupRDD.collect()foreach(println)
    // (zl,CompactBuffer(98))
    // (ls,CompactBuffer(64, 63))
    // (zs,CompactBuffer(43, 64))
    
    groupRDD.collect()foreach(x=>{
      val name=x._1
      val score=x._2
    score.foreach(score=>println(name,score))
    })
    // (zl,98)
    // (ls,64)
    // (ls,63)
    // (zs,43)
    // (zs,64)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

java版本

package nj.zb.kb09;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import scala.Tuple3;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class test {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Tuple2<String,Integer>> RDD = sc.parallelize(Arrays.asList(
                new Tuple2<String,Integer>("zs", 53),
                new Tuple2<String,Integer>("ls", 55),
                new Tuple2<String,Integer>("zl", 75),
                new Tuple2<String,Integer>("ww", 53),
                new Tuple2<String,Integer>("ls", 76)
        ));
        JavaPairRDD<String, Integer> mapRDD = JavaPairRDD.fromJavaRDD(RDD);
        Map<String, Iterable<Integer>> resultRDD = mapRDD.groupByKey().collectAsMap();
        Set<String> keySet = resultRDD.keySet();
        for (String s:keySet
             ) {
            System.out.println(s+","+resultRDD.get(s));
        }
        // zl,[75]
        // ww,[53]
        // zs,[53]
        // ls,[55, 76]


        System.out.println("**************************");
        for (String key:keySet) {
            Iterable<Integer> values = resultRDD.get(key);
            for (Integer value : values) {
                System.out.println(key+","+value);
            }
        }
        // zl,75
        // ww,53
        // zs,53
        // ls,55
        // ls,76

        System.out.println("***************************");
        for (String s : keySet) {
            Iterable<Integer> integers = resultRDD.get(s);
            Iterator<Integer> iterator = integers.iterator();
            while (iterator.hasNext()){
                System.out.println(s+","+iterator.next());
            }
        }
        // zl,75
        // ww,53
        // zs,53
        // ls,55
        // ls,76

        System.out.println("***************************");
        Iterator<String> iterator = keySet.iterator();
        while (iterator.hasNext()){
            String key = iterator.next();
            Iterable<Integer> values = resultRDD.get(key);
            Iterator<Integer> iterator1 = values.iterator();
            while (iterator1.hasNext()){
                System.out.println(key+","+iterator1.next());
            }
        }
        // zl,75
        // ww,53
        // zs,53
        // ls,55
        // ls,76

        System.out.println("*************************");
        Iterator<String> iterator1 = keySet.iterator();
        while (iterator1.hasNext()){
            String key = iterator1.next();
            Iterable<Integer> integers = resultRDD.get(key);
            for (Integer i: integers) {
                System.out.println(key+","+i);
            }
        }
        // zl,75
        // ww,53
        // zs,53
        // ls,55
        // ls,76
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98

cogroup

groupByKey是对单个 RDD 的数据进行分组,还可以使用一个叫作 cogroup() 的函数对多个共享同一个键的 RDD 进行分组
例如
RDD1.cogroup(RDD2) 会将RDD1和RDD2按照相同的key进行分组,得到(key,RDD[key,Iterable[value1],Iterable[value2]])的形式
cogroup也可以多个进行分组
例如RDD1.cogroup(RDD2,RDD3,…RDDN), 可以得到(key,Iterable[value1],Iterable[value2],Iterable[value3],…,Iterable[valueN])

scala版本

package nj.zb.kb09

import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("scalaRDD")
    val sc = new SparkContext(conf)
    
    val score1 = sc.parallelize(List(
      ("zs", 43), ("zs", 64), ("ls", 64), ("ls", 63), ("zl", 98)))
    val score2 = sc.parallelize(List(
      ("ww", 43), ("zs", 64), ("zl", 64), ("zs", 63), ("ww", 98)))
    val score3 = sc.parallelize(List(
      ("ls", 43), ("ww", 64), ("zs", 64), ("zl", 63), ("ww", 98)))
    score1.cogroup(score2,score3).collect()foreach(println)
    
    // (zl,(CompactBuffer(98),CompactBuffer(64),CompactBuffer(63)))
    // (ww,(CompactBuffer(),CompactBuffer(43, 98),CompactBuffer(64, 98)))
    // (ls,(CompactBuffer(64, 63),CompactBuffer(),CompactBuffer(43)))
    // (zs,(CompactBuffer(43, 64),CompactBuffer(64, 63),CompactBuffer(64)))
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

java版本

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import scala.Tuple3;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class test {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Tuple2<String,Integer>> RDD1 = sc.parallelize(Arrays.asList(
                new Tuple2<String,Integer>("zs", 53),
                new Tuple2<String,Integer>("ls", 55),
                new Tuple2<String,Integer>("zl", 75),
                new Tuple2<String,Integer>("ww", 53),
                new Tuple2<String,Integer>("ls", 76)
        ));
        JavaRDD<Tuple2<String,Integer>> RDD2 = sc.parallelize(Arrays.asList(
                new Tuple2<String,Integer>("zs", 53),
                new Tuple2<String,Integer>("ls", 55),
                new Tuple2<String,Integer>("zl", 75),
                new Tuple2<String,Integer>("ww", 53),
                new Tuple2<String,Integer>("ls", 76)
        ));
        JavaRDD<Tuple2<String,Integer>> RDD3 = sc.parallelize(Arrays.asList(
                new Tuple2<String,Integer>("zs", 53),
                new Tuple2<String,Integer>("ls", 55),
                new Tuple2<String,Integer>("zl", 75),
                new Tuple2<String,Integer>("ww", 53),
                new Tuple2<String,Integer>("ls", 76)
        ));
        JavaPairRDD<String, Integer> mapRDD1 = JavaPairRDD.fromJavaRDD(RDD1);
        JavaPairRDD<String, Integer> mapRDD2 = JavaPairRDD.fromJavaRDD(RDD2);
        JavaPairRDD<String, Integer> mapRDD3 = JavaPairRDD.fromJavaRDD(RDD3);

        Map<String, Tuple3<Iterable<Integer>, Iterable<Integer>, Iterable<Integer>>> result
                = mapRDD1.cogroup(mapRDD2, mapRDD3).collectAsMap();
        for (String s:result.keySet()) {
            System.out.println(s+","+result.get(s));
        }
        // zl,([75],[75],[75])
        // ww,([53],[53],[53])
        // zs,([53],[53],[53])
        // ls,([55, 76],[55, 76],[55, 76])
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/466681
推荐阅读
相关标签
  

闽ICP备14008679号