当前位置:   article > 正文

Flink Sql:四种Join方式详解(基于flink1.15官方文档)_flinksql join

flinksql join

JOINs

flink sql主要有四种连接方式,分别是Regular Joins、Interval Joins、Temporal Joins、lookup join

1、Regular Joins(常规连接 )

这种连接方式和hive sql中的join是一样的,包括inner join,left join,right join,full join

  1. 1、指定数据源建立students表
  2. CREATE TABLE students (
  3. id STRING,
  4. name STRING,
  5. age INT,
  6. sex STRING,
  7. clazz STRING
  8. ) WITH (
  9. 'connector' = 'kafka',
  10. 'topic' = 'students', -- 指定topic
  11. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  12. 'properties.group.id' = 'testGroup', -- 指定消费者组
  13. 'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置为最新生成的数据
  14. 'format' = 'csv' -- 指定数据的格式
  15. );
  16. 2、kafka生产students表数据
  17. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
  18. 1500100001,施笑槐,22,女,文科六班
  19. 1500100002,吕金鹏,24,男,文科六班
  20. 1500100003,单乐蕊,22,女,理科六班
  21. 3、创建关联表scores
  22. CREATE TABLE scores (
  23. sid STRING,
  24. cid STRING, --学科id
  25. score INT
  26. ) WITH (
  27. 'connector' = 'kafka',
  28. 'topic' = 'scores', -- 指定topic
  29. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  30. 'properties.group.id' = 'testGroup', -- 指定消费者组
  31. 'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  32. 'format' = 'csv' -- 指定数据的格式
  33. );
  34. 4、kafka生产scores数据
  35. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
  36. 1500100001,1000001,98
  37. 1500100001,1000002,56
  38. 1500100002,1000001,139
  39. 1500100002,1000002,102
  40. 1500100004,1000001,42
  41. 1500100004,1000002,142
  42. -- inner jion 两边数据都不为null的才会关联
  43. select
  44. a.id,a.name,b.sid,b.score
  45. from
  46. students as a
  47. inner join
  48. scores as b
  49. on a.id=b.sid;
  50. -- left join/right join 保证左边/右边数据的完整性
  51. select
  52. a.id,a.name,b.sid,b.score
  53. from
  54. students as a
  55. right join
  56. scores as b
  57. on a.id=b.sid;
  58. -- full join 保证两边数据的完整性
  59. select
  60. a.id,a.name,b.sid,b.score
  61. from
  62. students as a
  63. full join
  64. scores as b
  65. on a.id=b.sid;
  66. -- 注意:
  67. -- 常规连接,会将两个表的数据一直保存在状态中,时间长了,状态会越来越大,导致任务执行失败,通常在批处理中使用,因为批处理没有状态这个概念。
  68. 为了避免状态过大可能会导致的任务失败问题,我们可以设置状态有效期
  69. -- 状态有效期,状态在flink中保存的时间,但是如果sql中除了关联操作还有聚合这样也需要将数据保存在状态中的操作,状态有效期设置的太短可能会让聚合这样的操作失败,设置的太长延迟也会增加。所以,状态保留多久需要根据实际业务分析
  70. SET 'table.exec.state.ttl' = '20000';
  71. 设置该参数后,那么只有在20秒内到达的数据才会被保存到状态中进行关联。

inner join结果:

left join 结果:

right join结果:

full join结果:

