A Spark job can be submitted by invoking the spark-submit shell script on the Spark client, which hands the job over to YARN for execution, for example:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
examples/jars/spark-examples*.jar \
10
In some scenarios, however, the shell script cannot be invoked directly, or the execution result of the job needs to be monitored. In those cases we can use Java together with SparkLauncher to submit the Spark job and retrieve its execution result.
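For orientation, here is a rough sketch of how the spark-submit command above maps onto the SparkLauncher builder API, before looking at the full helper class. It is only a sketch: the jar path is a placeholder (setAppResource does not expand glob patterns, so the real spark-examples jar name must be used), and it assumes SPARK_HOME is set in the environment.

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

/**
 * Rough SparkLauncher equivalent of the spark-submit command shown above (sketch only).
 * Assumes SPARK_HOME is set in the environment; otherwise call setSparkHome(...).
 */
public class SparkPiLauncherSketch {

    public static void main(String[] args) throws Exception {
        SparkAppHandle handle = new SparkLauncher()
                .setAppResource("examples/jars/spark-examples.jar")   // placeholder: use the real jar file name
                .setMainClass("org.apache.spark.examples.SparkPi")    // --class
                .setMaster("yarn")                                    // --master yarn
                .setDeployMode("cluster")                             // --deploy-mode cluster
                .setConf(SparkLauncher.DRIVER_MEMORY, "4g")           // --driver-memory 4g
                .setConf(SparkLauncher.EXECUTOR_MEMORY, "2g")         // --executor-memory 2g
                .setConf(SparkLauncher.EXECUTOR_CORES, "1")           // --executor-cores 1
                .setConf("spark.yarn.queue", "thequeue")              // --queue thequeue
                .addAppArgs("10")                                     // application argument
                .startApplication();

        // Poll until the application reaches a final state. The full helper class below
        // uses a state-change listener and a CountDownLatch instead of polling.
        while (!handle.getState().isFinal()) {
            Thread.sleep(1000);
        }
        System.out.println("Final state: " + handle.getState());
    }
}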
The complete example that follows is designed for the Spark on YARN mode.
package com.donny.bigdata.surveillance.components.spark;

import com.donny.bigdata.surveillance.conf.Spark2Config;
import com.donny.bigdata.surveillance.conf.YarnConfig;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.*;

/**
 * @author 1792998761@qq.com
 * @date 2023/3/30 18:00
 */
public class SparkOnYarnHelper {

    private static final Logger LOGGER = LogManager.getLogger(SparkOnYarnHelper.class);

    /**
     * Root directory of the Hadoop configuration files on the client
     */
    private static String hadoopConfDir;

    /**
     * Root directory of the YARN configuration files on the client
     */
    private static String yarnConfDir;

    /**
     * JAVA_HOME on the client
     */
    private static String javaHome;

    /**
     * Root directory of the Spark installation on the client
     */
    private static String sparkHome;

    /**
     * Path of the jar of the Spark job that is submitted to YARN
     */
    private static String appReSourcePath = System.getProperty("user.dir");

    /**
     * Main class (entry point) of the Spark job jar
     */
    private static String appMainClass;

    /**
     * User that runs the Spark job
     */
    public static String hadoopUserName;

    /**
     * Number of retries when the result fetched from YARN has finalStatus=UNDEFINED
     */
    private static Integer retryMaxCount;

    /**
     * Initialize the environment
     */
    public static void init() {
        hadoopConfDir = Spark2Config.getString("spark2.hadoop_conf_dir", "/usr/hdp/2.6.5.0-292/hadoop/conf");
        yarnConfDir = Spark2Config.getString("spark2.yarn_conf_dir", "/usr/hdp/2.6.5.0-292/hadoop/conf");
        javaHome = Spark2Config.getString("spark2.java_home", "/data/java");
        sparkHome = Spark2Config.getString("spark2.spark_home", "/data/hdp/current/spark2-client");
        appReSourcePath = appReSourcePath + Spark2Config.getString("spark2.app_resource_path", "/plugins/spark2-monitor-1.0.jar");
        appMainClass = Spark2Config.getString("spark2.app_main_class", "com.donny.base.monitor.Spark2Monitor");
        hadoopUserName = Spark2Config.getString("hadoop_user_name", "spark");
        retryMaxCount = Spark2Config.getInt("spark2.yarn_report_retry", 3);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("hadoopConfDir=[{}].", hadoopConfDir);
            LOGGER.debug("yarnConfDir=[{}].", yarnConfDir);
            LOGGER.debug("javaHome=[{}].", javaHome);
            LOGGER.debug("sparkHome=[{}].", sparkHome);
            LOGGER.debug("appReSourcePath=[{}].", appReSourcePath);
            LOGGER.debug("appMainClass=[{}].", appMainClass);
            LOGGER.debug("hadoopUserName=[{}].", hadoopUserName);
            LOGGER.debug("retryMaxCount=[{}].", retryMaxCount);
        }
    }

    /**
     * Submit the job to YARN
     *
     * @param hiveTableName name of the Hive table used by the Spark job
     * @return execution result string
     */
    public static String submitJobToYarn(String hiveTableName) {
        String result;
        LOGGER.info(" Spark2 job is starting... ");
        HashMap<String, String> env = new HashMap<>(4);
        env.put("HADOOP_CONF_DIR", hadoopConfDir);
        env.put("JAVA_HOME", javaHome);
        env.put("YARN_CONF_DIR", yarnConfDir);
        env.put("HADOOP_USER_NAME", hadoopUserName);
        // Used together with the SparkAppHandle listener to block until the state isFinal
        CountDownLatch countDownLatch = new CountDownLatch(1);
        SparkAppHandle handle = null;
        try {
            handle = new SparkLauncher(env)
                    .setSparkHome(sparkHome)
                    .setAppResource(appReSourcePath)
                    .setMainClass(appMainClass)
                    .setMaster("yarn")
                    .setDeployMode("client")
                    .setConf(SparkLauncher.DRIVER_MEMORY, "512M")
                    .setConf(SparkLauncher.EXECUTOR_MEMORY, "512M")
                    .setConf(SparkLauncher.EXECUTOR_CORES, "1")
                    .setConf("spark.default.parallelism", "10")
                    .addAppArgs(hiveTableName)
                    .setVerbose(true)
                    .startApplication(new SparkAppHandle.Listener() {
                        @Override
                        public void stateChanged(SparkAppHandle handle) {
                            if (handle.getState().isFinal()) {
                                countDownLatch.countDown();
                            }
                            LOGGER.info("SparkApp state: {}.", handle.getState().toString());
                        }

                        @Override
                        public void infoChanged(SparkAppHandle handle) {
                            LOGGER.info("SparkApp infoChanged: {}.", handle.getState().toString());
                        }
                    });
        } catch (IOException e) {
            LOGGER.error("SparkLauncher IOException.", e);
        }
        LOGGER.info(" Spark2 job is running... ");
        try {
            // Block until the Spark job finishes
            countDownLatch.await();
        } catch (InterruptedException e) {
            LOGGER.error("countDownLatch InterruptedException.", e);
        }
        LOGGER.info(" Spark2 job is over. ");
        if (null != handle) {
            result = getJobResult(handle.getAppId());
        } else {
            result = " Spark2 Execution Exception.";
        }
        return result;
    }

    /**
     * Get the job's final result from YARN
     *
     * @param appId application ID on YARN, e.g. application_1678883677607_0210
     * @return Spark2 job's execution result
     */
    private static String getJobResult(String appId) {
        LOGGER.info(" spark appId is {}.", appId);
        if (null == appId || "".equals(appId)) {
            return " Spark2 Execution Exception, ApplicationId is null.";
        }
        String result = " Spark2 Execution result Obtaining... ";
        String[] as = appId.split("_");
        ApplicationId applicationId = ApplicationId.newInstance(Long.parseLong(as[1]), Integer.parseInt(as[2]));
        YarnClient client = YarnClient.createYarnClient();
        Configuration conf = new Configuration();
        for (String key : YarnConfig.PROPS.stringPropertyNames()) {
            conf.set(key, YarnConfig.PROPS.getProperty(key));
        }
        client.init(conf);
        client.start();
        LOGGER.info(" YarnClient is started.");
        ApplicationReport applicationReport = null;
        ScheduledExecutorService yarnReportService = new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().namingPattern("yarn-report-%d").daemon(true).build());
        int retryCount = 0;
        // Retry 3 times by default, each attempt delayed by 1 second.
        while (retryCount < retryMaxCount) {
            ScheduledFuture<ApplicationReport> scheduledFuture =
                    yarnReportService.schedule(new YarnReport(client, applicationId), 1, TimeUnit.SECONDS);
            boolean f = true;
            while (f) {
                if (scheduledFuture.isDone()) {
                    try {
                        applicationReport = scheduledFuture.get();
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("YarnReport[FinalApplicationStatus]={}.",
                                    applicationReport.getFinalApplicationStatus());
                        }
                    } catch (InterruptedException | ExecutionException e) {
                        yarnReportService.shutdown();
                        LOGGER.error("YarnReport Exception.", e);
                    } finally {
                        f = false;
                    }
                }
            }
            if (null != applicationReport) {
                String finalStatus = applicationReport.getFinalApplicationStatus().toString();
                if (FinalApplicationStatus.UNDEFINED.toString().equals(finalStatus)) {
                    retryCount++;
                    result = "Spark2 job's finalStatus is UNDEFINED. ";
                    LOGGER.info("Spark2 job's finalStatus=UNDEFINED, retryCount=[{}].", retryCount);
                } else {
                    result = "Spark2 job's finalStatus is " + finalStatus + ".";
                    break;
                }
            } else {
                retryCount++;
                result = "Spark2 job's execution result is null. ";
                LOGGER.info("Spark2 job's finalStatus=null, retryCount=[{}].", retryCount);
            }
        }
        if (!yarnReportService.isShutdown()) {
            yarnReportService.shutdown();
        }
        try {
            client.close();
        } catch (IOException e) {
            LOGGER.error("YarnClient close IOException.", e);
        }
        return result;
    }

    static class YarnReport implements Callable<ApplicationReport> {

        private static final Logger LOGGER = LogManager.getLogger(YarnReport.class);

        final YarnClient client;
        final ApplicationId applicationId;

        YarnReport(YarnClient client, ApplicationId applicationId) {
            this.applicationId = applicationId;
            this.client = client;
        }

        @Override
        public ApplicationReport call() throws Exception {
            ApplicationReport report = client.getApplicationReport(applicationId);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("report={}", report.toString());
            }
            return report;
        }
    }
}