赞
踩
动态表(dynamic table):动态表是流的另一种表达方式,动态表作为一个逻辑的抽象概念,使我们更容易理解flink中将streaming发展到table这个层次的设计,本质都是对无边界、持续变更数据的表示形式,所以动态表与流之间可以相互转换。
版本表(dynamic table):动态表之上的定义,版本是一个拥有主键和时间属性的动态表(建表语句必需包含PRIMARY KEY和WATERMARK),所以版本表可以追踪每个key在某时间点/时间内的变化情况。版本表可以直接从changelog格式的source创建,或者基于append-only的源创建版本视图。
时态表(temporal table):时态表是随着时间变化而变化的,也就是动态表,时态表包含一个或多个版本表的快照;当它能追踪所有记录的历史变更(来自数据库的changelog)时,就是个版本表;如果它只能表示所有记录经过物化后的最新快照(直接一个数据库表),那就是个普通表。
Regular Joins是flink这么多join类型中最普通的,任意一侧的数据流有变更或者新增,都会影响到join结果。Regular joins是通过把双流的输入数据都保存在flink的状态中,存在state过度膨胀的隐患,所以我们在使用时要合理设置table状态的TTL(table.exec.state.ttl),这要结合具体的业务场景,否则会影响join结果的正确性。
有两种join类型,内连接(INNER Equi-JOIN)和外连接(OUTER Equi-JOIN),两者都只支持等值连接,且至少一个连接条件。
Interval join要求至少有一个等值谓词连接和一个时间约束条件,这个时间属性定义了流的时间范围,且作为WATERMARK
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL ‘10’ MINUTE
ltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime + INTERVAL ‘5’ SECOND
Exception in thread "main" org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[ tb_order]], fields=[order_id, price, currency, order_time])
实现方式
支持datastream和sql两种方式实现Interval join,推荐使用sql来实现,
KeyedStream#intervalJoin
,两流必须为KeyedStream,算子为IntervalJoinOperator
,实现了inner join的效果,关联不上的数据将被丢弃!!!TimeIntervalJoin
,两表必需都定义WATERMARK,同时支持inner join|left join、right join效果与时态表的join,通过上述时态表的描述可得知,可以关联得到记录的历史版本或只能得到最新版本,flink sql遵循SQL:2011的标准语法
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
从使用形式划分,可以分为3种:Event Time Temporal Join、Processing Time Temporal Join、Temporal Table Function Join
Event Time temporal join的是一个版本表,意味着可以根据主表的事件时间关联到当时维表的具体版本。
Temporal table join currently only supports ‘FOR SYSTEM_TIME AS OF’ left table’s time attribute field
Event-Time Temporal Table Join requires both primary key and row time attribute in versioned table, but no row time attribute can be found.
Temporal table’s primary key [currency] must be included in the equivalence condition of temporal join
Processing-time temporal join is not supported yet
从flink 1.14开始已经不再支持这种方式,可以使用temporal table function语法替换
右表定义process-time属性,总是能关联到最新的右表记录,其实这和lookup join差不多,只不过右表的记录是以HashMap全部保存着最新的版本在状态中。这种方式的强大之处在于,可以直接对接不能变成flink动态表的外部表(例如hbase)
Join key must be the same as temporal table’s primary key
create table if not exists tb_order (
`order_id` int,
`price` int,
`currency` string,
`order_time` timestamp(3),
`proc_time` AS PROCTIME(),
primary key(order_id) not enforced)
WITH (
'connector'='mysql-cdc',
'table-name'='tb_order',
)
create table if not exists tb_currency (
`currency` string,
`rate` int,
`update_time` timestamp(3),
`proc_time` AS PROCTIME(),
)
WITH (
'connector'='mysql-cdc',
'table-name'='tb_currency',
)
TemporalTableFunction rate = tEnv.from("tb_currency").createTemporalTableFunction("proc_time", "currency");
tEnv.createTemporarySystemFunction("rate",rate);
select * from tb_order o , LATERAL TABLE(rate(o.order_time)) c where o.currency=c.currency
上面例子实现了process time temporal join,两建表语句都不指定事件时间,且tb_currency无需指定primary key(即非版本表),但是在定义TemporalTableFunction可以指定任意字段为主键,所以如果建表语句指定了事件时间,且TemporalTableFunction也使用事件时间,那么相当于间接创建了版本表。
通过查询外部存储系统的数据以丰富流的属性,这种join方式要求流表必须有一个processing time属性,外部数据表的connector要实现LookupTableSource
接口
CREATE TEMPORARY TABLE Customers (
id INT,
name STRING,
country STRING,
zip STRING
) WITH (
'connector' = 'jdbc',
'table-name' = 'customers'
);
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。