当前位置:   article > 正文

Flink SQL Client 源码解析

could not find a result with result identifier 'null

Abstract

本文基于 Flink 1.12-SNAPSHOT,使用sql client命令行提交insert语句进行整个流程的分析。

sql-client.sh embedded --update "INSERT INTO user_log_sink2 SELECT * FROM user_log"

Initialize the environment

主类:org.apache.flink.table.client.SqlClient#main

  1. public static void main(String[] args) {
  2.   if (args.length < 1) {
  3.    CliOptionsParser.printHelpClient();
  4.    return;
  5.   }
  6.   switch (args[0]) {
  7.    case MODE_EMBEDDED:
  8.     // remove mode
  9.     final String[] modeArgs = Arrays.copyOfRange(args, 1, args.length);
  10.     final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);
  11.     if (options.isPrintHelp()) {
  12.      CliOptionsParser.printHelpEmbeddedModeClient();
  13.     } else {
  14.      try {
  15.       final SqlClient client = new SqlClient(true, options);
  16.       client.start();
  17.      } catch (SqlClientException e) {
  18.       // make space in terminal
  19.       System.out.println();
  20.       System.out.println();
  21.       LOG.error("SQL Client must stop.", e);
  22.       throw e;
  23.      } catch (Throwable t) {
  24.       // make space in terminal
  25.       System.out.println();
  26.       System.out.println();
  27.       LOG.error("SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.", t);
  28.       throw new SqlClientException("Unexpected exception. This is a bug. Please consider filing an issue.", t);
  29.      }
  30.     }
  31.     break;
  32.    case MODE_GATEWAY:
  33.     throw new SqlClientException("Gateway mode is not supported yet.");
  34.    default:
  35.     CliOptionsParser.printHelpClient();
  36.   }
  37.  }

首先判断参数个数,根据第一个参数选择执行模式为embedded或gateway,本次会进入embedded。

接着就是解析命令行参数。

目前支持的参数项见org.apache.flink.table.client.cli.CliOptionsParser#parseEmbeddedModeClient org.apache.flink.table.client.cli.CliOptionsParser这个类就是用于解析命令行的。

然后基于传入的参数创建SqlClient对象,调用start方法

  1. private void start() {
  2.   if (isEmbedded) {
  3.    // create local executor with default environment
  4.    final List<URL> jars;
  5.    if (options.getJars() != null) {
  6.     jars = options.getJars();
  7.    } else {
  8.     jars = Collections.emptyList();
  9.    }
  10.    final List<URL> libDirs;
  11.    if (options.getLibraryDirs() != null) {
  12.     libDirs = options.getLibraryDirs();
  13.    } else {
  14.     libDirs = Collections.emptyList();
  15.    }
  16.    final Executor executor = new LocalExecutor(options.getDefaults(), jars, libDirs);
  17.    executor.start();
  18.    // create CLI client with session environment
  19.    final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
  20.    appendPythonConfig(sessionEnv, options.getPythonConfiguration());
  21.    final SessionContext context;
  22.    if (options.getSessionId() == null) {
  23.     context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
  24.    } else {
  25.     context = new SessionContext(options.getSessionId(), sessionEnv);
  26.    }
  27.    // Open an new session
  28.    String sessionId = executor.openSession(context);
  29.    try {
  30.     // add shutdown hook
  31.     Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(sessionId, executor));
  32.     // do the actual work
  33.     openCli(sessionId, executor);
  34.    } finally {
  35.     executor.closeSession(sessionId);
  36.    }
  37.   } else {
  38.    throw new SqlClientException("Gateway mode is not supported yet.");
  39.   }
  40.  }

首先是根据默认的sql-client-defaults.yaml配置文件实例化local executor,并调用该实例的start方法,但该方法中并没有做任何处理。

然后读取session environment生成SessionContext,注意这里的session environment其实就是读取的用户通过-e参数指定的配置文件

这里简单介绍下org.apache.flink.table.client.gateway.SessionContext这个类,该类描述一个会话,主要用于在后端打开一个新会话。如果客户端请求打开一个新会话,后端{@link Executor}将为它维护一个{@link org.apache.flink.table.client.gateway.local.ExecutionContext},每次客户端交互都需要附加这个会话ID

接着会将context对象传入executor.openSession方法中获取到sessionId。

