当前位置:   article > 正文

Flink SQL --命令行的使用(02)_flink sql scan.startup.mode

flink sql scan.startup.mode
1、窗口函数: 
1、创建表:
  1. -- 创建kafka 表
  2. CREATE TABLE bid (
  3. bidtime TIMESTAMP(3),
  4. price DECIMAL(10, 2) ,
  5. item STRING,
  6. WATERMARK FOR bidtime AS bidtime
  7. ) WITH (
  8. 'connector' = 'kafka',
  9. 'topic' = 'bid', -- 数据的topic
  10. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  11. 'properties.group.id' = 'testGroup', -- 消费者组
  12. 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
  13. 'format' = 'csv' -- 读取数据的格式
  14. );
  15. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid
  16. 2020-04-15 08:05:00,4.00,C
  17. 2020-04-15 08:07:00,2.00,A
  18. 2020-04-15 08:09:00,5.00,D
  19. 2020-04-15 08:11:00,3.00,B
  20. 2020-04-15 08:13:00,1.00,E
  21. 2020-04-15 08:17:00,6.00,F
2、滚动窗口:
        1、滚动的事件时间窗口:
  1. -- TUMBLE: 滚动窗口函数,函数的作用时在原表的基础上增加[窗口开始时间,窗口结束时间,窗口时间]
  2. -- TABLE;表函数,将里面函数的结果转换成动态表
  3. SELECT * FROM
  4. TABLE(
  5. TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
  6. );
  7. -- 在基于窗口函数提供的字段进行聚合计算
  8. -- 实时统计每隔商品的总的金额,每隔10分钟统计一次
  9. SELECT
  10. item,
  11. window_start,
  12. window_end,
  13. sum(price) as sum_price
  14. FROM
  15. TABLE(
  16. -- 滚动的事件时间窗口
  17. TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
  18. )
  19. group by item,window_start,window_end;
        2、滚动的处理时间窗口:
  1. CREATE TABLE words (
  2. word STRING,
  3. proctime as PROCTIME() -- 定义处理时间,PROCTIME:获取处理时间的函数
  4. ) WITH (
  5. 'connector' = 'kafka',
  6. 'topic' = 'words', -- 数据的topic
  7. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  8. 'properties.group.id' = 'testGroup', -- 消费者组
  9. 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
  10. 'format' = 'csv' -- 读取数据的格式
  11. );
  12. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic words
  13. java
  14. spark
  15. -- 在flink SQL中处理时间和事件时间的sql语法没有区别
  16. SELECT * FROM
  17. TABLE(
  18. TUMBLE(TABLE words, DESCRIPTOR(proctime), INTERVAL '5' SECOND)
  19. );
  20. SELECT
  21. word,window_start,window_end,
  22. count(1) as c
  23. FROM
  24. TABLE(
  25. TUMBLE(TABLE words, DESCRIPTOR(proctime), INTERVAL '5' SECOND)
  26. )
  27. group by
  28. word,window_start,window_end
