当前位置:   article > 正文

Flink中Table API和SQL 完整使用上 (第十一章)_tochangelogstream

tochangelogstream

Flink中Table API和SQL 完整使用

一、介绍Flink中Table API和SQL

在Flink提供的多层级API中,核心是DataStream API,这是我们开发流处理应用的基本途径;底层则是所谓的处理函数(process function),可以访问事件的时间信息、注册定时器、自定义状态,进行有状态的流处理。DataStream API和处理函数比较通用,有了这些API,理论上我们就可以实现所有场景的需求了。

不过在企业实际应用中,往往会面对大量类似的处理逻辑,所以一般会将底层API包装成更加具体的应用级接口。怎样的接口风格最容易让大家接收呢?作为大数据工程师,我们最为熟悉的数据统计方式,当然就是写SQL了。

SQL是结构化查询语言(Structured Query Language)的缩写,是我们对关系型数据库进行查询和修改的通用编程语言。在关系型数据库中,数据是以表(table)的形式组织起来的,所以也可以认为SQL是用来对表进行处理的工具语言。无论是传统架构中进行数据存储的MySQL、PostgreSQL,还是大数据应用中的Hive,都少不了SQL的身影;而Spark作为大数据处理引擎,为了更好地支持在Hive中的SQL查询,也提供了Spark SQL作为入口。

Flink同样提供了对于“表”处理的支持,这就是更高层级的应用API,在Flink中被称为Table API和SQL。Table API顾名思义,就是基于“表”(Table)的一套API,它是内嵌在Java、Scala等语言中的一种声明式领域特定语言(DSL),也就是专门为处理表而设计的;在此基础上,Flink还基于Apache Calcite实现了对SQL的支持。这样一来,我们就可以在Flink程序中直接写SQL来实现处理需求了。

在这里插入图片描述

在Flink中这两种API被集成在一起,SQL执行的对象也是Flink中的表(Table),所以我们一般会认为它们是一体的,本章会放在一起进行介绍。Flink是批流统一的处理框架,无论是批处理(DataSet API)还是流处理(DataStream API),在上层应用中都可以直接使用Table API或者SQL来实现;这两种API对于一张表执行相同的查询操作,得到的结果是完全一样的。我们主要还是以流处理应用为例进行讲解。

需要说明的是,Table API和SQL最初并不完善,在Flink 1.9版本合并阿里巴巴内部版本Blink之后发生了非常大的改变此后也一直处在快速开发和完善的过程中,直到Flink 1.12版本才基本上做到了功能上的完善。而即使是在目前最新的1.13版本中,Table API和SQL也依然不算稳定,接口用法还在不停调整和更新。所以这部分希望大家重在理解原理和基本用法,具体的API调用可以随时关注官网的更新变化。

二、快速上手

如果我们对关系型数据库和SQL非常熟悉,那么Table API和SQL的使用其实非常简单:只要得到一个“表”(Table),然后对它调用Table API,或者直接写SQL就可以了。接下来我们就以一个非常简单的例子上手,初步了解一下这种高层级API的使用方法。

1、需要引入的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

这里的依赖是一个Java的“桥接器”(bridge),主要就是负责Table API和下层DataStream API的连接支持,按照不同的语言分为Java版和Scala版。

如果我们希望在本地的集成开发环境(IDE)里运行Table API和SQL,还需要引入以下依赖:

flink-table-planner-blink 计划器整个flink table的核心、主要就是提供整个运行时环境、负责生成应用的执行的计划

<dependency>
    <groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
     <!-- planner-blink 用到了scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

这里主要添加的依赖是一个“计划器”(planner),它是Table API的核心组件,负责提供运行时环境,并生成程序的执行计划。这里我们用到的是新版的blink planner。由于Flink安装包的lib目录下会自带planner,所以在生产集群环境中提交的作业不需要打包这个依赖。
而在Table API的内部实现上,部分相关的代码是用 Scala 实现的,所以还需要额外添加一个Scala版流处理的相关依赖。

另外,如果想实现自定义的数据格式来做序列化,可以引入下面的依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2、简单示例

有了基本的依赖,接下来我们就可以尝试在Flink代码中使用Table API和SQL了。比如,我们可以自定义一些Event类型(包含了user、url和timestamp三个字段,参考5.2.1小节中的定义)的用户访问事件,作为输入的数据源;而后从中提取url地址和用户名user两个字段作为输出。

如果使用DataStream API,我们可以直接读取数据源后,用一个简单转换算子map来做字段的提取。而这个需求直接写SQL的话,实现会更加简单:
select url, user from EventTable;

这里我们把流中所有数据组成的表叫作EventTable。在Flink代码中直接对这个表执行上面的SQL,就可以得到想要提取的数据了。

package com.example.chapter11;

import com.example.chapter05.ClickSource;
import com.example.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * 快速上手
 * SQL
 * Table API
 */
public class SimpleTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //读取数据,得到DataStream
        SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        //创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //将DataStream转换Table
        Table eventTable = tableEnv.fromDataStream(eventStream);

        //4、直接写SQL转换
        Table resultTable = tableEnv.sqlQuery("select user, url, `timestamp` from " + eventTable);

        //Table API
        Table resultTable2 = eventTable.select($("user"), $("url"))
                .where($("user").isEqual("Alice"));

        //Table 不能打印------------最简单转换成流输出
        tableEnv.toDataStream(resultTable).print("result");
        tableEnv.toDataStream(resultTable2).print("result2");

        env.execute();

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

这里我们需要创建一个“表环境”(TableEnvironment),然后将数据流(DataStream)转换成一个表(Table);之后就可以执行SQL在这个表中查询数据了。查询得到的结果依然是一个表,把它重新转换成流就可以打印输出了。
代码执行的结果如下:

+I[./home, Alice]
+I[./cart, Bob]
+I[./prod?id=1, Alice]
+I[./home, Cary]
+I[./prod?id=3, Bob]
+I[./prod?id=7, Alice]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

可以看到,我们将原始的Event数据转换成了(url,user)这样类似二元组的类型。每行输出前面有一个“+I”标志,这是表示每条数据都是“插入”(Insert)到表中的新增数据。

Table是Table API中的核心接口类,对应着我们熟悉的“表”的概念。基于Table我们也可以调用一系列查询方法直接进行转换,这就是所谓Table API的处理方式:

// 用Table API 方式提取数据
Table clickTable2 = eventTable.select($("url"), $("user"));
  • 1
  • 2

这里的$符号是Table API中定义的“表达式”类Expressions中的一个方法,传入一个字段名称,就可以指代数据中对应字段。将得到的表转换成流打印输出,会发现结果与直接执行SQL完全一样。

三、基本API

通过上节中的简单示例,我们已经对Table API和SQL的用法有了大致的了解;本节就继续展开,对API的相关用法做一个详细的说明。

1、程序架构

在Flink中,Table API和SQL可以看作联结在一起的一套API,这套API的核心概念就是“表”(Table)。在我们的程序中,输入数据可以定义成一张表;然后对这张表进行查询,就可以得到新的表,这相当于就是流数据的转换操作;最后还可以定义一张用于输出的表,负责将处理结果写入到外部系统。

我们可以看到,程序的整体处理流程与DataStream API非常相似,也可以分为读取数据源(Source)、转换(Transform)、输出数据(Sink)三部分;只不过这里的输入输出操作不需要额外定义,只需要将用于输入和输出的表定义出来,然后进行转换查询就可以了。

程序基本架构如下

// 创建表环境
TableEnvironment tableEnv = ...;
 
// 创建输入表,连接外部系统读取数据
tableEnv.executeSql("CREATE TEMPORARY TABLE inputTable ... WITH ( 'connector' = ... )");

// 注册一个表,连接到外部系统,用于输出
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");

// 执行SQL对表进行查询转换,得到一个新的表
Table table1 = tableEnv.sqlQuery("SELECT ... FROM inputTable... ");

// 使用Table API对表进行查询转换,得到一个新的表
Table table2 = tableEnv.from("inputTable").select(...);

// 将得到的结果写入输出表
TableResult tableResult = table1.executeInsert("outputTable");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

与上一节中不同,这里不是从一个DataStream转换成Table,而是通过执行DDL来直接创建一个表。这里执行的CREATE语句中用WITH指定了外部系统的连接器,于是就可以连接外部系统读取数据了。这其实是更加一般化的程序架构,因为这样我们就可以完全抛开DataStream API,直接用SQL语句实现全部的流处理过程。

而后面对于输出表的定义是完全一样的。可以发现,在创建表的过程中,其实并不区分“输入”还是“输出”,只需要将这个表“注册”进来、连接到外部系统就可以了;这里的inputTable、outputTable只是注册的表名,并不代表处理逻辑,可以随意更换。至于表的具体作用,则要等到执行后面的查询转换操作时才能明确。我们直接从inputTable中查询数据,那么inputTable就是输入表;而outputTable会接收另外表的结果进行写入,那么就是输出表。

在早期的版本中,有专门的用于输入输出的TableSource和TableSink,这与流处理里的概念是一一对应的;不过这种方式与关系型表和SQL的使用习惯不符,所以已被弃用,不再区分Source和Sink。

2、创建表环境

对于Flink这样的流处理框架来说,数据流和表在结构上还是有所区别的。所以使用Table API和SQL需要一个特别的运行时环境,这就是所谓的“表环境”(TableEnvironment)。它主要负责:

1)注册Catalog和表;
(2)执行 SQL 查询;
(3)注册用户自定义函数(UDF);
(4DataStream 和表之间的转换。
  • 1
  • 2
  • 3
  • 4

这里的Catalog就是“目录”,与标准SQL中的概念是一致的,主要用来管理所有数据库(database)和表(table)的元数据(metadata)。通过Catalog可以方便地对数据库和表进行查询的管理,所以可以认为我们所定义的表都会“挂靠”在某个目录下,这样就可以快速检索。在表环境中可以由用户自定义Catalog,并在其中注册表和自定义函数(UDF)。默认的Catalog就叫作default_catalog。

每个表和SQL的执行,都必须绑定在一个表环境(TableEnvironment)中TableEnvironment是Table API中提供的基本接口类,可以通过调用静态的create()方法来创建一个表环境实例。方法需要传入一个环境的配置参数EnvironmentSettings,它可以指定当前表环境的执行模式和计划器(planner)。执行模式有批处理和流处理两种选择,默认是流处理模式;计划器默认使用blink planner。

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()    // 使用流处理模式
    .build();

TableEnvironment tableEnv = TableEnvironment.create(settings);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

对于流处理场景,其实默认配置就完全够用了。所以我们也可以用另一种更加简单的方式来创建表环境:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

这里我们引入了一个“流式表环境”(StreamTableEnvironment),它是继承自TableEnvironment的子接口。调用它的create()方法,只需要直接将当前的流执行环境(StreamExecutionEnvironment)传入, 就可以创建出对应的流式表环境了。这也正是我们在上一节简单示例中使用的方式。

3、创建表

表(Table)是我们非常熟悉的一个概念,它是关系型数据库中数据存储的基本形式,也是SQL执行的基本对象。Flink中的表概念也并不特殊,是由多个“行”数据构成的,每个行(Row)又可以有定义好的多个列(Column)字段;整体来看,表就是固定类型的数据组成的二维矩阵。

为了方便地查询表,表环境中会维护一个目录(Catalog)和表的对应关系。所以表都是通过Catalog来进行注册创建的。表在环境中有一个唯一的ID,由三部分组成:目录(catalog)名,数据库(database)名,以及表名。在默认情况下,目录名为default_catalog,数据库名为default_database。所以如果我们直接创建一个叫作MyTable的表,它的ID就是:

default_catalog.default_database.MyTable
  • 1

具体创建表的方式,有通过连接器(connector)和虚拟表(virtual tables)两种。

1. 连接器表(Connector Tables)

最直观的创建表的方式,就是通过连接器(connector)连接到一个外部系统,然后定义出对应的表结构。例如我们可以连接到Kafka或者文件系统,将存储在这些外部系统的数据以“表”的形式定义出来这样对表的读写就可以通过连接器转换成对外部系统的读写了。当我们在表环境中读取这张表,连接器就会从外部系统读取数据并进行转换;而当我们向这张表写入数据,连接器就会将数据输出(Sink)到外部系统中。

在代码中,我们可以调用表环境的executeSql()方法,可以传入一个DDL作为参数执行SQL操作。这里我们传入一个CREATE语句进行表的创建,并通过WITH关键字指定连接到外部系统的连接器:

tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )");
  • 1

