
Spark RDD Methods in Practice


package com.tipdm.sparkDemo
import org.apache.spark.{SparkConf, SparkContext}
object a1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    // create two pair RDDs and merge them with union()
    val rdd1 = sc.parallelize(List(('a',1),('b',99),('c',100),('d',101)))
    val rdd2 = sc.parallelize(List(('e',120),('f',150)))
    val rdd3 = rdd1.union(rdd2)
    // keep only the pairs whose value is at least 100 and print them
    rdd3.filter(x => x._2 >= 100).collect.foreach(println)
    // rdd4 contains a duplicate ('c',100); deduplicate with distinct() before filtering
    val rdd4 = sc.parallelize(List(('a',1),('b',99),('c',100),('d',101),('c',100)))
    val rdd5 = rdd4.distinct()
    rdd5.filter(x => x._2 >= 100).collect.foreach(println)
  }
}

Create rdd1 and rdd2, then merge them with the union() method into rdd3.

Filter rdd3 with the filter() method to drop the pairs whose value is below 100, and print the rest.

Create rdd4, which contains a duplicate ('c',100); deduplicate it with the distinct() method, then filter out the pairs whose value is below 100 and print the result.
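For reference, the same distinct-then-filter pipeline can be written as one chained expression. The sketch below is my own (the object name a1Chained and the appName are not from the original code); it should print the same ('c',100) and ('d',101) pairs:

package com.tipdm.sparkDemo
import org.apache.spark.{SparkConf, SparkContext}

object a1Chained {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RDDChain").setMaster("local"))
    // the duplicate ('c',100) is removed by distinct(); filter keeps values >= 100
    sc.parallelize(List(('a',1),('b',99),('c',100),('d',101),('c',100)))
      .distinct()
      .filter(_._2 >= 100)
      .collect
      .foreach(println)
    sc.stop()
  }
}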

package com.tipdm.sparkDemo
import org.apache.spark.{SparkConf, SparkContext}

object a2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(List(1,2,3))
    val rdd2 = sc.parallelize(List(3,4,5,6))
    val rdd3 = sc.parallelize(List('a','b','c'))
    // merge rdd1 and rdd2, keep values >= 3, then deduplicate
    val rdd4 = rdd1.union(rdd2)
    val rdd5 = rdd4.filter(_ >= 3)
    val rdd6 = rdd5.distinct()
    // pair every element of rdd6 with every element of rdd3 and print the pairs
    rdd6.cartesian(rdd3).collect.foreach(println)
  }
}
Create rdd1 and rdd2, then merge them with the union() method into rdd4.

Create rdd5 by filtering rdd4 with the filter() method to drop the values below 3.

Create rdd6 by deduplicating rdd5 with the distinct() method, then use the cartesian() method with rdd3 to print the Cartesian product.
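Since rdd6 holds four values (3, 4, 5, 6) and rdd3 holds three characters, the Cartesian product contains 4 × 3 = 12 tuples. A minimal sketch to confirm that, assuming it is appended inside a2's main after rdd6 is defined:

    // cartesian() pairs every element of rdd6 with every element of rdd3
    val pairCount = rdd6.cartesian(rdd3).count()
    println(pairCount)   // expected: 12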

package com.tipdm.sparkDemo
import org.apache.spark.{SparkConf, SparkContext}

object a4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(
      List(('a',1),('a',2),('b',1),('c',1),('c',1))
    )
    // reduceByKey() sums the values that share a key: (a,3) (b,1) (c,2)
    val re_rdd1 = rdd1.reduceByKey((a,b) => a+b)
    re_rdd1.collect.foreach(println)
    // groupByKey() collects the values that share a key into one collection
    val g_rdd1 = rdd1.groupByKey()
    g_rdd1.collect.foreach(println)
    // count how many values were grouped under each key
    g_rdd1.map(x => (x._1, x._2.size)).collect.foreach(println)
  }
}
Create re_rdd1 with the reduceByKey() method, which merges the values that share a key by adding them, and print the result.

Create g_rdd1 with the groupByKey() method, which groups the values that share a key; then print the number of values collected for each key.
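As a cross-check, the per-key sums produced by reduceByKey() can also be reproduced from the grouped RDD. A minimal sketch, assuming it is appended inside a4's main after g_rdd1 is defined:

    // summing each key's grouped values gives the same totals as reduceByKey(),
    // but reduceByKey() combines values on the map side and shuffles less data
    g_rdd1.mapValues(_.sum).collect.foreach(println)   // expected: (a,3) (b,1) (c,2)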

package com.tipdm.sparkDemo
import org.apache.spark.{SparkConf, SparkContext}
object a3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    // read the two half-year salary files (the second path presumably points to the
    // second-half file; the original listing read the first-half file twice)
    val first_half = sc.textFile("D:\\Employee_salary_first_half.csv")
    val second_half = sc.textFile("D:\\Employee_salary_second_half.csv")
    // drop the header row in the first partition; the else branch is required,
    // otherwise the iterator returned by drop(1) is discarded and the header is kept
    val drop_first = first_half.mapPartitionsWithIndex((ix, it) =>
      if (ix == 0) it.drop(1) else it
    )
    val drop_second = second_half.mapPartitionsWithIndex((ix, it) =>
      if (ix == 0) it.drop(1) else it
    )
    // split each line on commas and keep the name (field 1) and salary (field 6)
    val split_first = drop_first.map(
      line => {val data = line.split(",");(data(1), data(6).toInt)}
    )
    val split_second = drop_second.map(
      line => {val data = line.split(",");(data(1), data(6).toInt)}
    )
    // keep employees earning more than 200,000 and map each record to just the name
    val filter_first = split_first.filter(x => x._2 > 200000).map(x => x._1)
    val filter_second = split_second.filter(x => x._2 > 200000).map(x => x._1)
    val name = filter_first.union(filter_second).distinct()
    name.collect.foreach(println)
  }
}
Read each CSV file into an RDD with the textFile() method, then drop the header row of each file with mapPartitionsWithIndex().

Use the map() method to split each line on commas and keep the employee name and salary fields.

Use the filter() method to keep only the employees whose salary exceeds 200,000, then map() each record to just the name.

Merge the two name RDDs with union() and deduplicate them with the distinct() method.

Finally, print the result.
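The header row can also be removed without mapPartitionsWithIndex(). A minimal sketch, assuming the same first_half RDD from a3 (note this also drops any data line identical to the header, which is unlikely here):

    // read the header once on the driver, then filter out every matching line
    val header = first_half.first()
    val no_header = first_half.filter(line => line != header)
    no_header.take(3).foreach(println)   // preview a few data rows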
