赞
踩
本文档介绍的各种flink sql
的语法基于flink-1.13.x
,flink版本低于1.13.x的用户,在sql运行出错误时,需要自行去flink官网查看对应版本的语法支持。
另外,flink新版本支持的语法,文档中会进行特殊标注,说明对应语法在 flink 哪个版本开始支持,但凡是没有特殊标注的,均支持flink-1.13.x
及以上版本。
注意,在 flink sql 中,对表名、字段名、函数名等是严格区分大小写的,为了兼容 hive 等其他仓库,建议建表时,表名和字段名都采用下划线连接单词的方式,以避免大小写问题。
比如 hive ,是不区分大小写的,所有大写字母最终都会被系统转化为小写字母,此时使用 flink sql 去读写 hive ,出现大写字母时,会出现找不到表或字段的错误。
关键字是不区分大小写的,比如 insert、select、create等。
flink sql 中所有的字符串常量都需要使用英文单引号括起来,不要使用英文双引号以及中文符号。
SET
语句用于修改配置或列出配置。
SET ('key' = 'value')
如果没有指定键和值,则只打印所有属性。否则,使用指定的键值对设置属性值。
Flink SQL> SET 'table.local-time-zone' = 'Europe/Berlin';
[INFO] Session property has been set.
Flink SQL> SET;
'table.local-time-zone' = 'Europe/Berlin'
RESET
语句用于将配置重置为默认值。
RESET ('key')
如果没有指定键,则将所有属性重置为默认值。否则,将指定的键重置为默认值。
Flink SQL> RESET 'table.planner';
[INFO] Session property has been reset.
Flink SQL> RESET;
[INFO] All session properties have been set to their default values.
CREATE
语句用于将 表
/视图
/函数
注册到当前或指定的 Catalog 中。已注册的表
/视图
/函数
可以在SQL查询中使用。
Flink SQL目前支持以下CREATE语句:
语法概述:
CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name ( { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n] [ <watermark_definition> ] [ <table_constraint> ][ , ...n] ) [COMMENT table_comment] [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] WITH (key1=val1, key2=val2, ...) [ LIKE source_table [( <like_options> )] ] <physical_column_definition>: column_name column_type [ <column_constraint> ] [COMMENT column_comment] <column_constraint>: [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED <table_constraint>: [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED <metadata_column_definition>: column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ] <computed_column_definition>: column_name AS computed_column_expression [COMMENT column_comment] <watermark_definition>: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression <source_table>: [catalog_name.][db_name.]table_name <like_options>: { { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS } | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } }[, ...]
上面的语句创建了一个带有给定名称的表。如果catalog
中已经存在同名的表,则会引发异常。
物理列是数据库中已知的常规列。它们定义物理数据中字段的名称、类型和顺序。因此,物理列表示从外部系统读取和写入的有效负载。
连接器和格式转化使用这些列(按照定义的顺序)来配置自己。其他类型的列可以在物理列之间声明,但不会影响最终的物理模式。
下面的语句创建了一个只有常规列的表:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING
) WITH (
...
);
元数据列是SQL标准的扩展,允许访问连接器和/或表中每一行的特定字段。元数据列由metadata关键字表示。例如,元数据列可以用来读取和写入Kafka记录的时间戳,以进行基于时间的操作。
连接器和格式文档列出了每个组件的可用元数据字段。在表的模式中声明元数据列是可选的。
下面的语句创建了一个表,其中包含引用元数据timestamp的附加元数据列:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' -- 读取和写入kafka记录的时间戳
) WITH (
'connector' = 'kafka'
...
);
每个元数据字段都由基于字符串的键标识,并具有文档化的数据类型。例如,Kafka连接器暴露了一个元数据字段,该字段由键timestamp和数据类型TIMESTAMP_LTZ(3)标识,可以用于读写记录。
在上面的例子中,元数据列record_time成为表模式的一部分,可以像普通列一样进行转换和存储:
INSERT INTO MyTable SELECT user_id, name, record_time + INTERVAL '1' SECOND FROM MyTable;
为了方便起见,如果将列名直接用于标识元数据,则可以省略FROM子句:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` TIMESTAMP_LTZ(3) METADATA -- 使用列名作为元数据键
) WITH (
'connector' = 'kafka'
...
);
为方便起见,如果列的数据类型与元数据字段的数据类型不同,可以显式指示强制类型转换,不过要求这两种数据类型是兼容的。
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` BIGINT METADATA -- 转化timestamp类型为BIGINT
) WITH (
'connector' = 'kafka'
...
);
默认情况下,planner计划器会假定元数据列可以同时用于读写。然而在许多情况下,外部系统提供的元数据字段用于只读比可写更多。因此,可以使用VIRTUAL关键字将元数据列排除在持久化之外。
CREATE TABLE MyTable (
`timestamp` BIGINT METADATA, -- query-to-sink schema的一部分
`offset` BIGINT METADATA VIRTUAL, -- 不是query-to-sink schema的一部分
`user_id` BIGINT,
`name` STRING,
) WITH (
'connector' = 'kafka'
...
);
在上面的示例中,偏移量是一个只读元数据列,并从query-to-sink schema中排除。因此,source-to-query模式(用于SELECT)和query-to-sink(用于INSERT INTO)模式不同:
source-to-query schema:
MyTable(`timestamp` BIGINT, `offset` BIGINT, `user_id` BIGINT, `name` STRING)
query-to-sink schema:
MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)
计算列是使用语法column_name AS computed_column_expression生成的虚拟列。
计算列可以引用同一表中声明的其他列的表达式,可以访问物理列和元数据列。列本身并不物理地存储在表中,列的数据类型通过给定的表达式自动派生,不需要手动声明。
计划器会将计算列转换为常规投影。对于优化或水印策略下推,计算列的实际计算可能会跨算子进行,并执行多次,或者在给定查询不需要的情况下跳过。例如,计算列可以定义为:
CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
`cost` AS price * quanitity, -- 执行表达式并接收查询结果
) WITH (
'connector' = 'kafka'
...
);
表达式可以是列、常量或函数的任意组合。表达式不能包含子查询。
计算列通常在Flink中用于在CREATE TABLE语句中定义时间属性。
与虚拟元数据列类似,计算列被排除在持久化之外。因此,计算列不能是INSERT INTO语句的目标列。因此,source-to-query模式(用于SELECT)和query-to-sink(用于INSERT - INTO)模式不同:
source-to-query schema:
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` DOUBLE)
query-to-sink schema:
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE)
WATERMARK子句用于定义表的事件时间属性,其形式为WATERMARK FOR rowtime_column_name AS watermark_strategy_expression。
返回的水印只有在非空且其值大于先前发出的本地水印时才会发出(以保持升序水印的规定)。框架会对每条记录执行水印生成表达式。框架将周期性地发出生成的最大水印。
如果当前水印与前一个相同,或为空,或返回的水印值小于上次发出的水印值,则不会发出新的水印。水印通过pipeline.auto-watermark-interval
配置的时间间隔发出。
如果水印间隔为0ms,弱生成的水印不为空且大于上次发出的水印,则每条记录都发出一次水印。
当使用事件时间语义时,表必须包含事件时间属性和水印策略。
Flink提供了几种常用的水印策略:
严格递增时间戳:WATERMARK FOR rowtime_column AS rowtime_column
发出到目前为止观察到的最大时间戳的水印。时间戳大于最大时间戳的行不属于延迟。
升序时间戳:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND
发出到目前为止观察到的最大时间戳减去1的水印。时间戳大于或等于最大时间戳的行不属于延迟。
时间戳:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit
发出到目前为止观察到的最大时间戳减去指定延迟的水印,例如:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘5’ SECOND是一个延迟5秒的水印策略。
CREATE TABLE Orders (
user
BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL ‘5’ SECOND
) WITH ( . . . );
主键约束是Flink用于优化的一个提示。它告诉flink,指定的表或视图的一列或一组列是唯一的,它们不包含null。主列中的任何一列都不能为空。主键唯一地标识表中的一行。
主键约束可以与列定义(列约束)一起声明,也可以作为单行声明(表约束)。只能使用这两种方式之一,如果同时定义多个主键约束,则会引发异常。
有效性检查
SQL标准指定约束可以是强制的,也可以是不强制的。这将控制是否对传入/传出数据执行约束检查。Flink不保存数据,因此我们希望支持的唯一模式是not forced模式。确保查询执行的主键唯一性由用户负责。
注意:在CREATE TABLE语句中,主键约束会改变列的可空性,也就是说,一个有主键约束的列是不能为NULL的。
根据指定的列对已创建的表进行分区。如果将该表用作filesystem sink,则为每个分区创建一个目录。
用于创建表source/sink的表属性,属性通常用于查找和创建底层连接器。
表达式key1=val1
的键和值都应该是字符串字面值。有关不同连接器的所有受支持的表属性,请参阅连接器中的详细信息。
表名可以是三种格式:
对于catalog_name.db_name.Table_name
,表将被注册到catalog名为“catalog_name”,数据库名为“db_name;
对于db_name.Table_name
,表将注册到当前表执行环境的catalog和数据库名为“db_name”;
对于table_name
,表将注册到表执行环境的当前catalog和数据库中。
注意:用CREATE table语句注册的表既可以用作表source,也可以用作表sink,我们不能决定它是用作源还是用作接收器,直到它在dml语句中被引用。
LIKE子句是SQL特性的变体/组合。子句可用于基于现有表的定义创建表。此外,用户可以扩展原始表或排除其中的某些部分。与SQL标准相反,子句必须在CREATE语句的顶层定义。这是因为子句适用于定义的多个部分,而不仅仅适用于模式部分。
您可以使用该子句重用或覆盖某些连接器属性或向外部定义的表添加水印。例如,在Apache Hive中定义的表中添加水印。
下面为示例语句:
CREATE TABLE Orders ( `user` BIGINT, product STRING, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'scan.startup.mode' = 'earliest-offset' ); CREATE TABLE Orders_with_watermark ( -- 增加水印定义 WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ( -- 覆盖startup-mode 'scan.startup.mode' = 'latest-offset' ) LIKE Orders;
生成的表Orders_with_watermark等价于用以下语句创建的表:
CREATE TABLE Orders_with_watermark (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);
表特性的合并逻辑可以通过like选项进行控制。
可以控制合并的特性有:
有三种不同的合并策略:
此外,如果没有定义特定的策略,可以使用INCLUDING/EXCLUDING ALL
选项来指定使用什么策略,例如,如果你使用EXCLUDING ALL INCLUDING WATERMARKS
,则表示只有源表中的水印会被包含。
例子:
-- 存储在filesystem中的source表 CREATE TABLE Orders_in_file ( `user` BIGINT, product STRING, order_time_string STRING, order_time AS to_timestamp(order_time) ) PARTITIONED BY (`user`) WITH ( 'connector' = 'filesystem', 'path' = '...' ); -- 想存储在kafka中的对应的表 CREATE TABLE Orders_in_kafka ( -- 增加水印定义 WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', ... ) LIKE Orders_in_file ( -- 不包含任何东西,除了需要的水印计算列。 -- 我们不需要分区和文件系统选项这些kafka不接受的特性。 EXCLUDING ALL INCLUDING GENERATED );
如果没有提供like选项,INCLUDING ALL OVERWRITING OPTIONS
将作为默认选项使用。 无法控制物理列归并行为。这些物理列将被合并,就像使用了INCLUDING
策略一样。
source_table可以是复合标识符。因此,它可以是来自不同catalog或数据库的表,例如:
-- 读取kafka CREATE TABLE kafka_bscc_bsnet_sendmsg ( `time` bigint COMMENT '', `endpoint` string NULL COMMENT '主机名', `P999_delay` int NULL COMMENT 'P999分位的时延', `msg_trace_id` string NULL COMMENT '下发消息的 id', `error_count` int NULL COMMENT '业务方错误机器数量' ) with ( 'connector' = 'kafka', 'topic' = 'this-is-a-topic', 'properties.bootstrap.servers' = 'broker1:9092,broker2:9092,broker3:9092', 'properties.group.id' = 'flinkSql.for-test', 'scan.startup.mode' = 'earliest-offset', -- 格式化配置 'format' = 'json', 'json.ignore-parse-errors' = 'true', 'json.encode.decimal-as-plain-number' = 'true' ) ;
CREATE CATALOG catalog_name WITH (
key1=val1, key2=val2, ...
)
参数
必选
默认值
类型
描述
type
是
无
string
Catalog 的类型。 创建 HiveCatalog 时,该参数必须设置为’hive’。
hive-conf-dir
否
无
string
指向包含 hive-site.xml 目录的 URI。 该 URI 必须是 Hadoop 文件系统所支持的类型。
如果指定一个相对 URI,即不包含 scheme,则默认为本地文件系统。如果该参数没有指定,我们会在 class path 下查找hive-site.xml。
default-database
否
default
string
当一个catalog被设为当前catalog时,所使用的默认当前database。
hive-version
否
无
string
HiveCatalog 能够自动检测使用的 Hive 版本。我们建议不要手动设置 Hive 版本,除非自动检测机制失败。
hadoop-conf-dir
否
无
string
Hadoop 配置文件目录的路径。目前仅支持本地文件系统路径。我们推荐使用HADOOP_CONF_DIR
环境变量来指定 Hadoop 配置。
因此仅在环境变量不满足您的需求时再考虑使用该参数,例如当您希望为每个 HiveCatalog 单独设置 Hadoop 配置时。
使用给定的目录属性创建目录。如果已经存在同名的目录,则会引发异常。
WITH选择
用于指定与此目录相关的额外信息的目录属性。表达式key1=val1的键和值都应该是字符串字面值。
注意,key和value都应该使用英文单引号括起来。
create catalog hive with (
'type' = 'hive',
'hadoop-conf-dir' = '/path/to/dir',
'hive-conf-dir' = '/path/to/dir'
)
;
如果用户使用的是内存类型的 catalog ,也就是说没有创建 hive catalog ,则默认的 catalog 名称为default_catalog
,默认的 database 名称为default_database
。
在建表时,如果没有单独指定表所属的 catalog 和 database ,则使用上述默认的 catalog 和 database。
建议在建表时,不要指定 catalog 和 database 名称,这样比较方便。
如果用户使用的是 hive 类型的 catalog,也就是用户创建了 hive catalog ,并且使用了创建的 hive catalog(use catalog hive;),
则默认的 catalog 名称为创建的 hive catalog 名称。
比如上面的案例代码,catalog 名称就是hive
,默认的 database 名称为 default。之后新建的表(非临时表),运行时将会出现在 hive 元数据中。
之后通过 HUE 等连接 hive 的工具,就可以通过show catete table table_name
语句查看 flink 建表的元信息。
线上最佳实践
标已存在
的错误,当然可以添加if not exist
来避免。CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
[COMMENT database_comment]
WITH (key1=val1, key2=val2, ...)
使用给定的数据库属性创建数据库。如果目录中已经存在同名的数据库,则会引发异常。
IF NOT EXISTS
如果数据库已经存在,则不会发生任何事情。
WITH OPTIONS
用于指定与此数据库相关的额外信息的数据库属性。表达式key1=val1的键和值都应该是字符串字面值。
CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
[( columnName [, columnName ]* )] [COMMENT view_comment]
AS query_expression
使用给定的查询表达式创建视图。如果 catalog 中已经存在同名的视图,则会抛出异常。
TEMPORARY
创建具有目录和数据库名称空间并覆盖视图的临时视图。
IF NOT EXISTS
如果视图已经存在,则不会发生任何事情。
创建视图,可以将负责的查询 sql 进行拆分,以获取更好的阅读体验。
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.][db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
创建一个函数,该函数具有带有标识符和可选语言标记的catalog和数据库名称空间。如果目录中已经存在同名的函数,则会引发异常。
如果语言标记是JAVA/SCALA,则标识符是UDF的完整类路径。关于Java/Scala UDF的实现,请参考用户自定义函数。
如果语言标记是PYTHON,则标识符是UDF的完全限定名,例如pyflink.table.tests.test_udf.add。
有关Python UDF的实现,请参阅官网,这里暂不列出。
TEMPORARY
创建具有catalog和数据库名称空间并覆盖编目函数的临时编目函数。
TEMPORARY SYSTEM
创建没有命名空间并覆盖内置函数的临时系统函数。
IF NOT EXISTS
如果函数已经存在,则什么也不会发生。
LANGUAGE JAVA|SCALA|PYTHON
用于指导Flink运行时如何执行该函数的语言标记。目前只支持JAVA、SCALA和PYTHON,函数默认语言为JAVA。
create temporary function fetch_millisecond as 'cn.com.log.function.udf.time.FetchMillisecond' language java;
NSERT
语句用于向表中添加行数据。
select查询结果可以通过使用insert子句插入到表中。
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] [column_list] select_statement
part_spec:
(part_col_name1=val1 [, part_col_name2=val2, ...])
column_list:
(col_name1 [, column_name2, ...])
OVERWRITE
INSERT OVERWRITE将覆盖表或分区中的任何现有数据。否则(INTO),将追加新的数据。
PARTITION
PARTITION子句指定插入语句的静态分区列。
COLUMN LIST
现在有表T(a INT, b INT, c INT), flink支持
INSERT INTO T(c, b) SELECT x, y FROM S
查询的数据列‘x’将被写入列‘c’,查询的数据列‘y’将被写入列‘b’,并且列‘a’被设置为NULL(需保证列‘z’是可以为空的)。
overwrite 和 partition 关键字经常用于写入 hive 。
-- 创建一个分区表 CREATE TABLE country_page_view (user STRING, cnt INT, date STRING, country STRING) PARTITIONED BY (date, country) WITH (...) -- 向静态分区(date='2019-8-30', country='China')追加数据行 INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China') SELECT user, cnt FROM page_view_source; -- 向分区(date, country)追加数据行,静态data分区值为“2019-8-30”,country为动态分区,该分区值通过每行对应字段值动态获取 INSERT INTO country_page_view PARTITION (date='2019-8-30') SELECT user, cnt, country FROM page_view_source; -- 向静态分区(date='2019-8-30', country='China')覆盖数据 INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China') SELECT user, cnt FROM page_view_source; -- 向分区(date, country)覆盖数据行,静态data分区值为“2019-8-30”,country为动态分区,该分区值通过每行对应字段值动态获取 INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30') SELECT user, cnt, country FROM page_view_source; -- 向静态分区(date='2019-8-30', country='China')追加数据行,cnt字段值被设置为NULL INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China') (user) SELECT user FROM page_view_source;
可以使用INSERT…VALUES
语句将数据直接从SQL插入到表中。
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]
values_row:
: (val1 [, val2, ...])
OVERWRITE
INSERT OVERWRITE
将覆盖表中任何现有数据。否则,将追加新的数据。
CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...);
INSERT INTO students
VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);
运行多个 insert 任务,在 flink UI 界面中,会体现出多个运行图。当然,如果你的多个 insert 语句读取了同一张表,或者是写入了同一张表,flink 则会对其优化,最后生成一张运行图。
USE
语句用于设置当前数据库或catalog,或更改模块的解析顺序和启用状态。
USE CATALOG catalog_name
设置当前catalog。所有未显式指定catalog的后续命令都将使用此catalog。 如果提供的catalog不存在,则抛出异常。默认当前catalog为default_catalog
。
在flink sql
中创建了hive catalog
,再用use
语句使用了hive catalog
之后,flink 就连接上了 hive 的元数据,
之后的create
创建的非临时表,其元数据就会被保存到 hive 的元数据。
如果不想在每个flink sql
任务中重复创建连接外部系统的虚拟表,就可以只在第一个flink sql
任务中创建一次表,之后的flink sql
任务就不再需要编写建表语句了,
只要有创建hive catalog
和use catalog hive
语句即可。
USE MODULES module_name1[, module_name2, ...]
按照声明的顺序设置已启用的模块。所有后续命令将解析启用模块中的元数据(函数/用户定义类型/规则等),并遵循声明顺序。
模块在加载时被默认使用。如果没有使用USE modules语句,加载的模块将被禁用。默认加载和启用的模块是core。如果使用了该语句启动模块,则不在该语句中的模块都将被禁用。
use hive, core;
表示后续使用到的函数/用户定义类型/规则等,会先按照 hive 来解析,如果 hive 解析不了的,再用 flink 来解析。
USE [catalog_name.]database_name
设置当前数据库。所有未显式指定数据库的后续命令都将使用此数据库。如果提供的数据库不存在,则抛出异常。默认的当前数据库是default_database。
select语句
主要是从表中查询数据,然后将数据插入到其他表中。直接在页面中查看select
的结果,目前平台还不支持。
单个反斜杠就可以作为转义字符使用,在select
查询中可以直接使用。
SELECT supplier_id, rating, COUNT(*) AS total
FROM
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。