当前位置:   article > 正文

29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)

flink sql之describe、explain、use、show、load、unload、set、reset、jar、job

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文简单的介绍了show、load/unload、set/unset、jar和job语句,并都提供了详细的使用示例。
本文依赖flink和kafka集群能正常使用。
本文分为7个部分,即show介绍及使用、load/unload介绍及使用和set/unset的介绍及使用、jar和job语句介绍及使用。
本文示例均是在Flink 1.17版本的环境中运行的,有些语句低版本没有。

四、SHOW 语句

SHOW 语句用于列出其相应父对象中的对象,例如 catalog、database、table 和 view、column、function 和 module。
SHOW CREATE 语句用于打印给定对象的创建 DDL 语句。当前的 SHOW CREATE 语句仅在打印给定表和视图的 DDL 语句时可用。

目前 Flink SQL 支持下列 SHOW 语句:

  • SHOW CATALOGS
  • SHOW CURRENT CATALOG
  • SHOW DATABASES
  • SHOW CURRENT DATABASE
  • SHOW TABLES
  • SHOW CREATE TABLE
  • SHOW COLUMNS
  • SHOW VIEWS
  • SHOW CREATE VIEW
  • SHOW FUNCTIONS
  • SHOW MODULES
  • SHOW FULL MODULES
  • SHOW JARS

1、java示例

package org.tablesql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// show catalogs
		tenv.executeSql("SHOW CATALOGS").print();
		
		// show current catalog
		tenv.executeSql("SHOW CURRENT CATALOG").print();
		
		// show databases
		tenv.executeSql("SHOW DATABASES").print();
		
		// show current database
		tenv.executeSql("SHOW CURRENT DATABASE").print();
		
		String sql = "CREATE TABLE alan_ticker2 (\r\n" + 
				"   symbol STRING,\r\n" + 
				"   price DOUBLE,\r\n" + 
				"   tax  DOUBLE,\r\n" + 
				"   rowtime  TIMESTAMP(3),\r\n" + 
				"   WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\r\n" + 
				") WITH (\r\n" + 
				"  'connector' = 'kafka',\r\n" + 
				"  'topic' = 'alan_ticker2_topic',\r\n" + 
				"  'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',\r\n" + 
				"  'scan.startup.mode' = 'earliest-offset',\r\n" + 
				"  'format' = 'csv'\r\n" + 
				");";
		tenv.executeSql(sql);

		// show tables
		tenv.executeSql("SHOW TABLES").print();
		
		// show create table
		tenv.executeSql("SHOW CREATE TABLE alan_ticker2").print();
		
		// show columns
		tenv.executeSql("SHOW COLUMNS FROM alan_ticker2 LIKE '%s%'").print();
		
		// create a view
		tenv.executeSql("CREATE VIEW alan_view AS SELECT * FROM alan_ticker2");
		
		// show views
		tenv.executeSql("SHOW VIEWS").print();
		
		// show create view
		tenv.executeSql("SHOW CREATE VIEW alan_view").print();
		
		// show functions
		tenv.executeSql("SHOW FUNCTIONS").print();
		
		// create a user defined function
//		tenv.executeSql("CREATE FUNCTION f1 AS ");
		
		// show user defined functions
		tenv.executeSql("SHOW USER FUNCTIONS").print();
		
		// show modules
		tenv.executeSql("SHOW MODULES").print();
		
		// show full modules
		tenv.executeSql("SHOW FULL MODULES").print();

	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 运行结果
-- show catalogs
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

-- show current catalog
+----------------------+
| current catalog name |
+----------------------+
|      default_catalog |
+----------------------+
1 row in set

-- show databases
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set

-- show current database
+-----------------------+
| current database name |
+-----------------------+
|      default_database |
+-----------------------+
1 row in set

-- show tables
+--------------+
|   table name |
+--------------+
| alan_ticker2 |
+--------------+
1 row in set

-- show create table
CREATE TABLE `default_catalog`.`default_database`.`alan_ticker2` (
  `symbol` VARCHAR(2147483647),
  `price` DOUBLE,
  `tax` DOUBLE,
  `rowtime` TIMESTAMP(3),
  WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '1' SECOND
) WITH (
  'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
  'format' = 'csv',
  'topic' = 'alan_ticker2_topic',
  'connector' = 'kafka',
  'scan.startup.mode' = 'earliest-offset'
)

