In the YARN resource scheduling system, the Client side is responsible for requesting resources for the ApplicationMaster and submitting the ApplicationMaster to the ResourceManager. The following walks through the Client-side processing, from the framework request to the corresponding ApplicationMaster being up and running.
1. Entry-point method service.onFrameworkRequestsUpdated(frameworkRequest) (in xxx.service.RequestManager.java)
frameworkRequest is of type Map<String, FrameworkRequest>.
This method then calls createApplication().
2. createApplication() (in xxx.service.Service.java)
In this method, all Frameworks whose state is FRAMEWORK_WAITING are taken out, and createApplication(frameworkStatus) is called for each of them.
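A minimal sketch of that loop, assuming a getFrameworkStatus-style accessor on statusManager (the real accessor name may differ):

// Hedged sketch of createApplication(): one YARN Application per waiting Framework.
for (FrameworkStatus frameworkStatus :
    statusManager.getFrameworkStatus(FrameworkState.FRAMEWORK_WAITING)) {  // assumed accessor
  createApplication(frameworkStatus);
}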
The FrameworkStatus class stores the Framework's bookkeeping information and has the following fields:


private String frameworkName;
private Integer frameworkVersion;

// Framework dynamic status
private FrameworkState frameworkState;
private RetryPolicyState frameworkRetryPolicyState;
private Long frameworkCreatedTimestamp;
private Long frameworkCompletedTimestamp;

// Framework's current associated Application status
// Note other status can be retrieved from RM
private String applicationId;
private Float applicationProgress;
private String applicationTrackingUrl;
private Long applicationLaunchedTimestamp;
private Long applicationCompletedTimestamp;
private Integer applicationExitCode;
private String applicationExitDiagnostics;
private ExitType applicationExitType;
When creating a concrete ApplicationMaster, a submission context (ApplicationSubmissionContext) has to be prepared. Note that it is obtained through the yarnClient object, which is a field of Service; in other words, this one client manages the submission contexts of all Frameworks. Later on, a new yarnClient object will be created to handle each individual Framework.


ApplicationSubmissionContext applicationContext = yarnClient.createApplication().getApplicationSubmissionContext();
From here on, assume a concrete Framework A.
Once the submission context has been prepared, Framework A's state becomes APPLICATION_CREATED.
Next, this context object needs to be filled in with the more concrete settings, which is done by calling setupApplicationContext(frameworkStatusSnapshot, applicationContext).
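Putting step 2 together, the per-Framework path looks roughly like the sketch below; the state-transition helper is an assumption, only the yarnClient call and setupApplicationContext come from the code above:

// Hedged sketch of createApplication(frameworkStatus) for one Framework.
private void createApplication(FrameworkStatus frameworkStatus) throws Exception {
  // Obtained via the Service-level yarnClient that serves all Frameworks
  ApplicationSubmissionContext applicationContext =
      yarnClient.createApplication().getApplicationSubmissionContext();

  // Framework A: FRAMEWORK_WAITING -> APPLICATION_CREATED (helper name assumed)
  statusManager.transitionFrameworkState(
      frameworkStatus.getFrameworkName(), FrameworkState.APPLICATION_CREATED);

  // Fill in the concrete request: resources, AM container spec, ...
  setupApplicationContext(frameworkStatus, applicationContext);
}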
3. setupApplicationContext(frameworkStatusSnapshot, applicationContext) (in xxx.service.Service.java)
This method first runs a check on the Framework and then sets up which resources to request; it is the most involved step.
1) Check whether this Framework exists in statusManager and requestManager. If it does, proceed with the resource setup below; if it does not, log a warning and return.
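A minimal sketch of this guard, assuming containsFramework-style accessors on the two managers (the real method names may differ):

// Hedged sketch of the existence check in 1); accessor names are assumptions.
if (!statusManager.containsFramework(frameworkName) ||
    !requestManager.containsFramework(frameworkName)) {
  LOGGER.logWarning("Framework [%s] no longer exists in status/request manager, skip setup.", frameworkName);
  return;
}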
2) Read the Framework's resource request from the FrameworkDescriptor object and put it into an object of type PlatformSpecificParametersDescriptor; this information should come from the file submitted by the user. The ResourceDescriptor class is what actually describes CPU, memory, GPU, and so on. The fields of FrameworkDescriptor, PlatformSpecificParametersDescriptor and ResourceDescriptor are listed below:


public class FrameworkDescriptor implements Serializable {
  @Valid
  @Size(max = 512)
  private String description;

  @Valid
  @NotNull
  // version change will trigger the whole Framework NonRolling Upgrade
  private Integer version;

  @Valid
  @NotNull
  private RetryPolicyDescriptor retryPolicy = new RetryPolicyDescriptor();

  @Valid
  private ParentFrameworkDescriptor parentFramework;

  @Valid
  @NotNull
  private UserDescriptor user = UserDescriptor.newInstance();

  @Valid
  @NotEmpty
  @MapKeyNamingValidation
  private Map<String, TaskRoleDescriptor> taskRoles;

  @Valid
  @NotNull
  private PlatformSpecificParametersDescriptor platformSpecificParameters = new PlatformSpecificParametersDescriptor();


// Computation Platform Specific Parameters
public class PlatformSpecificParametersDescriptor implements Serializable {
  @Valid
  @GpuConsistentValidation
  // If you want to use the LauncherConfiguration.amDefaultResource, do not set it or set it to null.
  private ResourceDescriptor amResource;

  @Valid
  private String amNodeLabel;

  @Valid
  private String taskNodeLabel;

  @Valid
  @Pattern(regexp = "^[^\\s]{1,256}$")
  private String taskNodeGpuType;

  @Valid
  @NotNull
  private String queue = "default";

  @Valid
  @NotNull
  // -1 means unlimit.
  // -2 means using default value: LauncherConfiguration.RMResyncFrequency.
  private Integer containerConnectionMaxLostCount = -2;

  @Valid
  @NotNull
  // No unlimit option, since exceed Container must be released eventually.
  private Integer containerConnectionMaxExceedCount = 2;

  @Valid
  @NotNull
  // If this feature enabled, different Tasks is ensured to run on different nodes.
  private Boolean antiaffinityAllocation = false;

  @Valid
  @NotNull
  // If this feature enabled, all Running Tasks will be killed after any TASK_COMPLETED.
  private Boolean killAllOnAnyCompleted = false;

  @Valid
  @NotNull
  // If this feature enabled, all Running Tasks will be killed after any TASK_COMPLETED
  // which is due to the exit of UserService.
  private Boolean killAllOnAnyServiceCompleted = false;

  @Valid
  @NotNull
  // If this feature enabled, AM will wait until all Tasks become CONTAINER_ALLOCATED and
  // then Launches them together.
  // Besides, a ContainerIpList file will be generated in each Task's current working directory.
  // All the Tasks' IPAddresses are recorded consistently in this file, and the assigned current
  // Task's IPAddress can be retrieved from its environment variable CONTAINER_IP.
  private Boolean generateContainerIpList = false;

  @Valid
  @NotNull
  private AMType amType = AMType.DEFAULT;

  @Valid
  @NotNull
  // The following will take effect only if amType is "AGENT".
  // If this feature enabled, Agent will be enabled to send heartbeats to AM.
  private Boolean agentUseHeartbeat = false;

  @Valid
  @NotNull
  // The following will take effect only if amType is "AGENT" and AgentUseAgent flag is true.
  // Frameworks should not set agentHeartbeatIntervalSec to be smaller than LauncherStatus.AgentAMCheckAgentHearbeatsIntervalSec
  private Integer agentHeartbeatIntervalSec = 30;

  @Valid
  @NotNull
  // This is the value when AgentAM does not receive the heartbeats for this interval, the agent is treated as expired.
  // It should be a value larger than agentHeartbeatIntervalSec.
  private Integer agentExpiryIntervalSec = 180;

  @Valid
  @NotNull
  // If this feature enabled, Agent will be enabled to do health checking for user applications.
  private Boolean agentUseHealthCheck = false;


@Valid
@NotNull
private Integer cpuNumber;

@Valid
@NotNull
private Integer memoryMB;

@Valid
@NotNull
private List<Range> portRanges = new ArrayList<>();

@Valid
@NotNull
private DiskType diskType = DiskType.HDD;

@Valid
@NotNull
private Integer diskMB = 0;

@Valid
@NotNull
private Integer gpuNumber = 0;

@Valid
@NotNull
private Long gpuAttribute = 0L;
3) Extract the information from PlatformSpecificParametersDescriptor and set it on the context through the Hadoop API. Concretely: applicationName, applicationType, priority, Resource (CPU, memory, GPU), queue, plus a few other settings, for example:


applicationContext.setKeepContainersAcrossApplicationAttempts(true);
applicationContext.setMaxAppAttempts(conf.getAmAttemptMaxCount());
applicationContext.setAttemptFailuresValidityInterval(conf.getAmAttemptFailuresValidityIntervalSec() * 1000);
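The other fields mentioned in 3) go through the standard ApplicationSubmissionContext setters; a hedged sketch, where the value sources (conf, platParams, the concrete type string) are assumptions:

// Hedged sketch of the remaining settings in 3); value sources are assumed.
applicationContext.setApplicationName(frameworkName);
applicationContext.setApplicationType("LAUNCHER");                           // assumed type string
applicationContext.setPriority(Priority.newInstance(conf.getAmPriority()));  // assumed getter
applicationContext.setQueue(platParams.getQueue());                          // queue from PlatformSpecificParametersDescriptor
applicationContext.setResource(resource);                                    // CPU / memory (GPU via ResourceDescriptor)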
4) Set up the environment variables (environments), the launch command (commands), the files needed at runtime that are pulled down from HDFS (localResources), and the security tokens for the container that runs the ApplicationMaster. All of these live in the ContainerLaunchContext object, and the entry point is this call:


applicationContext.setAMContainerSpec(setupContainerLaunchContext(frameworkStatus, frameworkRequest, resource));
The concrete setup follows. When amType is DEFAULT it goes through the default branch; the other AM type is AGENT, which I have not looked into in detail yet.
The launch command is always the same: run a Java program, namely the ApplicationMaster package.
Note that the FrameworkName is written into the environment variables here; when the ApplicationMaster starts up later, it uses this FrameworkName to pull the complete request information down from ZooKeeper via pullRequest().


private ContainerLaunchContext setupContainerLaunchContext(
    FrameworkStatus frameworkStatus, FrameworkRequest frameworkRequest, Resource amResource) throws Exception {
  String frameworkName = frameworkStatus.getFrameworkName();
  AMType amType = frameworkRequest.getFrameworkDescriptor().getPlatformSpecificParameters().getAmType();

  hdfsStore.makeFrameworkRootDir(frameworkName);
  HadoopUtils.invalidateLocalResourcesCache();

  switch (amType) {
    case DEFAULT:
    default: {
      if (amType != AMType.DEFAULT) {
        LOGGER.logWarning("Unsupported AM type: [%s]. Using the default AM instead.", amType);
      }
      return setupContainerLaunchContextForDefaultAM(frameworkStatus, frameworkRequest, amResource);
    }
  }
}

private ContainerLaunchContext setupContainerLaunchContextForDefaultAM(
    FrameworkStatus frameworkStatus, FrameworkRequest frameworkRequest, Resource amResource) throws Exception {
  String frameworkName = frameworkStatus.getFrameworkName();
  Integer frameworkVersion = frameworkStatus.getFrameworkVersion();
  UserDescriptor loggedInUser = statusManager.getLoggedInUser();

  // SetupLocalResources
  Map<String, LocalResource> localResources = new HashMap<>();
  hdfsStore.makeFrameworkRootDir(frameworkName);
  HadoopUtils.addToLocalResources(localResources, hdfsStore.uploadAMPackageFile(frameworkName));

  // SetupLocalEnvironment
  Map<String, String> localEnvs = new HashMap<>();

  // Internal class is preferred over external class
  localEnvs.put(ApplicationConstants.Environment.CLASSPATH_PREPEND_DISTCACHE.name(), "true");
  StringBuilder classpath = new StringBuilder("*");
  for (String c : yarnConf.getStrings(
      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
      YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
    classpath.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(c.trim());
  }
  classpath.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(ApplicationConstants.Environment.CLASSPATH.$$());
  localEnvs.put(GlobalConstants.ENV_VAR_CLASSPATH, classpath.toString());

  // Use the user for LauncherAM the same as LauncherService, since they are all Launcher executables.
  localEnvs.put(GlobalConstants.ENV_VAR_HADOOP_USER_NAME, loggedInUser.getName());

  localEnvs.put(GlobalConstants.ENV_VAR_FRAMEWORK_NAME, frameworkName);
  localEnvs.put(GlobalConstants.ENV_VAR_FRAMEWORK_VERSION, frameworkVersion.toString());

  localEnvs.put(GlobalConstants.ENV_VAR_ZK_CONNECT_STRING, conf.getZkConnectString());
  localEnvs.put(GlobalConstants.ENV_VAR_ZK_ROOT_DIR, conf.getZkRootDir());
  localEnvs.put(GlobalConstants.ENV_VAR_ZK_COMPRESSION_ENABLE, conf.getZkCompressionEnable().toString());
  localEnvs.put(GlobalConstants.ENV_VAR_AM_VERSION, conf.getAmVersion().toString());
  localEnvs.put(GlobalConstants.ENV_VAR_AM_RM_HEARTBEAT_INTERVAL_SEC, conf.getAmRmHeartbeatIntervalSec().toString());

  // SetupEntryPoint
  Vector<CharSequence> vargs = new Vector<>(30);
  vargs.add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java");
  vargs.add("-DLOG_DIRS=$LOG_DIRS");
  vargs.add("-Xmx" + amResource.getMemory() + "m");
  vargs.add(GlobalConstants.MAIN_CLASS_APPLICATION_MASTER);
  vargs.add(String.format(
      "1>%1$sstdout 2>%1$sstderr",
      ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator));

  StringBuilder command = new StringBuilder();
  for (CharSequence str : vargs) {
    command.append(str).append(" ");
  }
  List<String> commands = new ArrayList<>();
  commands.add(command.toString());

  // SetupSecurityTokens
  ByteBuffer fsTokens = null;
  if (UserGroupInformation.isSecurityEnabled()) {
    // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
    Credentials credentials = new Credentials();
    String tokenRenewer = yarnConf.get(YarnConfiguration.RM_PRINCIPAL);
    FileSystem fs = FileSystem.get(yarnConf);
    if (tokenRenewer == null || tokenRenewer.length() == 0) {
      throw new IOException(
          "Can't get Master Kerberos principal for the RM to use as renewer");
    }

    // For now, only getting tokens for the default file-system.
    final org.apache.hadoop.security.token.Token<?> tokens[] =
        fs.addDelegationTokens(tokenRenewer, credentials);
    if (tokens != null) {
      for (org.apache.hadoop.security.token.Token<?> token : tokens) {
        LOGGER.logInfo("Got dt for " + fs.getUri() + "; " + token);
      }
    }
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
  }

  return ContainerLaunchContext.newInstance(
      localResources, localEnvs, commands, null, fsTokens, null);
}
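For reference, the AM-side counterpart of the FrameworkName environment variable set above might look roughly like the lines below; only ENV_VAR_FRAMEWORK_NAME and pullRequest() appear in this walkthrough, the ZooKeeper accessor is an assumption:

// Hedged sketch of how the AM recovers its Framework on startup.
String frameworkName = System.getenv(GlobalConstants.ENV_VAR_FRAMEWORK_NAME);
// pullRequest() then fetches the full FrameworkRequest for this name from ZooKeeper,
// e.g. through something like:
FrameworkRequest frameworkRequest = zkStore.getFrameworkRequest(frameworkName);  // assumed accessor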
Once everything has been set, a task that runs launchApplication(frameworkStatus, applicationContext) is put onto the queue:


transitionFrameworkStateQueue.queueSystemTask(() -> {
launchApplication(frameworkStatus, applicationContext);
});
At this point, Framework A's state becomes APPLICATION_LAUNCHED.
4. launchApplication(frameworkStatus, applicationContext) (in xxx.service.Service.java)
As in step 3, before the actual submission, statusManager and requestManager are queried to check that this Framework still exists.
The user name is taken from the FrameworkDescriptor and is later used to obtain a UserGroupInformation from the Hadoop API, which contains the user name and group membership.
A new yarnClient object is created that serves only this one Framework: init, start, submit the ApplicationMaster's submission context, stop:


public static void submitApplication(
    ApplicationSubmissionContext appContext, UserDescriptor user) throws Throwable {
  UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user.getName());

  // Need to start a new YarnClient for a new UGI, since its internal Hadoop RPC
  // reuse the UGI after YarnClient.start().
  try {
    ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
      YarnClient yarnClient = YarnClient.createYarnClient();
      yarnClient.init(conf);
      yarnClient.start();
      yarnClient.submitApplication(appContext);
      yarnClient.stop();
      return null;
    });
  } catch (UndeclaredThrowableException e) {
    throw e.getCause();
  }
}
With that, the Client's work is done: it reads the user's configuration and asks the RM for resources to run the ApplicationMaster.