赞
踩
Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。主要用于数据计算,经常被认为是Hadoop框架的升级版。
Hadoop的MR框架和Spark框架都是进行数据处理的框架。
1、Spark在传统的MR计算框架上,利用对计算过程的优化,从而大大加快了数据分析、挖掘的运行和读写速度,并将计算单元缩小到更合适并行计算和重复使用的RDD计算模型中。
2、Spark和Hadoop的根本差异在多个作业之间的数据通信问题上:Spark多个作业之间数据通信是基于内存的,而Hadoop是基于磁盘的。
3、Spark只有在Shuffle时才将数据写入磁盘,而Hadoop中多个MR作业之间的数据交互都要依赖于磁盘交互。
4、Spark Task的启动时间快。Spark采用fork线程的方式,而Hadoop采用创建新进程的方式。
Spark是基于内存的,所以在实际的生产环境中,可能会由于内存的限制,造成内存资源不足而导致job执行失败。所以,虽然绝大多数计算场景中,spark优于MR但是,Spark也还做不到完全替代MR。
此模块可以与结构化数据配合使用, 可以作为一个纯的SQL引擎。可以读取存储在RDBMS表中的数据,或从具有结构化数据的文件格式中读取,然后在Spark中构造成永久性或临时的表。在使用各种编程语言时,可以结合类似SQL的查询来查询数据,前提是将数据读入Spark数据框架。在使用Python、R或Java编写代码时,生成的字节码完全相同,所以性能也会是相同的,因此无需过多考虑使用哪种语言更合适,只需要选择自己熟悉的编程语言。
spark中有一个包含通用机器学习算法的库,称为MLib。MLlib提供了许多流行的高级DataFrame API机器学习算法来构建模型。这些API允许在部署期间提取或转换特性、构建管道(用于训练和评估)以及持久化模型(用于保存和更新加载)。其他实用程序还包括使用公共线性代数操作和统计数据。
在Structured Streaming 模型下,Spark SQL核心引擎可以处理容错和延迟数据语义的所有方面,允许开发人员相对轻松地专注于Streaming应用程序的开发。
GraphX是一个用于操作图和图并行计算的库。它提供了由社区用户贡献的分析、连接和遍历的标准图算法:可以用的算法包括PageRank、连接组件和三角形计数。
注: Spark开发环境常用IDEA+Maven。在IDEA中以本地模式运行Spark项目是不需要在本机搭建Spark和Hadoop环境的。
注:Spark Shell在测试和验证程序时使用的较多,在生产环境中,通常会在IDEA中编写程序然后打成jar包,然后再提交到集群中使用。最常用的是创建Maven项目,利用Maven来管理Jar包的依赖。
前置知识:MapReduce框架的原理及基本使用,了解其底层数据处理的实现方式。(待学)
参考:
Spark源码分析调试环境搭建 - 仗剑走天涯的文章 - 知乎
想研读下spark的源码,怎么搭阅读和调试的环境呢? - 连城的回答 - 知乎
spark开发环境搭建(基于idea 和maven) - 南唐遗少的文章 - 知乎
Spark简介及spark部署、原理和开发环境搭建
在本地部署单个spark服务,比较适合简单了解spark目录结构,熟悉配置文件,简单跑一下demo示例等调试场景。
Spark自带的任务调度模式,多个Spark机器之间内部协调调度,但仅是Spark自身的任务调度。Standalone独立模式,是Spark 原生的简单集群管理器,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统,使用 Standalone 可以很方便地搭建一个集群;但毕竟Spark主要处理计算,对资源管理调度并不擅长,所以只适合学习不适用于生产环境,一般公司会使用Yarn模式。
Spark使用Hadoop的Yarn组件进行资源与任务调度,实现真正意义上Spark与外部对接协作。Yarn是一个统一的资源管理机制,在上面可以运行多套计算框架,如MR、Storm等。根据Driver在集群中的位置不同,分为Yarn client(集群外)和Yarn cluster(集群内部)
Spark使用mesos平台进行资源与任务的调度。Spark客户端直接连接Mesos;不需要额外构建Spark集群。一个强大的分布式资源管理框架,它允许多种不同的框架部署在其上,包括 Yarn。但此模式在国外使用较多,国内还未被广泛使用。
容器化部署,基于Docker镜像运行能够让用户更方便地对应用进行管理和运维。
也可以直接在Windows下直接部署,但这种使用较少,有兴趣可以后期自行学习。
Spark架构的核心是计算引擎,其整体采用了标准的master-slave结构。下图展示了一个Spark执行时的基本结构,图中的Driver表示master ,负责管理整个集群中的作业任务调度,Executor则表示slave,负责实际执行任务。
注:Driver和Executor是计算相关的组件,Master和Worker是独立部署模式下的两大资源相关的组件。ApplicationMaster能帮助资源和计算之间解耦合。
Driver是Spark驱动器节点,用于执行Spark任务中的main方法,负责实际代码的执行工作。在Spark作业执行时会负责(功能/作用):将用户程序转化为作业(job);会负责在Executor之间的调度任务(task);会跟踪Executor的执行情况;还能通过UI展示查询运行情况。
Spark Executor是集群中工作节点(Worker)中的一个JVM进程,会负责在Spark作业中运行具体任务(Task),任务之间彼此独立,在Spark应用启动时Executor节点会被同时启动并且伴随整个应用的生命周期。Executor有两个核心功能:
负责运行组成Spark应用的任务,并将结果返回给驱动器进程;可以通过自身的块管理器(Block Manager)将用户程序中要求缓存的RDD放入内存。RDD可以直接缓存在Executor进程内,以此充分利用缓存数据加速运算。
Master节点会常驻master守护进程,会对Worker节点进行管理,并且会从master节点提交应用。Worker节点会常驻worker守护进程,会与Master节点进行通信,并对Executor进程进行管理。在搭建Spark集群时,就已经设置好了Master节点和Worker节点,一个集群中可以有多个Master节点和多个Worker节点。一台PC机可以同时作为Master和Worker节点,例如:有四台机器,就可以选择一台机器做masker节点,剩下三台设置为worker节点,也可以把四台都设置为worker节点,这种情况下,有一台机器既是master节点又是worker节点。
Master和Worker是Spark环境部署中的两个核心组件,在Spark的独立部署环境中都是进程,Master主要负责资源的调度和分配,并进行机器监控,类似于Yarn环境中的RM,一个Worker运行在集群中的一台服务器上,由Master分配资源对数据进行并行的处理和计算,类似于Yarn中的NM。
参考:
了解Spark中的Master、Worker和Driver、Executor
对Spark中一些基础概念的了解
Hadoop用户向Yarn集群提交应用程序的时候,会在提交程序中加入ApplicationMaster,用于向资源调度器申请执行任务的资源容器Container,以此运行用户自己的程序任务Job,监控整个任务的执行,跟踪整个任务的状态,出力任务失败等异常情况。
在提交应用时,可以提供参数指定计算节点的数量以及对应资源,而此处的资源则是指工作节点Executor的内存大小和使用的虚拟CPU核(Core)数量。
并行度是指整个集群能够并行执行任务的数量。一个作业的并行度取决于框架的默认配置, CPU的核数直接影响并行度的上限。并行度的参数值也可以在应用程序运行的过程中进行动态修改。
DAG有向无环图是由点和线组成的拓扑图形,该图形具有方向,但不会闭环。
Spark中的DAG是指由Spark程序直接映射成的数据流的高级抽象模型,可以理解为是将整个程序计算的执行过程用图形表示出来。
Spark 应用程序提交到 Yarn 环境中执行的时候,一般会有两种部署执行的方式: Client和 Cluster。 两种模式主要区别在于: Driver 程序的运行节点位置。
Client模式将用于监控和调度的Driver模块在客户端执行,而不是在Yarn中,所以一般用于测试。
Cluster模式将用于监控和调度的Driver模式启动在Yarn集群资源中执行。一般应用于实际生产环境。
Spark计算框架为了高并发和高吞吐的进行数据处理,封装看三大数据结构,用于处理不同的应用场景:
RDD:弹性分布式数据集;
累加器:分布式共享***只写变量***;
广播变量:分布式共享***只读变量***;
RDD(Resilient Distributed Dataset)弹性分布式数据集,是Spark中最基本的数据处理模型。在代码中表现为一个抽象类,它可以代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
* Internally, each RDD is characterized by five main properties:每个RDD的五个主要属性
* - A list of partitions 一个分区列表
* - A function for computing each split 分区计算函数
* - A list of dependencies on other RDDs RDD之间的依赖关系
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
用于K-V RDD的分区器(可选)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
计算分区的首选位置 (可选)
RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
由子类实现,返回该RDD中的分区集。这个方法只会被调用一次,所以在其中实现耗时的计算是安全的。
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]
Spark在计算时,是使用分区函数对每一个分区进行计算。
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
由子类实现来计算给定的分区。
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
RDD是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个RDD建立依赖关系。
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
由子类实现,返回RDD如何依赖于父RDD。这个方法只会被调用一次,所以在其中实现耗时的计算是安全的。
*/
protected def getDependencies: Seq[Dependency[_]] = deps
当数据为KV类型数据时,可以通过设定分区器自定义数据的分区,即自定义分区规则进行数据分区处理。
/** Optionally overridden by subclasses to specify how they are partitioned. */
可选地被子类重写,以指定如何分区。
@transient val partitioner: Option[Partitioner] = None
计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算。即可判断将计算发送到那个节点,执行效率最优。
/**
* Optionally overridden by subclasses to specify placement preferences.
可选地被子类重写以指定位置首选项。
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
Spark框架在执行时,需要先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。接着会将任务发到已经分配好资源的计算节点上,再按照指定的计算模型进行数据计算,最终得到计算结果。
RDD是Spark架构中用于数据处理的核心模型,下面以Yarn环境为例,看一下RDD的工作原理:
1)启动Yarn集群环境
2)Spark通过申请资源创建调度节点(Driver)和计算节点(Executor)
3)Spark框架会根据需求将计算逻辑根据分区划分成不同的任务
4)调度节点会将任务根据计算节点的状态发送到对应的计算节点进行计算
从以上流程可以看出RDD在整个流程中主要用于将逻辑进行封装,并生成Task发送给Executor节点执行计算。
参考:
对Spark中一些基础概念的了解:
从集合中创建RDD,Spark主要提供两个方法:parallelize和makeRDD
val sparkConf =
new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(
List(1,2,3,4)
)
val rdd2 = sparkContext.makeRDD(
List(1,2,3,4)
)
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
sparkContext.stop()
从底层代码实现来看,makeRDD方法其实就是parallelize方法
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
由外部存储系统的数据集创建RDD包括:本地的文件系统,所有Hadoop支持的数据集,比如HDFS、HBase等。使用方法为textFile
val sparkConf =
new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()
主要通过一个RDD运算完后,再产生新的RDD。
使用new的方式直接构造RDD,一般由Spark框架自身使用。
在默认情况下,Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数,被称之为并行度。这个数量可以在构建RDD时指定。要注意,这里并行执行的任务数量是和Executor有关,并不是指切分后的任务数量。
读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,分区规则的Spark核心源码如下:
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}}
读取文件数据时,数据时按照Hadoop文件读取的规则进行切片分区,而切片规则和数据读取的规则是有些差异的,具体源码如下:
在这里插入代码片public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { long totalSize = 0; // compute total size for (FileStatus file: files) { // check we have valid files if (file.isDirectory()) { throw new IOException("Not a file: "+ file.getPath()); } totalSize += file.getLen(); } long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); ... for (FileStatus file: files) { ... if (isSplitable(fs, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(goalSize, minSize, blockSize); ... } protected long computeSplitSize(long goalSize, long minSize, long blockSize) { return Math.max(minSize, Math.min(goalSize, blockSize)); }
RDD会根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型。
函数名 | 函数说明 |
1、map | 将处理的数据逐条进行映射转换,可以是类型转换也可以是值转换 |
2、mapPartitions | 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可进行任意的处理,哪怕是过滤数据。 |
思考问题:map和mapPartitions的区别?
下面分三个方面进行理解:
数据处理 Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作的。
功能 Map算子主要目的是将数据源中的数据进行转换或改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求元素的个数保存不变,所以是可以增加或减少数据的。
性能 Map算子因为类似于串行操作,所以性能比较低,但mapPartitions算子类似于批处理,所以性能较高。但mapPartitions算子会长时间占用内存,也会因此导致内存不够用,进而出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。更多还是使用map操作。
3、mapPartitionsWithIndex | 可以将待处理的数据以分区为单位发送到计算节点进行处理,在处理的同时可以获取当前分区索引 |
4、flatMap | 将待处理的数据进行扁平化后再进行映射处理。 |
5、glom | 将同一个分区的数据直接转换为相同类型的内存数据进行处理,分区不变。 |
6、groupBy | 会将数据根据指定的分区规则进行分组,分区默认不变,但是数据会被打乱重新组合(即有shuffle操作)。极限情况下,数据有可能被分在同一个分区中。注:一个组的数据在一个分区中,但是并不是说一个分区中只能有一个组,分组和分区没有必然关系。 |
7、filter | 可以将数据根据指定的规矩进行筛选过滤,符合规矩的留下不符合的数据丢弃。筛选过滤后,分区不变,但是可能会造成分区内数据不均衡,造成数据倾斜。 |
8、sample | 根据指定的规则从数据集群中抽取数据。其中有一个参数可以决定抽取后的数据是否还会重新放回。 |
9、distinct | 将数据集中重复的数据去重 |
10、coalesce | 可以根据数量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。当spark程序中,存在过多的小任务时,可以通过coalesce方法收缩合并分区,减少分区的个数,减小任务调度成本。但在默认情况下,不会进行shuffle操作,也就不会打乱分区内的数据。 |
11、repartition | 用于扩大分区。其底层实际就是coalesce+shuffer。所以无论是将分区数多的RDD转为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都能完成,且都会有shuffle过程。 |
12、sortBy | 用于数据排序,默认为升序。排序后新产生的RDD的分区数与原RDD的分区数一致。中间存在shuffle过程。 |
函数名 | 函数说明 |
13、intersection | 对源RDD和参数RDD求交集后返回一个新的RDD |
14、union | 并集 |
15、subtract | 差集 |
16、zip | 将两个RDD中的元素,以键值对的形式进行合并。 |
函数名 | 函数说明 |
17、partitionBy | 将数据按照指定的partitioner重新进行分区。默认1的分区器是HashParttioner。该算子会对数据进行改变,且需要先将数据转变成键值对形式才能使用。 |
18、reduceByKey | 可以将数据按照相同的Key对Value进行聚合。如果key的数据只有一个,则不会参与运算。 |
19、groupBykey | 将数据源的数据根据key对value进行分组 |
思考问题:reduceByKey 和 groupByKey 的区别?
Shuffle:
reduceByKey和groupByKey都存在shuffle操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine),这样可以减少落盘的数据量,而groupByKey只是进行分组,不存在数据量减少的问题,由shuffle的角度看,reduceByKey的性能较高。
功能:
reduceByKey其实包含分组和聚合的功能。groupByKey只能进行分组,不能进行聚合,所以在需要分组和聚合的场合下推荐使用reduceByKey,但是如果仅仅是分组而不需要聚合。那么还是只能选择groupByKey。
20、aggregateByKey | 将数据根据不同的规则进行分区内计算和分区间计算。 |
21、foldByKey | 当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey。 |
22、combineByKey | 类似于aggregate(),combineByKey允许用户返回值的类型与输入不一致。 |
思考问题:reduceByKey、 foldByKey、 aggregateByKey、 combineByKey 的区别?
reduceByKey:相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
foldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
23、sortBykey | 在一个(K,V)的 RDD 上调用, K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的 |
24、join | 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD |
25、leftOuterJoin | 类似于 SQL 语句的左外连接 |
26、cogroup | 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD |
行动算子会真正的触发任务的执行
函数名 | 函数说明 |
1、reduce | 聚集RDD中的所有元素,先聚合分区内的函数再聚合分区间的函数。 |
2、collect | 在驱动程序中,以数组 Array 的形式返回数据集的所有元素 |
3、count | 返回RDD中的元素的个数 |
4、first | 返回RDD中的第一个元素 |
5、take | 返回一个由RDD的前n个元素组成的数组 |
6、takeOrdered | 返回该 RDD 排序后的前 n 个元素组成的数组 |
7、aggregate | 分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。 |
8、fold | 折叠操作, aggregate 的简化版操作 |
9、countByKey | 统计每种 key 的个数 |
10、save相关算子 | 将数保持到不同格式的文件中 saveAsTextFile、saveAsObjectFile、saveAsSequenceFile |
11、foreach | 分布式变量RDD中的每个元素,调用指定函数 |
从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。 那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。 Scala2.12 版本后闭包编译方式发生了改变。
extends Serializable
Java 的序列化能够序列化任何的类。但是比较重(字节多) ,序列化后,对象的提交也比较大。 Spark 出于性能的考虑, Spark2.0 开始支持另外一种 Kryo 序列化机制。 Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化
参考地址: https://github.com/EsotericSoftware/kryo
(1)RDD血缘关系
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。Spark中会将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
(2)RDD依赖关系
Spark中所谓的依赖关系,就是两个相邻RDD之间的关系。
(3)RDD窄依赖
窄依赖通常会被比喻为独生子女,它表示每个父(上游)RDD的Partition最多被子(下游)RDD的一个Partition使用。
(4)RDD宽依赖
宽依赖则会被比喻为多生,宽依赖表示同一个父(上游)RDD的Partition被多个子(下游)RDD的Partition依赖,会引起Shuffle。
(5)RDD任务划分
Spark中的DAG表示记录了RDD的转换过程和任务的阶段。
RDD阶段划分源码—stage
RDD任务划分中间会分为:Application、Job、Stage、Task
Application:初始化一个SparkContext即生成一个Application
Job:一个Action算子就会生成一个Job
Stage:Stage等于宽依赖(ShuffleDependency)的个数加一
Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数
注意: Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
(1)RDD Cache缓存 cache()、persist()
存储级别
RDD通过Cache或Persist方法可以将前面的计算结果进行缓存,默认情况下会把数据缓存在JVM的堆内存中。但是并不是这两个方法被调用时就立即生效,而是在触发action算子时,该RDD才会被缓存在计算节点的内存中,以供后面重用。
缓存也有可能会丢失,或者存储于内存的数据可能会由于内存不足而被删除,而RDD的缓存容错机制就保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部的Partition。
Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。
(2)RDD CheckPoint检查点 checkpoint()
检查点实际就是将RDD的中间结果写入磁盘。由于血缘关系过长会造成容错成本过高,这样就不如在中间阶段做检查点来进行容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,以此就减少了不少开销。
但同样,对RDD进行checkpoint操作并不会马上被执行,而是必须自行action算子之后才能被触发。
(3)缓存和检查点的区别
Cache缓存只会将数据保存起来,不切断血缘依赖。Checkpoint检查点会切断血缘依赖;Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高;建议对checkpoint的RDD使用Cache缓存,这样checkpoint的job只需从cache缓存中读取数据即可,否则需要再从头计算一次RDD;
Spark目前支持Hash分区和Range分区,划分用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入那个分区,进而决定了Reduce的个数。
注:只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None。每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
Hash 分区:对于给定的 key,计算其 hashCode,并除以分区个数取余;
Range 分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序;
Spark的数据读取及数据保存可以从两个维度来作区分:文件格式及文件系统。
文件格式分为:text文件、csv文件、sequence文件以及Object文件
文件系统分为:本地文件系统、HDFS、HBASE以及数据库。
// 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/1.txt")
// 保存数据
inputRDD.saveAsTextFile("output")
SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(FlatFile)。在 SparkContext中,可以调用 sequenceFilekeyClass, valueClass
// 保存数据为 SequenceFile
dataRDD.saveAsSequenceFile("output")
// 读取 SequenceFile 文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)
对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFileT:ClassTag函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。另外,因为是序列化所以要指定类型。
// 保存数据
dataRDD.saveAsObjectFile("output")
// 读取数据
sc.objectFile[Int]("output").collect().foreach(println)
累加器,分布式共享只写变量。累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。
分布到各个Executor端的累加器变量是不能互相访问的,只有Driver端才能进行读取或更改,因此回传到Driver端后需要进行merge。
如果仅用普通变量,Driver端会将数据传到Executor端进行计算,但是不会将数据进行回传,因此如果是普通变量,遇见类似闭包的问题,就无法对数据进行实质的操作。
注:少加:在转换算子中调用累加器,如果没有调用行动算子的话,也是不会执行的 ;多加:累加器是全局可用的,如果多次调用行动算子,是会多次执行的 也就是多故一般,累加器会放置在行动算子中进行操作;可自定义累加器;
广播变量,分布式共享只读变量。广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送。
略
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。