-- show columns
+--------+--------+------+-----+--------+-----------+
|   name |   type | null | key | extras | watermark |
+--------+--------+------+-----+--------+-----------+
| symbol | STRING | TRUE |     |        |           |
+--------+--------+------+-----+--------+-----------+
1 row in set

-- show views
+-----------+
| view name |
+-----------+
| alan_view |
+-----------+
1 row in set

-- show create view
CREATE VIEW `default_catalog`.`default_database`.`alan_view`(`symbol`, `price`, `tax`, `rowtime`) as
SELECT *
FROM `default_catalog`.`default_database`.`alan_ticker2`

-- show functions
+-------------------------------+
|                 function name |
+-------------------------------+
|             AGG_DECIMAL_MINUS |
.....
|                withoutColumns |
+-------------------------------+
185 rows in set

-- show user defined functions
Empty set

-- show modules
+-------------+
| module name |
+-------------+
|        core |
+-------------+
1 row in set

-- show full modules
+-------------+------+
| module name | used |
+-------------+------+
|        core | TRUE |
+-------------+------+
1 row in set

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105

2、Flink SQL cli示例

Flink SQL> SHOW CATALOGS;
default_catalog

Flink SQL> SHOW DATABASES;
default_database

Flink SQL> CREATE TABLE my_table (...) WITH (...);
[INFO] Table has been created.

Flink SQL> SHOW TABLES;
my_table

Flink SQL> SHOW CREATE TABLE my_table;
CREATE TABLE `default_catalog`.`default_db`.`my_table` (
  ...
) WITH (
  ...
)


Flink SQL> SHOW COLUMNS from MyUserTable LIKE '%f%';
+--------+-------+------+-----+--------+-----------+
|   name |  type | null | key | extras | watermark |
+--------+-------+------+-----+--------+-----------+
| field2 | BYTES | true |     |        |           |
+--------+-------+------+-----+--------+-----------+
1 row in set


Flink SQL> CREATE VIEW my_view AS SELECT * from my_table;
[INFO] View has been created.

Flink SQL> SHOW VIEWS;
my_view

Flink SQL> SHOW CREATE VIEW my_view;
CREATE VIEW `default_catalog`.`default_db`.`my_view`(`field1`, `field2`, ...) as
SELECT *
FROM `default_catalog`.`default_database`.`my_table`

Flink SQL> SHOW FUNCTIONS;
mod
sha256
...

Flink SQL> CREATE FUNCTION f1 AS ...;
[INFO] Function has been created.

Flink SQL> SHOW USER FUNCTIONS;
f1
...

Flink SQL> SHOW MODULES;
+-------------+
| module name |
+-------------+
|        core |
+-------------+
1 row in set


Flink SQL> SHOW FULL MODULES;
+-------------+------+
| module name | used |
+-------------+------+
|        core | true |
+-------------+------+
1 row in set


Flink SQL> SHOW JARS;
/path/to/addedJar.jar
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

3、show tables

其他show 都比较简单,不再赘述。

1)、语法

SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]
  • 1

展示指定库的所有表,如果没有指定库则展示当前库的所有表。另外返回的结果能被一个可选的匹配字符串过滤。

LIKE 根据可选的 LIKE 语句展示给定库中与 <sql_like_pattern> 是否模糊相似的所有表。

LIKE 子句中 sql 正则式的语法与 MySQL 方言中的语法相同。

  • % 匹配任意数量的字符, 也包括0数量字符, % 匹配一个 % 字符.
  • _ 只匹配一个字符, _ 匹配一个 _ 字符.

2)、示例

假定在 catalog1 的 db1 库有person和dim表,在会话的当前库下有fights和orders表。

  • 显示指定库的所有表
show tables from db1;
-- show tables from catalog1.db1;
-- show tables in db1;
-- show tables in catalog1.db1;
+------------+
| table name |
+------------+
|        dim |
|     person |
+------------+
2 rows in set
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 显示指定库中相似于指定 SQL 正则式的所有表
show tables from db1 like '%n';
-- show tables from catalog1.db1 like '%n';
-- show tables in db1 like '%n';
-- show tables in catalog1.db1 like '%n';
+------------+
| table name |
+------------+
|     person |
+------------+
1 row in set
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 显示指定库中不相似于指定 SQL 正则式的所有表
show tables from db1 not like '%n';
-- show tables from catalog1.db1 not like '%n';
-- show tables in db1 not like '%n';
-- show tables in catalog1.db1 not like '%n';
+------------+
| table name |
+------------+
|        dim |
+------------+
1 row in set
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 显示当前库中的所有表
show tables;
+------------+
| table name |
+------------+
|      items |
|     orders |
+------------+
2 rows in set
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4、SHOW CREATE TABLE

