赞
踩
Flink提供了多种流式join操作,我们可以根据实际情况选择最适合自己的类型。下面开始介绍不同的join类型。
Regular Joins是最通用的join类型,和传统数据库的 JOIN 语法完全一致。对于输入表的任何更新(包含插入、更新、删除),都会触发实时计算和更新,因此它的结果是“逐步逼近”最终的精确值,也就是下游可能看到变来变去的结果。为了支持结果的更新,下游目的表需要定义主键 (PRIMARY KEY NOT ENFORCED)。
支持 INNER、LEFT、RIGHT 等多种 JOIN 类型,但是目前仅支持等值条件的连接(on x=y)。
Regular Joins 运行时需要保留左表和右表的状态,且随着时间的推移,状态会无限增长,最终可能导致作业 OOM 崩溃或异常缓慢。因此我们强烈建议用户在 Flink 参数中设置 table.exec.state.ttl 选项,它可以指定 JOIN 状态的保留时间(定义键的状态在多长时间内没被更新过就会被删除),以便 Flink 及时清理过期的状态,默认值是0 ms,即保存所有状态。比如设置为1天,SET 'table.exec.state.ttl' = '86400 s';
该类型join最终调用的类是:
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator。
从类定义上来看,它属于 TwoInputStreamOperator,即接收两个数据输入的算子。左右两表的状态保存在两个类型为 JoinRecordStateView 实例变量(leftRecordStateView、rightRecordStateView),而具体的关联逻辑在它的 processElement 方法中。
使用示例:
SELECT * FROM Orders INNER JOIN Product ON Orders.product_id = Product.id SELECT * FROM Orders LEFT JOIN Product ON Orders.product_id = Product.id SELECT * FROM Orders RIGHT JOIN Product ON Orders.product_id = Product.id SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.product_id = Product.id |
指定关联时间区间的join。返回一个受连接条件和时间约束限制的简单笛卡尔积。限制条件至少包含一个等值条件和限制左右表时间的条件(所以左右表都必须有 时间戳字段,且将该时间戳字段用作 WATERMARK FOR 语句指定的时间字段)。时间限制条件可以是比较左右表中相同类型的时间属性(即处理时间或事件时间),可以使用 <, <=, >=, >, BETWEEN
如下:ltime 左表时间属性 rtime 右表时间属性
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
该类型join只支持Append 数据流,不支持含 Retract等 的动态表。由于给定了关联的区间,因此只需要保留很少的状态,内存压力较小。但是缺点是如果关联的数据晚到或者早到,导致落不到 JOIN 区间内,就可能导致结果不准确。此外,只有当区间过了以后,JOIN 结果才会输出,因此会有一定的延迟存在。
该类型join最终调用的类是:
org.apache.flink.table.runtime.operators.join.interval.TimeIntervalJoin
使用示例:
SELECT * FROM Orders o, Shipments s WHERE o.id = s.order_id AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time |
Event Time Temporal Join
事件时间时态表连接采用一个任意表(左侧输入/探测站点),并将每行与版本表中相应行的相关版本关联起来(右侧输入/构建端)。
即根据左表记录中的时间戳,在右表的历史版本中进行查询和关联。
例如我们的商品价格表会随时间不断变动,左表来了一条时间戳为 10:00 的订单记录,那么它会对右表在 10:00 的商品价格快照(当时的价格)进行关联并输出结果;如果随后左表来了一条 10:30 的订单记录,那么它会对右表在 10:30 时的商品价格进行后续的关联。
语法:FOR SYSTEM_TIME AS
SELECT [column_list] FROM table1 [AS <alias1>] [LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>] ON table1.column-name1 = table2.column-name1 |
使用事件时间属性(即行时间属性),可以检索键在过去某个时间点的值。这允许在某个共同的时间点连接两个表。版本表将存储自上一个水印以来的所有版本(按时间标识),即注意左右表都是会删除无用数据的,比如左右表的watermark是w1和w2,取其较小值w1,小于w1的左右表数据都会被删除,所以就算左表后面来了旧数据(小于w1)也join不到右边数据了。
注意点:
Event Time Temporal Join 由于是靠watermark触发join的,所以左右表都要设置watermark。
Event Time Temporal Join 要把 Versioned Table 的主键包含在 Join on 的条件中。
与Regular Joins相比,尽管构建端(右表)发生了更改,但之前的时态表结果不会受到影响。与Interval Joins相比,时态表连接不定义时间窗口。来自探测端(左表)的记录总是在time属性指定的时间与构建端的版本连接。因此,构建端的行可能是任意旧的。随着时间的推移,不再需要的记录版本(对于给定的主键)将从状态中删除。支持左连接和内连接。
该类型join最终调用的类是:
org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator
使用示例:
-- Create a table of orders. This is a standard -- append-only dynamic table. CREATE TABLE orders ( order_id STRING, price DECIMAL(32,2), currency STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time ) WITH (/* ... */); -- Define a versioned table of currency rates. -- This could be from a change-data-capture -- such as Debezium, a compacted Kafka topic, or any other -- way of defining a versioned table. CREATE TABLE currency_rates ( currency STRING, conversion_rate DECIMAL(32, 2), update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL, WATERMARK FOR update_time AS update_time, PRIMARY KEY(currency) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'value.format' = 'debezium-json', /* ... */ ); SELECT order_id, price, currency, conversion_rate, order_time FROM orders LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time ON orders.currency = currency_rates.currency; |
Processing Time Temporal Join
处理时间时态表连接使用处理时间属性将行与外部版本控制表中键的最新版本关联起来。
根据定义,使用处理时间属性,联接将始终返回给定键的最新值。可以将查找表看作是一个简单的HashMap<K, V>,它存储来自构建端的所有记录。这种连接的强大之处在于,当在Flink中将表物化为动态表不可行的时候,它允许Flink直接针对外部系统工作。
目前,FOR SYSTEM_TIME AS OF 语法在Processing Time Temporal Join中还不支持,可以使用时态表函数语法(LATERAL TABLE)。不支持FOR SYSTEM_TIME AS OF语法的原因只是语义上的考虑,因为左流的连接处理不会等待时态表的完整快照,这可能会误导生产环境中的用户。使用时态表函数Processing Time Temporal Join也存在同样的语义问题,但它存在已久(很早就支持这种写法),因此我们从兼容性的角度支持它。
Processing Time Temporal Join的常用场景就是join外部表(比如维度表)。
与Regular Joins相比,尽管构建端(右表)发生了更改,但之前的时态表结果不会受到影响。与Interval Joins相比,时态表连接不定义时间窗口。不存储旧数据。
该类型join最终调用的类是:
org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator
使用示例:
SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency |
Lookup Join常用于流与维表之间的join,维表来源于外部系统,比如mysql、redis等。而维表的作用就是对流式数据进行数据补全。比如我们的source stream是来自日志的订单数据,但是日志中我们只是记录了订单商品的id,并没有其他的信息,但是我们把数据存入数仓进行数据分析的时候,却需要商品名称、价格等等其他的信息,这种问题我们可以在进行流处理的时候通过查询维表的方式对数据进行数据补全。
Lookup Join要求流表具有处理时间属性(必须是processing time,当前不支持event time,也就是说这种方法不支持根据数据流的事件时间去查维度表里的对应时刻的数据),而维表可以直接使用连接器connector支持。Lookup Join使用Processing Time Temporal Join 的语法,且必须要有等值连接条件。
但是如果每处理一条流里的消息,都要到数据库里查询维表,而维表一般存在第三方数据库,这就导致每次都要有远程请求,特别是数据流大的情况下,频繁的维表查询,也会对外部数据库造成很大压力、降低整体吞吐,所以对维度表进行缓存不失为一个好的策略。但是用缓存也有个潜在的风险:缓存里的数据有可能不是最新的,这要在性能和正确性之间做权衡。所以Lookup Join也提供了缓存的机制,使用的是guava cache实现的。默认缓存机制是禁用的。
其提供了两个配置:lookup.cache.max-rows 和 lookup.cache.ttl。两个都配置了才能启用缓存。
lookup.cache.max-rows:指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。
lookup.cache.ttl:指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。
如果ttl值设置得比较小,则可以获得比较新鲜及时的数据,但是就增大了数据库查询的压力,所以需要一个平衡点。
lookup.cache.caching-missing-key:是否缓存空查询结果的key,默认值是true,即缓存空查询结果的key(即key值没查到结果)
lookup.max-retries:查询数据库的最大重试次数
相关代码详见:org.apache.flink.table.connector.source.LookupTableSource
使用示例:(关键写法:JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime)
CREATE TABLE orders ( order_id STRING, price DECIMAL(32,2), customer_id STRING, order_time TIMESTAMP(3), proc_time as PROCTIME() ) with ( 'connector' = 'kafka', 'topic' = 'mdxxx', 'properties.bootstrap.servers' = 'xxx-003:9092', 'properties.group.id' = 'xxx', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' ); CREATE TABLE customers ( id INT, name STRING, country STRING, zip STRING ) with ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://mysqlhost:3306/customerdb', 'table-name' = 'da.customers', 'username' = 'xxx', 'password' = 'xxx', 'lookup.cache.max-rows' = '100000', 'lookup.cache.ttl' = '86400000' ); SELECT o.order_id, o.price, c.country, c.zip FROM orders AS o JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id; |
将表与表函数的结果join起来。左边(外部)表的每一行都与表函数的相应调用所产生的所有行join。用户定义表函数必须在使用前注册。
使用示例:
SELECT order_id, res FROM Orders, LATERAL TABLE(table_func(order_id)) t(res) SELECT order_id, res FROM Orders LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res) ON TRUE |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。