赞
踩
RDD全称Resilient Distributed DataSets,即弹性的分布式数据集,是Spark的核心内容。
RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上(分区即partition),从而让RDD中的数据可以被并行操作。(分布式的特性)
RDD通常通过Hadoop上的文件,即HDFS文件,来进行创建;有时也可以通过Spark应用程序中的集合来创建。
RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算partition。这一切对使用者是透明的。
RDD的数据默认的情况下是存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性的特性)
在RDD中,通常就代表和包含了Spark应用程序的输入源数据。
当我们,在创建了初始的RDD之后,才可以通过Spark Core提供的transformation算子,对该RDD进行transformation(转换)操作,来获取其他的RDD。
Spark Core为我们提供了三种创建RDD的方式,包括:
RDD也可以分为以下四种创建方式:
并行化集合的创建
外部存储创建
此操作可以将hadoop支持的文件转化成RDD,同时要保证所有节点都要能访问到文件。
并行化创建RDD部分代码,实现1到5的累加求和:
val arr = Array(1,2,3,4,5) // 创建一个集合
val rdd = sc.parallelize(arr) // 并行化操作RDD
val sum = rdd.reduce(_+_) // 累加求和
注意:并行集合的一个重要参数是slices,表示数据集切分的份数。用户可以在集群的每个CPU上分布2-4个slices。一般来说,Spark会尝试根据集群的状况,来自动设定slices的数目,也可以通过传递给parallelize的第二个参数来进行手动设置。例如:
val rdd = sc.parallelize(arr,5)
spark sql对seq(s1, s2, s3, …)值的包装,seq的每个元素si会被包装成一个Row
f makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
这种用法和parallelize完全相同
该方法可以指定每一个分区的preferredLocations。操作如下:
var collect = Seq((1 to 10,Seq("hongyasce.com","hongyaa.bigdata.com")),(11 to 15,Seq("hongyasce01.com","hongyaa.bigdata01.com")))
var rdd = sc.makeRDD(collect)
rdd.preferredLocations(rdd.partitions(0))
rdd.preferredLocations(rdd.partitions(1))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。