赞
踩
1. 通过Spark-shell的操作理解RDD操作;
2. 能通过RDD操作的执行理解RDD的原理;
3. 对Scala能有一定的认识。
在实验结束时能完成max,first,distinct,foreach等api的操作。
RDD(Resilient Distributed Datasets,弹性分布式数据集)是一个分区的只读记录的集合。RDD只能通过在稳定的存储器或其他RDD的数据上的确定性操作来创建。我们把这些操作称作变换以区别其他类型的操作。例如 map、filter和join。
RDD在任何时候都不需要被“物化”(进行实际的变换并最终写入稳定的存储器上)。实际上,一个RDD有足够的信息描述着其如何从其他稳定的存储器上的数据生成。它有一个强大的特性:从本质上说,若RDD失效且不能重建,程序将不能引用该RDD。而用户可以控制RDD的其他两个方面:持久化和分区。用户可以选择重用哪个RDD,并为其制定存储策略(比如:内存存储)。也可以让RDD中的数据根据记录的key分布到集群的多个机器。 这对位置优化来说是有用的,比如可用来保证两个要jion的数据集都使用了相同的哈希分区方式。
Spark 编程接口,对编程人员通过对稳定存储上的数据进行变换操作(如map和filter)。而得到一个或多个RDD。然后可以调用这些RDD的actions(动作)类的操作。这类操作的目是返回一个值或是将数据导入到存储系统中。动作类的操作如count(返回数据集的元素数),collect(返回元素本身的集合)和save(输出数据集到存储系统)。Spark直到RDD第一次调用一个动作时才真正计算RDD。
还可以调用RDD的persist(持久化)方法来表明该RDD在后续操作中还会用到。默认情况下,Spark会将调用过persist的RDD存在内存中。但若内存不足,也可以将其写入到硬盘上。通过指定persist函数中的参数,用户也可以请求其他持久化策略(如Tachyon)并通过标记来进行persist,比如仅存储到硬盘上或是在各机器之间复制一份。最后,用户可以在每个RDD上设定一个持久化的优先级来指定内存中的哪些数据应该被优先写入到磁盘。 缓存有个缓存管理器,Spark里被称作blockmanager。注意,这里还有一个误区是,很多人认为调用了cache或者persist的那一刻就是在缓存了,这是完全不对的,真正的缓存执行指挥在action被触发。
总结:RDD是分布式只读且已分区集合对象。这些集合是弹性的,如果数据集一部分丢失,则可以对它们进行重建。具有自动容错、位置感知调度和可伸缩性,而容错性是最难实现的,大多数分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。对于大规模数据分析系统,数据检查点操作成本高,主要原因是大规模数据在服务器之间的传输带来的各方面的问题,相比记录数据的更新,RDD也只支持粗粒度的转换,也就是记录如何从其他RDD转换而来(即lineage),以便恢复丢失的分区。
简而言之,特性如下:
1. 数据结构不可变 ;
2. 支持跨集群的分布式数据操作 ;
3. 可对数据记录按key进行分区 ;
4. 提供了粗粒度的转换操作 ;
5. 数据存储在内存中,保证了低延迟性。
依据前面实验启动Hadoop和Spark集群。
利用xmanager中的xshell登录到master机器上。
进入到master机器的的Spark的安装目录,执行命令:bin/spark-shell
稍等一段时间,待spark-shell启动之后,屏幕上出现scala的命令提示符之后开始进行Spark命令的键入。
注意:实验需要以本地模式启动:bin/spark-shell,如果以集群模式启动,有可能无法查看输出结果。
scala > var a = sc.parallelize(List(”Gnu”,”Cat”,”Rat”,”Dog”,”Gnu”,”Rat”),2); scala > a.distinct.collect
执行输出结果如图15-1所示:
图15-1
scala > var b = sc.parallelize(List(”cat”,”dog”,”tiger”,”lion”,”gnu”,”crocodile”,”ant”,”whale”,”dolphin”,”spider”),3) scala > b.foreach(x=>println(x+”s are yummy”))
执行输出结果如图15-2所示:
图15-2
scala > var c=sc.parallelize(List(”dog”,”Cat”,”Rat”,”Dog”),2) scala > c.first
执行输出结果如图15-3所示:
图15-3
scala > var d=sc.parallelize(10 to 30) scala > d.max
执行输出结果如图15-4所示:
图15-4
scala > var e = sc.parallelize(List((10,”dog”),(20,”cat”),(30,”tiger”),(18,”lion”))) scala > e.max
执行输出结果如图15-5所示:
图15-5
scala > var f = sc.parallelize(1 to 20) scala > var g = sc.parallelize(10 to 30) scala > var h = f.intersection(g) scala > h.collect
执行输出结果如图15-6所示:
图15-6
步骤1:搭建Spark集群
配置Spark集群(独立模式):
1.前提:
配置各节点之间的免密登录,并在/etc/hosts中写好hostname与IP的对应,这样方便配置文件的相互拷贝。2、因为下面实验涉及Spark集群使用HDFS,所以按照之前的实验预先部署好HDFS。
在master机上操作:确定存在spark。
[root@master ~]# ls /usr/cstor
spark/
[root@master ~]#
在master机上操作:进入/usr/cstor目录中。
[root@master ~]# cd /usr/cstor
[root@master cstor]#
进入配置文件目录/usr/cstor/spark/conf, 先拷贝并修改slave.templae为slave。
[root@master ~]# cd /usr/cstor/spark/conf
[root@master cstor]# cp slaves.template slaves
然后用vim命令编辑器编辑slaves文件
[root@master cstor]# vim slaves
编辑slaves文件将下述内容添加到slaves文件中。
slave1
slave2
slave3
上述内容表示当前的Spark集群共有三台slave机,这三台机器的机器名称分别是slave1~3。
在spark-conf.sh中加入JAVA_HOME。
[root@master cstor]# vim /usr/cstor/spark/sbin/spark-config.sh
加入以下内容
export JAVA_HOME=/usr/local/jdk1.7.0_79
将配置好的Spark拷贝至slaveX、client。(machines在目录/root/data/2下,如果不存在则自己新建一个)
使用for循环语句完成多机拷贝。
[root@master ~]# cd /root/data/2
[root@master ~]# cat machines
slave1
slave2
slave3
client
[root@master ~]# for x in `cat machines` ; do echo $x ; scp -r /usr/cstor/spark/ $x:/usr/cstor/; done;
在master机上操作:启动Spark集群。
[root@master local]# /usr/cstor/spark/sbin/start-all.sh
2.配置HDFS
配置Spark集群使用HDFS:
首先关闭集群(在master上执行)
[root@master ~]# /usr/cstor/spark/sbin/stop-all.sh
将Spark环境变量模板复制成环境变量文件。
[root@master ~]# cd /usr/cstor/spark/conf
[root@master conf]# cp spark-env.sh.template spark-env.sh
修改Spark环境变量配置文件spark-env.sh。
[root@master conf]$ vim spark-env.sh
在sprak-env.sh配置文件中添加下列内容。
export HADOOP_CONF_DIR=/usr/cstor/hadoop/etc/hadoop
重新启动spark
步骤2:进入spark-shell
步骤3:验证RDD distinct
步骤4:验证RDD foreach
步骤5:验证RDD first
步骤6:验证RDD max
步骤7:验证RDD intersection
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。