展示创建指定表的 create 语句。

截至Flink 1.17版本 SHOW CREATE TABLE 只支持通过 Flink SQL DDL 创建的表。

SHOW CREATE TABLE [catalog_name.][db_name.]table_name
  • 1

5、SHOW COLUMNS

展示给定表的所有列。

LIKE 根据可选的 LIKE 语句展示给定表中与 <sql_like_pattern> 是否模糊相似的所有列。

LIKE 子句中 sql 正则式的语法与 MySQL 方言中的语法相同。

1)、语法

SHOW COLUMNS ( FROM | IN ) [[catalog_name.]database.]<table_name> [ [NOT] LIKE <sql_like_pattern>]
  • 1

2)、示例

假定在 catalog1 catalog 中的 database1 数据库中有名为 orders 的表,其结构如下所示:

+---------+-----------------------------+-------+-----------+---------------+----------------------------+
|    name |                        type |  null |       key |        extras |                  watermark |
+---------+-----------------------------+-------+-----------+---------------+----------------------------+
|    user |                      BIGINT | false | PRI(user) |               |                            |
| product |                 VARCHAR(32) |  true |           |               |                            |
|  amount |                         INT |  true |           |               |                            |
|      ts |      TIMESTAMP(3) *ROWTIME* |  true |           |               | `ts` - INTERVAL '1' SECOND |
|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | false |           | AS PROCTIME() |                            |
+---------+-----------------------------+-------+-----------+---------------+----------------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 显示指定表中的所有列
show columns from orders;
-- show columns from database1.orders;
-- show columns from catalog1.database1.orders;
-- show columns in orders;
-- show columns in database1.orders;
-- show columns in catalog1.database1.orders;
+---------+-----------------------------+-------+-----------+---------------+----------------------------+
|    name |                        type |  null |       key |        extras |                  watermark |
+---------+-----------------------------+-------+-----------+---------------+----------------------------+
|    user |                      BIGINT | false | PRI(user) |               |                            |
| product |                 VARCHAR(32) |  true |           |               |                            |
|  amount |                         INT |  true |           |               |                            |
|      ts |      TIMESTAMP(3) *ROWTIME* |  true |           |               | `ts` - INTERVAL '1' SECOND |
|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | false |           | AS PROCTIME() |                            |
+---------+-----------------------------+-------+-----------+---------------+----------------------------+
5 rows in set
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 显示指定表中相似于指定 SQL 正则式的所有列
show columns from orders like '%r';
-- show columns from database1.orders like '%r';
-- show columns from catalog1.database1.orders like '%r';
-- show columns in orders like '%r';
-- show columns in database1.orders like '%r';
-- show columns in catalog1.database1.orders like '%r';
+------+--------+-------+-----------+--------+-----------+
| name |   type |  null |       key | extras | watermark |
+------+--------+-------+-----------+--------+-----------+
| user | BIGINT | false | PRI(user) |        |           |
+------+--------+-------+-----------+--------+-----------+
1 row in set
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 显示指定表中不相似于指定 SQL 正则式的所有列
show columns from orders not like '%_r';
-- show columns from database1.orders not like '%_r';
-- show columns from catalog1.database1.orders not like '%_r';
-- show columns in orders not like '%_r';
-- show columns in database1.orders not like '%_r';
-- show columns in catalog1.database1.orders not like '%_r';
+---------+-----------------------------+-------+-----+---------------+----------------------------+
|    name |                        type |  null | key |        extras |                  watermark |
+---------+-----------------------------+-------+-----+---------------+----------------------------+
| product |                 VARCHAR(32) |  true |     |               |                            |
|  amount |                         INT |  true |     |               |                            |
|      ts |      TIMESTAMP(3) *ROWTIME* |  true |     |               | `ts` - INTERVAL '1' SECOND |
|   ptime | TIMESTAMP_LTZ(3) *PROCTIME* | false |     | AS PROCTIME() |                            |
+---------+-----------------------------+-------+-----+---------------+----------------------------+
4 rows in set
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