然后创建一个shutdown hook,这个hook最主要做的工作就是关闭sql client之前会杀掉已提交的查询作业,防止查询作业一直在集群上跑浪费资源。

  1. private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
  2.   final DynamicResult<T> result = resultStore.getResult(resultId);
  3.   if (result == null) {
  4.    throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
  5.   }
  6.   // stop retrieval and remove the result
  7.   LOG.info("Cancelling job {} and result retrieval.", resultId);
  8.   result.close();
  9.   resultStore.removeResult(resultId);
  10.   // stop Flink job
  11.   try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
  12.    ClusterClient<T> clusterClient = null;
  13.    try {
  14.     // retrieve existing cluster
  15.     clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
  16.     try {
  17.      clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
  18.     } catch (Throwable t) {
  19.      // the job might has finished earlier
  20.     }
  21.    } catch (Exception e) {
  22.     throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
  23.    } finally {
  24.     try {
  25.      if (clusterClient != null) {
  26.       clusterClient.close();
  27.      }
  28.     } catch (Exception e) {
  29.      // ignore
  30.     }
  31.    }
  32.   } catch (SqlExecutionException e) {
  33.    throw e;
  34.   } catch (Exception e) {
  35.    throw new SqlExecutionException("Could not locate a cluster.", e);
  36.   }
  37.  }

最后会将sessionId和LocalExecutor对象传入openCli方法,此后进入了实际的工作方法中。