这里没有定义Catalog和Database,所以都是默认的,表的完整ID就是default_catalog.default_database.MyTable。如果希望使用自定义的目录名和库名,可以在环境中进行设置:

tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
  • 1
  • 2

这样我们创建的表完整ID就变成了custom_catalog.custom_database.MyTable。之后在表环境中创建的所有表,ID也会都以custom_catalog.custom_database作为前缀。

2. 虚拟表(Virtual Tables)

在环境中注册之后,我们就可以在SQL中直接使用这张表进行查询转换了。

Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");
  • 1

这里调用了表环境的sqlQuery()方法,直接传入一条SQL语句作为参数执行查询,得到的结果是一个Table对象。Table是Table API中提供的核心接口类,就代表了一个Java中定义的表实例。

得到的newTable是一个中间转换结果,如果之后又希望直接使用这个表执行SQL,又该怎么做呢?由于newTable是一个Table对象,并没有在表环境中注册;所以我们还需要将这个中间结果表注册到环境中,才能在SQL中使用:

tableEnv.createTemporaryView("NewTable", newTable);
  • 1

我们发现,这里的注册其实是创建了一个“虚拟表”(Virtual Table)。这个概念与SQL语法中的视图(View)非常类似,所以调用的方法也叫作创建“虚拟视图”(createTemporaryView)。视图之所以是“虚拟”的,是因为我们并不会直接保存这个表的内容,并没有“实体”;只是在用到这张表的时候,会将它对应的查询语句嵌入到SQL中。

注册为虚拟表之后,我们就又可以在SQL中直接使用NewTable进行查询转换了。不难看到,通过虚拟表可以非常方便地让SQL分步骤执行得到中间结果,这为代码编写提供了很大的便利。

另外,虚拟表也可以让我们在Table API和SQL之间进行自由切换。一个Java中的Table对象可以直接调用Table API中定义好的查询转换方法,得到一个中间结果表;这跟对注册好的表直接执行SQL结果是一样的。具体我们会在下一小节继续讲解。

4、表的查询

创建好了表,接下来自然就是对表进行查询转换了。对一个表的查询(Query)操作,就对应着流数据的转换(Transform)处理。
Flink为我们提供了两种查询方式:SQL和Table API。

1. 执行SQL进行查询

基于表执行SQL语句,是我们最为熟悉的查询方式。Flink基于Apache Calcite来提供对SQL的支持,Calcite是一个为不同的计算平台提供标准SQL查询的底层工具,很多大数据框架比如Apache Hive、Apache Kylin中的SQL支持都是通过集成Calcite来实现的。

在代码中,我们只要调用表环境的sqlQuery()方法,传入一个字符串形式的SQL查询语句就可以了。执行得到的结果,是一个Table对象

// 创建表环境
        /**
         * 方式二
         * flink流处理环境
         */
        //1、定义环境配置来创建表执行环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode() //使用流处理模式
                .useBlinkPlanner()
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);

// 创建表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");

// 查询用户Alice的点击事件,并提取表中前两个字段
Table aliceVisitTable = tableEnv.sqlQuery(
    "SELECT user, url " +
    "FROM EventTable " +
    "WHERE user = 'Alice' "
  );
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

2、示例想使用函数统计

我们也可以通过GROUP BY关键字定义分组聚合,调用COUNT()、SUM()这样的函数来进行统计计算:

Table urlCountTable = tableEnv.sqlQuery(
    "SELECT user, COUNT(url) " +
    "FROM EventTable " +
    "GROUP BY user "
  );
  • 1
  • 2
  • 3
  • 4
  • 5

上面的例子得到的是一个新的Table对象我们可以再次将它注册为虚拟表继续在SQL中调用。另外,我们也可以直接将查询的结果写入到已经注册的表中,这需要调用表环境的executeSql()方法来执行DDL,传入的是一个INSERT语句:

// 注册表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
  • 1
  • 2
  • 3
// 将查询结果输出到OutputTable中
tableEnv.executeSql (
"INSERT INTO OutputTable " +
    "SELECT user, url " +
    "FROM EventTable " +
    "WHERE user = 'Alice' "
  );
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3. 调用Table API进行查询

另外一种查询方式就是调用Table API。这是嵌入在Java和Scala语言内的查询API,核心就是Table接口类,通过一步步链式调用Table的方法,就可以定义出所有的查询转换操作。每一步方法调用的返回结果,都是一个Table。

由于Table API是基于Table的Java实例进行调用的,因此我们首先要得到表的Java对象。基于环境中已注册的表,可以通过表环境的from()方法非常容易地得到一个Table对象

Table eventTable = tableEnv.from("EventTable");
  • 1

传入的参数就是注册好的表名。注意这里eventTable是一个Table对象,而EventTable是在环境中注册的表名。得到Table对象之后,就可以调用API进行各种转换操作了,得到的是一个新的Table对象:

Table maryClickTable = eventTable
        .where($("user").isEqual("Alice"))
        .select($("url"), $("user"));
  • 1
  • 2
  • 3

这里每个方法的参数都是一个“表达式”(Expression),用方法调用的形式直观地说明了想要表达的内容;“$”符号用来指定表中的一个字段。上面的代码和直接执行SQL是等效的。

Table API是嵌入编程语言中的DSL,SQL中的很多特性和功能必须要有对应的实现才可以使用,因此跟直接写SQL比起来肯定就要麻烦一些。目前Table API支持的功能相对更少,可以预见未来Flink社区也会以扩展SQL为主,为大家提供更加通用的接口方式;所以我们接下来也会以介绍SQL为主,简略地提及Table API。

5. 两种API的结合使用

可以发现,无论是调用Table API还是执行SQL,得到的结果都是一个Table对象;所以这两种API的查询可以很方便地结合在一起。

(1)无论是那种方式得到的Table对象,都可以继续调用Table API进行查询转换;
(2)如果想要对一个表执行SQL操作(用FROM关键字引用),必须先在环境中对它进行注册。所以我们可以通过创建虚拟表的方式实现两者的转换:
注意:这里的第一个参数"MyTable"是注册的表名,而第二个参数myTable是Java中的Table对象。

另外要说明的是,在11.1.2小节的简单示例中,我们并没有将Table对象注册为虚拟表就直接在SQL中使用了:

Table clickTable = tableEnvironment.sqlQuery("select url, user from " + eventTable);
  • 1

这其实是一种简略的写法,我们将Table对象名eventTable直接以字符串拼接的形式添加到SQL语句中,在解析时会自动注册一个同名的虚拟表到环境中,这样就省略了创建虚拟视图的步骤。

两种API殊途同归,实际应用中可以按照自己的习惯任意选择。不过由于结合使用容易引起混淆,而Table API功能相对较少、通用性较差,所以企业项目中往往会直接选择SQL的方式来实现需求。

5、输出表

表的创建和查询,就对应着流处理中的读取数据源(Source)和转换(Transform);而最后一个步骤Sink,也就是将结果数据输出到外部系统,就对应着表的输出操作。

在代码上,输出一张表最直接的方法,就是调用Table的方法executeInsert()方法将一个 Table写入到注册过的表中,方法传入的参数就是注册的表名。

// 注册表,用于输出数据到外部系统
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");

// 经过查询转换,得到结果表
Table result = ...

// 将结果表写入已注册的输出表中
result.executeInsert("OutputTable");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在底层,表的输出是通过将数据写入到TableSink来实现的TableSink是Table API中提供的一个向外部系统写入数据的通用接口,可以支持不同的文件格式(比如CSV、Parquet)、存储数据库(比如JDBC、HBase、Elasticsearch)和消息队列(比如Kafka)。它有些类似于DataStream API中调用addSink()方法时传入的SinkFunction,有不同的连接器对它进行了实现。关于不同外部系统的连接器,我们会在11.8节展开介绍。

这里可以发现,我们在环境中注册的“表”,其实在写入数据的时候就对应着一个TableSink。

6、表和流的转换

从创建表环境开始,历经表的创建、查询转换和输出,我们已经可以使用Table API和SQL进行完整的流处理了。不过在应用的开发过程中,我们测试业务逻辑一般不会直接将结果直接写入到外部系统,而是在本地控制台打印输出。对于DataStream这非常容易,直接调用print()方法就可以看到结果数据流的内容了;但对于Table就比较悲剧——它没有提供print()方法。这该怎么办呢?

在Flink中我们可以将Table再转换成DataStream,然后进行打印输出。这就涉及了表和流的转换。

1. 将表(Table)转换成流(DataStream)

(1)调用toDataStream()方法

将一个Table对象转换成DataStream非常简单,只要直接调用表环境的方法toDataStream()就可以了。例如,我们可以将11.2.4小节经查询转换得到的表maryClickTable转换成流打印输出,这代表了“Mary点击的url列表”:

Table aliceVisitTable = tableEnv.sqlQuery(
    "SELECT user, url " +
    "FROM EventTable " +
    "WHERE user = 'Alice' "
  );

// 将表转换成数据流
tableEnv.toDataStream(aliceVisitTable).print();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

这里需要将要转换的Table对象作为参数传入。

(2)调用toChangelogStream()方法

将maryClickTable转换成流打印输出是很简单的;然而,如果我们同样希望将“用户点击次数统计”表urlCountTable进行打印输出,就会抛出一个TableException异常:

Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 
'default_catalog.default_database.Unregistered_DataStream_Sink_1' doesn't 
support consuming update changes ...
  • 1
  • 2
  • 3

这表示当前的TableSink并不支持表的更新(update)操作。这是什么意思呢?

因为print本身也可以看作一个Sink操作,所以这个异常就是说打印输出的Sink操作不支持对数据进行更新。具体来说,urlCountTable这个表中进行了分组聚合统计,所以表中的每一行是会“更新”的。也就是说,Alice的第一个点击事件到来,表中会有一行(Alice, 1);第二个点击事件到来,这一行就要更新为(Alice, 2)。但之前的(Alice, 1)已经打印输出了,“覆水难收”,我们怎么能对它进行更改呢?所以就会抛出异常。

