赞
踩
目录
Spark Core包含Spark最基础和最核心的功能,当提及Spark运行架构时,就是指Spark Core的运行架构。本节首先介绍Spark的基本概念和架构设计,然后介绍Spark运行基本流程,最后介RDD 的设计与运行原理。
在具体讲解 Spark 运行架构之前,需要先了解几个基本的概念。
(1) RDD:是 Resilient Distributed Dataset(弹性分布式数据集)的缩写,是 分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。基本概念和架构设。
RDD单词拆解:
Resilient :它是弹性的,RDD 里面的中的数据可以保存在内存中或者磁盘里面;
Distributed : 它里面的元素是分布式存储的,可以用于分布式计算;
Dataset: 它是一个集合,可以存放很多元素。
(2)DAG:是Directed Acyclic Graph (有向无环图)的缩写,反映了RDD之间的依赖关系。
(3)执行器(Executor):是运行在工作节点(Worker Node)上的一个进程,负责运行任务, 为应用程序存储数据。
(4)应用(Application):是用户编写的Spark应用程序。
(5)任务(Task):是运行在执行器上的工作单元。
(6)作业(Job):一个作业包含多个RDD及作用于相应RDD上的各种操作。
(7)阶段(Stage):是作业的基本调度单位,一个作业会被分为多组任务,每组任务被称为阶段,也被称为任务集。
如图所示,Spark 运行架构包括集群资源管理器(Cluster Manager)、运行任务的工作节点、每个应用的驱动器(Driver Program,或简称为Driver)和每个工作节点上负责具体任务的执行器。
其中,集群资源管理器可以是 Spark 自带的资源管理器,也可以是 YARN 或 Mesos 等资源管理框架;
执行器在集群内各工作节点上运行,它会与驱动器进行通信,并负责在工作节点上执行任务,在大多数部署模式中,每个工作节点上只有一个执行器。
每个工作节点WorkerNode,都会驻留Executor进程,每个进程派生出多个线程,每个线程具体执行相关任务。
从图中可以看出,就系统架构而言,Spark采用主从架构,包含一个 Master(即驱动器)和若干个 Worker。
与Hadoop MapReduce计算框架相比,Spark所采用的执行器有两个优点:
一是利用多线程来执行具体的任务(Hadoop MapReduce采用的是进程模型),减少任务的启动开销;
二是执行器中有一个BlockManager存储模块,该存储模块会将内存和磁盘共同作为存储设备(默认使用内存,当内存不够时会使用磁盘),当需要多轮迭代计算时,可以将中间结果存储在这个存储模块里,下次需要时就可以直接读取该存储模块里的数据,而不需要读取 HDFS等文件系统的数据,因而能有效减少1/O开销,或者在交互式查询场景下,预先将表缓存在该存储模块上,从而提高读写1/0性能。
总体而言,在 Spark中,如图所示,一个应用由一个驱动器和若干个作业构成,一个作业由多个阶段构成,一个阶段由多个任务组成。
当执行一个应用时,驱动器会向集群资源管理器申请资源,启动执行器,并向执行器发送应用程序代码和文件,然后在执行器上执行。
执行结束后,执行结果会返回给驱动器,写到 HDFS 或者其他数据库中。
Spark程序的入口是驱动器中的SparkContext(指挥所中的指挥官)。与Spark 1.x 相比,从Spark 2.0开始,有一个变化是用SparkSession统一了与用户交互的接口, SparkContext成为SparkSession的成员变量。
Spark运行基本流程如下:
(1)当一个Spark应用被提交时,首先需要为这个应用构建起基本的运行环境,即由驱动器创建一个SparkContext对象,
由SparkContext负责和集群资源管理器的通信以及进行资源的申请、任务的分配和监控等,SparkContext会向集群资源管理器注册并申请运行执行器的资源。我们可以将SparkContext看成应用程序连接集群的通道。
(2)集群资源管理器为执行器分配资源,并启动执行器进程,执行器运行情况将随着“心跳”发送到集群资源管理器上。
(3)SparkContext根据RDD的依赖关系构建DAG
将DAG提交给DAG调度器(DAGScheduler)进行解析,再将DAG分解成多个阶段,并且计算出各个阶段之间的依赖关系,然后把这些阶段提交给底层的任务调度器(TaskScheduler)进行处理;
执行器向SparkContext申请任务,任务调度器将任务分发给执行器运行,同时,SparkContext将应用程序代码发放给执行器。
任务分配原则:计算向数据靠拢
(4)任务在执行器上运行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后写入数据并释放所有资源。
总体而言,Spark运行架构具有以下几个特点。
(1)每个应用都有自己专属的执行器进程,并且该进程在应用运行期间一直驻留。执行器进程以多线程的方式运行任务,减少了多进程任务频繁启动的开销,使任务执行变得非常高效和可靠。
(2)Spark运行基本流程与集群资源管理器无关,只要能够获取执行器进程并保持通信即可。
(3)执行器上有一个BlockManager存储模块,类似于键值对存储系统(把内存和磁盘共同作为存储设备),在处理迭代计算任务时,不需要把中间结果写入HDFS等文件系统,而是直接把中间结果放在这个存储模块上,后续有需要时就可以直接读取:在交互式查询场景下,也可以把表提前缓存在这个存储模块上,从而提高读写1/0性能。
(4)任务采用了数据本地性和延时调度等优化机制。数据本地性是指尽量将计算移到数据所在的节点上进行,即“计算向数据靠拢”,因为移动计算比移动数据所占的网络资源要少得多。此外,Spark采用了延时调度机制,可以在更大程度上实现对执行过程的优化。比如,拥有数据的节点当前正被其他的任务占用,那么,在这种情况下是否需要将数据移动到其他的空闲节点呢?答案是不一定。因为如果经过预测发现当前节点结束当前任务的时间要比移动数据的时间还要少,那么调度就会等待,直到当前节点可用。
Spark Core是建立在统一的抽象RDD之上的,这使Spark的各个组件可以无缝进行集成,以便在同一个应用程序中完成大数据计算任务。
RDD的设计理念源自AMP实验室发表的论文《弹性分布式数据集:内存集群计算的容错抽象》。
在实际应用中,存在许多迭代式算法(如机器学习、图算法等)和交互式数据挖掘工具,它们的应用场景的共同之处是,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。
MapReduce框架把中间结果写入HDFS中,这带来了大量的数据复制、磁盘1/O和序列化开销。虽然类似Pregel等的图计算框架也将结果保存在内存当中,但是这些框架只能支持一些特定的计算模式,并没有提供一种通用的数据抽象。
RDD就是为了满足这种需求而出现的,它提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换操作,不同RDD之间的转换操作会使它们形成依赖关系
可以实现管道(Pipeline)化,从而避免对中间结果的存储,大大降低数据复制、磁盘 1/0 和序列化开销。
避免数据落地(磁盘或者其他存储器):将上一个操作的输出作为下一个操作的输入。
一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,
每个RDD可以分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中的不同节点上,从而可以在集群中的不同节点上进行并行计算。
RDD 提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,不能直接修改,
只能基于稳定的物理存储中的数据集来创建RDD,或者通过在其他RDD上执行确定的转换操作(如map()、join()和groupBy()等)而创建得到新的RDD,
即
RDD提供了一组丰富的操作以支持常见的数据运算,分为动作(Action)和转换(Transformation )两种类型,前者用于执行计算并指定输出的形式,后者用于指定RDD之间的相互依赖关系。
两类操作的主要区别是,转换操作(比如map()、filter()、 groupBy()、join()等)接收RDD并返回RDD,而行动操作(如count()、 collect()等)接收RDD但是返回非RDD (即输出一个值或结果)。
RDD提供的转换操作非常简单,都是类似map()、filter()、groupBy()、join()等粗粒度的数据转换操作,而不是针对某个数据项的细粒度修改。
因此,RDD比较适用于对数据集中的元素执行相同操作的批处理式应用,而不适用于需要异步、细粒度状态的应用,如Web应用系统、增量式的网页爬虫等。
正因为这样,这种粗粒度转换操作设计,会使人直觉上认为RDD的功能很受限、不够强大。但是,实际上RDD已经被实践证明可以很好地应用于许多并行计算应用中,可以具备很多现有计算框架(如MapReduce、SQL、Pregel等)的表达能力,并且可以应用于这些计算框架处理不了的交互式数据挖掘应用。
Spark用Scala语言实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作。
RDD 典型的执行过程如下:
(1)读入外部数据源(或者内存中的集合)进行 RDD 创建;
(2)RDD 经过一系列的转换操作,每一次都会产生不同的 RDD,供给下一次转换操作使用;
(3)最后一个RDD经行动操作进行处理,并输出到外部数据源(或者变成Scala集合或标量)。
需要说明的是, Spark采用了惰性机制,即在 RDD的执行过程中,如图所示,真正的计算发生在RDD的行动操作中,对于行动操作之前的所有转换操作,Spark只记录转换操作所使用的一些基础数据集以及RDD生成的轨迹(即RDD之间的依赖关系),而不会触发真正的计算。转换操作被记录下来以后,Spark在后续生成执行计划时可以重新安排这些转换操作,比如合并多个转换操作,或者优化为不同的执行阶段来提高执行效率。
例如,在下图中,从输入中逻辑上生成 A 和 C 两个RDD,经过一系列转换操作,逻辑上生成了F(也是一个RDD),之所以说是逻辑上,是因为这时候计算并没有发生, Spark只记录了RDD之间的生成和依赖关系,也就是得到DAG。当F要进行计算、输出时,也就是当遇到针对F的行动操作时,Spark才会生成一个作业,向DAG调度器提交作业,触发从起点开始的真正的计算。
上述这一系列的处理称为一个“血缘关系”(Lineage),即DAG拓扑排序的结果。采用惰性机制以后,通过血缘关系连接起来的一系列RDD操作就可以实现管道化,避免了多次转换操作之间数据同步的等待,而且不用担心有过多的中间结果,因为这些具有血缘关系的操作都管道化了,一个操作得到的结果不需要保存为中间结果,而是直接管道式地流入下一个操作进行处理。同时,这种通过血缘关系把一系列操作进行管道化连接的设计方式,也使管道中每次操作的计算变得相对简单,保证了每个操作在处理逻辑上的单一性;相反,在MapReduce的设计中,为了尽可能地减少MapReduce过程,在单个MapReduce中会写入过多复杂的逻辑。
总体而言,Spark 采用 RDD 以后能够实现高效计算的主要原因有以下几点。
(1)高效的容错性。
现有的分布式共享内存、键值对存储系统、内存数据库等,为了实现容错,必须在集群节点之间进行数据复制或者记录日志,也就是在节点之间会发生大量的数据传输,这对数据密集型应用而言会带来很大的开销。
在RDD的设计中,数据只读,不可修改,如果需要修改数据,必须从父RDD转换到子RDD,由此在不同RDD之间建立血缘关系。所以,RDD是一种天生具有容错机制的特殊集合,不需要通过数据冗余的方式(如检查点)实现容错,而只需通过RDD父子依赖(血缘)关系重新计算得到丢失的分区来实现容错,无须回滚整个系统,这样就避免了数据复制的高开销,而且重新计算过程可以在不同节点中并行进行,实现了高效的容错。
此外,RDD提供的转换操作都是一些粗粒度的操作(如map(), filter()和join()等), RDD之间的依赖关系只需要记录这种粗粒度的转换操作,而不需要记录具体的数据和各种细粒度操作的日志(如对哪个数据项进行了修改等),这就大大降低了数据密集型应用中的容错开销。
(2)中间结果持久化到内存。
数据在内存中的多个RDD 之间进行传递,不需要“落地”到磁盘上,避免了不必要的读写磁盘开销。
(3)存放的数据可以是 Java 对象,避免了不必要的对象序列化和反序列化开销。
RDD中不同的操作,会使不同RDD之间产生不同的依赖关系。
DAG调度器 根据RDD之间的依赖关系,把DAG划分成若干个阶段。
RDD之间的依赖关系分为 窄依赖( Narrow Dependency)与宽依赖(Wide Dependency),二者的主要区别在于是 否包含 Shuffle 过程。
(1)Shuffle 过程
RDD运行原理(RDD 概念与特性)Spark中的一些操作会触发Shuffle过程,这个过程涉及数据的重新分发,因此会产生大量的磁盘I/O和网络开销。
这里以reduceByKey(func)操作为例介绍Shuffle过程。
在reduceByKey(func)操作中,对于所有(key,value)形式的RDD元素,所有具有相同key的RDD元素的value会被归并,得到(key,value-list)的形式,然后对value-list使用函数func()计算得到聚合值,比如,("hadoop",1)、("hadoop",1)和("hadoop",1)这3个键值对,会被归并成("hadoop",(1,1,1))的形式,如果func()是一个求和函数,则可以计算得到汇总结果("hadoop",3)这里的问题是,对于与一个 key 关联的 value-list, 这个 value-list 里面可能包含很多的 value,而这些value一般会分布在多个分区里,并且散布在不同的机器上。但是,对Spark而言,在执行reduceByKey(func)操作时,必须把与某个 key关联的所有 value都发送到同一台机器上。
下图所示是一个关于Shuffle过程的简单实例,假设这里在3台不同的机器上有3个Map任务,即Map1、Map2和Map3,它们分别从输入的文本文件中读取数据执行map()操作并得到了中间结果,为简化起见,这里让3个Map任务输出的中间结果都相同,即("a",1)、(“b",1)和(”c",1)。
现在要把 Map 的输出结果发送到 3 个不同的 Reduce任务中进行处理,Reducel、Reduce2 和 Reduce3 分别运行在 3 台不同的机器上,并且假设 Reducel任务专门负责处理 key为"a"的键值对的词频统计工作, Reduce2 任务专门负责处理 key为"b"的键值对的词频统计工作,Reduce3任务专门负责处理key为"c"的键值对的词频统计工作。
这时, Map1必须把("a",1)发送到Reduce1,把("b",1)发送到Reduce2,把("c",1)发送到 Reduce3,同理, Map2和 Map3也必须完成同样的工作,这个过程就被称为"Shuffle"。 可以看出, Shuffle过程(即把Map输出的中间结果分发到Reduce任务所在的机器)会产生大量的网络数据分发,带来高昂的网络传输开销。
Shuffle 过程不仅会产生大量网络传输开销,也会带来大量的磁盘 I/O 开销。Spark 经常被认为是基于内存的计算框架,为什么也会产生磁盘1/O开销呢?
对于这个问题,这里有必要做出解释。
在Hadoop MapReduce框架中,Shuffle是连接Map和Reduce的“桥梁”, Map的输出结果需要经过Shuffle过程后,也就是经过数据分类后再交给Reduce处理,因此, Shuffle过程的性能高低直接影响了整个程序的性能和吞吐量。
所谓Shuffle过程,是指对Map输出结果进行分区、排序、合并等处理并交给Reduce的过程。因此, MapReduce的Shuffle过程分为Map端的Shuffle过程和Reduce端的Shuffle过程,如图所示,主要执行以下操作。
① Map端的Shuffle过程。
Map的输出结果先被写入缓存,当缓存满时,就启动溢写操作,把缓存中的数据写入磁盘文件,并清空缓存。当启动溢写操作时,首先需要对缓存中的数据进行分区,不同分区的数据发送给不同的Reduce任务进行处理,然后对每个分区的数据进行排序(Sort)和合并(Combine),之后再写入磁盘。每次溢写操作都会生成一个新的文件,随着Map任务的执行,磁盘中就会有多个溢写文件。在Map任务全部结束之前,这些溢写文件会被归并(Merge)成一个大的文件,然后由相应的Reduce任务来领取自己要处理的那个分区的数据。
② Reduce端的Shuffle过程。
Reduce端从Map端的不同Map任务处领回自己要处理的那部分数据,在对数据进行归并后交给Reduce任务处理。Spark 作为 MapReduce 框架的一种改进,自然也实现了 Shuffle过程的逻辑,如图所示。
首先,在 Map 端的 Shuffle 写入(Shuffle Write)方面,每一个Map 任务(Map Task)会根据 Reduce 任务(Reduce Task)的数量创建相应的桶(Bucket),桶的数量是m*r,其中m是Map任务的个数,r是Reduce任务的个数。
Map任务产生的结果会根据设置的分区(Partition)算法填充到每个桶中。 分区算法可以自定义,也可以采用系统默认的算法,默认的算法会根据每个键值对(key,value)的key, 把键值对哈希到不同的桶中去。当Reduce任务启动时,它会根据自己的 id和所依赖的Map任务的 id从远端或是本地取得相应的桶,并将该桶作为Reduce任务的输入进行处理。这里的桶是一个抽象概念,在实现中每个桶可以对应一个文件,也可以对应文件的一部分。但是,从性能角度而言,每个桶对应一个文件的实现方式会导致Shuffle过程生成过多的文件。
例如,如果有1000个Map任务和1000个Reduce任务,就会生成100万个文件,这样会给文件系统带来沉重的负担。所以,在最新的Spark版本中,采用了多个桶写入一个文件的方式,如图所示。
每个Map任务不会为每个Reduce任务单独生成一个文件,而是把每个Map任务所有的输出数据都写到一个文件中。因为每个Map任务中的数据会被分区,所以使用索引(Index)文件来存储具体Map任务的输出数据在同一个文件中如何被分区的信息。
Shuffle过程中每个Map任务会产生两个文件,即数据文件和索引文件,其中数据文件存储当前 Map 任务的输出结果,而索引文件则存储数据文件中数据的分区信息。下一个阶段的 Reduce任务就是根据索引文件来获取自己要处理的那个分区的数据。
其次,在 Reduce 端的 Shuffle 读取(Shuffle Fetch) 方面,在 Hadoop MapReduce 的 Shuffle 过程中,在Reduce端, Reduce任务会到各个Map任务那里把自己要处理的数据都下载到本地,并对下载过来的数据进行归并和排序,使相同 key 的不同 value按序归并到一起,以供使用。
这个归并和排序的过程,在Spark中是如何实现的呢?
虽然Spark属于MapReduce体系,但是对传统的MapReduce算法进行了一定的改进。Spark假定在大多数应用场景中, Shuffle数据的排序操作不是必须的,比如在进行词频统计时,如果强制地进行排序,只会使性能变差,因此,Spark 并不在Reduce端做归并和排序,而是采用称为Aggregator的机制。Aggregator本质上是一个HashMap,里面的每个元素是<K,V>形式的。
以词频统计为例,它会将从Map端下载的每一个(key,value)更新或插入HashMap中,若在HashMap中没有查找到这个key,则把这个(key,value)插入其中,若查找到这个key,则把value的值累加到V上去。这样就不需要预先对所有的(key,value)进行归并和排序,而是“来一个处理一个”、避免了外部排序这一步骤。
但同时需要注意的是,Reduce任务所拥有的内存必须足以存放自己要处理的所有 key 和 value,否则会产生内存溢出问题。因此,Spark 文档中建议用户在涉及这类操作时尽量增加分区的数量,也就是增加Map和Reduce任务的数量。
增加Map和Reduce任务的数量虽然可以减小分区的大小,使内存可以容纳分区。但是,在Shuffle写入环节,桶的数量是由Map和Reduce任务的数量决定的,任务越多,桶的数量就越多,就需要更多的缓冲区(Buffer),带来更多的内存消耗。
因此,在内存使用方面,我们会陷入一个两难的境地,一方面,为了减少内存的使用,需要采取增加Map和Reduce任务数量的策略;另一方面, Map和Reduce任务数量的增加,又会带来内存开销更大的问题。
最终,为了减少内存的使用,只能将Aggregator的操作从内存移到磁盘上进行。也就是说,尽管Spark经常被称为基于内存的分布式计算框架,但是,它的Shuffle过程依然需要把数据写入磁盘。
(2)窄依赖和宽依赖
以是否包含 Shuffle 过程为判断依据,RDD 中的依赖关系可以分为窄依赖与宽依赖。
其中,窄依赖不包含Shuffle过程,宽依赖则包含 Shuffle 过程。
下图展示了两种依赖之间的区别。
窄依赖表现为一个父RDD的分区对应于一个子RDD的分区,
或多个父RDD 的分区对应于一个子 RDD 的分区。
例如,图 (a)窄依赖中,RDD1 是 RDD2 的父RDD, RDD2 是子RDD, RDD1的分区1对应于RDD2的一个分区(即分区4); RDD6和RDD7都是RDD8的父RDD, RDD6的分区15和RDD7的分区18都对应于RDD8的分区 21。
RDD运行原理(RDD 之间的依赖关系)宽依赖则表现为存在一个父 RDD 的一个分区对应一个子 RDD 的多个分区。
例如,图 (b)宽依赖中,RDD9是RDD12的父RDD, RDD9中的分区24对应于RDD12中的两个分区(即分区27 和分区 28)。
总体而言,如果父RDD的一个分区只被一个子RDD的一个分区所使用就表示窄依赖,否则表示宽依赖。
窄依赖的典型操作包括map()、filter()、 union()等,不会包含Shuffle过程;宽依赖的典型操作包括groupByKey()等,通常会包含Shuffle过程。
对于连接 (join()) 操作,可以分为两种情况。
①对输入做协同划分,属于窄依赖,如图(a)所示。
所谓协同划分(Co-Partitioned),是指多个父RDD的某一分区的所有键(key)都落在子RDD的同一个分区内,不会产生同一个父RDD的某一分区落在子RDD的两个分区的情况。
② 对输入做非协同划分,属于宽依赖,如图 (b)所示。
Spark的这种依赖关系设计,使其具有了天生的容错性,大大加快了Spark的执行速度。
因为RDD通过血缘关系记住了它是如何从其他RDD中演变过来的,血缘关系记录的是粗粒度的转换操作行为,当这个 RDD 的部分分区数据丢失时,它可以通过血缘关系获取足够的信息来重新运算和恢复丢失的分区数据,由此带来了性能的提升。
相对而言,在两种依赖关系中,窄依赖的故障恢复更为高效,它只需要根据父RDD分区重新计算丢失的分区即可(不需要重新计算所有分区),而且可以并行地在不同节点进行重新计算。
而对宽依赖而言,单个节点失效通常意味着重新计算过程会涉及多个父RDD分区,开销较大。此外, Spark还提供了数据检查点和记录日志,用于持久化中间RDD,使在进行故障恢复时不需要追溯到最开始的阶段。在进行故障恢复时,Spark会对数据检查点开销和重新计算 RDD 分区的开销进行比较,从而自动选择最优的恢复策略。
Spark根据DAG中的RDD间的依赖关系,把一个作业分成多个阶段。对宽依赖和窄依赖而言,窄依赖对于作业的优化更有利。
逻辑上,每个RDD 操作都是基于fork/join (一种用于并行执行任务的框架)的,把计算fork到每个RDD分区完成计算后对各个分区得到的结果进行join()操作,然后fork/join下一个RDD操作。
如果把一个Spark作业直接翻译到物理实现(即执行完一个RDD操作再继续执行另外一个RDD操作),则是很不经济的。
首先,每一个RDD (即使是中间结果)都需要保存到内存或磁盘中,时间和空间开销大;其次, join()作为全局的路障(Barrier),代价是很高昂的,所有分区上的计算都要完成以后,才能进行join()得到结果,这样作业执行进度就会严重受制于最慢的那个节点。
如果子RDD的分区到父RDD的分区是窄依赖,就可以实施经典的fusion优化,即把两个fork/join合并为一个;如果连续的变换操作序列都是窄依赖,就可以把很多个fork/join合并为一个,通过这种合并 不但减少了大量的全局路障,而日无须保存很多中间结果RDD,这样可以极大地提升性能。
在Spark中,这个合并过程被称为“流水线(Pipeline)优化”。可以看出,只有窄依赖可以实现流水线优化。
对于窄依赖的 RDD,可以以流水线的方式计算所有父RDD分区数据,不会造成网络之间的数据混合。
对于宽依赖的RDD,则通常伴随着Shuffle过程,即首先需要计算好所有父RDD分区数据,然后在节点之间进行Shuffle,这个过程会涉及不同任务之间的等待,无法实现流水线化处理。
因此, RDD之间的依赖关系就成为把DAG划分成不同阶段的依据。
Spark通过分析各个RDD之间的依赖关系生成DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分阶段,具体划分方法:
在DAG中进行反向解析,遇到宽依赖就断开(因为宽依赖涉及Shuffle过程,无法实现流水线化处理),遇到窄依赖就把当前的RDD加入当前的阶段中(因为窄依赖不会涉及Shuffle过程,可以实现流水线化处理)。
如图所示,假设从 HDFS中读入数据生成3个不同的RDD (即A、C和E),通过一系列转换操作后再将计算结果保存回HDFS。
对DAG 进行解析时,在依赖图中进行反向解析,由于从A 到 B 的转换以及从 B 和 F 到 G 的转换都属于宽依赖,因此在宽依赖处断开后可以得到3个阶段,即阶段1、阶段2和阶段3。
可以看出,在阶段2中,从map()到union()都是窄依赖,这两步操作可以形成一个流水线化处理。
例如,分区7通过map()操作生成的分区9,可以不用等待分区8到分区10这个转换操作的计算结束,而是继续进行 union()操作,转换得到分区 13,这样流水线化处理大大提高了计算的效率。
由上述论述可知,把一个DAG划分成多个阶段以后,每个阶段都代表了由一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集合。每个阶段都会被提交给任务调度器进行处理,由任务调度器将任务分发给执行器运行。
RDD 运行过程通过上述对RDD概念、依赖关系和阶段划分的介绍,结合之前介绍的Spark运行基本流程,这里再总结一下 RDD 在 Spark 架构中的运行过程(见图 ):
(1)创建 RDD 对象;
(2)计算 RDD 之间的依赖关系,构建 DAG;
(3)DAG 调度器负责把 DAG 分解成多个阶段,每个阶段中包含多个任务每个任务会被任务调度器分发给各个工作节点上的执行器去执行。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。