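This post covers the NodeManager, focusing on its initialization.
As before, we trace from the start-yarn.sh script and find that the class that ends up being launched is NodeManager: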
package org.apache.hadoop.yarn.server.nodemanager;
- public static void main(String[] args) {
- Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
- StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
- NodeManager nodeManager = new NodeManager();
- Configuration conf = new YarnConfiguration();
- nodeManager.initAndStartNodeManager(conf, false);
- }
Let's start from the main method:
NodeManager nodeManager = new NodeManager();
This line constructs the NodeManager. The constructor goes through the parent class CompositeService and ultimately reaches AbstractService:
- /**
- * Construct the service.
- * @param name service name
- */
- public AbstractService(String name) {
- this.name = name;
- stateModel = new ServiceStateModel(name);
- }
- /**
- * Implements the service state model.
- */
- @Public
- @Evolving
- public class ServiceStateModel
- /**
- * Create the service state model in the {@link Service.STATE#NOTINITED}
- * state.
- */
- public ServiceStateModel(String name) {
- this(name, Service.STATE.NOTINITED);
- }
- /** Constructed but not initialized */
- NOTINITED(0, "NOTINITED"),
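During construction the NodeManager is given a state model, and the service starts in STATE.NOTINITED, meaning constructed but not yet initialized. State deserves close attention here, because the core of YARN is an asynchronous processing mechanism driven by state transitions.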
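To make the idea of a state model concrete, here is a minimal sketch of a service life cycle with a table of allowed transitions. `SketchStateModel`, its `State` enum and the transition table are assumptions made for illustration; this is not the source of Hadoop's ServiceStateModel.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

// Hypothetical, simplified stand-in for a service state model.
class SketchStateModel {
    enum State { NOTINITED, INITED, STARTED, STOPPED }

    // Allowed transitions (assumed for this sketch).
    private static final Map<State, EnumSet<State>> VALID = new EnumMap<>(State.class);
    static {
        VALID.put(State.NOTINITED, EnumSet.of(State.INITED, State.STOPPED));
        VALID.put(State.INITED, EnumSet.of(State.INITED, State.STARTED, State.STOPPED));
        VALID.put(State.STARTED, EnumSet.of(State.STARTED, State.STOPPED));
        VALID.put(State.STOPPED, EnumSet.of(State.STOPPED));
    }

    // A freshly constructed service has not been initialized yet.
    private State state = State.NOTINITED;

    synchronized State getState() {
        return state;
    }

    // Moves to the proposed state and returns the previous one,
    // or throws if the transition is not in the table.
    synchronized State enterState(State proposed) {
        if (!VALID.get(state).contains(proposed)) {
            throw new IllegalStateException(state + " -> " + proposed);
        }
        State old = state;
        state = proposed;
        return old;
    }
}
```

Returning the previous state from `enterState` lets the caller tell a first-time transition apart from a repeated one, which is how the `enterState(STATE.INITED) != STATE.INITED` check in the init method further down can be read.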
Next comes the configuration:
Configuration conf = new YarnConfiguration();
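There is not much to say about this yet; we will look at individual settings when they are used. YarnConfiguration defines a large number of parameters. The preceding lines all look simple, so the real work must be here: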
nodeManager.initAndStartNodeManager(conf, false);
- private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
- try {
-
- // Remove the old hook if we are rebooting.
- if (hasToReboot && null != nodeManagerShutdownHook) {
- ShutdownHookManager.get().removeShutdownHook(nodeManagerShutdownHook);
- }
-
- nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
- ShutdownHookManager.get().addShutdownHook(nodeManagerShutdownHook, SHUTDOWN_HOOK_PRIORITY);
- // System exit should be called only when NodeManager is instantiated from
- // main() funtion
- this.shouldExitOnShutdownEvent = true;
- this.init(conf);
- this.start();
- } catch (Throwable t) {
- LOG.fatal("Error starting NodeManager", t);
- System.exit(-1);
- }
- }
Sure enough. We skip the checks and the shutdown hook and go straight to the init method.
- @Override
- public void init(Configuration conf) {
- if (conf == null) {
- throw new ServiceStateException("Cannot initialize service "
- + getName() + ": null configuration");
- }
- if (isInState(STATE.INITED)) {
- return;
- }
- synchronized (stateChangeLock) {
- if (enterState(STATE.INITED) != STATE.INITED) {
- setConfig(conf);
- try {
- serviceInit(config);
- if (isInState(STATE.INITED)) {
- //if the service ended up here during init,
- //notify the listeners
- notifyListeners();
- }
- } catch (Exception e) {
- noteFailure(e);
- ServiceOperations.stopQuietly(LOG, this);
- throw ServiceStateException.convert(e);
- }
- }
- }
- }
The init call ends up in AbstractService.init, whose important step is the call to serviceInit, which NodeManager overrides. Note that the checks here all pass, because our initial state is NOTINITED.
Look at the argument passed to serviceInit: it is the config field held inside AbstractService. Where is that field set?
- protected void serviceInit(Configuration conf) throws Exception {
- if (conf != config) {
- LOG.debug("Config has been overridden during init");
- setConfig(conf);
- }
- }
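The field is set by the setConfig(conf) call in init, which stores our YarnConfiguration as the Configuration held by AbstractService. The base serviceInit shown above calls setConfig again only when it is handed a different object.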
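The shape of init can be sketched as a template method: the base class checks the state, stores the configuration, and then calls a hook that subclasses override. `SketchService` and its use of `java.util.Properties` in place of Hadoop's Configuration are assumptions for illustration only.

```java
import java.util.Properties;

// Hypothetical base class illustrating an idempotent, lock-guarded init.
class SketchService {
    enum State { NOTINITED, INITED }

    private final Object stateChangeLock = new Object();
    private volatile State state = State.NOTINITED;
    private Properties config;
    int initCalls = 0; // counts how often the hook ran

    final void init(Properties conf) {
        if (conf == null) {
            throw new IllegalArgumentException("null configuration");
        }
        if (state == State.INITED) {
            return; // already initialized, nothing to do
        }
        synchronized (stateChangeLock) {
            if (state != State.INITED) {
                state = State.INITED;
                config = conf;       // store the configuration first
                serviceInit(config); // then hand the stored object to the hook
            }
        }
    }

    // Hook for subclasses; the base version only records the call.
    protected void serviceInit(Properties conf) {
        initCalls++;
    }

    Properties getConfig() {
        return config;
    }
}
```

Calling `init` twice on the same instance runs the hook once, and `getConfig()` returns the same object that was passed in.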
Now for NodeManager's own serviceInit method:
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
-
- conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
-
- rmWorkPreservingRestartEnabled = conf.getBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
- YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
-
- initAndStartRecoveryStore(conf);
-
- NMContainerTokenSecretManager containerTokenSecretManager = new NMContainerTokenSecretManager(conf, nmStore);
-
- NMTokenSecretManagerInNM nmTokenSecretManager = new NMTokenSecretManagerInNM(nmStore);
-
- recoverTokens(nmTokenSecretManager, containerTokenSecretManager);
-
- this.aclsManager = new ApplicationACLsManager(conf);
-
- ContainerExecutor exec = ReflectionUtils.newInstance(conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
- DefaultContainerExecutor.class, ContainerExecutor.class), conf);
- try {
- exec.init();
- } catch (IOException e) {
- throw new YarnRuntimeException("Failed to initialize container executor", e);
- }
- DeletionService del = createDeletionService(exec);
- addService(del);
-
- // NodeManager level dispatcher
- this.dispatcher = new AsyncDispatcher();
-
- nodeHealthChecker = new NodeHealthCheckerService();
- addService(nodeHealthChecker);
- dirsHandler = nodeHealthChecker.getDiskHandler();
-
- this.context = createNMContext(containerTokenSecretManager, nmTokenSecretManager, nmStore);
-
- nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
-
- NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
- addService(nodeResourceMonitor);
-
- containerManager = createContainerManager(context, exec, del, nodeStatusUpdater, this.aclsManager, dirsHandler);
- addService(containerManager);
- ((NMContext) context).setContainerManager(containerManager);
-
- WebServer webServer = createWebServer(context, containerManager.getContainersMonitor(), this.aclsManager,
- dirsHandler);
- addService(webServer);
- ((NMContext) context).setWebServer(webServer);
-
- dispatcher.register(ContainerManagerEventType.class, containerManager);
- dispatcher.register(NodeManagerEventType.class, this);
- addService(dispatcher);
-
- DefaultMetricsSystem.initialize("NodeManager");
-
- // StatusUpdater should be added last so that it get started last
- // so that we make sure everything is up before registering with RM.
- addService(nodeStatusUpdater);
- ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
-
- super.serviceInit(conf);
- // TODO add local dirs to del
- }
It is long, so we will take it apart piece by piece.
First, initAndStartRecoveryStore:
- private void initAndStartRecoveryStore(Configuration conf) throws IOException {
- boolean recoveryEnabled = conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED,
- YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
- if (recoveryEnabled) {
- FileSystem recoveryFs = FileSystem.getLocal(conf);
- String recoveryDirName = conf.get(YarnConfiguration.NM_RECOVERY_DIR);
- if (recoveryDirName == null) {
- throw new IllegalArgumentException(
- "Recovery is enabled but " + YarnConfiguration.NM_RECOVERY_DIR + " is not set.");
- }
- Path recoveryRoot = new Path(recoveryDirName);
- recoveryFs.mkdirs(recoveryRoot, new FsPermission((short) 0700));
- nmStore = new NMLeveldbStateStoreService();
- } else {
- nmStore = new NMNullStateStoreService();
- }
- nmStore.init(conf);
- nmStore.start();
- }
By default recoveryEnabled is false, so we go straight to the else branch. Its init call once again ends up in the store's own serviceInit method:
- /** Initialize the state storage */
- @Override
- public void serviceInit(Configuration conf) throws IOException {
- initStorage(conf);
- }
initStorage does nothing, and the startStorage method called from start has no implementation either.
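Choosing between a real store and a do-nothing store is an instance of the null object pattern: callers always have a store to talk to, and the disabled case simply ignores every call. The sketch below uses hypothetical names (`SketchStateStore`, `SketchNullStore`, `SketchMemoryStore`, `SketchStoreFactory`); it does not reproduce the NMStateStoreService API, and the in-memory class only stands in for a persistent store.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface; not the NMStateStoreService API.
interface SketchStateStore {
    void storeContainer(String containerId);
    List<String> loadContainers();
    boolean canRecover();
}

// Null object: accepts every call and remembers nothing.
class SketchNullStore implements SketchStateStore {
    public void storeContainer(String containerId) { }
    public List<String> loadContainers() { return new ArrayList<>(); }
    public boolean canRecover() { return false; }
}

// Stand-in for a persistent store; keeps state in memory only.
class SketchMemoryStore implements SketchStateStore {
    private final List<String> containers = new ArrayList<>();
    public void storeContainer(String containerId) { containers.add(containerId); }
    public List<String> loadContainers() { return new ArrayList<>(containers); }
    public boolean canRecover() { return true; }
}

class SketchStoreFactory {
    // Mirrors the if/else on the recovery flag: one branch per implementation.
    static SketchStateStore create(boolean recoveryEnabled) {
        return recoveryEnabled ? new SketchMemoryStore() : new SketchNullStore();
    }
}
```

The benefit of this design is that the rest of the code never needs a null check or a "recovery enabled?" branch before using the store.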
Moving on:
- ContainerExecutor exec = ReflectionUtils.newInstance(conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
- DefaultContainerExecutor.class, ContainerExecutor.class), conf);
- try {
- exec.init();
- } catch (IOException e) {
- throw new YarnRuntimeException("Failed to initialize container executor", e);
- }
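Containers come up constantly in the interaction between the RM and the NM. The ContainerExecutor is the component the NodeManager uses to actually launch and manage container processes on the node. The implementation class is read from the configuration, and the default is DefaultContainerExecutor.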
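The "class name from configuration, with a default" idiom can be sketched with plain Java reflection. Everything here is hypothetical: `SketchExecutor` and its two implementations are made up, `java.util.Properties` stands in for Hadoop's Configuration, and the loader is not ReflectionUtils.

```java
import java.util.Properties;

// Hypothetical plugin interface and implementations.
interface SketchExecutor {
    String name();
}

class DefaultSketchExecutor implements SketchExecutor {
    public String name() { return "default"; }
}

class CustomSketchExecutor implements SketchExecutor {
    public String name() { return "custom"; }
}

class SketchPluginLoader {
    // Reads a class name from the configuration, falls back to the default
    // implementation, and instantiates it through its no-arg constructor.
    static SketchExecutor load(Properties conf, String key) {
        String className = conf.getProperty(key, DefaultSketchExecutor.class.getName());
        try {
            Class<? extends SketchExecutor> cls =
                Class.forName(className).asSubclass(SketchExecutor.class);
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }
}
```

The `asSubclass` call fails early with a ClassCastException if the configured class does not implement the expected interface, which plays the role of the third argument to `conf.getClass(...)` in the snippet above.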
- // NodeManager level dispatcher
- this.dispatcher = new AsyncDispatcher();
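The comment is explicit: this is the NodeManager-level dispatcher, which implies there are dispatchers at other levels as well. This one handles the events that the NodeManager itself has to process.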
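An asynchronous dispatcher boils down to a queue, a worker thread, and a table of handlers keyed by event type. The following is a minimal sketch under that assumption; `SketchDispatcher` and `SketchNodeEvent` are hypothetical names and the code is much simpler than Hadoop's AsyncDispatcher.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical event type used only in this sketch.
enum SketchNodeEvent { SHUTDOWN, RESYNC }

class SketchDispatcher {
    private final BlockingQueue<Enum<?>> queue = new LinkedBlockingQueue<>();
    private final Map<Class<?>, Consumer<Enum<?>>> handlers = new ConcurrentHashMap<>();
    private final Thread worker = new Thread(this::loop, "sketch-dispatcher");
    private volatile boolean stopped = false;

    // One handler per event enum class.
    void register(Class<? extends Enum<?>> type, Consumer<Enum<?>> handler) {
        handlers.put(type, handler);
    }

    void start() {
        worker.setDaemon(true);
        worker.start();
    }

    // Producers return immediately; handling happens on the worker thread.
    void post(Enum<?> event) {
        queue.add(event);
    }

    // Lets the worker drain what is already queued, then waits for it to exit.
    void stop() {
        stopped = true;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void loop() {
        try {
            while (true) {
                Enum<?> event = queue.poll(50, TimeUnit.MILLISECONDS);
                if (event == null) {
                    if (stopped && queue.isEmpty()) {
                        return;
                    }
                    continue;
                }
                Consumer<Enum<?>> handler = handlers.get(event.getDeclaringClass());
                if (handler != null) {
                    handler.accept(event);
                }
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: exit the worker thread.
        }
    }
}
```

Because the producer only enqueues, a slow handler never blocks the code that raised the event, which is the point of routing events through a dispatcher.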
- nodeHealthChecker = new NodeHealthCheckerService();
- addService(nodeHealthChecker);
- dirsHandler = nodeHealthChecker.getDiskHandler();
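nodeHealthChecker checks the health of the NM node. addService is called here so that the service is initialized later together with all the others, so we need to look at its serviceInit method: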
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- if (NodeHealthScriptRunner.shouldRun(conf)) {
- nodeHealthScriptRunner = new NodeHealthScriptRunner();
- addService(nodeHealthScriptRunner);
- }
- addService(dirsHandler);
- super.serviceInit(conf);
- }
- /*
- * Method which initializes the values for the script path and interval time.
- */
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- this.conf = conf;
- this.nodeHealthScript =
- conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH);
- this.intervalTime = conf.getLong(YarnConfiguration.NM_HEALTH_CHECK_INTERVAL_MS,
- YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS);
- this.scriptTimeout = conf.getLong(
- YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS,
- YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS);
- String[] args = conf.getStrings(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS,
- new String[] {});
- timer = new NodeHealthMonitorExecutor(args);
- super.serviceInit(conf);
- }
- public NodeHealthMonitorExecutor(String[] args) {
- ArrayList<String> execScript = new ArrayList<String>();
- execScript.add(nodeHealthScript);
- if (args != null) {
- execScript.addAll(Arrays.asList(args));
- }
- shexec = new ShellCommandExecutor(execScript
- .toArray(new String[execScript.size()]), null, null, scriptTimeout);
- }
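Tracing this through, we find that it sets up a script that is executed on a timer to check the NM's health at a fixed interval. As the shouldRun check shows, the script runner is only created when the configuration asks for it.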
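A periodic health check can be sketched as a scheduled task that runs a probe and records the result. In this sketch a `BooleanSupplier` stands in for executing the health script, and `ScheduledExecutorService` is my choice of timer, not necessarily what Hadoop's runner uses; `SketchHealthMonitor` is a hypothetical name.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

class SketchHealthMonitor {
    private final BooleanSupplier probe; // stand-in for running the health script
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "sketch-health");
            t.setDaemon(true);
            return t;
        });
    private volatile boolean healthy = true;
    private volatile long lastReportTime = 0L;

    SketchHealthMonitor(BooleanSupplier probe) {
        this.probe = probe;
    }

    // Runs the probe once; a probe that throws counts as unhealthy.
    void checkOnce() {
        boolean ok;
        try {
            ok = probe.getAsBoolean();
        } catch (RuntimeException e) {
            ok = false;
        }
        healthy = ok;
        lastReportTime = System.currentTimeMillis();
    }

    // Schedules the check to repeat with a fixed delay between runs.
    void start(long intervalMs) {
        timer.scheduleWithFixedDelay(this::checkOnce, 0, intervalMs, TimeUnit.MILLISECONDS);
    }

    void stop() {
        timer.shutdownNow();
    }

    boolean isHealthy() {
        return healthy;
    }

    long getLastReportTime() {
        return lastReportTime;
    }
}
```

The interval passed to `start` plays the role of the interval setting read in serviceInit above; a real runner would also enforce the script timeout.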
this.context = createNMContext(containerTokenSecretManager, nmTokenSecretManager, nmStore);
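This line looks simple, but it is where things are gathered into one place, and the object it constructs matters a great deal. Let's look: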
- public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
- NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler,
- ApplicationACLsManager aclsManager, NMStateStoreService stateStore) {
- this.containerTokenSecretManager = containerTokenSecretManager;
- this.nmTokenSecretManager = nmTokenSecretManager;
- this.dirsHandler = dirsHandler;
- this.aclsManager = aclsManager;
- this.nodeHealthStatus.setIsNodeHealthy(true);
- this.nodeHealthStatus.setHealthReport("Healthy");
- this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
- this.stateStore = stateStore;
- }
When we discussed the structure of the RM, rmContext acted as the steward that holds everything together. Here, NMContext plays that role for each NM.
nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
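Hadoop's naming is clear enough that one can tell at a glance that this class periodically reports the NM node's status. Since the service is eventually added to serviceList, we should look at its initialization logic: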
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- int memoryMb = conf.getInt(YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
- float vMemToPMem = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
- YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
- int virtualMemoryMb = (int) Math.ceil(memoryMb * vMemToPMem);
-
- int virtualCores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
-
- this.totalResource = Resource.newInstance(memoryMb, virtualCores);
- metrics.addResource(totalResource);
- this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
- this.tokenRemovalDelayMs = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
- YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
-
- this.minimumResourceManagerVersion = conf.get(YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
- YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
-
- // Default duration to track stopped containers on nodemanager is 10Min.
- // This should not be assigned very large value as it will remember all the
- // containers stopped during that time.
- durationToTrackStoppedContainers = conf.getLong(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, 600000);
- if (durationToTrackStoppedContainers < 0) {
- String message = "Invalid configuration for " + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS
- + " default " + "value is 10Min(600000).";
- LOG.error(message);
- throw new YarnException(message);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " :" + durationToTrackStoppedContainers);
- }
- super.serviceInit(conf);
- LOG.info("Initialized nodemanager for " + nodeId + ":" + " physical-memory=" + memoryMb + " virtual-memory="
- + virtualMemoryMb + " virtual-cores=" + virtualCores);
- }
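Many values here are loaded from YarnConfiguration. This explains why a default NM offers only 8 GB of memory to containers, where the 2.1 ratio of virtual to physical memory comes from, and why 8 vcores are used by default. These are the resources the NodeManager reports, that is, the resources that can be handed out to containers.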
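The virtual-memory figure follows from the same expression used in serviceInit, `(int) Math.ceil(memoryMb * vMemToPMem)`. The helper below isolates that arithmetic; `SketchNodeResources` is a hypothetical wrapper, and the values 8192 and 2.1 are the defaults mentioned in the text.

```java
class SketchNodeResources {
    // Same expression as in serviceInit: int * float is evaluated as float,
    // then rounded up to the next whole megabyte.
    static int virtualMemoryMb(int memoryMb, float vmemToPmemRatio) {
        return (int) Math.ceil(memoryMb * vmemToPmemRatio);
    }
}
```

With the defaults, `SketchNodeResources.virtualMemoryMb(8192, 2.1f)` gives 17204: the product is roughly 17203.2, and the ceiling rounds it up.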
Next is the part that monitors NM resources; let's look at its serviceInit method:
- NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
- addService(nodeResourceMonitor);
I half suspect this is a bug: the class is not really used. It is added as a service, but nothing is initialized inside it.
Next, the container manager:
- containerManager = createContainerManager(context, exec, del, nodeStatusUpdater, this.aclsManager, dirsHandler);
- addService(containerManager);
- ((NMContext) context).setContainerManager(containerManager);
Let's look at how it is set up, picking out the key code:
- // ContainerManager level dispatcher.
- dispatcher = new AsyncDispatcher();
It has its own dispatcher, which handles the following events:
- dispatcher.register(ContainerEventType.class, new ContainerEventDispatcher());
- dispatcher.register(ApplicationEventType.class, new ApplicationEventDispatcher());
- dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
- dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
- dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
- dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
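The container requests that reach this node, starting containers in particular, are handled by this class. The most important part is container launch, and that code lives in ContainersLauncher; we will not go into it here.
Most of this class's important work is done at construction time, so we skip its serviceInit method.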
- WebServer webServer = createWebServer(context, containerManager.getContainersMonitor(), this.aclsManager,
- dirsHandler);
- addService(webServer);
The NM also has its own web app for monitoring, and this is where it is created:
- public WebServer(Context nmContext, ResourceView resourceView, ApplicationACLsManager aclsManager,
- LocalDirsHandlerService dirsHandler) {
- super(WebServer.class.getName());
- this.nmContext = nmContext;
- this.nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);
- }
Its serviceInit is empty, so we skip it.
- dispatcher.register(ContainerManagerEventType.class, containerManager);
- dispatcher.register(NodeManagerEventType.class, this);
- addService(dispatcher);
-
- DefaultMetricsSystem.initialize("NodeManager");
-
- // StatusUpdater should be added last so that it get started last
- // so that we make sure everything is up before registering with RM.
- addService(nodeStatusUpdater);
- ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
The remaining code is shown above and is not analyzed further.
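The comment about adding the status updater last makes sense once you picture a composite service as an ordered list of children that are initialized and started in insertion order. The sketch below assumes exactly that behavior; `SketchComposite` and `SketchChild` are hypothetical and far simpler than CompositeService.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical child service that only logs its life-cycle calls.
class SketchChild {
    private final String name;
    private final List<String> log;

    SketchChild(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    void init() { log.add("init:" + name); }
    void start() { log.add("start:" + name); }
}

class SketchComposite {
    private final List<SketchChild> children = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    // Registration only records the child; nothing is initialized yet.
    void addService(String name) {
        children.add(new SketchChild(name, log));
    }

    // Children are initialized in the order they were added.
    void init() {
        for (SketchChild child : children) {
            child.init();
        }
    }

    // Children are started in the same order, so the last one added starts last.
    void start() {
        for (SketchChild child : children) {
            child.start();
        }
    }
}
```

Under this assumption, adding the status updater after every other service means that by the time it starts and contacts the RM, everything else is already running.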
The next post will describe how these services are started.