Yarn Source Code Notes, Prequel: NodeManager (Part 1), Initialization

This article turns to the NodeManager and focuses on its initialization.

As before, we trace back from the start-yarn.sh script and find that the class being launched is NodeManager:

    package org.apache.hadoop.yarn.server.nodemanager;

    public static void main(String[] args) {
        Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
        StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
        NodeManager nodeManager = new NodeManager();
        Configuration conf = new YarnConfiguration();
        nodeManager.initAndStartNodeManager(conf, false);
    }

Let's start directly from the main method:

		NodeManager nodeManager = new NodeManager();

This line constructs the NodeManager. The constructor goes up through the parent class CompositeService and ends in AbstractService:

    // AbstractService
    /**
     * Construct the service.
     * @param name service name
     */
    public AbstractService(String name) {
        this.name = name;
        stateModel = new ServiceStateModel(name);
    }

    // ServiceStateModel
    /**
     * Implements the service state model.
     */
    @Public
    @Evolving
    public class ServiceStateModel {
        ...
        /**
         * Create the service state model in the {@link Service.STATE#NOTINITED}
         * state.
         */
        public ServiceStateModel(String name) {
            this(name, Service.STATE.NOTINITED);
        }
        ...
    }

    // in the Service.STATE enum
    /** Constructed but not initialized */
    NOTINITED(0, "NOTINITED"),

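During construction, the NodeManager receives a state model whose initial state is STATE.NOTINITED, meaning constructed but not yet initialized. Pay close attention to these states: at its core, Yarn is an asynchronous processing mechanism built on state transitions.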
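To make the state model concrete, here is a minimal, self-contained sketch of the NOTINITED → INITED → STARTED → STOPPED lifecycle. The names `MiniStateModel` and `State` are hypothetical. The transition table reflects my understanding of Hadoop's `ServiceStateModel`: a state may be re-entered, STOPPED is reachable from every state, and nothing leaves STOPPED. Treat it as an illustration rather than the real class. Like the `enterState` used in `init()` below, this version returns the *previous* state.

```java
import java.util.Objects;

/** Hypothetical, simplified service lifecycle state model. */
class MiniStateModel {

    /** Lifecycle states, in the order a service normally moves through them. */
    enum State { NOTINITED, INITED, STARTED, STOPPED }

    // VALID[from][to]: whether the transition from -> to is allowed (assumed table).
    private static final boolean[][] VALID = {
        //              NOTINITED INITED STARTED STOPPED
        /* NOTINITED */ { false,   true,  false,  true },
        /* INITED    */ { false,   true,  true,   true },
        /* STARTED   */ { false,   false, true,   true },
        /* STOPPED   */ { false,   false, false,  true },
    };

    private final String name;
    private volatile State state = State.NOTINITED; // a freshly constructed service

    MiniStateModel(String name) {
        this.name = Objects.requireNonNull(name);
    }

    State getState() {
        return state;
    }

    static boolean isValidTransition(State from, State to) {
        return VALID[from.ordinal()][to.ordinal()];
    }

    /** Moves to {@code proposed} and returns the previous state; throws on an illegal move. */
    synchronized State enterState(State proposed) {
        if (!isValidTransition(state, proposed)) {
            throw new IllegalStateException(
                name + ": cannot enter state " + proposed + " from state " + state);
        }
        State old = state;
        state = proposed;
        return old;
    }

    public static void main(String[] args) {
        MiniStateModel model = new MiniStateModel("NodeManager");
        System.out.println(model.enterState(State.INITED));  // NOTINITED
        System.out.println(model.enterState(State.STARTED)); // INITED
        System.out.println(model.getState());                // STARTED
    }
}
```

Encoding the rules as a table keeps every legality check in one place. The real class reports violations with a `ServiceStateException` rather than `IllegalStateException`.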

Next, the configuration is created:

		Configuration conf = new YarnConfiguration();

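There is not much to say here; we will look at individual settings as they come up. YarnConfiguration defines a large number of parameters. On top of the core-default.xml and core-site.xml that Configuration loads, it also registers yarn-default.xml and yarn-site.xml as default resources. The lines so far are all simple, so the main event must be this one: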

		nodeManager.initAndStartNodeManager(conf, false);

    private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
        try {
            // Remove the old hook if we are rebooting.
            if (hasToReboot && null != nodeManagerShutdownHook) {
                ShutdownHookManager.get().removeShutdownHook(nodeManagerShutdownHook);
            }
            nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
            ShutdownHookManager.get().addShutdownHook(nodeManagerShutdownHook, SHUTDOWN_HOOK_PRIORITY);
            // System exit should be called only when NodeManager is instantiated from
            // main() function
            this.shouldExitOnShutdownEvent = true;
            this.init(conf);
            this.start();
        } catch (Throwable t) {
            LOG.fatal("Error starting NodeManager", t);
            System.exit(-1);
        }
    }

Sure enough. We will skip the reboot check and the shutdown hook and go straight to the init method.

    @Override
    public void init(Configuration conf) {
        if (conf == null) {
            throw new ServiceStateException("Cannot initialize service "
                + getName() + ": null configuration");
        }
        if (isInState(STATE.INITED)) {
            return;
        }
        synchronized (stateChangeLock) {
            if (enterState(STATE.INITED) != STATE.INITED) {
                setConfig(conf);
                try {
                    serviceInit(config);
                    if (isInState(STATE.INITED)) {
                        //if the service ended up here during init,
                        //notify the listeners
                        notifyListeners();
                    }
                } catch (Exception e) {
                    noteFailure(e);
                    ServiceOperations.stopQuietly(LOG, this);
                    throw ServiceStateException.convert(e);
                }
            }
        }
    }

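The init method is AbstractService's, which NodeManager inherits through CompositeService. The important work inside it is done by serviceInit, which NodeManager overrides. On this first call every check passes, because the service starts out in NOTINITED: isInState(STATE.INITED) is false, and enterState(STATE.INITED) returns the previous state, NOTINITED, so serviceInit(config) runs.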
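The shape of `init()` is a template method. The base class owns the locking, the state check and the failure handling, and subclasses only override `serviceInit`. Below is a minimal sketch of that pattern under simplifying assumptions: a `Map<String, String>` stands in for `Configuration`, only the states needed here exist, and `MiniService` and `CountingService` are hypothetical names. Because `enterState` returns the *previous* state, a second `init()` call does nothing.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical base class illustrating the init() template-method pattern. */
abstract class MiniService {

    enum State { NOTINITED, INITED, STOPPED }

    private final Object stateChangeLock = new Object();
    private volatile State state = State.NOTINITED;
    private Map<String, String> config;

    /** Enters the new state and returns the previous one. */
    private State enterState(State next) {
        State old = state;
        state = next;
        return old;
    }

    State getState() {
        return state;
    }

    Map<String, String> getConfig() {
        return config;
    }

    /** Final: subclasses customize behavior only through serviceInit. */
    public final void init(Map<String, String> conf) {
        if (conf == null) {
            throw new IllegalArgumentException("null configuration");
        }
        if (state == State.INITED) {
            return; // already initialized: cheap exit without taking the lock
        }
        synchronized (stateChangeLock) {
            // Only the caller that actually moves the service out of another state runs the hook.
            if (enterState(State.INITED) != State.INITED) {
                this.config = conf;
                try {
                    serviceInit(conf);
                } catch (Exception e) {
                    // Loosely mirrors "stop quietly, then rethrow" in the original.
                    state = State.STOPPED;
                    throw new RuntimeException("init failed", e);
                }
            }
        }
    }

    /** Hook for subclasses; the default does nothing. */
    protected void serviceInit(Map<String, String> conf) throws Exception {
    }
}

/** Example subclass that counts how often its hook runs. */
class CountingService extends MiniService {
    int initCalls = 0;

    @Override
    protected void serviceInit(Map<String, String> conf) {
        initCalls++;
    }

    public static void main(String[] args) {
        CountingService s = new CountingService();
        s.init(new HashMap<>());
        s.init(new HashMap<>()); // second call is a no-op
        System.out.println(s.initCalls); // 1
    }
}
```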

Now look at the argument passed to serviceInit: it is AbstractService's internal config field. Where does that come from?

    protected void serviceInit(Configuration conf) throws Exception {
        if (conf != config) {
            LOG.debug("Config has been overridden during init");
            setConfig(conf);
        }
    }

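The field is first assigned by the setConfig(conf) call inside init(), just before serviceInit(config). That is where our YarnConfiguration becomes AbstractService's internal Configuration. NodeManager eventually reaches the base serviceInit shown above through super.serviceInit(conf), and that method calls setConfig again only if a different Configuration object is passed in.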

Now for NodeManager's own serviceInit method:

    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
        rmWorkPreservingRestartEnabled = conf.getBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
            YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
        initAndStartRecoveryStore(conf);
        NMContainerTokenSecretManager containerTokenSecretManager = new NMContainerTokenSecretManager(conf, nmStore);
        NMTokenSecretManagerInNM nmTokenSecretManager = new NMTokenSecretManagerInNM(nmStore);
        recoverTokens(nmTokenSecretManager, containerTokenSecretManager);
        this.aclsManager = new ApplicationACLsManager(conf);
        ContainerExecutor exec = ReflectionUtils.newInstance(conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
            DefaultContainerExecutor.class, ContainerExecutor.class), conf);
        try {
            exec.init();
        } catch (IOException e) {
            throw new YarnRuntimeException("Failed to initialize container executor", e);
        }
        DeletionService del = createDeletionService(exec);
        addService(del);
        // NodeManager level dispatcher
        this.dispatcher = new AsyncDispatcher();
        nodeHealthChecker = new NodeHealthCheckerService();
        addService(nodeHealthChecker);
        dirsHandler = nodeHealthChecker.getDiskHandler();
        this.context = createNMContext(containerTokenSecretManager, nmTokenSecretManager, nmStore);
        nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
        NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
        addService(nodeResourceMonitor);
        containerManager = createContainerManager(context, exec, del, nodeStatusUpdater, this.aclsManager, dirsHandler);
        addService(containerManager);
        ((NMContext) context).setContainerManager(containerManager);
        WebServer webServer = createWebServer(context, containerManager.getContainersMonitor(), this.aclsManager,
            dirsHandler);
        addService(webServer);
        ((NMContext) context).setWebServer(webServer);
        dispatcher.register(ContainerManagerEventType.class, containerManager);
        dispatcher.register(NodeManagerEventType.class, this);
        addService(dispatcher);
        DefaultMetricsSystem.initialize("NodeManager");
        // StatusUpdater should be added last so that it get started last
        // so that we make sure everything is up before registering with RM.
        addService(nodeStatusUpdater);
        ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
        super.serviceInit(conf);
        // TODO add local dirs to del
    }

It is long, so let's unpick it piece by piece.
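Most of this method builds child services and registers them with `addService`. The composite parent then drives the children when `super.serviceInit(conf)` runs and, later, on start and stop. Here is a minimal sketch of that composite idea. It assumes, as I understand Hadoop's `CompositeService` to behave, that children are initialized and started in the order they were added and stopped in reverse order. That ordering explains why the source comment insists that `nodeStatusUpdater` be added last: it starts only after everything else is up. `MiniComposite`, `Child` and the recorded log are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a composite service that drives its children in order. */
class MiniComposite {

    /** Minimal child-service contract for this sketch. */
    interface Child {
        String name();
        void init();
        void start();
        void stop();
    }

    private final List<Child> children = new ArrayList<>();

    void addService(Child child) {
        children.add(child);
    }

    void init() {
        for (Child c : children) {
            c.init(); // registration order
        }
    }

    void start() {
        for (Child c : children) {
            c.start(); // registration order: the child added last starts last
        }
    }

    void stop() {
        for (int i = children.size() - 1; i >= 0; i--) {
            children.get(i).stop(); // reverse order: the child started last stops first
        }
    }

    /** A child that only records what happened to it, for demonstration. */
    static Child recording(String name, List<String> log) {
        return new Child() {
            public String name() { return name; }
            public void init() { log.add("init " + name); }
            public void start() { log.add("start " + name); }
            public void stop() { log.add("stop " + name); }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        MiniComposite nm = new MiniComposite();
        nm.addService(recording("dispatcher", log));
        nm.addService(recording("nodeStatusUpdater", log)); // added last, as in NodeManager
        nm.init();
        nm.start();
        nm.stop();
        System.out.println(log);
    }
}
```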

First, initAndStartRecoveryStore:

    private void initAndStartRecoveryStore(Configuration conf) throws IOException {
        boolean recoveryEnabled = conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED,
            YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
        if (recoveryEnabled) {
            FileSystem recoveryFs = FileSystem.getLocal(conf);
            String recoveryDirName = conf.get(YarnConfiguration.NM_RECOVERY_DIR);
            if (recoveryDirName == null) {
                throw new IllegalArgumentException(
                    "Recovery is enabled but " + YarnConfiguration.NM_RECOVERY_DIR + " is not set.");
            }
            Path recoveryRoot = new Path(recoveryDirName);
            recoveryFs.mkdirs(recoveryRoot, new FsPermission((short) 0700));
            nmStore = new NMLeveldbStateStoreService();
        } else {
            nmStore = new NMNullStateStoreService();
        }
        nmStore.init(conf);
        nmStore.start();
    }

By default recoveryEnabled is false, so we only analyze the else branch. The store's init method again ends up in its own serviceInit:

    // NMStateStoreService
    /** Initialize the state storage */
    @Override
    public void serviceInit(Configuration conf) throws IOException {
        initStorage(conf);
    }

In the null store, initStorage does nothing, and the startStorage method called from start has no implementation either.
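`initAndStartRecoveryStore` picks its implementation once, from configuration. The disabled case gets a store whose methods do nothing (the null-object pattern), so the rest of the NodeManager never has to check whether recovery is on. Below is a minimal sketch of that decision, assuming a plain `Map<String, String>` in place of `Configuration`. `StateStore`, `LeveldbLikeStore` and `NullStore` are hypothetical stand-ins for `NMLeveldbStateStoreService` and `NMNullStateStoreService`. I believe `NM_RECOVERY_ENABLED` and `NM_RECOVERY_DIR` map to the property names used below, but verify them against your Hadoop version.

```java
import java.util.Map;

/** Hypothetical sketch of choosing a state store from configuration. */
class StoreSelector {

    interface StateStore {
        boolean canRecover();
        void storeContainer(String containerId); // example write operation
    }

    /** Stand-in for a persistent store; a real one would write under recoveryDir. */
    static final class LeveldbLikeStore implements StateStore {
        final String recoveryDir;
        LeveldbLikeStore(String recoveryDir) { this.recoveryDir = recoveryDir; }
        public boolean canRecover() { return true; }
        public void storeContainer(String containerId) { /* would persist here */ }
    }

    /** Null object: every operation is a no-op, so callers need no "is recovery on?" checks. */
    static final class NullStore implements StateStore {
        public boolean canRecover() { return false; }
        public void storeContainer(String containerId) { }
    }

    static final String RECOVERY_ENABLED = "yarn.nodemanager.recovery.enabled";
    static final String RECOVERY_DIR = "yarn.nodemanager.recovery.dir";

    static StateStore create(Map<String, String> conf) {
        boolean enabled = Boolean.parseBoolean(conf.getOrDefault(RECOVERY_ENABLED, "false"));
        if (!enabled) {
            return new NullStore(); // the default path analyzed in the text
        }
        String dir = conf.get(RECOVERY_DIR);
        if (dir == null) {
            throw new IllegalArgumentException(
                "Recovery is enabled but " + RECOVERY_DIR + " is not set.");
        }
        return new LeveldbLikeStore(dir);
    }

    public static void main(String[] args) {
        System.out.println(create(Map.of()).canRecover()); // false
        System.out.println(create(Map.of(RECOVERY_ENABLED, "true",
                                         RECOVERY_DIR, "/tmp/nm-recovery")).canRecover()); // true
    }
}
```

The directory `/tmp/nm-recovery` is only an example value.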

Moving on:

    ContainerExecutor exec = ReflectionUtils.newInstance(conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
        DefaultContainerExecutor.class, ContainerExecutor.class), conf);
    try {
        exec.init();
    } catch (IOException e) {
        throw new YarnRuntimeException("Failed to initialize container executor", e);
    }

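Containers come up constantly in the interaction between the RM and the NM. ContainerExecutor is the component that actually launches and manages container processes on this node: starting them, signaling them and cleaning up after them. The implementation class is read from YarnConfiguration.NM_CONTAINER_EXECUTOR, and the default is DefaultContainerExecutor. How much capacity the node offers to containers is a separate question, answered from configuration in NodeStatusUpdater further below.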
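The executor is not hard-coded. `conf.getClass(...)` reads a class name from configuration and falls back to `DefaultContainerExecutor`, and `ReflectionUtils.newInstance` instantiates the result. Here is a minimal JDK-only sketch of that plug-in mechanism. It assumes a public no-arg constructor and a `Map<String, String>` config; `PluginLoader`, `Executor`, `DefaultExec`, `OtherExec` and `loadPlugin` are hypothetical names. Hadoop's real `ReflectionUtils` also caches constructors and hands the `Configuration` to instances that implement `Configurable`.

```java
import java.util.Map;

/** Hypothetical sketch of loading a configurable implementation class by name. */
class PluginLoader {

    interface Executor {
        String describe();
    }

    /** Default implementation; public with a no-arg constructor so reflection can build it. */
    public static class DefaultExec implements Executor {
        public DefaultExec() { }
        public String describe() { return "default"; }
    }

    public static class OtherExec implements Executor {
        public OtherExec() { }
        public String describe() { return "other"; }
    }

    /**
     * Reads a class name under {@code key}, falling back to {@code defaultClass} when absent.
     * Fails fast (ClassCastException) if the configured class does not implement {@code iface}.
     */
    static <T> T loadPlugin(Map<String, String> conf, String key,
                            Class<? extends T> defaultClass, Class<T> iface) {
        String className = conf.get(key);
        try {
            Class<? extends T> clazz = (className == null)
                ? defaultClass
                : Class.forName(className).asSubclass(iface);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + key + " = " + className, e);
        }
    }

    public static void main(String[] args) {
        String key = "yarn.nodemanager.container-executor.class";
        Executor e1 = loadPlugin(Map.of(), key, DefaultExec.class, Executor.class);
        Executor e2 = loadPlugin(Map.of(key, OtherExec.class.getName()), key,
                                 DefaultExec.class, Executor.class);
        System.out.println(e1.describe() + " " + e2.describe()); // default other
    }
}
```

This design lets operators switch executors, for example to a Linux-specific one, by editing configuration instead of code.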

    // NodeManager level dispatcher
    this.dispatcher = new AsyncDispatcher();

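The comment is explicit: this is the NodeManager-level event dispatcher. Dispatchers exist at other levels too, such as the one inside ContainerManager that we meet later. This one handles the events that the NodeManager itself must process.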
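`AsyncDispatcher` decouples whoever raises an event from whoever handles it. `register` maps an event-type enum class to a handler, `handle` only enqueues, and a background thread drains the queue and routes each event by the enum class of its type. Below is a minimal sketch of that idea, with several simplifications. The names `MiniDispatcher`, `Event`, `Handler`, `DemoContainerEvent` and `DemoNodeEvent` are hypothetical. There is one handler per type, whereas Hadoop's version can fan out to several. `stop()` drains pending events before returning; that is a choice of this sketch, not a claim about Hadoop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of an asynchronous dispatcher that routes events by enum type. */
class MiniDispatcher {

    /** Minimal event: an enum constant identifying its type, plus an optional payload. */
    static final class Event {
        final Enum<?> type;
        final String payload;

        Event(Enum<?> type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    /** Receives every event whose type belongs to the enum class it was registered for. */
    interface Handler {
        void handle(Event event);
    }

    // Demo event types (hypothetical stand-ins for enums such as NodeManagerEventType).
    enum DemoContainerEvent { LAUNCH, KILL }
    enum DemoNodeEvent { SHUTDOWN }

    private enum Control { STOP } // internal sentinel used by stop()

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Map<Class<?>, Handler> handlers = new ConcurrentHashMap<>();
    private final Thread worker = new Thread(this::dispatchLoop, "mini-dispatcher");

    /** Routes all constants of {@code eventType} to {@code handler}. */
    void register(Class<? extends Enum<?>> eventType, Handler handler) {
        handlers.put(eventType, handler);
    }

    void start() {
        worker.start();
    }

    /** Producer side: only enqueues; the handler runs later on the worker thread. */
    void handle(Event event) {
        queue.add(event);
    }

    /** Enqueues a sentinel and waits until every earlier event has been dispatched. */
    void stop() {
        queue.add(new Event(Control.STOP, null));
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void dispatchLoop() {
        try {
            while (true) {
                Event event = queue.take();
                if (event.type == Control.STOP) {
                    return;
                }
                // getDeclaringClass() (not getClass()) also works for enum constants with bodies.
                Handler handler = handlers.get(event.type.getDeclaringClass());
                if (handler == null) {
                    System.err.println("No handler for " + event.type);
                } else {
                    handler.handle(event);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit quietly if interrupted
        }
    }

    public static void main(String[] args) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        MiniDispatcher dispatcher = new MiniDispatcher();
        dispatcher.register(DemoContainerEvent.class, e -> seen.add("container " + e.type + " " + e.payload));
        dispatcher.register(DemoNodeEvent.class, e -> seen.add("node " + e.type));
        dispatcher.start();
        dispatcher.handle(new Event(DemoContainerEvent.LAUNCH, "c1"));
        dispatcher.handle(new Event(DemoNodeEvent.SHUTDOWN, null));
        dispatcher.stop();
        System.out.println(seen); // [container LAUNCH c1, node SHUTDOWN]
    }
}
```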

    nodeHealthChecker = new NodeHealthCheckerService();
    addService(nodeHealthChecker);
    dirsHandler = nodeHealthChecker.getDiskHandler();

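nodeHealthChecker checks the health of the NM node. addService registers it as a child service so that all children are initialized together at the end, so we need to look at its serviceInit method: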

    // NodeHealthCheckerService.serviceInit
    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        if (NodeHealthScriptRunner.shouldRun(conf)) {
            nodeHealthScriptRunner = new NodeHealthScriptRunner();
            addService(nodeHealthScriptRunner);
        }
        addService(dirsHandler);
        super.serviceInit(conf);
    }

    // NodeHealthScriptRunner.serviceInit
    /*
     * Method which initializes the values for the script path and interval time.
     */
    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        this.conf = conf;
        this.nodeHealthScript =
            conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH);
        this.intervalTime = conf.getLong(YarnConfiguration.NM_HEALTH_CHECK_INTERVAL_MS,
            YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS);
        this.scriptTimeout = conf.getLong(
            YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS,
            YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS);
        String[] args = conf.getStrings(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS,
            new String[] {});
        timer = new NodeHealthMonitorExecutor(args);
        super.serviceInit(conf);
    }

    // NodeHealthMonitorExecutor (the timer task inside NodeHealthScriptRunner)
    public NodeHealthMonitorExecutor(String[] args) {
        ArrayList<String> execScript = new ArrayList<String>();
        execScript.add(nodeHealthScript);
        if (args != null) {
            execScript.addAll(Arrays.asList(args));
        }
        shexec = new ShellCommandExecutor(execScript
            .toArray(new String[execScript.size()]), null, null, scriptTimeout);
    }

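Tracing it back, we find a script that runs on a timer to check the NM's health. NodeHealthScriptRunner is added only when NodeHealthScriptRunner.shouldRun(conf) allows it, that is, when a usable health script is configured. It wraps the script and its options in a ShellCommandExecutor with a timeout of scriptTimeout and runs it every intervalTime milliseconds. The other child, dirsHandler, tracks the health of the node's local directories.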
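Stripped of the shell plumbing, the health checker is a periodic task: every `intervalTime` ms, run the script under a `scriptTimeout` limit, then record healthy or unhealthy plus a report. Here is a minimal sketch of that loop, assuming a `Supplier<String>` stands in for executing the script and returning its stdout. `HealthMonitor` and its methods are hypothetical. The rule that a line starting with `ERROR` means unhealthy follows the convention documented for NodeManager health scripts. Treating a timeout or a crash as unhealthy is this sketch's own choice.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical sketch of a periodic health check with a per-run timeout. */
class HealthMonitor {

    private final Supplier<String> script; // stand-in for running the health script
    private final long timeoutMs;
    private final ExecutorService runner = Executors.newSingleThreadExecutor();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    private volatile boolean healthy = true;
    private volatile String report = "";

    HealthMonitor(Supplier<String> script, long timeoutMs) {
        this.script = script;
        this.timeoutMs = timeoutMs;
    }

    /** One check: run the script with a timeout and translate its output into a status. */
    void runOnce() {
        Callable<String> task = script::get;
        Future<String> f = runner.submit(task);
        try {
            String output = f.get(timeoutMs, TimeUnit.MILLISECONDS);
            if (output == null) {
                output = "";
            }
            String errorLine = output.lines()
                .filter(l -> l.startsWith("ERROR"))
                .findFirst().orElse(null);
            healthy = (errorLine == null);
            report = healthy ? "" : errorLine;
        } catch (TimeoutException e) {
            f.cancel(true); // interrupt the stuck check
            healthy = false;
            report = "health check timed out";
        } catch (ExecutionException e) {
            healthy = false;
            report = "health check failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Schedules runOnce every intervalMs, like the timer-based runner described above. */
    void start(long intervalMs) {
        timer.scheduleWithFixedDelay(this::runOnce, 0, intervalMs, TimeUnit.MILLISECONDS);
    }

    void stop() {
        timer.shutdownNow();
        runner.shutdownNow();
    }

    boolean isHealthy() { return healthy; }

    String getReport() { return report; }

    public static void main(String[] args) {
        HealthMonitor m = new HealthMonitor(() -> "ERROR local dirs are bad", 1000);
        m.runOnce();
        System.out.println(m.isHealthy() + " " + m.getReport()); // false ERROR local dirs are bad
        m.stop();
    }
}
```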

		this.context = createNMContext(containerTokenSecretManager, nmTokenSecretManager, nmStore);

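This line looks simple, but the object it builds is where much of the NodeManager's shared state is gathered in one place, so its constructor is worth a look: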

    public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
            NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler,
            ApplicationACLsManager aclsManager, NMStateStoreService stateStore) {
        this.containerTokenSecretManager = containerTokenSecretManager;
        this.nmTokenSecretManager = nmTokenSecretManager;
        this.dirsHandler = dirsHandler;
        this.aclsManager = aclsManager;
        this.nodeHealthStatus.setIsNodeHealthy(true);
        this.nodeHealthStatus.setHealthReport("Healthy");
        this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
        this.stateStore = stateStore;
    }

When we discussed the RM's structure, rmContext served as the RM's steward; here, NMContext plays the same role for each NM.

		nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);

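Hadoop's naming is clear: you can tell at a glance that this class keeps the NM node's status updated periodically, registering with the RM and sending it heartbeats. Because this service is eventually added to the service list, let's look at its initialization logic: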

    // NodeStatusUpdaterImpl.serviceInit
    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        int memoryMb = conf.getInt(YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
        float vMemToPMem = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
            YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
        int virtualMemoryMb = (int) Math.ceil(memoryMb * vMemToPMem);
        int virtualCores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
        this.totalResource = Resource.newInstance(memoryMb, virtualCores);
        metrics.addResource(totalResource);
        this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
        this.tokenRemovalDelayMs = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
            YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
        this.minimumResourceManagerVersion = conf.get(YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
            YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
        // Default duration to track stopped containers on nodemanager is 10Min.
        // This should not be assigned very large value as it will remember all the
        // containers stopped during that time.
        durationToTrackStoppedContainers = conf.getLong(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, 600000);
        if (durationToTrackStoppedContainers < 0) {
            String message = "Invalid configuration for " + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS
                + " default " + "value is 10Min(600000).";
            LOG.error(message);
            throw new YarnException(message);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " :" + durationToTrackStoppedContainers);
        }
        super.serviceInit(conf);
        LOG.info("Initialized nodemanager for " + nodeId + ":" + " physical-memory=" + memoryMb + " virtual-memory="
            + virtualMemoryMb + " virtual-cores=" + virtualCores);
    }

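Many of these values come from YarnConfiguration. They explain why, by default, a NodeManager offers only 8 GB (8192 MB) of memory to containers, why virtual memory may reach 2.1 times physical memory, and why 8 virtual cores are offered by default. These are the resources the NodeManager advertises as available for allocation to containers, not what the NodeManager process itself consumes.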
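The advertised resources come from plain arithmetic over three settings. Here is a minimal sketch that reproduces the computation with the defaults mentioned above (8192 MB, ratio 2.1, 8 vcores), assuming a `Map<String, String>` in place of `YarnConfiguration`. I believe `NM_PMEM_MB`, `NM_VMEM_PMEM_RATIO` and `NM_VCORES` map to the property names used below, and `NodeResources` and `compute` are hypothetical. With the defaults, the virtual-memory figure is ceil(8192 × 2.1) = 17204 MB. In the code shown, only physical memory and vcores go into `totalResource`; the virtual-memory figure is only logged.

```java
import java.util.Map;

/** Hypothetical sketch of how the advertised node resources are derived from config. */
class NodeResources {

    static final String PMEM_MB = "yarn.nodemanager.resource.memory-mb";
    static final String VMEM_PMEM_RATIO = "yarn.nodemanager.vmem-pmem-ratio";
    static final String VCORES = "yarn.nodemanager.resource.cpu-vcores";

    final int memoryMb;
    final int virtualMemoryMb;
    final int virtualCores;

    private NodeResources(int memoryMb, int virtualMemoryMb, int virtualCores) {
        this.memoryMb = memoryMb;
        this.virtualMemoryMb = virtualMemoryMb;
        this.virtualCores = virtualCores;
    }

    static NodeResources compute(Map<String, String> conf) {
        int memoryMb = Integer.parseInt(conf.getOrDefault(PMEM_MB, "8192"));       // default 8 GB
        float ratio = Float.parseFloat(conf.getOrDefault(VMEM_PMEM_RATIO, "2.1")); // default 2.1
        int vcores = Integer.parseInt(conf.getOrDefault(VCORES, "8"));              // default 8
        // Same expression as the source: round the virtual-memory limit up.
        int virtualMemoryMb = (int) Math.ceil(memoryMb * ratio);
        return new NodeResources(memoryMb, virtualMemoryMb, vcores);
    }

    public static void main(String[] args) {
        NodeResources d = compute(Map.of());
        System.out.println(d.memoryMb + " " + d.virtualMemoryMb + " " + d.virtualCores); // 8192 17204 8
        NodeResources s = compute(Map.of(PMEM_MB, "4096"));
        System.out.println(s.virtualMemoryMb); // 8602
    }
}
```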

Next comes the part that monitors the NM's resources:

    NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
    addService(nodeResourceMonitor);

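I suspect this may be a bug, or at least a leftover: the class is not actually used. It is added as a service, but nothing meaningful happens when it is initialized.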

Next, the container manager:

    containerManager = createContainerManager(context, exec, del, nodeStatusUpdater, this.aclsManager, dirsHandler);
    addService(containerManager);
    ((NMContext) context).setContainerManager(containerManager);

Looking at how it is set up, here are the key lines:

    // ContainerManager level dispatcher.
    dispatcher = new AsyncDispatcher();

It has its own dispatcher, which handles the following events:

    dispatcher.register(ContainerEventType.class, new ContainerEventDispatcher());
    dispatcher.register(ApplicationEventType.class, new ApplicationEventDispatcher());
    dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
    dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
    dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
    dispatcher.register(ContainersLauncherEventType.class, containersLauncher);

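This class is responsible for launching and managing containers on this node at the request of ApplicationMasters; deciding which node gets a container is the RM scheduler's job. The most important piece is container launch, which is handled by ContainersLauncher; we won't go into it here.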

The important wiring for this class is essentially done at construction time, so we skip its serviceInit method.

    WebServer webServer = createWebServer(context, containerManager.getContainersMonitor(), this.aclsManager,
        dirsHandler);
    addService(webServer);

The NM also has its own web app for monitoring, and this is where it is created:

    public WebServer(Context nmContext, ResourceView resourceView, ApplicationACLsManager aclsManager,
            LocalDirsHandlerService dirsHandler) {
        super(WebServer.class.getName());
        this.nmContext = nmContext;
        this.nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);
    }

Its serviceInit is empty, so we skip it.

    dispatcher.register(ContainerManagerEventType.class, containerManager);
    dispatcher.register(NodeManagerEventType.class, this);
    addService(dispatcher);
    DefaultMetricsSystem.initialize("NodeManager");
    // StatusUpdater should be added last so that it get started last
    // so that we make sure everything is up before registering with RM.
    addService(nodeStatusUpdater);
    ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);

That is the remaining code; we won't analyze it further.

The next article will cover how these services are started.
