赞
踩
优质博文:IT-BLOG-CN
Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而 mapreduce等运算程序则相当于运行于操作系统之上的应用程序。
Yarn 的重要概念:
【1】Yarn并不清楚用户提交的程序的运行机制;
【2】Yarn只提供运算资源的调度(用户程序向 Yarn申请资源,Yarn就负责分配资源);
【3】Yarn中的主管角色叫 ResourceManager;
【4】Yarn中具体提供运算资源的角色叫 NodeManager;
【5】Yarn其实与运行的用户程序完全解耦,就意味着 Yarn上可以运行各种类型的分布式运算程序(mapreduce只是其中的一种),比如mapreduce、storm程序,spark程序……
【6】Spark、Storm、Flink等运算框架都可以整合在Yarn上运行,只要他们各自的框架中有符合 Yarn规范的资源请求机制即可;
【7】Yarn是一个通用的资源调度平台,企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享。
从 YARN的架构图来看,它主要由ResourceManager、NodeManager、ApplicationMaster和Container等组件构成:
【1】ResourceManager(RM): YARN分层结构的本质是 ResourceManager。这个实体控制整个集群并管理应用程序计算时的资源分配。ResourceManager将各个资源(计算、内存、带宽等)精心安排给 NodeManager(YARN的每节点代理)。ResourceManager还与 ApplicationMaster一起分配资源,与 NodeManager一起启动和监视它们的基础应用程序。在此上下文中,ApplicationMaster承担了以前的 TaskTracker的一些角色,ResourceManager承担了 JobTracker 的角色。总的来说,RM有以下作用
(1)处理客户端请求;
(2)启动或监控 ApplicationMaster;
(3)监控 NodeManager;
(4)资源的分配与调度;
【2】ApplicationMaster(AM): ApplicationMaster管理在 YARN内运行的每个应用程序实例。ApplicationMaster负责协调来自 ResourceManager的资源,并通过 NodeManager监视容器的执行和资源使用(CPU、内存等的资源分配)。请注意,尽管目前的资源更加传统(CPU 核心、内存),但未来会带来基于手头任务的新资源类型(比如图形处理单元或专用处理设备)。从YARN角度讲,ApplicationMaster是用户代码,因此存在潜在的安全问题。YARN 假设 ApplicationMaster 存在错误或者甚至是恶意的,因此将它们当作无特权的代码对待。总的来说,AM有以下作用:
(1)负责数据的切分;
(2)为应用程序申请资源并分配给内部的任务;
(3)任务的监控与容错;
【3】NodeManager(NM): NodeManager 管理 YARN集群中的每个节点。NodeManager 提供针对集群中每个节点的服务,从监督对一个容器的终生管理到监视资源和跟踪节点健康。MRv1通过插槽管理Map 和Reduce任务的执行,而 NodeManager管理抽象容器,这些容器代表着可供一个特定应用程序使用的针对每个节点的资源。 总的来说,NM有以下作用:
(1)管理单个节点上的资源;
(2)处理来自 ResourceManager的命令;
(3)处理来自 ApplicationMaster的命令;
【4】Container: Container 是 YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用 Container表示的。YARN 会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。总的来说,Container有以下作用:对任务运行环境进行抽象,封装CPU、内存等多维度的资源以及环境变量、启动命令等任务运行相关的信息。
要使用一个 YARN集群,首先需要一个包含应用程序的客户的请求。ResourceManager 协商一个容器的必要资源,启动一个ApplicationMaster 来表示已提交的应用程序。通过使用一个资源请求协议,ApplicationMaster协商每个节点上供应用程序使用的资源容器。执行应用程序时,ApplicationMaster监视容器直到完成。当应用程序完成时,ApplicationMaster从 ResourceManager注销其容器,执行周期就完成了。
通过上面的讲解,应该明确的一点是,旧的 Hadoop架构受到了 JobTracker的高度约束,JobTracker负责整个集群的资源管理和作业调度。新的 YARN架构打破了这种模型,允许一个新 ResourceManager管理跨应用程序的资源使用,ApplicationMaster负责管理作业的执行。这一更改消除了一处瓶颈,还改善了将 Hadoop集群扩展到比以前大得多的配置的能力。此外,不同于传统的MapReduce,YARN允许使用MPI( Message Passing Interface) 等标准通信模式,同时执行各种不同的编程模型,包括图形处理、迭代式处理、机器学习和一般集群计算。
啊,好大一坨。。。
工作机制详解:
【0】Mr 程序提交到客户端所在的节点;
【1】Yarnrunner 向 Resourcemanager 申请一个 Application;
【2】rm 将该应用程序的资源路径返回给 yarnrunner;
【3】该程序将运行所需资源提交到 HDFS上;
【4】程序资源提交完毕后,申请运行 mrAppMaster;
【5】RM 将用户的请求初始化成一个task;
【6】其中一个 NodeManager领取到 task任务;
【7】该 NodeManager创建容器 Container,并产生 MRAppmaster;
【8】Container 从 HDFS上拷贝资源到本地;
【9】MRAppmaster向 RM 申请运行 maptask容器;
【10】RM 将运行 maptask任务分配给另外两个 NodeManager,另两个 NodeManager分别领取任务并创建容器;
【11】MR向两个接收到任务的 NodeManager发送程序启动脚本,这两个 NodeManager分别启动 maptask,maptask对数据分区排序。
【12】MRAppmaster 向 RM申请2个容器,运行reduce task;
【13】reduce task向 maptask获取相应分区的数据;
【14】程序运行完毕后,MR会向 RM注销自己;
目前,Hadoop作业调度器主要有三种:FIFO、Capacity Scheduler 和 Fair Scheduler。Hadoop2.7.2 默认的资源调度器是Capacity Scheduler。具体设置详见:yarn-default.xml 文件
<property>
<description>The class to use as the resource scheduler.</description>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
【1】先进先出调度器(FIFO): FIFO 是 Hadoop中默认的调度器,也是一种批处理调度器。它先按照作业的优先级高低,再按照到达时间的先后选择被执行的作业。原理图如下所示。
比如,一个 TaskTracker正好有一个空闲的slot,此时FIFO调度器的队列已经排好序,就选择排在最前面的任务job1,job1包含很多 map task和 reduce task。假如空闲资源是map slot,我们就选择 job1中的 map task。假如 map task0要处理的数据正好存储在该 TaskTracker 节点上,根据数据的本地性,调度器把 map task0分配给该 TaskTracker。FIFO 调度器整体就是这样一个过程。
【2】容量调度器(Capacity Scheduler): 支持多个队列,每个队列可配置一定的资源量,每个队列采用FIFO调度策略,为了防止同一个用户的作业独占队列中的资源,该调度器会对同一用户提交的作业所占资源量进行限定。调度时,首先按以下策略选择一个合适队列:计算每个队列中正在运行的任务数与其应该分得的计算资源之间的比值,选择一个该比值最小的队列;然后按以下策略选择该队列中一个作业:按照作业优先级和提交时间顺序选择,同时考虑用户资源量限制和内存限制。 原理图如下所示。
比如我们分为三个队列:queueA、queueB和 queueC,每个队列的 job 按照到达时间排序。假如这里有100个slot,queueA分配20%的资源,可配置最多运行15个task,queueB 分配50%的资源,可配置最多运行25个task,queueC分配30%的资源,可配置最多运行25个task。这三个队列同时按照任务的先后顺序依次执行,比如,job11、job21和job31分别排在队列最前面,是最先运行,也是同时运行。
【3】公平调度器(Fair Scheduler): 同计算能力调度器类似,支持多队列多用户,每个队列中的资源量可以配置,同一队列中的作业公平共享队列中所有资源。原理图如下所示
比如有三个队列:queueA、queueB和queueC,每个队列中的 job 按照优先级分配资源,优先级越高分配的资源越多,但是每个 job 都会分配到资源以确保公平。在资源有限的情况下,每个 job 理想情况下获得的计算资源与实际获得的计算资源存在一种差距, 这个差距就叫做缺额。在同一个队列中,job的资源缺额越大,越先获得资源优先执行。作业是按照缺额的高低来先后执行的,而且可以看到上图有多个作业同时运行。
【1】作业提交: client调用 job.waitForCompletion方法,向整个集群提交MapReduce作业 (第1步) 。 新的作业ID(应用ID)由资源管理器分配(第2步)。作业的client核实作业的输出,计算输入的split,将作业的资源(包括Jar包、配置文件、split信息)拷贝给HDFS(第3步)。最后,通过调用资源管理器的 submitApplication()来提交作业(第4步)。
【2】作业初始化: 当资源管理器收到 submitApplciation()的请求时,就将该请求发给调度器(scheduler),调度器分配container,然后资源管理器在该 container内启动应用管理器进程,由节点管理器监控(第5步)。
MapReduce作业的应用管理器是一个主类为 MRAppMaster的 Java应用。其通过创造一些 bookkeeping对象来监控作业的进度,得到任务的进度和完成报告(第6步)。然后其通过分布式文件系统得到由客户端计算好的输入split(第7步)。然后为每个输入split创建一个map任务,根据mapreduce.job.reduces创建reduce任务对象。
【3】任务分配: 如果作业很小, 应用管理器会选择在其自己的 JVM中运行任务。如果不是小作业, 那么应用管理器向资源管理器请求container来运行所有的map和reduce任务(第8步)。这些请求是通过心跳来传输的, 包括每个map任务的数据位置, 比如存放输入split的主机名和机架(rack)。调度器利用这些信息来调度任务, 尽量将任务分配给存储数据的节点, 或者分配给和存放输入split的节点相同机架的节点。
【4】任务运行: 当一个任务由资源管理器的调度器分配给一个container后, 应用管理器通过联系节点管理器来启动container(第9步)。任务由一个主类为YarnChild的Java应用执行。在运行任务之前首先本地化任务需要的资源, 比如作业配置, JAR文件, 以及分布式缓存的所有文件(第10步)。最后, 运行map或reduce任务(第11步)。YarnChild 运行在一个专用的JVM中, 但是 YARN不支持JVM重用。
【5】进度和状态更新: YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。
【6】作业完成: 除了向应用管理器请求作业进度外, 客户端每5分钟都会通过调用 waitForCompletion()来检查作业是否完成。时间间隔可以通过 mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和container会清理工作状态,OutputCommiter 的作业清理方法也会被调用。作业的信息会被作业历史服务器存储以备之后用户核查。
【1】作业完成时间取决于最慢的任务完成时间: 一个作业由若干个 Map任务和 Reduce任务构成。因硬件老化、软件Bug等,某些任务可能运行非常慢。
典型案例:系统中有99%的Map任务都完成了,只有少数几个Map老是进度很慢,完不成,怎么办?
【2】推测执行机制: 发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度。为拖后腿任务启动一个备份任务,同时运行。谁先运行完,则采用谁的结果。
【3】执行推测任务的前提条件:
(1)每个 task只能有一个备份任务;
(2)当前 job已完成的task必须不小于0.05(5%);
(3)开启推测执行参数设置。Hadoop2.7.2 mapred-site.xml文件中默认是打开的。
<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
<description>If true, then multiple instances of some map tasks may be executed in parallel.</description>
</property>
<property>
<name>mapreduce.reduce.speculative</name>
<value>true</value>
<description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description>
</property>
【4】不能启用推测执行机制情况:
(1)任务间存在严重的负载倾斜;
(2)特殊任务,比如任务向数据库中写数据。
【5】算法原理:假设某一时刻,任务T的执行进度为 progress,则可通过一定的算法推测出该任务的最终完成时刻estimateEndTime。另一方面,如果此刻为该任务启动一个备份任务,则可推断出它可能的完成时刻estimateEndTime`,于是可得出以下几个公式:
estimateEndTime=estimatedRunTime+taskStartTime
estimatedRunTime=(currentTimestamp-taskStartTime)/progress
estimateEndTime`= currentTimestamp+averageRunTime
其中,
currentTimestamp为当前时刻;
taskStartTime为该任务的启动时刻;
averageRunTime为已经成功运行完成的任务的平均运行时间;
progress是任务运行的比例(0.1-1);
这样,MRv2总是选择(estimateEndTime- estimateEndTime·)差值最大的任务,并为之启动备份任务。为了防止大量任务同时启动备份任务造成的资源浪费,MRv2为每个作业设置了同时启动的备份任务数目上限。
推测执行机制实际上采用了经典的优化算法: 以空间换时间,它同时启动多个相同任务处理相同的数据,并让这些任务竞争以缩短数据处理时间。显然,这种方法需要占用更多的计算资源。在集群资源紧缺的情况下,应合理使用该机制,争取在多用少量资源的情况下,减少作业的计算时间。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。