Submitting Spark Jobs from Java Code

Spark jobs can be submitted to YARN for execution by invoking the spark-submit shell script on a Spark client:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    examples/jars/spark-examples*.jar \
    10

In some scenarios, however, it is not possible to call a shell script directly, or the job's execution result needs to be monitored. In such cases, Spark jobs can be submitted, and their execution results retrieved, from Java code via SparkLauncher.

The following example is designed around the Spark on YARN mode and assumes that:

  1. The machine running this class has a Spark client installed.
  2. The jar of the Spark job to be submitted is available (it can also be uploaded to HDFS in advance).
  3. The machine has the Hadoop and YARN configuration files.
  4. The program knows the JAVA_HOME path.
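
For reference, the helper class below reads its settings through the author's Spark2Config wrapper. Assuming that wrapper maps onto an ordinary Java properties file (an assumption; only the keys and default values are taken from the code), the configuration might look like this:

# Illustrative values only; adapt the paths to the target environment
spark2.hadoop_conf_dir=/usr/hdp/2.6.5.0-292/hadoop/conf
spark2.yarn_conf_dir=/usr/hdp/2.6.5.0-292/hadoop/conf
spark2.java_home=/data/java
spark2.spark_home=/data/hdp/current/spark2-client
# resolved relative to the working directory (user.dir)
spark2.app_resource_path=/plugins/spark2-monitor-1.0.jar
spark2.app_main_class=com.donny.base.monitor.Spark2Monitor
hadoop_user_name=spark
spark2.yarn_report_retry=3
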
package com.donny.bigdata.surveillance.components.spark;

import com.donny.bigdata.surveillance.conf.Spark2Config;
import com.donny.bigdata.surveillance.conf.YarnConfig;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.*;

/**
 * @author 1792998761@qq.com
 * @date 2023/3/30 18:00
 */
public class SparkOnYarnHelper {
    private static final Logger LOGGER = LogManager.getLogger(SparkOnYarnHelper.class);

    /**
     * Root directory of the Hadoop configuration files on the client
     */
    private static String hadoopConfDir;
    /**
     * Root directory of the YARN configuration files on the client
     */
    private static String yarnConfDir;
    /**
     * JAVA_HOME on the client
     */
    private static String javaHome;
    /**
     * Spark home directory (root of the Spark client installation)
     */
    private static String sparkHome;
    /**
     * Path of the Spark job jar to be submitted to YARN
     */
    private static String appReSourcePath = System.getProperty("user.dir");
    /**
     * Main class (entry point) of the Spark job jar
     */
    private static String appMainClass;
    /**
     * User that runs the Spark job
     */
    public static String hadoopUserName;
    /**
     * Number of retries when YARN reports finalStatus=UNDEFINED
     */
    private static Integer retryMaxCount;

    /**
     * Initialize the environment from configuration
     */
    public static void init() {
        hadoopConfDir = Spark2Config.getString("spark2.hadoop_conf_dir", "/usr/hdp/2.6.5.0-292/hadoop/conf");
        yarnConfDir = Spark2Config.getString("spark2.yarn_conf_dir", "/usr/hdp/2.6.5.0-292/hadoop/conf");
        javaHome = Spark2Config.getString("spark2.java_home", "/data/java");
        sparkHome = Spark2Config.getString("spark2.spark_home", "/data/hdp/current/spark2-client");
        appReSourcePath = appReSourcePath + Spark2Config.getString("spark2.app_resource_path", "/plugins/spark2-monitor-1.0.jar");
        appMainClass = Spark2Config.getString("spark2.app_main_class", "com.donny.base.monitor.Spark2Monitor");
        hadoopUserName = Spark2Config.getString("hadoop_user_name", "spark");
        retryMaxCount = Spark2Config.getInt("spark2.yarn_report_retry", 3);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("hadoopConfDir=[{}].", hadoopConfDir);
            LOGGER.debug("yarnConfDir=[{}].", hadoopConfDir);
            LOGGER.debug("javaHome=[{}].", javaHome);
            LOGGER.debug("sparkHome=[{}].", sparkHome);
            LOGGER.debug("appReSourcePath=[{}].", appReSourcePath);
            LOGGER.debug("appMainClass=[{}].", appMainClass);
            LOGGER.debug("hadoopUserName=[{}].", hadoopUserName);
            LOGGER.debug("retryMaxCount[{}].", retryMaxCount);
        }
    }

    /**
     * Submit the job to YARN.
     *
     * @param hiveTableName name of the Hive table used by the Spark job
     * @return execution result string
     */
    public static String submitJobToYarn(String hiveTableName) {
        String result;
        LOGGER.info(" Spark2 job is starting... ");
        HashMap<String, String> env = new HashMap<>(4);
        env.put("HADOOP_CONF_DIR", hadoopConfDir);
        env.put("JAVA_HOME", javaHome);
        env.put("YARN_CONF_DIR", yarnConfDir);
        env.put("HADOOP_USER_NAME", hadoopUserName);
        // Used to block the caller until the SparkAppHandle listener reports a final state
        CountDownLatch countDownLatch = new CountDownLatch(1);
        SparkAppHandle handle = null;
        try {
            handle = new SparkLauncher(env)
                    .setSparkHome(sparkHome)
                    .setAppResource(appReSourcePath)
                    .setMainClass(appMainClass)
                    .setMaster("yarn")
                    .setDeployMode("client")
                    .setConf(SparkLauncher.DRIVER_MEMORY, "512M")
                    .setConf(SparkLauncher.EXECUTOR_MEMORY, "512M")
                    .setConf(SparkLauncher.EXECUTOR_CORES, "1")
                    .setConf("spark.default.parallelism", "10")
                    .addAppArgs(hiveTableName)
                    .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                        @Override
                        public void stateChanged(SparkAppHandle handle) {
                            if (handle.getState().isFinal()) {
                                countDownLatch.countDown();
                            }
                            LOGGER.info("SparkApp state: {}.", handle.getState().toString());
                        }

                        @Override
                        public void infoChanged(SparkAppHandle handle) {
                            LOGGER.info("SparkApp infoChanged: {}.", handle.getState().toString());
                        }
                    });
        } catch (IOException e) {
            LOGGER.error("SparkLauncher IOException.", e);
            // Release the latch so the await() below cannot block forever if the launch fails
            countDownLatch.countDown();
        }
        LOGGER.info(" Spark2 job is running... ");
        try {
            // Block until the Spark job reaches a final state
            countDownLatch.await();
        } catch (InterruptedException e) {
            LOGGER.error("countDownLatch InterruptedException.", e);
        }
        LOGGER.info(" Spark2 job is over. ");

        if (null != handle) {
            result = getJobResult(handle.getAppId());
        } else {
            result = " Spark2 Execution Exception.";
        }
        return result;
    }

    /**
     * Get the job's final result from YARN.
     *
     * @param appId application ID on YARN, e.g. application_1678883677607_0210
     * @return Spark2 job's execution result
     */
    private static String getJobResult(String appId) {
        LOGGER.info(" spark appId is {}.", appId);
        if (null == appId || "".equals(appId)) {
            return " Spark2 Execution Exception, ApplicationId is null.";
        }

        String result = " Spark2 Execution result Obtaining... ";
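        // appId has the form application_<clusterTimestamp>_<sequenceNumber>, e.g. application_1678883677607_0210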
        String[] as = appId.split("_");
        ApplicationId applicationId = ApplicationId.newInstance(Long.parseLong(as[1]), Integer.parseInt(as[2]));
        YarnClient client = YarnClient.createYarnClient();
        Configuration conf = new Configuration();
        for (String key : YarnConfig.PROPS.stringPropertyNames()) {
            conf.set(key, YarnConfig.PROPS.getProperty(key));
        }
        client.init(conf);
        client.start();
        LOGGER.info(" YarnClient is started.");
        ApplicationReport applicationReport = null;

        ScheduledExecutorService yarnReportService = new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().namingPattern("yarn-report-%d").daemon(true).build());
        int retryCount = 0;

        // Retry up to retryMaxCount times (3 by default); each attempt is scheduled with a 1-second delay.
        while (retryCount < retryMaxCount) {
            ScheduledFuture<ApplicationReport> scheduledFuture = yarnReportService.schedule(new YarnReport(client, applicationId), 1, TimeUnit.SECONDS);
            try {
                // Future.get() blocks until the scheduled task completes, so no busy-wait on isDone() is needed
                applicationReport = scheduledFuture.get();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("YarnReport[FinalApplicationStatus]={}.", applicationReport.getFinalApplicationStatus());
                }
            } catch (InterruptedException | ExecutionException e) {
                // Discard any stale report from a previous attempt; the executor is shut down after the loop,
                // not here, so that the remaining retries can still be scheduled
                applicationReport = null;
                LOGGER.error("YarnReport Exception.", e);
            }

            if (null != applicationReport) {
                String finalStatus = applicationReport.getFinalApplicationStatus().toString();
                if (FinalApplicationStatus.UNDEFINED.toString().equals(finalStatus)) {
                    retryCount++;
                    result = "Spark2 job's finalStatus is UNDEFINED. ";
                    LOGGER.info("Spark2 job's finalStatus=UNDEFINED, retryCount=[{}].", retryCount);
                } else {
                    result = "Spark2 job's finalStatus is " + finalStatus + ".";
                    break;
                }
            } else {
                retryCount++;
                result = "Spark2 job's execution result is null. ";
                LOGGER.info("Spark2 job's finalStatus=null, retryCount=[{}].", retryCount);
            }
        }

        if (!yarnReportService.isShutdown()) {
            yarnReportService.shutdown();
        }

        try {
            client.close();
        } catch (IOException e) {
            LOGGER.error("YarnClient close IOException.", e);
        }
        return result;
    }

    static class YarnReport implements Callable<ApplicationReport> {
        private static final Logger LOGGER = LogManager.getLogger(YarnReport.class);

        final YarnClient client;
        final ApplicationId applicationId;

        YarnReport(YarnClient client, ApplicationId applicationId) {
            this.applicationId = applicationId;
            this.client = client;
        }

        @Override
        public ApplicationReport call() throws Exception {
            ApplicationReport report = client.getApplicationReport(applicationId);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("report={}", report.toString());
            }
            return report;
        }
    }

}
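
A minimal sketch of how a caller might use this helper (the table name "dwd.user_event" is purely illustrative):

public class SparkOnYarnHelperDemo {
    public static void main(String[] args) {
        // Load paths, main class, user and retry count from configuration first
        SparkOnYarnHelper.init();
        // Blocks until YARN reports a final status for the application
        String result = SparkOnYarnHelper.submitJobToYarn("dwd.user_event");
        System.out.println(result);
    }
}

Because submitJobToYarn blocks on a CountDownLatch until the application reaches a final state, callers that need to stay responsive should invoke it from a worker thread.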