3、滑动窗口:
  1. -- HOP: 滑动窗口函数
  2. -- 滑动窗口一条数据可能会落到多个窗口中
  3. SELECT * FROM
  4. TABLE(
  5. HOP(TABLE bid, DESCRIPTOR(bidtime),INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
  6. );
  7. -- 每隔5分钟计算最近10分钟所有商品总的金额
  8. SELECT
  9. window_start,
  10. window_end,
  11. sum(price) as sum_price
  12. FROM
  13. TABLE(
  14. HOP(TABLE bid, DESCRIPTOR(bidtime),INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
  15. )
  16. group by
  17. window_start,window_end
4、会话窗口:
  1. CREATE TABLE words (
  2. word STRING,
  3. proctime as PROCTIME() -- 定义处理时间,PROCTIME:获取处理时间的函数
  4. ) WITH (
  5. 'connector' = 'kafka',
  6. 'topic' = 'words', -- 数据的topic
  7. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  8. 'properties.group.id' = 'testGroup', -- 消费者组
  9. 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
  10. 'format' = 'csv' -- 读取数据的格式
  11. );
  12. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic words
  13. java
  14. spark
  15. select
  16. word,
  17. SESSION_START(proctime,INTERVAL '5' SECOND) as window_start,
  18. SESSION_END(proctime,INTERVAL '5' SECOND) as window_end,
  19. count(1) as c
  20. from
  21. words
  22. group by
  23. word,SESSION(proctime,INTERVAL '5' SECOND);
2、OVER聚合:
        1、批处理

Flink中的批处理的模式,over函数和hive是一致的。

  1. SET 'execution.runtime-mode' = 'batch';
  2. -- 有界流
  3. CREATE TABLE students_hdfs_batch (
  4. sid STRING,
  5. name STRING,
  6. age INT,
  7. sex STRING,
  8. clazz STRING
  9. )WITH (
  10. 'connector' = 'filesystem', -- 必选:指定连接器类型
  11. 'path' = 'hdfs://master:9000/data/student', -- 必选:指定路径
  12. 'format' = 'csv' -- 必选:文件系统连接器指定 format
  13. );
  14. -- row_number,sum,count,avg,lag,lead,max,min
  15. -- 需要注意的是sum,sum在有排序的是聚合,在没有排序的是全局聚合。
  16. -- 获取每隔班级年龄最大的前两个学生
  17. select *
  18. from(
  19. select
  20. *,
  21. row_number() over(partition by clazz order by age desc) as r
  22. from
  23. students_hdfs_batch
  24. ) as a
  25. where r <=2
        2、流处理:

flink流处理中over聚合使用限制

        1、order by 字段必须是时间字段升序排序或者使用over_number时可以增加条件过滤

        2、在流处理里面,Flink中目前只支持按照时间属性升序定义的over的窗口。因为在批处理中,数据量的大小是固定的,不会有新的数据产生,所以在做排序的时候,只需要一次排序,所以排序字段可以随便指定,但是在流处理中,数据量是源源不断的产生,当每做一次排序的时候,就需要将之前的数据都取出来存储,随着时间的推移,数据量会不断的增加,在做排序时计算量非常大。但是按照时间的顺序,时间是有顺序的,可以减少计算的代价。

        3、也可以选择top N 也可以减少计算量。

        4、在Flink中做排序时,需要考虑计算代价的问题,一般使用的排序的字段是时间字段。

  1. SET 'execution.runtime-mode' = 'streaming';
  2. -- 创建kafka 表
  3. CREATE TABLE students_kafka (
  4. sid STRING,
  5. name STRING,
  6. age INT,
  7. sex STRING,
  8. clazz STRING,
  9. proctime as PROCTIME()
  10. ) WITH (
  11. 'connector' = 'kafka',
  12. 'topic' = 'students', -- 数据的topic
  13. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  14. 'properties.group.id' = 'testGroup', -- 消费者组
  15. 'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
  16. 'format' = 'csv' -- 读取数据的格式
  17. );
  18. -- 在流处理模式下,flink只能按照时间字段进行升序排序
  19. -- 如果按照一个普通字段进行排序,在流处理模式下,每来一条新的数据都需重新计算之前的顺序,计算代价太大
  20. -- 在row_number基础上增加条件,可以限制计算的代价不断增加
  21. select * from (
  22. select
  23. *,
  24. row_number() over(partition by clazz order by age desc) as r
  25. from
  26. students_kafka
  27. )
  28. where r <= 2;
  29. -- 在流处理模式下,flink只能按照时间字段进行升序排序
  30. select
  31. *,
  32. sum(age) over(partition by clazz order by proctime)
  33. from
  34. students_kafka
  35. -- 时间边界
  36. -- RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
  37. select
  38. *,
  39. sum(age) over(
  40. partition by clazz
  41. order by proctime
  42. -- 统计最近10秒的数据
  43. RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
  44. )
  45. from
  46. students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */;
  47. -- 数据边界
  48. --ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
  49. select
  50. *,
  51. sum(age) over(
  52. partition by clazz
  53. order by proctime
  54. -- 统计最近10秒的数据
  55. ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
  56. )
  57. from
  58. students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */;
  59. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
  60. 1500100003,tom,22,女,理科六班
3、Order  By:

在使用order by进行排序的时候,排序的字段中必须使用到时间字段:

  1. -- 排序字段必须带上时间升序排序,使用到时间字段:proctime
  2. select * from
  3. students_kafka
  4. order by proctime,age;
  5. -- 限制排序的计算代价,避免全局排序,在使用限制的时候,在做排序的时候,就只需要对限制的进行排序,减少了计算的代价。
  6. select *
  7. from
  8. students_kafka
  9. order by age
  10. limit 10;
4、row_number去重
  1. CREATE TABLE students_kafka (
  2. sid STRING,
  3. name STRING,
  4. age INT,
  5. sex STRING,
  6. clazz STRING,
  7. proctime as PROCTIME()
  8. ) WITH (
  9. 'connector' = 'kafka',
  10. 'topic' = 'students', -- 数据的topic
  11. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  12. 'properties.group.id' = 'testGroup', -- 消费者组
  13. 'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
  14. 'format' = 'csv' -- 读取数据的格式
  15. );
  16. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
  17. 1500100003,tom,22,女,理科六班
  18. select * from (
  19. select
  20. sid,name,age,
  21. row_number() over(partition by sid order by proctime) as r
  22. from students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */
  23. )
  24. where r = 1;
5、JOIN

Regular Joins: 主要用于批处理,如果在流处理上使用,状态会越来越大

Interval Join: 主要用于双流join

Temporal Joins:用于流表关联时态表(不同时间状态不一样,比如汇率表)

Lookup Join:用于流表关联维表(不怎么变化的表)

        1、Regular Joins
                1、批处理:
  1. CREATE TABLE students_hdfs_batch (
  2. sid STRING,
  3. name STRING,
  4. age INT,
  5. sex STRING,
  6. clazz STRING
  7. )WITH (
  8. 'connector' = 'filesystem', -- 必选:指定连接器类型
  9. 'path' = 'hdfs://master:9000/data/student', -- 必选:指定路径
  10. 'format' = 'csv' -- 必选:文件系统连接器指定 format
  11. );
  12. CREATE TABLE score_hdfs_batch (
  13. sid STRING,
  14. cid STRING,
  15. score INT
  16. )WITH (
  17. 'connector' = 'filesystem', -- 必选:指定连接器类型
  18. 'path' = 'hdfs://master:9000/data/score', -- 必选:指定路径
  19. 'format' = 'csv' -- 必选:文件系统连接器指定 format
  20. );
  21. SET 'execution.runtime-mode' = 'batch';
  22. -- inner join
  23. select a.sid,a.name,b.score from
  24. students_hdfs_batch as a
  25. inner join
  26. score_hdfs_batch as b
  27. on a.sid=b.sid;
  28. -- left join
  29. select a.sid,a.name,b.score from
  30. students_hdfs_batch as a
  31. left join
  32. score_hdfs_batch as b
  33. on a.sid=b.sid;
  34. -- full join
  35. select a.sid,a.name,b.score from
  36. students_hdfs_batch as a
  37. full join
  38. score_hdfs_batch as b
  39. on a.sid=b.sid;
        2、流处理:
  1. CREATE TABLE students_kafka (
  2. sid STRING,
  3. name STRING,
  4. age INT,
  5. sex STRING,
  6. clazz STRING
  7. )WITH (
  8. 'connector' = 'kafka',
  9. 'topic' = 'students', -- 数据的topic
  10. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  11. 'properties.group.id' = 'testGroup', -- 消费者组
  12. 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
  13. 'format' = 'csv', -- 读取数据的格式
  14. 'csv.ignore-parse-errors' = 'true' -- 如果数据解析异常自动跳过当前行
  15. );
  16. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
  17. 1500100001,tom,22,女,文科六班
  18. 1500100002,tom1,24,男,文科六班
  19. 1500100003,tom2,22,女,理科六班
  20. CREATE TABLE score_kafka (
  21. sid STRING,
  22. cid STRING,
  23. score INT
  24. )WITH (
  25. 'connector' = 'kafka',
  26. 'topic' = 'scores', -- 数据的topic
  27. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  28. 'properties.group.id' = 'testGroup', -- 消费者组
  29. 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
  30. 'format' = 'csv', -- 读取数据的格式
  31. 'csv.ignore-parse-errors' = 'true'
  32. );
  33. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
  34. 1500100001,1000001,98
  35. 1500100001,1000002,5
  36. 1500100001,1000003,137
  37. SET 'execution.runtime-mode' = 'streaming';
  38. -- 使用常规关联方式做流处理,flink会将两个表的数据一直保存在状态中,状态会越来越大
  39. -- 可以设置状态有效期避免状态无限增大
  40. SET 'table.exec.state.ttl' = '5000';
  41. -- full join
  42. select a.sid,b.sid,a.name,b.score from
  43. students_kafka as a
  44. full join
  45. score_kafka as b
  46. on a.sid=b.sid;
注意:以为在使用流处理的join的时候,首先流处理模式中,会将两张表中的实时数据存入当状态中

假设:前提是流处理模式,需要将两张实时的表中的姓名和成绩关联在一起,此时使用到join,当过了很长一段时间假设是一年,依旧可以将学生姓名和成绩关联在一起,原因就是之前的数据都会存储在状态中,但是也会产生问题,随着时间的推移,状态中的数据会越来越多。可能会导致任务失败。

可以通过参数指定保存状态的时间,时间一过,状态就会消失,数据就不存在:

  1. -- 使用常规关联方式做流处理,flink会将两个表的数据一直保存在状态中,状态会越来越大
  2. -- 可以设置状态有效期避免状态无限增大
  3. SET 'table.exec.state.ttl' = '5000';
  4. 'csv.ignore-parse-errors' = 'true'
  5. -- 如果数据解析异常自动跳过当前行
2、Interval Join

两个表在join时只关联一段时间内的数据,之前的数据就不需要保存在状态中,可以避免状态无限增大

  1. CREATE TABLE students_kafka_time (
  2. sid STRING,
  3. name STRING,
  4. age INT,
  5. sex STRING,
  6. clazz STRING,
  7. ts TIMESTAMP(3),
  8. WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
  9. )WITH (
  10. 'connector' = 'kafka',
  11. 'topic' = 'students', -- 数据的topic
  12. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  13. 'properties.group.id' = 'testGroup', -- 消费者组
  14. 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
  15. 'format' = 'csv', -- 读取数据的格式
  16. 'csv.ignore-parse-errors' = 'true' -- 如果数据解析异常自动跳过当前行
  17. );
  18. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
  19. 1500100001,tom,22,女,文科六班,2023-11-10 17:10:10
  20. 1500100001,tom1,24,男,文科六班,2023-11-10 17:10:11
  21. 1500100001,tom2,22,女,理科六班,2023-11-10 17:10:12
  22. CREATE TABLE score_kafka_time (
  23. sid STRING,
  24. cid STRING,
  25. score INT,
  26. ts TIMESTAMP(3),
  27. WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
  28. )WITH (
  29. 'connector' = 'kafka',
  30. 'topic' = 'scores', -- 数据的topic
  31. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  32. 'properties.group.id' = 'testGroup', -- 消费者组
  33. 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
  34. 'format' = 'csv', -- 读取数据的格式
  35. 'csv.ignore-parse-errors' = 'true'
  36. );
  37. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
  38. 1500100001,1000001,98,2023-11-10 17:10:09
  39. 1500100001,1000002,5,2023-11-10 17:10:11
  40. 1500100001,1000003,137,2023-11-10 17:10:12
  41. -- a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts
  42. -- a表数据的时间需要在b表数据的时间减去5秒到b表数据时间的范围内
  43. SELECT a.sid,b.sid,a.name,b.score
  44. FROM students_kafka_time a, score_kafka_time b
  45. WHERE a.sid = b.sid
  46. AND a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts
3、Temporal Joins

        1、用于流表关联时态表,比如订单表和汇率表的关联

        2、每一个时间数据都会存在不同的状态,如果只是用普通的关联,之恶能关联到最新的数

  1. -- 订单表
  2. CREATE TABLE orders (
  3. order_id STRING, -- 订单编号
  4. price DECIMAL(32,2), --订单金额
  5. currency STRING, -- 汇率编号
  6. order_time TIMESTAMP(3), -- 订单时间
  7. WATERMARK FOR order_time AS order_time -- 水位线
  8. ) WITH (
  9. 'connector' = 'kafka',
  10. 'topic' = 'orders', -- 数据的topic
  11. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  12. 'properties.group.id' = 'testGroup', -- 消费者组
  13. 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
  14. 'format' = 'csv' -- 读取数据的格式
  15. );
  16. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
  17. 001,100,CN,2023-11-11 09:48:10
  18. 002,200,CN,2023-11-11 09:48:11
  19. 003,300,CN,2023-11-11 09:48:14
  20. 004,400,CN,2023-11-11 09:48:16
  21. 005,500,CN,2023-11-11 09:48:18
  22. -- 汇率表
  23. CREATE TABLE currency_rates (
  24. currency STRING, -- 汇率编号
  25. conversion_rate DECIMAL(32, 2), -- 汇率
  26. update_time TIMESTAMP(3), -- 汇率更新时间
  27. WATERMARK FOR update_time AS update_time, -- 水位线
  28. PRIMARY KEY(currency) NOT ENFORCED -- 主键
  29. ) WITH (
  30. 'connector' = 'kafka',
  31. 'topic' = 'currency_rates', -- 数据的topic
  32. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  33. 'properties.group.id' = 'testGroup', -- 消费者组
  34. 'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
  35. 'format' = 'canal-json' -- 读取数据的格式
  36. );
  37. insert into currency_rates
  38. values
  39. ('CN',7.2,TIMESTAMP'2023-11-11 09:48:05'),
  40. ('CN',7.1,TIMESTAMP'2023-11-11 09:48:10'),
  41. ('CN',6.9,TIMESTAMP'2023-11-11 09:48:15'),
  42. ('CN',7.4,TIMESTAMP'2023-11-11 09:48:20');
  43. kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic currency_rates
  44. -- 如果使用常规关联方式,取的时最新的汇率,不是对应时间的汇率
  45. select a.order_id,b.* from
  46. orders as a
  47. left join
  48. currency_rates as b
  49. on a.currency=b.currency;
  50. -- 时态表join
  51. -- FOR SYSTEM_TIME AS OF orders.order_time: 使用订单表的时间到汇率表中查询对应时间的数据
  52. SELECT
  53. order_id,
  54. price,
  55. conversion_rate,
  56. order_time
  57. FROM orders
  58. LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
  59. ON orders.currency = currency_rates.currency;

 4、Look Join:主要是用来关联维度表。维度表:指的是数据不怎么变化的表。

        1、传统的方式是将数据库中的数据都读取到流表中,当来一条数据就会取关联一条数据。如果数据库中学生表更新了,flink不知道,关联不到最新的数据。

        2、Look Join使用的原理:是当流表中的数据发生改变的时候,就会使用关联字段维表的数据源中查询数据。

优化:

        在使用的时候可以使用缓存,将数据进行缓存,但是随着时间的推移,缓存的数量就会越来大,此时就可以对缓存设置一个过期时间。可以在建表的时候设置参数:

  1. 'lookup.cache.max-rows' = '1000', -- 缓存的最大行数
  2. 'lookup.cache.ttl' = '20000' -- 缓存过期时间
  1. -- 学生表
  2. CREATE TABLE students_jdbc (
  3. id BIGINT,
  4. name STRING,
  5. age BIGINT,
  6. gender STRING,
  7. clazz STRING,
  8. PRIMARY KEY (id) NOT ENFORCED -- 主键
  9. ) WITH (
  10. 'connector' = 'jdbc',
  11. 'url' = 'jdbc:mysql://master:3306/student',
  12. 'table-name' = 'students',
  13. 'username' ='root',
  14. 'password' ='123456',
  15. 'lookup.cache.max-rows' = '1000', -- 缓存的最大行数
  16. 'lookup.cache.ttl' = '20000' -- 缓存过期时间
  17. );
  18. -- 分数表
  19. CREATE TABLE score_kafka (
  20. sid BIGINT,
  21. cid STRING,
  22. score INT,
  23. proc_time as PROCTIME()
  24. )WITH (
  25. 'connector' = 'kafka',
  26. 'topic' = 'scores', -- 数据的topic
  27. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  28. 'properties.group.id' = 'testGroup', -- 消费者组
  29. 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
  30. 'format' = 'csv', -- 读取数据的格式
  31. 'csv.ignore-parse-errors' = 'true'
  32. );
  33. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
  34. 1500100001,1000001,98
  35. 1500100001,1000002,5
  36. 1500100001,1000003,137
  37. -- 使用常规关联方式,关联维度表
  38. -- 1、任务在启动的时候会将维表加载到flink 的状态中,如果数据库中学生表更新了,flink不知道,关联不到最新的数据
  39. select
  40. b.id,b.name,a.score
  41. from
  42. score_kafka as a
  43. left join
  44. students_jdbc as b
  45. on a.sid=b.id;
  46. -- lookup join
  47. -- FOR SYSTEM_TIME AS OF a.proc_time : 使用关联字段到维表中查询最新的数据
  48. -- 优点: 流表每来一条数据都会去mysql中查询,可以关联到最新的数据
  49. -- 每次查询mysql会降低性能
  50. select
  51. b.id,b.name,a.score
  52. from
  53. score_kafka as a
  54. left join
  55. students_jdbc FOR SYSTEM_TIME AS OF a.proc_time as b
  56. on a.sid=b.id;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/520115
推荐阅读
相关标签
  

闽ICP备14008679号