The first thing to do in Spark Core programming is to create an initial RDD. That RDD typically represents and contains the input data of the Spark application. Only after the initial RDD has been created can you apply the transformation operators provided by Spark Core to derive other RDDs from it.
Spark Core provides three ways to create an RDD: parallelizing a collection in the program, creating one from a local file, and creating one from a file in HDFS (or another Hadoop-supported storage system).
In my experience, parallelizing an in-program collection is mainly useful for testing; creating an RDD from a local file suits one-off processing of data that already sits on a single machine; and creating an RDD from an HDFS file is the most common approach in production.
To create an RDD from a parallelized collection, call SparkContext's parallelize() method on a collection in your program. Spark copies the elements of the collection to the cluster, forming a distributed dataset, i.e. an RDD. In effect, part of the collection's data ends up on one node and the rest on other nodes, and you can then operate on this distributed dataset (the RDD) in parallel.
// Example: sum the numbers 1 to 10
val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ + _)
An important parameter of parallelize() is the number of partitions to split the collection into. Spark runs one task for each partition. The official recommendation is to create 2 to 4 partitions per CPU core in the cluster. By default Spark sets the number of partitions based on the cluster, but you can also pass a second argument to parallelize() to set the number of partitions of the RDD explicitly, e.g. parallelize(arr, 10).
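A quick sketch of setting the partition count explicitly and reading it back (continuing the spark-shell example above, where sc and arr are already defined):

// Split the collection into 10 partitions; Spark will run one task per partition
val rdd10 = sc.parallelize(arr, 10)
println(rdd10.getNumPartitions)   // 10
println(rdd10.reduce(_ + _))      // 55, the same result regardless of partitioning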
Spark can create RDDs from files on any storage system supported by Hadoop, such as HDFS, Cassandra, and HBase, as well as from local files. Calling SparkContext's **textFile()** method creates an RDD from a local file or an HDFS file.
A few things to note: (1) when using a local file path on a cluster, the file must be present at the same path on every worker node (for local testing with master "local", one copy on that machine is enough); (2) textFile() also accepts directories, compressed files, and wildcards; (3) Spark creates one partition per HDFS block by default, and the optional second argument of textFile() can only request more partitions than blocks, not fewer.
// Example: count the total number of characters in a file
val rdd = sc.textFile("data.txt")
val wordCount = rdd.map(line => line.length).reduce(_ + _)
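For an actual word count over the same file, a minimal sketch (still a spark-shell session with the same data.txt) could combine flatMap and reduceByKey, two transformations described in the next part:

// Split each line into words, map each word to (word, 1), then sum the counts per word
val wordCounts = sc.textFile("data.txt")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wordCounts.take(10).foreach(println)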
Spark supports two kinds of RDD operations: transformations and actions.
A transformation creates a new RDD from an existing one. An action performs a final computation on an RDD, such as iterating over it, reducing it, or saving it to a file, and can return a result to the driver program.
For example, map is a transformation: it passes each element of an existing RDD through a user-defined function, obtains a new element, and assembles all the new elements into a new RDD. reduce is an action: it aggregates all elements of an RDD into a single final result and returns it to the driver program.
Transformations are lazy. If a Spark application defines only transformations, nothing is executed even when the application runs: transformations do not trigger execution by themselves, they merely record the operations applied to the RDD. Only when an action follows the transformations do all of them actually execute. Spark uses this laziness to optimize the execution of the underlying application and to avoid producing too many intermediate results.
Executing an action triggers a Spark job, which in turn triggers the execution of all transformations that precede the action. That is the defining characteristic of actions.
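A small sketch of this lazy behaviour (a hypothetical spark-shell session; the println inside map is only there to show when the computation actually runs):

val nums = sc.parallelize(1 to 5)
// Nothing is executed yet: map only records the transformation
val doubled = nums.map { x => println(s"computing $x"); x * 2 }
// The action triggers a job, and only now do the "computing ..." lines appear
println(doubled.count())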
Common transformation operators:
Transformation | Description |
---|---|
map | Passes each element of the RDD through a user-defined function and builds a new RDD from the returned elements. |
filter | Evaluates a predicate on each element; elements for which it returns true are kept, the rest are dropped. |
flatMap | Like map, but each input element can produce one or more output elements. |
groupByKey | Groups the values by key; each key maps to an Iterable of its values. |
reduceByKey | Reduces the values of each key with the given function. |
sortByKey | Sorts an RDD of <key,value> pairs by key. |
join | Joins two RDDs of <key,value> pairs; each joined pair for a key can then be processed by a user-defined function. |
cogroup | Like join, but the entire Iterable of values for each key from both RDDs is passed to the user-defined function. |
Examples of each transformation operator in Java:
package com.kfk.spark.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/25
 * @time : 7:10 PM
 */
public class TransformationJava {

    // Only one SparkContext may be active per JVM, so reuse it across the examples
    private static JavaSparkContext sparkContext;

    public static JavaSparkContext getsc(){
        if (sparkContext == null) {
            SparkConf sparkConf = new SparkConf().setAppName("TransformationJava").setMaster("local");
            sparkContext = new JavaSparkContext(sparkConf);
        }
        return sparkContext;
    }

    public static void main(String[] args) {
        map();
        filter();
        flatmap();
        groupByKey();
        reduceByKey();
        sortByKey();
        join();
        cogroup();
    }

    /**
     * Dataset 1: (2,"lili")              cogroup() -> <2,<"lili",(90,95,99)>>
     * Dataset 2: (2,90)(2,95)(2,99)
     */
    private static void cogroup() {
        List stuList = Arrays.asList(new Tuple2<Integer,String>(1,"alex"),
                new Tuple2<Integer,String>(2,"lili"),
                new Tuple2<Integer,String>(3,"cherry"),
                new Tuple2<Integer,String>(4,"jack"),
                new Tuple2<Integer,String>(5,"jone"),
                new Tuple2<Integer,String>(6,"lucy"),
                new Tuple2<Integer,String>(7,"aliy"));

        List scoreList = Arrays.asList(new Tuple2<Integer,Integer>(1,90),
                new Tuple2<Integer,Integer>(2,79),
                new Tuple2<Integer,Integer>(2,95),
                new Tuple2<Integer,Integer>(2,99),
                new Tuple2<Integer,Integer>(3,87),
                new Tuple2<Integer,Integer>(3,88),
                new Tuple2<Integer,Integer>(3,89),
                new Tuple2<Integer,Integer>(4,98),
                new Tuple2<Integer,Integer>(5,89),
                new Tuple2<Integer,Integer>(6,93),
                new Tuple2<Integer,Integer>(7,96));

        JavaSparkContext sc = getsc();
        JavaPairRDD javaPairStuRdd = sc.parallelizePairs(stuList);
        JavaPairRDD javaPairScoreRdd = sc.parallelizePairs(scoreList);

        // For every key, cogroup() pairs the full Iterable of names with the full Iterable of scores
        JavaPairRDD<Integer,Tuple2<Iterable<String>,Iterable<Integer>>> cogroupValues = javaPairStuRdd.cogroup(javaPairScoreRdd);
        cogroupValues.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> integerTuple2Tuple2) throws Exception {
                System.out.println(integerTuple2Tuple2._1);
                System.out.println(integerTuple2Tuple2._2._1 + " : " + integerTuple2Tuple2._2._2);
            }
        });
    }

    /**
     * Dataset 1: (1,"alex")   join() -> <1,<"alex",90>>
     * Dataset 2: (1,90)
     */
    private static void join() {
        List stuList = Arrays.asList(new Tuple2<Integer,String>(1,"alex"),
                new Tuple2<Integer,String>(2,"lili"),
                new Tuple2<Integer,String>(3,"cherry"),
                new Tuple2<Integer,String>(4,"jack"),
                new Tuple2<Integer,String>(5,"jone"),
                new Tuple2<Integer,String>(6,"lucy"),
                new Tuple2<Integer,String>(7,"aliy"));

        List scoreList = Arrays.asList(new Tuple2<Integer,Integer>(1,90),
                new Tuple2<Integer,Integer>(2,95),
                new Tuple2<Integer,Integer>(3,87),
                new Tuple2<Integer,Integer>(4,98),
                new Tuple2<Integer,Integer>(5,89),
                new Tuple2<Integer,Integer>(6,93),
                new Tuple2<Integer,Integer>(7,96));

        JavaSparkContext sc = getsc();
        JavaPairRDD javaPairStuRdd = sc.parallelizePairs(stuList);
        JavaPairRDD javaPairScoreRdd = sc.parallelizePairs(scoreList);

        // join() keeps only keys present in both RDDs and pairs the matching values
        JavaPairRDD<Integer,Tuple2<String,Integer>> joinValue = javaPairStuRdd.join(javaPairScoreRdd);
        joinValue.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            public void call(Tuple2<Integer, Tuple2<String, Integer>> integerTuple2Tuple2) throws Exception {
                System.out.println(integerTuple2Tuple2._1);
                System.out.println(integerTuple2Tuple2._2._1 + " : " + integerTuple2Tuple2._2._2);
            }
        });
    }

    /**
     * <90,alex>
     * <95,lili>     sortByKey() -> <87,cherry> <90,alex> <95,lili> ...
     * <87,cherry>
     */
    private static void sortByKey() {
        List list = Arrays.asList(new Tuple2<Integer,String>(90,"alex"),
                new Tuple2<Integer,String>(95,"lili"),
                new Tuple2<Integer,String>(87,"cherry"),
                new Tuple2<Integer,String>(98,"jack"),
                new Tuple2<Integer,String>(89,"jone"),
                new Tuple2<Integer,String>(93,"lucy"),
                new Tuple2<Integer,String>(96,"aliy"));

        JavaPairRDD<Integer, String> javaPairRdd = getsc().parallelizePairs(list);

        // true = ascending order by key
        JavaPairRDD<Integer,String> sortByKeyValues = javaPairRdd.sortByKey(true);
        sortByKeyValues.foreach(new VoidFunction<Tuple2<Integer, String>>() {
            public void call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                System.out.println(integerStringTuple2._1 + " : " + integerStringTuple2._2);
            }
        });
    }

    /**
     * <class_1,(90,87,98,96)>   reduceByKey() -> <class_1,(90+87+98+96)>
     * <class_2,(95,89,93)>      reduceByKey() -> <class_2,(95+89+93)>
     */
    private static void reduceByKey() {
        List list = Arrays.asList(new Tuple2<String,Integer>("class_1",90),
                new Tuple2<String,Integer>("class_2",95),
                new Tuple2<String,Integer>("class_1",87),
                new Tuple2<String,Integer>("class_1",98),
                new Tuple2<String,Integer>("class_2",89),
                new Tuple2<String,Integer>("class_2",93),
                new Tuple2<String,Integer>("class_1",96));

        final JavaPairRDD<String, Integer> javaPairRdd = getsc().parallelizePairs(list);

        JavaPairRDD<String,Integer> reduceByKeyValues = javaPairRdd.reduceByKey(new Function2<Integer,Integer,Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        reduceByKeyValues.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2._1 + " : " + stringIntegerTuple2._2);
            }
        });
    }

    /**
     * class_1 90     groupByKey() -> <class_1,(90,87,98,96)> <class_2,(95,89,93)>
     * class_2 95
     * class_1 87
     * class_1 98
     * class_2 89
     * class_2 93
     * class_1 96
     */
    private static void groupByKey() {
        List list = Arrays.asList(new Tuple2<String,Integer>("class_1",90),
                new Tuple2<String,Integer>("class_2",95),
                new Tuple2<String,Integer>("class_1",87),
                new Tuple2<String,Integer>("class_1",98),
                new Tuple2<String,Integer>("class_2",89),
                new Tuple2<String,Integer>("class_2",93),
                new Tuple2<String,Integer>("class_1",96));

        JavaPairRDD<String,Integer> javaPairRdd = getsc().parallelizePairs(list);

        JavaPairRDD<String,Iterable<Integer>> groupByKeyValue = javaPairRdd.groupByKey();
        groupByKeyValue.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            public void call(Tuple2<String,Iterable<Integer>> stringIteratorTuple2) throws Exception {
                System.out.println(stringIteratorTuple2._1);
                Iterator<Integer> iterator = stringIteratorTuple2._2.iterator();
                while (iterator.hasNext()){
                    System.out.println(iterator.next());
                }
            }
        });
    }

    /**
     * hbase hadoop hive
     * java python        flatmap() -> hbase hadoop hive java python java python
     * java python
     */
    private static void flatmap() {
        List<String> list = Arrays.asList("hbase hadoop hive","java python","java python");
        JavaRDD<String> javaRdd = getsc().parallelize(list);

        // Each line is split into words, so one input element can produce several output elements
        JavaRDD<String> flatMapValue = javaRdd.flatMap(new FlatMapFunction<String,String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        flatMapValue.foreach(new VoidFunction<String>() {
            public void call(String value) throws Exception {
                System.out.println(value);
            }
        });
    }

    /**
     * 1,2,3,4,5,6,7,8,9,10   filter() -> 2,4,6,8,10
     */
    private static void filter() {
        List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);

        // Keep the even numbers
        JavaRDD<Integer> filterValue = javaRdd.filter(new Function<Integer,Boolean>() {
            public Boolean call(Integer value) throws Exception {
                return value % 2 == 0;
            }
        });
        filterValue.foreach(new VoidFunction<Integer>() {
            public void call(Integer o) throws Exception {
                System.out.println(o);
            }
        });
    }

    /**
     * 1,2,3,4,5   map() -> 10,20,30,40,50
     */
    public static void map(){
        List<Integer> list = Arrays.asList(1,2,3,4,5);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);

        JavaRDD<Integer> mapValue = javaRdd.map(new Function<Integer,Integer>() {
            public Integer call(Integer value) throws Exception {
                return value * 10;
            }
        });
        mapValue.foreach(new VoidFunction<Integer>() {
            public void call(Integer o) throws Exception {
                System.out.println(o);
            }
        });
    }
}
Examples of each transformation operator in Scala:
package com.kfk.spark.core

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/25
 * @time : 7:19 PM
 */
object TransformationScala {

  // Only one SparkContext may be active per JVM, so reuse it across the examples
  private var sparkContext: SparkContext = _

  def getsc(): SparkContext = {
    if (sparkContext == null) {
      val sparkConf = new SparkConf().setAppName("TransformationScala").setMaster("local")
      sparkContext = new SparkContext(sparkConf)
    }
    sparkContext
  }

  def main(args: Array[String]): Unit = {
    map()
    filter()
    flatmap()
    groupByKey()
    reduceByKey()
    sortByKey()
    join()
    cogroup()
  }

  /**
   * 1,2,3,4,5   map() -> 10,20,30,40,50
   */
  def map(): Unit = {
    val list = Array(1,2,3,4,5)
    val rdd = getsc().parallelize(list)
    val mapValue = rdd.map(x => x * 10)
    mapValue.foreach(x => System.out.println(x))
  }

  /**
   * 1,2,3,4,5,6,7,8,9,10   filter() -> 2,4,6,8,10
   */
  def filter(): Unit = {
    val list = Array(1,2,3,4,5,6,7,8,9,10)
    val rdd = getsc().parallelize(list)
    val filterValue = rdd.filter(x => x % 2 == 0)
    filterValue.foreach(x => System.out.println(x))
  }

  /**
   * hbase hadoop hive
   * java python          flatmap() -> hbase hadoop hive java python storm spark
   * storm spark
   */
  def flatmap(): Unit = {
    val list = Array("hbase hadoop hive", "java python", "storm spark")
    val rdd = getsc().parallelize(list)
    val flatMapValue = rdd.flatMap(x => x.split(" "))
    flatMapValue.foreach(x => System.out.println(x))
  }

  /**
   * class_1 90     groupByKey() -> <class_1,(90,87,98,96)> <class_2,(95,89,93)>
   * class_2 95
   * class_1 87
   * class_1 98
   * class_2 89
   * class_2 93
   * class_1 96
   */
  def groupByKey(): Unit = {
    val list = Array(
      Tuple2("class_1", 90),
      Tuple2("class_2", 95),
      Tuple2("class_1", 87),
      Tuple2("class_1", 98),
      Tuple2("class_2", 89),
      Tuple2("class_2", 93),
      Tuple2("class_1", 96))
    val rdd = getsc().parallelize(list)
    val groupByKeyValue = rdd.groupByKey()
    groupByKeyValue.foreach(x => {
      System.out.println(x._1)
      x._2.foreach(y => System.out.println(y))
    })
  }

  /**
   * <class_1,(90,87,98,96)>   reduceByKey() -> <class_1,(90+87+98+96)>
   * <class_2,(95,89,93)>      reduceByKey() -> <class_2,(95+89+93)>
   */
  def reduceByKey(): Unit = {
    val list = Array(
      Tuple2("class_1", 90),
      Tuple2("class_2", 95),
      Tuple2("class_1", 87),
      Tuple2("class_1", 98),
      Tuple2("class_2", 89),
      Tuple2("class_2", 93),
      Tuple2("class_1", 96))
    val rdd = getsc().parallelize(list)
    val reduceByKeyValues = rdd.reduceByKey((x,y) => x + y)
    reduceByKeyValues.foreach(x => {
      System.out.println(x._1 + " : " + x._2)
    })
  }

  /**
   * <90,alex>
   * <95,lili>     sortByKey() -> <87,cherry> <90,alex> <95,lili> ...
   * <87,cherry>
   */
  def sortByKey(): Unit = {
    val list = Array(Tuple2(90, "alex"),
      Tuple2(95, "lili"),
      Tuple2(87, "cherry"),
      Tuple2(98, "jack"),
      Tuple2(89, "jone"),
      Tuple2(93, "lucy"),
      Tuple2(96, "aliy"))
    val rdd = getsc().parallelize(list)
    val sortByKeyValues = rdd.sortByKey(true)
    sortByKeyValues.foreach(x => {
      System.out.println(x._1 + " : " + x._2)
    })
  }

  /**
   * Dataset 1: (1,"alex")   join() -> <1,<"alex",90>>
   * Dataset 2: (1,90)
   */
  def join(): Unit = {
    val stuList = Array(Tuple2(1, "alex"),
      Tuple2(2, "lili"),
      Tuple2(3, "cherry"),
      Tuple2(4, "jack"),
      Tuple2(5, "jone"),
      Tuple2(6, "lucy"),
      Tuple2(7, "aliy"))

    val scoreList = Array(Tuple2(1, 90),
      Tuple2(2, 95),
      Tuple2(3, 87),
      Tuple2(4, 98),
      Tuple2(5, 89),
      Tuple2(6, 93),
      Tuple2(7, 96))

    val sc = getsc()
    val stuRdd = sc.parallelize(stuList)
    val scoreRdd = sc.parallelize(scoreList)

    val joinValue = stuRdd.join(scoreRdd)
    joinValue.foreach(x => {
      System.out.println(x._1 + " > " + x._2._1 + " : " + x._2._2)
    })
  }

  /**
   * Dataset 1: (2,"lili")              cogroup() -> <2,<"lili",(95,95,99)>>
   * Dataset 2: (2,95)(2,95)(2,99)
   */
  def cogroup(): Unit = {
    val stuList = Array(Tuple2(1, "alex"),
      Tuple2(2, "lili"),
      Tuple2(3, "cherry"),
      Tuple2(4, "jack"),
      Tuple2(5, "jone"),
      Tuple2(6, "lucy"),
      Tuple2(7, "aliy"))

    val scoreList = Array(Tuple2(1, 90),
      Tuple2(2, 95),
      Tuple2(2, 95),
      Tuple2(2, 99),
      Tuple2(3, 87),
      Tuple2(3, 88),
      Tuple2(3, 89),
      Tuple2(4, 98),
      Tuple2(5, 89),
      Tuple2(6, 93),
      Tuple2(7, 96))

    val sc = getsc()
    val stuRdd = sc.parallelize(stuList)
    val scoreRdd = sc.parallelize(scoreList)

    val cogroupValues = stuRdd.cogroup(scoreRdd)
    cogroupValues.foreach(x => {
      System.out.println(x._1 + " > " + x._2._1.toList + " : " + x._2._2.toList)
    })
  }
}
Common action operators:
Action | Description |
---|---|
reduce | Aggregates all elements of the RDD: the first element is combined with the second, the result with the third, and so on. |
collect | Fetches all elements of the RDD to the local client (the driver). |
count | Returns the total number of elements in the RDD. |
take(n) | Returns the first n elements of the RDD. |
saveAsTextFile | Saves the RDD to a text file, calling toString on each element. |
countByKey | Counts the number of elements for each key. |
foreach | Iterates over every element of the RDD. |
Examples of each action operator in Java:
package com.kfk.spark.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/26
 * @time : 10:42 PM
 */
public class ActionJava {

    // Only one SparkContext may be active per JVM, so reuse it across the examples
    private static JavaSparkContext sparkContext;

    public static JavaSparkContext getsc(){
        if (sparkContext == null) {
            SparkConf sparkConf = new SparkConf().setAppName("ActionJava").setMaster("local");
            sparkContext = new JavaSparkContext(sparkConf);
        }
        return sparkContext;
    }

    public static void main(String[] args) {
        reduce();
        collect();
        count();
        take();
        save();
        countByKey();
    }

    /**
     * <"class_1","alex">
     * <"class_2","jone">
     * <"class_1","lucy">                      <class_1,4>
     * <"class_1","lili">    countByKey() ->
     * <"class_2","ben">                       <class_2,3>
     * <"class_2","jack">
     * <"class_1","cherry">
     */
    private static void countByKey() {
        List list = Arrays.asList(new Tuple2<String,String>("class_1","alex"),
                new Tuple2<String,String>("class_2","jone"),
                new Tuple2<String,String>("class_1","lucy"),
                new Tuple2<String,String>("class_1","lili"),
                new Tuple2<String,String>("class_2","ben"),
                new Tuple2<String,String>("class_2","jack"),
                new Tuple2<String,String>("class_1","cherry"));

        JavaPairRDD javaPairRdd = getsc().parallelizePairs(list);

        // countByKey() returns the per-key counts as a java.util.Map on the driver
        Map<String,Long> countByKeyValues = javaPairRdd.countByKey();
        for (Map.Entry obj : countByKeyValues.entrySet()){
            System.out.println(obj.getKey() + " : " + obj.getValue());
        }
    }

    /**
     * saveAsTextFile()
     */
    private static void save() {
        List<Integer> list = Arrays.asList(1,2,3,4,5);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);
        javaRdd.saveAsTextFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/javaRdd");
    }

    /**
     * 1,2,3,4,5   take(3) -> [1,2,3]
     */
    private static void take() {
        List<Integer> list = Arrays.asList(1,2,3,4,5);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);
        List<Integer> listValues = javaRdd.take(3);
        for (int value : listValues){
            System.out.println(value);
        }
    }

    /**
     * 1,2,3,4,5   count() -> 5
     */
    private static void count() {
        List<Integer> list = Arrays.asList(1,2,3,4,5);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);
        System.out.println(javaRdd.count());
    }

    /**
     * 1,2,3,4,5   collect() -> [1,2,3,4,5]
     */
    private static void collect() {
        List<Integer> list = Arrays.asList(1,2,3,4,5);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);
        List<Integer> collectValues = javaRdd.collect();
        for (int value : collectValues){
            System.out.println(value);
        }
    }

    /**
     * 1,2,3,4,5   reduce() -> 15
     */
    private static void reduce() {
        List<Integer> list = Arrays.asList(1,2,3,4,5);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);
        Integer reduceValues = javaRdd.reduce(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        System.out.println(reduceValues);
    }
}
Examples of each action operator in Scala:
package com.kfk.spark.core

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/26
 * @time : 10:42 PM
 */
object ActionScala {

  // Only one SparkContext may be active per JVM, so reuse it across the examples
  private var sparkContext: SparkContext = _

  def getsc(): SparkContext = {
    if (sparkContext == null) {
      val sparkConf = new SparkConf().setAppName("ActionScala").setMaster("local")
      sparkContext = new SparkContext(sparkConf)
    }
    sparkContext
  }

  def main(args: Array[String]): Unit = {
    reduce()
    collect()
    count()
    take()
    save()
    countByKey()
  }

  /**
   * 1,2,3,4,5   reduce() -> 15
   */
  def reduce(): Unit = {
    val list = Array(1,2,3,4,5)
    val rdd = getsc().parallelize(list)
    val reduceValues = rdd.reduce((x,y) => x + y)
    System.out.println(reduceValues)
  }

  /**
   * 1,2,3,4,5   collect() -> [1,2,3,4,5]
   */
  def collect(): Unit = {
    val list = Array(1,2,3,4,5)
    val rdd = getsc().parallelize(list)
    val collectValue = rdd.collect()
    for (value <- collectValue){
      System.out.println(value)
    }
  }

  /**
   * 1,2,3,4,5   count() -> 5
   */
  def count(): Unit = {
    val list = Array(1,2,3,4,5)
    val rdd = getsc().parallelize(list)
    System.out.println(rdd.count())
  }

  /**
   * 1,2,3,4,5   take(3) -> [1,2,3]
   */
  def take(): Unit = {
    val list = Array(1,2,3,4,5)
    val rdd = getsc().parallelize(list)
    val takeValues = rdd.take(3)
    for (value <- takeValues){
      System.out.println(value)
    }
  }

  /**
   * saveAsTextFile()
   */
  def save(): Unit = {
    val list = Array(1,2,3,4,5)
    val rdd = getsc().parallelize(list)
    rdd.saveAsTextFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/scalaRdd")
  }

  /**
   * <"class_1","alex">
   * <"class_2","jack">
   * <"class_1","jone">                      <class_1,4>
   * <"class_1","lili">    countByKey() ->
   * <"class_2","ben">                       <class_2,3>
   * <"class_2","lucy">
   * <"class_1","cherry">
   */
  def countByKey(): Unit = {
    val list = Array(
      Tuple2("class_1", "alex"),
      Tuple2("class_2", "jack"),
      Tuple2("class_1", "jone"),
      Tuple2("class_1", "lili"),
      Tuple2("class_2", "ben"),
      Tuple2("class_2", "lucy"),
      Tuple2("class_1", "cherry"))
    val rdd = getsc().parallelize(list)
    val countByKeyValues = rdd.countByKey()
    System.out.println(countByKeyValues)
  }
}
A very important feature of Spark is the ability to persist an RDD in memory. When an RDD is persisted, each node keeps the partitions of that RDD that it computed in memory, and reuses those cached partitions in later operations on the same RDD. As a result, when an RDD is used in several operations, it only has to be computed once; subsequent uses read the cached data instead of recomputing it.
Used well, RDD persistence can improve the performance of a Spark application by as much as 10x in some scenarios. For iterative algorithms and fast interactive applications, persistence is essential.
To persist an RDD, simply call its cache() or persist() method. The RDD is cached on each node the first time it is computed. Spark's persistence is also fault tolerant: if any partition of a persisted RDD is lost, Spark automatically recomputes it from the source RDD using the original transformations.
Without persistence
With persistence
Based on the Spark 1.6.1 source code, we can see:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
So cache() simply calls persist(). To see how the two differ, we need to look at the persist function:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
persist() internally calls persist(StorageLevel.MEMORY_ONLY). Going one level deeper:
/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet..
 */
def persist(newLevel: StorageLevel): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}
As you can see, persist takes a StorageLevel parameter, which specifies the cache level of the RDD.
So the difference between cache and persist is clear: cache uses only the default level MEMORY_ONLY, while persist lets you choose another cache level as needed.
The persistence strategy can be chosen manually. For example, an RDD can be persisted in memory, persisted to disk, persisted in serialized form, or replicated so that the persisted data exists on more than one node. Just pass the corresponding StorageLevel when calling persist().
Persistence level | Meaning |
---|---|
MEMORY_ONLY | Stores the RDD as deserialized Java objects in JVM memory. If the RDD does not fit entirely in memory, the partitions that were not persisted are recomputed the next time they are needed. |
MEMORY_AND_DISK | Same as above, but partitions that do not fit in memory are persisted to disk and read from disk the next time they are needed. |
MEMORY_ONLY_SER | Same as MEMORY_ONLY, but the Java objects are serialized before being persisted. This reduces memory usage at the cost of extra CPU for deserialization. |
MEMORY_AND_DISK_SER | Same as MEMORY_AND_DISK, but the Java objects are stored in serialized form. |
DISK_ONLY | Stores the RDD partitions only on disk. |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Levels ending in _2 replicate each persisted partition on one additional node, so if the data is lost it can be served from the replica instead of being recomputed. |
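As a minimal sketch of choosing a level explicitly (assuming an existing SparkContext named sc; the input path is only a placeholder):

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("hdfs://namenode:9000/path/to/input")   // placeholder path
lines.persist(StorageLevel.MEMORY_AND_DISK_SER)  // serialized in memory, spilling to disk when needed
println(lines.count())   // the first action computes the RDD and caches it
println(lines.count())   // the second action reads the cached partitions
lines.unpersist()        // release the cached partitions when they are no longer needed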
It is also worth looking at which cache levels an RDD can have, in the source of the StorageLevel class:
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
......
}
This lists 12 cache levels, but how do they differ? Each level is backed by a StorageLevel constructor call with 4 or 5 arguments, for example:
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
Looking at its constructor:
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable {
......
def useDisk: Boolean = _useDisk
def useMemory: Boolean = _useMemory
def useOffHeap: Boolean = _useOffHeap
def deserialized: Boolean = _deserialized
def replication: Int = _replication
......
}
The primary constructor of StorageLevel takes five parameters: _useDisk (whether to store data on disk), _useMemory (whether to store data in JVM heap memory), _useOffHeap (whether to store data off-heap), _deserialized (whether to keep the data as deserialized Java objects rather than in serialized form), and _replication (the number of replicas, 1 by default).
Once these five parameters are understood, the 12 cache levels of StorageLevel are easy to read.
For example, val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) means that an RDD cached at this level is stored on disk and in memory, in serialized form, and replicated so that there are 2 copies across the cluster (a normal RDD keeps only one).
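As a quick sanity check, the accessors shown in the constructor above can be read back from any of the predefined levels; a small sketch:

import org.apache.spark.storage.StorageLevel

val level = StorageLevel.MEMORY_AND_DISK_SER_2
println(level.useDisk)       // true
println(level.useMemory)     // true
println(level.useOffHeap)    // false
println(level.deserialized)  // false, i.e. stored in serialized form
println(level.replication)   // 2, i.e. one extra replica on another node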
There is also one special cache level:
val OFF_HEAP = new StorageLevel(false, false, true, false)
It uses off-heap memory. The following snippet from the StorageLevel source shows what makes it special: it cannot be combined with any of the other options.
if (useOffHeap) {
require(!useDisk, "Off-heap storage level does not support using disk")
require(!useMemory, "Off-heap storage level does not support using heap memory")
require(!deserialized, "Off-heap storage level does not support deserialized storage")
require(replication == 1, "Off-heap storage level does not support multiple replication")
}
The various persistence levels Spark provides are essentially a trade-off between CPU and memory consumption. Some general recommendations for choosing one: prefer MEMORY_ONLY if the RDD fits comfortably in memory, since it is the most CPU-efficient; if it does not fit, try MEMORY_ONLY_SER (ideally with a fast serialization library) to reduce memory usage at a moderate CPU cost; spill to disk only when recomputing the RDD is more expensive than reading it from disk; and use the replicated (_2) levels only when fast fault recovery matters, since they store every partition twice.
Below I use a small program to verify the benefit of RDD persistence.
package com.kfk.spark.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/27
 * @time : 2:35 PM
 */
public class persistJava {

    public static JavaSparkContext getsc(){
        SparkConf sparkConf = new SparkConf().setAppName("persistJava").setMaster("local");
        return new JavaSparkContext(sparkConf);
    }

    public static void main(String[] args) {
        // cache() persists the RDD in memory the first time an action computes it
        JavaRDD<String> lines = getsc().textFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/2015082818").cache();

        // Start time of the first count
        long begin = System.currentTimeMillis();
        System.out.println("Line count: " + lines.count());
        // Elapsed time of the first count
        System.out.println("First count took (ms): " + (System.currentTimeMillis() - begin));

        // Start time of the second count
        long begin1 = System.currentTimeMillis();
        System.out.println("Line count: " + lines.count());
        // Elapsed time of the second count
        System.out.println("Second count took (ms): " + (System.currentTimeMillis() - begin1));
    }
}
Without RDD persistence (the same program, but without the cache() call):
Line count: 64972
First count took (ms): 1085
Line count: 64972
Second count took (ms): 191
With RDD persistence:
Line count: 64972
First count took (ms): 986
Line count: 64972
Second count took (ms): 23
Use a reasonably large dataset for this test, otherwise the effect is hard to see. From the results above, the difference between running with and without RDD persistence is obvious.
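For completeness, here is a rough Scala equivalent of the same timing test (a sketch only: the object name PersistScala is mine, and any sufficiently large input file will do):

package com.kfk.spark.core

import org.apache.spark.{SparkConf, SparkContext}

object PersistScala {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PersistScala").setMaster("local"))
    // cache() persists the RDD in memory after the first action computes it
    val lines = sc.textFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/2015082818").cache()

    val begin = System.currentTimeMillis()
    println("Line count: " + lines.count())
    println("First count took (ms): " + (System.currentTimeMillis() - begin))

    val begin2 = System.currentTimeMillis()
    println("Line count: " + lines.count())
    println("Second count took (ms): " + (System.currentTimeMillis() - begin2))
  }
}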