当前位置:   article > 正文

Flink SQL join类型_flinksql的几种join

flinksql的几种join

Flink提供了多种流式join操作,我们可以根据实际情况选择最适合自己的类型。下面开始介绍不同的join类型。

Regular Joins(常规join)

Regular Joins是最通用的join类型,和传统数据库的 JOIN 语法完全一致。对于输入表的任何更新(包含插入、更新、删除),都会触发实时计算和更新,因此它的结果是“逐步逼近”最终的精确值,也就是下游可能看到变来变去的结果。为了支持结果的更新,下游目的表需要定义主键 (PRIMARY KEY NOT ENFORCED)。

支持 INNER、LEFT、RIGHT 等多种 JOIN 类型,但是目前仅支持等值条件的连接(on x=y)。

Regular Joins 运行时需要保留左表和右表的状态,且随着时间的推移,状态会无限增长,最终可能导致作业 OOM 崩溃或异常缓慢。因此我们强烈建议用户在 Flink 参数中设置 table.exec.state.ttl 选项,它可以指定 JOIN 状态的保留时间(定义键的状态在多长时间内没被更新过就会被删除),以便 Flink 及时清理过期的状态,默认值是0 ms,即保存所有状态。比如设置为1天,SET 'table.exec.state.ttl' = '86400 s';

该类型join最终调用的类是:

org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator。

从类定义上来看,它属于 TwoInputStreamOperator,即接收两个数据输入的算子。左右两表的状态保存在两个类型为 JoinRecordStateView 实例变量(leftRecordStateView、rightRecordStateView),而具体的关联逻辑在它的 processElement 方法中。

使用示例:

SELECT *

FROM Orders

INNER JOIN Product

ON Orders.product_id = Product.id

SELECT *

FROM Orders

LEFT JOIN Product

ON Orders.product_id = Product.id

SELECT *

FROM Orders

RIGHT JOIN Product

ON Orders.product_id = Product.id

SELECT *

FROM Orders

FULL OUTER JOIN Product

ON Orders.product_id = Product.id

Interval Joins(时间区间join)

指定关联时间区间的join。返回一个受连接条件和时间约束限制的简单笛卡尔积。限制条件至少包含一个等值条件和限制左右表时间的条件(所以左右表都必须有 时间戳字段,且将该时间戳字段用作 WATERMARK FOR 语句指定的时间字段)。时间限制条件可以是比较左右表中相同类型的时间属性(即处理时间或事件时间),可以使用 <, <=, >=, >, BETWEEN

如下:ltime 左表时间属性 rtime 右表时间属性

ltime = rtime

ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE

ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

该类型join只支持Append 数据流,不支持含 Retract等 的动态表。由于给定了关联的区间,因此只需要保留很少的状态,内存压力较小。但是缺点是如果关联的数据晚到或者早到,导致落不到 JOIN 区间内,就可能导致结果不准确。此外,只有当区间过了以后,JOIN 结果才会输出,因此会有一定的延迟存在。

该类型join最终调用的类是:

org.apache.flink.table.runtime.operators.join.interval.TimeIntervalJoin

使用示例:

SELECT *

FROM Orders o, Shipments s

WHERE o.id = s.order_id

AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

Temporal Joins(时态表join)

Event Time Temporal Join

事件时间时态表连接采用一个任意表(左侧输入/探测站点),并将每行与版本表中相应行的相关版本关联起来(右侧输入/构建端)。

即根据左表记录中的时间戳,在右表的历史版本中进行查询和关联。

例如我们的商品价格表会随时间不断变动,左表来了一条时间戳为 10:00 的订单记录,那么它会对右表在 10:00 的商品价格快照(当时的价格)进行关联并输出结果;如果随后左表来了一条 10:30 的订单记录,那么它会对右表在 10:30 时的商品价格进行后续的关联。

语法:FOR SYSTEM_TIME AS

SELECT [column_list]

FROM table1 [AS <alias1>]

[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]

ON table1.column-name1 = table2.column-name1

使用事件时间属性(即行时间属性),可以检索键在过去某个时间点的值。这允许在某个共同的时间点连接两个表。版本表将存储自上一个水印以来的所有版本(按时间标识),即注意左右表都是会删除无用数据的,比如左右表的watermark是w1和w2,取其较小值w1,小于w1的左右表数据都会被删除,所以就算左表后面来了旧数据(小于w1)也join不到右边数据了

注意点:

Event Time Temporal Join 由于是靠watermark触发join的,所以左右表都要设置watermark。

Event Time Temporal Join 要把 Versioned Table 的主键包含在 Join on 的条件中。

与Regular Joins相比,尽管构建端(右表)发生了更改,但之前的时态表结果不会受到影响。与Interval Joins相比,时态表连接不定义时间窗口。来自探测端(左表)的记录总是在time属性指定的时间与构建端的版本连接。因此,构建端的行可能是任意旧的。随着时间的推移,不再需要的记录版本(对于给定的主键)将从状态中删除。支持左连接和内连接。

该类型join最终调用的类是:

org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator

使用示例:

-- Create a table of orders. This is a standard

-- append-only dynamic table.

CREATE TABLE orders (

    order_id    STRING,

    price       DECIMAL(32,2),

    currency    STRING,

    order_time  TIMESTAMP(3),

    WATERMARK FOR order_time AS order_time

) WITH (/* ... */);

-- Define a versioned table of currency rates.

-- This could be from a change-data-capture

-- such as Debezium, a compacted Kafka topic, or any other

