To analyze the MR job submission process we first need the code of a job. This article uses the WordCount example program that ships with Hadoop 2.7.1 as the object of analysis; its code is as follows:
public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
        new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
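To try the example on a cluster, the bundled examples jar can be invoked along these lines (the jar path and the input/output paths are illustrative; compare the uber-mode example near the end of this article):

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.1.jar wordcount /input /output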
The entry point is job.waitForCompletion(true) at the end of main(). Once it has checked that the job state allows submission, it calls submit(). The monitorAndPrintJob() method keeps polling for the job's progress information and prints it. A verbose argument of true means progress should be printed; false means the method just waits for the job to finish without printing progress logs.
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  if (state == JobState.DEFINE) {
    // Submit when the state is DEFINE; a freshly initialized job is in the DEFINE state
    submit();
  }
  if (verbose) {
    // When printing job details, fetch and print the status every 1s
    monitorAndPrintJob();
  } else {
    // When not printing details, check for completion every 5s
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  // Re-check the job state
  ensureState(JobState.DEFINE);
  // There are two generations of the API; the new API is used here
  setUseNewAPI();
  // Connect to the cluster
  connect();
  // Create the JobSubmitter
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
When a MapReduce job is submitted, connecting to the cluster is done through the Job class's connect() method, which actually constructs the Cluster instance cluster. In the Job class:
private synchronized void connect()
    throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {
    // cluster is the tool for connecting to the MapReduce cluster; it
    // provides methods for remotely obtaining information about the cluster
    cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
          public Cluster run()
              throws IOException, InterruptedException,
                  ClassNotFoundException {
            return new Cluster(getConfiguration());
          }
        });
  }
}
This brings us to the Cluster class. First, let's look at its members.
public class Cluster {

  @InterfaceStability.Evolving
  public static enum JobTrackerStatus {INITIALIZING, RUNNING};

  private ClientProtocolProvider clientProtocolProvider; // client protocol provider
  private ClientProtocol client;       // client protocol instance
  private UserGroupInformation ugi;    // user/group information
  private Configuration conf;          // configuration
  private FileSystem fs = null;        // file system instance
  private Path sysDir = null;          // system directory
  private Path stagingAreaDir = null;  // staging directory for job resources
  private Path jobHistoryDir = null;   // job history directory
  private static final Log LOG = LogFactory.getLog(Cluster.class);

  // Loader for client protocol providers: implementations of the
  // ClientProtocolProvider interface are loaded via the ServiceLoader mechanism
  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);

  static {
    ConfigUtil.loadResources();
  }

  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    this.conf = conf;                                 // store the configuration
    this.ugi = UserGroupInformation.getCurrentUser(); // get the current user
    initialize(jobTrackAddr, conf);                   // initialize
  }
  ...
}
Cluster's two most important members are clientProtocolProvider, a ClientProtocolProvider instance, and client, a ClientProtocol instance; the latter is created by the former's create() method.
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  synchronized (frameworkLoader) {
    // Take each ClientProtocolProvider in turn and try to construct a
    // ClientProtocol instance through its create() method
    for (ClientProtocolProvider provider : frameworkLoader) {
      LOG.debug("Trying ClientProtocolProvider : "
          + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        // If mapred-site.xml configures the scheduling framework as "local",
        // a LocalJobRunner is built (it wraps the local scheduler's client
        // API) and the MR job runs locally.
        // If it configures "yarn", a YARNRunner is built (it wraps the YARN
        // client API) and the MR job runs on the YARN cluster.
        if (jobTrackAddr == null) {
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }
        // Set the members clientProtocolProvider and client, then exit the loop
        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        } else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      } catch (Exception e) {
        LOG.info("Failed to use " + provider.getClass().getName()
            + " due to error: " + e.getMessage());
      }
    }
  }
  if (null == clientProtocolProvider || null == client) {
    throw new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the correspond server addresses.");
  }
}
Related parameter: mapreduce.framework.name, the scheduling framework; it can be either local or yarn.
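For reference, a minimal sketch (not from the Hadoop source) of selecting the framework on the client before building the Job; only the property value changes between the two modes:

Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "yarn"); // or "local" for LocalJobRunner
Job job = Job.getInstance(conf, "word count");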
ClientProtocolProvider has two implementations, YarnClientProtocolProvider and LocalClientProtocolProvider: the former is for Yarn mode, the latter for Local mode. Both are discovered through the ServiceLoader mechanism shown above.
Let's look at Local mode first. The code is as follows:
public class LocalClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    // Read "mapreduce.framework.name", defaulting to "local"
    String framework =
        conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
    // If the framework is "local", return a LocalJobRunner and set the
    // number of map tasks to 1; otherwise return null
    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
      return null;
    }
    conf.setInt(JobContext.NUM_MAPS, 1);
    return new LocalJobRunner(conf);
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf) {
    return null; // LocalJobRunner doesn't use a socket
  }

  @Override
  public void close(ClientProtocol clientProtocol) {
    // no clean up required
  }
}
Now Yarn mode:
public class YarnClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    // If mapreduce.framework.name is configured as "yarn", construct and
    // return a YARNRunner instance; otherwise return null
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return new YARNRunner(conf);
    }
    return null;
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
      throws IOException {
    return create(conf);
  }

  @Override
  public void close(ClientProtocol clientProtocol) throws IOException {
    // nothing to do
  }
}
Summary: at this point we can see that the ClientProtocol instance held by Cluster is either YARNRunner in Yarn mode or LocalJobRunner in Local mode.
Taking Yarn mode as the case for analyzing the MapReduce cluster connection, let's look at the YARNRunner implementation.
public class YARNRunner implements ClientProtocol {

  private static final Log LOG = LogFactory.getLog(YARNRunner.class);

  private final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);
  private ResourceMgrDelegate resMgrDelegate; // ResourceManager proxy instance
  private ClientCache clientCache;            // client cache instance
  private Configuration conf;
  // File context instance: gives the application an interface to HDFS;
  // essentially it wraps a FileSystem instance
  private final FileContext defaultFileContext;

  /**
   * Yarn runner incapsulates the client interface of
   * yarn
   * @param conf the configuration object for the client
   */
  // First construct the ResourceManager proxy instance
  public YARNRunner(Configuration conf) {
    this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
  }

  /**
   * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
   * {@link ResourceMgrDelegate}. Enables mocking and testing.
   * @param conf the configuration object for the client
   * @param resMgrDelegate the resourcemanager client handle.
   */
  // Then construct the ClientCache instance
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
    this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
  }

  /**
   * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
   * but allowing injecting {@link ClientCache}. Enable mocking and testing.
   * @param conf the configuration object
   * @param resMgrDelegate the resource manager delegate
   * @param clientCache the client cache object.
   */
  // Finally obtain the file context instance
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
      ClientCache clientCache) {
    this.conf = conf;
    try {
      this.resMgrDelegate = resMgrDelegate;
      this.clientCache = clientCache;
      this.defaultFileContext = FileContext.getFileContext(this.conf);
    } catch (UnsupportedFileSystemException ufe) {
      throw new RuntimeException("Error in instantiating YarnClient", ufe);
    }
  }
Its most important field is resMgrDelegate, a ResourceMgrDelegate instance that acts as a proxy for the ResourceManager. In Yarn mode the entire MapReduce client relies on it to communicate with the Yarn cluster, performing job submission, job status queries, and cluster information retrieval. Internally it holds a YarnClient instance responsible for talking to Yarn, plus application-specific members such as ApplicationId and ApplicationSubmissionContext. ResourceMgrDelegate is analyzed in detail in Appendix (1).
Another important field is clientCache, the ClientCache instance; see Appendix (2) for its analysis.
To recap: when a MapReduce job is submitted, connecting to the cluster is done through Job's connect() method, which constructs the Cluster instance cluster. Cluster is a tool for connecting to the MapReduce cluster and provides the means to obtain information about it. Inside Cluster there is a client protocol instance client, a ClientProtocol, that communicates with the cluster; it is constructed by a ClientProtocolProvider's create() method. Hadoop 2.x ships two flavors of ClientProtocol, YARNRunner for Yarn mode and LocalJobRunner for Local mode, and it is these that actually talk to the cluster on Cluster's behalf. In Yarn mode, the YARNRunner object in turn holds a ResourceManager proxy, the ResourceMgrDelegate instance resMgrDelegate, and the whole MapReduce client relies on it to communicate with the Yarn cluster, handling job submission, job status queries, and cluster information retrieval.
Now back to the main flow: it's time to enter submitJobInternal(), the most important stage of the job submission process.
final JobSubmitter submitter =
    getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
  public JobStatus run() throws IOException, InterruptedException,
      ClassNotFoundException {
    return submitter.submitJobInternal(Job.this, cluster);
  }
});
state = JobState.RUNNING;
First, a JobSubmitter object is constructed, passing in the FileSystem object and the YARNRunner object.
class JobSubmitter {
  protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
  private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";
  private static final int SHUFFLE_KEY_LENGTH = 64;
  private FileSystem jtFs;             // file system instance
  private ClientProtocol submitClient; // client protocol instance, used to talk to the
                                       // cluster for job submission, status queries, etc.
  private String submitHostName;       // name of the submitting host
  private String submitHostAddress;    // address of the submitting host

  JobSubmitter(FileSystem submitFs, ClientProtocol submitClient)
      throws IOException {
    this.submitClient = submitClient;
    this.jtFs = submitFs;
  }
Next we analyze submitter.submitJobInternal(), the only interface that JobSubmitter exposes.
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  // validate the jobs output specs
  // Check that the output path is configured and does not yet exist; the
  // correct state is configured but nonexistent. The output path parameter
  // is mapreduce.output.fileoutputformat.outputdir
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  // Add the MR framework path to the distributed cache. TODO: purpose?
  addMRFrameworkToDistributedCache(conf);

  // Obtain the staging directory for job resources via the static
  // getStagingDir() method. Parameter: yarn.app.mapreduce.am.staging-dir,
  // default /tmp/hadoop-yarn/staging/<submitting user>/.staging
  // The .staging path is returned if it exists, otherwise it is created
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

  // configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost(); // IP of the current host
  if (ip != null) {
    // Record the submitting host's IP and hostname, and set them into conf
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  // Ask the RM for a new application ID and convert it to a JobID
  // (app_xxx vs job_xxx); see Appendix (8) for the details
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  // Build the job submission path: jobStagingArea followed by /jobID
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    // Set the job's user to the current user
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    // mapreduce.job.dir is set to .staging/jobID
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);
    // Fetch secrets and tokens and store them in the TokenCache
    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate"
          + "data spill is enabled");
    }

    // Upload the job-related files to the HDFS .staging/jobID directory
    copyAndConfigureFiles(job, submitJobDir);

    // Build the configuration file path, i.e. .staging/jobID/job.xml
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    // Split the input: write the split data file job.split and the split
    // metadata file job.splitmetainfo, and compute the number of map tasks
    int maps = writeSplits(job, submitJobDir);
    // Set the number of maps (mapreduce.job.maps)
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    // Get the job queue name (mapreduce.job.queuename); if no queues are
    // configured it is "default"
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf); // clean up cached token referrals

    // Decide from configuration whether token IDs should be tracked
    // TODO: what is token ID tracking for?
    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids: collect the job's token tracking IDs into
      // the trackingIds list
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists. TODO: what is reservation info?
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir: the job's conf is written out to
    // .staging/jobID/job.xml
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    // Log the token information
    printTokens(jobId, job.getCredentials());
    // Submit the job through the submitJob() method of the ClientProtocol
    // instance submitClient and obtain the job status instance. From the
    // context above, submitClient is either a YARNRunner or a
    // LocalJobRunner; submitJob() is analyzed in Appendix (5)
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
That completes the analysis of submit(). Next up is monitorAndPrintJob().
It fetches and prints the job's running state and progress in near real time (every 1 second by default), and prints detailed failure information when a task fails. The job state information is obtained from the RM via RPC.
public boolean monitorAndPrintJob()
    throws IOException, InterruptedException {
  String lastReport = null;
  Job.TaskStatusFilter filter;
  Configuration clientConf = getConfiguration();
  filter = Job.getTaskOutputFilter(clientConf); // default FAILED
  JobID jobId = getJobID();
  LOG.info("Running job: " + jobId);
  int eventCounter = 0;
  boolean profiling = getProfileEnabled();                 // mapreduce.task.profile=false
  IntegerRanges mapRanges = getProfileTaskRange(true);     // mapreduce.task.profile.maps=0-2
  IntegerRanges reduceRanges = getProfileTaskRange(false); // mapreduce.task.profile.reduces=0-2
  int progMonitorPollIntervalMillis =
      Job.getProgressPollInterval(clientConf); // mapreduce.client.progressmonitor.pollinterval=1000ms
  /* make sure to report full progress after the job is done */
  boolean reportedAfterCompletion = false;
  boolean reportedUberMode = false;
  // Keep reporting the status periodically until the job completes
  while (!isComplete() || !reportedAfterCompletion) {
    if (isComplete()) { // isComplete() refreshes the job state from the RM
      reportedAfterCompletion = true;
    } else {
      Thread.sleep(progMonitorPollIntervalMillis);
    }
    if (status.getState() == JobStatus.State.PREP) {
      continue;
    }
    if (!reportedUberMode) {
      reportedUberMode = true;
      // Whether the job runs in uber mode; uber mode is explained in Appendix (6)
      LOG.info("Job " + jobId + " running in uber mode : " + isUber());
    }
    String report =
        (" map " + StringUtils.formatPercent(mapProgress(), 0) +      // map progress (fetched from the RM)
         " reduce " + StringUtils.formatPercent(reduceProgress(), 0)); // reduce progress (fetched from the RM)
    if (!report.equals(lastReport)) {
      LOG.info(report);
      lastReport = report;
    }
    // Events for tasks completed since the last report
    TaskCompletionEvent[] events = getTaskCompletionEvents(eventCounter, 10);
    eventCounter += events.length;
    // Print the completed task events; failed tasks get detailed output
    printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
  }
  boolean success = isSuccessful();
  if (success) {
    LOG.info("Job " + jobId + " completed successfully");
  } else {
    LOG.info("Job " + jobId + " failed with state " + status.getState()
        + " due to: " + status.getFailureInfo());
  }
  // Fetch and print the counters
  Counters counters = getCounters();
  if (counters != null) {
    LOG.info(counters.toString());
  }
  return success;
}
(1) Job's submit() method creates an internal JobSubmitter instance and calls its submitJobInternal() method. After the job is submitted, waitForCompletion() polls the job's progress once per second and, if the progress has changed since the last report, prints it to the console. When the job completes successfully its counters are displayed; on failure, the error that caused it is printed to the console.
(2) JobSubmitter requests (registers) a new application ID from the resource manager, which is used as the MapReduce job ID. It also checks the job's output directory: if the output directory already exists, an error is reported. Then it computes the input splits for the job; if they cannot be computed, for example because an input path does not exist, an error is reported as well.
(3) JobSubmitter copies the resources needed to run the job (the job JAR, the configuration file, and the computed input splits) to HDFS under a directory named after the job ID. The JAR is copied with a replication factor controlled by mapreduce.client.submit.file.replication, default 10 (see the configuration sketch after this list).
(4) JobSubmitter submits the job by calling the resource manager's submitApplication() method.
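The staging directory and the JAR replication factor mentioned above are plain configuration keys; a minimal sketch of overriding them on the client (the values shown are the defaults):

Configuration conf = new Configuration();
// Root of the per-user .staging area used for job resources
conf.set("yarn.app.mapreduce.am.staging-dir", "/tmp/hadoop-yarn/staging");
// Replication factor for the job JAR and other submitted resources
conf.setInt("mapreduce.client.submit.file.replication", 10);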
Appendix (1): ResourceMgrDelegate
It mainly has three member variables:
(1) ApplicationSubmissionContext: contains all the information about the ApplicationMaster that the RM needs;
(2) yarnClient: the client used to communicate with yarn, covering the RM client, the ApplicationHistoryServer client, and the TimelineServer client;
(3) ApplicationId: the application ID obtained from the RM when the job is submitted (later converted to the JobID).
public class ResourceMgrDelegate extends YarnClient {
  private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);

  private YarnConfiguration conf;
  private ApplicationSubmissionContext application;
  private ApplicationId applicationId;
  @Private
  @VisibleForTesting
  protected YarnClient client;
  private Text rmDTService;

  /**
   * Delegate responsible for communicating with the Resource Manager's
   * {@link ApplicationClientProtocol}.
   * @param conf the configuration object.
   */
  public ResourceMgrDelegate(YarnConfiguration conf) {
    super(ResourceMgrDelegate.class.getName());
    this.conf = conf;
    this.client = YarnClient.createYarnClient(); // new YarnClientImpl()
    init(conf);  // eventually calls YarnClientImpl.serviceInit()
    start();     // eventually calls YarnClientImpl.serviceStart()
  }
Let's look at YarnClientImpl.serviceInit(), which mainly creates and initializes the ApplicationHistoryServer client and the TimelineServer client. The ApplicationHistoryServer client is an RPC client; the TimelineServer client is an HTTP client.
protected void serviceInit(Configuration conf) throws Exception {
  asyncApiPollIntervalMillis = conf.getLong(
      YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
      YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
  asyncApiPollTimeoutMillis = conf.getLong(
      YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS,
      YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS);
  submitPollIntervalMillis = asyncApiPollIntervalMillis;
  if (conf.get(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS)
      != null) {
    submitPollIntervalMillis = conf.getLong(
        YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
        YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
  }

  if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
      YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
    historyServiceEnabled = true;
    historyClient = AHSClient.createAHSClient(); // new AHSClientImpl()
    historyClient.init(conf);
  }

  if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
    timelineServiceEnabled = true;
    timelineClient = createTimelineClient(); // new TimelineClientImpl()
    timelineClient.init(conf);
    timelineDTRenewer = getTimelineDelegationTokenRenewer(conf);
    timelineService = TimelineUtils.buildTimelineTokenService(conf);
  }

  timelineServiceBestEffort = conf.getBoolean(
      YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT,
      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_BEST_EFFORT);
  super.serviceInit(conf);
}
Related parameters:
yarn.client.application-client-protocol.poll-interval-ms: the interval at which the yarn client polls the app's running state, default 200 ms.
yarn.client.application-client-protocol.poll-timeout-ms: the timeout for the yarn client's app state polling, default -1 (no timeout).
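A minimal sketch of overriding these two client-side settings (the values shown are the defaults just listed):

conf.setLong("yarn.client.application-client-protocol.poll-interval-ms", 200L);
conf.setLong("yarn.client.application-client-protocol.poll-timeout-ms", -1L);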
Now YarnClientImpl.serviceStart(): it first builds the rmClient and then starts the historyClient and the timelineClient.
protected void serviceStart() throws Exception {
  try {
    rmClient = ClientRMProxy.createRMProxy(getConfig(),
        ApplicationClientProtocol.class);
    if (historyServiceEnabled) {
      historyClient.start();
    }
    if (timelineServiceEnabled) {
      timelineClient.start();
    }
  } catch (IOException e) {
    throw new YarnRuntimeException(e);
  }
  super.serviceStart();
}
Next, look at ClientRMProxy.createRMProxy. When HA is not enabled for yarn, it connects directly to the RM's address; when HA is enabled, the proxy handles failover across the multiple RMs.
protected static <T> T createRMProxy(final Configuration configuration,
    final Class<T> protocol, RMProxy instance) throws IOException {
  YarnConfiguration conf = (configuration instanceof YarnConfiguration)
      ? (YarnConfiguration) configuration
      : new YarnConfiguration(configuration);
  RetryPolicy retryPolicy = createRetryPolicy(conf);
  if (HAUtil.isHAEnabled(conf)) {
    RMFailoverProxyProvider<T> provider =
        instance.createRMFailoverProxyProvider(conf, protocol);
    return (T) RetryProxy.create(protocol, provider, retryPolicy);
  } else {
    InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
    return (T) RetryProxy.create(protocol, proxy, retryPolicy);
  }
}
Appendix (2): ClientCache
It caches three objects: 1) one ClientServiceDelegate per job; 2) the ResourceMgrDelegate; 3) the history server proxy.
public class ClientCache {

  private final Configuration conf;
  private final ResourceMgrDelegate rm;

  private static final Log LOG = LogFactory.getLog(ClientCache.class);

  // Every job maps to its own ClientServiceDelegate object
  private Map<JobID, ClientServiceDelegate> cache =
      new HashMap<JobID, ClientServiceDelegate>();

  // There is only one hsProxy per cluster
  private MRClientProtocol hsProxy;

  public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
    this.conf = conf;
    this.rm = rm;
  }

  // TODO: evict from the cache on some threshold
  public synchronized ClientServiceDelegate getClient(JobID jobId) {
    ...
  }

  protected synchronized MRClientProtocol getInitializedHSProxy() {
    ...
  }

  // The construction of the YarnRPC proxy here is analyzed in Appendix (4)
  protected MRClientProtocol instantiateHistoryProxy() throws IOException {
    final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
    if (StringUtils.isEmpty(serviceAddr)) {
      return null;
    }
    LOG.debug("Connecting to HistoryServer at: " + serviceAddr);
    final YarnRPC rpc = YarnRPC.create(conf);
    LOG.debug("Connected to HistoryServer at: " + serviceAddr);
    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
    return currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
      @Override
      public MRClientProtocol run() {
        return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
            NetUtils.createSocketAddr(serviceAddr), conf);
      }
    });
  }
The /.staging/<jobID>/ directory contains the submitted job resources: job.jar, job.split, job.splitmetainfo, and job.xml. job.xml holds the complete configuration, including the core-site, hdfs-site, yarn-site, and mapred-site settings.
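A hypothetical snippet for listing the staging directory of a submitted job (the user name and job ID in the path are illustrative):

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path staging = new Path("/tmp/hadoop-yarn/staging/hadoop/.staging/job_1552994470849_0005");
for (FileStatus st : fs.listStatus(staging)) {
  System.out.println(st.getPath().getName()); // job.jar, job.split, job.splitmetainfo, job.xml
}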
Appendix (4): building a YarnRPC proxy
We take the history server RPC construction in the ClientCache class as the example.
protected MRClientProtocol instantiateHistoryProxy() throws IOException {
  // Get the history server address
  final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
  if (StringUtils.isEmpty(serviceAddr)) {
    return null;
  }
  LOG.debug("Connecting to HistoryServer at: " + serviceAddr);
  // Create the RPC object; this instantiates a HadoopYarnProtoRPC object
  final YarnRPC rpc = YarnRPC.create(conf);
  LOG.debug("Connected to HistoryServer at: " + serviceAddr);
  UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
  // Use the YarnRPC object's getProxy() to obtain an RPC proxy for
  // HSClientProtocol; since the RPC protocol uses protobuf, this ultimately
  // builds an HSClientProtocolPBClientImpl object
  return currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
    @Override
    public MRClientProtocol run() {
      return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
          NetUtils.createSocketAddr(serviceAddr), conf);
    }
  });
}
Related parameter:
mapreduce.jobhistory.address: the JobHistoryServer address read above.
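A minimal sketch of setting it on the client (the host name is illustrative; 10020 is the customary default port):

conf.set("mapreduce.jobhistory.address", "historyserver.example.com:10020");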
The key part is YarnRPC.create():
public static YarnRPC create(Configuration conf) {
  LOG.debug("Creating YarnRPC for " +
      conf.get(YarnConfiguration.IPC_RPC_IMPL));
  String clazzName = conf.get(YarnConfiguration.IPC_RPC_IMPL);
  if (clazzName == null) {
    clazzName = YarnConfiguration.DEFAULT_IPC_RPC_IMPL; // default is proto
  }
  // Create the HadoopYarnProtoRPC object
  try {
    return (YarnRPC) Class.forName(clazzName).newInstance();
  } catch (Exception e) {
    throw new YarnRuntimeException(e);
  }
}
Related parameter:
yarn.ipc.rpc.class: the class implementing yarn's RPC, default org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC.
Now rpc.getProxy():
public class HadoopYarnProtoRPC extends YarnRPC {

  private static final Log LOG = LogFactory.getLog(HadoopYarnProtoRPC.class);

  @Override
  // Instantiate the protocol object via the factory pattern
  public Object getProxy(Class protocol, InetSocketAddress addr,
      Configuration conf) {
    LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);
    return RpcFactoryProvider.getClientFactory(conf).getClient(protocol, 1,
        addr, conf);
  }

  @Override
  public void stopProxy(Object proxy, Configuration conf) {
    RpcFactoryProvider.getClientFactory(conf).stopClient(proxy);
  }

  @Override
  public Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers, String portRangeConfig) {
    LOG.debug("Creating a HadoopYarnProtoRpc server for protocol " + protocol
        + " with " + numHandlers + " handlers");
    return RpcFactoryProvider.getServerFactory(conf).getServer(protocol,
        instance, addr, conf, secretManager, numHandlers, portRangeConfig);
  }
}
Appendix (5): YARNRunner.submitJob()
It does three things:
(1) constructs the information needed to launch the AM;
(2) submits the job to the RM;
(3) builds a JobStatus to track the job's running state.
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {
  addHistoryToken(ts);

  // Construct necessary information to start the MR AM
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  try {
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);

    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    String diagnostics =
        (appMaster == null ? "application report is null"
            : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " + diagnostics);
    }
    // Build a JobStatus object to track the job's running state
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
Analysis of createApplicationSubmissionContext(conf, jobSubmitDir, ts):
public ApplicationSubmissionContext createApplicationSubmissionContext(
    Configuration jobConf, String jobSubmitDir, Credentials ts)
    throws IOException {
  ApplicationId applicationId = resMgrDelegate.getApplicationId();

  // Setup resource requirements
  Resource capability = recordFactory.newRecordInstance(Resource.class);
  // yarn.app.mapreduce.am.resource.mb: memory for the AM
  capability.setMemory(
      conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
          MRJobConfig.DEFAULT_MR_AM_VMEM_MB));
  // yarn.app.mapreduce.am.resource.cpu-vcores: vcores for the AM
  capability.setVirtualCores(
      conf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
          MRJobConfig.DEFAULT_MR_AM_CPU_VCORES));
  LOG.debug("AppMaster capability = " + capability);

  // Setup LocalResources: the map of job files the NM must download onto
  // its node before the AM container can run
  Map<String, LocalResource> localResources =
      new HashMap<String, LocalResource>();

  Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE); // .staging/jobID/job.xml

  URL yarnUrlForJobSubmitDir = ConverterUtils
      .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
          .resolvePath(
              defaultFileContext.makeQualified(new Path(jobSubmitDir))));
  LOG.debug("Creating setup context, jobSubmitDir url is "
      + yarnUrlForJobSubmitDir);

  // Add the job.xml resource
  localResources.put(MRJobConfig.JOB_CONF_FILE,
      createApplicationResource(defaultFileContext,
          jobConfPath, LocalResourceType.FILE));
  // Add the job.jar resource
  if (jobConf.get(MRJobConfig.JAR) != null) {
    Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
    LocalResource rc = createApplicationResource(
        FileContext.getFileContext(jobJarPath.toUri(), jobConf),
        jobJarPath,
        LocalResourceType.PATTERN);
    String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
        JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
    rc.setPattern(pattern);
    localResources.put(MRJobConfig.JOB_JAR, rc);
  } else {
    // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
    // mapreduce jar itself which is already on the classpath.
    LOG.info("Job jar is not present. "
        + "Not adding any jar to the list of resources.");
  }

  // TODO gross hack
  // Add the job.split and job.splitmetainfo resources
  for (String s : new String[] {
      MRJobConfig.JOB_SPLIT,
      MRJobConfig.JOB_SPLIT_METAINFO }) {
    localResources.put(
        MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
        createApplicationResource(defaultFileContext,
            new Path(jobSubmitDir, s), LocalResourceType.FILE));
  }

  // Setup security tokens
  DataOutputBuffer dob = new DataOutputBuffer();
  ts.writeTokenStorageToStream(dob);
  ByteBuffer securityTokens =
      ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

  // Setup the command to run the AM: this is literally the java command
  // line that launches the AM process
  List<String> vargs = new ArrayList<String>(8);
  // add /<java_home>/bin/java
  vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
      + "/bin/java");

  // add the log4j arguments
  MRApps.addLog4jSystemProperties(null, vargs, conf);

  // Check for Java Lib Path usage in MAP and REDUCE configs:
  // warn if -Djava.library.path appears in the configuration
  warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS, ""), "map",
      MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
  warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, ""),
      "map", MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
      MRJobConfig.MAPRED_ADMIN_USER_ENV);
  warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""), "reduce",
      MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
  warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, ""),
      "reduce", MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
      MRJobConfig.MAPRED_ADMIN_USER_ENV);

  // Add AM admin command opts before user command opts
  // so that it can be overridden by user
  String mrAppMasterAdminOptions = conf.get(
      MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
      MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
  warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
      MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
  vargs.add(mrAppMasterAdminOptions);

  // Add AM user command opts
  String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
      MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
  warnForJavaLibPath(mrAppMasterUserOptions, "app master",
      MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
  // add -Xmx1024m
  vargs.add(mrAppMasterUserOptions);

  // Enable profiling for the AM if configured; profiling is the MR job
  // performance diagnosis feature, see Appendix (7)
  if (jobConf.getBoolean(MRJobConfig.MR_AM_PROFILE,
      MRJobConfig.DEFAULT_MR_AM_PROFILE)) { // yarn.app.mapreduce.am.profile
    final String profileParams = jobConf.get(
        MRJobConfig.MR_AM_PROFILE_PARAMS,
        MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS); // yarn.app.mapreduce.am.profile.params
    if (profileParams != null) {
      // adds e.g. -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,
      // verbose=n,file=/<LOG_DIR>/profile.out
      vargs.add(String.format(profileParams,
          ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
              + TaskLog.LogName.PROFILE));
    }
  }

  // add org.apache.hadoop.mapreduce.v2.app.MRAppMaster
  vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
  // add 1><LOG_DIR>/stdout
  vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
      + Path.SEPARATOR + ApplicationConstants.STDOUT);
  // add 2><LOG_DIR>/stderr
  vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
      + Path.SEPARATOR + ApplicationConstants.STDERR);

  Vector<String> vargsFinal = new Vector<String>(8);
  // Final command
  StringBuilder mergedCommand = new StringBuilder();
  for (CharSequence str : vargs) {
    mergedCommand.append(str).append(" ");
  }
  vargsFinal.add(mergedCommand.toString());

  LOG.debug("Command to launch container for ApplicationMaster is : "
      + mergedCommand);

  // Setup the CLASSPATH in environment
  // i.e. add { Hadoop jars, job jar, CWD } to classpath.
  Map<String, String> environment = new HashMap<String, String>();
  MRApps.setClasspath(environment, conf);

  // Shell
  environment.put(Environment.SHELL.name(),
      conf.get(MRJobConfig.MAPRED_ADMIN_USER_SHELL,
          MRJobConfig.DEFAULT_SHELL));

  // Add the container working directory at the front of LD_LIBRARY_PATH
  MRApps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
      MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);

  // Setup the environment variables for Admin first
  MRApps.setEnvFromInputString(environment,
      conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV), conf);
  // Setup the environment variables (LD_LIBRARY_PATH, etc)
  MRApps.setEnvFromInputString(environment,
      conf.get(MRJobConfig.MR_AM_ENV), conf);

  // Parse distributed cache
  MRApps.setupDistributedCache(jobConf, localResources);

  Map<ApplicationAccessType, String> acls =
      new HashMap<ApplicationAccessType, String>(2);
  acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
      MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
  acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
      MRJobConfig.JOB_ACL_MODIFY_JOB,
      MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));

  // Setup ContainerLaunchContext for AM container:
  // create the launch context for the AM container
  ContainerLaunchContext amContainer =
      ContainerLaunchContext.newInstance(localResources, environment,
          vargsFinal, null, securityTokens, acls);

  Collection<String> tagsFromConf =
      jobConf.getTrimmedStringCollection(MRJobConfig.JOB_TAGS);

  // Set up the ApplicationSubmissionContext:
  // create the ApplicationSubmissionContextPBImpl object
  ApplicationSubmissionContext appContext =
      recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
  // Set the application ID on the appContext
  appContext.setApplicationId(applicationId);       // ApplicationId
  // Set the queue name on the appContext
  appContext.setQueue(                              // Queue name
      jobConf.get(JobContext.QUEUE_NAME,
          YarnConfiguration.DEFAULT_QUEUE_NAME));
  // add reservationID if present
  ReservationId reservationID = null;
  try {
    reservationID = ReservationId.parseReservationId(jobConf
        .get(JobContext.RESERVATION_ID));
  } catch (NumberFormatException e) {
    // throw exception as reservationid as is invalid
    String errMsg = "Invalid reservationId: "
        + jobConf.get(JobContext.RESERVATION_ID)
        + " specified for the app: " + applicationId;
    LOG.warn(errMsg);
    throw new IOException(errMsg);
  }
  if (reservationID != null) {
    // Set the reservation ID on the appContext. TODO: what is a
    // reservation ID for?
    appContext.setReservationID(reservationID);
    LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId
        + " to queue:" + appContext.getQueue()
        + " with reservationId:" + appContext.getReservationID());
  }
  // Job name
  appContext.setApplicationName(
      jobConf.get(JobContext.JOB_NAME,
          YarnConfiguration.DEFAULT_APPLICATION_NAME));
  appContext.setCancelTokensWhenComplete(
      conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
  // The AM container launch context
  appContext.setAMContainerSpec(amContainer);       // AM Container
  // Maximum number of AM attempts; parameter mapreduce.am.max-attempts, default 2
  appContext.setMaxAppAttempts(
      conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
          MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
  // Set the resource requirements
  appContext.setResource(capability);
  // Set the application type to MAPREDUCE
  appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
  if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
    appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
  }

  return appContext;
}
Next we analyze resMgrDelegate.submitApplication(appContext). You can see it executes rmClient.submitApplication(request). Because submitting an application is asynchronous, the client does not immediately know whether the submission succeeded. So after submitting, it keeps polling: it repeatedly fetches an ApplicationReport to read the YarnApplicationState, and it only leaves the loop once the state shows success or failure; if the application is not found (for example because the RM failed over before persisting it), the request is submitted again.
@Override
public ApplicationId submitApplication(
    ApplicationSubmissionContext appContext)
    throws YarnException, IOException {
  ApplicationId applicationId = appContext.getApplicationId();
  if (applicationId == null) {
    throw new ApplicationIdNotProvidedException(
        "ApplicationId is not provided in ApplicationSubmissionContext");
  }
  SubmitApplicationRequest request =
      Records.newRecord(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);

  // Automatically add the timeline DT into the CLC
  // Only when the security and the timeline service are both enabled
  if (isSecurityEnabled() && timelineServiceEnabled) {
    addTimelineDelegationToken(appContext.getAMContainerSpec());
  }

  // TODO: YARN-1763:Handle RM failovers during the submitApplication call.
  rmClient.submitApplication(request);

  int pollCount = 0;
  long startTime = System.currentTimeMillis();
  EnumSet<YarnApplicationState> waitingStates =
      EnumSet.of(YarnApplicationState.NEW,
          YarnApplicationState.NEW_SAVING,
          YarnApplicationState.SUBMITTED);
  EnumSet<YarnApplicationState> failToSubmitStates =
      EnumSet.of(YarnApplicationState.FAILED,
          YarnApplicationState.KILLED);
  while (true) {
    try {
      ApplicationReport appReport = getApplicationReport(applicationId);
      YarnApplicationState state = appReport.getYarnApplicationState();
      if (!waitingStates.contains(state)) {
        if (failToSubmitStates.contains(state)) {
          throw new YarnException("Failed to submit " + applicationId
              + " to YARN : " + appReport.getDiagnostics());
        }
        LOG.info("Submitted application " + applicationId);
        break;
      }
      long elapsedMillis = System.currentTimeMillis() - startTime;
      if (enforceAsyncAPITimeout()
          && elapsedMillis >= asyncApiPollTimeoutMillis) {
        throw new YarnException("Timed out while waiting for application "
            + applicationId + " to be submitted successfully");
      }
      // Notify the client through the log every 10 poll, in case the client
      // is blocked here too long.
      if (++pollCount % 10 == 0) {
        LOG.info("Application submission is not finished, "
            + "submitted application " + applicationId + " is still in "
            + state);
      }
      try {
        Thread.sleep(submitPollIntervalMillis);
      } catch (InterruptedException ie) {
        LOG.error("Interrupted while waiting for application "
            + applicationId + " to be successfully submitted.");
      }
    } catch (ApplicationNotFoundException ex) {
      // FailOver or RM restart happens before RMStateStore saves
      // ApplicationState
      LOG.info("Re-submit application " + applicationId + "with the "
          + "same ApplicationSubmissionContext");
      rmClient.submitApplication(request);
    }
  }
  return applicationId;
}
When a MapReduce job is submitted, it prints output like the following:
19/04/02 13:15:55 INFO mapreduce.Job: Running job: job_1552994470849_0005
19/04/02 13:16:03 INFO mapreduce.Job: Job job_1552994470849_0005 running in uber mode : false
19/04/02 13:16:03 INFO mapreduce.Job: map 0% reduce 0%
Note the second line of the log above: it shows that job_1552994470849_0005 is not running in uber mode. Here is an introduction to what uber mode is (Appendix (6)).
Background
In some cases the data processed by a MapReduce job on a Hadoop cluster is not large, yet the job still has many task splits. Creating a container for every map or reduce task then noticeably increases the cluster's resource consumption, and the overhead of allocating and launching those containers adds latency to the tasks. If all of these small tasks could be placed in a small number of containers, both problems would be solved. Uber mode was introduced for exactly this purpose.
Conditions
Uber mode is an optimization for small jobs: instead of requesting separate container resources for each task, it runs all of the small job's tasks serially inside a single container, map tasks first and then reduce tasks. So which jobs does the MapReduce framework consider small? The conditions are listed below; a configuration sketch follows the list.
uberEnabled: the value of the mapreduce.job.ubertask.enable parameter, false by default; that is, uber mode is disabled by default.
smallNumMapTasks: the job's number of map tasks must be less than or equal to mapreduce.job.ubertask.maxmaps, default 9; in other words, with the defaults, a job must have fewer than 10 maps to run in uber mode.
smallNumReduceTasks: likewise, the job's number of reduce tasks must be less than or equal to mapreduce.job.ubertask.maxreduces, default 1; with the defaults, a job must have at most one reduce.
smallInput: not every job is suitable for uber mode; the input data size must be less than or equal to mapreduce.job.ubertask.maxbytes, which by default is the size of one HDFS block.
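A minimal sketch of these settings applied in code (the values are the defaults described above, with enable switched on; the 128 MB figure is just an example block size):

conf.setBoolean("mapreduce.job.ubertask.enable", true);
conf.setInt("mapreduce.job.ubertask.maxmaps", 9);
conf.setInt("mapreduce.job.ubertask.maxreduces", 1);
conf.setLong("mapreduce.job.ubertask.maxbytes", 128L * 1024 * 1024);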
The effect of enabling uber
Let's see how Yarn executes an application when uber is enabled. First, the ApplicationsManager in the ResourceManager requests one container on a NodeManager for the application and starts an ApplicationMaster inside it; a JVM starts along with the container. At this point, if uber is enabled and the application is judged to be a "small application", the ApplicationMaster runs every task of the application sequentially inside this same container's JVM until all tasks have finished ("With 'uber' mode enabled, you'll run everything within the container of the AM itself"). The ApplicationMaster therefore no longer needs to request a separate container from the ResourceManager for each task, ultimately achieving JVM reuse (resource reuse).
Example of enabling uber
hadoop jar /opt/bmr/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.1.jar wordcount -Dmapreduce.job.ubertask.enable=true /ttt /ttt-result29
See: https://www.linuxidc.com/Linux/2012-01/51616.htm
Appendix (8): JobID jobId = submitClient.getNewJobID() is essentially a request to the RM for a new application.
YARNRunner.java
public JobID getNewJobID() throws IOException, InterruptedException {
  // look here
  return resMgrDelegate.getNewJobID();
}
ResourceMgrDelegate.java
public JobID getNewJobID() throws IOException, InterruptedException {
  try {
    // look here
    this.application =
        client.createApplication().getApplicationSubmissionContext();
    this.applicationId = this.application.getApplicationId();
    return TypeConverter.fromYarn(applicationId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
YarnClientImpl.java
public YarnClientApplication createApplication()
    throws YarnException, IOException {
  ApplicationSubmissionContext context = Records.newRecord
      (ApplicationSubmissionContext.class);
  // look here
  GetNewApplicationResponse newApp = getNewApplication();
  ApplicationId appId = newApp.getApplicationId();
  context.setApplicationId(appId);
  return new YarnClientApplication(newApp, context);
}
YarnClientImpl.java
private GetNewApplicationResponse getNewApplication()
    throws YarnException, IOException {
  GetNewApplicationRequest request =
      Records.newRecord(GetNewApplicationRequest.class);
  return rmClient.getNewApplication(request);
}