Opens the CLI client for executing SQL statements.

  1. /**
  2.   * Opens the CLI client for executing SQL statements.
  3.   *
  4.   * @param sessionId session identifier for the current client.
  5.   * @param executor executor
  6.   */
  7.  private void openCli(String sessionId, Executor executor) {
  8.   CliClient cli = null;
  9.   try {
  10.    Path historyFilePath;
  11.    if (options.getHistoryFilePath() != null) {
  12.     historyFilePath = Paths.get(options.getHistoryFilePath());
  13.    } else {
  14.     historyFilePath = Paths.get(System.getProperty("user.home"),
  15.       SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history");
  16.    }
  17.    cli = new CliClient(sessionId, executor, historyFilePath);
  18.    // interactive CLI mode
  19.    if (options.getUpdateStatement() == null) {
  20.     cli.open();
  21.    }
  22.    // execute single update statement
  23.    else {
  24.     final boolean success = cli.submitUpdate(options.getUpdateStatement());
  25.     if (!success) {
  26.      throw new SqlClientException("Could not submit given SQL update statement to cluster.");
  27.     }
  28.    }
  29.   } finally {
  30.    if (cli != null) {
  31.     cli.close();
  32.    }
  33.   }
  34.  }

首先判断命令行参数中是否指定了historyFilePath,如果没有显示指定,会使用当前用户的HOME路径下的.flink-sql-history作为historyFilePath

这里由于我们直接在命令行通过update参数传入将SQL语句,所以不会进入终端的交互模式,而是直接执行单个的update statement。

cli.submitUpdate(options.getUpdateStatement())

其中options.getUpdateStatement()是拿到了我们在命令中传入的SQL语句,也就是INSERT INTO user_log_sink2 SELECT * FROM user_log

Execute single update statement

执行submitUpdate方法,

  1. /**
  2.   * Submits a SQL update statement and prints status information and/or errors on the terminal.
  3.   *
  4.   * @param statement SQL update statement
  5.   * @return flag to indicate if the submission was successful or not
  6.   */
  7.  public boolean submitUpdate(String statement) {
  8.   terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
  9.   terminal.writer().println(new AttributedString(statement).toString());
  10.   terminal.flush();
  11.   final Optional<SqlCommandCall> parsedStatement = parseCommand(statement);
  12.   // only support INSERT INTO/OVERWRITE
  13.   return parsedStatement.map(cmdCall -> {
  14.    switch (cmdCall.command) {
  15.     case INSERT_INTO:
  16.     case INSERT_OVERWRITE:
  17.      return callInsert(cmdCall);
  18.     default:
  19.      printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
  20.      return false;
  21.    }
  22.   }).orElse(false);
  23.  }

首先打印了两行信息

  1. [INFO] Executing the following statement:
  2. INSERT INTO user_log_sink2 SELECT * FROM user_log

Parsing SQL Statement

紧接着解析传入的SQL语句

  1. private Optional<SqlCommandCall> parseCommand(String line) {
  2.   final Optional<SqlCommandCall> parsedLine = SqlCommandParser.parse(executor.getSqlParser(sessionId), line);
  3.   if (!parsedLine.isPresent()) {
  4.    printError(CliStrings.MESSAGE_UNKNOWN_SQL);
  5.   }
  6.   return parsedLine;
  7.  }

首先从executor.getSqlParser(sessionId)拿到Parser对象

  1. @Override
  2.  public Parser getSqlParser(String sessionId) {
  3.   final ExecutionContext<?> context = getExecutionContext(sessionId);
  4.   final TableEnvironment tableEnv = context.getTableEnvironment();
  5.   final Parser parser = ((TableEnvironmentInternal) tableEnv).getParser();
  6.   return new Parser() {
  7.    @Override
  8.    public List<Operation> parse(String statement) {
  9.     return context.wrapClassLoader(() -> parser.parse(statement));
  10.    }
  11.    @Override
  12.    public UnresolvedIdentifier parseIdentifier(String identifier) {
  13.     return context.wrapClassLoader(() -> parser.parseIdentifier(identifier));
  14.    }
  15.   };
  16.  }

将Parse对象和SQL语句传入SqlCommandParser.parse方法,org.apache.flink.table.client.cli.SqlCommandParser是一个用于确定命令类型及其参数的简单解析器。

  1. public static Optional<SqlCommandCall> parse(Parser sqlParser, String stmt) {
  2.   // normalize
  3.   stmt = stmt.trim();
  4.   // remove ';' at the end
  5.   if (stmt.endsWith(";")) {
  6.    stmt = stmt.substring(0, stmt.length() - 1).trim();
  7.   }
  8.   // parse statement via sql parser first
  9.   Optional<SqlCommandCall> callOpt = parseBySqlParser(sqlParser, stmt);
  10.   if (callOpt.isPresent()) {
  11.    return callOpt;
  12.   } else {
  13.    return parseByRegexMatching(stmt);
  14.   }
  15.  }
  16.  private static Optional<SqlCommandCall> parseBySqlParser(Parser sqlParser, String stmt) {
  17.   List<Operation> operations;
  18.   try {
  19.    operations = sqlParser.parse(stmt);
  20.   } catch (Throwable e) {
  21.    if (e instanceof ValidationException) {
  22.     // can be parsed via sql parser, but is not validated.
  23.     // throw exception directly
  24.     throw new SqlExecutionException("Invalidate SQL statement.", e);
  25.    }
  26.    return Optional.empty();
  27.   }
  28.   if (operations.size() != 1) {
  29.    throw new SqlExecutionException("Only single statement is supported now.");
  30.   }
  31.   final SqlCommand cmd;
  32.   String[] operands = new String[] { stmt };
  33.   Operation operation = operations.get(0);
  34.   if (operation instanceof CatalogSinkModifyOperation) {
  35.    boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
  36.    cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
  37.   } else if (operation instanceof CreateTableOperation) {
  38.    cmd = SqlCommand.CREATE_TABLE;
  39.   } else if (operation instanceof DropTableOperation) {
  40.    cmd = SqlCommand.DROP_TABLE;
  41.   } else if (operation instanceof AlterTableOperation) {
  42.    cmd = SqlCommand.ALTER_TABLE;
  43.   } else if (operation instanceof CreateViewOperation) {
  44.    cmd = SqlCommand.CREATE_VIEW;
  45.    CreateViewOperation op = (CreateViewOperation) operation;
  46.    operands = new String[] { op.getViewIdentifier().asSerializableString(),
  47.      op.getCatalogView().getOriginalQuery() };
  48.   } else if (operation instanceof DropViewOperation) {
  49.    cmd = SqlCommand.DROP_VIEW;
  50.    operands = new String[] { ((DropViewOperation) operation).getViewIdentifier().asSerializableString() };
  51.   } else if (operation instanceof CreateDatabaseOperation) {
  52.    cmd = SqlCommand.CREATE_DATABASE;
  53.   } else if (operation instanceof DropDatabaseOperation) {
  54.    cmd = SqlCommand.DROP_DATABASE;
  55.   } else if (operation instanceof AlterDatabaseOperation) {
  56.    cmd = SqlCommand.ALTER_DATABASE;
  57.   } else if (operation instanceof CreateCatalogOperation) {
  58.    cmd = SqlCommand.CREATE_CATALOG;
  59.   } else if (operation instanceof DropCatalogOperation) {
  60.    cmd = SqlCommand.DROP_CATALOG;
  61.   } else if (operation instanceof UseCatalogOperation) {
  62.    cmd = SqlCommand.USE_CATALOG;
  63.    operands = new String[] { ((UseCatalogOperation) operation).getCatalogName() };
  64.   } else if (operation instanceof UseDatabaseOperation) {
  65.    cmd = SqlCommand.USE;
  66.    operands = new String[] { ((UseDatabaseOperation) operation).getDatabaseName() };
  67.   } else if (operation instanceof ShowCatalogsOperation) {
  68.    cmd = SqlCommand.SHOW_CATALOGS;
  69.    operands = new String[0];
  70.   } else if (operation instanceof ShowDatabasesOperation) {
  71.    cmd = SqlCommand.SHOW_DATABASES;
  72.    operands = new String[0];
  73.   } else if (operation instanceof ShowTablesOperation) {
  74.    cmd = SqlCommand.SHOW_TABLES;
  75.    operands = new String[0];
  76.   } else if (operation instanceof ShowFunctionsOperation) {
  77.    cmd = SqlCommand.SHOW_FUNCTIONS;
  78.    operands = new String[0];
  79.   } else if (operation instanceof CreateCatalogFunctionOperation ||
  80.     operation instanceof CreateTempSystemFunctionOperation) {
  81.    cmd = SqlCommand.CREATE_FUNCTION;
  82.   } else if (operation instanceof DropCatalogFunctionOperation ||
  83.     operation instanceof DropTempSystemFunctionOperation) {
  84.    cmd = SqlCommand.DROP_FUNCTION;
  85.   } else if (operation instanceof AlterCatalogFunctionOperation) {
  86.    cmd = SqlCommand.ALTER_FUNCTION;
  87.   } else if (operation instanceof ExplainOperation) {
  88.    cmd = SqlCommand.EXPLAIN;
  89.   } else if (operation instanceof DescribeTableOperation) {
  90.    cmd = SqlCommand.DESCRIBE;
  91.    operands = new String[] { ((DescribeTableOperation) operation).getSqlIdentifier().asSerializableString() };
  92.   } else if (operation instanceof QueryOperation) {
  93.    cmd = SqlCommand.SELECT;
  94.   } else {
  95.    cmd = null;
  96.   }
  97.   return cmd == null ? Optional.empty() : Optional.of(new SqlCommandCall(cmd, operands));
  98.  }

最终返回到 org.apache.flink.table.client.cli.CliClient#submitUpdate 方法体中的调用处 final Optional<SqlCommandCall> parsedStatement = parseCommand(statement)

然后执行

  1. // only support INSERT INTO/OVERWRITE
  2.   return parsedStatement.map(cmdCall -> {
  3.    switch (cmdCall.command) {
  4.     case INSERT_INTO:
  5.     case INSERT_OVERWRITE:
  6.      return callInsert(cmdCall);
  7.     default:
  8.      printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
  9.      return false;
  10.    }
  11.   }).orElse(false);

Call Insert Method

进入callInsert方法

  1. private boolean callInsert(SqlCommandCall cmdCall) {
  2.   printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT);
  3.   try {
  4.    final ProgramTargetDescriptor programTarget = executor.executeUpdate(sessionId, cmdCall.operands[0]);
  5.    terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
  6.    terminal.writer().println(programTarget.toString());
  7.    terminal.flush();
  8.   } catch (SqlExecutionException e) {
  9.    printExecutionException(e);
  10.    return false;
  11.   }
  12.   return true;
  13.  }

