赞
踩
NodeManager(NM)是YARN中每个节点上的代理,它管理Hadoop集群中单个计算节点,包括与ResourceManger保持通信,监督Container的生命周期管理,监控每个Container的资源使用(内存、CPU等)情况,追踪节点健康状况,管理日志和不同应用程序用到的附属服务。
NodeManager整体架构:
接下来将按照启动NodeManager时代码执行的顺序为主线进行代码分析。
主要代码:
- public static void main(String[] args) {
- Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
- //打印Nodemangager启动和关闭时的日志信息
- StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
- //创建NodeManager对象
- NodeManager nodeManager = new NodeManager();
- //加载配置文件初始化
- Configuration conf = new YarnConfiguration();
- //初始化并启动NodeManager
- nodeManager.initAndStartNodeManager(conf, false);
- }
- private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
- try {
-
- // Remove the old hook if we are rebooting.
- if (hasToReboot && null != nodeManagerShutdownHook) {
- ShutdownHookManager.get().removeShutdownHook(nodeManagerShutdownHook);
- }
- //增加nodeManagerShutdownHook为了在NodeManager关闭或重启时关闭compositeService
- nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
- ShutdownHookManager.get().addShutdownHook(nodeManagerShutdownHook,
- SHUTDOWN_HOOK_PRIORITY);
- //调用init()函数,进行初始化(init方法调用被重写的serviceInit方法进行初始化)
- this.init(conf);
- //启动各项服务(start方法内部调用被重写的servicestart方法进行启动各项服务)
- this.start();
- } catch (Throwable t) {
- LOG.fatal("Error starting NodeManager", t);
- System.exit(-1);
- }
- }
(1)init方法是从Service接口,在AbstractService抽象类中得到实现。在AbstractService类中的init方法调用protected 类型的serviceInit。在其子类NodeMananger中重写了serviceInit方法。
AbstractService抽象类中init方法实现:
- @Override
- public void init(Configuration conf) {
- if (conf == null) {
- throw new ServiceStateException("Cannot initialize service "
- + getName() + ": null configuration");
- }
- if (isInState(STATE.INITED)) {
- return;
- }
- synchronized (stateChangeLock) {
- if (enterState(STATE.INITED) != STATE.INITED) {
- setConfig(conf);
- try {
- serviceInit(config);
- if (isInState(STATE.INITED)) {
- //if the service ended up here during init,
- //notify the listeners
- notifyListeners();
- }
- } catch (Exception e) {
- noteFailure(e);
- ServiceOperations.stopQuietly(LOG, this);
- throw ServiceStateException.convert(e);
- }
- }
- }
- }
(2)NodeManager类中serviceInit方法中是添加一些服务。具体如下:
主要代码:
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
-
- conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
-
- NMContainerTokenSecretManager containerTokenSecretManager =
- new NMContainerTokenSecretManager(conf);
-
- NMTokenSecretManagerInNM nmTokenSecretManager =
- new NMTokenSecretManagerInNM();
-
- this.aclsManager = new ApplicationACLsManager(conf);
- //始化ContainerExecutor,ContainerExecutor封装了nodeManager对Container操作的各种方法,
- //包括启动container, 查询指定id的container是否活着,等操作. 根据配置yarn.nodemanager.container-executor.class
- //决定ContainerExecutor的实例, 默认为DefaultContainerExecutor.
- ContainerExecutor exec = ReflectionUtils.newInstance(
- conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
- DefaultContainerExecutor.class, ContainerExecutor.class), conf);
- try {
- exec.init();
- } catch (IOException e) {
- throw new YarnRuntimeException("Failed to initialize container executor", e);
- }
- DeletionService del = createDeletionService(exec);
- addService(del);
-
- // NodeManager level dispatcher 异步分发器
- this.dispatcher = new AsyncDispatcher();
- //可以通过此服务查询node是否健康, 当前node的健康状态包括nodeHealthScriptRunner.isHealthy和dirsHandler.areDisksHealthy
- nodeHealthChecker = new NodeHealthCheckerService();
- addService(nodeHealthChecker);
- dirsHandler = nodeHealthChecker.getDiskHandler();
-
- this.context = createNMContext(containerTokenSecretManager,
- nmTokenSecretManager);
- //创建NodeStatusUpdater线程, 负责向RM注册和发送心跳(更新状态).
- //这里使用ResourceTracker协议向RM通信, 底层为YarnRPC. ResourceTracker接口提供了两个方法; 提供注册和心跳功能
- nodeStatusUpdater =
- createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
- // 监控node的资源(即资源是否可用, 四种状态, stopped, inited, notinited, started)
- NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
- addService(nodeResourceMonitor);
- //创建ContainerManagerImpl服务, 管理container,使用ContainerManager协议, ContainerManager协议为APP向NodeManager通信的协议
- containerManager =
- createContainerManager(context, exec, del, nodeStatusUpdater,
- this.aclsManager, dirsHandler);
- addService(containerManager);
- ((NMContext) context).setContainerManager(containerManager);
-
- // 创建webServer, 启动NodeManager的web服务. 通过yarn.nodemanagerwebapp.address设置地址, 默认端口为8042
- WebServer webServer = createWebServer(context, containerManager
- .getContainersMonitor(), this.aclsManager, dirsHandler);
- addService(webServer);
- ((NMContext) context).setWebServer(webServer);
-
- dispatcher.register(ContainerManagerEventType.class, containerManager);
- dispatcher.register(NodeManagerEventType.class, this);
- addService(dispatcher);
- //初始化监控
- DefaultMetricsSystem.initialize("NodeManager");
-
- // StatusUpdater should be added last so that it get started last
- // so that we make sure everything is up before registering with RM.
- addService(nodeStatusUpdater);
-
- super.serviceInit(conf);
- // TODO add local dirs to del
- }
AbstractService中start方法的具体实现:
- @Override
- public void start() {
- if (isInState(STATE.STARTED)) {
- return;
- }
- //enter the started state
- synchronized (stateChangeLock) {
- if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
- try {
- startTime = System.currentTimeMillis();
- //会调用子类NN中重写的同名方法
- serviceStart();
- if (isInState(STATE.STARTED)) {
- //if the service started (and isn't now in a later state), notify
- if (LOG.isDebugEnabled()) {
- LOG.debug("Service " + getName() + " is started");
- }
- notifyListeners();
- }
- } catch (Exception e) {
- noteFailure(e);
- ServiceOperations.stopQuietly(LOG, this);
- throw ServiceStateException.convert(e);
- }
- }
- }
- }
NodeManager中重写的serviceStart方法的主要代码:
- @Override
- protected void serviceStart() throws Exception {
- try {
- doSecureLogin();
- } catch (IOException e) {
- throw new YarnRuntimeException("Failed NodeManager login", e);
- }
- super.serviceStart();
- }
http://www.technology-mania.com/2014/05/an-insight-into-hadoop-yarn-nodemanager.html
http://www.cnblogs.com/biyeymyhjob/archive/2012/08/18/2645576.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。