赞
踩
YARN是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。
Capacity调度器的启用:
在ResourceManager节点上的yarn-site.xml设置
Property===>yarn.resourcemanager.scheduler.class
Value=====>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler
Capacity调度器的配置:
在目录$HADOOP_HOME/hadoop/etc/hadoop/capacity-scheduler.xml。修改完成后,需要执行下面的命令:
$HADOOP_YARN_HOME/bin/yarn rmadmin -refreshQueues //使功能动态生效
客户端提交job给 ApplicationManager 连接NodeManager去申请一个Container的容器,这个容器运行作业的ApplicationMaster的主程序,启动后向ApplicationManager进行注册,然后ApplicationMaster向ResourceManager申请资源,申请job的提交资源staging-dir,还会拿到ResourceManager为这个job产生的jobId,并把staging-dir和jobId合并在一起形成特定路径。然后客户端再把这些资源提交到HDFS相应的路径下面。随后,Yarn框架会把本次job所要用到的资源放到一个任务队列里面描述出来,拿到一个资源的列表,和对应的NodeManager进行通信,启动对应的Container容器,运行 ReduceTask和MapTask (两个先后运行/顺序随机运行),它们是向ApplicationMaster进行汇报它们的运行状态, 当所有作业运行完成后还需要向ApplicationManager进行汇报并注销和关闭,这时所有资源会回收
Yarn中,它按照实际资源需求为每个任务分配资源,比如一个任务需要1GB内存,1个CPU,则为其分配对应的资源。而资源是用Container表示的,Container是一个抽象概念,它实际上是一个JAVA对象,里面有资源描述(资源所在节点,资源优先级,资源量,比如CPU量,内存量等)。当一个ApplicationMaster向RM申请资源时,RM会以Container的形式将资源发送给对应的ApplicationMaster,ApplicationMaster收到Container后,与对应的NodeManager通信,告诉它要利用这个Container运行某个任务
从ResourceManager的mian方法中有两个重要的方法
//会实现本类中的serviceInit方法
resourceManager.init(conf);
resourceManager.start();
此处调用父类AbstractService的init()方法,在内部实现serviceInit方法,由匹配的实现类来达到初始化的目的。基本所有重要服务的初始化方法都是调用AbstractService的init()方法实现的。其源码如下:
public void init(Configuration conf) { //服务配置是否为空 if (conf == null) { throw new ServiceStateException("Cannot initialize service " + getName() + ": null configuration"); } //服务是否已经初始化 if (isInState(STATE.INITED)) { return; } synchronized (stateChangeLock) { /*相当于对服务做一次校验,确保服务初始化成功*/ if (enterState(STATE.INITED) != STATE.INITED) { setConfig(conf); try { //服务初始化,会进入子类的同名函数 serviceInit(config); if (isInState(STATE.INITED)) { notifyListeners(); } } catch (Exception e) { noteFailure(e); ServiceOperations.stopQuietly(LOG, this); throw ServiceStateException.convert(e); } } } }
start方法同init()方法一样,都是去调用父类AbstractService的start()方法,然后调用内部的serviceStart()方法由对应的实现类来实现的,与init()方法一样,许多重要服务的start方法都是采用这样的模式实现的。
public void start() { if (isInState(STATE.STARTED)) { return; } synchronized (stateChangeLock) { if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) { try { startTime = System.currentTimeMillis(); if (isInState(STATE.STARTED)) { serviceStart(); if (LOG.isDebugEnabled()) { LOG.debug("Service " + getName() + " is started"); } notifyListeners(); } } catch (Exception e) { noteFailure(e); ServiceOperations.stopQuietly(LOG, this); throw ServiceStateException.convert(e); } } } }
ResourceManager.serviceInit初始化方法中有一个很重要的方法createAndInitActiveServices()。该方法内部实现了许多重要服务的初始化。
// load core-site.xml
// load yarn-site.xml
//前面的代码关键就做了初始化配置文件,读取配置文件,设置配置文件上下文这类的事情,就不多做分析了。
// 注册了一个调度器,用于内部事件调度处理
rmDispatcher = setupDispatcher();
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);
//这个方法内部实现了许多重要的服务初始化的过程,其实真正需要分析的就是这个方法。
//这是由RM的一个内部类RMActiveServices实现的。
createAndInitActiveServices();
super.serviceInit(this.conf);
NodeManager与ResourceManager保持心跳的关键类的初始化方法也是一些配置参数的初始化
resourceTracker = createResourceTrackerService();
addService(resourceTracker);
rmContext.setResourceTrackerService(resourceTracker);
接下来看一下ResourceManager.serviceStart()
protected void serviceStart() throws Exception {
if (this.rmContext.isHAEnabled()) {
transitionToStandby(true);
} else {
transitionToActive();
}
startWepApp();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,false)) {
int port = webApp.port();
WebAppUtils.setRMWebAppPort(conf, port);
}
super.serviceStart();
}
transitionToActive()最终调用的是AsyncDispatcher.serviceStart(),内部新建了一个死循环线程,用来处理阻塞队列中的各个事件,最终会调用createThread()
Runnable createThread(){ return new Runnable(){ @Override public void run(){ while(!stopped&&!Thread.currentThread().isInterrupted()){ drained=eventQueue.isEmpty(); if(blockNewEvents){ synchronized(waitForDrained){ if(drained){ waitForDrained.notify(); } } } Event event; try{ event=eventQueue.take(); }catch(InterruptedException ie){ if(!stopped){ LOG.warn("AsyncDispatcher thread interrupted",ie); } return; } if(event!=null){ dispatch(event); } } } } }
super.serviceStart(),内部就是将每一个service启动,至此ResourceManager的启动就完毕了
protected void serviceStart() throws Exception {
List<Service> services = getServices();
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": starting services, size=" + services.size());
}
for (Service service : services) {
service.start();
}
super.serviceStart();
}
main()方法
public static void main(String[] args) throws IOException {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
NodeManager nodeManager = new NodeManager();
Configuration conf = new YarnConfiguration();
new GenericOptionsParser(conf, args);
nodeManager.initAndStartNodeManager(conf, false);
}
调用了initAndStartNodeManager(conf, false),这个方法就是执行了init()以及start()。serviceInit()方法,同ResourceManager一样,做了许多重要方法的初始化,这里着重介绍NodeStatusUpdaterImpl,可以看到该初始化方法中new了一个NodeStatusUpdaterImpl,直接去看NodeStatusUpdaterImpl的初始化方法和启动方法
nodeStatusUpdater =createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,metrics);
}
初始化方法中大多都是配置文件的读取,而后去关注重点的start方法
this.nodeId = this.context.getNodeId(); this.httpPort = this.context.getHttpPort(); this.nodeManagerVersionId = YarnVersionInfo.getVersion(); try { //注册启动以保证ContainerManager可以获得每一台NM的标记 this.resourceTracker = getRMClient(); //配置必要配置信息,和安全认证操作利用Hadoop RPC远程调用RM端ResourcesTrackerService下的registerNodeManager()方法 // 详细见后面ResourcesTrackerService下的registerNodeManager()代码分析 registerWithRM(); super.serviceStart(); // 创建一个线程,然后启动,所有操作都在运行while的循环中 //设置、获取和输出必要配置信息,其中比较重要的调用getNodeStatus()方法, // 获取本地Container和本地Node的状态,以供后面的nodeHeartbeat()方法使用 //通过Hadoop RPC远程调用RM端ResourcesTrackerService下的nodeHeartbeat()函数, // 用while循环以一定时间间隔向RM发送心跳信息,心跳操作见下面ResourcesTrackerService下的nodeHeartbeat()函数 //nodeHeartbeat()将返回给NM信息,根据返回的response, // 根据response返回的信息标记不需要的Container和Application发送相应的 // FINISH_CONTAINERS和 FINISH_APPS给ContainerManager,进行清理操作----详细见后面的代码分析 startStatusUpdater(); } catch (Exception e) { String errorMessage = "Unexpected error starting NodeStatusUpdater"; LOG.error(errorMessage, e); throw new YarnRuntimeException(e); }
可归纳出两个重要的方法registerWithRM(),startStatusUpdater(),这两个方法会通过RPC远程调用上文说过的等待被调用ResourceTracker中的两个接口 registerNodeManager()和nodeHeartbeat()
registerWithRM()会通过RPC通信调用ResourceTracker中的接口 registerNodeManager(),所以我们直接去看这个 registerNodeManager(),内部做了一些节点健康信息的检查,资源信息的检查,从下列代码中可以看到还会检查该node是否是新的node,如果是就会触发STARTRED时间,如果不是在触发RECONNECTED事件
RMNode oldNode=this.rmContext.getRMNodes().putIfAbsent(nodeId,rmNode); if(oldNode==null) { this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeStartedEvent(nodeId,request.getNMContainerStatuses(), request.getRunningApplications())); } else { LOG.info("Reconnect from the node at: "+host); this.nmLivelinessMonitor.unregister(nodeId); oldNode.resetLastNodeHeartBeatResponse(); this.rmContext .getDispatcher() .getEventHandler() .handle( new RMNodeReconnectEvent(nodeId,rmNode,request .getRunningApplications(),request.getNMContainerStatuses())); }
此次分析的是NodeManager的启动,自然去分析STARTED事件,状态发生了转移触发AddNodeTransition函数,该函数内显示的添加了active nodes,并向调度器发送了一个事件NODE_ADDED,告诉调度器有节点增加,用于计算集群中总共可调用的资源,到此节点就已经注册成功了。
//资源判断
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
int numNodes = numNodeManagers.incrementAndGet();
updateMaximumAllocation(schedulerNode, true);
然后代码往后走,调用了ResourceTracker中的接 口nodeHeartbeat(),心跳方法中最终要的就是提交了一个STATUS_UPDATE事件
// 4. Send status to RMNode, saving the latest response.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
remoteNodeStatus.getContainersStatuses(),
remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
从下面代码中可以看出该事件对应的方法
.addTransition(NodeState.RUNNING,
EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
在没有任务提交时,心跳方法内部就是汇报了自己节点资源的可用信息和已用信息以及汇报健康状态,最后返回RUNNING状态,表示NodeManager正常运行中,整个心跳方法是卸载while循环里的,没有异常就不会退出
//TODO:对container进行分组,分为a)刚申请到还未使用的 b)运行完毕需要回收的
rmNode.handleContainerStatus(statusEvent.getContainers());
if(rmNode.nextHeartBeat) {
rmNode.nextHeartBeat = false;
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeUpdateSchedulerEvent(rmNode));
}
// Update DTRenewer in secure mode to keep these apps alive. Today this is
// needed for log-aggregation to finish long after the apps are gone.
if (UserGroupInformation.isSecurityEnabled()) {
rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
statusEvent.getKeepAliveAppIds());
}
return NodeState.RUNNING;
ResourceManager
:
处于Standby状态的ResourceManager状态转换可以手工完成,也可以自动完成。手工完成时通过命令行的管理命令(命令是“yarn rmadmin”)。自动完成是通过配置自动故障转移(automatic-failover),使用集成的failover-controller完成状态的自动切换。
自动故障转移是依赖于ZooKeeper集群,依赖ZooKeeper的ActiveStandbyElector会嵌入到ResourceManager中,当Active状态的ResourceManager失效时,处于 Standby状态的ResourceManager就会被选举为Active状态的,实现切换。注意:这里没有ZooKeeperFailoverController进程,这点和HDFS的HA不同。
对于客户端而言,必须知道所有的ResourceManager中。因此,需要在yarn-site.xml中配置所有的ResourceManager。那么,当一个Active状态的ResourceManager失效时,客户端怎么办哪?客户端会采用轮询机制,轮询配置在yarn-site.xml中的ResourceManager,直到找到一个active状态的ResourceManager。
NodeManager
:
节点上的资源的管理,启动container容器运行task的计算,上报资源,container情况汇报给RM和任务的处理情况汇报给 ApplicationMaster
ApplicationMaster
:
运行在NodeManager机器上的container。用于单个application的task的管理和调度,并向ResourceManager进行资源的申请,
向NodeManager发出 launch container指令,接收NodeManager的task的处理状态信息
RMstatestore
:
ZKFC
:自动故障转移只作为ResourceManager进程的一个线程而非独立的守护进程来启动
HDFS&Yarn HA架构两者区别
1、ZKFC一个是进程,一个是线程
2、HDFS HA有独立的数据中间件的集群维护,Yarn HA作业调度信息维护在zk里面
3、HDFS中的DataNode 会向两个NameNode同时发送心跳。Yarn中NodeManager只会向active ResourceManager上报资源
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。