首先会在终端打印一行信息

[INFO] Submitting SQL update statement to the cluster...

接着执行 executor.executeUpdate(sessionId, cmdCall.operands[0])方法

  1. @Override
  2.  public ProgramTargetDescriptor executeUpdate(String sessionId, String statement) throws SqlExecutionException {
  3.   final ExecutionContext<?> context = getExecutionContext(sessionId);
  4.   return executeUpdateInternal(sessionId, context, statement);
  5.  }

进入executeUpdateInternal方法

  1. private <C> ProgramTargetDescriptor executeUpdateInternal(
  2.    String sessionId,
  3.    ExecutionContext<C> context,
  4.    String statement) {
  5.   applyUpdate(context, statement);
  6.   //Todo: we should refactor following condition after TableEnvironment has support submit job directly.
  7.   if (!INSERT_SQL_PATTERN.matcher(statement.trim()).matches()) {
  8.    return null;
  9.   }
  10.   // create pipeline
  11.   final String jobName = sessionId + ": " + statement;
  12.   final Pipeline pipeline;
  13.   try {
  14.    pipeline = context.createPipeline(jobName);
  15.   } catch (Throwable t) {
  16.    // catch everything such that the statement does not crash the executor
  17.    throw new SqlExecutionException("Invalid SQL statement.", t);
  18.   }
  19.   // create a copy so that we can change settings without affecting the original config
  20.   Configuration configuration = new Configuration(context.getFlinkConfig());
  21.   // for update queries we don't wait for the job result, so run in detached mode
  22.   configuration.set(DeploymentOptions.ATTACHED, false);
  23.   // create execution
  24.   final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);
  25.   // blocking deployment
  26.   try {
  27.    JobClient jobClient = deployer.deploy().get();
  28.    return ProgramTargetDescriptor.of(jobClient.getJobID());
  29.   } catch (Exception e) {
  30.    throw new RuntimeException("Error running SQL job.", e);
  31.   }
  32.  }

