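I. Flink Column
The Flink column introduces individual topics systematically, each illustrated with concrete examples.
1. Flink deployment series
This part covers the basics of deploying and configuring Flink.
2. Flink fundamentals series
This part covers Flink fundamentals such as terminology, architecture, the programming model, the programming guide, basic DataStream API usage, and the four cornerstones.
3. Flink Table API and SQL basics series
This part covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, and catalogs.
4. Flink Table API and SQL advanced and applications series
This part covers applications of the Table API and SQL that are more closely tied to real production use, as well as topics of greater development difficulty.
5. Flink monitoring series
This part relates to real-world operations and monitoring work.
II. Flink Examples Column
The Flink examples column supplements the Flink column. It generally does not explain concepts; instead, it provides one concrete, ready-to-use example after another. This column has no sub-sections; the link titles show what each article covers.
Entry point to all articles in both columns: Flink series article index
This article introduces Flink's SELECT, WHERE, DISTINCT, ORDER BY, LIMIT, set operations, and deduplication, with concrete runnable examples.
This article assumes that the Flink and Hadoop clusters are working properly.
The article has 8 parts, covering Flink queries, the WITH clause, conditions (WHERE), DISTINCT, set operations, ORDER BY, LIMIT, and deduplication; each part is illustrated with a verified example.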
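SELECT and VALUES statements are specified with the sqlQuery() method of the TableEnvironment. The method returns the result of the SELECT (or VALUES) statement as a Table. The Table can be used in subsequent SQL and Table API queries, converted to a DataStream, or written to a TableSink. SQL and Table API queries can be mixed seamlessly; they are optimized as a whole and translated into a single program.
To access a table in a SQL query, it must be registered in the TableEnvironment. A table can be registered from a TableSource, a Table, a CREATE TABLE statement, or a DataStream. Alternatively, users can register a catalog in the TableEnvironment to specify the location of the data sources.
Table.toString() automatically registers the table under a unique name in its TableEnvironment and returns that name. Therefore, Table objects can be inlined directly into SQL queries, as shown in the following example.
A query that uses an unsupported SQL feature fails with a TableException. The following sections list the SQL features supported on batch and streaming tables.
The following example shows how to specify SQL queries on registered and inlined tables.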
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-gateway</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-uber</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader -->
<!-- <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency> -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
import static org.apache.flink.table.api.Expressions.$;
import java.util.Arrays;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.FormatDescriptor;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @author alanchan
*
*/
public class TestFirstQuery {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
DataStream<Tuple3<Integer, String, Integer>> dataStream = env.fromCollection(Arrays.asList(
Tuple3.of(1, "alan", 10),
Tuple3.of(2, "alanchan", 60),
Tuple3.of(3, "alanchanchn", 70),
Tuple3.of(4, "alanchn", 100)));
Table table = tenv.fromDataStream(dataStream,$("id"), $("name"), $("balance"));
// 1. Query the Table object directly (inlined into the SQL string via toString()), without registering it as a view
Table queryResult = tenv.sqlQuery("SELECT SUM(balance) FROM " + table + " WHERE balance >=60 ");
DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(queryResult, Row.class);
result.print("result:");
// 2. Register the stream as the view alan_user and query it
tenv.createTemporaryView("alan_user", dataStream, $("id"), $("name"), $("balance"));
Table tViewQueryResult = tenv.sqlQuery("SELECT SUM(balance) FROM alan_user WHERE balance >=60 ");
DataStream<Tuple2<Boolean, Row>> tVResult = tenv.toRetractStream(tViewQueryResult, Row.class);
tVResult.print("tVResult:");
// 3. Create and register a TableSink
final Schema schema = Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("balance", DataTypes.INT())
.build();
final TableDescriptor sinkDescriptor = TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "D:\\workspace\\testdata")
.format(FormatDescriptor.forFormat("csv")
.option("field-delimiter", ",")
.build())
.build();
tenv.createTemporaryTable("alan_table_sink", sinkDescriptor);
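// Insert the rows of alan_user into alan_table_sink.
// executeSql() submits the INSERT as its own job and returns immediately; call .await() on the
// returned TableResult if the read below must be guaranteed to see the inserted rows.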
tenv.executeSql("INSERT INTO alan_table_sink SELECT id,name,balance FROM alan_user WHERE balance >= 10 ");
Table tableSinkQueryResult = tenv.sqlQuery("SELECT id,name,balance FROM alan_table_sink ");
// tenv.toChangelogStream(tableSinkQueryResult).print();
// tenv.toDataStream(tableSinkQueryResult).print();
// DataStream<Row> tableSinkResult = tenv.toChangelogStream(tableSinkQueryResult,Schema.newBuilder()
// .column("id", "INT")
// .column("name", "STRING")
// .column("balance", "INT")
// .build());
// DataStream<Row> tableSinkResult = tenv.toChangelogStream(tableSinkQueryResult);
// DataStream<Row> tableSinkResult = tenv.toChangelogStream(tableSinkQueryResult,schema);
DataStream<Tuple2<Boolean, Row>> tableSinkResult = tenv.toRetractStream(tableSinkQueryResult,Row.class);
tableSinkResult.print("tableSinkResult:");
env.execute();
}
}
tableSinkResult::4> (true,+I[1, alan, 10])
tableSinkResult::4> (true,+I[2, alanchan, 60])
tableSinkResult::4> (true,+I[3, alanchanchn, 70])
tableSinkResult::4> (true,+I[4, alanchn, 100])
tVResult::5> (true,+I[60])
tVResult::8> (false,-U[130])
tVResult::7> (true,+U[130])
result::13> (false,-U[60])
result::12> (true,+I[60])
tVResult::6> (false,-U[60])
tVResult::9> (true,+U[230])
result::16> (true,+U[230])
result::15> (false,-U[130])
result::14> (true,+U[130])
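Each printed line is a retract-stream message: the Boolean flag is true for an addition and false for a retraction, and +I/-U/+U are the row kinds. The result::N> prefix identifies the parallel print subtask, so the console order is not necessarily the order in which the messages were produced. The plain-Java sketch below (RetractReplaySketch is a hypothetical helper, not part of Flink) replays the five result: messages in console order into per-value counts. Because it only adds and subtracts counts, the final state does not depend on that order and nets out to a single row, 230 (60 + 70 + 100, the balances >= 60).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper (not a Flink API): replays retract-stream messages (flag, value)
 * into a multiset. flag == true adds one occurrence of value, flag == false retracts one.
 */
public class RetractReplaySketch {

    public static Map<Long, Integer> replay(boolean[] flags, long[] values) {
        Map<Long, Integer> counts = new HashMap<>();
        for (int i = 0; i < flags.length; i++) {
            // Read in console order, a retraction may precede its matching addition,
            // so a count may be temporarily negative; only the net sum matters.
            counts.merge(values[i], flags[i] ? 1 : -1, Integer::sum);
        }
        // Keep only values whose net count is positive.
        Map<Long, Integer> result = new TreeMap<>();
        counts.forEach((value, count) -> {
            if (count > 0) {
                result.put(value, count);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // The "result:" lines in the order they appear in the console output above.
        boolean[] flags = {false, true, true, false, true};
        long[] values = {60, 60, 230, 130, 130};
        System.out.println(replay(flags, values)); // {230=1}
    }
}
```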
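TableEnvironment.executeSql() executes a SELECT or VALUES statement and collects its result to the local client; it returns the result of the SELECT (or VALUES) statement as a TableResult. Similarly, a Table object can be executed with Table.execute() to collect the query content to the local client. TableResult.collect() returns a closeable row iterator. The SELECT job does not finish until all result data has been collected, so you should actively close the job through CloseableIterator#close() to avoid resource leaks. You can also print the result to the client console with TableResult.print(). The result data in a TableResult can be accessed only once, so collect() and print() cannot both be called on the same TableResult.
TableResult.collect() and TableResult.print() behave slightly differently depending on the checkpointing setting (to enable checkpointing for a streaming job, see 9. Flink's four cornerstones: the Checkpoint fault-tolerance mechanism explained with examples (checkpoint configuration, restart strategies, manual restore from checkpoints and savepoints)):
- For batch jobs, or streaming jobs without checkpointing, they provide neither exactly-once nor at-least-once guarantees: results are visible to the client as soon as they are produced, but an exception is thrown if the job fails and restarts.
- For streaming jobs with exactly-once checkpointing, they guarantee end-to-end exactly-once delivery; a result becomes visible to the client only after its corresponding checkpoint completes.
- For streaming jobs with at-least-once checkpointing, they guarantee end-to-end at-least-once delivery; results are visible as soon as they are produced, but the same result may be delivered more than once.
Dependencies: same as above.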
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
/**
* @author alanchan
*
*/
public class TestExecuteQueryDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
String sql = "CREATE TABLE alan_sink_table (\r\n" +
" id BIGINT, \r\n" +
" name STRING, \r\n" +
" age INT\r\n" +
") WITH (\r\n" +
" 'connector' = 'filesystem', \r\n" +
" 'path' = 'D:/workspace/testdata/', \r\n" +
" 'format' = 'csv' \r\n" +
");";
tenv.executeSql(sql);
//execute SELECT statement
String querySQL = "select * from alan_sink_table";
TableResult tableResult = tenv.executeSql(querySQL);
try (CloseableIterator<Row> it = tableResult.collect()) {
while(it.hasNext()) {
Row row = it.next();
System.out.println(row.toString());
}
}
//execute Table
TableResult tableResultSqlQuery = tenv.sqlQuery(querySQL).execute();
tableResultSqlQuery.print();
}
}
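The data for this example was already written by the previous example, so this example reads that result directly. The data is as follows (the third column, written as balance by the previous example, is declared as age in this table):
DataStream<Tuple3<Integer, String, Integer>> dataStream = env.fromCollection(Arrays.asList(
Tuple3.of(1, "alan", 10),
Tuple3.of(2, "alanchan", 60),
Tuple3.of(3, "alanchanchn", 70),
Tuple3.of(4, "alanchn", 100)));
################ Output of executing the SELECT statement (executeSql)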
+I[1, alan, 10]
+I[2, alanchan, 60]
+I[3, alanchanchn, 70]
+I[4, alanchn, 100]
################## Output of executing the Table (sqlQuery(...).execute())
+----+----------------------+--------------------------------+-------------+
| op | id | name | age |
+----+----------------------+--------------------------------+-------------+
| +I | 1 | alan | 10 |
| +I | 2 | alanchan | 60 |
| +I | 3 | alanchanchn | 70 |
| +I | 4 | alanchn | 100 |
+----+----------------------+--------------------------------+-------------+
4 rows in set
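Flink parses SQL with Apache Calcite, which supports standard ANSI SQL.
The following BNF grammar describes the superset of SQL features supported in batch and streaming queries. Concrete usage examples are given in this column, noting which features are supported only in batch or only in streaming queries.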
query:
values
| WITH withItem [ , withItem ]* query
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
withItem:
name
[ '(' column [, column ]* ')' ]
AS '(' query ')'
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| [ LATERAL ] '(' query ')'
| UNNEST '(' expression ')'
tablePath:
[ [ catalogName . ] databaseName . ] tableName
systemTimePeriod:
FOR SYSTEM_TIME AS OF dateTimeExpression
dynamicTableOptions:
/*+ OPTIONS(key=val [, key=val]*) */
key:
stringLiteral
val:
stringLiteral
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
matchRecognize:
MATCH_RECOGNIZE '('
[ PARTITION BY expression [, expression ]* ]
[ ORDER BY orderItem [, orderItem ]* ]
[ MEASURES measureColumn [, measureColumn ]* ]
[ ONE ROW PER MATCH ]
[ AFTER MATCH
( SKIP TO NEXT ROW
| SKIP PAST LAST ROW
| SKIP TO FIRST variable
| SKIP TO LAST variable
| SKIP TO variable )
]
PATTERN '(' pattern ')'
[ WITHIN intervalLiteral ]
DEFINE variable AS condition [, variable AS condition ]*
')'
measureColumn:
expression AS alias
pattern:
patternTerm [ '|' patternTerm ]*
patternTerm:
patternFactor [ patternFactor ]*
patternFactor:
variable [ patternQuantifier ]
patternQuantifier:
'*'
| '*?'
| '+'
| '+?'
| '?'
| '??'
| '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
| '{' repeat '}'
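For identifiers (table, attribute, and function names), Flink SQL uses a lexical policy similar to Java's:
- The case of identifiers is preserved whether or not they are quoted.
- Identifiers are then matched case-sensitively.
- Unlike Java, backticks allow identifiers to contain non-alphanumeric characters (e.g., SELECT a AS `my field` FROM t).
String literals must be enclosed in single quotes (e.g., SELECT 'Hello World'). A single quote inside a literal is escaped by doubling it (e.g., SELECT 'It''s me').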
Flink SQL> SELECT 'Hello World', 'It''s me';
+----+--------------------------------+--------------------------------+
| op | EXPR$0 | EXPR$1 |
+----+--------------------------------+--------------------------------+
| +I | Hello World | It's me |
+----+--------------------------------+--------------------------------+
Received a total of 1 row
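Unicode characters are supported in string literals. If you need explicit Unicode code points, use the following syntax:
- Use the backslash (\) as the escape character (default): SELECT U&'\263A'
- Use a custom escape character: SELECT U&'#263A' UESCAPE '#'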
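The WITH clause provides a way to write auxiliary statements for use in a larger query. These statements, often referred to as common table expressions (CTEs), can be thought of as temporary views that exist only for a single query.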
WITH <with_item_definition> [ , ... ]
SELECT ... FROM ...;
<with_item_definition>:
with_item_name (column_name[, ...n]) AS ( <select_query> )
----------------Example-----------
WITH user_with_avg AS (
SELECT t_id, t_balance / t_age *10 AS t_avg
FROM alan_first_table
)
SELECT t_id, ROUND(sum(t_avg),2) AS avg_10_year
FROM user_with_avg
GROUP BY t_id;
This example has no real business meaning; it only demonstrates how to use the WITH clause.
Flink SQL> select * from alan_first_table;
+----+----------------------+--------------------------------+--------------------------------+-------------+
| op | t_id | t_name | t_balance | t_age |
+----+----------------------+--------------------------------+--------------------------------+-------------+
| +I | 3 | alanchanchn | 32.23 | 28 |
| +I | 5 | alan_chan_chn | 52.23 | 38 |
| +I | 2 | alanchan | 22.23 | 10 |
| +I | 1 | alan | 12.23 | 18 |
| +I | 4 | alan_chan | 12.43 | 29 |
+----+----------------------+--------------------------------+--------------------------------+-------------+
Received a total of 5 rows
Flink SQL> WITH user_with_avg AS (
> SELECT t_id, t_balance / t_age *10 AS t_avg
> FROM alan_first_table
> )
> SELECT t_id, ROUND(sum(t_avg),2) AS avg_10_year
> FROM user_with_avg
> GROUP BY t_id;
+----+----------------------+--------------------------------+
| op | t_id | avg_10_year |
+----+----------------------+--------------------------------+
| +I | 4 | 4.29 |
| +I | 1 | 6.79 |
| +I | 2 | 22.23 |
| +I | 5 | 13.74 |
| +I | 3 | 11.51 |
+----+----------------------+--------------------------------+
Received a total of 5 rows
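A CTE behaves like a named intermediate result that the outer query then reads. The plain-Java sketch below (WithClauseSketch and avg10Year are hypothetical names) recomputes the example in the same two steps from the five rows of alan_first_table shown above, and arrives at the same rounded values as the Flink SQL output, only sorted by t_id. The HALF_UP rounding mode is an assumption; none of these values sits on a rounding boundary, so it does not change the result here.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java walk-through of the WITH example: the CTE step, then the outer GROUP BY. */
public class WithClauseSketch {

    public static Map<Long, Double> avg10Year(long[] ids, double[] balances, int[] ages) {
        // Step 1, the CTE user_with_avg: t_avg = t_balance / t_age * 10 for every row.
        double[] tAvg = new double[ids.length];
        for (int i = 0; i < ids.length; i++) {
            tAvg[i] = balances[i] / ages[i] * 10;
        }
        // Step 2, the outer query: SUM(t_avg) per t_id ...
        Map<Long, Double> sums = new TreeMap<>();
        for (int i = 0; i < ids.length; i++) {
            sums.merge(ids[i], tAvg[i], Double::sum);
        }
        // ... then ROUND(..., 2), modelled with BigDecimal (HALF_UP is an assumption).
        Map<Long, Double> rounded = new TreeMap<>();
        sums.forEach((id, sum) -> rounded.put(id,
                BigDecimal.valueOf(sum).setScale(2, RoundingMode.HALF_UP).doubleValue()));
        return rounded;
    }

    public static void main(String[] args) {
        // Rows of alan_first_table as printed above: (t_id, t_balance, t_age)
        long[] ids = {3, 5, 2, 1, 4};
        double[] balances = {32.23, 52.23, 22.23, 12.23, 12.43};
        int[] ages = {28, 38, 10, 18, 29};
        System.out.println(avg10Year(ids, balances, ages));
        // {1=6.79, 2=22.23, 3=11.51, 4=4.29, 5=13.74}
    }
}
```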
SELECT select_list FROM table_expression [ WHERE boolean_expression ]
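Here, table_expression can be any data source: an existing table, a view, a VALUES clause, the joined result of several existing tables, or a subquery. Assuming the table alan_user_table is available in the catalog, the following statement reads all rows from alan_user_table.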
SELECT * FROM alan_user_table
Flink SQL> select * from alan_first_table;
+----+----------------------+--------------------------------+--------------------------------+-------------+
| op | t_id | t_name | t_balance | t_age |
+----+----------------------+--------------------------------+--------------------------------+-------------+
| +I | 3 | alanchanchn | 32.23 | 28 |
| +I | 5 | alan_chan_chn | 52.23 | 38 |
| +I | 2 | alanchan | 22.23 | 10 |
| +I | 1 | alan | 12.23 | 18 |
| +I | 4 | alan_chan | 12.43 | 29 |
+----+----------------------+--------------------------------+--------------------------------+-------------+
Received a total of 5 rows
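A * in select_list means that the query resolves all columns. However, using * in production is discouraged because it makes queries less robust to catalog changes. Instead, select_list can specify a subset of the available columns or perform computations on the declared columns. For example, if alan_first_table has columns named t_id, t_name, t_balance, and t_age, you can write the following query: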
select t_id,t_name,t_balance,t_age from alan_first_table;
Flink SQL> select t_id,t_name,t_balance,t_age from alan_first_table;
+----+----------------------+--------------------------------+--------------------------------+-------------+
| op | t_id | t_name | t_balance | t_age |
+----+----------------------+--------------------------------+--------------------------------+-------------+
| +I | 2 | alanchan | 22.23 | 10 |
| +I | 3 | alanchanchn | 32.23 | 28 |
| +I | 1 | alan | 12.23 | 18 |
| +I | 4 | alan_chan | 12.43 | 29 |
| +I | 5 | alan_chan_chn | 52.23 | 38 |
+----+----------------------+--------------------------------+--------------------------------+-------------+
Received a total of 5 rows
Queries can also use inline data through a VALUES clause. Each tuple corresponds to one row, and an alias can be supplied to name each column.
SELECT t_id, t_balance FROM (VALUES (1, 2.0), (2, 3.1)) AS t (t_id, t_balance);
Flink SQL> SELECT t_id, t_balance FROM (VALUES (1, 2.0), (2, 3.1)) AS t (t_id, t_balance);
+----+-------------+-----------+
| op | t_id | t_balance |
+----+-------------+-----------+
| +I | 1 | 2.0 |
| +I | 2 | 3.1 |
+----+-------------+-----------+
Received a total of 2 rows
Rows can be filtered with a WHERE clause.
select t_id,t_name,t_balance,t_age from alan_first_table where t_name like 'alan%';
Flink SQL> select t_id,t_name,t_balance,t_age from alan_first_table where t_name like 'alan%';
+----+----------------------+--------------------------------+--------------------------------+-------------+
| op | t_id | t_name | t_balance | t_age |
+----+----------------------+--------------------------------+--------------------------------+-------------+
| +I | 2 | alanchan | 22.23 | 10 |
| +I | 3 | alanchanchn | 32.23 | 28 |
| +I | 1 | alan | 12.23 | 18 |
| +I | 5 | alan_chan_chn | 52.23 | 38 |
| +I | 4 | alan_chan | 12.43 | 29 |
+----+----------------------+--------------------------------+--------------------------------+-------------+
Received a total of 5 rows
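In the WHERE example, LIKE 'alan%' uses % to match any sequence of characters (including an empty one), while _ matches exactly one character. The plain-Java sketch below roughly translates such a pattern into an anchored regular expression; LikeSketch is a hypothetical class, it ignores the ESCAPE clause, and it is an illustration rather than Flink's implementation.

```java
import java.util.regex.Pattern;

/** Rough model of SQL LIKE: % -> any sequence, _ -> any single character (no ESCAPE support). */
public class LikeSketch {

    public static Pattern likeToRegex(String likePattern) {
        StringBuilder regex = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            if (c == '%') {
                regex.append(".*");
            } else if (c == '_') {
                regex.append('.');
            } else {
                // Quote every other character so regex metacharacters are taken literally.
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        // DOTALL lets % and _ match line terminators too.
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** matches() requires the whole value to match, like LIKE does. */
    public static boolean like(String value, String likePattern) {
        return likeToRegex(likePattern).matcher(value).matches();
    }

    public static void main(String[] args) {
        String[] names = {"alanchan", "alanchanchn", "alan", "alan_chan_chn", "alan_chan"};
        for (String name : names) {
            System.out.println(name + " LIKE 'alan%' -> " + like(name, "alan%")); // all true
        }
    }
}
```

The underscore stored in names such as alan_chan is ordinary data here; inside a pattern, however, _ is a wildcard, so a pattern like 'alan_chan' would also match 'alanXchan'.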
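In addition, you can call built-in functions and user-defined scalar functions on the columns of a row. User-defined functions must be registered in the catalog before they are used.
For the following example, see the article: 44. Introduction to Flink modules with examples, and detailed examples of using Hive built-in functions and custom functions in Flink SQL (some claims found online seem to be wrong)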
Flink SQL> select alan_testdatabase.encryptPhoneNumber("13788889999");
+----+--------------------------------+
| op | _o__c0 |
+----+--------------------------------+
| +I | 137****9999 |
+----+--------------------------------+
Received a total of 1 row
With SELECT DISTINCT, all duplicate rows are removed from the result set (only one row is kept per group).
select distinct t_id from alan_first_table;
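For streaming queries, the state required to compute the query result may grow indefinitely, and its size depends on the number of distinct rows. You can configure an appropriate state time-to-live (TTL) to prevent excessive state size; note that this may affect the correctness of the query result. For the specific settings, see the query configuration.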
Flink SQL> select * from alan_first_table;
+----+----------------------+--------------------------------+--------------------------------+-------------+
| op | t_id | t_name | t_balance | t_age |
+----+----------------------+--------------------------------+--------------------------------+-------------+
| +I | 4 | alan_chan | 12.43 | 29 |
| +I | 1 | alan | 12.23 | 18 |
| +I | 1 | alan | 100.0 | 29 |
| +I | 3 | alanchanchn | 32.23 | 28 |
| +I | 2 | alanchan | 22.23 | 10 |
| +I | 5 | alan_chan_chn | 52.23 | 38 |
+----+----------------------+--------------------------------+--------------------------------+-------------+
Received a total of 6 rows
Flink SQL> select distinct t_id from alan_first_table;
+----+----------------------+
| op | t_id |
+----+----------------------+
| +I | 4 |
| +I | 2 |
| +I | 5 |
| +I | 1 |
| +I | 3 |
+----+----------------------+
Received a total of 5 rows
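A streaming DISTINCT has to remember every key it has already emitted, which is why its state grows with the number of distinct rows and why a TTL trades state size against correctness. The plain-Java sketch below models that trade-off; DistinctWithTtlSketch and its methods are hypothetical and do not reflect how Flink's operator is implemented (in Flink itself the retention is configured through the table.exec.state.ttl option). With a 1-second TTL, a duplicate t_id that arrives after its state entry has expired is emitted a second time.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of streaming SELECT DISTINCT with a state TTL.
 * It only shows why state grows and how expiry can let a duplicate through.
 */
public class DistinctWithTtlSketch {
    private final long ttlMillis;
    // key -> time at which its state entry was (re)created
    private final Map<Integer, Long> seen = new HashMap<>();

    public DistinctWithTtlSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns true if the key is emitted downstream, false if it is dropped as a duplicate. */
    public boolean process(int key, long nowMillis) {
        Long storedAt = seen.get(key);
        if (storedAt != null && nowMillis - storedAt < ttlMillis) {
            return false; // still remembered: duplicate is dropped
        }
        seen.put(key, nowMillis); // first sighting, or the old entry has expired
        return true;
    }

    /** Drops expired entries; without a TTL the map keeps one entry per distinct key forever. */
    public void evictExpired(long nowMillis) {
        seen.values().removeIf(storedAt -> nowMillis - storedAt >= ttlMillis);
    }

    public int stateSize() {
        return seen.size();
    }

    public static void main(String[] args) {
        DistinctWithTtlSketch distinct = new DistinctWithTtlSketch(1_000);
        System.out.println(distinct.process(1, 0));     // true  (first t_id = 1)
        System.out.println(distinct.process(1, 500));   // false (duplicate within the TTL)
        System.out.println(distinct.process(2, 600));   // true
        System.out.println(distinct.process(1, 1_500)); // true  (entry expired, duplicate re-emitted)
        distinct.evictExpired(2_700);
        System.out.println(distinct.stateSize());       // 0
    }
}
```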
UNION and UNION ALL return the rows found in either table. UNION keeps only distinct rows, while UNION ALL does not remove duplicates from the result.
Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c');
Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b');
Flink SQL> (SELECT s FROM t1) UNION (SELECT s FROM t2);
+---+
| s|
+---+
| c|
| a|
| b|
| d|
| e|
+---+
Flink SQL> (SELECT s FROM t1) UNION ALL (SELECT s FROM t2);
+---+
| s|
+---+
| c|
| a|
| b|
| b|
| c|
| d|
| e|
| a|
| b|
| b|
+---+
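The difference between UNION and UNION ALL can be modelled on plain lists. The sketch below (UnionSketch is a hypothetical class, not Flink code) uses the contents of the views t1 and t2 from the example. A LinkedHashSet keeps the first occurrence of each row so the output is easy to compare, although SQL itself does not guarantee any row order without ORDER BY.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

/** Plain-Java illustration of UNION vs UNION ALL semantics. */
public class UnionSketch {

    /** UNION ALL: concatenate both inputs and keep every duplicate. */
    public static List<String> unionAll(List<String> a, List<String> b) {
        List<String> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    /** UNION: the same rows, with duplicates removed (first occurrence kept). */
    public static List<String> union(List<String> a, List<String> b) {
        return new ArrayList<>(new LinkedHashSet<>(unionAll(a, b)));
    }

    public static void main(String[] args) {
        List<String> t1 = Arrays.asList("c", "a", "b", "b", "c");
        List<String> t2 = Arrays.asList("d", "e", "a", "b", "b");
        System.out.println(union(t1, t2));    // [c, a, b, d, e]
        System.out.println(unionAll(t1, t2)); // [c, a, b, b, c, d, e, a, b, b]
    }
}
```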
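INTERSECT and INTERSECT ALL return the rows found in both tables. INTERSECT keeps only distinct rows, while INTERSECT ALL does not remove duplicates from the result.
The INTERSECT set operation takes the intersection of the result sets of the two input queries and returns only the rows that appear in both.
The ALL keyword in INTERSECT ALL likewise means that duplicate rows are not removed, but INTERSECT ALL differs from UNION ALL: it does not return every duplicate. A row that occurs m times in one input and n times in the other is returned min(m, n) times, that is, only as often as it occurs in the input where it is less frequent. In other words, INTERSECT ALL cares not only whether a row exists in both multisets but also how many times it occurs in each, as if it matched every occurrence of each row one by one.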
Flink SQL> (SELECT s FROM t1) INTERSECT (SELECT s FROM t2);
+---+
| s|
+---+
| a|
| b|
+---+
Flink SQL> (SELECT s FROM t1) INTERSECT ALL (SELECT s FROM t2);
+---+
| s|
+---+
| a|
| b|
| b|
+---+
Flink SQL> select * from alan_user_t;
+----+----------------------+--------------------------------+-------------+
| op | t_id | t_name | t_age |
+----+----------------------+--------------------------------+-------------+
| +I | 2 | alanchan | 19 |
| +I | 1 | alan | 18 |
| +I | 3 | alanchanchn | 18 |
+----+----------------------+--------------------------------+-------------+
Flink SQL> select * from alan_user_t2;
+----+----------------------+--------------------------------+-------------+
| op | t_id | t_name | t_age |
+----+----------------------+--------------------------------+-------------+
| +I | 1 | alan | 18 |
| +I | 1 | alan | 18 |
| +I | 2 | alanchan | 18 |
+----+----------------------+--------------------------------+-------------+
Flink SQL> (SELECT t_id FROM alan_user_t) INTERSECT (SELECT t_id FROM alan_user_t2);
+----+----------------------+
| op | t_id |
+----+----------------------+
| +I | 2 |
| +I | 1 |
+----+----------------------+
Flink SQL> (SELECT t_id FROM alan_user_t) INTERSECT ALL (SELECT t_id FROM alan_user_t2);
+----+----------------------+
| op | t_id |
+----+----------------------+
| +U | 1 |
| -U | 1 |
| +U | 1 |
| +U | 2 |
+----+----------------------+
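INTERSECT ALL keeps a row min(m, n) times when it occurs m times in one input and n times in the other, which is easiest to see by counting occurrences. The plain-Java sketch below (IntersectSketch is a hypothetical class, not Flink's implementation) treats each query result as a multiset of rows and reproduces both examples above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Plain-Java illustration of INTERSECT vs INTERSECT ALL semantics. */
public class IntersectSketch {

    /** Multiset view of a query result: row -> number of occurrences (first-seen order). */
    static Map<String, Integer> counts(List<String> rows) {
        Map<String, Integer> m = new LinkedHashMap<>();
        for (String r : rows) {
            m.merge(r, 1, Integer::sum);
        }
        return m;
    }

    /** INTERSECT: every row that occurs in both inputs, returned once. */
    public static List<String> intersect(List<String> a, List<String> b) {
        Set<String> inB = new HashSet<>(b);
        List<String> out = new ArrayList<>();
        for (String r : new LinkedHashSet<>(a)) {
            if (inB.contains(r)) {
                out.add(r);
            }
        }
        return out;
    }

    /** INTERSECT ALL: a row occurring m times in a and n times in b is returned min(m, n) times. */
    public static List<String> intersectAll(List<String> a, List<String> b) {
        Map<String, Integer> countsB = counts(b);
        List<String> out = new ArrayList<>();
        counts(a).forEach((row, m) -> {
            int n = countsB.getOrDefault(row, 0);
            for (int i = 0; i < Math.min(m, n); i++) {
                out.add(row);
            }
        });
        return out;
    }

    public static void main(String[] args) {
        List<String> t1 = Arrays.asList("c", "a", "b", "b", "c");
        List<String> t2 = Arrays.asList("d", "e", "a", "b", "b");
        System.out.println(intersect(t1, t2));    // [a, b]
        System.out.println(intersectAll(t1, t2)); // [a, b, b]
        // t_id columns of alan_user_t and alan_user_t2 from the example above
        System.out.println(intersectAll(Arrays.asList("2", "1", "3"), Arrays.asList("1", "1", "2"))); // [2, 1]
    }
}
```

The last line matches the net result of the changelog printed above for alan_user_t: t_id 1 and 2, each exactly once.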
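EXCEPT and EXCEPT ALL return the rows found in one table but not in the other. EXCEPT keeps only distinct rows, while EXCEPT ALL does not remove duplicates: a row that occurs m times in the left input and n times in the right input is returned max(m - n, 0) times.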
Flink SQL> (SELECT s FROM t1) EXCEPT (SELECT s FROM t2);
+---+
| s |
+---+
| c |
+---+
Flink SQL> (SELECT s FROM t1) EXCEPT ALL (SELECT s FROM t2);
+---+
| s |
+---+
| c |
| c |
+---+
Flink SQL> select * from alan_user_t;
+----+----------------------+--------------------------------+-------------+
| op | t_id | t_name | t_age |
+----+----------------------+--------------------------------+-------------+
| +I | 2 | alanchan | 19 |
| +I | 1 | alan | 18 |
| +I | 3 | alanchanchn | 18 |
+----+----------------------+--------------------------------+-------------+
Flink SQL> select * from alan_user_t2;
+----+----------------------+--------------------------------+-------------+
| op | t_id | t_name | t_age |
+----+----------------------+--------------------------------+-------------+
| +I | 1 | alan | 18 |
| +I | 1 | alan | 18 |
| +I | 2 | alanchan | 18 |
+----+----------------------+--------------------------------+-------------+
Flink SQL> (SELECT t_id FROM alan_user_t) EXCEPT (SELECT t_id FROM alan_user_t2);
+----+----------------------+
| op | t_id |
+----+----------------------+
| +I | 3 |
+----+----------------------+
Flink SQL> (SELECT t_id FROM alan_user_t) EXCEPT ALL (SELECT t_id FROM alan_user_t2);
+----+----------------------+
| op | t_id |
+----+----------------------+
| +I | 2 |
| +I | 1 |
| +I | 3 |
| -U | 1 |
| -U | 2 |
+----+----------------------+
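EXCEPT ALL keeps a row max(m - n, 0) times when it occurs m times in the left input and n times in the right input, while EXCEPT keeps each left-side row once if it never occurs on the right. The plain-Java sketch below (ExceptSketch is a hypothetical class, not Flink's implementation) treats each query result as a multiset and reproduces the examples above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Plain-Java illustration of EXCEPT vs EXCEPT ALL semantics. */
public class ExceptSketch {

    /** Multiset view of a query result: row -> number of occurrences (first-seen order). */
    static Map<String, Integer> counts(List<String> rows) {
        Map<String, Integer> m = new LinkedHashMap<>();
        for (String r : rows) {
            m.merge(r, 1, Integer::sum);
        }
        return m;
    }

    /** EXCEPT: distinct rows of a that never occur in b. */
    public static List<String> except(List<String> a, List<String> b) {
        Set<String> inB = new HashSet<>(b);
        List<String> out = new ArrayList<>();
        for (String r : new LinkedHashSet<>(a)) {
            if (!inB.contains(r)) {
                out.add(r);
            }
        }
        return out;
    }

    /** EXCEPT ALL: a row occurring m times in a and n times in b is returned max(m - n, 0) times. */
    public static List<String> exceptAll(List<String> a, List<String> b) {
        Map<String, Integer> countsB = counts(b);
        List<String> out = new ArrayList<>();
        counts(a).forEach((row, m) -> {
            int remaining = Math.max(m - countsB.getOrDefault(row, 0), 0);
            for (int i = 0; i < remaining; i++) {
                out.add(row);
            }
        });
        return out;
    }

    public static void main(String[] args) {
        List<String> t1 = Arrays.asList("c", "a", "b", "b", "c");
        List<String> t2 = Arrays.asList("d", "e", "a", "b", "b");
        System.out.println(except(t1, t2));    // [c]
        System.out.println(exceptAll(t1, t2)); // [c, c]
        // t_id columns of alan_user_t and alan_user_t2 from the example above
        System.out.println(exceptAll(Arrays.asList("2", "1", "3"), Arrays.asList("1", "1", "2"))); // [3]
    }
}
```

For alan_user_t and alan_user_t2 both operations yield t_id 3, which is also what the +I/-U rows printed by the CLI net out to.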
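IN returns true if the expression exists in the given table subquery. The subquery table must consist of a single column, and that column must have the same data type as the expression.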
SELECT user, amount
FROM Orders
WHERE product IN (
SELECT product FROM NewProducts
)
Flink SQL> SELECT *
> FROM alan_user_table
> WHERE u_id IN (
> SELECT u_id FROM alan_w_user_table
> );
>
+----+----------------------+--------------------------------+--------------+-------------+
| op | u_id | u_name | balance | age |
+----+----------------------+--------------------------------+--------------+-------------+
| +I | 1 | alan | 12.4000 | 18 |
+----+----------------------+--------------------------------+--------------+-------------+
Received a total of 1 row
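The optimizer rewrites the IN condition into a join and a group operation. For streaming queries, the state required to compute the query result might grow infinitely, depending on the number of distinct input rows. You can provide the query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See the query configuration for details.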
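The rewrite into a join and a group operation can be pictured as two steps: reduce the subquery result to its distinct keys, then keep each outer row whose key is in that set (a semi-join). The plain-Java sketch below models this with a hypothetical User type and made-up data; it is not the plan Flink generates. The group step is what prevents an outer row from being duplicated when its key appears several times in the subquery.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java model of WHERE u_id IN (SELECT u_id FROM ...) as group + semi-join. */
public class InSubquerySketch {

    /** Hypothetical outer row type. */
    public static final class User {
        public final long uId;
        public final String uName;

        public User(long uId, String uName) {
            this.uId = uId;
            this.uName = uName;
        }
    }

    public static List<String> namesWhereIdIn(List<User> users, List<Long> subqueryIds) {
        // "Group" step: reduce the subquery result to its distinct keys.
        Set<Long> distinctIds = new HashSet<>(subqueryIds);
        // "Join" step: keep each outer row that has a match, at most once (semi-join).
        List<String> out = new ArrayList<>();
        for (User u : users) {
            if (distinctIds.contains(u.uId)) {
                out.add(u.uName);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up data: id 1 appears twice in the subquery but alan is returned once.
        List<User> users = Arrays.asList(
                new User(1, "alan"), new User(2, "alanchan"), new User(3, "alanchanchn"));
        System.out.println(namesWhereIdIn(users, Arrays.asList(1L, 1L, 4L))); // [alan]
    }
}
```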
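SELECT *
FROM alan_user_table u
WHERE EXISTS (
SELECT u_id FROM alan_w_user_table w WHERE w.u_id = u.u_id
);
EXISTS returns true if the subquery returns at least one row. It is supported only if the operation can be rewritten into a join and a group operation.
The optimizer rewrites the EXISTS operation into a join and a group operation. For streaming queries, the state required to compute the query result might grow infinitely, depending on the number of distinct input rows. You can provide the query configuration with an appropriate state time-to-live (TTL) to prevent excessive state size. Note that this might affect the correctness of the query result. See the query configuration for details.
The ORDER BY clause sorts the result rows according to the specified expressions. If two rows are equal according to the leftmost expression, they are compared according to the next expression, and so on. If they are equal according to all specified expressions, they are returned in an implementation-dependent order.
When running in streaming mode, the primary sort order of a table must be ascending on a time attribute. All subsequent orders can be chosen freely. Batch mode does not have this restriction.
------Table schema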
Flink SQL> desc alan_fact_order_table2;
+----------+-----------------------------+-------+-----+---------------+-----------+
| name | type | null | key | extras | watermark |
+----------+-----------------------------+-------+-----+---------------+-----------+
| o_id | STRING | true | | | |
| o_amount | DOUBLE | true | | | |
| u_id | BIGINT | true | | | |
| item_id | BIGINT | true | | | |
| action | STRING | true | | | |
| ts | BIGINT | true | | | |
| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false | | AS PROCTIME() | |
+----------+-----------------------------+-------+-----+---------------+-----------+
7 rows in set
-------Table data
Flink SQL> select * from alan_fact_order_table2 ;
+----+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+----------------------+-------------------------+
| op | o_id | o_amount | u_id | item_id | action | ts | proctime |
+----+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+----------------------+-------------------------+
| +I | 1 | 123.34 | 1 | 8001 | 'b' | 1693887925763 | 2023-09-08 08:09:38.579 |
| +I | 30 | 41.34 | 5 | 7001 | 'c' | 1693874222274 | 2023-09-08 08:09:38.579 |
| +I | 30 | 41.34 | 5 | 7001 | 'c' | 1693887926780 | 2023-09-08 08:09:38.579 |
| +I | 20 | 321.34 | 3 | 9001 | 'a' | 1693887928801 | 2023-09-08 08:09:38.579 |
| +I | 50 | 666.66 | 2 | 3001 | 'd' | 1693887927790 | 2023-09-08 08:09:38.579 |
--------Sorting
Flink SQL> select o_id ,o_amount,u_id,ts,proctime from alan_fact_order_table2 order by proctime,o_id desc;
+----+--------------------------------+--------------------------------+----------------------+----------------------+-------------------------+
| op | o_id | o_amount | u_id | ts | proctime |
+----+--------------------------------+--------------------------------+----------------------+----------------------+-------------------------+
| +I | 50 | 666.66 | 2 | 1693887927790 | 2023-09-08 08:11:32.712 |
| +I | 30 | 41.34 | 5 | 1693874222274 | 2023-09-08 08:11:32.712 |
| +I | 30 | 41.34 | 5 | 1693887926780 | 2023-09-08 08:11:32.712 |
| +I | 20 | 321.34 | 3 | 1693887928801 | 2023-09-08 08:11:32.712 |
| +I | 1 | 123.34 | 1 | 1693887925763 | 2023-09-08 08:11:32.712 |
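Note: in streaming mode, the first sort key must be a time-attribute field in ascending order; other fields may follow it or be omitted. Otherwise, the following error is reported: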
Flink SQL> select o_id ,o_amount,u_id,proctime from alan_fact_order_table2 order by o_id desc;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.
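The tie-breaking rule of ORDER BY (compare on the first expression, fall back to the next one only on ties) maps directly onto a chained comparator. The plain-Java sketch below, with a hypothetical OrderRow type and a made-up numeric stand-in for proctime, reproduces the order of the order by proctime, o_id desc query above. Because o_id is a STRING column, the descending comparison is lexicographic rather than numeric; for these values the two happen to coincide.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Plain-Java model of ORDER BY proctime ASC, o_id DESC. */
public class OrderBySketch {

    /** Hypothetical row holding only the two sort keys used in the query above. */
    public static final class OrderRow {
        final String oId;      // o_id is a STRING column: compared lexicographically
        final long proctimeMs; // stand-in for the proctime attribute

        public OrderRow(String oId, long proctimeMs) {
            this.oId = oId;
            this.proctimeMs = proctimeMs;
        }
    }

    /** The second key is consulted only when the first key ties. */
    public static final Comparator<OrderRow> ORDER =
            Comparator.comparingLong((OrderRow r) -> r.proctimeMs)
                      .thenComparing(Comparator.comparing((OrderRow r) -> r.oId).reversed());

    public static List<String> sortedIds(List<OrderRow> rows) {
        List<OrderRow> copy = new ArrayList<>(rows);
        copy.sort(ORDER);
        return copy.stream().map(r -> r.oId).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long t = 1_000L; // all rows share one processing time, as in the output above
        List<OrderRow> rows = Arrays.asList(
                new OrderRow("1", t), new OrderRow("30", t), new OrderRow("30", t),
                new OrderRow("20", t), new OrderRow("50", t));
        System.out.println(sortedIds(rows)); // [50, 30, 30, 20, 1]
    }
}
```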
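The LIMIT clause constrains the number of rows returned by a SELECT statement. In general, it is used together with ORDER BY to make the result deterministic.
The following example selects the first 3 rows of the alan_fact_order_table2 table.
-----All data in the table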
Flink SQL> select o_id ,o_amount,u_id,proctime from alan_fact_order_table2 ;
+----+--------------------------------+--------------------------------+----------------------+-------------------------+
| op | o_id | o_amount | u_id | proctime |
+----+--------------------------------+--------------------------------+----------------------+-------------------------+
| +I | 1 | 123.34 | 1 | 2023-09-08 08:16:28.791 |
| +I | 30 | 41.34 | 5 | 2023-09-08 08:16:28.791 |
| +I | 30 | 41.34 | 5 | 2023-09-08 08:16:28.791 |
| +I | 20 | 321.34 | 3 | 2023-09-08 08:16:28.791 |
| +I | 50 | 666.66 | 2 | 2023-09-08 08:16:28.791 |
-----Query the first three rows of the table
Flink SQL> select o_id ,o_amount,u_id,proctime from alan_fact_order_table2 limit 3;
+----+--------------------------------+--------------------------------+----------------------+-------------------------+
| op | o_id | o_amount | u_id | proctime |
+----+--------------------------------+--------------------------------+----------------------+-------------------------+
| +I | 1 | 123.34 | 1 | 2023-09-08 08:15:27.611 |
| +I | 30 | 41.34 | 5 | 2023-09-08 08:15:27.612 |
| +I | 30 | 41.34 | 5 | 2023-09-08 08:15:27.612 |
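Without ORDER BY, a streaming LIMIT forwards the first n rows that happen to arrive, so which rows you get depends on arrival order. The sketch below is a hypothetical plain-Java model (StreamingLimitSketch is not Flink's operator) that applies the same LIMIT 3 to two different, made-up arrival orders of the o_id values and gets two different answers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of LIMIT n on an unordered stream. */
public class StreamingLimitSketch {

    /** Forward records in arrival order until n have been emitted, then drop the rest. */
    public static List<String> limit(int n, List<String> arrivals) {
        List<String> out = new ArrayList<>();
        for (String record : arrivals) {
            if (out.size() >= n) {
                break; // limit reached: later records are never emitted
            }
            out.add(record);
        }
        return out;
    }

    public static void main(String[] args) {
        // The same five o_id values in two made-up arrival orders.
        List<String> runA = Arrays.asList("1", "30", "30", "20", "50");
        List<String> runB = Arrays.asList("50", "20", "1", "30", "30");
        System.out.println(limit(3, runA)); // [1, 30, 30]
        System.out.println(limit(3, runB)); // [50, 20, 1]
    }
}
```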
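Deduplication removes rows that are duplicated over a set of columns, keeping only the first or the last one. In some cases, an upstream ETL job is not end-to-end exactly-once, which may result in duplicate records in the sink after a failover. Duplicate records, however, affect the correctness of downstream analytical jobs (e.g., SUM, COUNT), so deduplication is needed before further analysis.
Flink uses ROW_NUMBER() to remove duplicates, in the same way as Top-N queries. In theory, deduplication is a special case of Top-N in which N is one and rows are ordered by processing time or event time.
The syntax of a deduplication statement is as follows: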
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1
The pattern above must be followed strictly; otherwise the optimizer cannot translate the query.
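The ROW_NUMBER() pattern keeps, per PARTITION BY key, the row ranked first by the time attribute: ascending order keeps the first row, descending order keeps the last. The plain-Java sketch below models both variants on hypothetical events keyed by o_id, with a made-up numeric timestamp; it illustrates the semantics and is not Flink's deduplication operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the ROW_NUMBER() ... WHERE rownum = 1 deduplication pattern. */
public class DedupSketch {

    /** Hypothetical row: the PARTITION BY key and the ORDER BY time attribute. */
    public static final class Event {
        public final String oId; // PARTITION BY o_id
        public final long ts;    // stand-in for proctime / rowtime

        public Event(String oId, long ts) {
            this.oId = oId;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return oId + "@" + ts;
        }
    }

    /** ORDER BY time ASC ... WHERE rownum = 1: keep the earliest row per key. */
    public static List<Event> keepFirst(List<Event> events) {
        Map<String, Event> best = new LinkedHashMap<>();
        for (Event e : events) {
            Event current = best.get(e.oId);
            if (current == null || e.ts < current.ts) {
                best.put(e.oId, e); // on ties the earlier arrival wins
            }
        }
        return new ArrayList<>(best.values());
    }

    /** ORDER BY time DESC ... WHERE rownum = 1: keep the latest row per key. */
    public static List<Event> keepLast(List<Event> events) {
        Map<String, Event> best = new LinkedHashMap<>();
        for (Event e : events) {
            Event current = best.get(e.oId);
            if (current == null || e.ts >= current.ts) {
                best.put(e.oId, e); // on ties the later arrival wins
            }
        }
        return new ArrayList<>(best.values());
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("1", 100), new Event("30", 200), new Event("30", 300),
                new Event("20", 400), new Event("50", 350));
        System.out.println(keepFirst(events)); // [1@100, 30@200, 20@400, 50@350]
        System.out.println(keepLast(events));  // [1@100, 30@300, 20@400, 50@350]
    }
}
```

In a streaming setting, keep-last may have to replace a row it has already emitted (here 30@200 is superseded by 30@300), so it produces updates; when ordering by processing time, where rows arrive in time order, keep-first never changes a row once it has been chosen.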
-----1. Table schema
Flink SQL> desc alan_fact_order_table2;
+----------+-----------------------------+-------+-----+---------------+-----------+
| name | type | null | key | extras | watermark |
+----------+-----------------------------+-------+-----+---------------+-----------+
| o_id | STRING | true | | | |
| o_amount | DOUBLE | true | | | |
| u_id | BIGINT | true | | | |
| item_id | BIGINT | true | | | |
| action | STRING | true | | | |
| ts | BIGINT | true | | | |
| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false | | AS PROCTIME() | |
+----------+-----------------------------+-------+-----+---------------+-----------+
7 rows in set
-----2. All data in the table (sample)
Flink SQL> select * from alan_fact_order_table2 ;
+----+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+----------------------+-------------------------+
| op | o_id | o_amount | u_id | item_id | action | ts | proctime |
+----+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+----------------------+-------------------------+
| +I | 1 | 123.34 | 1 | 8001 | 'b' | 1693887925763 | 2023-09-08 08:09:38.579 |
| +I | 30 | 41.34 | 5 | 7001 | 'c' | 1693874222274 | 2023-09-08 08:09:38.579 |
| +I | 30 | 41.34 | 5 | 7001 | 'c' | 1693887926780 | 2023-09-08 08:09:38.579 |
| +I | 20 | 321.34 | 3 | 9001 | 'a' | 1693887928801 | 2023-09-08 08:09:38.579 |
| +I | 50 | 666.66 | 2 | 3001 | 'd' | 1693887927790 | 2023-09-08 08:09:38.579 |
----3. Deduplication example
Flink SQL> SELECT o_id, u_id, proctime, action
> FROM (
> SELECT *,
> ROW_NUMBER() OVER (PARTITION BY o_id ORDER BY proctime ASC) AS row_num
> FROM alan_fact_order_table2)
> WHERE row_num = 1 ;
+----+--------------------------------+----------------------+-------------------------+--------------------------------+
| op | o_id | u_id | proctime | action |
+----+--------------------------------+----------------------+-------------------------+--------------------------------+
| +I | 1 | 1 | 2023-09-08 08:26:21.137 | 'b' |
| +I | 30 | 5 | 2023-09-08 08:26:21.138 | 'c' |
| +I | 20 | 3 | 2023-09-08 08:26:21.138 | 'a' |
| +I | 50 | 2 | 2023-09-08 08:26:21.138 | 'd' |
This article has covered Flink's SELECT, WHERE, DISTINCT, ORDER BY, LIMIT, set operations, and deduplication, with concrete runnable examples.