解决的思路是,对于这样有更新操作的表,我们不要试图直接把它转换成DataStream打印输出,而是记录一下它的“更新日志”(change log)。这样一来,对于表的所有更新操作,就变成了一条更新日志的流,我们就可以转换成流打印输出了。

代码中需要调用的是表环境的toChangelogStream()方法:

Table urlCountTable = tableEnv.sqlQuery(
    "SELECT user, COUNT(url) " +
    "FROM EventTable " +
    "GROUP BY user "
  );

// 将表转换成更新日志流
tableEnv.toDataStream(urlCountTable).print();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

与“更新日志流”(Changelog Streams)对应的,是那些只做了简单转换、没有进行聚合统计的表,例如前面提到的maryClickTable。``它们的特点是数据只会插入、不会更新,所以也被叫作“仅插入流”(Insert-Only Streams)

2. 将流(DataStream)转换成表(Table)

(1)调用fromDataStream()方法

想要将一个DataStream转换成表也很简单,可以通过调用表环境的fromDataStream()方法来实现,返回的就是一个Table对象。例如,我们可以直接将事件流eventStream转换成一个表:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 获取表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 读取数据源
SingleOutputStreamOperator<Event> eventStream = env.addSource(...)

// 将数据流转换成表
Table eventTable = tableEnv.fromDataStream(eventStream);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

由于流中的数据本身就是定义好的POJO类型Event,所以我们将流转换成表之后,每一行数据就对应着一个Event,而表中的列名就对应着Event中的属性。

另外,我们还可以在fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置:

// 提取Event中的timestamp和url作为表中的列
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp"), $("url"));
  • 1
  • 2

需要注意的是,timestamp本身是SQL中的关键字,所以我们在定义表名、列名时要尽量避免。这时可以通过表达式的as()方法对字段进行重命名:

// 将timestamp字段重命名为ts
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"), $("url"));
  • 1
  • 2
(2)调用createTemporaryView()方法

调用fromDataStream()方法简单直观,可以直接实现DataStream到Table的转换;不过如果我们希望直接在SQL中引用这张表,就还需要调用表环境的createTemporaryView()方法来创建虚拟视图了。

对于这种场景,也有一种更简洁的调用方式。我们可以直接调用createTemporaryView()方法创建虚拟表,传入的两个参数,第一个依然是注册的表名,而第二个可以直接就是DataStream。之后仍旧可以传入多个参数,用来指定表中的字段

tableEnv.createTemporaryView("EventTable", eventStream, $("timestamp").as("ts"),$("url"));
  • 1

这样,我们接下来就可以直接在SQL中引用表EventTable了

(3)调用fromChangelogStream ()方法

表环境还提供了一个方法fromChangelogStream(),可以将一个更新日志流转换成表。这个方法要求流中的数据类型只能是Row,而且每一个数据都需要指定当前行的更新类型(RowKind);所以一般是由连接器帮我们实现的,直接应用比较少见,感兴趣的读者可以查看官网的文档说明。

3. 支持的数据类型

前面示例中的DataStream,流中的数据类型都是定义好的POJO类。如果DataStream中的类型是简单的基本类型,还可以直接转换成表吗?这就涉及了Table中支持的数据类型。

整体来看,DataStream中支持的数据类型,Table中也是都支持的,只不过在进行转换时需要注意一些细节。

(1)原子类型

在Flink中,基础数据类型(Integer、Double、String)和通用数据类型(也就是不可再拆分的数据类型)统一称作“原子类型”。原子类型的DataStream,转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出。另外,还可以在fromDataStream()方法里增加参数,用来重新命名列字段。

StreamTableEnvironment tableEnv = ...;

DataStream<Long> stream = ...;

// 将数据流转换成动态表,动态表只有一个字段,重命名为myLong
Table table = tableEnv.fromDataStream(stream, $("myLong"));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

(2)Tuple类型

当原子类型不做重命名时,默认的字段名就是“f0”,容易想到,这其实就是将原子类型看作了一元组Tuple1的处理结果。

Table支持Flink中定义的元组类型Tuple,对应在表中字段名默认就是元组中元素的属性名f0、f1、f2…。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通过调用表达式的as()方法来进行重命名。

StreamTableEnvironment tableEnv = ...;

DataStream<Tuple2<Long, Integer>> stream = ...;

// 将数据流转换成只包含f1字段的表
Table table = tableEnv.fromDataStream(stream, $("f1"));

// 将数据流转换成包含f0和f1字段的表,在表中f0和f1位置交换
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));

// 将f1字段命名为myInt,f0命名为myLong
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

(3)POJO 类型

Flink也支持多种数据类型组合成的“复合类型”,最典型的就是简单Java对象(POJO 类型)。由于POJO中已经定义好了可读性强的字段名,这种类型的数据流转换成Table就显得无比顺畅了。

将POJO类型的DataStream转换成Table,如果不指定字段名称,就会直接使用原始 POJO 类型中的字段名称。POJO中的字段同样可以被重新排序、提却和重命名,这在之前的例子中已经有过体现。

StreamTableEnvironment tableEnv = ...;

DataStream<Event> stream = ...;

Table table = tableEnv.fromDataStream(stream);
Table table = tableEnv.fromDataStream(stream, $("user"));
Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"), $("url").as("myUrl"));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

(4)Row类型

Flink中还定义了一个在关系型表中更加通用的数据类型——行(Row),它是Table中数据的基本组织形式。Row类型也是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息;我们在创建Table时调用的CREATE语句就会将所有的字段名称和类型指定,这在Flink中被称为表的“模式结构”(Schema)。除此之外,Row类型还附加了一个属性RowKind,用来表示当前行在更新操作中的类型。这样,Row就可以用来表示更新日志流(changelog stream)中的数据,从而架起了Flink中流和表的转换桥梁。

所以在更新日志流中,元素的类型必须是Row,而且需要调用ofKind()方法来指定更新类型。下面是一个具体的例子:

DataStream<Row> dataStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// 将更新日志流转换为表
Table table = tableEnv.fromChangelogStream(dataStream);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

4. 综合应用示例

现在,我们可以将介绍过的所有API整合起来,写出一段完整的代码。同样还是用户的一组点击事件,我们可以查询出某个用户(例如Alice)点击的url列表,也可以统计出每个用户累计的点击次数,这可以用两句SQL来分别实现。具体代码如下:

package com.example.chapter11;

import com.example.chapter05.ClickSource;
import com.example.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * 快速上手
 * SQL
 * Table API
 */
public class SimpleTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  env.setParallelism(1);
        //读取数据,得到DataStream
        SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        //创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //将DataStream转换Table
         tableEnv.createTemporaryView("clickTable", eventTable);
         
        //Table API --查询Alice的访问url列表
       Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE user = 'Alice'");
                
                // 统计每个用户的点击次数
        Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) FROM EventTable GROUP BY user");

        //Table 不能打印------------最简单转换成流输出
        tableEnv.toDataStream(resultTable2 ).print("result");
    
        //toChangelogStream 更新日志流--将表转成流 
        tableEnv.toChangelogStream(aggResult).print("agg");

        env.execute();

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

用户Alice的点击url列表只需要一个简单的条件查询就可以得到,对应的表中只有插入操作,所以我们可以直接调用toDataStream()将它转换成数据流,然后打印输出。控制台输出的结果如下:

alice visit > +I[./home, Alice]
alice visit > +I[./prod?id=1, Alice]
alice visit > +I[./prod?id=7, Alice]
  • 1
  • 2
  • 3

这里每条数据前缀的+I就是RowKind,表示INSERT(插入)。

而由于统计点击次数时用到了分组聚合,造成结果表中数据会有更新操作,所以在打印输出时需要将表urlCountTable转换成更新日志流(changelog stream)。控制台输出的结果如下:

count> +I[Alice, 1]
count> +I[Bob, 1]
count> -U[Alice, 1]
count> +U[Alice, 2]
count> +I[Cary, 1]
count> -U[Bob, 1]
count> +U[Bob, 2]
count> -U[Alice, 2]
count> +U[Alice, 3]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

这里数据的前缀出现了+I、-U和+U三种RowKind,分别表示INSERT(插入)、UPDATE_BEFORE(更新前)和UPDATE_AFTER(更新后)。当收到每个用户的第一次点击事件时,会在表中插入一条数据,例如+I[Alice, 1]、+I[Bob, 1]。而之后每当用户增加一次点击事件,就会带来一次更新操作,更新日志流(changelog stream)中对应会出现两条数据,分别表示之前数据的失效和新数据的生效;例如当Alice的第二条点击数据到来时,会出现一个-U[Alice, 1]和一个+U[Alice, 2],表示Alice的点击个数从1变成了2。

这种表示更新日志的方式,有点像是声明“撤回”了之前的一条数据、再插入一条更新后的数据,所以也叫作“撤回流”(Retract Stream)。关于表到流转换过程的编码方式,我们会在下节进行更深入的讨论。

5、案例实操

1. 代码1

package com.example.chapter11;

import com.example.chapter05.ClickSource;
import com.example.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * 快速上手
 * SQL
 * Table API
 */
public class SimpleTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //读取数据,得到DataStream
        SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        //创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //将DataStream转换Table
        Table eventTable = tableEnv.fromDataStream(eventStream);

        //4、直接写SQL转换
        Table resultTable = tableEnv.sqlQuery("select user, url, `timestamp` from " + eventTable);

        //Table API
        Table resultTable2 = eventTable.select($("user"), $("url"))
                .where($("user").isEqual("Alice"));

        //Table 不能打印------------最简单转换成流输出
        tableEnv.toDataStream(resultTable).print("result");
        tableEnv.toDataStream(resultTable2).print("result2");


        tableEnv.createTemporaryView("clickTable", eventTable);
        Table aggResult = tableEnv.sqlQuery("select user,COUNT(url) as cnt from clickTable group by user");
        //toChangelogStream 更新日志流
        tableEnv.toChangelogStream(aggResult).print("agg");

        env.execute();

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

2. 代码2

package com.example.chapter11;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;


/**
 * 1、创建表的执行环境
 * 2、注册表
 * 3、转换注册表为Table对象
 * 4、为Table对象增加查询条件
 * 5、生成新的Table对象
 * 6、可以在次放入当前环境供后续SQL使用
 *
 * -----使用TableAPI 使用SQL 都可以得到Table对象
 *
 * --------使用TableAPI
 * --------使用SQL
 * --------两者混用
 */
public class CommonApiTest {
    public static void main(String[] args) {


        /**
         * 方式一
         */
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
//        StreamTableEnvironment TableEnv = StreamTableEnvironment.create(env);  //默认什么都不传递的话就是流处理、


        /**
         * 方式二
         * flink流处理环境
         */
        //1、定义环境配置来创建表执行环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode() //使用流处理模式
                .useBlinkPlanner()
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);


        /**
         * 方式三
         * 基于老版本planner进行流处理
         */
//        EnvironmentSettings settings1 = EnvironmentSettings.newInstance()
//                .inStreamingMode() //使用流处理模式
//                .useOldPlanner()
//                .build();
//
//        TableEnvironment tableEnv1 = TableEnvironment.create(settings1);