-- way of defining a versioned table.

CREATE TABLE currency_rates (

    currency STRING,

    conversion_rate DECIMAL(32, 2),

    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,

    WATERMARK FOR update_time AS update_time,

    PRIMARY KEY(currency) NOT ENFORCED

) WITH (

   'connector' = 'kafka',

   'value.format' = 'debezium-json',

   /* ... */

);

SELECT

     order_id,

     price,

     currency,

     conversion_rate,

     order_time

FROM orders

LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time

ON orders.currency = currency_rates.currency;

Processing Time Temporal Join

处理时间时态表连接使用处理时间属性将行与外部版本控制表中键的最新版本关联起来。

根据定义,使用处理时间属性,联接将始终返回给定键的最新值。可以将查找表看作是一个简单的HashMap<K, V>,它存储来自构建端的所有记录。这种连接的强大之处在于,当在Flink中将表物化为动态表不可行的时候,它允许Flink直接针对外部系统工作。

目前,FOR SYSTEM_TIME AS OF 语法在Processing Time Temporal Join中还不支持,可以使用时态表函数语法(LATERAL TABLE)。不支持FOR SYSTEM_TIME AS OF语法的原因只是语义上的考虑,因为左流的连接处理不会等待时态表的完整快照,这可能会误导生产环境中的用户。使用时态表函数Processing Time Temporal Join也存在同样的语义问题,但它存在已久(很早就支持这种写法),因此我们从兼容性的角度支持它。

Processing Time Temporal Join的常用场景就是join外部表(比如维度表)。

与Regular Joins相比,尽管构建端(右表)发生了更改,但之前的时态表结果不会受到影响。与Interval Joins相比,时态表连接不定义时间窗口。不存储旧数据。

该类型join最终调用的类是:

org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator

使用示例:

SELECT

  o_amount, r_rate

FROM

  Orders,

  LATERAL TABLE (Rates(o_proctime))

WHERE

  r_currency = o_currency

Lookup Join(查找join)

Lookup Join常用于流与维表之间的join,维表来源于外部系统,比如mysql、redis等。而维表的作用就是对流式数据进行数据补全。比如我们的source stream是来自日志的订单数据,但是日志中我们只是记录了订单商品的id,并没有其他的信息,但是我们把数据存入数仓进行数据分析的时候,却需要商品名称、价格等等其他的信息,这种问题我们可以在进行流处理的时候通过查询维表的方式对数据进行数据补全。

Lookup Join要求流表具有处理时间属性(必须是processing time,当前不支持event time,也就是说这种方法不支持根据数据流的事件时间去查维度表里的对应时刻的数据),而维表可以直接使用连接器connector支持。Lookup Join使用Processing Time Temporal Join 的语法,且必须要有等值连接条件。

但是如果每处理一条流里的消息,都要到数据库里查询维表,而维表一般存在第三方数据库,这就导致每次都要有远程请求,特别是数据流大的情况下,频繁的维表查询,也会对外部数据库造成很大压力、降低整体吞吐,所以对维度表进行缓存不失为一个好的策略。但是用缓存也有个潜在的风险:缓存里的数据有可能不是最新的,这要在性能和正确性之间做权衡。所以Lookup Join也提供了缓存的机制,使用的是guava cache实现的。默认缓存机制是禁用的。

其提供了两个配置:lookup.cache.max-rows 和 lookup.cache.ttl。两个都配置了才能启用缓存。

lookup.cache.max-rows:指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。

lookup.cache.ttl:指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。

如果ttl值设置得比较小,则可以获得比较新鲜及时的数据,但是就增大了数据库查询的压力,所以需要一个平衡点。

lookup.cache.caching-missing-key:是否缓存空查询结果的key,默认值是true,即缓存空查询结果的key(即key值没查到结果)

lookup.max-retries:查询数据库的最大重试次数

相关代码详见:org.apache.flink.table.connector.source.LookupTableSource

使用示例:(关键写法:JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime)

CREATE TABLE orders (

    order_id    STRING,

    price       DECIMAL(32,2),

    customer_id    STRING,

    order_time  TIMESTAMP(3),

    proc_time as PROCTIME()

) with (

  'connector' = 'kafka',

  'topic' = 'mdxxx',

  'properties.bootstrap.servers' = 'xxx-003:9092',

  'properties.group.id' = 'xxx',

  'scan.startup.mode' = 'latest-offset',

  'format' = 'json',

  'json.fail-on-missing-field' = 'false',

  'json.ignore-parse-errors' = 'true'

);

CREATE TABLE customers (

  id INT,

  name STRING,

  country STRING,

  zip STRING

) with (

  'connector' = 'jdbc',

  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',

  'table-name' = 'da.customers',

  'username' = 'xxx',

  'password' = 'xxx',

  'lookup.cache.max-rows' = '100000',

  'lookup.cache.ttl' = '86400000'

);

SELECT o.order_id, o.price, c.country, c.zip

FROM orders AS o

  JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c

    ON o.customer_id = c.id;

Table Function Join(表函数join)

将表与表函数的结果join起来。左边(外部)表的每一行都与表函数的相应调用所产生的所有行join。用户定义表函数必须在使用前注册。

使用示例:

SELECT order_id, res

FROM Orders,

LATERAL TABLE(table_func(order_id)) t(res)

SELECT order_id, res

FROM Orders

LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)

  ON TRUE

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/595303
推荐阅读
相关标签
  

闽ICP备14008679号