赞
踩
目录
特点:
① 保存多个副本,且提供容错机制,副本丢失或宕机自动恢复。默认存3份。
② 运行在廉价的机器上。
③ 适合大数据的处理。HDFS默认会将文件分割成block,128M为1个block(负载均衡)。然后将block按键值对存储在HDFS上,并将键值对的映射存到内存中。如果小文件太多,那内存的负担会很重。
基本知识:
hdfs:hadoop分布式文件系统。
- 1.HDFS集群分为两大角色:NameNode、多个DataNode(主从架构)
- 2.NameNode负责管理整个文件系统的元数据(文件名,文件目录结构,文件创建时间,文件副本数,文件权限,每个文件的block列表)
- 3.DataNode 负责管理用户的文件数据块
- 4.文件会按照固定的大小(blocksize,默认128M)切成若干块后分布式存储在若干台datanode上
- 5.每一个文件块可以有多个副本,并存放在不同的datanode上
- 6.Datanode会定期向Namenode汇报自身所保存的文件block信息,而namenode则会负责保持文件的副本数量
- 7.HDFS的内部工作机制对客户端保持透明,客户端请求访问HDFS都是通过向namenode申请来进行
NameNode
- 负责文件元数据信息的操作以及客户端的请求
- 管理HDFS文件系统的命名空间
- 维护文件树中所有的文件和文件夹的元数据信息以及文件到快的对应关系和块到节点的对应关系
- 单个NameNode支持4000台DataNode集群
- NameNode在内存中保存着整个文件系统的名字空间和文件数据块的地址映射
DataNode
- 处理文件内容的读写请求
- 一个数据快在DataNode以文件存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据,包括数据块的长度,块数据的校验和,以及时间戳
- DataNode启动后,周期性的向NameNode上报所有的块信息。
- 每3秒保持一次心跳,如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。
- Block数据块是HDFS文件系统基本的存储单位,文件划分成块,默认大小128M,以块为单位,每个块有多个副本(默认3个)存储不同的机器上。
- 一般1 台机器部署1 个DataNode
- 写数据步骤详解
1、Client客户端向Namenode通信请求上传文件,Namenode检查目标文件是否已存在,父目录是否存在
2、Namenode返回是否可以上传
3、Client请求第一个 block该传输到哪些Datanode服务器上
4、Namenode返回3个Datanode服务器ABC
5、Client请求3台DataNode中的一台A上传数据(本质上是一个RPC调用,建立pipeline),A收到请求会继续调用B,然后B调用C,将真个pipeline建立完成,逐级返回客户端
6、Client开始往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,A收到一个packet就会传给B,B传给C;A每传一个packet会放入一个应答队列等待应答
7、当一个block传输完成之后,Client再次请求Namenode上传第二个block的服务器。(重复上述步骤,依次存储N 个block)
- 读数据步骤详解
1、跟namenode通信查询元数据,找到文件块所在的datanode服务器
2、挑选一台datanode(就近原则,然后随机)服务器,请求建立socket流
3、datanode开始发送数据(从磁盘里面读取数据放入流,以packet为单位来做校验)
4、客户端以packet为单位接收,现在本地缓存,然后写入目标文件
MapReduce是面向大数据并行处理的计算模型、框架和平台。对存储在HDFS分布式文件系统的数据进行分布式计算。
split 是mapreduce 的最小计算单位,默认对应于 block
n 个split对应 n 个 Mapper Task ,m 个 reducer Task 对应 m 个 文件
MapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Task。
(1)Client:
(2)JobTracker:
(3)TaskTracker:
(4)Task:
MapReduce存在的问题
JobTracker访问压力大,影响系统可扩展性
难以支持除MapReduce计算框架之外的计算框架、比如spark、storm
ResourceManager(RM)
RM是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,AsM)。
调度器
调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。
需要注意的是,该调度器是一个“纯调度器”,它不再从事任何与具体应用程序相关的工作,比如不负责监控或者跟踪应用的执行状态等,也不负责重新启动因应用执行失败或者硬件故障而产生的失败任务,这些均交由应用程序相关的ApplicationMaster完成。调度器仅根据各个应用程序的资源需求进行资源分配,而资源分配单位用一个抽象概念“资源容器”(Resource Container,简称Container)表示,Container是一个动态资源分配单位,它将内存、CPU、磁盘、网络等资源封装在一起,从而限定每个任务使用的资源量。此外,该调度器是一个可插拔的组件,用户可根据自己的需要设计新的调度器,YARN提供了多种直接可用的调度器,比如Fair Scheduler和Capacity Scheduler等。
应用程序管理器
应用程序管理器负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。
NodeManager(NM)
NM是每个节点上的资源和任务管理器,一方面,它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它接收并处理来自AM的Container启动/停止等各种请求。
ApplicationMaster(AM)
用户提交的应用程序均包含一个AM,负责为应用程序申请资源并分配给内部的任务,应用的监控,跟踪应用执行状态,重启失败任务等。ApplicationMaster是应用框架,它负责向ResourceManager协调资源,并且与NodeManager协同工作完成Task的执行和监控。MapReduce就是原生支持的一种框架,可以在YARN上运行Mapreduce作业。有很多分布式应用都开发了对应的应用程序框架,用于在YARN上运行任务,例如Spark,Storm等。如果需要,我们也可以自己写一个符合规范的YARN application。
Container
Container 是 YARN 中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示的。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。每个Container可以根据需要运行ApplicationMaster、Map、Reduce或者任意的程序。
运行过程:
用户向YARN中提交应用程序,其中包括AM程序、启动AM的命令、命令参数、用户程序等;事实上,需要准确描述运行ApplicationMaster的unix进程的所有信息。提交工作通常由YarnClient来完成。
RM为该应用程序分配第一个Container,并与对应的NM通信,要求它在这个Container中启动AM;
AM首先向RM注册,这样用户可以直接通过RM査看应用程序的运行状态,运行状态通过 AMRMClientAsync.CallbackHandler的getProgress() 方法来传递给RM。 然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4〜7;
AM采用轮询的方式通过RPC协议向RM申请和领取资源;资源的协调通过 AMRMClientAsync异步完成,相应的处理方法封装在AMRMClientAsync.CallbackHandler中。
—旦AM申请到资源后,便与对应的NM通信,要求它启动任务;通常需要指定一个ContainerLaunchContext,提供Container启动时需要的信息。
NM为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务;
各个任务通过某个RPC协议向AM汇报自己的状态和进度,以让AM随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;ApplicationMaster与NM的通信通过NMClientAsync object来完成,容器的所有事件通过NMClientAsync.CallbackHandler来处理。例如启动、状态更新、停止等。
应用程序运行完成后,AM向RM注销并关闭自己。
工作流程:
mapreduce执行阶段:
shuffle 过程:
1. 分区: 决定当前的 key 由那个 reducer 处理,相同的key 由 同一个reducer 处理。默认: 由 key 的hash 值,对reducer 个数取余 ;
2. 分组:依据相同的 key 将value 合并,key 值相等则分到同一个组中
3. 排序:按照key 的字典顺序排序
Map端的shuffle 过程:
1. spill 溢写。 每一个 map 后将结果放入环形内存缓存区(默认内存:100M)
2. 分区:依据key 值的 哈希函数 进行分区(相同的key 会分到同一个区,决定那个key 由那个 reducer 处理)
如:
hadoop 1 reducer0
hi ve 1 reducer1
hadoop 1 reducer0
3. 排序: 将相同分区的数据在区内进行排序
hadoop 1 reducer0
hadoop 1 reducer0
hi ve 1 reducer1
4.当环形内存缓冲区达到阈值 80% 时,开始溢写。将分区排序后的数据溢写到磁盘中变为文件 file1 ,最终生成多个文件
5. merge 归并:将spill 生产的文件归并,并排序,依据分区和字典排序
6. map task 结束,通知appmaster ,reducer来拉取数据
合并(Combine)和归并(Merge)的区别:
两个键值对<“a”,1>和<“a”,1>,如果合并,会得到<“a”,2>,如果归并,会得到<“a”,<1,1>>
Reduce端的shuffle 过程:
1. reducer 启动多个线程拉取属于自己分区的数据(多个map task 结果的数据)
2. 对于分区内的数据 依据key 值进行排序
3. 分组合并:对相同的key 的value 进行合并 如: hadoop, list<1, 1>
https://cshihong.github.io/2018/05/11/MapReduce%E6%8A%80%E6%9C%AF%E5%8E%9F%E7%90%86/
对比Hadoop:
性能上提升高于100倍。
Spark的中间数据存放在内存中,对于迭代运算的效率更高,进行批处理时更高效。
更低的延时。
Spark提供更多的数据操作类型,编程模型比Hadoop更灵活,开发效率更高。
更高的容错能力(血统机制)。
Hadoop存在如下一些缺点:
表达能力有限
磁盘IO开销大
延迟高
任务之间的衔接涉及IO开销
在前一个任务执行完成之前,其他任务就无法开始,难以胜任复杂、多阶段的计算任务
Spark在借鉴Hadoop MapReduce优点的同时,很好地解决了MapReduce所面临的问题。
相比于Hadoop MapReduce,Spark主要具有如下优点:
Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比Hadoop MapReduce更灵活。
Spark提供了内存计算,可将中间结果放到内存中,对于迭代运算效率更高 Spark基于DAG的任务调度执行机制,要优于Hadoop MapReduce的迭代执行机制。
Spark是一种开源的分布式并行计算框架,Spark是基于内存迭代计算的,可以分为N个阶段,一个阶段完了可以继续下一阶段的处理,而且Spark作业的中间结果可以保存到内存中,不用再频繁去HDFS或其它数据源读取数据。
Spark 的主要特点:
(1)提供 Cache 机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的 IO 开销;
(2)提供了一套支持 DAG 图的分布式并行计算的编程框架,减少多次计算之间中间结果写到 Hdfs 的开销;
(3)使用多线程池模型减少 Task 启动开稍, shuffle 过程中避免不必要的 sort 操作并减少磁盘 IO 操作。(Hadoop 的 Map 和 reduce 之间的 shuffle 需要 sort)
spark 框架:
- 构建Spark Application的运行环境,启动SparkContext
- SparkContext向资源管理器(可以是Standalone,Mesos,Yarn)申请运行Executor资源,并启动StandaloneExecutorbackend,
- Executor 向 SparkContext申请Task
- SparkContext将应用程序分发给Executor
- SparkContext构建成DAG图,将DAG图分解成Stage、将Taskset发送给Task Scheduler,最后由Task Scheduler将Task发送给Executor运行
- Task在Executor上运行,运行完释放所有资源
- Spark运行特点:
- 每个Application获取专属的executor进程,该进程在Application期间一直驻留,并以多线程方式运行Task。这种Application隔离机制是有优势的,无论是从调度角度看(每个Driver调度他自己的任务),还是从运行角度看(来自不同Application的Task运行在不同JVM中),当然这样意味着Spark Application不能跨应用程序共享数据,除非将数据写入外部存储系统
- Spark与资源管理器无关,只要能够获取executor进程,并能保持相互通信就可以了
- 提交SparkContext的Client应该靠近Worker节点(运行Executor的节点),最好是在同一个Rack里,因为Spark Application运行过程中SparkContext和Executor之间有大量的信息交换
- Task采用了数据本地性和推测执行的优化机制
RDD 分区是为了计算而不是储存的。
hdfs-block位于存储空间;spark-partition位于计算空间;
hdfs-block的大小是固定的;spark-partition大小是不固定的;
hdfs-block是有冗余的、不会轻易丢失;spark-partition(RDD)没有冗余设计、丢失之后重新计算得到;
RDD 的操作函数(operation)主要分为2种类型 Transformation 和 Action:
Transformation算子 | Map,filter,groupBy,join, union,reduce,sort,partitionBy | 返回值还是 RDD ,不会马上 提交 Spark 集群运行 |
Action算子 | count,collect,take,save, show | 返回值不是 RDD ,会形成 DAG 图,提交 Spark 集群运行 并立即返回结果 |
Transformation 操作不是马上提交 Spark 集群执行的,Spark 在遇到 Transformation 操作时只会记录需要这样的操作,并不会去执行,需要等到有 Action 操作的时候才会真正启动计算过程进行计算.针对每个 Action,Spark 会生成一个 Job, 从数据的创建开始,经过 Transformation, 结尾是 Action 操作.这些操作对应形成一个有向无环图(DAG),形成 DAG 的先决条件是最后的函数操作是一个Action.
RDD 运行流程:
调度任务 将各阶段划分成不同的 任务 (task) ,每个任务都是数据和计算的合体。在进行下一阶段前,当前阶段的所有任务都要执行完成。因为下一阶段的第一个转换一定是重新组织数据的,所以必须等当前阶段所有结果数据都计算出来了才能继续
shuffle 是划分 DAG 中 stage 的标志,同时影响 Spark 执行速度的关键步骤. (如何划分 stage )
RDD 的 Transformation 函数中,又分为窄依赖(narrow dependency)和宽依赖(wide dependency)的操作.
窄依赖跟宽依赖的区别是是否发生 shuffle(洗牌) 操作.
宽依赖会发生 shuffle 操作.,宽依赖指子 RDD 的各个分片会依赖于父RDD 的多个分片,所以会造成父 RDD 的各个分片在集群中重新分片(不能进行流水化优化)
窄依赖是子 RDD的各个分片(partition)不依赖于其他分片,能够独立计算得到结果,(可以进行多个分片的并行化处理)
宽依赖:
窄依赖:
依据算子划分stage
(join 需要针对同一个 key 合并,所以需要 shuffle)
运行到每个 stage 的边界时,数据在父 stage 中按照 Task 写到磁盘上,而在子 stage 中通过网络按照 Task 去读取数据。这些操作会导致很重的网络以及磁盘的I/O,所以 stage 的边界是非常占资源的,在编写 Spark 程序的时候需要尽量避免的 。父 stage 中 partition 个数与子 stage 的 partition 个数可能不同,所以那些产生 stage 边界的 Transformation 常常需要接受一个 numPartition 的参数来觉得子 stage 中的数据将被切分为多少个 partition。
PS:shuffle 操作的时候可以用 combiner 压缩数据,减少 IO 的消耗
————————————————
原文链接:https://blog.csdn.net/databatman/article/details/53023818
由于数据量很大,因此要它被切分并存储在各个结点的分区当中。从而当我们对RDD进行操作时,实际上是对每个分区中的数据并行操作。
从HDFS 文件中读取:
Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元。如果我们上传一个30GB的非压缩的文件到HDFS,HDFS默认的块容量大小128MB,因此该文件在HDFS上会被分为235块(30GB/128MB);Spark读取SparkContext.textFile()读取该文件,默认分区数等于块数即235。
合理设置分区数量:
1、分区数越多越好吗?
不是的,分区数太多意味着任务数太多,每次调度任务也是很耗时的,所以分区数太多会导致总体耗时增多。
2、分区数太少会有什么影响?
分区数太少的话,会导致一些结点没有分配到任务;另一方面,分区数少则每个分区要处理的数据量就会增大,从而对每个结点的内存要求就会提高;还有分区数不合理,会导致数据倾斜问题。
3、合理的分区数是多少?如何设置?
总核数=executor-cores * num-executor
一般合理的分区数设置为总核数的2~3倍
为什么需要持久化: Spark中对于一个RDD执行多次算子(函数操作)的默认原理是这样的(惰性计算):每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。
因此对于这种情况,我们的建议是:对多次使用的RDD进行持久化。
调用 persist() 方法对RDD 持久化,并不会马上执行持久化,直到遇到action 类型的操作才会 持久化。unpersist() 释放内存
除了 cache 函数外,缓存还可以使用 persist, cache 是使用的默认缓存选项,一般默认为Memory_only(内存中缓存), persist 则可以在缓存的时候选择任意一种缓存类型.事实上, cache 内部调用的是默认的 persist.
持久化的类型
Java微服务实战296集大型视频-谷粒商城【附代码和课件】
Java开发微服务畅购商城实战【全357集大项目】-附代码和课件
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。