        /**
         * 方式三.1
         * 基于老版本planner进行批处理
         */

//        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
//        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(batchEnv);


        /**
         * 方式三.2
         * 基于blink版本planner进行批处理
         */

//        EnvironmentSettings setting3 = EnvironmentSettings.newInstance()
//                .inStreamingMode()
//                .useAnyPlanner()
//                .build();
//
//        TableEnvironment tableEnv3 = TableEnvironment.create(setting3);


        // 2、创建表
        String createDDL = "CREATE TABLE clickTable (" +
                "user_name STRING, " +
                "url STRING, " +
                "ts BIGINT " +
                ") WITH (" +
                " 'connector'= 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                ")";


        //executeSql 注册表
        tableEnv.executeSql(createDDL);

        //得到Table对象
        // Table java 对象
        // clickTable 是真正注册到表环境里的
        Table clickTable = tableEnv.from("clickTable");

        Table resultTable = clickTable.where($("user_name").isEqual("Bob"))
                .select($("user_name"), $("url"));

        //如果还想把这个table放入到当前环境中、直接用在后续的SQL里面
        tableEnv.createTemporaryView("Result2", resultTable);


        //执行SQL进行表的查询转换
        Table resultTable2 = tableEnv.sqlQuery("select user_name,url from Result2");

        // 创建一张用于输出的表
        String createOutDDL = "CREATE TABLE outTable (" +
                "url STRING, " +
                "user_name STRING " +
                ") WITH (" +
                "'connector'= 'filesystem'," +
                "'path' = 'output'," +
                "'format' = 'csv'" +
                ")";


        tableEnv.executeSql(createOutDDL);



        //创建一张用于控制台打印输出的表
        String creatPrintOutDDL = "CREATE TABLE printOutTable (" +
                "url STRING, " +
                "user_name STRING " +
                ") WITH (" +
                "'connector'= 'print'" +
                ")";

        tableEnv.executeSql(creatPrintOutDDL);


        //输出表
        resultTable.executeInsert("outTable");

        //输出控制台
        resultTable2.executeInsert("printOutTable");

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147

3. 代码3

package com.example.chapter11;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;


/**
 * TODO 统计每个用户的PV值
 * 1、创建表的执行环境
 * 2、注册表
 * 3、转换注册表为Table对象
 * 4、为Table对象增加查询条件
 * 5、生成新的Table对象
 * 6、输出
 */
public class CommonApiTestCount {
    public static void main(String[] args) {


        /**
         * 方式二
         * flink流处理环境
         */
        //1、定义环境配置来创建表执行环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode() //使用流处理模式
                .useBlinkPlanner()
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);


        // 2、创建表
        String createDDL = "CREATE TABLE clickTable (" +
                "user_name STRING, " +
                "url STRING, " +
                "ts BIGINT " +
                ") WITH (" +
                " 'connector'= 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                ")";

        //executeSql 注册表
        tableEnv.executeSql(createDDL);

        //针对每个用户、统计当前他到底点击了多少次---统计每个用户的PV
        Table aggResult = tableEnv.sqlQuery("select user_name,count(url) as cnt from clickTable group by user_name");

        //创建一张用于控制台打印输出的a每个用户的PV
        String creatPrintOutDDLUserPV = "CREATE TABLE printOutTable (" +
                " user_name STRING, " +
                " cnt BIGINT " +
                ") WITH (" +
                " 'connector'= 'print'" +
                ")";

        TableResult result = tableEnv.executeSql(creatPrintOutDDLUserPV);

        //输出每个用户的PV打印控制台
        aggResult.executeInsert("printOutTable");

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

四、流处理中的表

Table API和SQL的基本使用方法。我们会发现,在Flink中使用表和SQL基本上跟其它场景是一样的;不过对于表和流的转换,却稍显复杂。当我们将一个Table转换成DataStream时,有“仅插入流”(Insert-Only Streams)“更新日志流”(Changelog Streams)两种不同的方式,具体使用哪种方式取决于表中是否存在更新(update)操作。

这种麻烦其实是不可避免的。我们知道,Table API和SQL本质上都是基于关系型表的操作方式;而关系型表(Table)本身是有界的,更适合批处理的场景。所以在MySQL、Hive这样的固定数据集中进行查询,使用SQL就会显得得心应手。而对于Flink这样的流处理框架来说,要处理的是源源不断到来的无界数据流,我们无法等到数据都到齐再做查询,每来一条数据就应该更新一次结果;这时如果一定要使用表和SQL进行处理,就会显得有些别扭了,需要引入一些特殊的概念。

我们可以将关系型表/SQL与流处理做一个对比,如表11-1所示。

/关系型/SQL流处理
处理的数据对象字段元组的有界集合字段元组的无限序列
查询(Query)可以访问到完整的数据输入无法访问到所有数据,必须“持续”等待流式输入
对数据的访问
查询终止条件生成固定大小的结果集后终止永不停止,根据持续收到的数据不断更新查询结果

可以看到,其实关系型表和SQL,主要就是针对批处理设计的,这和流处理有着天生的隔阂。既然“八字不合”,那Flink中的Table API和SQL又是怎样做流处理的呢?接下来我们就来深入探讨一下流处理中表的概念。

1、动态表和持续查询

流处理面对的数据是连续不断的,这导致了流处理中的“表”跟我们熟悉的关系型数据库中的表完全不同;而基于表执行的查询操作,也就有了新的含义。

如果我们希望把流数据转换成表的形式,那么这表中的数据就会不断增长;如果进一步基于表执行SQL查询,那么得到的结果就不是一成不变的,而是会随着新数据的到来持续更新。

1. 动态表(Dynamic Tables)

当流中有新数据到来,初始的表中会插入一行;而基于这个表定义的SQL查询,就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”(Dynamic Tables)。

动态表是Flink在Table API和SQL中的核心概念,它为流数据处理提供了表和SQL支持。我们所熟悉的表一般用来做批处理,面向的是固定的数据集,可以认为是“静态表”;而动态表则完全不同,它里面的数据会随时间变化。

其实动态表的概念,我们在传统的关系型数据库中已经有所接触。数据库中的表,其实是一系列INSERT、UPDATE和DELETE语句执行的结果;在关系型数据库中,我们一般把它称为更新日志流(changelog stream)。如果我们保存了表在某一时刻的快照(snapshot),那么接下来只要读取更新日志流,就可以得到表之后的变化过程和最终结果了。在很多高级关系型数据库(比如Oracle、DB2)中都有“物化视图”(Materialized Views)的概念,可以用来缓存SQL查询的结果;它的更新其实就是不停地处理更新日志流的过程。

Flink中的动态表,就借鉴了物化视图的思想。

2. 持续查询(Continuous Query)

动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义的SQL查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”(Continuous Query)。对动态表定义的查询操作,都是持续查询;而持续查询的结果也会是一个动态表。

由于每次数据到来都会触发查询操作,因此可以认为一次查询面对的数据集,就是当前输入动态表中收到的所有数据。这相当于是对输入动态表做了一个“快照”(snapshot),当作有限数据集进行批处理;流式数据的到来会触发连续不断的快照查询,像动画一样连贯起来,就构成了“持续查询”。

描述了持续查询的过程。这里我们也可以清晰地看到流、动态表和持续查询的关系:

在这里插入图片描述
持续查询的步骤如下:
(1)流(stream)被转换为动态表(dynamic table);
(2)对动态表进行持续查询(continuous query),生成新的动态表;
(3)生成的动态表被转换成流。
这样,只要API将流和动态表的转换封装起来,我们就可以直接在数据流上执行SQL查询,用处理表的方式来做流处理了。

2、将流转换成动态表

为了能够使用SQL来做流处理,我们必须先把流(stream)转换成动态表。当然,之前在讲解基本API时,已经介绍过代码中的DataStream和Table如何转换;现在我们则要抛开具体的数据类型,从原理上理解流和动态表的转换过程。

如果把流看作一张表,那么流中每个数据的到来,都应该看作是对表的一次插入(Insert)操作,会在表的末尾添加一行数据。因为流是连续不断的,而且之前的输出结果无法改变、只能在后面追加;所以我们其实是通过一个只有插入操作(insert-only)的更新日志(changelog)流,来构建一个表。

// 获取流环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// 读取数据源
SingleOutputStreamOperator<Event> eventStream = env
                .fromElements(
                        new Event("Alice", "./home", 1000L),
                        new Event("Bob", "./cart", 1000L),
                        new Event("Alice", "./prod?id=1", 5 * 1000L),
                        new Event("Cary", "./home", 60 * 1000L),
                        new Event("Bob", "./prod?id=3", 90 * 1000L),
                        new Event("Alice", "./prod?id=7", 105 * 1000L)
                );

// 获取表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 将数据流转换成表
tableEnv.createTemporaryView("EventTable", eventStream, $("user"), $("url"), $("timestamp").as("ts"));
    
// 统计每个用户的点击次数
Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");

// 将表转换成数据流,在控制台打印输出
tableEnv.toChangelogStream(urlCountTable).print("count");
        
// 执行程序
env.execute();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

我们现在的输入数据,就是用户在网站上的点击访问行为,数据类型被包装为POJO类型Event。我们将它转换成一个动态表,注册为EventTable。表中的字段定义如下:

[
  user:  VARCHAR,   // 用户名
  url:   VARCHAR,    // 用户访问的URL
  ts: BIGINT    // 时间戳
]
  • 1
  • 2
  • 3
  • 4
  • 5

如图11-3所示,当用户点击事件到来时,就对应着动态表中的一次插入(Insert)操作,每条数据就是表中的一行;随着插入更多的点击事件,得到的动态表将不断增长。

在这里插入图片描述

3、用SQL持续查询

1. 更新(Update)查询

我们在代码中定义了一个SQL查询。

Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");
  • 1

这个查询很简单,主要是分组聚合统计每个用户的点击次数。我们把原始的动态表注册为EventTable,经过查询转换后得到urlCountTable;这个结果动态表中包含两个字段,具体定义如下:

[
  user:  VARCHAR,   // 用户名
  cnt:   BIGINT    // 用户访问url的次数
]
  • 1
  • 2
  • 3
  • 4

当原始动态表不停地插入新的数据时,查询得到的urlCountTable会持续地进行更改。由于count数量可能会叠加增长,因此这里的更改操作可以是简单的插入(Insert),也可以是对之前数据的更新(Update)。换句话说,用来定义结果表的更新日志(changelog)流中,包含了INSERT和UPDATE两种操作。这种持续查询被称为更新查询(Update Query),更新查询得到的结果表如果想要转换成DataStream,必须调用toChangelogStream()方法。

在这里插入图片描述

具体步骤解释如下:

(1)当查询启动时,原始动态表EventTable为空;

(2)当第一行Alice的点击数据插入EventTable表时,查询开始计算结果表,urlCountTable中插入一行数据[Alice,1]。

(3)当第二行Bob点击数据插入EventTable表时,查询将更新结果表并插入新行[Bob,1]。

(4)第三行数据到来,同样是Alice的点击事件,这时不会插入新行,而是生成一个针对已有行的更新操作。这样,结果表中第一行[Alice,1]就更新为[Alice,2]。

(5)当第四行Cary的点击数据插入到EventTable表时,查询将第三行[Cary,1]插入到结果表中。

2. 追加(Append)查询

