赞
踩
上节我们完成了如下的内容:
SparkContext是编写Spark程序用到的第一个类,是Spark的主要入口点,它负责和整个集群的交互。
我们在集群的节点上启动 Spark-Shell 进行学习和测试
spark-shell --master local[*]
如果顺利启动,你就可以看到如下的画面:
尝试运行如下的指令,感受一下
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_412) Type in expressions to have them evaluated. Type :help for more information. scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> rdd2.getNumPartitions res1: Int = 2 scala> rdd2.partitions.length res2: Int = 2 scala> val rdd3 = sc.makeRDD(List(1,2,3,4,5)) rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24 scala> val rdd4 = sc.makeRDD(1 to 100) rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24 scala> rdd4.getNumPartitions res3: Int = 2 scala>
对应的截图如下:
用 textFile() 方法来从文件系统中加载数据创建RDD,方法将文件的URI作为参数:
# 本地系统 注意文件要确保存在
val lines = sc.textFile("file:///opt/wzk/1.txt")
# 从分布式文件系统加载
val lines = sc.textFile("hdfs://h121.wzk.icu:9000/wcinput/wordcount.txt")
运行结果如下图所示:
本质是将一个RDD转换为另一个RDD,从 Transformation
RDD的操作算子分为两类:
每一个Transformation操作都会产生新的RDD,供给下一个“转换”使用
转换得到RDD是惰性求值,也就是说,整个转换过程只有记录了转换的轨迹,并不会发生真正的计算,只有遇到Action操作时,才会发生真正的计算,开始从学院关系(lineage)源头开始,进行物理的转换操作。
测试如下的代码:
val rdd1 = sc.parallelize(1 to 10)
val rdd2 = rdd1.map(_*2)
val rdd3 = rdd2.filter(_>10)
执行结果如下图:
我们可以查看当前的结果,但是当前的操作都是Transformation的,并没有真正的执行。
我们需要通过 collect 触发执行,拿到最终的结果
rdd2.collect
rdd3.collect
将会触发执行,可以看到结果为:
我们从HDFS加载一个文件过来
val rdd4 = sc.textFile("hdfs://h121.wzk.icu:9000/wcinput/wordcount.txt")
rdd4.collect
执行结果如下图:
我们使用“a”作为分隔符,对这段内容进行分割:
rdd4.flatMap(_.split("a")).collect
执行结果如下图:
val rdd5 = rdd1.mapPartitions(iter => iter.map(_*2))
执行结果如下
上面我们用:
那么这两种有什么区别呢?
宽依赖的算子(shuffle):groupBy,distinct、repartition、sortBy
val rdd1 = sc.parallelize(1 to 10)
val group = rdd1.groupBy(_%3)
group.collect
执行的结果如下图:
将 RDD 中元素的每10个元素分组
val rdd1 = sc.parallelize(1 to 101)
val rdd2 = rdd1.glom.map(_.sliding(10, 10).toArray)
rdd2.collect
执行结果如下图:
对数据采样,fraction表示采样的百分比
rdd1.sample(true, 0.2, 2).collect
rdd1.sample(false, 0.2, 2).collect
rdd1.sample(true, 0.2).collect
执行结果如下图:
对数据进行去重,我们生成一些随机数,然后对这些数值进行去重。
val random = scala.util.Random
val arr = (1 to 20).map(x => random.nextInt(10))
val rdd = sc.makeRDD(arr)
rdd.distinct.collect
执行结果如下图:
对RDD重分区,我们需要多分一些区出来
val rdd1 = sc.range(1, 1000, numSlices=10)
val rdd2 = rdd1.filter(_%2==0)
rdd2.getNumPartitions
执行结果如下图:
增加或者减少分区
rdd2.getNumPartitions
# repartition 是增加和缩减分区数
val rdd3 = rdd2.repartition(5)
# coalesce 是缩减分区数
val rdd4 = rdd2.coalesce(5)
执行结果如下图:
rdd.sortBy(x => x).collect
rdd.sortBy(x => x).collect
执行结果如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。