赞
踩
组成Hadoop的最核心框架有三个MapReduce、YARN和Hdfs,分别是分布式计算框架、资源调度框架和分布式文件系统。
Hadoop Distributed File System,简称HDFS,是一个分布式文件系统。
hdfs有三个服务,分别是NameNode,secondary NameNode,DataNode。
1) NameNode(nn):处理客户端读写请求,存储文件的元数据,如文件名,文件目录结构,生成时间,副本数,文件权限,以及每个文件的块列表和块所在的DataNode等。
2)DataNode(dn):存数实际的数据块并执行数据块的读写操作。
3)secondary NameNode(2nn):每隔一段时间对NameNode做元数据备份
Yet Another Resource Negotiator简称YARN ,另一种资源协调者,是Hadoop的资源管理器。
yarn有四个服务,ResourceManager(资源管理器),NodeManager(节点管理器),applicationmaster(任务管理器),container(容器)
1)ResourceManager(rm):集群资源(cpu,内存等)管理者。
2)NodeManager(nm):单个节点资源的管理者。
3)applicationMaster(am):单个任务运行的管理者。
4)container:容器,相当于一台独立的服务器,封装了任务所需的资源,如内存,cpu,磁盘等。
客户端可以有多个
集群上可以运行多个applicationMaster
每个NodeManager上可以有多个container
(1)MrAppMaster:负责整个程序的过程调度及状态协调。
(2)MapTask:负责Map阶段的整个数据处理流程。
(3)ReduceTask:负责Reduce阶段的整个数据处理流程。
MapReduce将计算过程分为两个阶段:Map和Reduce
1)Map阶段并行处理输入数据
2)Reduce阶段对Map结果进行汇总
客户端Client提交任务到资源管理器(ResourceManager),
资源管理器接收到任务之后去NodeManager节点开启任务(ApplicationMaster),
ApplicationMaster向ResourceManager申请资源, 若有资源ApplicationMaster负责开启任务即MapTask。
每个map独立工作,各自负责检索各自对应的DataNode,将结果记录到HDFS,
DataNode负责存储,NameNode负责记录,2nn负责备份部分数据。
HDFS(Hadoop Distributed File System),它是一个分布式文件系统,适合一次写入,多次读出的场景。
优点:
1)高容错性
数据采用多副本保存方式,当集群中的某些机器宕机了,数据可以从其他正常运行的机器获取
2)适合处理大数据
通过将大文件切分成小的数据块存储到不同服务器上,可以实现一个大文件的存储,同时通过联合多个服务器多块硬盘实现整个存储系统的大容量,大文件的分片存储,不同分片可以进行并行读写操作,进而实现数据的高速访问。
NameNode负责文件元数据的操作,DataNode负责处理文件的读写请求,文件数据流不会经过NameNode的处理,只会跟存储在具体DataNode进行联系,因此NameNode不会成为系统的瓶颈,成百上千台DataNode节点应对文件内容数据流的读写,其吞吐量大大提高了。
3)可靠性
缺点
1)不适合做实时数据访问
2)无法对大量小文件进行存储
存储大量小文件会占用NameNode大量的内存去存储元数据信息
小文件存储的寻址时间会超过读取时间
最佳传输损耗理论:在一次传输中,寻址时间占用总传输时间的1%时,本次传输的损耗最小,为最佳性价比传输!
目前硬件的发展条件,普通磁盘写的速率大概为100M/S, 寻址时间一般为10ms!
(10ms / 1%) * 100M/S = 100M
3)不支持并发写入,文件随机修改
一个文件不允许多个线程同时写,仅支持追加,不支持修改
(1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在,并返回结果
(2)客户端请求 Block上传到哪几个DataNode上,NameNode返回DataNode节点
(3)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。dn1、dn2、dn3逐级应答客户端。
(4)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。
(5)当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。
(1)客户端通过DistributedFileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。
(2)就近原则挑选一台DataNode(然后随机)服务器,请求读取数据。
(3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。
(4)客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。
优点:
1)易于编程:简单的实现一些接口就可以完成分布式程序。
2)良好的拓展性:计算资源不足时可以通过增加机器提高能力。
3)高容错性:一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,这个过程由hadoop内部完成。
4)适合PB级以上的海量数据的离线处理
缺点:
1)不擅长实时计算
2)不擅长流式计算:因为MapReduce自身的设计特点决定了数据源必须是静态的。
3)不擅长有向无环图计算:MapReduce不擅长后一个应用程序的输入为前一个的输出,因为每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
(1)分布式的运算程序往往需要分成至少2个阶段,map阶段和reduce阶段。
(2)map阶段的MapTask并发实例,并行运行,互不相干。
(3)reduce阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
(4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
总结:分析WordCount数据流走向深入理解MapReduce核心思想。
一个完整的MapReduce程序在分布式运行时有三类实例进程:
(1)MrAppMaster:负责整个程序的过程调度及状态协调。
(2)MapTask:负责Map阶段的整个数据处理流程。
(3)ReduceTask:负责Reduce阶段的整个数据处理流程。
数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。
1)一个job的map阶段并行度由客户端在提交job时的切片数决定
2)每个split切片分配一个mapTask并行实例处理
3)默认情况下切片大小=blocksize
4)切片针对每一个文件切片
1)程序先找到数据存储目录
2)遍历目录下的每个文件
3)遍历第一个文件
(1)获取文件大小
(2)计算切片大小(默认情况下切片大小=blocksize)
(3)每次切片时要判断剩下的部分是否大于块的1.1倍,不大于则划分一块切片
(4)将切片信息写到一个切片规划文件中
(5)切片的核心过程在getSplit()方法中完成
(6)InputSplit只记录了切片的元数据信息(起始位置,长度,所在节点等)
4)提交切片规划文件到yarn上,由yarn的MrAppMaster根据切片规划文件开启对应的mapTask。
FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。
1)TextInputFormat
TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。
TextInputformat将每一行在文件中的起始偏移量作为 key,每一行的内容作为value。默认以\n或回车键作为一行记录。
2)KeyValueTextInputFormat
KeyValueTextInputFormat是FileInputFormat的一个实现类,每一行为一条记录,被分隔符分割为key,value。
可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,“\t”)设置分割符。
KeyValueTextInputFormat 适合处理输入数据的每一行是两列,并用 tab 分离的形式
3)NLineInputFormat
如果使用NlineInputFormat,代表每个map进程处理的InputSplit不再按block块去划分,而是按NlineInputFormat指定的行数N来划分。
4)CombineTextInputFormat
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
(1)虚拟存储过程:
将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
(2)切片过程:
(a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
MapReduce工作流程由submit阶段,mapTask阶段,shuffle阶段,reduce阶段组成
1)客户端提交切片,jar包,xml文件给yarn,由MrAppMaster获取切片信息打开对应数量的mapTask。
2)mapTask通过InputFormat调用RecordReader的read()方法读取外部的数据,解析出key/value.
3)当数据处理完成后,调用OutputCollector.collect()输出结果。它会将生成的key/value分区,并写入环形内存缓冲区中。
4)当环形缓冲区达到80%后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。
5)在环形缓冲区内部,会对数据进行一次分区排序。利用快速排序算法对分区编号进行排序,然后再对key进行排序,这样可以保证同一分区内所有数据按照key有序。
按照分区编号由小到大依次将每个分区中的数据写入到任务工作目录下的临时文件spillN.out中,如果设置了Combiner,则写入文件前会对分区中的数据进行合并操作。
分区内元数据信息会写入到内存索引数据结构SpillRecord中,如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
6)当所有数据处理完成后,MapTask将所有临时文件合并成一个大文件,同时生成相应的索引文件。
MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并10个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
7)Reduce会根据自己的分区,去maptask中拉取属于自己的数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
8)在拉取数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
9)当合并完成后reduce()函数将计算结果写到HDFS上。
(1)MR程序提交到客户端所在的节点。
(2)YarnRunner向ResourceManager申请一个Application。
(3)RM将该应用程序的资源路径返回给YarnRunner。
(4)将运行所需资源提交到HDFS上。资源提交完毕后,申请运行mrAppMaster。
(5)RM将用户的请求初始化成一个Task。
(6)其中一个NodeManager领取到Task任务,并创建容器Container,产生MRAppmaster。
(7)Container从HDFS上拷贝资源到本地。MRAppmaster向RM 申请运行MapTask资源。
(8)RM将运行MapTask任务分配给另外两个NodeManager,并创建容器。
(9)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
(10)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
(11)ReduceTask向MapTask获取相应分区的数据。
(12)程序运行完毕后,MR会向RM申请注销。
(1)作业提交
客户端向yarn提交一个作业
第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。并申请一个作业id。
第2步:RM给Client返回该job资源的提交路径和作业id。
第3步:Client提交jar包、切片信息和XML文件到指定的资源提交路径。提交完资源后,向RM申请运行MrAppMaster。
(2)作业初始化
第6步:当RM收到Client的请求后,将该job添加到调度器中。
第7步:空闲的NM领取到该Job,并创建Container,并产生MRAppmaster,并下载Client提交的资源到本地。
(3)任务分配
第10步:MrAppMaster向RM申请运行多个MapTask任务资源。
第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(4)任务运行
第12步:MR向两个接收到任务的NodeManager发送程序启动脚本。
第13步:MrAppMaster等所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
第14步:ReduceTask向MapTask获取相应分区的数据。
第15步:程序运行完毕后,MR会向RM申请注销自己。
(5)进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒向应用管理器请求进度更新, 展示给用户。
(6)作业完成
除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
Hadoop作业调度器主要有三种:FIFO、容量(Capacity Scheduler)和公平(Fair Scheduler)。
Apache Hadoop3.1.3默认的资源调度器是容量调度器,
CDH框架默认调度器是公平调度器。
1、FIFO调度器(First In First Out):单队列,根据提交作业的先后顺序,先来先服务。
2、容量调度器(Capacity Scheduler):是Yahoo开发的多用户调度器。
特点
多队列:每个队列可以配置一定的资源量,每个队列采用FIFO调度策略
容量保证:管理员可以为每个队列设置资源,保证最低资源和资源使用上限
灵活性:如果一个队列中的资源有剩余,可以把这些资源共享给需要资源的队列,如果该队列有新的应用程序提交,则会收回对应资源
多用户:支持多用户共享集群和多应用程序同时运行。为了防止一个用户的作业独占资源,该调度器会对同一个用户提交的作业所占的资源量进行限定。
2.1 资源分配算法
(1)队列资源分配:从root开始使用深度优先算法,优先选择资源占用率最低的队列分配资源。
(2)作业资源分配:默认按照提交作业的优先级和提交时间的顺序分配资源。
(3)容量资源分配:按照容器的优先级分配资源,如果优先级相同则按照数据本地性原则
1)任务和数据在同一节点
2)任务和数据在同一机架
3)任务和数据不在同一节点也不在同一机架
3、公平调度器
特点:
多队列:每个队列可以配置一定的资源量,每个队列采用FIFO调度策略
容量保证:管理员可以为每个队列设置资源,保证最低资源和资源使用上限
灵活性:如果一个队列中的资源有剩余,可以把这些资源共享给需要资源的队列,如果该队列有新的应用程序提交,则会收回对应资源
多用户:支持多用户共享集群和多应用程序同时运行。为了防止一个用户的作业独占资源,该调度器会对同一个用户提交的作业所占的资源量进行限定。
队列资源分配方式:
(1)FIFO策略,采用此种方法则与容量调度器一致
(2)Fair策略,Fair策略是一种基于最大最小公平算法实现的资源多路复用方式,默认情况下,每个队列内部采用该方式分配资源。
资源分配流程:
1、选择队列
2、选择作业
3、选择容器
实现步骤:
计算比较对象的实际最小资源份额,是否饥饿,资源分配比,资源使用权重比
如果有饥饿,饥饿优先
如果都不饥饿则按照资源使用权重比较小者优先,如果相同则按照提交时间顺序
如果都饥饿,则按照资源分配比较小的优先,如果相同则按照提交时间顺序
(3)DRF策略
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。