上面的例子中,查询过程用到了分组聚合,结果表中就会产生更新操作。如果我们执行一个简单的条件查询,结果表中就会像原始表EventTable一样,只有插入(Insert)操作了。

Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE user = 'Cary'");
  • 1

这样的持续查询,就被称为追加查询(Append Query),它定义的结果表的更新日志(changelog)流中只有INSERT操作。追加查询得到的结果表,转换成DataStream调用方法没有限制,可以直接用toDataStream(),也可以像更新查询一样调用toChangelogStream()。

这样看来,我们似乎可以总结一个规律:只要用到了聚合,在之前的结果上有叠加,就会产生更新操作,就是一个更新查询。但事实上,更新查询的判断标准是结果表中的数据是否会有UPDATE操作,如果聚合的结果不再改变,那么同样也不是更新查询。

什么时候聚合的结果会保持不变呢?一个典型的例子就是窗口聚合。

我们考虑开一个滚动窗口,统计每一小时内所有用户的点击次数,并在结果表中增加一个endT字段,表示当前统计窗口的结束时间。这时结果表的字段定义如下:

[
  user:  VARCHAR,   // 用户名
  endT:  TIMESTAMP,  // 窗口结束时间
  cnt:   BIGINT    // 用户访问url的次数
]
  • 1
  • 2
  • 3
  • 4
  • 5

与之前的分组聚合一样,当原始动态表不停地插入新的数据时,查询得到的结果result会持续地进行更改。比如时间戳在12:00:00到12:59:59之间的有四条数据,其中Alice三次点击、Bob一次点击;所以当水位线达到13:00:00时窗口关闭,输出到结果表中的就是新增两条数据[Alice, 13:00:00, 3]和[Bob, 13:00:00, 1]。同理,当下一小时的窗口关闭时,也会将统计结果追加到result表后面,而不会更新之前的数据。

在这里插入图片描述

所以我们发现,由于窗口的统计结果是一次性写入结果表的,所以结果表的更新日志流中只会包含插入INSERT操作,而没有更新UPDATE操作。所以这里的持续查询,依然是一个追加(Append)查询。结果表result如果转换成DataStream,可以直接调用toDataStream()方法。

1)代码
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class AppendQueryExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

// 读取数据源,并分配时间戳、生成水位线
        SingleOutputStreamOperator<Event> eventStream = env
                .fromElements(
                        new Event("Alice", "./home", 1000L),
                        new Event("Bob", "./cart", 1000L),
                        new Event("Alice", "./prod?id=1",  25 * 60 * 1000L),
                        new Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
                        new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
                        new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
                        new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L) 
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );

// 创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 将数据流转换成表,并指定时间属性
        Table eventTable = tableEnv.fromDataStream(
                eventStream,
                $("user"),
                $("url"),
                $("timestamp").rowtime().as("ts")  
// 将timestamp指定为事件时间,并命名为ts
        );

// 为方便在SQL中引用,在环境中注册表EventTable
        tableEnv.createTemporaryView("EventTable", eventTable);

// 设置1小时滚动窗口,执行SQL统计查询
        Table result = tableEnv
                .sqlQuery(
                        "SELECT " +
                                "user, " +
                                "window_end AS endT, " +    // 窗口结束时间
                                "COUNT(url) AS cnt " +    // 统计url访问次数
                        "FROM TABLE( " +
                                "TUMBLE( TABLE EventTable, " +    // 1小时滚动窗口
                                    "DESCRIPTOR(ts), " +
                                    "INTERVAL '1' HOUR)) " +
                        "GROUP BY user, window_start, window_end "
                );

        tableEnv.toDataStream(result).print();

        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
+I[Alice, 1970-01-01T01:00, 3]
+I[Bob, 1970-01-01T01:00, 1]
+I[Cary, 1970-01-01T02:00, 2]
+I[Bob, 1970-01-01T02:00, 1]
  • 1
  • 2
  • 3
  • 4

可以看到,所有输出结果都以+I为前缀,表示都是以INSERT操作追加到结果表中的;这是一个追加查询,所以我们直接使用toDataStream()转换成流是没有问题的。这里输出的window_end是一个TIMESTAMP类型;由于我们直接以一个长整型数作为事件发生的时间戳,所以可以看到对应的都是1970年1月1日的时间。

关于Table API和SQL中窗口和聚合查询的使用,我们会在后面详细讲解。

3. 查询限制

在实际应用中,有些持续查询会因为计算代价太高而受到限制。所谓的“代价太高”,可能是由于需要维护的状态持续增长,也可能是由于更新数据的计算太复杂。

● 状态大小

用持续查询做流处理,往往会运行至少几周到几个月;所以持续查询处理的数据总量可能非常大。例如我们之前举的更新查询的例子,需要记录每个用户访问url的次数。如果随着时间的推移用户数越来越大,那么要维护的状态也将逐渐增长,最终可能会耗尽存储空间导致查询失败。

● 更新计算

对于有些查询来说,更新计算的复杂度可能很高。每来一条新的数据,更新结果的时候可能需要全部重新计算,并且对很多已经输出的行进行更新。一个典型的例子就是RANK()函数,它会基于一组数据计算当前值的排名。例如下面的SQL查询,会根据用户最后一次点击的时间为每个用户计算一个排名。当我们收到一个新的数据,用户的最后一次点击时间(lastAction)就会更新,进而所有用户必须重新排序计算一个新的排名。当一个用户的排名发生改变时,被他超过的那些用户的排名也会改变;这样的更新操作无疑代价巨大,而且还会随着用户的增多越来越严重。

SELECT user, RANK() OVER (ORDER BY lastAction)
FROM (
  SELECT user, MAX(ts) AS lastAction FROM EventTable GROUP BY user
);
  • 1
  • 2
  • 3
  • 4

这样的查询操作,就不太适合作为连续查询在流处理中执行。这里RANK()的使用要配合一个OVER子句,这是所谓的“开窗聚合”,我们会在11.5节展开介绍。

4、将动态表转换为流

与关系型数据库中的表一样,动态表也可以通过插入(Insert)、更新(Update)和删除(Delete)操作,进行持续的更改。将动态表转换为流或将其写入外部系统时,就需要对这些更改操作进行编码,通过发送编码消息的方式告诉外部系统要执行的操作。在Flink中,Table API和SQL支持三种编码方式:

● 仅追加(Append-only)流

仅通过插入(Insert)更改来修改的动态表,可以直接转换为“仅追加”流。这个流中发出的数据,其实就是动态表中新增的每一行。

● 撤回(Retract)流

撤回流是包含两类消息的流,添加(add)消息和撤回(retract)消息。

具体的编码规则是:INSERT插入操作编码为add消息;DELETE删除操作编码为retract消息;而UPDATE更新操作则编码为被更改行的retract消息,和更新后行(新行)的add消息。这样,我们可以通过编码后的消息指明所有的增删改操作,一个动态表就可以转换为撤回流了。

可以看到,更新操作对于撤回流来说,对应着两个消息:之前数据的撤回(删除)和新数据的插入。
如图11-6所示,显示了将动态表转换为撤回流的过程。

在这里插入图片描述

这里我们用+代表add消息(对应插入INSERT操作),用-代表retract消息(对应删除DELETE操作);当Alice的第一个点击事件到来时,结果表新增一条数据[Alice, 1];而当Alice的第二个点击事件到来时,结果表会将[Alice, 1]更新为[Alice, 2],对应的编码就是删除[Alice, 1]、插入[Alice, 2]。这样当一个外部系统收到这样的两条消息时,就知道是要对Alice的点击统计次数进行更新了。

● 更新插入(Upsert)流

更新插入流中只包含两种类型的消息:更新插入(upsert)消息和删除(delete)消息。

所谓的“upsert”其实是“update”和“insert”的合成词,所以对于更新插入流来说,INSERT插入操作和UPDATE更新操作,统一被编码为upsert消息;而DELETE删除操作则被编码为delete消息。

既然更新插入流中不区分插入(insert)和更新(update),那我们自然会想到一个问题:如果希望更新一行数据时,怎么保证最后做的操作不是插入呢?

这就需要动态表中必须有唯一的键(key)。通过这个key进行查询,如果存在对应的数据就做更新(update),如果不存在就直接插入(insert)。这是一个动态表可以转换为更新插入流的必要条件。当然,收到这条流中数据的外部系统,也需要知道这唯一的键(key),这样才能正确地处理消息。

如图11-7所示,显示了将动态表转换为更新插入流的过程。

在这里插入图片描述

可以看到,更新插入流跟撤回流的主要区别在于,更新(update)操作由于有key的存在,只需要用单条消息编码就可以,因此效率更高。

需要注意的是,在代码里将动态表转换为DataStream时,只支持仅追加(append-only)和撤回(retract)流,我们调用toChangelogStream()得到的其实就是撤回流;这也很好理解,DataStream中并没有key的定义,所以只能通过两条消息一减一增来表示更新操作。而连接到外部系统时,则可以支持不同的编码方法,这取决于外部系统本身的特性。

五、时间属性和窗口

基于时间的操作(比如时间窗口),需要定义相关的时间语义和时间数据来源的信息。在Table API和SQL中,会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间。

所以所谓的时间属性(time attributes),其实就是每个表模式结构(schema)的一部分。它可以在创建表的DDL里直接定义为一个字段,也可以在DataStream转换成表时定义。一旦定义了时间属性,它就可以作为一个普通字段引用,并且可以在基于时间的操作中使用。

时间属性的数据类型为TIMESTAMP,它的行为类似于常规时间戳,可以直接访问并且进行计算。

按照时间语义的不同,我们可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)两种情况。

1、事件时间

我们在实际应用中,最常用的就是事件时间。在事件时间语义下,允许表处理程序根据每个数据中包含的时间戳(也就是事件发生的时间)来生成结果。

事件时间语义最大的用途就是处理乱序事件或者延迟事件的场景。我们通过设置水位线(watermark)来表示事件时间的进展,而水位线可以根据数据的最大时间戳设置一个延迟时间。这样即使在出现乱序的情况下,对数据的处理也可以获得正确的结果。

为了处理无序事件,并区分流中的迟到事件。Flink需要从事件数据中提取时间戳,并生成水位线,用来推进事件时间的进展。

事件时间属性可以在创建表DDL中定义,也可以在数据流和表的转换中定义。

1. 在创建表的DDL中定义

在创建表的DDL(CREATE TABLE语句)中,可以增加一个字段,通过WATERMARK语句来定义事件时间属性。WATERMARK语句主要用来定义水位线(watermark)的生成表达式,这个表达式会将带有事件时间戳的字段标记为事件时间属性,并在它基础上给出水位线的延迟时间。具体定义方式如下:

