当前位置:   article > 正文

大数据学习之Spark-core常用代码示例_spark应用实例代码

spark应用实例代码

一、查看分区表

  1. packagecom.shujia.spark.core
  2. importorg.apache.spark.rdd.RDD
  3. importorg.apache.spark.{SparkConf,SparkContext}
  4. objectDemo2Partition{
  5. defmain(args:Array[String]):Unit={
  6. valconf=newSparkConf()
  7. conf.setMaster("local")
  8. conf.setAppName("Demo2Partition")
  9. valsc=newSparkContext(conf)
  10. /**
  11. * 读取目录,如果读取的是目录,目录下不能有子目录
  12. *
  13. */
  14. /**
  15. * RDDd的分区数,分区数越多,并行度越高,在资源充足的情况下效率越高
  16. * 1、读取文件,默认等于切片的数量
  17. * 2、读取文件可以设置最新分区数minPartitions,
  18. * 控制rdd的分区数,,只能在切片数的基础上,增加分区数,不能减少分区数
  19. * 原则是需要保证每一个分区的数据量差不多
  20. *
  21. * 3、窄依赖的算子不能改变分区数,默认等于前一个rdd的分区数
  22. * 4、宽依赖算子分区数默认等于前一个RDD,也可以手动设置分区数
  23. *
  24. *
  25. */
  26. vallinesRDD:RDD[String]=sc.textFile("data/words",7)
  27. println(s"linesRDD分区数:${linesRDD.getNumPartitions}")
  28. valwordsRDD:RDD[String]=linesRDD.flatMap(_.split(","))
  29. println(s"wordsRDD分区数:${wordsRDD.getNumPartitions}")
  30. valkvRDD:RDD[(String, Iterable[String])]=wordsRDD.groupBy(w=>w,numPartitions=10)
  31. println(s"kvRDD分区数:${kvRDD.getNumPartitions}")
  32. valwordcountRDD:RDD[(String, Int)]=kvRDD.map(kv=>(kv._1,kv._2.size))
  33. println(s"wordcountRDD分区数:${wordcountRDD.getNumPartitions}")
  34. wordcountRDD.saveAsTextFile("data/wc")
  35. }
  36. }

二、map读取一行

  1. packagecom.shujia.spark.core
  2. importorg.apache.spark.rdd.RDD
  3. importorg.apache.spark.{SparkConf,SparkContext}
  4. objectDemo3Map{
  5. defmain(args:Array[String]):Unit={
  6. valconf=newSparkConf()
  7. conf.setAppName("map")
  8. conf.setMaster("local")
  9. valsc=newSparkContext(conf)
  10. //读取学生表的数据
  11. valstudentsRDD:RDD[String]=sc.textFile("data/students.txt")
  12. /**
  13. * map:将rdd的数据一条一条传递给后面的函数,函数的返回值构建成一个新的RDD
  14. * map算子不会改变总的数据行数
  15. *
  16. */
  17. valclazzRDD:RDD[String]=studentsRDD.map((student:String)=>student.split(",").last)
  18. clazzRDD.foreach(println)
  19. }
  20. }

三、Filter 过滤

  1. packagecom.shujia.spark.core
  2. importorg.apache.spark.{SparkConf,SparkContext}
  3. importorg.apache.spark.rdd.RDD
  4. objectDemo4Filter{
  5. defmain(args:Array[String]):Unit={
  6. valconf=newSparkConf()
  7. conf.setAppName("map")
  8. conf.setMaster("local")
  9. valsc=newSparkContext(conf)
  10. //读取学生表的数据
  11. valstudentsRDD:RDD[String]=sc.textFile("data/students.txt")
  12. /**
  13. * Filter:将RDD的数据一条一条传递给函数,如果函数返回true保留数据,如果函数返回false过滤数据
  14. *
  15. * filter会减少RDD的数据行数
  16. *
  17. */
  18. valfilterRDD:RDD[String]=studentsRDD.filter((student:String)=>{
  19. valgender:String=student.split(",")(3)
  20. "男".equals(gender)
  21. })
  22. filterRDD.foreach(println)
  23. }
  24. }

四、FlatMap一行转多行

  1. 相当于sql种 的explod
  2. packagecom.shujia.spark.core
  3. importorg.apache.spark.rdd.RDD
  4. importorg.apache.spark.{SparkConf,SparkContext}
  5. objectDemo5FlatMap{
  6. defmain(args:Array[String]):Unit={
  7. valconf=newSparkConf()
  8. conf.setAppName("Demo5FlatMap")
  9. conf.setMaster("local")
  10. valsc=newSparkContext(conf)
  11. vallinesRDD:RDD[String]=sc.textFile("data/words.txt")
  12. /**
  13. * flatMap:一条一条将RDD的数据传递给后面的函数,函数的返回值必须是一个集合,最后会将集合展开构建成一个新的RDD
  14. * 传入一行返回多行
  15. *
  16. */
  17. valwordsRDD:RDD[String]=linesRDD.flatMap(line=>line.split(","))
  18. wordsRDD.foreach(println)
  19. }
  20. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/80240
推荐阅读
相关标签
  

闽ICP备14008679号