当前位置:   article > 正文

Flink系列之:Joins

Flink系列之:Joins

一、Joins

  • 适用于流、批一体

Flink SQL支持对动态表进行复杂而灵活的连接操作。 为了处理不同的场景,需要多种查询语义,因此有几种不同类型的 Join。

默认情况下,joins 的顺序是没有优化的。表的 join 顺序是在 FROM 从句指定的。可以通过把更新频率最低的表放在第一个、频率最高的放在最后这种方式来微调 join 查询的性能。需要确保表的顺序不会产生笛卡尔积,因为不支持这样的操作并且会导致查询失败。

二、Regular Joins

Regular join 是最通用的 join 类型。在这种 join 下,join 两侧表的任何新记录或变更都是可见的,并会影响整个 join 的结果。 例如:如果左边有一条新纪录,在 Product.id 相等的情况下,它将和右边表的之前和之后的所有记录进行 join。

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
  • 1
  • 2
  • 3

对于流式查询,regular join 的语法是最灵活的,允许任何类型的更新(插入、更新、删除)输入表。 然而,这种操作具有重要的操作意义:Flink 需要将 Join 输入的两边数据永远保持在状态中。 因此,计算查询结果所需的状态可能会无限增长,这取决于所有输入表的输入数据量。你可以提供一个合适的状态 time-to-live (TTL) 配置来防止状态过大。注意:这样做可能会影响查询的正确性。

三、INNER Equi-JOIN

根据 join 限制条件返回一个简单的笛卡尔积。目前只支持 equi-joins,即:至少有一个等值条件。不支持任意的 cross join 和 theta join。(cross join 指的是类似 SELECT * FROM table_a CROSS JOIN table_b,theta join 指的是类似 SELECT * FROM table_a, table_b)

SELECT *
FROM Orders
INNER JOIN Product
ON Orders.product_id = Product.id
  • 1
  • 2
  • 3
  • 4

四、OUTER Equi-JOIN

返回所有符合条件的笛卡尔积(即:所有通过 join 条件连接的行),加上所有外表没有匹配到的行。Flink 支持 LEFT、RIGHT 和 FULL outer joins。目前只支持 equi-joins,即:至少有一个等值条件。不支持任意的 cross join 和 theta join。

SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id

SELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
SELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id
  • 1
  • 2
  • 3
  • 4

这个Flink SQL查询的目标是通过将Orders表与Product表进行FULL OUTER JOIN,获取所有订单和产品的信息。

  • 使用SELECT *来选择所有列。
  • 使用FULL OUTER JOIN将Orders表和Product表连接起来,连接条件是Orders表的product_id列与Product表的id列相等。
  • FULL OUTER JOIN操作将返回所有匹配和不匹配的行,因此查询结果将包括Orders表和Product表中的所有数据。
  • 这个查询的结果将包含所有订单和产品的信息。

五、Interval Joins

返回一个符合 join 条件和时间限制的简单笛卡尔积。Interval join 需要至少一个 equi-join 条件和一个 join 两边都包含的时间限定 join 条件。范围判断可以定义成就像一个条件(<, <=, >=, >),也可以是一个 BETWEEN 条件,或者两边表的一个相同类型(即:处理时间 或 事件时间)的时间属性 的等式判断。

例如:如果订单是在被接收到4小时后发货,这个查询会把所有订单和它们相应的 shipments join 起来。

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
  • 1
  • 2
  • 3
  • 4

这个SQL查询的目标是,从订单数据表(Orders)和发货数据表(Shipments)中选择符合条件的所有列。

  • 首先,我们使用FROM关键字将Orders表和Shipments表进行连接(使用逗号表示进行内连接)。
  • 然后,我们使用WHERE子句来指定连接条件,即订单(Orders)和发货(Shipments)的订单ID必须相等。
  • 除此之外,我们还使用AND子句来指定一个条件,即订单时间(o.order_time)必须在发货时间(s.ship_time)之前的4小时内。
  • 最后,使用SELECT *语句选择所有的列。
  • 这个查询的结果将返回满足连接条件和时间范围条件的所有订单和发货数据。

