当前位置:   article > 正文

Flink SQL Join_flinksql hashjoin

flinksql hashjoin

Flink SQL的 Join 查询分两类 :

  • SQL 原生的联结查询方式
  • 流处理中特有的联结查询

常规联查

常规联查 (Regular Join) 的几种 (L : 左流 , R : 右流) :

  • Inner Join (Inner Equal Join) : 只有两条流 Join 到 , 才输出,输出 +[L, R]
  • Left Join (Outer Equal Join) :只要左流到达,就输出 (Join 到输出 +[L, R] ,没 Join 输出 +[L, null]) ; 当右流到达,但左流没到,就发回撤流,先输出 -[L, null] ,再输出 +[L, R]
  • Right Join (Outer Equal Join) :与 Left Join 相反
  • Full Join (Outer Equal Join) : 当左流或右流到达后,都会输出 (右流 : Join 到输出 +[L, R] ,没 Join 到输出 +[null, R];左流 : Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] ) ; 当一条流到达,但另一条流没到,就会发回撤流(左流到达:回撤 -[null, R] ,输出 +[L, R] ,右流到达:回撤 -[L, null] ,输出 +[L, R]

注意点 :

  • 等值 Join : shuffle 策略是 Hash,会按 Join on 的等值条件作为 id 发往下游
  • 非等值 join : shuffle 策略是 Global,所有数据发往一个并发,按非等值条件关联
  • 当 Join 时,Flink 会将两条流的所有数据都存储在 State 中,所以注意 State 配置合适的 TTL,防止 State 过大

Join 表 :

CREATE TABLE ws1 (
  id INT,
  vc INT,
  pt AS PROCTIME(), --处理时间
  et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间
  WATERMARK FOR et AS et - INTERVAL '0.001' SECOND   --watermark
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.id.min' = '3',
  'fields.id.max' = '5',
  'fields.vc.min' = '1',
  'fields.vc.max' = '100'
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
-- 内联结, 只支持等值条件
SELECT *
FROM ws INNER JOIN ws1
	ON ws.id = ws1.id
  • 1
  • 2
  • 3
  • 4
-- 左外
SELECT *
FROM ws
LEFT JOIN ws1
ON ws.id = ws1.id

-- 右外
SELECT *
FROM ws
RIGHT JOIN ws1
ON ws.id = ws1.id

-- 全外
SELECT *
FROM ws
FULL OUTER JOIN ws1
ON ws.id = ws.id
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

间隔联查

DataStream 的双流 Join有 : 窗口联结 (window join) , 间隔联结 (interval join)

  • Flink SQL 1.17 : 只支持 间隔联结

时间间隔限制 :

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

例子 :

SELECT *
FROM ws, ws1
WHERE ws.id = ws1. id 
	AND ws.et BETWEEN ws1.et - INTERVAL '2' SECOND AND ws1.et + INTERVAL '2' SECOND
  • 1
  • 2
  • 3
  • 4

维表联查

Lookup Join : 维表 Join,实时获取外部缓存的 Join

  • 流与 Redis,Mysql,HBase 外部存储介质的 Join
  • 仅支持 : 处理时间字段
-- 维表在 MySQL 
CREATE TABLE customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop102:3306/customerdb',
  'table-name' = 'customers'
);



-- order 表每来条数据,就去 mysql 的 customers 表查找维度数据

SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
  JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/520146
推荐阅读
相关标签
  

闽ICP备14008679号