CREATE TABLE EventTable(
  user STRING,
  url STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  ...
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

这里我们把ts字段定义为事件时间属性,而且基于ts设置了5秒的水位线延迟。这里的“5秒”是以“时间间隔”的形式定义的,格式是INTERVAL <数值> <时间单位>:

INTERVAL '5' SECOND
  • 1

这里的数值必须用单引号引起来,而单位用SECOND和SECONDS是等效的。
Flink中支持的事件时间属性数据类型必须为TIMESTAMP或者

TIMESTAMP_LTZ。这里TIMESTAMP_LTZ是指带有本地时区信息的时间戳(TIMESTAMP WITH LOCAL TIME ZONE);一般情况下如果数据中的时间戳是“年-月-日-时-分-秒”的形式,那就是不带时区信息的,可以将事件时间属性定义为TIMESTAMP类型。

而如果原始的时间戳就是一个长整型的毫秒数,这时就需要另外定义一个字段来表示事件时间属性,类型定义为TIMESTAMP_LTZ会更方便:

CREATE TABLE events (
  user STRING,
  url STRING,
  ts BIGINT,
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

这里我们另外定义了一个字段ts_ltz,是把长整型的ts转换为TIMESTAMP_LTZ得到的;进而使用WATERMARK语句将它设为事件时间属性,并设置5秒的水位线延迟。

2. 在数据流转换为表时定义

事件时间属性也可以在将DataStream 转换为表的时候来定义。我们调用fromDataStream()方法创建表时,可以追加参数来定义表中的字段结构;这时可以给某个字段加上.rowtime() 后缀,就表示将当前字段指定为事件时间属性。这个字段可以是数据中本不存在、额外追加上去的“逻辑字段”,就像之前DDL中定义的第二种情况;也可以是本身固有的字段,那么这个字段就会被事件时间属性所覆盖,类型也会被转换为TIMESTAMP。不论那种方式,时间属性字段中保存的都是事件的时间戳(TIMESTAMP类型)。

需要注意的是,这种方式只负责指定时间属性,而时间戳的提取和水位线的生成应该之前就在DataStream上定义好了。由于DataStream中没有时区概念,因此Flink 会将事件时间属性解析成不带时区的TIMESTAMP类型,所有的时间值都被当作UTC标准时间。

在代码中的定义方式如下:

// 方法一:
// 流中数据类型为二元组Tuple2,包含两个字段;需要自定义提取时间戳并生成水位线
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").rowtime());

// 方法二:
// 流中数据类型为三元组Tuple3,最后一个字段就是事件时间戳
DataStream<Tuple3<String, String, Long>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 不再声明额外字段,直接用最后一个字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").rowtime());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2、处理时间

相比之下处理时间就比较简单了,它就是我们的系统时间,使用时不需要提取时间戳(timestamp)和生成水位线(watermark)。因此在定义处理时间属性时,必须要额外声明一个字段,专门用来保存当前的处理时间。
类似地,处理时间属性的定义也有两种方式:创建表DDL中定义,或者在数据流转换成表时定义。

1. 在创建表的DDL中定义

在创建表的DDL(CREATE TABLE语句)中,可以增加一个额外的字段,通过调用系统内置的PROCTIME()函数来指定当前的处理时间属性,返回的类型是TIMESTAMP_LTZ。

CREATE TABLE EventTable(
  user STRING,
  url STRING,
  ts AS PROCTIME()
) WITH (
  ...
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

这里的时间属性,其实是以“计算列”(computed column)的形式定义出来的。所谓的计算列是Flink SQL中引入的特殊概念,可以用一个AS语句来在表中产生数据中不存在的列,并且可以利用原有的列、各种运算符及内置函数。在前面事件时间属性的定义中,将ts字段转换成TIMESTAMP_LTZ类型的ts_ltz,也是计算列的定义方式。

2. 在数据流转换为表时定义

处理时间属性同样可以在将DataStream 转换为表的时候来定义。我们调用fromDataStream()方法创建表时,可以用.proctime()后缀来指定处理时间属性字段。由于处理时间是系统时间,原始数据中并没有这个字段,所以处理时间属性一定不能定义在一个已有字段上,只能定义在表结构所有字段的最后,作为额外的逻辑字段出现。

代码中定义处理时间属性的方法如下:

DataStream<Tuple2<String, String>> stream = ...;

// 声明一个额外的字段作为处理时间属性字段
Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").proctime());
  • 1
  • 2
  • 3
  • 4

3、窗口(Window)

有了时间属性,接下来就可以定义窗口进行计算了。我们知道,窗口可以将无界流切割成大小有限的“桶”(bucket)来做计算,通过截取有限数据集来处理无限的流数据。在DataStream API中提供了对不同类型的窗口进行定义和处理的接口,而在Table API和SQL中,类似的功能也都可以实现。

1. 分组窗口(Group Window,老版本)

在Flink 1.12之前的版本中,Table API和SQL提供了一组“分组窗口”(Group Window)函数,常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现;具体在SQL中就是调用TUMBLE()、HOP()、SESSION(),传入时间属性字段、窗口大小等参数就可以了。以滚动窗口为例:

TUMBLE(ts, INTERVAL '1' HOUR)
  • 1

这里的ts是定义好的时间属性字段,窗口大小用“时间间隔”INTERVAL来定义。

在进行窗口计算时,分组窗口是将窗口本身当作一个字段对数据进行分组的,可以对组内的数据进行聚合。基本使用方式如下:

Table result = tableEnv.sqlQuery(
                      "SELECT " +
                          "user, " +
"TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, " +
                          "COUNT(url) AS cnt " +
                      "FROM EventTable " +
                      "GROUP BY " +                     // 使用窗口和用户名进行分组
                          "user, " +
                          "TUMBLE(ts, INTERVAL '1' HOUR)" // 定义1小时滚动窗口
                );
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

这里定义了1小时的滚动窗口,将窗口和用户user一起作为分组的字段。用聚合函数COUNT()对分组数据的个数进行了聚合统计,并将结果字段重命名为cnt;用TUPMBLE_END()函数获取滚动窗口的结束时间,重命名为endT提取出来。

分组窗口的功能比较有限,只支持窗口聚合,所以目前已经处于弃用(deprecated)的状态。

2、窗口表值函数(Windowing TVFs,新版本)

从1.13版本开始,Flink开始使用窗口表值函数(Windowing table-valued functions, Windowing TVFs)来定义窗口。窗口表值函数是Flink定义的多态表函数(PTF),可以将表进行扩展后返回。表函数(table function)可以看作是返回一个表的函数,关于这部分内容,我们会在11.6节进行介绍。

目前Flink提供了以下几个窗口TVF:
● 滚动窗口(Tumbling Windows);
● 滑动窗口(Hop Windows,跳跃窗口);
● 累积窗口(Cumulate Windows);
● 会话窗口(Session Windows,目前尚未完全支持)。

窗口表值函数可以完全替代传统的分组窗口函数。窗口TVF更符合SQL标准,性能得到了优化,拥有更强大的功能;可以支持基于窗口的复杂计算,例如窗口Top-N、窗口联结(window join)等等。当然,目前窗口TVF的功能还不完善,会话窗口和很多高级功能还不支持,不过正在快速地更新完善。可以预见在未来的版本中,窗口TVF将越来越强大,将会是窗口处理的唯一入口。

在窗口TVF的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外3个列:“窗口起始点”(window_start)、“窗口结束点”(window_end)、“窗口时间”(window_time)。起始点和结束点比较好理解,这里的“窗口时间”指的是窗口中的时间属性,它的值等于window_end - 1ms,所以相当于是窗口中能够包含数据的最大时间戳。

在SQL中的声明方式,与以前的分组窗口是类似的,直接调用TUMBLE()、HOP()、CUMULATE()就可以实现滚动、滑动和累积窗口,不过传入的参数会有所不同。下面我们就分别对这几种窗口TVF进行介绍。

(1)滚动窗口(TUMBLE)

滚动窗口在SQL中的概念与DataStream API中的定义完全一样,是长度固定、时间对齐、无重叠的窗口,一般用于周期性的统计计算。

在SQL中通过调用TUMBLE()函数就可以声明一个滚动窗口,只有一个核心参数就是窗口大小(size)。在SQL中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数中还需要将当前的时间属性字段传入;另外,窗口TVF本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。具体声明如下:

TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL ‘1’ HOUR)

这里基于时间字段ts,对表EventTable中的数据开了大小为1小时的滚动窗口。窗口会将表中的每一行数据,按照它们ts的值分配到一个指定的窗口中。

(2)滑动窗口(HOP)

滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在SQL中通过调用HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大小(size)和滑动步长(slide)两个参数。

HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));
  • 1

这里我们基于时间属性ts,在表EventTable上创建了大小为1小时的滑动窗口,每5分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)。

(3)累积窗口(CUMULATE)

滚动窗口和滑动窗口,可以用来计算大多数周期性的统计指标。不过在实际应用中还会遇到这样一类需求:我们的统计周期可能较长,因此希望中间每隔一段时间就输出一次当前的统计值;与滑动窗口不同的是,在一个统计周期内,我们会多次输出统计值,它们应该是不断叠加累积的。

例如,我们按天来统计网站的PV(Page View,页面浏览量),如果用1天的滚动窗口,那需要到每天24点才会计算一次,输出频率太低;如果用滑动窗口,计算频率可以更高,但统计的就变成了“过去24小时的PV”。所以我们真正希望的是,还是按照自然日统计每天的PV,不过需要每隔1小时就输出一次当天到目前为止的PV值。这种特殊的窗口就叫作“累积窗口”(Cumulate Window)。

在这里插入图片描述

累积窗口是窗口TVF中新增的窗口功能,它会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。所谓的最大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据。如图11-8所示,开始时,创建的第一个窗口大小就是步长step;之后的每个窗口都会在之前的基础上再扩展step的长度,直到达到最大窗口长度。在SQL中可以用CUMULATE()函数来定义,具体如下:

CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))
  • 1

这里我们基于时间属性ts,在表EventTable上定义了一个统计周期为1天、累积步长为1小时的累积窗口。注意第三个参数为步长step,第四个参数则是最大窗口长度。

上面所有的语句只是定义了窗口,类似于DataStream API中的窗口分配器;在SQL中窗口的完整调用,还需要配合聚合操作和其它操作。我们会在下一节详细讲解窗口的聚合。

六、聚合(Aggregation)查询

在SQL中,一个很常见的功能就是对某一列的多条数据做一个合并统计,得到一个或多个结果值;比如求和、最大最小值、平均值等等,这种操作叫作聚合(Aggregation)查询。Flink 中的SQL是流处理与标准SQL结合的产物,所以聚合查询也可以分成两种:流处理中特有的聚合(主要指窗口聚合),以及SQL原生的聚合查询方式。

1、分组聚合

SQL中一般所说的聚合我们都很熟悉,主要是通过内置的一些聚合函数来实现的,比如SUM()、MAX()、MIN()、AVG()以及COUNT()。它们的特点是对多条输入数据进行计算,得到一个唯一的值,属于“多对一”的转换。比如我们可以通过下面的代码计算输入数据的个数:

Table eventCountTable = tableEnv.sqlQuery("select COUNT(*) from EventTable");
  • 1

而更多的情况下,我们可以通过GROUP BY子句来指定分组的键(key),从而对数据按照某个字段做一个分组统计。例如之前我们举的例子,可以按照用户名进行分组,统计每个用户点击url的次数:

SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user
  • 1

这种聚合方式,就叫作“分组聚合”(group aggregation)。从概念上讲,SQL中的分组聚合可以对应DataStream API中keyBy之后的聚合转换它们都是按照某个key对数据进行了划分,各自维护状态来进行聚合统计的