6、SHOW JARS

SHOW JARS
  • 1

展示所有通过 ADD JAR 语句加入到 session classloader 中的 jar。

截至Flink 1.17版本 SHOW JARS 命令只能在 SQL CLI 或者 SQL Gateway 中使用.

7、SHOW JOBS

SHOW JOBS
  • 1

展示集群中所有作业。

截至Flink 1.17版本 SHOW JOBS 命令只能在 SQL CLI 或者 SQL Gateway 中使用.

五、LOAD 语句

LOAD 语句用于加载内置的或用户自定义的模块。

1、语法

module_name 是一个简单的标识符。它是区分大小写的,由于它被用于执行模块发现,因此也要与模块工厂(module factory)中定义的模块类型相同。属性 (‘key1’ = ‘val1’, ‘key2’ = ‘val2’, …) 是一个 map 结构,它包含一组键值对(不包括 ’type’ 的键),这些属性会被传递给模块发现服务以实例化相应的模块。

LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]
  • 1

2、Java示例

可以使用 TableEnvironment 的 executeSql() 方法执行 LOAD 语句。如果 LOAD 操作执行成功,executeSql() 方法会返回 ‘OK’,否则会抛出异常。

以下示例展示了如何在 TableEnvironment 中执行一条 LOAD 语句。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		
		// show modules
		tenv.executeSql("SHOW MODULES").print();
		
		// show full modules
		tenv.executeSql("SHOW FULL MODULES").print();
		
		// 加载 hive 模块
		tenv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
		tenv.executeSql("SHOW MODULES").print();
		
	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 运行结果
-- show modules
+-------------+
| module name |
+-------------+
|        core |
+-------------+
1 row in set

-- show full modules
+-------------+------+
| module name | used |
+-------------+------+
|        core | TRUE |
+-------------+------+
1 row in set

-- 加载 hive 模块 后 show modules
+-------------+
| module name |
+-------------+
|        core |
|        hive |
+-------------+
2 rows in set

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

3、Flink SQL Cli示例

Flink SQL> LOAD MODULE hive WITH ('hive-version' = '3.1.3');
[INFO] Load module succeeded!

Flink SQL> SHOW MODULES;
+-------------+
| module name |
+-------------+
|        core |
|        hive |
+-------------+

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

六、UNLOAD 语句

UNLOAD 语句用于卸载内置的或用户自定义的模块

1、语法

UNLOAD MODULE module_name
  • 1

2、java示例

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		
		// show modules
		tenv.executeSql("SHOW MODULES").print();
		
		// show full modules
		tenv.executeSql("SHOW FULL MODULES").print();
		
		// 加载 hive 模块
		tenv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
		tenv.executeSql("SHOW MODULES").print();
		
		// 卸载 hive 模块
		tenv.executeSql("UNLOAD MODULE hive");
		tenv.executeSql("SHOW MODULES").print();
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 运行结果
-- show modules
+-------------+
| module name |
+-------------+
|        core |
+-------------+
1 row in set

-- show full modules
+-------------+------+
| module name | used |
+-------------+------+
|        core | TRUE |
+-------------+------+
1 row in set

-- 加载 hive 模块 后 show modules
+-------------+
| module name |
+-------------+
|        core |
|        hive |
+-------------+
2 rows in set

-- 卸载 hive 模块 后 show modules
+-------------+
| module name |
+-------------+
|        core |
+-------------+
1 row in set

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

3、Flink SQL Cli示例

Flink SQL> UNLOAD MODULE hive;
[INFO] Unload module succeeded!

Flink SQL> SHOW MODULES;
+-------------+
| module name |
+-------------+
|        core |
+-------------+
1 row in set

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

七、SET语句

SET 语句用于修改配置或展示配置。
SET 语句可以在 SQL CLI 中执行。

以下示例展示了如何在 SQL CLI 中执行一条 SET 语句。

Flink SQL> SET 'table.local-time-zone' = 'Europe/Berlin';
[INFO] Session property has been set.

Flink SQL> SET;
'table.local-time-zone' = 'Europe/Berlin'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 语法
SET ('key' = 'value')
  • 1

