This article is based on Flink 1.12-SNAPSHOT and walks through the whole flow of submitting an INSERT statement from the SQL Client command line.
sql-client.sh embedded --update "INSERT INTO user_log_sink2 SELECT * FROM user_log"
Entry class: org.apache.flink.table.client.SqlClient#main
- public static void main(String[] args) {
- if (args.length < 1) {
- CliOptionsParser.printHelpClient();
- return;
- }
-
- switch (args[0]) {
-
- case MODE_EMBEDDED:
- // remove mode
- final String[] modeArgs = Arrays.copyOfRange(args, 1, args.length);
- final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);
- if (options.isPrintHelp()) {
- CliOptionsParser.printHelpEmbeddedModeClient();
- } else {
- try {
- final SqlClient client = new SqlClient(true, options);
- client.start();
- } catch (SqlClientException e) {
- // make space in terminal
- System.out.println();
- System.out.println();
- LOG.error("SQL Client must stop.", e);
- throw e;
- } catch (Throwable t) {
- // make space in terminal
- System.out.println();
- System.out.println();
- LOG.error("SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.", t);
- throw new SqlClientException("Unexpected exception. This is a bug. Please consider filing an issue.", t);
- }
- }
- break;
-
- case MODE_GATEWAY:
- throw new SqlClientException("Gateway mode is not supported yet.");
-
- default:
- CliOptionsParser.printHelpClient();
- }
- }

main() first checks the number of arguments and uses the first one to pick the execution mode, embedded or gateway; in our case the embedded branch is taken.
The remaining command-line arguments are then parsed.
The options currently supported are listed in org.apache.flink.table.client.cli.CliOptionsParser#parseEmbeddedModeClient; org.apache.flink.table.client.cli.CliOptionsParser is the class responsible for parsing the command line.
A SqlClient object is then created from the parsed options and its start() method is called:
- private void start() {
- if (isEmbedded) {
- // create local executor with default environment
- final List<URL> jars;
- if (options.getJars() != null) {
- jars = options.getJars();
- } else {
- jars = Collections.emptyList();
- }
- final List<URL> libDirs;
- if (options.getLibraryDirs() != null) {
- libDirs = options.getLibraryDirs();
- } else {
- libDirs = Collections.emptyList();
- }
- final Executor executor = new LocalExecutor(options.getDefaults(), jars, libDirs);
- executor.start();
-
- // create CLI client with session environment
- final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
- appendPythonConfig(sessionEnv, options.getPythonConfiguration());
- final SessionContext context;
- if (options.getSessionId() == null) {
- context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
- } else {
- context = new SessionContext(options.getSessionId(), sessionEnv);
- }
-
- // Open an new session
- String sessionId = executor.openSession(context);
- try {
- // add shutdown hook
- Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(sessionId, executor));
-
- // do the actual work
- openCli(sessionId, executor);
- } finally {
- executor.closeSession(sessionId);
- }
- } else {
- throw new SqlClientException("Gateway mode is not supported yet.");
- }
- }

start() first instantiates a LocalExecutor based on the default sql-client-defaults.yaml configuration file and calls its start() method, which currently does nothing.
It then reads the session environment and builds a SessionContext; note that the session environment is simply the configuration file the user passed with the -e option.
A quick word on org.apache.flink.table.client.gateway.SessionContext: this class describes a session and is mainly used to open a new session against the backend. When the client asks to open a new session, the backend Executor maintains an org.apache.flink.table.client.gateway.local.ExecutionContext for it, and every subsequent client interaction has to carry this session ID.
The context object is then passed into executor.openSession to obtain the sessionId.
Next, a shutdown hook is registered; its main job is to cancel any already submitted query jobs before the SQL Client shuts down, so that they do not keep running on the cluster and waste resources.
- private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
- final DynamicResult<T> result = resultStore.getResult(resultId);
- if (result == null) {
- throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
- }
-
- // stop retrieval and remove the result
- LOG.info("Cancelling job {} and result retrieval.", resultId);
- result.close();
- resultStore.removeResult(resultId);
-
- // stop Flink job
- try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
- ClusterClient<T> clusterClient = null;
- try {
- // retrieve existing cluster
- clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
- try {
- clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
- } catch (Throwable t) {
- // the job might has finished earlier
- }
- } catch (Exception e) {
- throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
- } finally {
- try {
- if (clusterClient != null) {
- clusterClient.close();
- }
- } catch (Exception e) {
- // ignore
- }
- }
- } catch (SqlExecutionException e) {
- throw e;
- } catch (Exception e) {
- throw new SqlExecutionException("Could not locate a cluster.", e);
- }
- }
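
A small detail in the method above is worth noting: the result identifier is simply the hex representation of the Flink JobID, which is why it can be converted straight back into a JobID before the cancel call. A minimal illustration (variable names are made up for the sketch):

- // the resultId round-trips to a JobID (illustrative sketch)
- JobID jobId = jobClient.getJobID();
- String resultId = jobId.toString();                                // hex string used as the result identifier
- JobID restored = new JobID(StringUtils.hexStringToByte(resultId)); // equal to jobId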

Finally, the sessionId and the LocalExecutor are passed into openCli, which is where the actual work happens.
- /**
- * Opens the CLI client for executing SQL statements.
- *
- * @param sessionId session identifier for the current client.
- * @param executor executor
- */
- private void openCli(String sessionId, Executor executor) {
- CliClient cli = null;
- try {
- Path historyFilePath;
- if (options.getHistoryFilePath() != null) {
- historyFilePath = Paths.get(options.getHistoryFilePath());
- } else {
- historyFilePath = Paths.get(System.getProperty("user.home"),
- SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history");
- }
- cli = new CliClient(sessionId, executor, historyFilePath);
- // interactive CLI mode
- if (options.getUpdateStatement() == null) {
- cli.open();
- }
- // execute single update statement
- else {
- final boolean success = cli.submitUpdate(options.getUpdateStatement());
- if (!success) {
- throw new SqlClientException("Could not submit given SQL update statement to cluster.");
- }
- }
- } finally {
- if (cli != null) {
- cli.close();
- }
- }
- }

openCli first checks whether a historyFilePath was specified on the command line; if it was not explicitly given, .flink-sql-history under the current user's HOME directory is used as the historyFilePath.
Since we passed the SQL statement directly on the command line via the --update option, the client does not enter the interactive terminal mode but executes the single update statement instead.
cli.submitUpdate(options.getUpdateStatement())
Here options.getUpdateStatement() returns the SQL statement we passed on the command line, i.e. INSERT INTO user_log_sink2 SELECT * FROM user_log.
The submitUpdate method then runs:
- /**
- * Submits a SQL update statement and prints status information and/or errors on the terminal.
- *
- * @param statement SQL update statement
- * @return flag to indicate if the submission was successful or not
- */
- public boolean submitUpdate(String statement) {
- terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
- terminal.writer().println(new AttributedString(statement).toString());
- terminal.flush();
-
- final Optional<SqlCommandCall> parsedStatement = parseCommand(statement);
- // only support INSERT INTO/OVERWRITE
- return parsedStatement.map(cmdCall -> {
- switch (cmdCall.command) {
- case INSERT_INTO:
- case INSERT_OVERWRITE:
- return callInsert(cmdCall);
- default:
- printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
- return false;
- }
- }).orElse(false);
- }

submitUpdate first prints two lines of information:
- [INFO] Executing the following statement:
- INSERT INTO user_log_sink2 SELECT * FROM user_log
It then parses the given SQL statement:
- private Optional<SqlCommandCall> parseCommand(String line) {
- final Optional<SqlCommandCall> parsedLine = SqlCommandParser.parse(executor.getSqlParser(sessionId), line);
- if (!parsedLine.isPresent()) {
- printError(CliStrings.MESSAGE_UNKNOWN_SQL);
- }
- return parsedLine;
- }
parseCommand first obtains a Parser object from executor.getSqlParser(sessionId):
- @Override
- public Parser getSqlParser(String sessionId) {
- final ExecutionContext<?> context = getExecutionContext(sessionId);
- final TableEnvironment tableEnv = context.getTableEnvironment();
- final Parser parser = ((TableEnvironmentInternal) tableEnv).getParser();
- return new Parser() {
- @Override
- public List<Operation> parse(String statement) {
- return context.wrapClassLoader(() -> parser.parse(statement));
- }
-
- @Override
- public UnresolvedIdentifier parseIdentifier(String identifier) {
- return context.wrapClassLoader(() -> parser.parseIdentifier(identifier));
- }
- };
- }
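
The interesting part here is wrapClassLoader: parsing runs with the session's user classloader set as the thread context classloader, so that connector and format classes from the -j/-l jars are visible to the parser and planner. Conceptually it does something like the following (a simplified sketch; the real ExecutionContext delegates to a temporary-classloader helper and the names below are simplified):

- // simplified sketch of what ExecutionContext#wrapClassLoader does
- static <R> R wrapClassLoader(ClassLoader sessionClassLoader, java.util.function.Supplier<R> supplier) {
-     ClassLoader original = Thread.currentThread().getContextClassLoader();
-     Thread.currentThread().setContextClassLoader(sessionClassLoader);
-     try {
-         return supplier.get();
-     } finally {
-         Thread.currentThread().setContextClassLoader(original);
-     }
- }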

The Parser object and the SQL statement are then passed to SqlCommandParser.parse; org.apache.flink.table.client.cli.SqlCommandParser is a simple parser that determines the command type and its operands.
- public static Optional<SqlCommandCall> parse(Parser sqlParser, String stmt) {
- // normalize
- stmt = stmt.trim();
- // remove ';' at the end
- if (stmt.endsWith(";")) {
- stmt = stmt.substring(0, stmt.length() - 1).trim();
- }
-
- // parse statement via sql parser first
- Optional<SqlCommandCall> callOpt = parseBySqlParser(sqlParser, stmt);
- if (callOpt.isPresent()) {
- return callOpt;
- } else {
- return parseByRegexMatching(stmt);
- }
- }
-
- private static Optional<SqlCommandCall> parseBySqlParser(Parser sqlParser, String stmt) {
- List<Operation> operations;
- try {
- operations = sqlParser.parse(stmt);
- } catch (Throwable e) {
- if (e instanceof ValidationException) {
- // can be parsed via sql parser, but is not validated.
- // throw exception directly
- throw new SqlExecutionException("Invalidate SQL statement.", e);
- }
- return Optional.empty();
- }
- if (operations.size() != 1) {
- throw new SqlExecutionException("Only single statement is supported now.");
- }
-
- final SqlCommand cmd;
- String[] operands = new String[] { stmt };
- Operation operation = operations.get(0);
- if (operation instanceof CatalogSinkModifyOperation) {
- boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
- cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
- } else if (operation instanceof CreateTableOperation) {
- cmd = SqlCommand.CREATE_TABLE;
- } else if (operation instanceof DropTableOperation) {
- cmd = SqlCommand.DROP_TABLE;
- } else if (operation instanceof AlterTableOperation) {
- cmd = SqlCommand.ALTER_TABLE;
- } else if (operation instanceof CreateViewOperation) {
- cmd = SqlCommand.CREATE_VIEW;
- CreateViewOperation op = (CreateViewOperation) operation;
- operands = new String[] { op.getViewIdentifier().asSerializableString(),
- op.getCatalogView().getOriginalQuery() };
- } else if (operation instanceof DropViewOperation) {
- cmd = SqlCommand.DROP_VIEW;
- operands = new String[] { ((DropViewOperation) operation).getViewIdentifier().asSerializableString() };
- } else if (operation instanceof CreateDatabaseOperation) {
- cmd = SqlCommand.CREATE_DATABASE;
- } else if (operation instanceof DropDatabaseOperation) {
- cmd = SqlCommand.DROP_DATABASE;
- } else if (operation instanceof AlterDatabaseOperation) {
- cmd = SqlCommand.ALTER_DATABASE;
- } else if (operation instanceof CreateCatalogOperation) {
- cmd = SqlCommand.CREATE_CATALOG;
- } else if (operation instanceof DropCatalogOperation) {
- cmd = SqlCommand.DROP_CATALOG;
- } else if (operation instanceof UseCatalogOperation) {
- cmd = SqlCommand.USE_CATALOG;
- operands = new String[] { ((UseCatalogOperation) operation).getCatalogName() };
- } else if (operation instanceof UseDatabaseOperation) {
- cmd = SqlCommand.USE;
- operands = new String[] { ((UseDatabaseOperation) operation).getDatabaseName() };
- } else if (operation instanceof ShowCatalogsOperation) {
- cmd = SqlCommand.SHOW_CATALOGS;
- operands = new String[0];
- } else if (operation instanceof ShowDatabasesOperation) {
- cmd = SqlCommand.SHOW_DATABASES;
- operands = new String[0];
- } else if (operation instanceof ShowTablesOperation) {
- cmd = SqlCommand.SHOW_TABLES;
- operands = new String[0];
- } else if (operation instanceof ShowFunctionsOperation) {
- cmd = SqlCommand.SHOW_FUNCTIONS;
- operands = new String[0];
- } else if (operation instanceof CreateCatalogFunctionOperation ||
- operation instanceof CreateTempSystemFunctionOperation) {
- cmd = SqlCommand.CREATE_FUNCTION;
- } else if (operation instanceof DropCatalogFunctionOperation ||
- operation instanceof DropTempSystemFunctionOperation) {
- cmd = SqlCommand.DROP_FUNCTION;
- } else if (operation instanceof AlterCatalogFunctionOperation) {
- cmd = SqlCommand.ALTER_FUNCTION;
- } else if (operation instanceof ExplainOperation) {
- cmd = SqlCommand.EXPLAIN;
- } else if (operation instanceof DescribeTableOperation) {
- cmd = SqlCommand.DESCRIBE;
- operands = new String[] { ((DescribeTableOperation) operation).getSqlIdentifier().asSerializableString() };
- } else if (operation instanceof QueryOperation) {
- cmd = SqlCommand.SELECT;
- } else {
- cmd = null;
- }
-
- return cmd == null ? Optional.empty() : Optional.of(new SqlCommandCall(cmd, operands));
- }

Execution finally returns to the call site in org.apache.flink.table.client.cli.CliClient#submitUpdate, i.e. final Optional<SqlCommandCall> parsedStatement = parseCommand(statement), and then continues with:
- // only support INSERT INTO/OVERWRITE
- return parsedStatement.map(cmdCall -> {
- switch (cmdCall.command) {
- case INSERT_INTO:
- case INSERT_OVERWRITE:
- return callInsert(cmdCall);
- default:
- printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
- return false;
- }
- }).orElse(false);
This enters the callInsert method:
- private boolean callInsert(SqlCommandCall cmdCall) {
- printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT);
-
- try {
- final ProgramTargetDescriptor programTarget = executor.executeUpdate(sessionId, cmdCall.operands[0]);
- terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
- terminal.writer().println(programTarget.toString());
- terminal.flush();
- } catch (SqlExecutionException e) {
- printExecutionException(e);
- return false;
- }
- return true;
- }
callInsert first prints a line of information on the terminal:
[INFO] Submitting SQL update statement to the cluster...
It then calls executor.executeUpdate(sessionId, cmdCall.operands[0]):
- @Override
- public ProgramTargetDescriptor executeUpdate(String sessionId, String statement) throws SqlExecutionException {
- final ExecutionContext<?> context = getExecutionContext(sessionId);
- return executeUpdateInternal(sessionId, context, statement);
- }
which enters executeUpdateInternal:
- private <C> ProgramTargetDescriptor executeUpdateInternal(
- String sessionId,
- ExecutionContext<C> context,
- String statement) {
-
- applyUpdate(context, statement);
-
- //Todo: we should refactor following condition after TableEnvironment has support submit job directly.
- if (!INSERT_SQL_PATTERN.matcher(statement.trim()).matches()) {
- return null;
- }
-
- // create pipeline
- final String jobName = sessionId + ": " + statement;
- final Pipeline pipeline;
- try {
- pipeline = context.createPipeline(jobName);
- } catch (Throwable t) {
- // catch everything such that the statement does not crash the executor
- throw new SqlExecutionException("Invalid SQL statement.", t);
- }
-
- // create a copy so that we can change settings without affecting the original config
- Configuration configuration = new Configuration(context.getFlinkConfig());
- // for update queries we don't wait for the job result, so run in detached mode
- configuration.set(DeploymentOptions.ATTACHED, false);
-
- // create execution
- final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);
-
- // blocking deployment
- try {
- JobClient jobClient = deployer.deploy().get();
- return ProgramTargetDescriptor.of(jobClient.getJobID());
- } catch (Exception e) {
- throw new RuntimeException("Error running SQL job.", e);
- }
- }

The first step is applyUpdate(context, statement):
- /**
- * Applies the given update statement to the given table environment with query configuration.
- */
- private <C> void applyUpdate(ExecutionContext<C> context, String updateStatement) {
- final TableEnvironment tableEnv = context.getTableEnvironment();
- try {
- // TODO replace sqlUpdate with executeSql
- // This needs we do more refactor, because we can't set the flinkConfig in ExecutionContext
- // into StreamExecutionEnvironment
- context.wrapClassLoader(() -> tableEnv.sqlUpdate(updateStatement));
- } catch (Throwable t) {
- // catch everything such that the statement does not crash the executor
- throw new SqlExecutionException("Invalid SQL update statement.", t);
- }
- }
which calls into tableEnv.sqlUpdate(updateStatement):
- @Override
- public void sqlUpdate(String stmt) {
- List<Operation> operations = parser.parse(stmt);
-
- if (operations.size() != 1) {
- throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);
- }
-
- Operation operation = operations.get(0);
- if (operation instanceof ModifyOperation) {
- buffer(Collections.singletonList((ModifyOperation) operation));
- } else if (operation instanceof CreateTableOperation ||
- operation instanceof DropTableOperation ||
- operation instanceof AlterTableOperation ||
- operation instanceof CreateViewOperation ||
- operation instanceof DropViewOperation ||
- operation instanceof CreateDatabaseOperation ||
- operation instanceof DropDatabaseOperation ||
- operation instanceof AlterDatabaseOperation ||
- operation instanceof CreateCatalogFunctionOperation ||
- operation instanceof CreateTempSystemFunctionOperation ||
- operation instanceof DropCatalogFunctionOperation ||
- operation instanceof DropTempSystemFunctionOperation ||
- operation instanceof AlterCatalogFunctionOperation ||
- operation instanceof CreateCatalogOperation ||
- operation instanceof DropCatalogOperation ||
- operation instanceof UseCatalogOperation ||
- operation instanceof UseDatabaseOperation) {
- executeOperation(operation);
- } else {
- throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);
- }
- }

The parse(stmt) call eventually returns Collections.singletonList(operation):
- @Override
- public List<Operation> parse(String statement) {
- CalciteParser parser = calciteParserSupplier.get();
- FlinkPlannerImpl planner = validatorSupplier.get();
- // parse the sql query
- SqlNode parsed = parser.parse(statement);
-
- Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
- .orElseThrow(() -> new TableException("Unsupported query: " + statement));
- return Collections.singletonList(operation);
- }
buffer(Collections.singletonList((ModifyOperation) operation)) simply appends the operation to the buffer inside the TableEnvironment:
- private void buffer(List<ModifyOperation> modifyOperations) {
- bufferedModifyOperations.addAll(modifyOperations);
- }
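
So at this point nothing has been submitted yet: sqlUpdate only parks the ModifyOperation in bufferedModifyOperations, and the buffered operations are only translated into an executable pipeline when createPipeline(jobName) is called further down. The same deferred behaviour can be seen in a plain Table API program (a hedged sketch, not taken from the SQL Client code, assuming the user_log tables are already registered):

- // sketch: sqlUpdate only buffers the INSERT, nothing runs until the job is built and submitted
- EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
- TableEnvironment tEnv = TableEnvironment.create(settings);
- tEnv.sqlUpdate("INSERT INTO user_log_sink2 SELECT * FROM user_log"); // buffered only
- tEnv.execute("demo job"); // deprecated in 1.11+, but this is what triggers translation and submission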
Back in org.apache.flink.table.client.gateway.local.LocalExecutor#executeUpdateInternal, the rest of the method runs:
- //Todo: we should refactor following condition after TableEnvironment has support submit job directly.
- if (!INSERT_SQL_PATTERN.matcher(statement.trim()).matches()) {
- return null;
- }
-
- // create pipeline
- final String jobName = sessionId + ": " + statement;
- final Pipeline pipeline;
- try {
- pipeline = context.createPipeline(jobName);
- } catch (Throwable t) {
- // catch everything such that the statement does not crash the executor
- throw new SqlExecutionException("Invalid SQL statement.", t);
- }
-
- // create a copy so that we can change settings without affecting the original config
- Configuration configuration = new Configuration(context.getFlinkConfig());
- // for update queries we don't wait for the job result, so run in detached mode
- configuration.set(DeploymentOptions.ATTACHED, false);
-
- // create execution
- final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);
-
- // blocking deployment
- try {
- JobClient jobClient = deployer.deploy().get();
- return ProgramTargetDescriptor.of(jobClient.getJobID());
- } catch (Exception e) {
- throw new RuntimeException("Error running SQL job.", e);
- }
context.createPipeline(jobName) then builds the Pipeline from the buffered operations:
- public Pipeline createPipeline(String name) {
- return wrapClassLoader(() -> {
- if (streamExecEnv != null) {
- StreamTableEnvironmentImpl streamTableEnv = (StreamTableEnvironmentImpl) tableEnv;
- return streamTableEnv.getPipeline(name);
- } else {
- BatchTableEnvironmentImpl batchTableEnv = (BatchTableEnvironmentImpl) tableEnv;
- return batchTableEnv.getPipeline(name);
- }
- });
- }
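
Because our session is a streaming one, streamExecEnv is non-null and the Pipeline returned here is concretely a StreamGraph built from the operations buffered earlier; for the legacy batch environment it would be a Plan. Pipeline is just the common interface over the two, which a caller could check like this (purely illustrative):

- // Pipeline is the umbrella type over the two concrete job representations (illustrative)
- Pipeline pipeline = context.createPipeline(jobName);
- if (pipeline instanceof StreamGraph) {
-     // streaming job translated from the buffered ModifyOperations
- } else if (pipeline instanceof Plan) {
-     // legacy batch plan
- }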

- configuration.set(DeploymentOptions.ATTACHED, false);
- // create execution
- final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);
org.apache.flink.table.client.gateway.local.ProgramDeployer is used to deploy a table program on a cluster. It submits the Flink job asynchronously:
- public CompletableFuture<JobClient> deploy() {
- LOG.info("Submitting job {} for query {}`", pipeline, jobName);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Submitting job {} with configuration: \n{}", pipeline, configuration);
- }
-
- if (configuration.get(DeploymentOptions.TARGET) == null) {
- throw new RuntimeException("No execution.target specified in your configuration file.");
- }
-
- PipelineExecutorServiceLoader executorServiceLoader = DefaultExecutorServiceLoader.INSTANCE;
- final PipelineExecutorFactory executorFactory;
- try {
- executorFactory = executorServiceLoader.getExecutorFactory(configuration);
- } catch (Exception e) {
- throw new RuntimeException("Could not retrieve ExecutorFactory.", e);
- }
-
- final PipelineExecutor executor = executorFactory.getExecutor(configuration);
- CompletableFuture<JobClient> jobClient;
- try {
- jobClient = executor.execute(pipeline, configuration);
- } catch (Exception e) {
- throw new RuntimeException("Could not execute program.", e);
- }
- return jobClient;
- }
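
Which PipelineExecutor is chosen depends solely on the execution.target entry in the Flink configuration: DefaultExecutorServiceLoader looks up a matching PipelineExecutorFactory via SPI, and that factory creates the executor that finally submits the job. A minimal sketch of the same lookup outside the deployer (the target value "remote" is for an existing standalone session cluster; other values such as yarn-per-job are quoted from memory):

- // illustrative: resolve a PipelineExecutor the same way ProgramDeployer does
- Configuration conf = new Configuration();
- conf.set(DeploymentOptions.TARGET, "remote"); // submit to an existing standalone session cluster
- PipelineExecutorFactory factory = DefaultExecutorServiceLoader.INSTANCE.getExecutorFactory(conf);
- PipelineExecutor pipelineExecutor = factory.getExecutor(conf);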

Source: https://github.com/y0908105023/wiki/wiki/Flink-Sql-Client-%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90
Author: y0908105023