赞
踩
目录
本文仅用于学习交流使用,感谢大家的阅读!
Spark是用于大规模数据处理的统一分析引擎。最初是由加州大学柏克莱分校AMPLab所开发。它提供了Scala,Java,Python和R中的高级API,以及优化的引擎,该引擎支持用于数据分析的通用计算图。它还支持一组丰富的高级工具,包括使用 SQL 处理结构化数据处理的 Spark SQL,用于机器学习的 MLlib,用于图计算的 GraphX,以及 Spark Streaming。
spark的一个主要特点是能够在内存中进行计算,及时依赖磁盘进行复杂的运算,相对于Hadoop的MapReduce会在运行完工作后将中介数据存放到磁盘中来说,Spark依然比MapReduce更加高效。
我们都知道Hadoop框架虽然优秀,但是也有天然的缺陷,所以Spark应运而生。比如说:
① MapReduce不能进行实时流计算。实时流处理场景需要数据处理的延迟比较低,需要达到秒级,甚至毫秒级,但是MapReduce处理时间是相对很长的。
② MapReduce不适用于交互式处理。比如说当收到查询请求,需要快速处理并返回结果,MapReduce不能快速的返回计算结果。
③MapReduce不适合处理迭代计算,迭代计算需要不断对Job调度执行,Job之间依赖度比较高,处理逻辑比较复杂。
④ MapReduce只支持Map和Reduce两种语义的操作,如果实现复杂的操作需要开发大量的Map和Reduce程序。
⑤ MapReduce处理效率低,耗费时间长,具体表现在以下几个方面:
- MapReduce是基于磁盘的,无论是MapReduce还是YARN都是将数据从磁盘中加载出来,经过DAG,然后重新写回到磁盘中。
- MapReduce计算过程中会经历Suffle阶段,Map段会将中间写入到磁盘,如果数据文件比较大,会多次写入到磁盘,产生很多小文件,当Map任务执行结束之后,会将这些小的文件合并成一个大的文件,要经历多次读取和写入磁盘的操作,非常耗时。如果一个计算逻辑需要多个MapReduce串联,后一个MapReduce任务依赖前一个MapReduce任务的计算结果,那么每个MapReduce任务计算完都会将计算结果写入到HDFS,多个MapReduce之间通过HDFS交换数据,效率非常低,耗时也很长。
- MapReduce任务的调度和启动开销也很大(由于MapReduce任务是通过进程的方式启动的),不能充分利用内存资源,而且Map端和Reduce端都需要对数据进行排序,由于MapReduce框架设计的特点决定了它在运行过程中的效率是非常低的,只能满足离线的批处理计算任务。
针对以上的一些介绍,所以MapReduce主要用于离线的分布式批处理计算,所以Spark应运而生。
① 运行速度快
Apache Spark官方介绍运行工作负载的速度提高了100倍。
Apache Spark使用最先进的DAG调度程序,查询优化程序和物理执行引擎,实现批量和流式数据的高性能。
② 使用方便
提供了丰富的开发API,可以使用Java,Scala,Python,R和SQL快速编写应用程序。推荐谁用Scala来开发,因为Spark是使用Scala语言来开发的,实现相同的功能Scala所写的代码要比Java少很多。
集成批处理、流处理、交互式计算、机器学习算法、图计算。
③ 运行方式多样(后面会详细介绍)
(1)Application:表示你的应用程序。
(2)Driver:表示main()函数,创建SparkContext。由SparkContext负责与ClusterManager通信,进行资源的申请,任务的分配和监控等。程序执行完毕后关闭SparkContext。
(3)Executor:某个Application运行在Worker节点上的一个进程,该进程负责运行某些task,并且负责将数据存在内存或者磁盘上。在Spark on Yarn模式下,其进程名称为 CoarseGrainedExecutor Backend,一个CoarseGrainedExecutor Backend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task,这样,每个CoarseGrainedExecutorBackend能并行运行Task的数据就取决于分配给它的CPU的个数。
(4)Worker:集群中可以运行Application代码的节点。在Standalone模式中指的是通过slave文件配置的worker节点,在Spark on Yarn模式中指的就是NodeManager节点。
(5)Task:在Executor进程中执行任务的工作单元,多个Task组成一个Stage。
(6)Job:包含多个Task组成的并行计算,是由Action行为触发的。
(7)Stage:每个Job会被拆分很多组Task,作为一个TaskSet,其名称为Stage。
(8)DAGScheduler:根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler,其划分Stage的依据是RDD之间的依赖关系。
(9)TaskScheduler:将TaskSet提交给Worker(集群)运行,每个Executor运行什么Task就是在此处分配的。
注意:一个Job == 多个Stage;一个Stage == 多个同种Task。
RDD全称叫做弹性分布式数据集(Resilient Distributed Datasets),Spark计算都是基于RDD进行的。它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能通过其他RDD转换而创建,为此,RDD支持丰富的转换操作(如map, join, filter, groupBy等),通过这种转换操作,新的RDD则包含了如何从其他RDDs衍生所必需的信息,所以说RDDs之间是有依赖关系的。
基于RDDs之间的依赖,RDDs会形成一个有向无环图DAG,该DAG描述了整个流式计算的流程,实际执行的时候,RDD是通过血缘关系(Lineage)一气呵成的,即使出现数据分区丢失,也可以通过血缘关系重建分区,总结起来,基于RDD的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的DAG,然后写回稳定存储。另外RDD还可以将数据集缓存到内存中,使得在多个操作之间可以重用数据集,基于这个特点可以很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。
Partition:数据分区。即一个RDD的数据可以划分为多少个分区。
窄依赖(NarrowDependency):即子RDD依赖于父RDD中固定的Partition。Narrow-Dependency分为OneToOneDependency和RangeDependency两种。
宽依赖(ShuffleDependency):shuffle依赖,即子RDD对父RDD中的所有Partition都有依赖。
可以说Spark最初也就是实现RDD的一个分布式系统,后面通过不断发展壮大成为现在较为完善的大数据生态系统,简单来讲,Spark-RDD的关系类似于Hadoop-MapReduce关系。
RDD特点包括:分区、只读、依赖、缓存。
RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。
4.2.1 分区
如下图所示,RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。
4.2.2 只读
如下图所示,RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。
由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce了,如下图所示。
RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系;另一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中。下图是RDD所支持的操作算子列表。
4.2.3 依赖
RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。如下图所示,依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的,另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。
通过RDDs之间的这种依赖关系,一个任务流可以描述为DAG(有向无环图),如下图所示,在实际执行过程中宽依赖对应于Shuffle(图中的reduceByKey和join),窄依赖中的所有转换操作可以通过类似于管道的方式一气呵成执行(图中map和union可以一起执行)。
4.2.4 缓存
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。如下图所示,RDD-1经过一系列的转换后得到RDD-n并保存到hdfs,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0了。
虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。但是对于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从checkpoint处拿到数据。
Spark是一个master/slave架构的分布式系统,使用内存计算引擎,提供Cache缓存机制,将RDD缓存到内存或磁盘当中,支持迭代计算和多次数据共享,减少数据读取的IO开销。它的架构主要包含有以下五部分:
5.1.1 Application
用户编写的 Spark 程序,可以理解为两部分,包含了一个 Driver Program 和集群中多个的 Executor。即Driver功能的代码和分布在集群中多个节点上运行的Executor代码。
5.1.2 Driver
简单的说就是运行Application 的main()函数。Driver的主要功能包括:
Spark中的Driver即运行上述Application的main函数并创建SparkContext,创建SparkContext的目的是为了准备Spark应用程序的运行环境,在Spark中有SparkContext负责与ClusterManager通信,进行资源申请、任务的分配和监控等,当Executor部分运行完毕后,Driver同时负责将SparkContext关闭,通常用SparkContext代表Driver。
5.1.3 Executor
执行器,是为某个Application运行在worker node上的一个进程。
某个Application运行在worker节点上的一个进程, 该进程负责运行某些Task,每个Application的Executor数量可以通过配置指定(Static Allocation)或者有Spark动态分配(Dynamic Allocation)。Executor的主要功能包括:
每个Application都有各自独立的一批Executor, 在Spark on Yarn模式下,其进程名称为CoarseGrainedExecutor Backend。一个CoarseGrainedExecutor Backend有且仅有一个Executor对象, 负责将Task包装成taskRunner,并从线程池中抽取一个空闲线程运行Task, 这个每一个oarseGrainedExecutor Backend能并行运行Task的数量取决与分配给它的cpu个数。
5.1.4 Cluter Manager
指的是在集群上获取资源的外部服务。目前有四种类型
5.1.5 Worker
从节点,负责控制计算节点,启动Executor或者Driver。
集群中任何可以运行Application代码的节点,在Standalone模式中指的是通过slave文件配置的Worker节点,在Spark on Yarn模式下就是NodeManager节点。
架构设计图如下:
(1)首先我们会编写Spark程序然后提交作业,作业提交了之后就会启动一个Driver并创建SparkContext(同时在SparkContent初始化中将创建DAGScheduler和TASKScheduler等),由SparkContext负责与ClusterManager通信,进行资源的申请和任务的调度,监控等。
(2)SparkContext会向资源管理器(可以是Standalone、Mesos或YARN)注册并申请Executor资源(Executor类似于Yarn中的Container),资源管理器收到资源申请之后就会为Executor分配资源并启动Executor,并且Executor会向SparkContext申请Task,Executor启动了之后会向资源管理器发送心跳汇报自己的运行情况。
(3)然后SparkContext会根据Action行为触发的Job并依据RDD之间依赖关系构建DAG图,并分解成Stage发送TaskSet给TaskScheduler。
(4)TaskScheduler会将对应的TaskSet提交到对应Executor的Worker(集群)上运行,同时SparkContext会将应用程序发送给Executor。
(5)然后Task会在Executor上运行,并向SparkContext汇报运行情况。
(6)运行完毕释放资源。
Standalone模式是Spark内部默认实现的一种集群管理模式,这种模式是通过集群中的Master来统一管理资源,而与Master进行资源请求协商的是Driver内部的StandaloneSchedulerBackend(实际上是其内部的StandaloneAppClient真正与Master通信)。
结合前面介绍的Spark运行模式来看看 Spark on Standalone的执行流程:
(1)SparkContext连接到Master,向Master注册并申请资源(CPU Core 和Memory);
(2)Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker上获取资源,然后启动StandaloneExecutorBackend;
(3)StandaloneExecutorBackend向SparkContext注册;
(4)SparkContext将Applicaiton代码发送给StandaloneExecutorBackend;并且SparkContext解析Applicaiton代码,构建DAG图,并提交给DAG Scheduler分解成Stage(当碰到Action操作时,就会催生Job;每个Job中含有1个或多个Stage,Stage一般在获取外部数据和shuffle之前产生),然后以Stage(或者称为TaskSet)提交给Task Scheduler,Task Scheduler负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行;
(5)StandaloneExecutorBackend会建立Executor线程池,开始执行Task,并向SparkContext报告,直至Task完成。
(6)所有Task完成后,SparkContext向Master注销,释放资源。
YARN模式下,可以将资源的管理统一交给YARN集群的ResourceManager去管理,选择这种模式,可以更大限度的适应企业内部已有的技术栈,如果企业内部已经在使用Hadoop技术构建大数据处理平台。
Spark On Yarn有两种模式,分别是 yarn-client 和 yarn-cluster。
6.2.1 yarn-client 运行流程
(1)Spark Yarn Client向YARN的ResourceManager申请启动Application Master。同时在SparkContent初始化中将创建DAGScheduler和TASKScheduler等,由于我们选择的是Yarn-Client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend;
(2)ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分派;
(3)Client中的SparkContext初始化完毕后,与ApplicationMaster建立通讯,向ResourceManager注册,根据任务信息向ResourceManager申请资源(Container);
(4)一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task;
(5)Client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行的状态和进度,以让Client随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
(6)应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己。
6.2.2 yarn-cluster 运行流程
(1)Spark Yarn Client向YARN中提交应用程序,包括ApplicationMaster程序、启动ApplicationMaster的命令、需要在Executor中运行的程序等;
(2)ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,其中ApplicationMaster进行SparkContext等的初始化;
(3)ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束;
(4)一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。这一点和Standalone模式一样,只不过SparkContext在Spark Application中初始化时,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler进行任务的调度,其中YarnClusterScheduler只是对TaskSchedulerImpl的一个简单包装,增加了对Executor的等待逻辑等;
(5)ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
(6)应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。
YARN-Client 与 YARN-Cluster 区别
理解YARN-Client和YARN-Cluster深层次的区别之前先清楚一个概念:Application Master。在YARN中,每个Application实例都有一个ApplicationMaster进程,它是Application启动的第一个容器。它负责和ResourceManager打交道并请求资源,获取资源之后告诉NodeManager为其启动Container。从深层次的含义讲YARN-Cluster和YARN-Client模式的区别其实就是ApplicationMaster进程的区别。
YARN-Cluster模式下,Driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行,因而YARN-Cluster模式不适合运行交互类型的作业;
YARN-Client模式下,Application Master仅仅向YARN请求Executor,Client会和请求的Container通信来调度他们工作,也就是说Client不能离开。
随着Apache Mesos的不断成熟,一些企业已经在尝试使用Mesos构建数据中心的操作系统(DCOS),Spark构建在Mesos之上,能够支持细粒度、粗粒度的资源调度策略(Mesos的优势),也可以更好地适应企业内部已有技术栈。
通常当需要处理的数据量超过了单机尺度(比如我们的计算机有4GB的内存,而我们需要处理100GB以上的数据)这时我们可以选择spark集群进行计算,有时我们可能需要处理的数据量并不大,但是计算很复杂,需要大量的时间,这时我们也可以选择利用spark集群强大的计算资源,并行化地计算,其架构示意图如下:
7.1 Spark Core
Spark Core是大规模并行和分布式数据处理的基础引擎。 核心是分布式执行引擎,Java,Scala和Python API为分布式ETL应用程序开发提供了一个平台。包含Spark的基本功能,包含任务调度,内存管理,容错机制等,内部定义了RDDs(弹性分布式数据集),提供了很多APIs来创建和操作这些RDDs。为其他组件提供底层的服务。 在核心上构建的其他库允许用于流式传输,SQL和机器学习的各种工作负载。 它负责:
内存管理和故障恢复。
在群集上调度,分发和监视作业。
与存储系统交互。
7.2 Spark SQL
Spark处理结构化数据的库,就像Hive SQL,Mysql一样,企业中用来做报表统计。
Spark SQL是Spark中的一个新模块,它使用Spark编程API实现集成关系处理。 它支持通过SQL或Hive查询查询数据。 对于那些熟悉RDBMS的人来说,Spark SQL将很容易从之前的工具过渡到可以扩展传统关系数据处理的边界。
Spark SQL通过函数编程API集成关系处理。 此外,它为各种数据源提供支持,并且使用代码转换编织SQL查询,从而产生一个非常强大的工具。以下是Spark SQL的四个库。
Data Source API
DataFrame API
Interpreter & Optimizer
SQL Service
7.3 Spark Streaming
实时数据流处理组件,类似Storm。Spark Streaming提供了API来操作实时流数据。企业中用来从Kafka接收数据做实时统计。
7.4 MLlib
一个包含通用机器学习功能的包,Machine learning lib包含分类,聚类,回归等,还包括模型评估和数据导入。MLlib提供的上面这些方法,都支持集群上的横向扩展。
7.5 Graphx
处理图的库(例如,社交网络图),并进行图的并行计算。像Spark Streaming,Spark SQL一样,它也继承了RDD API。它提供了各种图的操作,和常用的图算法,例如PangeRank算法。
Spark生态系统组件的应用场景:
如果想对Spark有更深入的了解,推荐大家阅读Matei Zaharia的Spark论文。(有需要请下邮箱地址)
另外,很喜欢Spark官方文档中的一句话:一个人可以走的很快,但是一群人却可以走的更远。
大数据系列的其他文章:
大数据系列(四)之 MapReduce过程及shuffle详解
附:
Spark官方文档:https://spark.apache.org/docs/latest/
Spark官方中文文档:https://spark.apachecn.org
Spark Java API 文档:https://spark.apache.org/docs/latest/api/java/index.html
参考:
http://sharkdtu.com/posts/spark-rdd.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。