当前位置:   article > 正文

Yarn_yarm

yarm

  YARN是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。

Yarn的架构

在这里插入图片描述

  • Yarn 资源管理器(ResouceManager):协调集群计算资源的分配,主要接收客户端任务请求,接收和监控NodeManager的资源情况汇报,负责资源的分配与调度,启动和监控ApplicationMaster。包含主要的组件:定时调用器(Scheduler)以及应用管理器(ApplicationManager)
      定时调度器(Scheduler):从本质上来说,定时调度器就是一种策略。当 Client 提交一个任务的时候,它会根据所需要的资源以及当前集群的资源状况进行分配。具有容量队列的限制条件,将系统中的资源分配给正在运行的一个应用程序先进先出调度器 :一个作业运行完了,另一个才能运行
      应用管理器(ApplicationManager):应用管理器就是负责管理 Client 用户提交的应用。监控应用的工作正是由应用管理器(ApplicationManager)完成的。负责系统中所有的job,包括job的提交与调度器协商资源来启动ApplicationMaster(AM)和监控(AM)运行状态,并且失败的时候能够重新启动它,更新分配给一个新的Container容器的进度或者状态,不负责资源管理,只负责job。其中,Container是Yarn中资源的抽象,它封装了某个节点上一定量的资源(CPU和内存两类资源);ApplicationMaster负责作业的监控并跟踪应用执行状态来重启失败任务的,主要是单个Application(Job)的task管理和调度,向RM进行资源的申请,向NM发出launchContainer指令,接收NM的task处理状态信息。一个job只有一个主程序
  • Yarn 节点管理器(NodeManager):启动和监视集群中每个节点的计算容器,并监控它们的资源使用情况(cpu,内存,磁盘及网络等),以及向 ResourceManager/Scheduler 提供这些资源使用报告
  • YarnChild
    真正执行task的进程
  • Container
      Container 是 YARN 中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等。当 AM 向 RM 申请资源时,RM 为 AM 返回的资源是用 Container 表示的。YARN 会为每个任务分配一个 Container,该任务只能使用该 Container 中描述的资源

Yarn的内置调度器

  1. FIFO先进先出,一个的简单调度器,适合低负载集群(适合任务数量不多的情况下使用)
  2. Capacity调度器,给不同队列(即用户或用户组)分配一个预期最小容量,在每个队列内部用层次化的FIFO来调度多个应用程序。(适用于有很多小的任务跑,需要占很多队列,不使用队列,会造成资源的浪费)
    在这里插入图片描述

Capacity调度器的启用:
在ResourceManager节点上的yarn-site.xml设置

Property===>yarn.resourcemanager.scheduler.class
Value=====>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler
  • 1
  • 2

Capacity调度器的配置:
在目录$HADOOP_HOME/hadoop/etc/hadoop/capacity-scheduler.xml。修改完成后,需要执行下面的命令:

$HADOOP_YARN_HOME/bin/yarn rmadmin -refreshQueues    //使功能动态生效
  • 1
  1. Fair公平调度器,针对不同的应用(也可以为用户或用户组),每个应用属于一个队列,主旨是让每个应用分配的资源大体相当。(当然可以设置权重),若是只有一个应用,那集群所有资源都是他的。 适用情况:共享大集群、队列之间有较大差别(生产使用)
    在这里插入图片描述

Yarn的工作流程

  客户端提交job给 ApplicationManager 连接NodeManager去申请一个Container的容器,这个容器运行作业的ApplicationMaster的主程序,启动后向ApplicationManager进行注册,然后ApplicationMaster向ResourceManager申请资源,申请job的提交资源staging-dir,还会拿到ResourceManager为这个job产生的jobId,并把staging-dir和jobId合并在一起形成特定路径。然后客户端再把这些资源提交到HDFS相应的路径下面。随后,Yarn框架会把本次job所要用到的资源放到一个任务队列里面描述出来,拿到一个资源的列表,和对应的NodeManager进行通信,启动对应的Container容器,运行 ReduceTask和MapTask (两个先后运行/顺序随机运行),它们是向ApplicationMaster进行汇报它们的运行状态, 当所有作业运行完成后还需要向ApplicationManager进行汇报并注销和关闭,这时所有资源会回收
  Yarn中,它按照实际资源需求为每个任务分配资源,比如一个任务需要1GB内存,1个CPU,则为其分配对应的资源。而资源是用Container表示的,Container是一个抽象概念,它实际上是一个JAVA对象,里面有资源描述(资源所在节点,资源优先级,资源量,比如CPU量,内存量等)。当一个ApplicationMaster向RM申请资源时,RM会以Container的形式将资源发送给对应的ApplicationMaster,ApplicationMaster收到Container后,与对应的NodeManager通信,告诉它要利用这个Container运行某个任务

