Join queries in Flink SQL fall into two categories:
Regular Join variants (L: left stream, R: right stream):
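Inner Join: a row is output only when both sides match: +[L, R]
Left Join (the left side is preserved): a matched row outputs +[L, R], and an unmatched left row outputs +[L, null]. If a left row was already emitted as +[L, null] and a matching right row arrives later, a retraction is sent: first -[L, null], then +[L, R].
Full Outer Join (both sides are preserved): right stream: matched outputs +[L, R], unmatched outputs +[null, R]; left stream: matched outputs +[L, R], unmatched outputs +[L, null]. When a row arrives whose counterpart was previously emitted without a match, a retraction is sent (left row arrives: retract -[null, R], then output +[L, R]; right row arrives: retract -[L, null], then output +[L, R]).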
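To make the retraction behaviour concrete, here is a minimal Python model of the changelog a regular equi-join emits, using the +/- notation from the list. It is a simplified sketch, not Flink's operator: the function name and the (side, key, value) event layout are hypothetical, state is an unbounded per-key list, and None stands in for null.

```python
from collections import defaultdict


def regular_join(events, kind="inner"):
    """Simplified model of the changelog a regular equi-join emits.

    events: (side, key, value) tuples in arrival order; side is "L" or "R".
    kind:   "inner", "left" or "full".
    Returns (op, left_value, right_value) tuples; op is "+" (insert) or
    "-" (retract), and None plays the role of null.
    """
    left_state = defaultdict(list)   # key -> every left value seen so far
    right_state = defaultdict(list)  # key -> every right value seen so far
    out = []
    for side, key, value in events:
        if side == "L":
            partners = right_state[key]
            if partners:
                # Full join: right rows previously emitted as +[null, R] are
                # retracted when their first left partner shows up.
                if kind == "full" and not left_state[key]:
                    out.extend(("-", None, rv) for rv in partners)
                out.extend(("+", value, rv) for rv in partners)
            elif kind in ("left", "full"):
                out.append(("+", value, None))  # no partner yet: pad with null
            left_state[key].append(value)
        else:
            partners = left_state[key]
            if partners:
                # Left/full join: left rows previously emitted as +[L, null] are
                # retracted when their first right partner shows up.
                if kind in ("left", "full") and not right_state[key]:
                    out.extend(("-", lv, None) for lv in partners)
                out.extend(("+", lv, value) for lv in partners)
            elif kind == "full":
                out.append(("+", None, value))  # no partner yet: pad with null
            right_state[key].append(value)
    return out
```

For example, `regular_join([("L", 1, "a"), ("R", 1, "x")], "left")` returns `[("+", "a", None), ("-", "a", None), ("+", "a", "x")]`, the -[L, null] / +[L, R] sequence described for Left Join. Because the state lists only grow, the sketch also shows why regular joins keep every row in state indefinitely.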
Notes:
Definition of the joined table ws1:
CREATE TABLE ws1 (
id INT,
vc INT,
pt AS PROCTIME(), -- processing time
et AS cast(CURRENT_TIMESTAMP as timestamp(3)), -- event time
WATERMARK FOR et AS et - INTERVAL '0.001' SECOND -- watermark
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.id.min' = '3',
'fields.id.max' = '5',
'fields.vc.min' = '1',
'fields.vc.max' = '100'
);
-- Inner join; only equi-joins are supported (the condition must contain an equality predicate)
SELECT *
FROM ws INNER JOIN ws1
ON ws.id = ws1.id;
-- Left outer join
SELECT * FROM ws LEFT JOIN ws1 ON ws.id = ws1.id;
-- Right outer join
SELECT * FROM ws RIGHT JOIN ws1 ON ws.id = ws1.id;
-- Full outer join
SELECT * FROM ws FULL OUTER JOIN ws1 ON ws.id = ws1.id;
The DataStream API offers two kinds of two-stream join: window join and interval join.
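A window join pairs rows that share a key and fall into the same window. The Python sketch below models that semantics for tumbling windows only. It is not Flink's DataStream API: the function name, the (key, timestamp, value) row layout and the integer-second timestamps are assumptions made for illustration.

```python
from collections import defaultdict


def tumbling_window_join(left, right, size):
    """Join rows that share a key and fall into the same tumbling window.

    left, right: lists of (key, timestamp_seconds, value) tuples.
    size: window length in seconds.
    Returns (window_start, key, left_value, right_value) tuples.
    """
    # (window_start, key) -> (left values, right values)
    buckets = defaultdict(lambda: ([], []))
    for key, ts, value in left:
        buckets[(ts // size * size, key)][0].append(value)
    for key, ts, value in right:
        buckets[(ts // size * size, key)][1].append(value)
    result = []
    # Every left/right pair inside the same bucket is joined.
    for (start, key), (lvals, rvals) in sorted(buckets.items()):
        for lv in lvals:
            for rv in rvals:
                result.append((start, key, lv, rv))
    return result
```

With 5-second windows, a left row at t=4 and a right row at t=5 are only one second apart, yet they never join because they land in different windows. An interval join avoids this boundary effect by bounding the time difference between the two rows instead.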
Interval join time-bound conditions (examples):
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
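Each of these predicates restricts the difference `ltime - rtime` to a bounded range, either inclusive or exclusive at each end. Because that difference is bounded on both sides, old rows can eventually be dropped from state. The Python sketch below encodes the three example conditions as ranges in seconds. The class and constant names are hypothetical and are only there to show the arithmetic.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IntervalBound:
    """Allowed range of (ltime - rtime), in seconds."""
    lower: float
    upper: float
    lower_inclusive: bool = True
    upper_inclusive: bool = True

    def matches(self, ltime: float, rtime: float) -> bool:
        diff = ltime - rtime
        above = diff >= self.lower if self.lower_inclusive else diff > self.lower
        below = diff <= self.upper if self.upper_inclusive else diff < self.upper
        return above and below


# ltime = rtime
EQUAL = IntervalBound(0, 0)
# ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
TEN_MINUTES_AFTER = IntervalBound(0, 600, upper_inclusive=False)
# ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
MINUS10_PLUS5 = IntervalBound(-10, 5)
```

BETWEEN is inclusive at both ends, so `MINUS10_PLUS5` accepts a difference of exactly -10 or 5 seconds. The strict `<` in the second condition becomes an exclusive upper bound.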
Example:
SELECT *
FROM ws, ws1
WHERE ws.id = ws1.id
AND ws.et BETWEEN ws1.et - INTERVAL '2' SECOND AND ws1.et + INTERVAL '2' SECOND;
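The example query pairs rows with equal ids whose event times lie within 2 seconds of each other. The nested-loop sketch below evaluates the same predicate over two finite lists. The rows are simplified to hypothetical (id, et) tuples with `et` in seconds. Flink evaluates the predicate incrementally and uses watermarks to discard rows that can no longer fall inside the bound, and this sketch does not model either of those behaviours.

```python
def interval_join(ws, ws1, lower=-2.0, upper=2.0):
    """Naive interval join over two finite lists.

    Keeps pairs where ws.id = ws1.id AND
    ws.et BETWEEN ws1.et + lower AND ws1.et + upper (inclusive).
    ws, ws1: lists of (id, et_seconds) tuples.
    Returns (ws_id, ws_et, ws1_id, ws1_et) tuples.
    """
    return [
        (l_id, l_et, r_id, r_et)
        for l_id, l_et in ws
        for r_id, r_et in ws1
        if l_id == r_id and r_et + lower <= l_et <= r_et + upper
    ]
```

For example, a ws row `(4, 10.0)` joins a ws1 row `(4, 8.0)` because the 2-second bound is inclusive. A ws1 row `(3, 13.0)` does not join `(3, 10.0)` because the two are 3 seconds apart.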
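Lookup Join: a dimension-table join in which each incoming row looks up data in an external store (possibly through a cache) at processing time.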
-- The dimension table lives in MySQL
CREATE TABLE customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop102:3306/customerdb',
  'table-name' = 'customers'
);

-- For every incoming orders row, look up the matching dimension row in the MySQL customers table
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
  JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
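`FOR SYSTEM_TIME AS OF o.proc_time` means each order is enriched with the customer row as it exists at the moment of the lookup. Later changes in MySQL do not update rows that have already been emitted. The Python sketch below models that behaviour, along with the staleness an optional cache introduces. The `fetch` callable stands in for a query against the external table and is hypothetical. The cache is a plain dict and is not the JDBC connector's actual cache implementation.

```python
from typing import Callable, Dict, Optional, Tuple

Customer = Tuple[str, str]  # (country, zip)


class LookupJoiner:
    """Sketch of a processing-time lookup join with an optional in-memory cache."""

    def __init__(self, fetch: Callable[[int], Optional[Customer]], use_cache: bool = False):
        # fetch(customer_id) plays the role of
        # SELECT country, zip FROM customers WHERE id = ? (hypothetical stand-in).
        self.fetch = fetch
        self.use_cache = use_cache
        self.cache: Dict[int, Optional[Customer]] = {}

    def lookup(self, customer_id: int) -> Optional[Customer]:
        if self.use_cache and customer_id in self.cache:
            return self.cache[customer_id]  # may be stale
        row = self.fetch(customer_id)  # current value in the external store
        if self.use_cache:
            self.cache[customer_id] = row
        return row

    def join(self, order_id: int, total: float, customer_id: int):
        """Inner lookup join: an order with no matching customer yields None (dropped)."""
        row = self.lookup(customer_id)
        if row is None:
            return None
        country, zip_code = row
        return (order_id, total, country, zip_code)
```

Without the cache, every order sees the latest customer row. With the cache, orders can keep seeing an old value after the row changes in MySQL, which is the usual trade-off between fewer lookups and fresher data.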