赞
踩
NodeManager是Yarn中单节点的代理,它管理Hadoop集群中单个计算节点,其需要与应用程序的ApplicationMaster和集群资源管理器RM交互,从ApplicationMaster上接收到相关Container的执行命令(启动,停止Container);并向RM汇报各个Container的运行状态和节点健康状态,并领取相关的Container的执行命令;其主要的功能包括与RM保持通信,管理Container的生命周期,监控每个Container的资源使用情况,追踪节点健康状况,管理日志以及不同应用程序用到的附属服务;
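NodeManager通过两个RPC协议与RM和各个ApplicationMaster进行通信:与RM之间使用ResourceTrackerProtocol(节点注册、周期性心跳汇报),与各个ApplicationMaster之间使用ContainerManagementProtocol(启动、停止Container以及查询Container状态)。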
接下来从源码的角度介绍NodeManager上启动一个Container的流程,主要就是Container对应的状态机的转化流程:
- public StartContainersResponse startContainers(StartContainersRequest requests) throws YarnException, IOException { // Starts each container in the batch request and returns the succeeded/failed container ids.
- // Authorization check (elided in this excerpt) ......
-
- List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
- Map<ContainerId, SerializedException> failedContainers =
- new HashMap<ContainerId, SerializedException>();
-
- for (StartContainerRequest request : requests.getStartContainerRequests()) {
- ContainerId containerId = null; // NOTE(review): presumably assigned in the elided token-validation step below — confirm against the full source
- try {
- // Fetch and validate the container token (elided in this excerpt) ......
-
- // The function that carries out the actual container start-up flow
- startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, request);
- succeededContainers.add(containerId);
- } catch (YarnException e) {
- // ...... (failure handling elided; presumably records the error in failedContainers — confirm)
- }
- }
-
- return StartContainersResponse.newInstance(getAuxServiceMetaData(),
- succeededContainers, failedContainers);
- }
- private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
- ContainerTokenIdentifier containerTokenIdentifier,
- StartContainerRequest request) throws YarnException, IOException { // Builds the container state machine for one start request and dispatches its init events.
- // Authorization token validation (elided in this excerpt)
-
- ContainerId containerId = containerTokenIdentifier.getContainerID();
- String containerIdStr = containerId.toString();
- String user = containerTokenIdentifier.getApplicationSubmitter();
-
- LOG.info("Start request for " + containerIdStr + " by user " + user);
- // Read the container launch context (cmd, env, jars, etc.) from the request
- ContainerLaunchContext launchContext = request.getContainerLaunchContext();
-
- // Build the ContainerImpl state machine that tracks this container's running life cycle
- Container container =
- new ContainerImpl(getConfig(), this.dispatcher,
- launchContext, credentials, metrics, containerTokenIdentifier,
- context); // NOTE(review): credentials is not declared in this excerpt — its setup is presumably elided
-
- try { // NOTE(review): the readLock.lock() matching the unlock in finally is not shown in this excerpt — confirm against the full source
- if (!serviceStopped) {
- // Create the application
- // Tentatively build an ApplicationImpl state machine describing this node's management of all containers of the same application
- Application application =
- new ApplicationImpl(dispatcher, user, applicationID, credentials, context); // NOTE(review): applicationID is not declared in this excerpt — confirm its origin
- if (null == context.getApplications().putIfAbsent(applicationID,
- application)) {
- // putIfAbsent returned null, i.e. this application is starting on this NodeManager for the first time:
- // dispatch an ApplicationInitEvent to initialize the ApplicationImpl state machine
- dispatcher.getEventHandler().handle(
- new ApplicationInitEvent(applicationID, appAcls,
- logAggregationContext));
- }
-
- // Store the container request via the NM state store, then dispatch an ApplicationContainerInitEvent to initialize this ContainerImpl state machine
- this.context.getNMStateStore().storeContainer(containerId, request);
- dispatcher.getEventHandler().handle(
- new ApplicationContainerInitEvent(container));
-
- // ...... (remaining steps elided in this excerpt)
- }
- } finally {
- this.readLock.unlock();
- }
- }
在ApplicationImpl状态机处理ApplicationContainerInitEvent事件的时候,其会使用调度器调度ContainerInitEvent事件来触发对应container的初始化(该ContainerInitEvent事件的响应者为ContainerManagerImpl#ContainerEventDispatcher,其会调用对应的ContainerImpl.handle(event)来进行事件的处理),初始化主要由状态转移函数RequestResourcesTransition完成,包括:下载container所对应的运行任务的资源jar包,env环境变量等等。在ContainerImpl本地化资源完成后,其会收到来自本地资源服务LocalizedResource发送的RESOURCE_LOCALIZED事件,标识本地资源已经异步下载完成。之后ContainerImpl将会调度处理该事件(调用hook状态转移函数LocalizedTransition来触发当前container的LaunchEvent操作):
- static class LocalizedTransition implements
- MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> { // Transition run when a resource-localized event arrives; returns LOCALIZING or LOCALIZED.
- @Override
- public ContainerState transition(ContainerImpl container,
- ContainerEvent event) {
- // Check the download state of the local resources
- ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
- List<String> syms =
- container.pendingResources.remove(rsrcEvent.getResource()); // move this resource from the pending map ...
- container.localizedResources.put(rsrcEvent.getLocation(), syms); // ... to the localized map, keyed by its location
- if (!container.pendingResources.isEmpty()) {
- return ContainerState.LOCALIZING; // other resources are still pending, so stay in LOCALIZING
- }
-
- // All pending resources have now been downloaded
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationEvent(LocalizationEventType.
- CONTAINER_RESOURCES_LOCALIZED, container));
-
- // Fire the container's launch event to start the container
- container.sendLaunchEvent();
- container.metrics.endInitingContainer();
- return ContainerState.LOCALIZED;
- }
- }
-
- private void sendLaunchEvent() {
- ContainersLauncherEventType launcherEvent =
- ContainersLauncherEventType.LAUNCH_CONTAINER;
- if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
- // try to recover a container that was previously launched
- launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
- }
- dispatcher.getEventHandler().handle(
- new ContainersLauncherEvent(this, launcherEvent));
- }
在ContainerManagerImpl中定义了处理ContainersLauncherEventType事件对应的事件处理器,如下:dispatcher.register(ContainersLauncherEventType.class, containersLauncher);可以知道在ContainerImpl向调度器发送了ContainersLauncherEvent事件之后,对应的ContainersLauncher对象实例会对该LAUNCH_CONTAINER事件进行对应的处理:
- // ContainersLauncher: handles launch / recover / cleanup events for containers
- public void handle(ContainersLauncherEvent event) {
- // TODO: ContainersLauncher launches containers one by one!!
- Container container = event.getContainer();
- ContainerId containerId = container.getContainerId();
- switch (event.getType()) {
- case LAUNCH_CONTAINER: // start a container
- Application app =
- context.getApplications().get(
- containerId.getApplicationAttemptId().getApplicationId());
-
- // Wrap the work in a ContainerLaunch object and submit it to containerLauncher (a thread pool, per the article) so the launch command runs asynchronously
- ContainerLaunch launch =
- new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
- event.getContainer(), dirsHandler, containerManager);
- containerLauncher.submit(launch);
- running.put(containerId, launch); // remembered so CLEANUP_CONTAINER can find it later
- break;
- case RECOVER_CONTAINER: // recover a container
- app = context.getApplications().get(
- containerId.getApplicationAttemptId().getApplicationId());
- launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher,
- exec, app, event.getContainer(), dirsHandler, containerManager);
- containerLauncher.submit(launch);
- running.put(containerId, launch);
- break;
- case CLEANUP_CONTAINER: // clean up a container
- ContainerLaunch launcher = running.remove(containerId);
- if (launcher == null) {
- // Container not launched. So nothing needs to be done.
- return;
- }
- // Cleanup a container whether it is running/killed/completed, so that
- // no sub-processes are alive.
- try {
- launcher.cleanupContainer();
- } catch (IOException e) {
- LOG.warn("Got exception while cleaning container " + containerId
- + ". Ignoring."); // NOTE(review): the caught exception e is not passed to the logger, so its details are dropped
- }
- break;
- }
- }
- // ContainerLaunch: the task submitted by ContainersLauncher; call() performs the launch
- @SuppressWarnings("unchecked") // dispatcher not typed
- public Integer call() {
- final ContainerLaunchContext launchContext = container.getLaunchContext();
- // Set up resources, local tmp files, log dirs, etc. (elided in this excerpt)
- // Set up the command (cmd) and the local execution environment variables (env) (elided)
- // Set up the execution classpath, jar paths, etc. (elided)
-
- // LaunchContainer is a blocking call. We are here almost means the
- // container is launched, so send out the event.
- dispatcher.getEventHandler().handle(new ContainerEvent(
- containerID,
- ContainerEventType.CONTAINER_LAUNCHED));
- context.getNMStateStore().storeContainerLaunched(containerID);
-
- // Invoke the ContainerExecutor to perform the actual container launch.
- // It has two basic implementations: DefaultContainerExecutor and LinuxContainerExecutor.
- // Ultimately a shell command starts the process that runs the container's task.
- exec.activateContainer(containerID, pidFilePath);
- ret = exec.launchContainer(container, nmPrivateContainerScriptPath,
- nmPrivateTokensPath, user, appIdStr, containerWorkDir,
- localDirs, logDirs); // NOTE(review): ret and the path/dir arguments are not declared in this excerpt — their setup is presumably elided
-
- LOG.info("Container " + containerIdStr + " succeeded "); // NOTE(review): this excerpt reports success without inspecting ret; exit-code handling is presumably elided — confirm
- dispatcher.getEventHandler().handle(
- new ContainerEvent(containerID,
- ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
- return 0;
- }
其container运行启动过程如下所示:
NodeManager的内部架构如下图所示:
NodeManager主要组件也是通过事件进行交互的,这使得组件能够异步并发完成各种功能;如下图所示:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。