ResourceManager和NodeManager交互

ResourceManager

从ResourceManager的mian方法中有两个重要的方法

//会实现本类中的serviceInit方法
resourceManager.init(conf);
resourceManager.start();
  • 1
  • 2
  • 3

此处调用父类AbstractService的init()方法,在内部实现serviceInit方法,由匹配的实现类来达到初始化的目的。基本所有重要服务的初始化方法都是调用AbstractService的init()方法实现的。其源码如下:

public void init(Configuration conf) {
    //服务配置是否为空
    if (conf == null) {
        throw new ServiceStateException("Cannot initialize service "
        + getName() + ": null configuration");
    }
    //服务是否已经初始化
    if (isInState(STATE.INITED)) {
        return;
    }
    synchronized (stateChangeLock) {
        /*相当于对服务做一次校验,确保服务初始化成功*/
        if (enterState(STATE.INITED) != STATE.INITED) {
            setConfig(conf);
            try {
                //服务初始化,会进入子类的同名函数
                serviceInit(config);
                if (isInState(STATE.INITED)) {
                    notifyListeners();
                }
            } catch (Exception e) {
             noteFailure(e);
             ServiceOperations.stopQuietly(LOG, this);
             throw ServiceStateException.convert(e);
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

start方法同init()方法一样,都是去调用父类AbstractService的start()方法,然后调用内部的serviceStart()方法由对应的实现类来实现的,与init()方法一样,许多重要服务的start方法都是采用这样的模式实现的。

public void start() {
    if (isInState(STATE.STARTED)) {
        return;
    }
    synchronized (stateChangeLock) {
        if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
            try {
                startTime = System.currentTimeMillis();
                if (isInState(STATE.STARTED)) {
                    serviceStart();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Service " + getName() + " is started");
                    }
                    notifyListeners();
                }
            } catch (Exception e) {
                noteFailure(e);
                ServiceOperations.stopQuietly(LOG, this);
                throw ServiceStateException.convert(e);
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

ResourceManager.serviceInit初始化方法中有一个很重要的方法createAndInitActiveServices()。该方法内部实现了许多重要服务的初始化。

// load core-site.xml
 // load yarn-site.xml
 //前面的代码关键就做了初始化配置文件,读取配置文件,设置配置文件上下文这类的事情,就不多做分析了。
 
// 注册了一个调度器,用于内部事件调度处理
rmDispatcher = setupDispatcher();
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);
 
//这个方法内部实现了许多重要的服务初始化的过程,其实真正需要分析的就是这个方法。
//这是由RM的一个内部类RMActiveServices实现的。
createAndInitActiveServices(); 
super.serviceInit(this.conf);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

NodeManager与ResourceManager保持心跳的关键类的初始化方法也是一些配置参数的初始化

resourceTracker = createResourceTrackerService();
addService(resourceTracker);
rmContext.setResourceTrackerService(resourceTracker);
  • 1
  • 2
  • 3

接下来看一下ResourceManager.serviceStart()

protected void serviceStart() throws Exception {
    if (this.rmContext.isHAEnabled()) {
        transitionToStandby(true);
    } else {
        transitionToActive();
    }
    startWepApp();
    if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,false)) {
        int port = webApp.port();
        WebAppUtils.setRMWebAppPort(conf, port);
    }
    super.serviceStart();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

transitionToActive()最终调用的是AsyncDispatcher.serviceStart(),内部新建了一个死循环线程,用来处理阻塞队列中的各个事件,最终会调用createThread()

 Runnable createThread(){
    return new Runnable(){
        @Override 
        public void run(){
            while(!stopped&&!Thread.currentThread().isInterrupted()){
                drained=eventQueue.isEmpty();
                if(blockNewEvents){
                    synchronized(waitForDrained){
                        if(drained){
                            waitForDrained.notify();
                        }
                    }
                }
                Event event;
                try{
                    event=eventQueue.take();
                }catch(InterruptedException ie){
                    if(!stopped){
                        LOG.warn("AsyncDispatcher thread interrupted",ie);
                    }
                    return;
                }
                if(event!=null){
                    dispatch(event);
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

super.serviceStart(),内部就是将每一个service启动,至此ResourceManager的启动就完毕了

protected void serviceStart() throws Exception {
    List<Service> services = getServices();
    if (LOG.isDebugEnabled()) {
        LOG.debug(getName() + ": starting services, size=" + services.size());
    }
    for (Service service : services) {
        service.start();
    }
    super.serviceStart();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

NodeManager

main()方法

public static void main(String[] args) throws IOException {
    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
    StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
    NodeManager nodeManager = new NodeManager();
    Configuration conf = new YarnConfiguration();
    new GenericOptionsParser(conf, args);
    nodeManager.initAndStartNodeManager(conf, false);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

调用了initAndStartNodeManager(conf, false),这个方法就是执行了init()以及start()。serviceInit()方法,同ResourceManager一样,做了许多重要方法的初始化,这里着重介绍NodeStatusUpdaterImpl,可以看到该初始化方法中new了一个NodeStatusUpdaterImpl,直接去看NodeStatusUpdaterImpl的初始化方法和启动方法

nodeStatusUpdater =createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
    return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,metrics);
}
  • 1
  • 2
  • 3
  • 4
  • 5

初始化方法中大多都是配置文件的读取,而后去关注重点的start方法

this.nodeId = this.context.getNodeId();
this.httpPort = this.context.getHttpPort();
this.nodeManagerVersionId = YarnVersionInfo.getVersion();
try {
    //注册启动以保证ContainerManager可以获得每一台NM的标记
    this.resourceTracker = getRMClient();
    //配置必要配置信息,和安全认证操作利用Hadoop RPC远程调用RM端ResourcesTrackerService下的registerNodeManager()方法
    // 详细见后面ResourcesTrackerService下的registerNodeManager()代码分析
    registerWithRM();
        
    super.serviceStart();
    // 创建一个线程,然后启动,所有操作都在运行while的循环中
    //设置、获取和输出必要配置信息,其中比较重要的调用getNodeStatus()方法,
    // 获取本地Container和本地Node的状态,以供后面的nodeHeartbeat()方法使用
    //通过Hadoop RPC远程调用RM端ResourcesTrackerService下的nodeHeartbeat()函数,
    // 用while循环以一定时间间隔向RM发送心跳信息,心跳操作见下面ResourcesTrackerService下的nodeHeartbeat()函数
    //nodeHeartbeat()将返回给NM信息,根据返回的response,
    // 根据response返回的信息标记不需要的Container和Application发送相应的
    // FINISH_CONTAINERS和 FINISH_APPS给ContainerManager,进行清理操作----详细见后面的代码分析
    startStatusUpdater();
} catch (Exception e) {
    String errorMessage = "Unexpected error starting NodeStatusUpdater";
    LOG.error(errorMessage, e);
    throw new YarnRuntimeException(e);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

可归纳出两个重要的方法registerWithRM(),startStatusUpdater(),这两个方法会通过RPC远程调用上文说过的等待被调用ResourceTracker中的两个接口 registerNodeManager()和nodeHeartbeat()

NodeManager与ResourceManager间的心跳通信

  registerWithRM()会通过RPC通信调用ResourceTracker中的接口 registerNodeManager(),所以我们直接去看这个 registerNodeManager(),内部做了一些节点健康信息的检查,资源信息的检查,从下列代码中可以看到还会检查该node是否是新的node,如果是就会触发STARTRED时间,如果不是在触发RECONNECTED事件

RMNode oldNode=this.rmContext.getRMNodes().putIfAbsent(nodeId,rmNode);
if(oldNode==null) {
    this.rmContext.getDispatcher().getEventHandler().handle(
            new RMNodeStartedEvent(nodeId,request.getNMContainerStatuses(),
        request.getRunningApplications()));
} else {
    LOG.info("Reconnect from the node at: "+host);
    this.nmLivelinessMonitor.unregister(nodeId);
    oldNode.resetLastNodeHeartBeatResponse();
    this.rmContext
        .getDispatcher()
        .getEventHandler()
        .handle(
                new RMNodeReconnectEvent(nodeId,rmNode,request
        .getRunningApplications(),request.getNMContainerStatuses()));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

此次分析的是NodeManager的启动,自然去分析STARTED事件,状态发生了转移触发AddNodeTransition函数,该函数内显示的添加了active nodes,并向调度器发送了一个事件NODE_ADDED,告诉调度器有节点增加,用于计算集群中总共可调用的资源,到此节点就已经注册成功了。

//资源判断
root.updateClusterResource(clusterResource, new ResourceLimits(
        clusterResource));
int numNodes = numNodeManagers.incrementAndGet();
updateMaximumAllocation(schedulerNode, true);
  • 1
  • 2
  • 3
  • 4
  • 5

然后代码往后走,调用了ResourceTracker中的接 口nodeHeartbeat(),心跳方法中最终要的就是提交了一个STATUS_UPDATE事件

// 4. Send status to RMNode, saving the latest response.
 this.rmContext.getDispatcher().getEventHandler().handle(
 new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
 remoteNodeStatus.getContainersStatuses(), 
 remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
  • 1
  • 2
  • 3
  • 4
  • 5

从下面代码中可以看出该事件对应的方法

.addTransition(NodeState.RUNNING,
 EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
 RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
  • 1
  • 2
  • 3

在没有任务提交时,心跳方法内部就是汇报了自己节点资源的可用信息和已用信息以及汇报健康状态,最后返回RUNNING状态,表示NodeManager正常运行中,整个心跳方法是卸载while循环里的,没有异常就不会退出

//TODO:对container进行分组,分为a)刚申请到还未使用的 b)运行完毕需要回收的
rmNode.handleContainerStatus(statusEvent.getContainers());
if(rmNode.nextHeartBeat) {
    rmNode.nextHeartBeat = false;
    rmNode.context.getDispatcher().getEventHandler().handle(
            new NodeUpdateSchedulerEvent(rmNode));
}
// Update DTRenewer in secure mode to keep these apps alive. Today this is
// needed for log-aggregation to finish long after the apps are gone.
if (UserGroupInformation.isSecurityEnabled()) {
    rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
            statusEvent.getKeepAliveAppIds());
}
return NodeState.RUNNING;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

YARN 的高可用

在这里插入图片描述
ResourceManager

  1. 启动时会通过向ZooKeeper的/rmstore目录写一个lock文件,写成功则为active,否则standby。standby ResourceManager会一直监控lock文件的是否存在,如果不存在就会尝试去创建,争取为active ResourceManager
  2. 会接收客户端的任务请求,接收和监控NodeManager的资源的汇报,负责资源的分配与调度
  3. 启动和监控 ApplicationMaster

  处于Standby状态的ResourceManager状态转换可以手工完成,也可以自动完成。手工完成时通过命令行的管理命令(命令是“yarn rmadmin”)。自动完成是通过配置自动故障转移(automatic-failover),使用集成的failover-controller完成状态的自动切换。
  自动故障转移是依赖于ZooKeeper集群,依赖ZooKeeper的ActiveStandbyElector会嵌入到ResourceManager中,当Active状态的ResourceManager失效时,处于 Standby状态的ResourceManager就会被选举为Active状态的,实现切换。注意:这里没有ZooKeeperFailoverController进程,这点和HDFS的HA不同。
  对于客户端而言,必须知道所有的ResourceManager中。因此,需要在yarn-site.xml中配置所有的ResourceManager。那么,当一个Active状态的ResourceManager失效时,客户端怎么办哪?客户端会采用轮询机制,轮询配置在yarn-site.xml中的ResourceManager,直到找到一个active状态的ResourceManager。

NodeManager
节点上的资源的管理,启动container容器运行task的计算,上报资源,container情况汇报给RM和任务的处理情况汇报给 ApplicationMaster

ApplicationMaster
运行在NodeManager机器上的container。用于单个application的task的管理和调度,并向ResourceManager进行资源的申请,
向NodeManager发出 launch container指令,接收NodeManager的task的处理状态信息

RMstatestore

  1. ResourceManager的作业信息存储在ZooKeeper的/rmstore下,active ResourceManager向这个目录写APP(作业)信息
  2. 当active ResourceManager挂了,另外一个standby ResourceManager成功转换为active ResourceManager后,会从/rmstore目录读取相应的作业信息,重新构建作业的内存信息。然后启动内部服务,开始接收NodeManager的心跳,构建集群资源的信息,并接收客户端的提交作业的请求等

ZKFC:自动故障转移只作为ResourceManager进程的一个线程而非独立的守护进程来启动

HDFS&Yarn HA架构两者区别
1、ZKFC一个是进程,一个是线程
2、HDFS HA有独立的数据中间件的集群维护,Yarn HA作业调度信息维护在zk里面
3、HDFS中的DataNode 会向两个NameNode同时发送心跳。Yarn中NodeManager只会向active ResourceManager上报资源

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/866121
推荐阅读
相关标签
  

闽ICP备14008679号