2、Interval Joins(间隔连接

Interval Joins:在一段时间内关联

对于流式查询,与常规连接相比,间隔连接仅支持具有时间属性的追加表。由于时间属性是拟单调递增的,因此 Flink 可以从其状态中删除旧值,而不会影响结果的正确性。

这种方式可以变相弥补Regular Joins中时间长了状态过大的问题。

  1. CREATE TABLE students_proctime (
  2. id STRING,
  3. name STRING,
  4. age INT,
  5. sex STRING,
  6. clazz STRING,
  7. proctime AS PROCTIME()
  8. ) WITH (
  9. 'connector' = 'kafka',
  10. 'topic' = 'students', -- 指定topic
  11. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  12. 'properties.group.id' = 'testGroup', -- 指定消费者组
  13. 'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  14. 'format' = 'csv' -- 指定数据的格式
  15. );
  16. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
  17. 1500100001,施笑槐,22,女,文科六班
  18. 1500100002,吕金鹏,24,男,文科六班
  19. 1500100003,单乐蕊,22,女,理科六班
  20. CREATE TABLE scores_proctime (
  21. sid STRING,
  22. cid STRING,
  23. score INT,
  24. proctime AS PROCTIME()
  25. ) WITH (
  26. 'connector' = 'kafka',
  27. 'topic' = 'scores', -- 指定topic
  28. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  29. 'properties.group.id' = 'testGroup', -- 指定消费者组
  30. 'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  31. 'format' = 'csv' -- 指定数据的格式
  32. );
  33. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
  34. 1500100001,1000001,98
  35. 1500100001,1000002,56
  36. 1500100002,1000001,139
  37. 1500100002,1000002,102
  38. 1500100004,1000001,42
  39. 1500100004,1000002,142
  40. select a.id,a.name,b.sid,b.score from
  41. students_proctime a, scores_proctime b
  42. where a.id=b.sid
  43. -- a表的时间需要在b表时间10秒内或b表的时间需要在a表时间10秒内
  44. and (
  45. a.proctime BETWEEN b.proctime - INTERVAL '10' SECOND AND b.proctime
  46. or b.proctime BETWEEN a.proctime - INTERVAL '10' SECOND AND a.proctime
  47. );

3、Temporal Joins(时态连接)

这种关联方式是专门用来关联时态表的。

  • Temporal Joins(时态连接)是在流式计算或数据处理中,对两个或多个随时间变化的表(也称为动态表或时态表)进行连接的操作。这些表包含随时间变化的数据,并且行与一个或多个时态周期相关联。

在我们生活中最常见的时态表就是汇率表,汇率随着时间变化而变化。

 

案例:

例如,假设我们有一张订单表,每张订单的价格都采用不同的货币。为了正确地将此表标准化为单一货币(如美元),每张订单都需要与下订单时相应的货币兑换率相结合。

  1. 1、创建订单表
  2. CREATE TABLE orders (
  3. order_id STRING,
  4. price DECIMAL(32,2),
  5. currency STRING, --币种
  6. order_time TIMESTAMP(3),
  7. WATERMARK FOR order_time AS order_time
  8. ) WITH (
  9. 'connector' = 'kafka',
  10. 'topic' = 'orders', -- 指定topic
  11. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  12. 'properties.group.id' = 'testGroup', -- 指定消费者组
  13. 'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  14. 'format' = 'csv' -- 指定数据的格式
  15. );
  16. 2、订单表数据
  17. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
  18. o_001,1,EUR,2024-06-06 12:00:00
  19. o_002,100,EUR,2024-06-06 12:00:07
  20. o_003,200,EUR,2024-06-06 12:00:16
  21. o_004,10,EUR,2024-06-06 12:00:21
  22. o_005,20,EUR,2024-06-06 12:00:25
  23. 3、创建汇率表
  24. CREATE TABLE currency_rates (
  25. currency STRING,
  26. conversion_rate DECIMAL(32, 2),
  27. update_time TIMESTAMP(3),
  28. WATERMARK FOR update_time AS update_time,
  29. PRIMARY KEY(currency) NOT ENFORCED -- 主键,区分不同的汇率
  30. ) WITH (
  31. 'connector' = 'kafka',
  32. 'topic' = 'currency_rates1', -- 指定topic
  33. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  34. 'properties.group.id' = 'testGroup', -- 指定消费者组
  35. 'scan.startup.mode' = 'earliest-offset', -- 指定读取数据的位置
  36. 'format' = 'canal-json' -- 指定数据的格式
  37. );
  38. 4、向汇率表中添加数据
  39. insert into currency_rates
  40. values
  41. ('EUR',0.12,TIMESTAMP'2024-06-06 12:00:00'),
  42. ('EUR',0.11,TIMESTAMP'2024-06-06 12:00:09'),
  43. ('EUR',0.15,TIMESTAMP'2024-06-06 12:00:17'),
  44. ('EUR',0.14,TIMESTAMP'2024-06-06 12:00:23');
  45. kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic currency_rates
  46. -- 使用常规关联方式关联时态表只能关联到最新的数据
  47. select
  48. a.price,a.order_time,b.conversion_rate,b.update_time
  49. from
  50. orders as a
  51. join
  52. currency_rates as b
  53. on a.currency=b.currency;
  54. -- 时态表join
  55. -- FOR SYSTEM_TIME AS OF a.order_time: 使用a表的时间到b表中查询对应时间段的数据
  56. select
  57. a.price,a.order_time,b.conversion_rate,b.update_time
  58. from
  59. orders as a
  60. join
  61. currency_rates FOR SYSTEM_TIME AS OF a.order_time as b
  62. on a.currency=b.currency;

常规join结果:

时态join结果:

4、lookup join(查找连接

Lookup Join,也称为维表 Join,通常用于从外部系统查询的数据表。连接要求一个表具有处理时间属性,另一个表由查找源连接器支持。

具体来说:

lookup join用于流表(动态表)关联维度表

流表:动态表

维度表:不怎么变化的变,维度表的数据一般可以放在hdfs或者mysql等外部数据源


扩展:流表、事实表、维度表

  1. -- 流表(动态表)
  2. 1、流表的数据来源通常是实时数据流,这些数据流可以来自各种数据源,如 Kafka、RabbitMQ、Kinesis 等。Flink可以通过数据源连接器(Source Connectors)将这些实时数据流接入到 Flink 系统中
  3. 2、与传统数据库中的表不同,流表的行是动态生成的,随着数据流的持续产生而不断增加
  4. -- 维度表
  5. 1、主要提供数据的分析角度,包含了描述业务环境的属性信息,如时间、地理、产品等。
  6. 2、维度表:通常比较宽(包含多个属性列),但行数相对较少,因为维度表中的每一行通常代表一个具体的业务实体或类别,如一个商品、一个客户、一个日期等。
  7. 3、维度表与事实表之间通过外键相关联,共同构成了星型模型或雪花模型。事实表中的外键用于与维度表中的主键相匹配,从而提供数据的上下文和分类信息。
  8. 4、维度表存储的是对数据的描述性信息,这些信息通常不随时间变化,或者变化不频繁。例如,商品的品牌、型号、颜色等属性一旦确定后很少会发生变化。但在某些情况下,如新产品上市或促销活动,可能需要更新维度表以添加新的维度成员。
  9. -- 事实表
  10. 1、存储了实际的数据度量值,如销售额、订单数量等。事实表是数据分析的核心,包含了所有用于分析的数据指标。
  11. 2、通常比较窄(包含较少的列),但行数非常多,因为事实表中的每一行通常代表一个具体的事件或交易,如一个订单、一次点击等。
  12. 3、事实表存储的是度量数据(即指标),这些数据会随时间变化,并且经常需要被汇总和分析。例如,销售额、订单数量、点击量等指标会随着业务活动的进行而不断更新。
  13. 4、事实表的数据更新频率通常较高,因为事实数据会随着业务活动的进行而不断产生。例如,每当有新的订单产生时,都需要在事实表中插入一条新的记录。

 

  1. 1、创建分数表
  2. CREATE TABLE scores (
  3. sid INT,
  4. cid STRING,
  5. score INT,
  6. proctime AS PROCTIME()
  7. ) WITH (
  8. 'connector' = 'kafka',
  9. 'topic' = 'scores', -- 指定topic
  10. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
  11. 'properties.group.id' = 'testGroup', -- 指定消费者组
  12. 'scan.startup.mode' = 'latest-offset', -- 指定读取数据的位置
  13. 'format' = 'csv' -- 指定数据的格式
  14. );
  15. 2、生产分数表数据
  16. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
  17. 1500100001,1000001,98
  18. 1500100001,1000003,137
  19. 3、建立学生表,我们将学生表当作维度表放在mysql中
  20. CREATE TABLE students_test (
  21. id INT,
  22. name STRING,
  23. age INT,
  24. gender STRING,
  25. clazz STRING
  26. ) WITH (
  27. 'connector' = 'jdbc',
  28. 'url' = 'jdbc:mysql://master:3306/bigdata29',
  29. 'table-name' = 'students_test',
  30. 'username' ='root',
  31. 'password' = '123456',
  32. 'lookup.cache.max-rows' = '1000', -- 最大缓存行数
  33. 'lookup.cache.ttl' ='10000' -- 缓存过期时间
  34. );
  35. 学生表数据
  36. 1500100001,施笑槐,22,女,文科六班
  37. -- 使用常规关联方式
  38. -- 维表的数据只在任务启动的时候读取一次,后面不再实时读取,
  39. -- 只能关联到任务启动时读取的数据
  40. -- 一旦mysql中的学生表更新数据,但是关联的学生表数据却是任务启动时从mysql读取的,这就有错误了,lookup join可以解决该问题。
  41. select a.sid,a.score,b.id,b.name from
  42. scores as a
  43. left join
  44. students_test as b
  45. on a.sid=b.id;
  46. -- lookup join
  47. -- 当流表每来一条数据时,使用关联字段到维表的数据源中查询
  48. -- 优点:实时更新数据源,准确性高
  49. -- 缺点:每一次都需要查询数据库,性能会降低
  50. select a.sid,a.score,b.id,b.name from
  51. scores as a
  52. left join
  53. students_test FOR SYSTEM_TIME AS OF a.proctime as b
  54. on a.sid=b.id;

此时我们修改更新mysql中的学生表数据

修改之前

修改后:

常规关联结果:

look up关联结果:

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号