下面列举了一些有效的 interval join 时间条件:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL ‘10’ MINUTE
  • ltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime + INTERVAL ‘5’ SECOND

对于流式查询,对比 regular join,interval join 只支持有时间属性的非更新表。 由于时间属性是递增的,Flink 从状态中移除旧值也不会影响结果的正确性。

六、Temporal Joins

时态表(Temporal table)是一个随时间变化的表:在 Flink 中被称为动态表。时态表中的行与一个或多个时间段相关联,所有 Flink 中的表都是时态的(Temporal)。 时态表包含一个或多个版本的表快照,它可以是一个变化的历史表,跟踪变化(例如,数据库变化日志,包含所有快照)或一个变化的维度表,也可以是一个将变更物化的维表(例如,存放最终快照的数据表)。

七、事件时间 Temporal Join

基于事件时间的 Temporal join 允许对版本表进行 join。 这意味着一个表可以使用变化的元数据来丰富,并在某个时间点检索其具体值。

Temporal Joins 使用任意表(左侧输入/探测端)的每一行与版本表中对应的行进行关联(右侧输入/构建端)。 Flink 使用 SQL:2011标准 中的 FOR SYSTEM_TIME AS OF 语法去执行操作。 Temporal join 的语法如下:

SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
  • 1
  • 2
  • 3
  • 4

有了事件时间属性(即:rowtime 属性),就能检索到过去某个时间点的值。 这允许在一个共同的时间点上连接这两个表。 版本表将存储自最后一个 watermark 以来的所有版本(按时间标识)。

例如,假设我们有一个订单表,每个订单都有不同货币的价格。 为了正确地将该表统一为单一货币(如美元),每个订单都需要与下单时相应的汇率相关联。

-- 创建订单表。这是一个标准
-- 仅追加动态表。
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
) WITH (/* ... */);

-- 定义版本化的货币汇率表。
-- 这可能来自变更数据捕获
-- 例如 Debezium、压缩的 Kafka 主题或任何其他
-- 定义版本化表的方法。
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time - INTERVAL '15' SECOND,
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'value.format' = 'debezium-json',
   /* ... */
);

SELECT 
     order_id,
     price,
     orders.currency,
     conversion_rate,
     order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;

order_id  price  currency  conversion_rate  order_time
========  =====  ========  ===============  =========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

这个Flink SQL查询的目标是将订单数据和货币汇率数据进行连接,并在连接后的结果中选择特定的列。

  • 首先,我们使用CREATE TABLE语句创建了一个名为orders的表。该表包含了订单的相关信息,如订单ID、价格、货币类型和订单时间。在定义该表时,我们还为订单时间添加了一个WATERMARK。WATERMARK用于指定事件时间,以便Flink能够按照事件时间进行处理。
  • 接下来,我们使用CREATE TABLE语句创建了一个名为currency_rates的表。该表用于存储货币汇率的相关信息,包括货币类型、兑换率和更新时间。这个表是一个版本化的表,可以通过变更数据捕获(如Debezium)、压缩的Kafka主题或其他方式创建。
  • 最后,我们使用SELECT语句从orders表中选择了订单ID、价格、订单货币类型、兑换率和订单时间这几个列。在选择这些列时,我们还使用了LEFT JOIN关键字将orders表和currency_rates表进行连接。连接条件是订单的货币类型(orders.currency)和货币汇率表中的货币类型(currency_rates.currency)必须相等。此外,我们还使用了FOR SYSTEM_TIME AS OF orders.order_time指定使用订单时间来选择货币汇率表中的数据。这样,查询的结果将返回订单数据和相应的货币汇率信息。
  • 查询结果中的order_id、price、currency、conversion_rate和order_time列分别表示订单ID、价格、货币类型、兑换率和订单时间。每一行表示一个订单及其相应的货币汇率。

注意: 事件时间 temporal join 是通过左和右两侧的 watermark 触发的; 这里的 INTERVAL 时间减法用于等待后续事件,以确保 join 满足预期。 请确保 join 两边设置了正确的 watermark 。

