赞
踩
Flink 通过支持标准 ANSI SQL的 Apache Calcite解析 SQL。
CREATE语句适用于当前或指定的Catalog中注册表、视图或函数。注册后的表、视图和函数可以在SQL查询中适用。
CREATE TABLE [catalog_name.][db_name.]table_name ( { <column_definition> | <computed_column_definition> }[ , ...n] [ <watermark_definition> ] ) [COMMENT table_comment] [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] WITH (key1=val1, key2=val2, ...) <column_definition>: column_name column_type [COMMENT column_comment] <computed_column_definition>: column_name AS computed_column_expression [COMMENT column_comment] <watermark_definition>: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
根据指定的表名常见一个表,如果同名表已经在Catalog中存在了,则无法注册。
(1)计算列(COMPUTED COLUMN)
计算列是一个适用“column_name AS computed_column_expression”语法生成的虚拟列。它由使用同一表中其他列的非查询表达式生成,并且不会在表中进行物理存储。例如,一个计算列可以适用cost as price * quantity进行定义,这个表达式可以包含物理列、常量、函数或变量的任意组合,但这个表达式不能存在任何子查询。
在Flink中计算列一般用于为CREATE TABLE语句定义时间属性。处理时间属性可以简单地通过使用系统函数PROCETIME()的proc AS PROCTIME()语句进行定义。另一方面,由于事件时间列可能需要从现有的字段中获得,因此计算列可用于获得事件时间列。例如,原始字段的类型不是TIMESTAMP(3)或嵌套在JSON字符串中。
注意:
① 定义在一个数据源表(source table)上的计算列会在从数据源读取数据后被计算,它们可以在SELECT语句中使用。
② 计算列不可以作为INSERT语句的目标,在INSERT语句中,SELECT语句的schema需要与目标表不带有计算列的schema一致。
(2)WATERMARK
WATERMARK定义了表的事件时间属性,其形式化为WATERMARK FOR rowtime_column_name AS watermark_strategy_expression。
rowtime_column_name把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为TIMESTAME(3),且是schema中的顶层列,它也可以是一个计算列。
watermark_strategy_expression定义了watermark的生成策略。它允许使用包括计算列在内的任意非查询表达式来计算watermark;表达式的返回类型必须是TIMESTAMP(3),表示了从Epoch以来的经过的时间。返回的watermark只有当其不为空且其值大于之前发出的本地watermark时才会被发现(以保证watermark递增)。每条记录的watermark生成表达式计算都会由框架完成。框架会定期发出所生成的最大的watermark,如果当前watermark仍然与前一个watermark相同、为空、或返回的watermark的值小于最后一个发出的watermark,则新的watermark不会被发出。Watermark根据pipline.auto-watermark-interval中所配置的间隔发出。若watermark的间隔是0ms,那么每条记录都会产生一个watermark,且watermark会在不为空并大于上一个发出的watermark时发出。
使用事件时间语义时,表必须包含事件时间属性和watermark策略。
Flink提供了集中常用的watermark策略。
① 严格递增时间戳:watermark for rowtime_column as rowtime_column。
发出到目前为止已观察到的最大时间戳的watermark,时间戳小于最大时间戳的行被认为没有迟到。
② 递增时间戳:watermark for rowtime_colum as rowtime_column – INTERVAL ‘0.001’ SECOND
发出到目前为止已观察到的最大时间戳减1的watermark,时间戳等于或小于最大时间戳的行被认为没有迟到。
③ 有界乱序时间戳:watermark for rowtime_column as rowtime_column – INTERVAL ‘string’ timeunit。
发出到目前为止已观察到的最大时间戳减去指定延迟的watermark,例如,watermark for rowtime_column as rowtime_column – interval ‘5’ SECOND是一个5秒延迟的watermark策略。
CREATE TABLE Orders (
user BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );
(3)PARTITIONED BY
根据指定的列对已经创建的表进行分区。若表使用filesystem sink,则将会为每个分区创建一个目录。
(4)WITH OPTIONS
表属性用于创建table source/sink,一般用于寻找和创建底层的连接器。具体用法参见。
表达式key1=val1的键和值必须为字符串文本常量。
**注意:**表名可以为以下三种格式① catalog_name.db_name.table_name;② db_name.table_name;③ table_name。使用catalog_name.db_name.table_name 的表将会与名为 “catalog_name” 的 catalog 和名为 “db_name” 的数据库一起注册到 metastore 中。使用 db_name.table_name 的表将会被注册到当前执行的 table environment 中的 catalog 且数据库会被命名为 “db_name”;对于 table_name, 数据表将会被注册到当前正在运行的catalog和数据库中。
使用CREATE TABLE语句注册的表均可用作table source和table sink。在被 DML语句引用前,我们无法决定其实际用于source抑或是sink。
CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
[COMMENT database_comment]
WITH (key1=val1, key2=val2, ...)
根据给定的数据库属性创建数据库。若数据库中已存在同名数据库会抛出异常。
(1)IF NOT EXISTS
若数据库已经存在,则不会进行任何操作。
(2)WITH OPTIONS
数据库属性一般用于存储关于这个数据库额外的信息。表达式key1=val1中的键和值都需要是字符串文本常量。
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
[IF NOT EXISTS] [[catalog_name.]db_name.]function_name
AS identifier [LANGUAGE JAVA|SCALA]
创建一个有catalog和数据库命名空间的catalog function,其需要指定JAVA/SCALA或其他language tag完整的classpath。若catalog中,已经有同名的函数注册了,则无法注册。
(1)TEMPORARY
创建一个有catalog和数据库命名空间的临时catalog function,并覆盖原有的catalog function。
(2)TEMPORARY SYSTEM
创建一个没有数据库命名空间的临时系统catalog function,并覆盖系统内置的函数。
(3)IF NOT ESISTS
若该函数已经存在,则不会进行任何操作。
(4)LANGUAGE JAVA|SCALA
Language tag用于指定Flink runtime如何执行这个函数。目前只支持JAVA和SCALA,且函数的默认语言为JAVA。
DROP 语句用于从当前或指定的 Catalog 中删除一个已经注册的表、视图或函数。
DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
根据给定的表名删除某个表。若需要删除的表不存在,则抛出异常。
(1)IF EXISTS
表不存在时不会进行任何操作。
DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
根据给定的表名删除数据库。若需要删除的数据库不存在会抛出异常。
(1)IF EXISTS
若数据库不存在,不执行任何操作。
(2)RESTRICT
当删除一个非空数据库时,会触发异常。(默认打开)
(3)CASCADE
删除一个非空数据库时,把相关联的表与函数一并删除。
DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name
删除一个有catalog和数据库命名空间的catalog function。若需要删除的函数不存在,则会产生异常。
(1)TEMPORARY
删除一个有catalog和数据库命名空间的临时catalog function。
(2)TEMPORARY SYSTEM
删除一个没有数据库命名空间的临时系统函数。
(3)IF EXISTS
若函数不存在,则不会进行任何操作。
ALTER 语句用于修改一个已经在 Catalog 中注册的表、视图或函数定义。
(1)重命名表
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
把原有的表名更改为新的表名。
(2)设置或修改表属性
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
为指定的表设置一个或多个属性。若个别属性已经存在于表中,则使用新的值覆盖旧的值。
ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)
在数据库中设置一个或多个属性。若个别属性已经在数据库中设定,将会使用新值覆盖旧值。
ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
[IF EXISTS] [catalog_name.][db_name.]function_name
AS identifier [LANGUAGE JAVA|SCALA|
修改一个有catalog和数据库命名空间的catalog function,其需要指定JAVA /SCALA或其他language tag完整的classpath。若函数不存在,删除会抛出异常。
(1)TEMPORARY
修改一个有catalog和数据库命名空间的临时catalog function,并覆盖原有的catalog function。
(2)TEMPORARY SYSTEM
修改一个没有数据库命名空间的临时系统catalog function,并覆盖系统内置的函数。
(3)IF EXISTS
若函数不存在,则不进行任何操作。
(4)LANGUAGE JAVA|SCALA
Language tag用于指定Flink runtime如何执行这个函数。目前,只支持JAVA和SCALA,且函数的默认语言为JAVA。
以下 BNF-语法 描述了批处理和流处理查询中所支持的 SQL 特性的超集。
query: values | { select | selectWithoutFrom | query UNION [ ALL ] query | query EXCEPT query | query INTERSECT query } [ ORDER BY orderItem [, orderItem ]* ] [ LIMIT { count | ALL } ] [ OFFSET start { ROW | ROWS } ] [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY] orderItem: expression [ ASC | DESC ] select: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ] selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } projectItem: expression [ [ AS ] columnAlias ] | tableAlias . * tableExpression: tableReference [, tableReference ]* | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ] joinCondition: ON booleanExpression | USING '(' column [, column ]* ')' tableReference: tablePrimary [ matchRecognize ] [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ] tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')' | UNNEST '(' expression ')' values: VALUES expression [, expression ]* groupItem: expression | '(' ')' | '(' expression [, expression ]* ')' | CUBE '(' expression [, expression ]* ')' | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')' windowRef: windowName | windowSpec windowSpec: [ windowName ] '(' [ ORDER BY orderItem [, orderItem ]* ] [ PARTITION BY expression [, expression ]* ] [ RANGE numericOrIntervalExpression {PRECEDING} | ROWS numericExpression {PRECEDING} ] ')' matchRecognize: MATCH_RECOGNIZE '(' [ PARTITION BY expression [, expression ]* ] [ ORDER BY orderItem [, orderItem ]* ] [ MEASURES measureColumn [, measureColumn ]* ] [ ONE ROW PER MATCH ] [ AFTER MATCH ( SKIP TO NEXT ROW | SKIP PAST LAST ROW | SKIP TO FIRST variable | SKIP TO LAST variable | SKIP TO variable ) ] PATTERN '(' pattern ')' [ WITHIN intervalLiteral ] DEFINE variable AS condition [, variable AS condition ]* ')' measureColumn: expression AS alias pattern: patternTerm [ '|' patternTerm ]* patternTerm: patternFactor [ patternFactor ]* patternFactor: variable [ patternQuantifier ] patternQuantifier: '*' | '*?' | '+' | '+?' | '?' | '??' | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?'] | '{' repeat '}'
Flink SQL对于标识符(表、属性和函数名)有类似Java的此法约定:
① 不管是否引用标识符,都保留标识符的大小写;
② 标识符区分大小写;
③ 与Java不同的地方在于,通过反引号,可以允许标识符带有非字母的字符(如:“SELECT a AS ‘my field’ FROM t”)。
字符串文本常量需要被单引号包起来(如SELECT ‘Hello World’)。两个单引号表示转义(如 SELECT ‘It’‘s me.’)。字符串文本常量支持Unicode字符,如需明确使用Unicode编码,请使用以下语法:
① 使用反斜杠(\)作为转义字符(默认):SELECT u& ‘\263A’;
② 使用自定义的转义字符:SELECT U&’#263A’ UESCAPE ‘#’。
INSERT 语句用来向表中添加行。
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement
part_spec:
(part_col_name1=val1 [, part_col_name2=val2, ...])
(1)OVERWRITE
INSERT OVERWRITE将覆盖表中或分区中的任何已存在的数据。否则,新数据就会追加到表中或分区中。
(2)PARTITION
PARTITION语句应该包含需要插入的静态分区列与值。
(3)示例
-- 创建一个分区表 CREATE TABLE country_page_view (user STRING, cnt INT, date STRING, country STRING) PARTITIONED BY (date, country) WITH (...) -- 追加行到该静态分区中 (date='2019-8-30', country='China') INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China') SELECT user, cnt FROM page_view_source; -- 追加行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每一行动态决定 INSERT INTO country_page_view PARTITION (date='2019-8-30') SELECT user, cnt, country FROM page_view_source; -- 覆盖行到静态分区 (date='2019-8-30', country='China') INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China') SELECT user, cnt FROM page_view_source; -- 覆盖行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每一行动态决定 INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30') SELECT user, cnt, country FROM page_view_source;
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]
values_row:
: (val1 [, val2, ...])
(1)OVERWRITE
INSERT OVERWRITE 将会覆盖表中的任何已存在的数据。否则,新数据会追加到表中。
(2)示例
CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...);
INSERT INTO students
VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);
本章主要是对Flink SQL的语法进行了总结,同时,对一些比较重要的知识点(例如:计算列和watermark等)进行了相关介绍。目前,也在看阿里云社区的文章,这些文章主要分为两个方面:第一,内置函数,我觉得这方面知识,可以在实战中,边使用边学习,效果会更好;第二,Flink SQL的调优(比较难,靠经验,应该是一直学习的内容),这方面知识点,我觉得对个人提升比较大,后续,我会在这方面多下功夫。
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。