流处理中,分组聚合同样是一个持续查询,而且是一个更新查询,得到的是一个动态表;每当流中有一个新的数据到来时,都会导致结果表的更新操作。因此,想要将结果表转换成流或输出到外部系统,必须采用撤回流(retract stream)或更新插入流(upsert stream)的编码方式;如果在代码中直接转换成DataStream打印输出,需要调用toChangelogStream()。

另外,在持续查询的过程中,由于用于分组的key可能会不断增加,因此计算结果所需要维护的状态也会持续增长。为了防止状态无限增长耗尽资源,Flink Table API和SQL可以在表环境中配置状态的生存时间(TTL):

TableEnvironment tableEnv = ...

// 获取表环境的配置
TableConfig tableConfig = tableEnv.getConfig();
// 配置状态保持时间
tableConfig.setIdleStateRetention(Duration.ofMinutes(60));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

或者也可以直接设置配置项table.exec.state.ttl:

TableEnvironment tableEnv = ...
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "60 min");
  • 1
  • 2
  • 3

这两种方式是等效的。需要注意,配置TTL有可能会导致统计结果不准确,这其实是以牺牲正确性为代价换取了资源的释放。

此外,在Flink SQL的分组聚合中同样可以使用DISTINCT进行去重的聚合处理;可以使用HAVING对聚合结果进行条件筛选;还可以使用GROUPING SETS(分组集)设置多个分组情况分别统计。这些语法跟标准SQL中的用法一致,这里就不再详细展开了。

可以看到,分组聚合既是SQL原生的聚合查询,也是流处理中的聚合操作,这是实际应用中最常见的聚合方式。当然,使用的聚合函数一般都是系统内置的,如果希望实现特殊需求也可以进行自定义。关于自定义函数(UDF),我们会在11.7节中详细介绍。

2、窗口聚合

流处理中,往往需要将无限数据流划分成有界数据集这就是所谓的“窗口”。在11.4.3小节中已经介绍了窗口的声明方式,这相当于DataStream API中的窗口分配器(window assigner),只是明确了窗口的形式以及数据如何分配;而窗口具体的计算处理操作,在DataStream API中还需要窗口函数(window function)来进行定义。

在Flink的Table API和SQL中,窗口的计算是通过“窗口聚合”(window aggregation)来实现的。与分组聚合类似,窗口聚合也需要调用SUM()、MAX()、MIN()、COUNT()一类的聚合函数,通过GROUP BY子句来指定分组的字段。只不过窗口聚合时,需要将窗口信息作为分组key的一部分定义出来。在Flink 1.12版本之前,是直接把窗口自身作为分组key放在GROUP BY之后的,所以也叫“分组窗口聚合”(参见11.4.3小节);而1.13版本开始使用了“窗口表值函数”(Windowing TVF),窗口本身返回的是就是一个表,所以窗口会出现在FROM后面,GROUP BY后面的则是窗口新增的字段window_start和window_end。

比如,我们将11.4.3中分组窗口的聚合,用窗口TVF重新实现一下:

Table result = tableEnv.sqlQuery(
                        "SELECT " +
                            "user, " +
                            "window_end AS endT, " +
                            "COUNT(url) AS cnt " +
                        "FROM TABLE( " +
                                  "TUMBLE( TABLE EventTable, " +
                                  "DESCRIPTOR(ts), " +
                                  "INTERVAL '1' HOUR)) " +
                        "GROUP BY user, window_start, window_end "
                );
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

这里我们以ts作为时间属性字段、基于EventTable定义了1小时的滚动窗口,希望统计出每小时每个用户点击url的次数。用来分组的字段是用户名user,以及表示窗口的window_start和window_end;而TUMBLE()是表值函数,所以得到的是一个表(Table),我们的聚合查询就是在这个Table中进行的。这就是11.3.2小节中窗口聚合的实现方式。

Flink SQL目前提供了滚动窗口TUMBLE()、滑动窗口HOP()和累积窗口(CUMULATE)三种表值函数(TVF)。在具体应用中,我们还需要提前定义好时间属性。下面是一段窗口聚合的完整代码,以累积窗口为例:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class CumulateWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

// 读取数据源,并分配时间戳、生成水位线
        SingleOutputStreamOperator<Event> eventStream = env
                .fromElements(
                        new Event("Alice", "./home", 1000L),
                        new Event("Bob", "./cart", 1000L),
                        new Event("Alice", "./prod?id=1",  25 * 60 * 1000L),
                        new Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
                        new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
                        new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
                        new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );

// 创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 将数据流转换成表,并指定时间属性
        Table eventTable = tableEnv.fromDataStream(
                eventStream,
                $("user"),
                $("url"),
                $("timestamp").rowtime().as("ts")  
        );

// 为方便在SQL中引用,在环境中注册表EventTable
        tableEnv.createTemporaryView("EventTable", eventTable);

// 设置累积窗口,执行SQL统计查询
        Table result = tableEnv
                .sqlQuery(
                        "SELECT " +
                            "user, " +
                            "window_end AS endT, " +
                            "COUNT(url) AS cnt " +
                        "FROM TABLE( " +
                            "CUMULATE( TABLE EventTable, " +    // 定义累积窗口
                                "DESCRIPTOR(ts), " +
                                "INTERVAL '30' MINUTE, " +
                                "INTERVAL '1' HOUR)) " +
                        "GROUP BY user, window_start, window_end "
                );

        tableEnv.toDataStream(result).print();

        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

这里我们使用了统计周期为1小时、累积间隔为30分钟的累积窗口。可以看到,代码的架构和处理逻辑与11.3.2小节中的实现完全一致,只是将滚动窗口TUMBLE()换成了累积窗口CUMULATE()。代码执行结果如下:

+I[Alice, 1970-01-01T00:30, 2]
+I[Bob, 1970-01-01T00:30, 1]
+I[Alice, 1970-01-01T01:00, 3]
+I[Bob, 1970-01-01T01:00, 1]
+I[Bob, 1970-01-01T01:30, 1]
+I[Cary, 1970-01-01T02:00, 2]
+I[Bob, 1970-01-01T02:00, 1]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

与分组聚合不同,窗口聚合不会将中间聚合的状态输出,只会最后输出一个结果。我们可以看到,所有数据都是以INSERT操作追加到结果动态表中的,因此输出每行前面都有+I的前缀。所以窗口聚合查询都属于追加查询,没有更新操作,代码中可以直接用toDataStream()将结果表转换成流。

具体来看,上面代码输入的前三条数据属于第一个半小时的累积窗口,其中Alice的访问数据有两条Bob的访问数据有1条,所以输出了两条结果[Alice, 1970-01-01T00:30, 2]和[Bob, 1970-01-01T00:30, 1];而之后又到来的一条Alice访问数据属于第二个半小时范围,同时也属于第一个1小时的统计周期,所以会在之前两条的基础上进行叠加,输出[Alice, 1970-01-01T00:30, 3],而Bob没有新的访问数据,因此依然输出[Bob, 1970-01-01T00:30, 1]。从第二个小时起,数据属于新的统计周期,就全部从零开始重新计数了。

相比之前的分组窗口聚合,Flink 1.13版本的窗口表值函数(TVF)聚合有更强大的功能。除了应用简单的聚合函数、提取窗口开始时间(window_start)和结束时间(window_end)之外,窗口TVF还提供了一个window_time字段,用于表示窗口中的时间属性;这样就可以方便地进行窗口的级联(cascading window)和计算了。另外,窗口TVF还支持GROUPING SETS,极大地扩展了窗口的应用范围。

基于窗口的聚合,是流处理中聚合统计的一个特色,也是与标准SQL最大的不同之处。在实际项目中,很多统计指标其实都是基于时间窗口来进行计算的,所以窗口聚合是Flink SQL中非常重要的功能;基于窗口TVF的聚合未来也会有更多功能的扩展支持,比如窗口Top N、会话窗口、窗口联结等等。

3、开窗(Over)聚合

在标准SQL中还有另外一类比较特殊的聚合方式,可以针对每一行计算一个聚合值。比如说,我们可以以每一行数据为基准,计算它之前1小时内所有数据的平均值;也可以计算它之前10个数的平均值。就好像是在每一行上打开了一扇窗户、收集数据进行统计一样,这就是所谓的“开窗函数”。开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口TVF聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果;而开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系。

与标准SQL中一致,Flink SQL中的开窗函数也是通过OVER子句来实现的,所以有时开窗聚合也叫作“OVER聚合”(Over Aggregation)。基本语法

1) 语法

与标准SQL中一致,Flink SQL中的开窗函数也是通过OVER子句来实现的,所以有时开窗聚合也叫作“OVER聚合”(Over Aggregation)。基本语法如下:

SELECT
  <聚合函数> OVER (
    [PARTITION BY <字段1>[, <字段2>, ...]]
    ORDER BY <时间属性字段>
    <开窗范围>),
  ...
FROM ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2) OVER子句中主要有以下几个部分

这里OVER关键字前面是一个聚合函数,它会应用在后面OVER定义的窗口上。在OVER子句中主要有以下几个部分:

● PARTITION BY(可选)

用来指定分区的键(key),类似于GROUP BY的分组,这部分是可选的;

● ORDER BY(可选)

OVER窗口是基于当前行扩展出的一段数据范围,选择的标准可以基于时间也可以基于数量。不论那种定义,数据都应该是以某种顺序排列好的;而表中的数据本身是无序的。所以在OVER子句中必须用ORDER BY明确地指出数据基于那个字段排序。在Flink的流处理中,目前只支持按照时间属性的升序排列,所以这里ORDER BY后面的字段必须是定义好的时间属性。

● 开窗范围(可选)

对于开窗函数而言,还有一个必须要指定的就是开窗的范围,也就是到底要扩展多少行来做聚合。这个范围是由BETWEEN <下界> AND <上界> 来定义的,也就是“从下界到上界”的范围。目前支持的上界只能是CURRENT ROW,也就是定义一个“从之前某一行到当前行”的范围,所以一般的形式为:

BETWEEN ... PRECEDING AND CURRENT ROW
  • 1

前面我们提到,开窗选择的范围可以基于时间,也可以基于数据的数量。所以开窗范围还应该在两种模式之间做出选择:范围间隔(RANGE intervals)和行间隔(ROW intervals)。

● 范围间隔(可选)

范围间隔以RANGE为前缀,就是基于ORDER BY指定的时间字段去选取一个范围,一般就是当前行时间戳之前的一段时间。例如开窗范围选择当前行之前1小时的数据:

RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  • 1
● 行间隔(可选)

行间隔以ROWS为前缀,就是直接确定要选多少行,由当前行出发向前选取就可以了。例如开窗范围选择当前行之前的5行数据(最终聚合会包括当前行,所以一共6条数据):

ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  • 1

下面是一个具体示例:

SELECT user, ts,
        COUNT(url) OVER (
            PARTITION BY user
            ORDER BY ts
            RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
        ) AS cnt
FROM EventTable
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

这里我们以ts作为时间属性字段,对EventTable中的每行数据都选取它之前1小时的所有数据进行聚合,统计每个用户访问url的总次数,并重命名为cnt。最终将表中每行的user,ts以及扩展出cnt提取出来。