注意: 事件时间 temporal join 需要包含主键相等的条件,即:currency_rates 表的主键 currency_rates.currency 包含在条件 orders.currency = currency_rates.currency 中。

与 regular joins 相比,就算 build side(例子中的 currency_rates 表)发生变更了,之前的 temporal table 的结果也不会被影响。 与 interval joins 对比,temporal join没有定义join的时间窗口。 Probe side (例子中的 orders 表)的记录总是在 time 属性指定的时间与 build side 的版本行进行连接。因此,build side 表的行可能已经过时了。 随着时间的推移,不再被需要的记录版本(对于给定的主键)将从状态中删除。

八、处理时间 Temporal Join

基于处理时间的 temporal join 使用处理时间属性将数据与外部版本表(例如 mysql、hbase)的最新版本相关联。

通过定义一个处理时间属性,这个 join 总是返回最新的值。可以将 build side 中被查找的表想象成一个存储所有记录简单的 HashMap<K,V>。 这种 join 的强大之处在于,当无法在 Flink 中将表具体化为动态表时,它允许 Flink 直接针对外部系统工作。

下面这个处理时间 temporal join 示例展示了一个追加表 orders 与 LatestRates 表进行 join。 LatestRates 是一个最新汇率的维表,比如 HBase 表,在 10:15,10:30,10:52这些时间,LatestRates 表的数据看起来是这样的:

10:15> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

10:30> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

10:52> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        116     <==== changed from 114 to 116
Yen           1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

LastestRates 表的数据在 10:15 和 10:30 是相同的。 欧元(Euro)的汇率(rate)在 10:52 从 114 变更为 116。

Orders 表示支付金额的 amount 和currency的追加表。 例如:在 10:15 ,有一个金额为 2 Euro 的 order。

SELECT * FROM Orders;

amount currency
====== =========
     2 Euro             <== arrived at time 10:15
     1 US Dollar        <== arrived at time 10:30
     2 Euro             <== arrived at time 10:52
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

给出下面这些表,我们希望所有 Orders 表的记录转换为一个统一的货币。

amount currency     rate   amount*rate
====== ========= ======= ============
     2 Euro          114          228    <== arrived at time 10:15
     1 US Dollar     102          102    <== arrived at time 10:30
     2 Euro          116          232    <== arrived at time 10:52
  • 1
  • 2
  • 3
  • 4
  • 5

目前,temporal join 还不支持与任意 view/table 的最新版本 join 时使用 FOR SYSTEM_TIME AS OF 语法。可以像下面这样使用 temporal table function 语法来实现(时态表函数):

SELECT
  o_amount, r_rate
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

这个Flink SQL查询的目标是将订单数据和货币汇率数据进行连接,并在连接后的结果中选择特定的列。

  • 首先,通过FROM子句指定了两个输入表,即Orders和LATERAL TABLE (Rates(o_proctime))。Orders表示订单数据表,而LATERAL TABLE (Rates(o_proctime))表示根据订单处理时间获取的货币汇率表。
  • 接下来,在WHERE子句中指定了连接条件,即货币汇率表中的货币类型(r_currency)必须与订单表中的货币类型(o_currency)相等。
  • 最后,在SELECT子句中选择了o_amount和r_rate这两个列,分别表示订单金额和货币汇率。
  • 查询结果将返回订单金额和对应的货币汇率。

注意 Temporal join 不支持与 table/view 的最新版本进行 join 时使用 FOR SYSTEM_TIME AS OF 语法是出于语义考虑,因为左流的连接处理不会等待 temporal table 的完整快照,这可能会误导生产环境中的用户。处理时间 temporal join 使用 temporal table function 也存在相同的语义问题,但它已经存在了很长时间,因此我们从兼容性的角度支持它。

processing-time 的结果是不确定的。 processing-time temporal join 常常用在使用外部系统来丰富流的数据。(例如维表)

