赞
踩
- packagecom.shujia.spark.core
- importorg.apache.spark.rdd.RDD
- importorg.apache.spark.{SparkConf,SparkContext}
- objectDemo2Partition{
- defmain(args:Array[String]):Unit={
- valconf=newSparkConf()
- conf.setMaster("local")
- conf.setAppName("Demo2Partition")
- valsc=newSparkContext(conf)
- /**
- * 读取目录,如果读取的是目录,目录下不能有子目录
- *
- */
- /**
- * RDDd的分区数,分区数越多,并行度越高,在资源充足的情况下效率越高
- * 1、读取文件,默认等于切片的数量
- * 2、读取文件可以设置最新分区数minPartitions,
- * 控制rdd的分区数,,只能在切片数的基础上,增加分区数,不能减少分区数
- * 原则是需要保证每一个分区的数据量差不多
- *
- * 3、窄依赖的算子不能改变分区数,默认等于前一个rdd的分区数
- * 4、宽依赖算子分区数默认等于前一个RDD,也可以手动设置分区数
- *
- *
- */
- vallinesRDD:RDD[String]=sc.textFile("data/words",7)
- println(s"linesRDD分区数:${linesRDD.getNumPartitions}")
- valwordsRDD:RDD[String]=linesRDD.flatMap(_.split(","))
- println(s"wordsRDD分区数:${wordsRDD.getNumPartitions}")
- valkvRDD:RDD[(String, Iterable[String])]=wordsRDD.groupBy(w=>w,numPartitions=10)
- println(s"kvRDD分区数:${kvRDD.getNumPartitions}")
- valwordcountRDD:RDD[(String, Int)]=kvRDD.map(kv=>(kv._1,kv._2.size))
- println(s"wordcountRDD分区数:${wordcountRDD.getNumPartitions}")
- wordcountRDD.saveAsTextFile("data/wc")
- }
- }
- packagecom.shujia.spark.core
- importorg.apache.spark.rdd.RDD
- importorg.apache.spark.{SparkConf,SparkContext}
- objectDemo3Map{
- defmain(args:Array[String]):Unit={
- valconf=newSparkConf()
- conf.setAppName("map")
- conf.setMaster("local")
- valsc=newSparkContext(conf)
- //读取学生表的数据
- valstudentsRDD:RDD[String]=sc.textFile("data/students.txt")
- /**
- * map:将rdd的数据一条一条传递给后面的函数,函数的返回值构建成一个新的RDD
- * map算子不会改变总的数据行数
- *
- */
- valclazzRDD:RDD[String]=studentsRDD.map((student:String)=>student.split(",").last)
- clazzRDD.foreach(println)
- }
- }
- packagecom.shujia.spark.core
- importorg.apache.spark.{SparkConf,SparkContext}
- importorg.apache.spark.rdd.RDD
- objectDemo4Filter{
- defmain(args:Array[String]):Unit={
- valconf=newSparkConf()
- conf.setAppName("map")
- conf.setMaster("local")
- valsc=newSparkContext(conf)
- //读取学生表的数据
- valstudentsRDD:RDD[String]=sc.textFile("data/students.txt")
- /**
- * Filter:将RDD的数据一条一条传递给函数,如果函数返回true保留数据,如果函数返回false过滤数据
- *
- * filter会减少RDD的数据行数
- *
- */
- valfilterRDD:RDD[String]=studentsRDD.filter((student:String)=>{
- valgender:String=student.split(",")(3)
- "男".equals(gender)
- })
- filterRDD.foreach(println)
- }
- }
- 相当于sql种 的explod
-
- packagecom.shujia.spark.core
- importorg.apache.spark.rdd.RDD
- importorg.apache.spark.{SparkConf,SparkContext}
- objectDemo5FlatMap{
- defmain(args:Array[String]):Unit={
- valconf=newSparkConf()
- conf.setAppName("Demo5FlatMap")
- conf.setMaster("local")
- valsc=newSparkContext(conf)
- vallinesRDD:RDD[String]=sc.textFile("data/words.txt")
- /**
- * flatMap:一条一条将RDD的数据传递给后面的函数,函数的返回值必须是一个集合,最后会将集合展开构建成一个新的RDD
- * 传入一行返回多行
- *
- */
- valwordsRDD:RDD[String]=linesRDD.flatMap(line=>line.split(","))
- wordsRDD.foreach(println)
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。