赞
踩
大数据计算系统
大数据计算框架的几个要素 :
• 计算场景: 适用于何种任务使用?
• 抽象:程序员看到的框架是什么样的?
• API:程序员如何使用框架?
• 系统架构:系统有哪些模块?
• 基本数据操作:如何操作数据?如何高效实现?
• 流程优化:如何将一个计算任务转化为基本数据操作执行过程?
• 流程调度:如何有效执行基本数据操作?
• 数据存储机制:如何存储数据?
• 事务处理:如何确保计算过程正确地进行?
分别对于几种计算系统
Hadoop是一个开源的、可靠的、可扩展的分布式并 行计算框架,主要包括:
– MapReduce 离线大数据分析计算引擎
– HDFS – 分布式文件系统
– YARN– 任务执行调度资源管理框架
– Hbase – NoSQL数据库
–Hive – 分布式数据仓库
批处理
Map-Reduce 函数模型
MapReduce并行处理的基本过程 :
1.有一个待处理的大数据,被划分为大 小相同的数据块(如 64MB),及与此相应 的用户作业程序
2.系统中有一个负责调度的主节点(Master), 以及数据Map和 Reduce工作节点 (Worker)
3.用户作业程序提交 给主节点
4.主节点为作业程序 寻找和配备可用的 Map节点,并将程 序传送给map节点
5.主节点也为作业程 序寻找和配备可用的 Reduce节点,并将 程序传送给Reduce 节点
6.主节点启动每个 Map节点执行程序, 每个map节点尽可 能读取本地或本机 架的数据进行计算
7.每个Map节点处理读取 的数据块,并做一些数据整 理工作(combining, sorting等)并将中间结果 存放在本地;同时通知主 节点计算任务完成并告知 中间结果数据存储位置
8.主节点等所有Map节 点计算完成后,开始 启动Reduce节点运行; Reduce节点从主节点 所掌握的中间结果数 据位置信息,远程读 取这些数据
9.Reduce节点计算结果 汇总输出到一个结果文 件即获得整个处理结果
重载MapReduce函数
可以分为客户端节点,JobTasker节点,TaskTracker节点
如上图所示,一个 MapReduce程序指 定的一个完整计算过程在Hadoop里被称为一个作业(Job) ,而一个作业在执行过程中可以被拆分为若干 Map和Reduce任务完成 (最简单的wordcount中Map拆单词,Reduce按Map后的Key归并组装)
MapReduce框架中 进行并行计算的基本事务单元被称为任务(Task) ,分为Map 和Reduce 任务,一个作业(Job)通常 包含多个任务 (Task)
客户端节点是面向外部客户提供输入输出接口的节点,其中包含我们实现的MapReduce程序,和替程序与 MapReduce运行框 架交互的对象 ——JobClient
JobTracker 是MapReduce框架的总的管理者 ,它协调MapReduce作业,给下面工作节点分配任务和监控任务 ,同时由负责对接上面的客户端节点,类似于部门经理之类的活
TaskTracker 执行JobTracker分配的任务,是具体工作的执行者 ,类似公司里的程序员,按工作类型可以分为Map TaskTracker(前端程序员) 和Reduce TaskTracker (后端程序员)两类 ,下属具体的JVM和资源(类似公司的电脑)
流程优化:无 (额外的DAG模型生成工具)
流程调度:基础任务调度、Map与Reduce函数的执行
初始化:
然后进行作业调度
作业调度大致有先进先出(FIFO)公平(Fair)调度,能力(Capacity)调度器 三种
下面进行的就是任务分配 ,差不多就是创建TaskTracker,然后JobTracker与这些TaskTracker进行通信,保活检测等
等待前三部完成后,后面进行的就是具体的Spilt-Map-Shuffle-Reduce-Output过程了
比较值得说的就属于Shuffle
Shuffle顾名思义,洗牌,做的就是Map后进行文件排序和部分的合并(合并Map产生的中间数据),如下图所示:
HDFS
采用NameNode 和DataNode 的结构,NameNode 是整个文件系统的大脑,提供整个文件系统的目录信息,各个文件的分块信息,数据块的位置信息,并且 管理各个数据服务器。 DataNode 是数据服务器,分布式文件系统中的每一个文件,都被切分成若务器上
HDFS中每个文件都会被切分成若干个块(Block),默认64MB,每一 块都有连续的一段文件内容是存储的基本单位。客户端写文件的时候,不是一个字节一个字节写 入文件系统的,而是累计到一定数量后,往文件写入数个数据包(Packet )。 在每一个数据包中, 都会将数据切成更小的块 (Chunk )( 512 字节 )
HDFS读:
1、使用HDFS Client,向远程的Namenode发起RPC(远程过程调用)请求;
2、Namenode会视情况返回文件的部分或者全部block列表,对于每个block, Namenode都会返回有该block拷贝的datanode地址;
3-4、HDFS Client 选取离客户端最接近的datanode来读取block;
5、当读完列表的block后,如果文件读取还没有结束,客户端开发库会继续向Namenode 获取下一批的block列表。
6、读取完当前block的数据后,关闭与当前的datanode连接,并为读取下一个block寻找最佳的datanode
HDFS写:
1.HDFS Client 向远程的Namenode发起RPC请求;
2.Namenode会检查要创建的文件是否已经存在,创建者是否有权限进行 操作,成功则会为文件创建一个记录,否则会让客户端抛出异常;
3.当客户端开始写入文件的时候,开发库会将文件切分成多个packets,并 在内部以"data queue"的形式管理这些packets,并向Namenode申请新 的blocks,获取用来存储replicas的合适的datanodes列表,列表的大小根 据在Namenode中对replication的设置而定。
4. 开始以pipeline(管道)的形式将packet写入所有的replicas中。开发库把 packet以流的方式写入第一个datanode,该datanode把该packet存储之后, 再将其传递给在此pipeline中的下一个datanode,直到最后一个datanode,这 种写数据的方式呈流水线的形式。
5. 最后一个datanode成功存储之后会返回一个ack packet,在pipeline里传递 至客户端,在客户端的开发库内部维护着"ack queue",成功收到datanode返 回的ack packet后会从"ack queue"移除相应的packet。
6. 如果传输过程中,有某个datanode出现了故障,那么当前的pipeline会被关 闭,出现故障的datanode会从当前的pipeline中移除,剩余的block会继续剩下 的datanode中继续以pipeline的形式传输,同时Namenode会分配一个新的 datanode,保持replicas设定的数量。
HDFS优点:支持扩展
HDFS不擅长:
就是讲Hadoop的容错机制
节点的容错:
主节点中会周期性地设置检查点(checkpoint),检查整个计算作业的执行情况,一旦某个任务失效,可以从最近有效 的检查点开始重新执行,避免从头开始计算的时间浪费。
工作节点失效是很普遍发生的,主节点会周期性地给工作节点发送检测命令,如果工作节点没有回应,这认为该工作节点失效,主节点将终止该工作节点的任务并把失效的任务 重新调度到其它工作节点上重新执行
TaskTracker的容错:
容错的恢复
仅供参考
MapReduce 1.0存在很多缺点 :
JobTracker 是 Map-reduce 的集中处理点,存在单点故障。
JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成 很大的内存开销。
在 TaskTracker 端,以map/reducetask 的数目作 为资源的表示过于简单,没有考虑到 cpu/ 内存的占 用情况。
MapReduce 框架在有任何重要的或者不重要的变化 ( 例如 bug 修复,性能提升和特性化 ) 时,都会 强制进行系统级别的升级更新。强制让分布式集群系 统的每一个用户端同时更新。
因此升级到2.0,使用Yarn进行调度
Yarn里面主要可以分为三种模块:
ResourceManager : 调度、启动每一个 Job 所属的 ApplicationMaster、 另外监控 ApplicationMaster ,是Client和ApplicationMaster 之间交流的中间件
NodeManager :类似老版本的TaskTracker
ApplicationMaster :负责一个 Job 生命周期内的所有工作,类似老的框架 中 JobTracker
YARN 容错
ResourceManager
– 存在单点故障;
– 基于ZooKeeper实现。
NodeManager
–失败后,ResourceManager 将失败任务告诉对应的ApplicationMaster;
–ApplicationMaster决定如何处理失败的任务。
ApplicationMaster
–失败后,由ResourceManager重启;
–ApplicationMaster需处理内部任务的容错问题;
–ApplicationMaster会保存已经运行完成的Task, 重启后无需重新运行已经完成的工作。
在谈Spark之前,我们看看什么是大数据的批处理:
批处理模式中使用的数据集通常符合下列特征:
批处理适合
批处理不适合
再看一下Spark与Hadoop的对比
不难看出,MapReduce框架把中间结果写入到稳定存储 (比如磁盘)中,带来了大量的数据复制、磁盘IO和序列化开销;而Spark在进行处理时,中间内容和计算结果是常驻内存的,这就是Spark中的RDD机制
因为要理解Spark必须先理解其中的RDD机制,spark中的很多特性都和他有关,所以先谈RDD(弹性分布式数据集)
一个RDD是一个分布式对象集合,本质上是一个只读的分区记录集合 ,提供了一个抽象的数据架构,不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理 。
一个RDD的不同分区可以被保存到集群中不同的节点上, 从而可以在集群中的不同节点上进行并行计算
不同RDD之间的转换操作形成依赖关系,可以实现数据流水处理,避免中间数据存储
RDD提供了一种高度受限的共享内存模型
RDD的执行过程:
从上面可以看出RDD的优点:惰性调用、管道化、避免同步等待、不需要保存中间结果、每次操作变得简单
计算的中间结果持久化到内存,数据在内存中的多个RDD操作之间进行传递,避免了不必要的读写磁盘开销
存放的数据可以是Java对象,避免了不必要的对象序列化和反序列化
RDD的演化流程,以wordcount为例
sc=new SparkContext
rDD=sc.textfile(“ hdfs://…”)
rDD.filter(…)
rDD.Cache
rDD.Count
rDD.map
还是以最简单的WordCount举例
概念:
spark中 一个应用由一个Driver(SparkContext)和若干个作业构成,一个作业(Job )由多个阶段构成,一个阶段由多个没有Shuffle关系的任务组成
这里的作业( Job )是指一个作业包含多个RDD及作用于相应RDD上的各种操作 ,和Hadoop中的类似
而阶段( Stage )是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为阶段,或者也被称为任务集合, 代表了一组关联的、相互之间没有Shuffle(重新洗牌,就是在各个分布式系统上重新分布数据,为后面的Reduce过程节省一些操作)依赖关系的任务组 成的任务集
DAG:是Directed Acyclic Graph(有向无环图)的简称, 反映RDD之间的依赖关系 ,就是由初始的RDD逐个向后演化的过程
SparkContext:处于DriverProgram核心位置,所有与 Cluster、Worker Node交互的操作都需要SparkContext 来完成,即是包含用户的应用程序 ,
Cluster Manager:集群资源管理器,顾名思义,管理集群资源,集群资源管理器为task分配满足 要求的节点,并在节点按照要求创建Executor
Executor:是运行在工作节点(WorkerNode)的一个进程 ,负责运行Task
任务( Task ):运行在Executor上的工作单元
spark的启动流程:
- Spark的Driver Program (简称 Driver)获取来自用户的应用程序 ,完成task的解析和生成
- Driver向Cluster Manager(集 群资源管理器)申请运行task需 要的资源。
- 集群资源管理器为task分配满足 要求的节点,并在节点按照要求 创建Executor ,创建的Executor向Driver注册
- Driver将spark应用程序的代码 和文件传送给分配的executor
- executor运行task,运行完之 后将结果返回给Driver或者写入 HDFS或其他介质。
首先就要划分阶段,将Job划分为Stage,具体是根据DAG 图中的RDD 依赖关系,把一个作业分成多个阶段
阶段划分的依据是窄依赖和宽依赖:
这里不得不提窄依赖和宽依赖 (老师答疑时候也提了一下)
如上图所示:Map,操作都是窄依赖的,从定义上讲,对于Map由于映射是一对一或多对一的,一个或多个父RDD的分区对应于一个子RDD的分区,因此是窄依赖,fiter也类似;
从理解上谈,窄依赖就是一个父分区中的信息最多只会传输到一个子分区中,类似于映射的性质
宽依赖就是相反的的,一个父分区中的信息可以存在于多个子分区
举一个形象的例子:
因为宽依赖往往对应着shuffle操作(多对一,汇总,多节点),需要在运行过程中将同一个父RDD 的分区传入到不同的子RDD分区中,中间可能涉及多个节点之间的数据传输;而窄依赖的每个父RDD的分区只会传入到一个子RDD分区中,通常可以在一个节点内完成转换,可以实现“流水线”优化,而宽依赖无法实现“流水线”优化
从上面可以得出流程优化的方法:
一个优化的实例:
分区7通过map操作生成的分区9, 可以不用等待分区8到分区10这个 map操作的计算结束,而是直接继续进行union操作,得到分区13,这样流水线执行大大提高了计算的效率
就是spark的缓存机制,基于内存和磁盘的缓存
首先分区和数据块是一一对应的
在内部建立RDD分区和数据块之间的映射,需要读取缓存的RDD时, 根据映射关系取得分区对应的数据块
一个数据块对应着文件系统中的一个文件,文件名和块名称的映射 关系是通过哈希算法计算所得的
RDD本身维护着可以用来重建丢失分区的信息
RDD还有优秀的容错机制:
流计算:实时获取来自不同数据源的海量数据,经过实时分析处理,实时性要求保证较低的延迟时间,达到秒级别,甚至是毫秒级别 ,相对于批处理用充裕时间处理静态数据,流数据必须采用实时计算,响应时间为秒级甚至更少
基于MapReduce模型的Hadoop很难满足时效性要求,启动本身是需要时间的:输入切分、调度、启动进程 等,在集群上共享Job也比较复杂,可能需要等待资源 ,而所有数据都需要读写磁盘 ,这些因素加一起导致了Hadoop不适合流计算
数据流处理,可用来实时处理新数据和更新数据库,兼具容错性和 可扩展性。
既然是流计算,就得和批处理的MapReduce不一样把
Storm将流数据Stream描述成一个无限的Tuple序列,这些Tuple序列会以分布式的方式并行地创建和处理 ,每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型
Spout水龙头:Storm认为每个Stream都有一个源头,并把这个源头抽象 为Spout ,通常Spout会从外部数据源(队列、数据库等)读取数据,然后封装 成Tuple形式,发送到Stream中。Spout是一个主动的角色,在接口 内部有个nextTuple函数,Storm框架会不停的调用该函数
Bolt:Storm将Streams的状态转换过程抽象为Bolt。Bolt即可以处 理Tuple,也可以将处理后的Tuple作为新的Streams发送给其他Bolt ,可以执行过滤、函数操作、Join、操作数据库等任何操作 ,其接口中有一个execute(Tuple input)方法, 在接收到消息之后会调用此函数,用户可以在此方法中执行自己的处理逻辑
Topology:Storm将Spouts和Bolts组成的网络抽象成Topology, 它可以被提交到Storm集群执行。Topology可视为流转换图,图中 节点是一个Spout或Bolt,边则表示Bolt订阅了哪个Stream。当 Spout或者Bolt发送元组时,它会把元组发送到每个订阅了该 Stream的Bolt上进行处理
(类似于前面的有向无环图DAG和Hadoop中的Job)
Topology里面的每一个组件都是并行运行的
了解一下接即可,差不多得了
Nimbus:主节点,是一个调度中心,负责分发任务 ,类似Hadoop中的JobTracker
Zookeeper:是完成Supervisor和Nimbus之间协调的服务,来作为分布式协调组件,负责Nimbus和多个 Supervisor之间的所有协调工作,若Nimbus 进程或Supervisor进程意外终止,重启时也能读取、恢复之前的状 态并继续工作,加入分布式协调组件使得Storm极其稳定
Supervisor:从节点,任务执行的地方 ,类似于TaskTracker
Worker:任务工作进程,一个Supervisor中可以有多个Worker。
Executor:Worker进程在执行任务时,会启动多个Executor线程
关键组件:
Stream Groupings: 定义了一个流在Bolt任务间该如何被切分。
这里有Storm提供 的6个Stream Grouping类型:
- 随机分组(Shuffle grouping):随机分发 tuple到Bolt的任务,保证每个任务获得相等数量 的tuple。
- 字段分组(Fields grouping):根据指定字段 分割数据流,并分组。例如,根据“user-id”字 段,相同“user-id”的元组总是分发到同一个任 务,不同“user-id”的元组可能分发到不同的任 务。
- 全部分组(All grouping):tuple被复制到bolt的所 有任务。这种类型需要谨慎使用。
- 全局分组(Global grouping):全部流都分配到 bolt的同一个任务。明确地说,是分配给ID最小的那个 task。
- 无分组(None grouping):你不需要关心流是如何 分组。目前,无分组等效于随机分组。但最终,Storm 将把无分组的Bolts放到Bolts或Spouts订阅它们的同一 线程去执行(如果可能)。
- 直接分组(Direct grouping):这是一个特别的分组 类型。元组生产者决定tuple由哪个元组处理者任务接收。
基于Topology进行持续的流式计算
一个wordcount的案例:
除此之外,Storm中有一类叫Acker的task,它会对tuple进行跟踪,并检测相应的spout tuple是否处理完成了。当一个tuple被创建时,不管是在Spout还是Bolt中创建,它都会被赋予一个tuple-id,这些tuple-id就是Acker用来跟踪每个spout 读入的tuple处理的生命周期(该tuple和其产生的所有tuple,也可以称为一棵tuple树)。如果一个spout 读入的tuple被完全处理了,它会给创建这个spout tuple的那个task发送一个成功消息,否则发送一个失败消息。
Strom的容错可以分为多种类型:
任务级失败
单机节点级故障
集群机器故障
实时计算,Spark Streaming是Spark核心API的一个扩展,可以 实现高吞吐量的、具备容错机制的实时流数据的处理
以一系列非常小的、确定的批处理作业的形式运行流计算 ,说白了还是spark模型,就是让他gkd而已
其本质来看还是spark,无法实现毫秒级的流计算,而 Storm可以实现毫秒级响应 ,但是相比于Storm,RDD数据集更容 易做高效的容错处理
Spark Streaming是将流式计算分解成一系列短小的批处理作业。
批处理引擎用Spark Core。
类似于spark的容错机制, RDD可以记住从原始的容错输入创建它的操作序列 ,如果丢失可以重新计算 ;而批量输入数据被复制到多个工作节点的内存中,相互之间是容错的 ,如果是工作人员故障而丢失的数据,可以从输入的数据开始重新计算
许多实际计算机问题会涉及到大型图 ,而MapReduce不适合图处理 , 这是因为MapReduce每一阶段都利用整个图的全部状态 ,需要整合MapReduce链 ,并行处理需要多次迭代,这导致MapReduce的迭代,影响到了整体的性能
Superstep:并行结点计算 ,图并行模式
对于图中的每一个节点,都需要接受上一个superstep发出的消息 ,执行相同的用户定义函数 ,来修改它的值或者其输出边的值 ,然后将消息送到其他点(由下一个superstep接受) ,以此来改变改变大图的拓扑结构
图并行模式的简要流程:
对于每个节点:
基于上面的抽象,Pregel 允许将图算法写成一系列的MapReduce调用,然后在执行计算的机器上保持顶点和边 , 用网状结构传输信息
Pregel系统也使用主/从模型
主节点
从节点
处理自己的任务
与其他从节点通信
从节点可以聚合它的节点报告的消息并整 合为一条消息发送 ,可以减少消息流量和硬盘占用
持久化的数据位于分布式存储系统(如GFS或BigTable) 中
临时数据存储在本地磁盘中
(一笔带过)
容错
引入检查点机制: – 主节点定期指示从节点将分区的状态保存到持久化存储中 ( 例如:顶点数值,边数值,信息内容 )
恢复
主节点将图形分区重新分配给当前可用的从节 点
所有worker都从最近可用的检查点重新加载分区状态
既然是基于spark的,还是RDD那套东西
就是将属性图转化为表
将图中的顶点分割开来,构建顶点表,边表,路由表(以RDD的形式储存)
spark
用 Map-Reduce三元组收集每个顶点的邻域信息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。