赞
踩
groupBy算子
1)函数签名
def groupBy[K](f:T=>K)(implicit kt:ClassTag[K]):RDD[(K,Iterable[T])]
2)功能说明
分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器
3)需求说明:创建一个RDD,按照元素模以2的值进行分组。
4)具体实现:
package com.huc.Spark.value import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Test06_groupby { def main(args: Array[String]): Unit = { //1.创建SparkConf并设置App名称 val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]") //2.创建SparkContext,该对象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.使用Scala进行spark编程 // 创建一个RDD val rdd: RDD[Int] = sc.makeRDD(1 to 4, 2) // 将每个分区的数据放到一个数组并收集到Driver端打印 rdd.groupBy(i => i).collect().foreach(println) rdd.groupBy(_ % 2).collect().foreach(println) // 创建一个RDD val rdd1: RDD[String] = sc.makeRDD(List("hello", "hive", "hadoop", "spark", "scala")) // 按照首字母第一个单词相同的分组 println("--------------") rdd1.groupBy(_.head).collect().foreach(println) rdd1.groupBy(str => str.substring(0, 1)).collect().foreach(println) // 4.关闭连接 sc.stop() } }
package com.huc.Spark1.value import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Test06_groupby { def main(args: Array[String]): Unit = { //1.创建SparkConf并设置App名称 val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]") //2.创建SparkContext,该对象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.使用Scala进行spark编程 val add: RDD[String] = sc.textFile("input1/1.txt") val value: RDD[(String, Int)] = add.flatMap(_.split(" ").map((_, 1))) val value2: RDD[(String, Iterable[(String, Int)])] = value.groupBy(tuple => tuple._1) val value1: RDD[(String, Int)] = value2.map(tuple => (tuple._1, tuple._2.size)) value1.collect().foreach(println) // 匿名函数模拟匹配的写法 value2.map(tuple => tuple match { case (k, v) => (k, v.size) }).collect().foreach(println) // 偏函数的写法 能够省略外部的小括号,大括号不可以省略,因为大括号表示偏函数 value2.map { case (k, v) => (k, v.size) }.collect().foreach(println) //4.关闭连接 sc.stop() } }
package com.huc.Spark.value import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Test07_groupby_wordcount { def main(args: Array[String]): Unit = { //1.创建SparkConf并设置App名称 val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]") //2.创建SparkContext,该对象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.使用Scala进行spark编程 // 创建一个RDD val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark", "Hello World")) // 将字符串拆分成一个一个的单词 并进行转换:word=>(word,1) val value: RDD[(String, Int)] = rdd.flatMap((s: String) => s.split(" ").map((_, 1))) // 将转换后的数组分组 val value1: RDD[(String, Iterable[(String, Int)])] = value.groupBy((tuple: (String, Int)) => tuple._1) // 将分组后的数据进行结构的转换 val value2: RDD[(String, Int)] = value1.mapValues((list: Iterable[(String, Int)]) => { list.foldLeft(0)((res: Int, elem: (String, Int)) => res + elem._2) }) value2.collect().foreach(println) println(value2.collect()) println(value2.collect().mkString(",")) //4.关闭连接 sc.stop() } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。