如果没有指定 key 和 value,它仅仅打印所有属性。否则,它会为 key 设置指定的 value 值。

八、RESET 语句

RESET 语句用于将配置重置为默认值
RESET 语句可以在 SQL CLI 中执行。

以下示例展示了如何在 SQL CLI 中执行一条 RESET 语句。

Flink SQL> RESET 'table.planner';
[INFO] Session property has been reset.

Flink SQL> RESET;
[INFO] All session properties have been set to their default values.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 语法
RESET ('key')
  • 1

如果未指定 key,则将所有属性重置为默认值。否则,将指定的 key 重置为默认值。

九、JAR 语句

JAR 语句用于将用户 jar 添加到 classpath、或将用户 jar 从 classpath 中删除或展示运行时 classpath 中添加的 jar。

截至Flink1.17 版本 Flink SQL 支持以下 JAR 语句:

  • ADD JAR
  • SHOW JARS
  • REMOVE JAR

1、语法

  • add jar
ADD JAR '<path_to_filename>.jar'
  • 1

添加一个 JAR 文件到资源列表中,该 jar 应该位于 Flink 当前支持的本地或远程文件系统 中。添加的 JAR 文件可以使用 SHOW JARS 语句列出。

  • SHOW JARS
SHOW JARS
  • 1

展示所有通过 ADD JAR 语句添加的 jar。

  • REMOVE JAR
REMOVE JAR '<path_to_filename>.jar'
  • 1

删除由 ADD JAR 语句添加的指定 jar。

REMOVE JAR 语句仅适用于 SQL CLI。

2、示例

以下示例展示了如何在 SQL CLI 中运行 JAR 语句。该部分在文章:19、Flink 的Table API 和 SQL 中的自定义函数(2) 中示例添加用户自定义函数。

Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

3、限制

请不要通过 ADD JAR 语句来加载 Hive 的source、sink、function、catalog。这是 Hive connector 的一个已知限制,且会在将来版本中修复。请参考使用hive集成的文章:33、Flink之hive介绍与简单示例

十、JOB 语句

Job 语句用于管理作业的生命周期。

截至Flink 1.17版本 Flink SQL 支持以下 JOB 语句:

  • SHOW JOBS
  • STOP JOB

1、语法

  • SHOW JOBS
SHOW JOBS
  • 1

展示 Flink 集群上的作业。

SHOW JOBS 语句仅适用于 SQL CLI 或者 SQL Gateway.

  • STOP JOB
STOP JOB '<job_id>' [WITH SAVEPOINT] [WITH DRAIN]
  • 1

停止指定作业。

WITH SAVEPOINT 在作业停止之前执行 Savepoin。 Savepoint 的路径可以通过集群配置的 state.savepoints.dir 指定, 或者通过 SET 语句指定(后者有更高优先级)。

WITH DRAIN 在触发 savepoint 之前将 Watermark 提升至最大。该操作会可能会触发窗口的计算。请您注意该操作可能导致您之后从该创建的 savepoint 恢复的作业结果不正确。

STOP JOB 语句仅适用于 SQL CLI 或者 SQL Gateway.

2、示例

Flink SQL> SHOW JOBS;
+----------------------------------+----------+---------+----------------------------+
|                           job id | job name |  status |              start time    |
+----------------------------------+----------+---------+----------------------------+
| 228d70913eab60cca85c5e7f78b5783f |    alan_job | RUNNING | 2023-09-11T05:03:51.523 |
+----------------------------------+----------+---------+----------------------------+

Flink SQL> SET 'state.savepoints.dir'='hdfs://server1:8020/flinktest/flinkckp/';
[INFO] Execute statement succeed.

Flink SQL> STOP JOB '228d70913eab60cca85c5e7f78b5783f' WITH SAVEPOINT;
+-----------------------------------------------------------------------------+
|                                      savepoint path                         |
+-----------------------------------------------------------------------------+
| hdfs://server1:8020flinktest/flinkckp/0f93e35e25c3fb87ee8ce3d6393d6344      |
+-----------------------------------------------------------------------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

以上,简单的介绍了show、load/unload、set/unset、jar和job语句,并都提供了详细的使用示例。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/626896
推荐阅读
相关标签
  

闽ICP备14008679号