
Spark (44): Calling spark-submit.sh from Java (supporting both --deploy-mode client and cluster) and obtaining the applicationId


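I previously described how to submit a Spark job through the YARN API and get the applicationId back from the submit call; see "Spark2.3 (40): How to schedule a Spark app from Java through the YARN API, and monitor the job, kill the job, and fetch its logs by appId".

However, I prefer the approach described in this article: calling the spark-submit.sh shell script from Java and extracting the applicationId from the console output of spark-submit.sh. The Hadoop API approach requires a properly configured environment, and different packages have to be pulled in depending on the Hadoop version.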

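How to call a shell from Java:

To call a shell command from Java, use

Process p = Runtime.getRuntime().exec(cmd);   // cmd is a String[]

Runtime.exec() creates a native process and returns an instance of a Process subclass, which can be used to control the process or obtain information about it.
Because a child process created by Runtime.exec has no terminal or console of its own, its standard I/O (stdin, stdout, stderr) is redirected to the parent process through

    p.getOutputStream(),
    p.getInputStream(),
    p.getErrorStream()

The parent uses these streams to feed input to the child process or to read its output.
    For example: Runtime.getRuntime().exec("ls")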

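  • Another concern is that Runtime.getRuntime().exec() can stall (block).

    With Runtime.getRuntime().exec() you have to drain stdout and stderr yourself, and you cannot know in advance whether the child will write to stderr or to stdout first. If you read only one stream while the child is filling the pipe buffer of the other, the child blocks on its write and your read never completes.
    For example: you read stdout first, but the child writes to stderr first. The child then waits for stderr to be drained before it writes anything to stdout, so both sides block.

  • Solution:

    Use two threads to read stdout and stderr separately.

   Reference code: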

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

// Drains one stream of the child process on its own thread.
class StreamGobbler extends Thread
{
    InputStream is;
    String type;

    StreamGobbler(InputStream is, String type)
    {
        this.is = is;
        this.type = type;
    }

    public void run()
    {
        try
        {
            InputStreamReader isr = new InputStreamReader(is);
            BufferedReader br = new BufferedReader(isr);
            String line = null;
            while ((line = br.readLine()) != null)
                System.out.println(type + ">" + line);
        } catch (IOException ioe)
        {
            ioe.printStackTrace();
        }
    }
}

public class ExecRunner
{
    public static void main(String args[])
    {
        if (args.length < 1)
        {
            System.out.println("USAGE: java ExecRunner <cmd>");
            System.exit(1);
        }

        try
        {
            String osName = System.getProperty("os.name");
            String[] cmd = new String[3];
            if (osName.equals("Windows NT"))
            {
                cmd[0] = "cmd.exe";
                cmd[1] = "/C";
                cmd[2] = args[0];
            }
            else if (osName.equals("Windows 95"))
            {
                cmd[0] = "command.com";
                cmd[1] = "/C";
                cmd[2] = args[0];
            } else {
                // On other systems, split the command string on spaces.
                StringTokenizer st = new StringTokenizer(args[0], " ");
                cmd = new String[st.countTokens()];
                int token = 0;
                while (st.hasMoreTokens()) {
                    cmd[token++] = st.nextToken();
                }
            }

            Runtime rt = Runtime.getRuntime();
            // cmd may hold fewer than three elements, so join it instead of indexing it
            System.out.println("Execing " + String.join(" ", cmd));
            Process proc = rt.exec(cmd);
            // any error message?
            StreamGobbler errorGobbler = new
                StreamGobbler(proc.getErrorStream(), "ERROR");

            // any output?
            StreamGobbler outputGobbler = new
                StreamGobbler(proc.getInputStream(), "OUTPUT");

            // kick them off
            errorGobbler.start();
            outputGobbler.start();

            // wait for the child process and report its exit code
            int exitVal = proc.waitFor();
            System.out.println("ExitValue: " + exitVal);
        } catch (Throwable t)
        {
            t.printStackTrace();
        }
    }
}
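When the caller does not need to tell stdout and stderr apart, one alternative to the two-thread pattern above is to let the JDK merge the two streams with ProcessBuilder.redirectErrorStream(true), so a single reader is enough. The following is only a minimal sketch of that idea; the class name MergedOutputRunner and the method runAndCollect are hypothetical names chosen for illustration, and the sample command assumes a Unix-like system with sh available.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class MergedOutputRunner {

    /** Runs the command and returns every line it wrote to stdout or stderr. */
    public static List<String> runAndCollect(String... cmd)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(cmd);
        // Merge stderr into stdout so that one reader drains both.
        pb.redirectErrorStream(true);
        Process proc = pb.start();

        List<String> lines = new ArrayList<>();
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                lines.add(line);
            }
        }
        // The stream has reached end-of-file, so the child has closed its output.
        proc.waitFor();
        return lines;
    }

    public static void main(String[] args) throws Exception {
        // Assumes a Unix-like system; writes one line to each stream.
        for (String line : runAndCollect("sh", "-c", "echo to-stdout; echo to-stderr 1>&2")) {
            System.out.println(line);
        }
    }
}
```

The trade-off is that the merged stream no longer shows which line came from which stream, so this only fits cases such as log scraping where the origin does not matter.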

Calling spark-submit.sh from Java

The spark-submit script, submit_test.sh

#!/bin/sh
jarspath=''

for file in `ls /home/dx/djj/sparkjars/*.jar`
do
  jarspath=${file},$jarspath
done
jarspath=${jarspath%?}

echo $jarspath

/home1/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.dx.test.BroadcastTest \
--properties-file ./conf/spark-properties-mrs.conf \
--jars $jarspath \
--num-executors 10 \
--executor-memory 3G \
--executor-cores 1 \
--driver-memory 2G \
--driver-java-options "-XX:+TraceClassPaths" \
./test.jar $1 $2 $3 $4

Note: when testing the YARN submission modes, change the --deploy-mode parameter:

cluster mode: --deploy-mode cluster \

client mode: --deploy-mode client \
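Because the log line that carries the applicationId differs between the two modes, the Java caller has to know which mode the script uses. A minimal sketch of one way to detect it from the script text follows, assuming the option is written as --deploy-mode followed by whitespace and the mode name; DeployModeDetector and isClusterMode are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class DeployModeDetector {

    // Tolerates any amount of whitespace between the option and its value.
    private static final Pattern CLUSTER =
            Pattern.compile("--deploy-mode\\s+cluster\\b", Pattern.CASE_INSENSITIVE);

    /** Returns true if any non-comment line of the script selects cluster mode. */
    public static boolean isClusterMode(List<String> scriptLines) {
        for (String line : scriptLines) {
            String trimmed = line.trim();
            if (trimmed.startsWith("#")) {
                continue; // ignore commented-out options
            }
            if (CLUSTER.matcher(trimmed).find()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> script = Arrays.asList("--master yarn \\", "--deploy-mode cluster \\");
        System.out.println(isClusterMode(script));
    }
}
```

Skipping lines that start with # is a design choice of this sketch: it keeps a commented-out --deploy-mode cluster line from being mistaken for the active setting.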

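To get the applicationId from spark-submit, we have to filter it out of what spark-submit prints (that is, the stdout and stderr of the Process object). If you have submitted Spark jobs with spark-submit.sh before, you will have noticed that the applicationId is printed to the console while it runs.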

  • In yarn client mode (--deploy-mode client), spark-submit.sh prints the applicationId here:
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@215a34b4{/static,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2e380628{/,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1eaf1e62{/api,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@652ab8d9{/jobs/job/kill,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@51e0301d{/stages/stage/kill,null,AVAILABLE,@Spark}
19/04/02 11:38:31 INFO client.RMProxy: Connecting to ResourceManager at vm10.60.0.11.com.cn/10.60.0.11:8032
[Opened /usr/java/jdk1.8.0_152/jre/lib/jce.jar]
[Opened /usr/java/jdk1.8.0_152/jre/lib/charsets.jar]
19/04/02 11:40:24 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0829
  • In yarn cluster mode (--deploy-mode cluster), spark-submit.sh prints the applicationId here:
19/04/02 11:40:22 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:23 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:24 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:25 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:26 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:27 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:28 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:29 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:30 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:31 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:32 INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)
19/04/02 11:40:33 INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)

The function that filters out the applicationId is implemented as follows:

 

    /**
     * @param line one line read from stdout or stderr.
     * */
    private String filterApplicationId(String line, boolean isCluster) {
        String applicationId = null;
        line = line.toLowerCase();

        // --deploy-mode client
        // 19/02/15 17:43:35 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0051
        // 19/04/01 14:13:57 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0781
        boolean isIndexSparkOwnLog = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase()) != -1;
        // --deploy-mode cluster
        // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: ACCEPTED)
        // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: RUNNING)
        boolean isIndexSparkOwn2Log = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase()) != -1;
        boolean isIndexSparkRunningLog = line.indexOf("(state: RUNNING)".toLowerCase()) != -1;

        if (isIndexSparkOwnLog || (isIndexSparkOwn2Log && isIndexSparkRunningLog)) {
            if (isIndexSparkOwnLog && false == isCluster) {
                int idx = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase());
                applicationId = line.substring(idx + "INFO impl.YarnClientImpl: Submitted application ".length());
            } else if (isIndexSparkOwn2Log && true == isCluster) {
                int idx = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase());
                applicationId = line.substring(idx + "INFO yarn.Client: Application report for ".length());
                if (line.indexOf("(state: RUNNING)".toLowerCase()) != -1) {
                    applicationId = applicationId.replace(" (state: RUNNING)".toLowerCase(), "");
                }
            }
        }

        if (applicationId != null && applicationId.startsWith("application_")) {
            System.out.println("====================================Index of applicationId:" + applicationId);
            System.out.println("====================================Index of applicationId:Complete ...");
        }

        return applicationId;
    }

 

If the filter matches, it returns the applicationId; otherwise it returns null.

Definition of the task that consumes the stdout and stderr streams:
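The prefix matching above can also be expressed with a regular expression, since the IDs in the sample logs all have the shape application_<digits>_<digits>. The sketch below is a simplified alternative rather than the function used in this article: it returns the ID from any line that contains one (including ACCEPTED reports), whereas the original waits for the RUNNING state in cluster mode. ApplicationIdExtractor and extract are hypothetical names.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ApplicationIdExtractor {

    // Matches IDs shaped like the ones in the sample logs,
    // e.g. application_1548381669007_0829.
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

    /** Returns the first application ID found in the line, if any. */
    public static Optional<String> extract(String line) {
        if (line == null) {
            return Optional.empty();
        }
        Matcher m = APP_ID.matcher(line);
        return m.find() ? Optional.of(m.group()) : Optional.empty();
    }

    public static void main(String[] args) {
        String line = "19/04/02 11:40:24 INFO impl.YarnClientImpl: "
                + "Submitted application application_1548381669007_0829";
        System.out.println(extract(line).orElse("not found"));
    }
}
```

If the RUNNING requirement matters, the caller can additionally check that the line contains "(state: RUNNING)" before accepting the match.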

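// Imports required by this class (place them at the top of the source file).
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;

class StreamFilterTask implements Callable<String> {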
    private InputStream inputStream;
    private ConcurrentLinkedQueue<String> queue;
    private String streamType = null;
    private boolean isCluster;

    private StreamFilterTask() {
    }

    public StreamFilterTask(InputStream inputStream, ConcurrentLinkedQueue<String> queue, String streamType,
            boolean isCluster) {
        this.inputStream = inputStream;
        this.queue = queue;
        this.streamType = streamType;
        this.isCluster = isCluster;
    }

    @Override
    public String call() throws Exception {
        BufferedReader br = null;
        try {
            br = new BufferedReader(new InputStreamReader(inputStream));
            String line = null;
            while ((line = br.readLine()) != null) {
                System.out.println(line);

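                // Keep only the most recent 1000 lines in the queue; drop older ones.
                // size() has to traverse the whole collection, so prefer isEmpty() over size() where possible.
                if (this.streamType.equalsIgnoreCase("error")) {
                    if (queue.size() > 1000) {
                        // Retrieves and removes the head of the queue, or returns null if the queue is empty.
                        queue.poll();
                    }
                    // Inserts the element at the tail of the queue. The queue is unbounded, so this never returns false.
                    queue.offer(line);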
                }

                String applicationId = filterApplicationId(line, isCluster);

                if (applicationId != null && applicationId.startsWith("application_")) {
                    return applicationId;
                }
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        return null;
    }

    /**
     * @param line one line read from stdout or stderr.
     * */
    private String filterApplicationId(String line, boolean isCluster) {
        String applicationId = null;
        line = line.toLowerCase();

        // --deploy-mode client
        // 19/02/15 17:43:35 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0051
        // 19/04/01 14:13:57 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0781
        boolean isIndexSparkOwnLog = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase()) != -1;
        // --deploy-mode cluster
        // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: ACCEPTED)
        // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: RUNNING)
        boolean isIndexSparkOwn2Log = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase()) != -1;
        boolean isIndexSparkRunningLog = line.indexOf("(state: RUNNING)".toLowerCase()) != -1;

        if (isIndexSparkOwnLog || (isIndexSparkOwn2Log && isIndexSparkRunningLog)) {
            if (isIndexSparkOwnLog && false == isCluster) {
                int idx = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase());
                applicationId = line.substring(idx + "INFO impl.YarnClientImpl: Submitted application ".length());
            } else if (isIndexSparkOwn2Log && true == isCluster) {
                int idx = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase());
                applicationId = line.substring(idx + "INFO yarn.Client: Application report for ".length());
                if (line.indexOf("(state: RUNNING)".toLowerCase()) != -1) {
                    applicationId = applicationId.replace(" (state: RUNNING)".toLowerCase(), "");
                }
            }
        }

        if (applicationId != null && applicationId.startsWith("application_")) {
            System.out.println("====================================Index of applicationId:" + applicationId);
            System.out.println("====================================Index of applicationId:Complete ...");
        }

        return applicationId;
    }
}

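Definition of the SubmitSpark class:

This class uses Process to run the script, filters the applicationId out of the stdout and stderr of the Process object, and uses Process.waitFor(timeout, TimeUnit) to cap the maximum time it is willing to wait.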

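// Imports required by this class (place them at the top of the source file).
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SubmitSpark {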
    public String submit(String filePath, long timeoutMinutes, String charsetName) {
        String applicatioId = null;

        String command = filePath;
        boolean isCluster = false;
        BufferedReader bufferedReader = null;
        try {
            bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), charsetName));
            String line = null;
            while ((line = bufferedReader.readLine()) != null) {
                if (line.replace("  ", " ").toLowerCase().indexOf("--deploy-mode cluster") != -1) {
                    isCluster = true;
                    break;
                }
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        StringTokenizer st = new StringTokenizer(command, " ");
        String[] cmd = new String[st.countTokens()];
        int token = 0;
        while (st.hasMoreTokens()) {
            String tokenString = st.nextToken();
            cmd[token++] = tokenString;
        }

        Runtime rt = Runtime.getRuntime();
        System.out.println("Execing " + command);
        Process proc = null;

        try {
            proc = rt.exec(cmd);

            ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
            ExecutorService executor = Executors.newFixedThreadPool(2);

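            // The futures hold the results returned by the worker threads; call get() only after the workers have finished.
            // Calling get() here would block until the task completes.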
            // any output?
            Future<String> futureInput = executor.submit(new StreamFilterTask(proc.getInputStream(), queue, "input",
                    isCluster));
            // any error message?
            Future<String> futureError = executor.submit(new StreamFilterTask(proc.getErrorStream(), queue, "error",
                    isCluster));

            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ",开始proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);");
            // any error???
            boolean exitVal = proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
            System.out.println("exitVal:" + exitVal);
            proc.destroyForcibly();
            System.out.println("proc.isAlive():" + proc.isAlive());
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ",结束proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);");

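            // Whether --deploy-mode is cluster or client, the applicationId is read from getErrorStream,
            // so it is returned as soon as the submission succeeds; if resources are insufficient, the call waits until the timeout expires and fails.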
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",开始获取applicatioId = futureError.get();:");
            if (futureError.get() != null) {
                applicatioId = futureError.get();
            }    
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",结束获取applicatioId = futureError.get();:"
                    + applicatioId);
            
            // In cluster mode this would block, so this section must stay commented out
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",开始获取applicatioId = futureInput.get();:");
            // if (futureInput.get() != null) {
            //      applicatioId = futureInput.get();
            // }

            // kill the process
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",结束获取applicatioId = futureInput.get();:"
            //             + applicatioId);
            // 
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",开始获取process Id");
            // long pid = -1;
            // try {
            //         Class<?> clazz = Class.forName("java.lang.UNIXProcess");
            //         Field field = clazz.getDeclaredField("pid");
            //         field.setAccessible(true);
            //         pid = (Integer) field.get(proc);
            // } catch (Throwable e) {
            //         e.printStackTrace();
            // }
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",结束获取process Id:"
            //             + pid);
            // 
            // System.out.println("proc.isAlive():" + proc.isAlive());
            // String[] killCmd = { "sh", "-c", "kill -9 " + pid };
            // Runtime.getRuntime().exec(killCmd).waitFor();
            
            System.out.println("Complete:" + applicatioId);
        } catch (Throwable t) {
            t.printStackTrace();
        } finally {
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ",开始获取if (proc != null && proc.isAlive())");
            if (proc != null && proc.isAlive()) {
                proc.destroyForcibly();
                System.out.println("proc.isAlive():" + proc.isAlive());
            }
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ",结束获取if (proc != null && proc.isAlive())");
        }

        return applicatioId;
    }
}
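The boolean returned by Process.waitFor(timeout, unit) is what the exitVal lines in the test output below report: true means the child exited within the timeout, false means the timeout elapsed while it was still running. A minimal sketch of acting on that value follows; TimedWait, runWithTimeout and the -1 sentinel are hypothetical choices for illustration, and the example commands assume a Unix-like system.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class TimedWait {

    /**
     * Runs the command and returns its exit code, or -1 (a sentinel chosen
     * for this sketch) if it had to be killed after the timeout.
     */
    public static int runWithTimeout(long timeout, TimeUnit unit, String... cmd)
            throws IOException, InterruptedException {
        // inheritIO() lets the child write straight to our console,
        // so no pipe buffer can fill up and stall it.
        Process proc = new ProcessBuilder(cmd).inheritIO().start();

        boolean finished = proc.waitFor(timeout, unit);
        if (!finished) {
            // Still running after the timeout: kill it and reap it.
            proc.destroyForcibly();
            proc.waitFor();
            return -1;
        }
        return proc.exitValue();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runWithTimeout(5, TimeUnit.SECONDS, "sh", "-c", "exit 3"));
        System.out.println(runWithTimeout(200, TimeUnit.MILLISECONDS, "sleep", "5"));
    }
}
```

Calling destroyForcibly() only when waitFor returned false keeps a child that finished normally from being treated as a timeout.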

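Notes:

1) The timeout must not be too short. If it is, the process is ended before the job has been submitted to YARN, and no applicationId can be returned.

2) Even after SubmitSpark.submit above has returned the applicatioId, the Java program that runs the spark-submit.sh shell (java -cp test.jar com.dx.test.Submit) does not exit; the stdout and stderr of the process are in fact still open. Note also that the fixed thread pool created in submit is never shut down, and its non-daemon worker threads are enough to keep the JVM alive on their own.

3) Even with the kill-process code above enabled, so that stdout and stderr are closed, the java -cp test.jar com.dx.test.Submit program still does not exit. Once the kill-process code has destroyed the process object (and provided the Spark job has already been submitted to YARN), the program catches the stdout/stderr errors, but the Spark application on YARN is not stopped. (Until the Java program that ran the shell is terminated manually, the Spark application in yarn client mode keeps running as well; in yarn cluster mode, once the job has been submitted to YARN, terminating the Java program cannot stop the Spark application on YARN either.) The kill-process code is therefore optional.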
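One standard way to keep worker threads from holding the JVM open is to shut the ExecutorService down in a finally block and to bound Future.get with a timeout. The sketch below illustrates that pattern in isolation and is not a drop-in change to SubmitSpark; BoundedTask and callWithTimeout are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedTask {

    /** Runs the task on a worker thread; returns null if it does not finish in time. */
    public static String callWithTimeout(Callable<String> task, long timeout, TimeUnit unit)
            throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(task);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            // Ask the worker to stop by interrupting it.
            future.cancel(true);
            return null;
        } finally {
            // Without a shutdown, the idle non-daemon worker keeps the JVM alive.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callWithTimeout(() -> "application_1_0001", 1, TimeUnit.SECONDS));
    }
}
```

An interrupt does not necessarily wake a thread that is blocked reading a process stream; such a reader usually returns only once the stream is closed or the child process ends, so the process still has to be destroyed as well.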

Test:

package com.dx.test;

public class Submit {
    public static void main(String[] args) {
        String filePath = "./submit_test.sh";
        String charsetName = "utf-8";
        long timeoutMinutes = 5;

        SubmitSpark submitSpark = new SubmitSpark();
        String applicationId = submitSpark.submit(filePath, timeoutMinutes, charsetName);

        System.out.println("return the applicationId:" + applicationId);
    }
}

Timeout set to 2 minutes

  • With yarn --deploy-mode client, the run shows the following problem (timeout set to 2 minutes):
19/04/02 10:54:49 INFO client.RMProxy: Connecting to ResourceManager at vm10.60.0.11.com.cn/10.60.0.11:8032
[Opened /usr/java/jdk1.8.0_152/jre/lib/jce.jar]
exitVal:false
proc.isAlive():false
2019-04-02 10:56:38,结束proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 10:56:38,开始获取applicationId:
java.io.IOException: Stream closed
        at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at com.dx.test.StreamFilterTask.call(Submit.java:148)
        at com.dx.test.StreamFilterTask.call(Submit.java:1)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
java.io.IOException: Stream closed
        at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at com.dx.test.StreamFilterTask.call(Submit.java:148)
        at com.dx.test.StreamFilterTask.call(Submit.java:1)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2019-04-02 10:56:52,结束获取applicationId:null
2019-04-02 10:56:52,开始获取process Id
2019-04-02 10:56:52,结束获取process Id:13994
proc.isAlive():false
Complete:null
2019-04-02 10:56:52,开始获取if (proc != null && proc.isAlive())
2019-04-02 10:56:52,结束获取if (proc != null && proc.isAlive())
return the applicationId:null

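Remark: the test above was run with the kill-process code enabled; the result is the same with that code disabled.

1) The applicationId cannot be obtained, although the job may already have been submitted to YARN by this point [in this run, however, the output shows that the process ended before the job was submitted to YARN].
2) The terminal is now blocked. Pressing CTRL+C to end java -cp ./test.jar com.dx.test.Submit also stops the Spark application on YARN.

  • With yarn --deploy-mode cluster, the run shows the following problem (timeout set to 2 minutes):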
19/04/02 10:57:00 INFO yarn.Client: Uploading resource file:/home1/boco/duanjiajun/sparkjars/bcprov-jdk15on-1.52.jar -> hdfs://vm10.60.0.11.com.cn:8020/user/boco/.sparkStaging/application_1548381669007_0816/bcprov-jdk15on-1.52.jar
exitVal:false
proc.isAlive():false
2019-04-02 10:57:01,结束proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 10:57:01,开始获取applicationId:
java.io.IOException: Stream closed
        at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at com.dx.test.StreamFilterTask.call(Submit.java:148)
        at com.dx.test.StreamFilterTask.call(Submit.java:1)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2019-04-02 10:57:01,结束获取applicationId:null
2019-04-02 10:57:01,开始获取process Id
2019-04-02 10:57:01,结束获取process Id:14359
proc.isAlive():false
Complete:null
2019-04-02 10:57:01,开始获取if (proc != null && proc.isAlive())
2019-04-02 10:57:01,结束获取if (proc != null && proc.isAlive())
return the applicationId:null

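Remark: the test above was run with the kill-process code enabled; the result is the same with that code disabled.

1) The applicationId cannot be obtained, and the job may already have been submitted to YARN by this point [in this run, however, the output shows that the process ended before the job was submitted to YARN].
2) The terminal is now blocked. If the Spark job has already been submitted to YARN, pressing CTRL+C to end java -cp ./test.jar com.dx.test.Submit does not stop the Spark application on YARN.

Timeout set to 5 minutes

  • --deploy-mode cluster, timeout set to 5 minutes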
19/04/02 11:40:30 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:31 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:32 INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)
====================================Index of applicationId:application_1548381669007_0828
====================================Index of applicationId:Complete ...
exitVal:false
proc.isAlive():true
2019-04-02 11:42:59,结束proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 11:42:59,开始获取applicatioId = futureError.get();:
2019-04-02 11:42:59,结束获取applicatioId = futureError.get();:application_1548381669007_0828
Complete:application_1548381669007_0828
2019-04-02 11:42:59,开始获取if (proc != null && proc.isAlive())
2019-04-02 11:42:59,结束获取if (proc != null && proc.isAlive())
return the applicationId:application_1548381669007_0828
^Cbash-4.1$ 

Terminating the process manually at this point does not stop the Spark application on YARN.

  • --deploy-mode client, timeout set to 5 minutes
19/04/02 11:40:24 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0829
====================================Index of applicationId:application_1548381669007_0829
====================================Index of applicationId:Complete ...
the value is :86
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
 |-- int_id: long (nullable = true)

root
 |-- int_id: string (nullable = false)
 |-- job_result: string (nullable = true)

Query started: a82ad759-8b14-4d58-93a3-8bed7617dd9c
-------------------------------------------
Batch: 0
-------------------------------------------
listener...application_1548381669007_0829
+------+----------+
|int_id|job_result|
+------+----------+
|     0|      null|
|     1|      1,86|
|     2|      2,86|
|     3|      3,86|
|     4|      4,86|
|     5|      5,86|
|     6|      6,86|
|     7|      7,86|
|     8|      8,86|
|     9|      9,86|
|    10|     10,86|
|    11|      null|
|    12|      null|
|    13|      null|
|    14|      null|
|     0|      null|
|     1|      1,86|
|     2|      2,86|
|     3|      3,86|
|     4|      4,86|
+------+----------+
only showing top 20 rows
...
listener...application_1548381669007_0829
Query made progress: {
  "id" : "a82ad759-8b14-4d58-93a3-8bed7617dd9c",
  "runId" : "a53447f1-056e-4d84-b27e-7007829bc1e2",
  "name" : null,
  "timestamp" : "2019-04-02T03:43:10.001Z",
  "batchId" : 9,
  "numInputRows" : 1000,
  "inputRowsPerSecond" : 100.0,
  "processedRowsPerSecond" : 1584.7860538827258,
  "durationMs" : {
    "addBatch" : 417,
    "getBatch" : 21,
    "getOffset" : 0,
    "queryPlanning" : 38,
    "triggerExecution" : 631,
    "walCommit" : 154
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=64]",
    "startOffset" : 107,
    "endOffset" : 117,
    "numInputRows" : 1000,
    "inputRowsPerSecond" : 100.0,
    "processedRowsPerSecond" : 1584.7860538827258
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@58975f19"
  }
}
the value is :83
Trigger    accumulator value: 10
Load count accumulator value: 11
exitVal:false
proc.isAlive():false
2019-04-02 11:43:19,结束proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 11:43:19,开始获取applicatioId = futureError.get();:
2019-04-02 11:43:19,结束获取applicatioId = futureError.get();:application_1548381669007_0829
Complete:application_1548381669007_0829
2019-04-02 11:43:19,开始获取if (proc != null && proc.isAlive())
2019-04-02 11:43:19,结束获取if (proc != null && proc.isAlive())
return the applicationId:application_1548381669007_0829
java.io.IOException: Stream closed
        at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at com.dx.test.StreamFilterTask.call(Submit.java:162)
        at com.dx.test.StreamFilterTask.call(Submit.java:1)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
# This exception is caught and printed by the program; it does not abort the program.
^Cbash-4.1$ 

Terminating the process manually at this point stops the Spark application on YARN.

 

Reference: "Calling shell scripts from Java, and how to solve the blocking problem"

 

 

 

 

Reposted from: https://www.cnblogs.com/yy3b2007com/p/10642398.html