与 regular joins 的差异,就算 build side(例子中的 currency_rates 表)发生变更了,之前的 temporal table 结果也不会被影响。 与 interval joins 的差异,temporal join 没有定义数据连接的时间窗口。即:旧数据没存储在状态中。

九、时态表函数连接

使用 temporal table function 去 join 表的语法和 Table Function 相同。

注意:目前只支持 inner join 和 left outer join。

假设Rates是一个 temporal table function,这个 join 在 SQL 中可以被表达为:

SELECT
  o_amount, r_rate
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

上述 temporal table DDL 和 temporal table function 的主要区别在于:

  • SQL 中可以定义 temporal table DDL,但不能定义 temporal table 函数;
  • temporal table DDL 和 temporal table function 都支持 temporal join 版本表,但只有 temporal table function 可以 temporal join 任何表/视图的最新版本(即"处理时间 Temporal Join")。

十、Lookup Join

lookup join 通常用于使用从外部系统查询的数据来丰富表。join 要求一个表具有处理时间属性,另一个表由查找源连接器(lookup source connnector)支持。

lookup join 和上面的 处理时间 Temporal Join 语法相同,右表使用查找源连接器支持。

下面的例子展示了 lookup join 的语法。

-- Customers 由 JDBC 连接器支持,可用于查找连接
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers'
);

-- 用客户信息丰富每个订单
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

在上面的示例中,Orders 表由保存在 MySQL 数据库中的 Customers 表数据来丰富。带有后续 process time 属性的 FOR SYSTEM_TIME AS OF 子句确保在联接运算符处理 Orders 行时,Orders 的每一行都与 join 条件匹配的 Customer 行连接。它还防止连接的 Customer 表在未来发生更新时变更连接结果。lookup join 还需要一个强制的相等连接条件,在上面的示例中是 o.customer_id = c.id。

十一、数组展开

返回给定数组中每个元素的新行。尚不支持 WITH ORDINALITY(会额外生成一个标识顺序的整数列)展开。

SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
  • 1
  • 2

十二、表功能

将表与表函数的结果联接。左侧(外部)表的每一行都与表函数的相应调用产生的所有行相连接。用户自定义表函数 必须在使用前注册。

INNER JOIN

如果表函数调用返回一个空结果,那么左表的这行数据将不会输出。

SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res)
  • 1
  • 2
  • 3

这个Flink SQL查询的目标是从订单数据表中获取订单ID和通过自定义表函数根据订单ID计算得出的结果。

  • 首先,通过FROM子句指定了输入表Orders。
  • 然后,通过LATERAL TABLE子句调用了自定义的表函数table_func,并将订单ID作为参数传递给函数。表函数的作用是根据订单ID计算得出一个结果。
  • 接下来,在SELECT子句中选择了order_id和res这两个列,分别表示订单ID和通过表函数计算得到的结果。
  • 查询结果将返回订单ID和对应的表函数计算结果。

LEFT OUTER JOIN

如果表函数调用返回了一个空结果,则保留相应的行,并用空值填充未关联到的结果。当前,针对 lateral table 的 left outer join 需要 ON 子句中有一个固定的 TRUE 连接条件。

SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
  ON TRUE
  • 1
  • 2
  • 3
  • 4

这个Flink SQL查询的目标是从订单数据表中获取订单ID和通过自定义表函数根据订单ID计算得出的结果,并使用左外连接将计算结果与订单数据进行关联。

  • 首先,通过FROM子句指定了输入表Orders。
  • 然后,通过LATERAL TABLE子句调用了自定义的表函数table_func,并将订单ID作为参数传递给函数。表函数的作用是根据订单ID计算得出一个结果。
  • 接下来,使用LEFT OUTER JOIN关键字将计算结果与订单数据进行连接。使用ON TRUE来指定连接条件,这意味着对所有的订单都执行连接操作。
  • 最后,在SELECT子句中选择了order_id和res这两个列,分别表示订单ID和通过表函数计算得到的结果。
  • 查询结果将返回订单ID和对应的表函数计算结果,如果没有计算结果的话,会返回NULL值。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/477349
推荐阅读
相关标签
  

闽ICP备14008679号