Buffer List

首先进入applyUpdate(context, statement)方法

  1. /**
  2.   * Applies the given update statement to the given table environment with query configuration.
  3.   */
  4.  private <C> void applyUpdate(ExecutionContext<C> context, String updateStatement) {
  5.   final TableEnvironment tableEnv = context.getTableEnvironment();
  6.   try {
  7.    // TODO replace sqlUpdate with executeSql
  8.    // This needs we do more refactor, because we can't set the flinkConfig in ExecutionContext
  9.    // into StreamExecutionEnvironment
  10.    context.wrapClassLoader(() -> tableEnv.sqlUpdate(updateStatement));
  11.   } catch (Throwable t) {
  12.    // catch everything such that the statement does not crash the executor
  13.    throw new SqlExecutionException("Invalid SQL update statement.", t);
  14.   }
  15.  }

进入tableEnv.sqlUpdate(updateStatement)方法.

  1. @Override
  2.  public void sqlUpdate(String stmt) {
  3.   List<Operation> operations = parser.parse(stmt);
  4.   if (operations.size() != 1) {
  5.    throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);
  6.   }
  7.   Operation operation = operations.get(0);
  8.   if (operation instanceof ModifyOperation) {
  9.    buffer(Collections.singletonList((ModifyOperation) operation));
  10.   } else if (operation instanceof CreateTableOperation ||
  11.     operation instanceof DropTableOperation ||
  12.     operation instanceof AlterTableOperation ||
  13.     operation instanceof CreateViewOperation ||
  14.     operation instanceof DropViewOperation ||
  15.     operation instanceof CreateDatabaseOperation ||
  16.     operation instanceof DropDatabaseOperation ||
  17.     operation instanceof AlterDatabaseOperation ||
  18.     operation instanceof CreateCatalogFunctionOperation ||
  19.     operation instanceof CreateTempSystemFunctionOperation ||
  20.     operation instanceof DropCatalogFunctionOperation ||
  21.     operation instanceof DropTempSystemFunctionOperation ||
  22.     operation instanceof AlterCatalogFunctionOperation ||
  23.     operation instanceof CreateCatalogOperation ||
  24.     operation instanceof DropCatalogOperation ||
  25.     operation instanceof UseCatalogOperation ||
  26.     operation instanceof UseDatabaseOperation) {
  27.    executeOperation(operation);
  28.   } else {
  29.    throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);
  30.   }
  31.  }

parse(stmt)方法最终返回了Collections.singletonList(operation)

  1. @Override
  2.  public List<Operation> parse(String statement) {
  3.   CalciteParser parser = calciteParserSupplier.get();
  4.   FlinkPlannerImpl planner = validatorSupplier.get();
  5.   // parse the sql query
  6.   SqlNode parsed = parser.parse(statement);
  7.   Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
  8.    .orElseThrow(() -> new TableException("Unsupported query: " + statement));
  9.   return Collections.singletonList(operation);
  10.  }