可以看到,整个开窗聚合的结果,是对每一行数据都有一个对应的聚合值,因此就像将表中扩展出了一个新的列一样。由于聚合范围上界只能到当前行,新到的数据一般不会影响之前数据的聚合结果,所以结果表只需要不断插入(INSERT)就可以了。执行上面SQL得到的结果表,可以用toDataStream()直接转换成流打印输出。

开窗聚合与窗口聚合(窗口TVF聚合)本质上不同,不过也还是有一些相似之处的:它们都是在无界的数据流上划定了一个范围,截取出有限数据集进行聚合统计;这其实都是“窗口”的思路。事实上,在Table API中确实就定义了两类窗口:分组窗口(GroupWindow)和开窗窗口(OverWindow);而在SQL中,也可以用WINDOW子句来在SELECT外部单独定义一个OVER窗口:

SELECT user, ts,
COUNT(url) OVER w AS cnt,
MAX(CHAR_LENGTH(url)) OVER w AS max_url
FROM EventTable
WINDOW w AS (
PARTITION BY user
ORDER BY ts
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

上面的SQL中定义了一个选取之前2行数据的OVER窗口,并重命名为w;接下来就可以基于它调用多个聚合函数,扩展出更多的列提取出来。比如这里除统计url的个数外,还统计了url的最大长度:首先用CHAR_LENGTH()函数计算出url的长度,再调用聚合函数MAX()进行聚合统计。这样,我们就可以方便重复引用定义好的OVER窗口了,大大增强了代码的可读性。

七、应用实例 —— Top N

灵活使用各种类型的窗口以及聚合函数,可以实现不同的需求。一般的聚合函数,比如SUM()、MAX()、MIN()、COUNT()等,往往只是针对一组数据聚合得到一个唯一的值;所谓OVER聚合的“多对多”模式,也是针对每行数据都进行一次聚合才得到了多行的结果,对于每次聚合计算实际上得到的还是唯一的值。而有时我们可能不仅仅需要统计数据中的最大/最小值,还希望得到前N个最大/最小值;这时每次聚合的结果就不是一行,而是N行了。这就是经典的“Top N”应用场景。

Top N聚合字面意思是“最大N个”,这只是一个泛称,它不仅包括查询最大的N个值、也包括了查询最小的N个值的场景。

理想的状态下,我们应该有一个TOPN()聚合函数,调用它对表进行聚合就可以得到想要选取的前N个值了。不过仔细一想就会发现,这个聚合函数并不容易实现:对于每一次聚合计算,都应该都有多行数据输入,并得到N行结果输出,这是一个真正意义上的“多对多”转换。这种函数相当于把一个表聚合成了另一个表,所以叫作“表聚合函数”(Table Aggregate Function)。表聚合函数的抽象比较困难,目前只有窗口TVF有能力提供直接的Top N聚合,不过也尚未实现。

所以目前在Flink SQL中没有能够直接调用的Top N函数,而是提供了稍微复杂些的变通实现方法。

1. 普通Top N

在Flink SQL中,是通过OVER聚合和一个条件筛选来实现Top N的。具体来说,是通过将一个特殊的聚合函数ROW_NUMBER()应用到OVER窗口上,统计出每一行排序后的行号,作为一个字段提取出来;然后再用WHERE子句筛选行号小于等于N的那些行返回。

1) 基本语法如下:

SELECT ...
FROM (
   SELECT ...,
      ROW_NUMBER() OVER (
[PARTITION BY <字段1>[, <字段1>...]]
          ORDER BY <排序字段1> [asc|desc][, <排序字段2> [asc|desc]...]
) AS row_num
   FROM ...)
WHERE row_num <= N [AND <其它条件>]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

这里的OVER窗口定义与之前的介绍基本一致目的就是利用ROW_NUMBER()函数为每一行数据聚合得到一个排序之后的行号。行号重命名为row_num,并在外层的查询中以row_num <= N作为条件进行筛选,就可以得到根据排序字段统计的Top N结果了。

2) 需要对关键字额外做一些说明

WHERE

用来指定Top N选取的条件,这里必须通过row_num <= N或者 row_num < N + 1指定一个“排名结束点”(rank end),以保证结果有界。

PARTITION BY

是可选的,用来指定分区的字段,这样我们就可以针对不同的分组分别统计Top N了。

ORDER BY

指定了排序的字段,因为只有排序之后,才能进行前N个最大/最小的选取。每个排序字段后可以用asc或者desc来指定排序规则:asc为升序排列,取出的就是最小的N个值;desc为降序排序,对应的就是最大的N个值。默认情况下为升序,asc可以省略。

细心的读者可能会发现,之前介绍的OVER窗口不是说了,目前ORDER BY后面只能跟时间字段、并且只支持升序吗?这里怎么又可以任意指定字段进行排序了呢?

这是因为OVER窗口目前并不完善,不过针对Top N这样一个经典应用场景,Flink SQL专门用OVER聚合做了优化实现。所以只有在Top N的应用场景中,OVER窗口ORDER BY后才可以指定其它排序字段;而要想实现Top N,就必须按照上面的格式进行定义,否则Flink SQL的优化器将无法正常解析。而且,目前Table API中并不支持ROW_NUMBER()函数,所以也只有SQL中这一种通用的Top N实现方式。

另外要注意,Top N的实现必须写成上面的嵌套查询形式。这是因为行号row_num是内部子查询聚合的结果,不可能在内部作为筛选条件,只能放在外层的WHERE子句中。

下面是一个具体的示例,我们统计每个用户的访问事件中,按照字符长度排序的前两个url:

SELECT user, url, ts, row_num
FROM (
   SELECT *,
      ROW_NUMBER() OVER (
PARTITION BY user
          ORDER BY CHAR_LENGTH(url) desc 
) AS row_num
   FROM EventTable)
WHERE row_num <= 2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

这里我们以用户来分组,以访问url的字符长度作为排序的字段,降序排列后用聚合统计出每一行的行号,这样就相当于在EventTable基础上扩展出了一列row_num。而后筛选出行号小于等于2的所有数据,就得到了每个用户访问的长度最长的两个url。

需要特别说明的是,这里的Top N聚合是一个更新查询。新数据到来后,可能会改变之前数据的排名,所以会有更新(UPDATE)操作。这是ROW_NUMBER()聚合函数的特性决定的。因此,如果执行上面的SQL得到结果表,需要调用toChangelogStream()才能转换成流打印输出。

2. 窗口Top N

除了直接对数据进行Top N的选取,我们也可以针对窗口来做Top N。

例如电商行业,实际应用中往往有这样的需求:统计一段时间内的热门商品。这就需要先开窗口,在窗口中统计每个商品的点击量;然后将统计数据收集起来,按窗口进行分组,并按点击量大小降序排序,选取前N个作为结果返回。

我们已经知道,Top N聚合本质上是一个表聚合函数,这和窗口表值函数(TVF)有天然的联系。尽管如此,想要基于窗口TVF实现一个通用的Top N聚合函数还是比较麻烦的,目前Flink SQL尚不支持。不过我们同样可以借鉴之前的思路,使用OVER窗口统计行号来实现。

具体来说,可以先做一个窗口聚合,将窗口信息window_start、window_end连同每个商品的点击量一并返回,这样就得到了聚合的结果表,包含了窗口信息、商品和统计的点击量。接下来就可以像一般的Top N那样定义OVER窗口了,按窗口分组,按点击量排序,用ROW_NUMBER()统计行号并筛选前N行就可以得到结果。所以窗口Top N的实现就是窗口聚合与OVER聚合的结合使用。

下面是一个具体案例的代码实现。由于用户访问事件Event中没有商品相关信息,因此我们统计的是每小时内有最多访问行为的用户,取前两名,相当于是一个每小时活跃用户的查询。

代码

package com.example.chapter11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TopNExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1、在创建表的DDL中直接定义时间属性
        //FROM_UNIXTIME 长整型的值转换为String 格式的年月日的、时分秒、标准的UTC时间
        String createDDL = "CREATE TABLE clickTable (" +
                "user_name STRING, " +
                "url STRING, " +
                "ts BIGINT, " +
                "et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
                "WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector'= 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                ")";

        //注册表
        tableEnv.executeSql(createDDL);


        /**
         * 普通Top N,选取当前所有用户中浏览量最大的2个
         *     cnt 访问量
         *     row_num 到底排第几
         *
         *  ROW_NUMBER() 函数
         *  OVER 窗口: 在获取一个新的字段、就是当前排序之后、按照cnt排序之后、到底排第几、对应的那个行号
         */
        Table topNResultTable = tableEnv.sqlQuery("select user_name, cnt, row_num " +
                "FROM (" +
                " SELECT *,ROW_NUMBER() OVER (" +
                "order by cnt DESC" +
                " ) AS row_num " +
                " FROM  (SELECT user_name,COUNT(url) AS cnt FROM clickTable GROUP BY user_name)" +
                ") WHERE row_num <=2");


//        tableEnv.toChangelogStream(topNResultTable).print("top 2 :");


        //窗口TOP N 统计一段时间内的(前两名)活跃用户
        String subQuery = "SELECT user_name,COUNT(url) AS cnt, window_start,window_end " +
                "FROM Table (" +
                "TUMBLE(TABLE clickTable,DESCRIPTOR(et),INTERVAL '10' SECOND)" +
                ")" +
                "GROUP  BY user_name,window_start,window_end";

        Table windowTopN = tableEnv.sqlQuery("select user_name, cnt, row_num,window_end " +
                "FROM (" +
                " SELECT *,ROW_NUMBER() OVER (" +
                " PARTITION BY window_start,window_end " +
                " order by cnt DESC" +
                " ) AS row_num " +
                " FROM  ( " + subQuery + " )) WHERE row_num <=2");

        tableEnv.toChangelogStream(windowTopN).print("window TOP N: ");

        env.execute();
    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74

这里为了更好的代码可读性,我们将SQL拆分成了用来做窗口聚合的内部子查询,和套用Top N模板的外层查询。

(1)首先基于ts时间字段定义1小时滚动窗口,统计EventTable中每个用户的访问次数,重命名为cnt;为了方便后面做排序,我们将窗口信息window_start和window_end也提取出来,与user和cnt一起作为聚合结果表中的字段。

(2)然后套用Top N模板,对窗口聚合的结果表中每一行数据进行OVER聚合统计行号。这里以窗口信息进行分组,按访问次数cnt进行排序,并筛选行号小于等于2的数据,就可以得到每个窗口内访问次数最多的前两个用户了。

运行结果如下:

+I[1970-01-01T00:00, 1970-01-01T01:00, Alice, 3, 1]
+I[1970-01-01T00:00, 1970-01-01T01:00, Bob, 1, 2]
+I[1970-01-01T01:00, 1970-01-01T02:00, Cary, 2, 1]
+I[1970-01-01T01:00, 1970-01-01T02:00, Bob, 1, 2]

可以看到,第一个1小时窗口中,Alice有3次访问排名第一,Bob有1次访问排名第二;而第二小时内,Cary以2次访问占据活跃榜首,Bob仍以1次访问排名第二。由于窗口的统计结果只会最终输出一次,所以排名也是确定的,这里结果表中只有插入(INSERT)操作。也就是说,窗口Top N是追加查询,可以直接用toDataStream()将结果表转换成流打印输出。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/863936
推荐阅读
相关标签
  

闽ICP备14008679号