This post is a hands-on exercise; for the underlying principles and concepts, see the two earlier posts:
"Understanding the Flink SQL execution flow in depth: Calcite and catalog concepts, extending the parser to support new syntax"
"Secondary development of Calcite inside Flink: an introduction to the parser and the optimizer"
In this post we work through a concrete syntax extension: adding a SHOW FUNCTIONS statement to Flink.
The heart of the new node is its unparse method.
The overall directory layout is:
flink-table
  flink-sql-parser
    src/main/codegen
    src/main/java
    target
We are developing a DQL statement here; a Flink SQL syntax node must extend org.apache.calcite.sql.SqlCall:
src/main/java/org/apache/flink/sql/parser/dql/SqlShowFunctions.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.sql.parser.dql;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import java.util.Collections;
import java.util.List;
/** SHOW [USER] FUNCTIONS Sql Call. */
/** SHOW [USER] FUNCTIONS sql call. */
public class SqlShowFunctions extends SqlCall {

    public static final SqlSpecialOperator OPERATOR =
            new SqlSpecialOperator("SHOW FUNCTIONS", SqlKind.OTHER);

    private final boolean requireUser;

    public SqlShowFunctions(SqlParserPos pos, boolean requireUser) {
        super(pos);
        this.requireUser = requireUser;
    }

    @Override
    public SqlOperator getOperator() {
        return OPERATOR;
    }

    @Override
    public List<SqlNode> getOperandList() {
        return Collections.emptyList();
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        if (requireUser) {
            writer.keyword("SHOW USER FUNCTIONS");
        } else {
            writer.keyword("SHOW FUNCTIONS");
        }
    }

    public boolean requireUser() {
        return requireUser;
    }
}
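Since unparse is the heart of this class, here is a tiny check that drives it directly. SqlNode#toString() funnels into unparse; SqlParserPos.ZERO is just a dummy position (a sketch, with a hypothetical class name):

import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.flink.sql.parser.dql.SqlShowFunctions;

public class UnparseCheck {
    public static void main(String[] args) {
        // toString() renders the node through unparse().
        SqlShowFunctions call = new SqlShowFunctions(SqlParserPos.ZERO, true);
        System.out.println(call); // SHOW USER FUNCTIONS
    }
}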
The lineage is easy to trace by walking up the inheritance chain: SqlCall extends SqlNode. Looks familiar? This is exactly where the two earlier posts pick up.
Next, modify the .ftl files under the includes directory: add the grammar production for the statement to parserImpls.ftl:
SqlShowFunctions SqlShowFunctions() :
{
    SqlParserPos pos;
    boolean requireUser = false;
}
{
    <SHOW> { pos = getPos(); }
    [
        <USER> { requireUser = true; }
    ]
    <FUNCTIONS>
    {
        return new SqlShowFunctions(pos.plus(getPos()), requireUser);
    }
}
JavaCC compiles this production into a real parser method, which Calcite ultimately reaches through the generated parser factory (reflection); if this is unclear, see the two posts referenced at the start.
Copy the config.fmpp file from the Calcite source tree into the project's src/main/codegen directory and modify it to declare the extended parts. The declarations themselves live in Parser.tdd, which config.fmpp references (a sketch of config.fmpp follows the file below):
Parser.tdd
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{
# Generated parser implementation package and class name.
package: "org.apache.flink.sql.parser.impl",
class: "FlinkSqlParserImpl",
# List of additional classes and packages to import.
# Example. "org.apache.calcite.sql.*", "java.util.List".
# Please keep the import classes in alphabetical order if new class is added.
imports: [
"org.apache.flink.sql.parser.ddl.constraint.SqlConstraintEnforcement"
"org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint"
"org.apache.flink.sql.parser.ddl.constraint.SqlUniqueSpec"
"org.apache.flink.sql.parser.ddl.SqlAddJar"
"org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
"org.apache.flink.sql.parser.ddl.SqlAlterFunction"
"org.apache.flink.sql.parser.ddl.SqlAlterTable"
"org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint"
"org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint"
"org.apache.flink.sql.parser.ddl.SqlAlterTableOptions"
"org.apache.flink.sql.parser.ddl.SqlAlterTableRename"
"org.apache.flink.sql.parser.ddl.SqlAlterTableReset"
"org.apache.flink.sql.parser.ddl.SqlAlterView"
"org.apache.flink.sql.parser.ddl.SqlAlterViewAs"
"org.apache.flink.sql.parser.ddl.SqlAlterViewRename"
"org.apache.flink.sql.parser.ddl.SqlCreateCatalog"
"org.apache.flink.sql.parser.ddl.SqlCreateDatabase"
"org.apache.flink.sql.parser.ddl.SqlCreateFunction"
"org.apache.flink.sql.parser.ddl.SqlCreateTable"
"org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext"
"org.apache.flink.sql.parser.ddl.SqlCreateView"
"org.apache.flink.sql.parser.ddl.SqlDropCatalog"
"org.apache.flink.sql.parser.ddl.SqlDropDatabase"
"org.apache.flink.sql.parser.ddl.SqlDropFunction"
"org.apache.flink.sql.parser.ddl.SqlDropTable"
"org.apache.flink.sql.parser.ddl.SqlDropView"
"org.apache.flink.sql.parser.ddl.SqlRemoveJar"
"org.apache.flink.sql.parser.ddl.SqlReset"
"org.apache.flink.sql.parser.ddl.SqlSet"
"org.apache.flink.sql.parser.ddl.SqlTableColumn"
"org.apache.flink.sql.parser.ddl.SqlTableLike"
"org.apache.flink.sql.parser.ddl.SqlTableLike.FeatureOption"
"org.apache.flink.sql.parser.ddl.SqlTableLike.MergingStrategy"
"org.apache.flink.sql.parser.ddl.SqlTableLike.SqlTableLikeOption"
"org.apache.flink.sql.parser.ddl.SqlTableOption"
"org.apache.flink.sql.parser.ddl.SqlUseCatalog"
"org.apache.flink.sql.parser.ddl.SqlUseDatabase"
"org.apache.flink.sql.parser.ddl.SqlUseModules"
"org.apache.flink.sql.parser.ddl.SqlWatermark"
"org.apache.flink.sql.parser.dml.RichSqlInsert"
"org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
"org.apache.flink.sql.parser.dml.RichSqlMerge"
"org.apache.flink.sql.parser.dml.SqlMergeAction"
"org.apache.flink.sql.parser.dml.SqlBeginStatementSet"
"org.apache.flink.sql.parser.dml.SqlEndStatementSet"
"org.apache.flink.sql.parser.dql.SqlDescribeCatalog"
"org.apache.flink.sql.parser.dql.SqlDescribeDatabase"
"org.apache.flink.sql.parser.dql.SqlRichExplain"
"org.apache.flink.sql.parser.dql.SqlLoadModule"
"org.apache.flink.sql.parser.dql.SqlShowCatalogs"
"org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog"
"org.apache.flink.sql.parser.dql.SqlShowDatabases"
"org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase"
"org.apache.flink.sql.parser.dql.SqlShowFunctions"
"org.apache.flink.sql.parser.dql.SqlShowJars"
"org.apache.flink.sql.parser.dql.SqlShowModules"
"org.apache.flink.sql.parser.dql.SqlShowTables"
"org.apache.flink.sql.parser.dql.SqlShowCreateTable"
"org.apache.flink.sql.parser.dql.SqlShowViews"
"org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
"org.apache.flink.sql.parser.dql.SqlUnloadModule"
"org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
"org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
"org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
"org.apache.flink.sql.parser.type.SqlRawTypeNameSpec"
"org.apache.flink.sql.parser.type.SqlTimestampLtzTypeNameSpec"
"org.apache.flink.sql.parser.utils.ParserResource"
"org.apache.flink.sql.parser.validate.FlinkSqlConformance"
"org.apache.flink.sql.parser.SqlProperty"
"org.apache.calcite.sql.SqlAlienSystemTypeNameSpec"
"org.apache.calcite.sql.SqlCreate"
"org.apache.calcite.sql.SqlDrop"
"java.util.ArrayList"
"java.util.HashSet"
"java.util.List"
"java.util.Set"
]
# List of new keywords. Example: "DATABASES", "TABLES". If the keyword is not a reserved
# keyword, please also add it to 'nonReservedKeywords' section.
# Please keep the keyword in alphabetical order if new keyword is added.
keywords: [
"BYTES"
"CATALOGS"
"CHANGELOG_MODE"
"COMMENT"
"DATABASES"
"ENFORCED"
"ESTIMATED_COST"
"EXTENDED"
"FUNCTIONS"
"IF"
"JSON_EXECUTION_PLAN"
"JAR"
"JARS"
"LOAD"
"METADATA"
"MODULES"
"OVERWRITE"
"OVERWRITING"
"PARTITIONED"
"PARTITIONS"
"PYTHON"
"RAW"
"REMOVE"
"RENAME"
"SCALA"
"STRING"
"TABLES"
"UNLOAD"
"USE"
"VIEWS"
"VIRTUAL"
"WATERMARK"
"WATERMARKS"
"TIMESTAMP_LTZ"
]
# List of keywords from "keywords" section that are not reserved.
nonReservedKeywords: [
"A"
"ABSENT"
"ABSOLUTE"
"ACTION"
"ADA"
"ADD"
"ADMIN"
"AFTER"
"ALWAYS"
"APPLY"
"ASC"
"ASSERTION"
"ASSIGNMENT"
"ATTRIBUTE"
"ATTRIBUTES"
"BEFORE"
"BERNOULLI"
"BREADTH"
"C"
"CASCADE"
"CATALOG"
"CATALOG_NAME"
"CENTURY"
"CHAIN"
"CHARACTERISTICS"
"CHARACTERS"
"CHARACTER_SET_CATALOG"
"CHARACTER_SET_NAME"
"CHARACTER_SET_SCHEMA"
"CLASS_ORIGIN"
"COBOL"
"COLLATION"
"COLLATION_CATALOG"
"COLLATION_NAME"
"COLLATION_SCHEMA"
"COLUMN_NAME"
"COMMAND_FUNCTION"
"COMMAND_FUNCTION_CODE"
"COMMITTED"
"CONDITIONAL"
"CONDITION_NUMBER"
"CONNECTION"
"CONNECTION_NAME"
"CONSTRAINT_CATALOG"
"CONSTRAINT_NAME"
"CONSTRAINTS"
"CONSTRAINT_SCHEMA"
"CONSTRUCTOR"
"CONTINUE"
"CURSOR_NAME"
"DATA"
"DATABASE"
"DATETIME_INTERVAL_CODE"
"DATETIME_INTERVAL_PRECISION"
"DAYS"
"DECADE"
"DEFAULTS"
"DEFERRABLE"
"DEFERRED"
"DEFINED"
"DEFINER"
"DEGREE"
"DEPTH"
"DERIVED"
"DESC"
"DESCRIPTION"
"DESCRIPTOR"
"DIAGNOSTICS"
"DISPATCH"
"DOMAIN"
"DOW"
"DOY"
"DYNAMIC_FUNCTION"
"DYNAMIC_FUNCTION_CODE"
"ENCODING"
"EPOCH"
"ERROR"
"EXCEPTION"
"EXCLUDE"
"EXCLUDING"
"FINAL"
"FIRST"
"FOLLOWING"
"FORMAT"
"FORTRAN"
"FOUND"
"FRAC_SECOND"
"G"
"GENERAL"
"GENERATED"
"GEOMETRY"
"GO"
"GOTO"
"GRANTED"
"HIERARCHY"
"HOP"
"HOURS"
"IGNORE"
"IMMEDIATE"
"IMMEDIATELY"
"IMPLEMENTATION"
"INCLUDING"
"INCREMENT"
"INITIALLY"
"INPUT"
"INSTANCE"
"INSTANTIABLE"
"INVOKER"
"ISODOW"
"ISOLATION"
"ISOYEAR"
"JAR"
"JARS"
"JAVA"
"JSON"
"K"
"KEY"
"KEY_MEMBER"
"KEY_TYPE"
"LABEL"
"LAST"
"LENGTH"
"LEVEL"
"LIBRARY"
"LOAD"
"LOCATOR"
"M"
"MAP"
"MATCHED"
"MAXVALUE"
"MESSAGE_LENGTH"
"MESSAGE_OCTET_LENGTH"
"MESSAGE_TEXT"
"MICROSECOND"
"MILLENNIUM"
"MILLISECOND"
"MINUTES"
"MINVALUE"
"MONTHS"
"MORE_"
"MUMPS"
"NAME"
"NAMES"
"NANOSECOND"
"NESTING"
"NORMALIZED"
"NULLABLE"
"NULLS"
"NUMBER"
"OBJECT"
"OCTETS"
"OPTION"
"OPTIONS"
"ORDERING"
"ORDINALITY"
"OTHERS"
"OUTPUT"
"OVERRIDING"
"PAD"
"PARAMETER_MODE"
"PARAMETER_NAME"
"PARAMETER_ORDINAL_POSITION"
"PARAMETER_SPECIFIC_CATALOG"
"PARAMETER_SPECIFIC_NAME"
"PARAMETER_SPECIFIC_SCHEMA"
"PARTIAL"
"PASCAL"
"PASSING"
"PASSTHROUGH"
"PAST"
"PATH"
"PLACING"
"PLAN"
"PLI"
"PRECEDING"
"PRESERVE"
"PRIOR"
"PRIVILEGES"
"PUBLIC"
"PYTHON"
"QUARTER"
"READ"
"RELATIVE"
"REMOVE"
"REPEATABLE"
"REPLACE"
"RESPECT"
"RESTART"
"RESTRICT"
"RETURNED_CARDINALITY"
"RETURNED_LENGTH"
"RETURNED_OCTET_LENGTH"
"RETURNED_SQLSTATE"
"RETURNING"
"ROLE"
"ROUTINE"
"ROUTINE_CATALOG"
"ROUTINE_NAME"
"ROUTINE_SCHEMA"
"ROW_COUNT"
"SCALAR"
"SCALE"
"SCHEMA"
"SCHEMA_NAME"
"SCOPE_CATALOGS"
"SCOPE_NAME"
"SCOPE_SCHEMA"
"SECONDS"
"SECTION"
"SECURITY"
"SELF"
"SEQUENCE"
"SERIALIZABLE"
"SERVER"
"SERVER_NAME"
"SESSION"
"SETS"
"SIMPLE"
"SIZE"
"SOURCE"
"SPACE"
"SPECIFIC_NAME"
"SQL_BIGINT"
"SQL_BINARY"
"SQL_BIT"
"SQL_BLOB"
"SQL_BOOLEAN"
"SQL_CHAR"
"SQL_CLOB"
"SQL_DATE"
"SQL_DECIMAL"
"SQL_DOUBLE"
"SQL_FLOAT"
"SQL_INTEGER"
"SQL_INTERVAL_DAY"
"SQL_INTERVAL_DAY_TO_HOUR"
"SQL_INTERVAL_DAY_TO_MINUTE"
"SQL_INTERVAL_DAY_TO_SECOND"
"SQL_INTERVAL_HOUR"
"SQL_INTERVAL_HOUR_TO_MINUTE"
"SQL_INTERVAL_HOUR_TO_SECOND"
"SQL_INTERVAL_MINUTE"
"SQL_INTERVAL_MINUTE_TO_SECOND"
"SQL_INTERVAL_MONTH"
"SQL_INTERVAL_SECOND"
"SQL_INTERVAL_YEAR"
"SQL_INTERVAL_YEAR_TO_MONTH"
"SQL_LONGVARBINARY"
"SQL_LONGVARCHAR"
"SQL_LONGVARNCHAR"
"SQL_NCHAR"
"SQL_NCLOB"
"SQL_NUMERIC"
"SQL_NVARCHAR"
"SQL_REAL"
"SQL_SMALLINT"
"SQL_TIME"
"SQL_TIMESTAMP"
"SQL_TINYINT"
"SQL_TSI_DAY"
"SQL_TSI_FRAC_SECOND"
"SQL_TSI_HOUR"
"SQL_TSI_MICROSECOND"
"SQL_TSI_MINUTE"
"SQL_TSI_MONTH"
"SQL_TSI_QUARTER"
"SQL_TSI_SECOND"
"SQL_TSI_WEEK"
"SQL_TSI_YEAR"
"SQL_VARBINARY"
"SQL_VARCHAR"
"STATE"
"STATEMENT"
"STRUCTURE"
"STYLE"
"SUBCLASS_ORIGIN"
"SUBSTITUTE"
"TABLE_NAME"
"TEMPORARY"
"TIES"
"TIMESTAMPADD"
"TIMESTAMPDIFF"
"TOP_LEVEL_COUNT"
"TRANSACTION"
"TRANSACTIONS_ACTIVE"
"TRANSACTIONS_COMMITTED"
"TRANSACTIONS_ROLLED_BACK"
"TRANSFORM"
"TRANSFORMS"
"TRIGGER_CATALOG"
"TRIGGER_NAME"
"TRIGGER_SCHEMA"
"TUMBLE"
"TYPE"
"UNBOUNDED"
"UNCOMMITTED"
"UNCONDITIONAL"
"UNDER"
"UNLOAD"
"UNNAMED"
"USAGE"
"USER_DEFINED_TYPE_CATALOG"
"USER_DEFINED_TYPE_CODE"
"USER_DEFINED_TYPE_NAME"
"USER_DEFINED_TYPE_SCHEMA"
"UTF16"
"UTF32"
"UTF8"
"VERSION"
"VIEW"
"WEEK"
"WORK"
"WRAPPER"
"WRITE"
"XML"
"YEARS"
"ZONE"
]
# List of non-reserved keywords to add;
# items in this list become non-reserved.
# Please keep the keyword in alphabetical order if new keyword is added.
nonReservedKeywordsToAdd: [
# not in core, added in Flink
"CHANGELOG_MODE"
"ENFORCED"
"ESTIMATED_COST"
"IF"
"JSON_EXECUTION_PLAN"
"METADATA"
"OVERWRITE"
"OVERWRITING"
"PARTITIONED"
"PARTITIONS"
"VIRTUAL"
]
# List of non-reserved keywords to remove;
# items in this list become reserved
nonReservedKeywordsToRemove: [
]
# List of methods for parsing custom SQL statements.
# Return type of method implementation should be 'SqlNode'.
# Example: SqlShowDatabases(), SqlShowTables().
statementParserMethods: [
"RichSqlInsert()"
"SqlBeginStatementSet()"
"SqlEndStatementSet()"
"SqlLoadModule()"
"SqlShowCatalogs()"
"SqlShowCurrentCatalogOrDatabase()"
"SqlDescribeCatalog()"
"SqlUseCatalog()"
"SqlShowDatabases()"
"SqlUseDatabase()"
"SqlAlterDatabase()"
"SqlDescribeDatabase()"
"SqlAlterFunction()"
"SqlShowFunctions()"
"SqlShowTables()"
"SqlShowCreateTable()"
"SqlRichDescribeTable()"
"SqlAlterTable()"
"SqlAlterView()"
"SqlShowModules()"
"SqlShowViews()"
"RichSqlMerge()"
"SqlUnloadModule()"
"SqlUseModules()"
"SqlRichExplain()"
"SqlAddJar()"
"SqlRemoveJar()"
"SqlShowJars()"
"SqlSet()"
"SqlReset()"
]
# List of methods for parsing custom literals.
# Return type of method implementation should be "SqlNode".
# Example: ParseJsonLiteral().
literalParserMethods: [
]
# List of methods for parsing ddl supported data types.
# Return type of method implementation should be "SqlTypeNameSpec".
# Example: SqlParseTimeStampZ().
dataTypeParserMethods: [
"ExtendedSqlBasicTypeName()"
"CustomizedCollectionsTypeName()"
"SqlMapTypeName()"
"SqlRawTypeName()"
"ExtendedSqlRowTypeName()"
]
# List of methods for parsing builtin function calls.
# Return type of method implementation should be "SqlNode".
# Example: DateFunctionCall().
builtinFunctionCallMethods: [
]
# List of methods for parsing extensions to "ALTER <scope>" calls.
# Each must accept arguments "(SqlParserPos pos, String scope)".
# Example: "SqlUploadJarNode"
alterStatementParserMethods: [
]
# List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
# Each must accept arguments "(SqlParserPos pos, boolean replace)".
createStatementParserMethods: [
"SqlCreateExtended"
]
# List of methods for parsing extensions to "DROP" calls.
# Each must accept arguments "(Span s)".
dropStatementParserMethods: [
"SqlDropExtended"
]
# Binary operators tokens
binaryOperatorsTokens: [
]
# Binary operators initialization
extraBinaryExpressions: [
]
# List of files in @includes directory that have parser method
# implementations for parsing custom SQL statements, literals or types
# given as part of "statementParserMethods", "literalParserMethods" or
# "dataTypeParserMethods".
implementationFiles: [
"parserImpls.ftl"
]
# List of additional join types. Each is a method with no arguments.
# Example: LeftSemiJoin()
joinTypes: [
]
includePosixOperators: false
includeCompoundIdentifier: true
includeBraces: true
includeAdditionalDeclarations: false
}
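For reference, the modified config.fmpp is short; it mostly just points FMPP at the Parser.tdd above. A sketch following the Flink/Calcite layout (exact paths and the default_config.fmpp fallback vary by version):

data: {
    # Our declarations; anything not declared in Parser.tdd falls back to
    # Calcite's default_config.fmpp.
    parser: tdd(../data/Parser.tdd)
    default: tdd("../default_config.fmpp")
}
freemarkerLinks: {
    includes: includes/
}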
Now build the module to generate the parser into target (e.g. run mvn generate-sources on flink-sql-parser): FMPP merges Parser.tdd and parserImpls.ftl into Calcite's Parser.jj template, and JavaCC compiles the result under target/generated-sources. The generated parser is exactly what gets handed to Calcite via
withParserFactory(FlinkSqlParserImpl.FACTORY)
and you can see that the corresponding grammar method has been generated in FlinkSqlParserImpl.java.
Copy the code just generated under target back into main and develop a test case.
It passes.
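For instance, a minimal parse round-trip check could look like this (a sketch with a hypothetical class name; it assumes the regenerated FlinkSqlParserImpl is on the classpath and uses only standard Calcite parser APIs):

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.sql.parser.dql.SqlShowFunctions;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

public class ShowFunctionsParseTest {
    public static void main(String[] args) throws Exception {
        SqlParser.Config config =
                SqlParser.config().withParserFactory(FlinkSqlParserImpl.FACTORY);
        // Parse the new statement with the regenerated parser.
        SqlNode node = SqlParser.create("SHOW USER FUNCTIONS", config).parseStmt();
        // The parser should hand back our SqlCall, and unparse should round-trip.
        System.out.println(node instanceof SqlShowFunctions); // true
        System.out.println(node);                             // SHOW USER FUNCTIONS
    }
}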
Next, teach the planner about the statement. Modify the validator, FlinkPlannerImpl#validate, so the new node takes the same short-circuit branch as the existing SHOW statements (it needs no row-type validation).
Also extend the dispatch in SqlToOperationConverter#convert, mirroring e.g. SqlToOperationConverter#convertShowCatalogs,
create a ShowFunctionsOperation under org.apache.flink.table.operations:
src/main/java/org/apache/flink/table/operations/ShowFunctionsOperation.java
and add SqlToOperationConverter#convertShowFunctions to build it.
That completes the development; a minimal sketch of the two new pieces follows.
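First the operation class, modeled on how Flink describes its other SHOW statements (treat this as an illustrative sketch rather than the verbatim upstream source):

package org.apache.flink.table.operations;

/** Operation that describes a SHOW [USER] FUNCTIONS statement. */
public class ShowFunctionsOperation implements ShowOperation {

    /** Scope of the listing: only user-defined functions, or all functions. */
    public enum FunctionScope {
        USER,
        ALL
    }

    private final FunctionScope functionScope;

    public ShowFunctionsOperation(FunctionScope functionScope) {
        this.functionScope = functionScope;
    }

    public FunctionScope getFunctionScope() {
        return functionScope;
    }

    @Override
    public String asSummaryString() {
        return functionScope == FunctionScope.USER
                ? "SHOW USER FUNCTIONS"
                : "SHOW FUNCTIONS";
    }
}

And the conversion method inside SqlToOperationConverter, where the requireUser flag set by the parser picks the scope (again a sketch):

private Operation convertShowFunctions(SqlShowFunctions sqlShowFunctions) {
    return new ShowFunctionsOperation(
            sqlShowFunctions.requireUser()
                    ? ShowFunctionsOperation.FunctionScope.USER
                    : ShowFunctionsOperation.FunctionScope.ALL);
}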
Finally, the POM: it needs the JavaCC plugin chain, the Flink-related dependencies, and compilation of the generated files. Here is a working pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>flink-table</artifactId>
<groupId>org.apache.flink</groupId>
<version>1.14.0-mdh1.14.0.0-SNAPSHOT</version>
</parent>
<artifactId>flink-sql-parser</artifactId>
<name>Flink : Table : SQL Parser </name>
<packaging>jar</packaging>
<properties>
<!-- override parent pom -->
<test.excludedGroups/>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-annotations</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<!-- When updating the Calcite version, make sure to update the dependency exclusions -->
<version>${calcite.version}</version>
<exclusions>
<!--
"mvn dependency:tree" as of Calcite 1.26.0:
[INFO] +- org.apache.calcite:calcite-core:jar:1.26.0:compile
[INFO] | +- org.apache.calcite.avatica:avatica-core:jar:1.17.0:compile
[INFO] | +- org.apiguardian:apiguardian-api:jar:1.1.0:compile
Dependencies that are not needed for how we use Calcite right now.
-->
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-metrics</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-server</artifactId>
</exclusion>
<exclusion>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
</exclusion>
<exclusion>
<groupId>com.esri.geometry</groupId>
<artifactId>esri-geometry-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</exclusion>
<exclusion>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
</exclusion>
<exclusion>
<groupId>net.hydromatic</groupId>
<artifactId>aggdesigner-algorithm</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</exclusion>
<exclusion>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-linq4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.uzaygezen</groupId>
<artifactId>uzaygezen-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>${calcite.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-server</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.uzaygezen</groupId>
<artifactId>uzaygezen-core</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!-- Extract parser grammar template from calcite-core.jar and put
it under ${project.build.directory} where all freemarker templates are. -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>unpack-parser-template</id>
<phase>initialize</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/</outputDirectory>
<includes>**/Parser.jj</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<!-- adding fmpp code gen -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-fmpp-resources</id>
<phase>initialize</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/codegen</outputDirectory>
<resources>
<resource>
<directory>src/main/codegen</directory>
<filtering>false</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.googlecode.fmpp-maven-plugin</groupId>
<artifactId>fmpp-maven-plugin</artifactId>
<version>1.0</version>
<dependencies>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
<version>2.3.28</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>generate-fmpp-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
<outputDirectory>target/generated-sources</outputDirectory>
<templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<!-- This must be run AFTER the fmpp-maven-plugin -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
<!-- This must be kept synced with Apache Calcite. -->
<lookAhead>1</lookAhead>
<isStatic>false</isStatic>
<outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
</configuration>
</plugin>
</plugins>
</build>
</project>
To recap how a statement then travels through the planner:

1. Validate the SQL
final SqlNode validated = flinkPlanner.validate(sqlNode);
  1) Pre-validation rewrite of INSERT statements
  2) Call SqlNode.validate() to validate:
    - if it is an ExtendedSqlNode (SqlCreateHiveTable, SqlCreateTable, SqlTableLike): validate the node itself
    - if it is SqlKind.DDL, SqlKind.INSERT, etc.: no validation is needed and the SqlNode is returned directly (this is the branch our SHOW FUNCTIONS takes)
    - if it is a SqlRichExplain: validate its inner statement
    - otherwise: validator.validate(sqlNode)
      a) validate scopes and expressions: validateScopedExpression(topNode, scope)
        - normalize (rewrite) the SqlNode
        - if the SQL is in TOP_LEVEL = concat(QUERY, DML, DDL), register the query in the parent scope
        - validate the query via validateQuery:
          i) validateFeature
          ii) validateNamespace
          iii) validateModality
          iv) validateAccess
          v) validateSnapshot
        - if the SQL is not in TOP_LEVEL = concat(QUERY, DML, DDL), perform type inference
      b) get the type of the validated node
2. Convert the SqlNode into an Operation
converter.convertSqlQuery(validated)
  1) generate the logical plan (RelNode):
  RelRoot relational = planner.rel(validated);
    - which converts the query via sqlToRelConverter.convertQuery(validatedSqlNode)
  2) create a PlannerQueryOperation:
  new PlannerQueryOperation(relational.project());
3. Convert the Operation into a List<Transformation<?>>
  1) optimize the RelNodes into optimizedRelNodes
  2) convert the optimizedRelNodes into an ExecNodeGraph:
  val execGraph = translateToExecNodeGraph(optimizedRelNodes)
  3) convert the execGraph into transformations; code generation produces Functions that are later invoked reflectively, e.g.
  val convertFunc = CodeGenUtils.genToInternalConverter(ctx, inputType)
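End to end, the new statement can then be exercised through a TableEnvironment (a sketch with a hypothetical class name; it assumes a Flink build containing the changes above):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ShowFunctionsEndToEnd {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // parser -> SqlShowFunctions -> ShowFunctionsOperation -> printed result
        tableEnv.executeSql("SHOW FUNCTIONS").print();
        tableEnv.executeSql("SHOW USER FUNCTIONS").print();
    }
}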