buffer(Collections.singletonList((ModifyOperation) operation))将操作加到了一块

  1. private void buffer(List<ModifyOperation> modifyOperations) {
  2.   bufferedModifyOperations.addAll(modifyOperations);
  3.  }

Create pipeline and blocking deployment

返回到org.apache.flink.table.client.gateway.local.LocalExecutor#executeUpdateInternal

  1. //Todo: we should refactor following condition after TableEnvironment has support submit job directly.
  2.   if (!INSERT_SQL_PATTERN.matcher(statement.trim()).matches()) {
  3.    return null;
  4.   }
  5.   // create pipeline
  6.   final String jobName = sessionId + ": " + statement;
  7.   final Pipeline pipeline;
  8.   try {
  9.    pipeline = context.createPipeline(jobName);
  10.   } catch (Throwable t) {
  11.    // catch everything such that the statement does not crash the executor
  12.    throw new SqlExecutionException("Invalid SQL statement.", t);
  13.   }
  14.   // create a copy so that we can change settings without affecting the original config
  15.   Configuration configuration = new Configuration(context.getFlinkConfig());
  16.   // for update queries we don't wait for the job result, so run in detached mode
  17.   configuration.set(DeploymentOptions.ATTACHED, false);
  18.   // create execution
  19.   final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);
  20.   // blocking deployment
  21.   try {
  22.    JobClient jobClient = deployer.deploy().get();
  23.    return ProgramTargetDescriptor.of(jobClient.getJobID());
  24.   } catch (Exception e) {
  25.    throw new RuntimeException("Error running SQL job.", e);
  26.   }
  27. context.createPipeline(jobName)
  28. public Pipeline createPipeline(String name) {
  29.   return wrapClassLoader(() -> {
  30.    if (streamExecEnv != null) {
  31.     StreamTableEnvironmentImpl streamTableEnv = (StreamTableEnvironmentImpl) tableEnv;
  32.     return streamTableEnv.getPipeline(name);
  33.    } else {
  34.     BatchTableEnvironmentImpl batchTableEnv = (BatchTableEnvironmentImpl) tableEnv;
  35.     return batchTableEnv.getPipeline(name);
  36.    }
  37.   });
  38.  }

分离模式提交

  1. configuration.set(DeploymentOptions.ATTACHED, false);
  2. // create execution
  3. final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);

org.apache.flink.table.client.gateway.local.ProgramDeployer用于在集群上部署一个表程序。

异步提交Flink Job

  1. public CompletableFuture<JobClient> deploy() {
  2.   LOG.info("Submitting job {} for query {}`", pipeline, jobName);
  3.   if (LOG.isDebugEnabled()) {
  4.    LOG.debug("Submitting job {} with configuration: \n{}", pipeline, configuration);
  5.   }
  6.   if (configuration.get(DeploymentOptions.TARGET) == null) {
  7.    throw new RuntimeException("No execution.target specified in your configuration file.");
  8.   }
  9.   PipelineExecutorServiceLoader executorServiceLoader = DefaultExecutorServiceLoader.INSTANCE;
  10.   final PipelineExecutorFactory executorFactory;
  11.   try {
  12.    executorFactory = executorServiceLoader.getExecutorFactory(configuration);
  13.   } catch (Exception e) {
  14.    throw new RuntimeException("Could not retrieve ExecutorFactory.", e);
  15.   }
  16.   final PipelineExecutor executor = executorFactory.getExecutor(configuration);
  17.   CompletableFuture<JobClient> jobClient;
  18.   try {
  19.    jobClient = executor.execute(pipeline, configuration);
  20.   } catch (Exception e) {
  21.    throw new RuntimeException("Could not execute program.", e);
  22.   }
  23.   return jobClient;
  24.  }

地址:https://github.com/y0908105023/wiki/wiki/Flink-Sql-Client-%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90
作者:y0908105023

 
 

end

  1. Flink 从入门到精通 系列文章
  2. 基于 Apache Flink 的实时监控告警系统
  3. 关于数据中台的深度思考与总结(干干货)
  4. 日志收集Agent,阴暗潮湿的地底世界

a50807271e99d9ad3ed04c7f505d01cf.png

871e7ca36ee023bd230608c1c93af8ac.png

公众号(zhisheng)里回复 面经、ClickHouse、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章。
点个赞+在看,少个 bug        本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/742958
推荐阅读
相关标签