
Flink SQL Syntax (hint, WITH, SELECT, group window aggregation, time attributes (processing / event), lateral table)

1、Query statements
1、Hint

A hint lets you dynamically override a table's properties at query time.

  -- Create the source table
  CREATE TABLE word (
      lines STRING
  ) WITH (
      'connector' = 'kafka',
      'topic' = 'word',
      'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset', -- read all data from the beginning
      'format' = 'csv',
      'csv.field-delimiter' = '\t'
  );

  -- Load the Hive module (provides functions such as explode and split)
  LOAD MODULE hive WITH ('hive-version' = '1.2.1');

  -- Count the number of each word
  -- without dynamically overriding the startup option
  select word, count(1) from
  word,
  lateral table(explode(split(lines, ','))) as t(word)
  group by word;

  -- Use the OPTIONS hint to override table options dynamically
  select word, count(1) from
  word /*+ OPTIONS('scan.startup.mode'='latest-offset') */,
  lateral table(explode(split(lines, ','))) as t(word)
  group by word;
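
For reference, lateral table(explode(...)) joins each input row with the rows produced by the table function. A minimal self-contained sketch, using an inline VALUES table instead of the Kafka source and assuming the Hive module above is loaded (both explode and split come from it):

  -- Each element of the split array becomes its own row:
  select word from
  (VALUES ('hello,world,hello')) as t1(lines),
  lateral table(explode(split(lines, ','))) as t(word);
  -- expected output: hello / world / hello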

3、WITH
  -- temp can be referenced multiple times in the SQL that follows
  with temp as (
      select word from word,
      lateral table(explode(split(lines, ','))) as t(word)
  )
  select * from temp
  union all
  select * from temp;

4、SELECT
  SELECT order_id, price
  FROM
  (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price);
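
Ordinary projection and filtering work as in standard SQL; a small sketch on the same inline table:

  -- Filter rows and compute an expression in the projection
  SELECT order_id, price * 2 AS double_price
  FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)
  WHERE order_id = 1;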

5、Group window aggregation

Legacy syntax; its use is discouraged in newer versions.

  -- PROCTIME(): function that returns the processing time
  CREATE TABLE words_window (
      lines STRING,
      proc_time as PROCTIME()
  ) WITH (
      'connector' = 'kafka',
      'topic' = 'words',
      'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset', -- read all data from the beginning
      'format' = 'csv',
      'csv.field-delimiter' = '\t'
  );

  -- TUMBLE: tumbling window
  -- HOP: sliding window
  -- SESSION: session window

  -- TUMBLE: tumbling window on processing time
  select
      word,
      TUMBLE_START(proc_time, INTERVAL '5' SECOND) as s, -- window start time
      TUMBLE_END(proc_time, INTERVAL '5' SECOND) as e, -- window end time
      count(1) as c
  from
      words_window,
      lateral table(explode(split(lines, ','))) as t(word)
  group by
      word,
      TUMBLE(proc_time, INTERVAL '5' SECOND); -- compute once every 5 seconds

  • Session window

    The window fires after a period with no new data.

    For now this is only available in the legacy API.

  CREATE TABLE words_window (
      lines STRING,
      proc_time as PROCTIME()
  ) WITH (
      'connector' = 'kafka',
      'topic' = 'words',
      'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset', -- read all data from the beginning
      'format' = 'csv',
      'csv.field-delimiter' = '\t'
  );

  select
      word,
      SESSION_START(proc_time, INTERVAL '5' SECOND) as s, -- window start time
      SESSION_END(proc_time, INTERVAL '5' SECOND) as e, -- window end time
      count(1) as c
  from
      words_window,
      lateral table(explode(split(lines, ','))) as t(word)
  group by
      word,
      SESSION(proc_time, INTERVAL '5' SECOND); -- the window fires once no message arrives for 5 seconds

6、TVFs (window table-valued functions; key topic)
  • Tumbling window function
  CREATE TABLE words_window (
      lines STRING,
      proc_time as PROCTIME()
  ) WITH (
      'connector' = 'kafka',
      'topic' = 'words',
      'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset', -- read all data from the beginning
      'format' = 'csv',
      'csv.field-delimiter' = '\t'
  );

  -- TUMBLE(TABLE words_window, DESCRIPTOR(proc_time), INTERVAL '5' SECOND)
  -- TUMBLE: window function; appends window start, window end and window time columns to the source table
  -- TABLE words_window: the source table
  -- DESCRIPTOR(proc_time): the time column; may be processing time or event time
  -- INTERVAL '5' SECOND: the window size
  SELECT lines, proc_time, window_start, window_end, window_time FROM TABLE(
      TUMBLE(TABLE words_window, DESCRIPTOR(proc_time), INTERVAL '5' SECOND)
  );

  -- Aggregate after the windows have been assigned
  SELECT word, window_start, count(1) as c FROM
  TABLE(
      TUMBLE(TABLE words_window, DESCRIPTOR(proc_time), INTERVAL '5' SECOND)
  ),
  lateral table(explode(split(lines, ','))) as t(word)
  group by word, window_start;
  • Sliding (hop) window function

    One record can fall into several windows, so a single input record produces multiple output records. With a 15-second window sliding every 5 seconds, as below, each record appears in 3 windows.

  CREATE TABLE words_window (
      lines STRING,
      proc_time as PROCTIME()
  ) WITH (
      'connector' = 'kafka',
      'topic' = 'words',
      'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset', -- read all data from the beginning
      'format' = 'csv',
      'csv.field-delimiter' = '\t'
  );

  -- HOP: sliding window function; takes a slide interval and a window size
  -- one input record produces multiple output records
  with temp as (
      select * from words_window /*+ OPTIONS('scan.startup.mode'='latest-offset') */
  )
  SELECT * FROM
  TABLE(
      HOP(TABLE temp, DESCRIPTOR(proc_time), INTERVAL '5' SECOND, INTERVAL '15' SECOND)
  );

  -- Aggregate after windowing
  with temp as (
      select * from words_window /*+ OPTIONS('scan.startup.mode'='latest-offset') */
  )
  SELECT word, window_start, count(1) as c FROM
  TABLE(
      HOP(TABLE temp, DESCRIPTOR(proc_time), INTERVAL '5' SECOND, INTERVAL '15' SECOND)),
  lateral table(explode(split(lines, ','))) as t(word)
  group by word, window_start;

7、Time attributes

1、Processing time

Use the PROCTIME() function to add a processing-time column to the table.

  CREATE TABLE student_kafka_proc_time (
      id STRING,
      name STRING,
      age INT,
      gender STRING,
      clazz STRING,
      proc as PROCTIME() -- processing-time column
  ) WITH (
      'connector' = 'kafka',
      'topic' = 'student',
      'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv',
      'csv.field-delimiter' = ',', -- delimiter of the CSV data
      'csv.ignore-parse-errors' = 'true', -- fill null when a record is malformed
      'csv.allow-comments' = 'true' -- skip lines starting with #
  );

  -- Processing time can be used for window aggregation
  SELECT clazz, window_start, count(1) as c FROM
  TABLE(
      TUMBLE(TABLE student_kafka_proc_time, DESCRIPTOR(proc), INTERVAL '5' SECOND)
  )
  group by clazz, window_start;

2、Event time
  • Test data (note that the fifth record, 16:44:15, arrives out of order)

    1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:10
    1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:11
    1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:12
    1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:20
    1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:15
    1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:25
  • Create the table, specifying the time column and the watermark

    -- TIMESTAMP(3): Flink's timestamp type
    -- ts - INTERVAL '5' SECOND: the watermark lags the maximum event time by 5 seconds
    CREATE TABLE student_kafka_event_time (
        id STRING,
        name STRING,
        age INT,
        gender STRING,
        clazz STRING,
        ts TIMESTAMP(3),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- declare the time column and the watermark
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'student_event_time',
        'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'csv'
    );

    -- Window aggregation on event time (TVF syntax)
    -- every record produces a result and updates the results emitted before it
    -- no data is lost
    -- the aggregation results have to be kept in state
    SELECT clazz, window_start, count(1) as c FROM
    TABLE(
        TUMBLE(TABLE student_kafka_event_time, DESCRIPTOR(ts), INTERVAL '5' SECOND)
    )
    group by clazz, window_start;

    -- Group window aggregation (legacy syntax)
    -- out-of-order data may be dropped
    -- the aggregation results do not need to be kept in state
    select
        clazz,
        TUMBLE_START(ts, INTERVAL '5' SECOND) as s, -- window start time
        TUMBLE_END(ts, INTERVAL '5' SECOND) as e, -- window end time
        count(1) as c
    from
        student_kafka_event_time
    group by
        clazz,
        TUMBLE(ts, INTERVAL '5' SECOND); -- compute once every 5 seconds

    -- Produce test data
    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student_event_time
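
While testing, it can help to watch how far the watermark has advanced. A small sketch, assuming a Flink version (1.15+) that provides the CURRENT_WATERMARK function:

    -- Shows each event's timestamp next to the current watermark,
    -- which should trail the maximum ts seen so far by 5 seconds
    SELECT id, ts, CURRENT_WATERMARK(ts) AS wm
    FROM student_kafka_event_time;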

Exercise

Count the number of occurrences of each word,
computing once every 5 seconds;
in each window, take the 2 words with the highest counts.

  CREATE TABLE words_window_demo (
      lines STRING,
      proc_time as PROCTIME()
  ) WITH (
      'connector' = 'kafka',
      'topic' = 'words',
      'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset', -- read all data from the beginning
      'format' = 'csv',
      'csv.field-delimiter' = '\t'
  );

  -- In Flink SQL stream processing, row_number() must be used in a top-N pattern (note the outer filter r <= 2)
  select * from (
      select
          word,
          window_start,
          c,
          row_number() over(partition by window_start order by c desc) as r
      from (
          select word, window_start, count(1) as c from
          TABLE(
              TUMBLE(TABLE words_window_demo, DESCRIPTOR(proc_time), INTERVAL '5' SECOND)
          ),
          lateral table(explode(split(lines, ','))) as t(word)
          group by word, window_start
      ) as a
  ) as b
  where r <= 2;
  • Count the traffic volume of each county within each city
  • Compute once every 5 minutes over the most recent 15 minutes of data (the code below uses 5/15 seconds)
  • Within each city, take the 2 counties with the highest traffic volume
  • Save the results to a database
  -- Sample data
  {
      "car": "皖AK0H90",
      "city_code": "340100",
      "county_code": "340111",
      "card": 117303031813010,
      "camera_id": "00004",
      "orientation": "北",
      "road_id": 34130440,
      "time": 1614799929,
      "speed": 84.51
  }

  -- TIMESTAMP_LTZ: Flink's local-zone timestamp type
  -- ts_ltz - INTERVAL '5' SECOND: the watermark lags the event time by 5 seconds
  -- Create a table that reads the JSON data from Kafka
  CREATE TABLE cars_kafka_event_time (
      car STRING,
      city_code STRING,
      county_code STRING,
      card BIGINT,
      camera_id STRING,
      orientation STRING,
      road_id BIGINT,
      `time` BIGINT,
      speed DOUBLE,
      ts_ltz AS TO_TIMESTAMP_LTZ(`time`, 0), -- `time` is in epoch seconds, so the precision argument is 0
      WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- declare the time column and the watermark
  ) WITH (
      'connector' = 'kafka',
      'topic' = 'car_test',
      'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
      'properties.group.id' = 'carGroup',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
  );
  -- Quick check that data is arriving
  select * from cars_kafka_event_time;

  -- Traffic volume per county per city; computed every 5 seconds over the last 15 seconds
  -- (the exercise statement says minutes), top 2 counties per city
  select *
  from (
      select
          county_code
          ,city_code
          ,window_start
          ,c
          -- partition by window and city so the top-N is taken within each city
          ,row_number() over(partition by window_start, city_code order by c desc) as r
      from
      (
          with temp as (
              select * from cars_kafka_event_time /*+ OPTIONS('scan.startup.mode'='latest-offset') */
          )
          SELECT
              county_code
              ,city_code
              ,window_start
              ,count(1) as c
          FROM
          TABLE(
              HOP(TABLE temp, DESCRIPTOR(ts_ltz), INTERVAL '5' SECOND, INTERVAL '15' SECOND))
          group by county_code,city_code,window_start
      ) as b ) as h
  where r <= 2;
  -- Create the MySQL sink table
  CREATE TABLE clazz_num_mysql (
      country_city_r_count STRING,
      window_start STRING,
      PRIMARY KEY (country_city_r_count) NOT ENFORCED -- rows are upserted by primary key
  ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://master:3306/bigdata17?useUnicode=true&characterEncoding=UTF-8',
      'table-name' = 'city_top_2', -- the table has to be created manually in MySQL (see the DDL sketch below)
      'username' = 'root',
      'password' = '123456'
  );
  -- Write the results to MySQL
  insert into clazz_num_mysql
  select concat_ws('_',county_code,city_code,r,c) country_city_r_count, window_start
  from (
      select
          cast(county_code as STRING) county_code
          ,cast(city_code as STRING) city_code
          ,cast(window_start as STRING) window_start
          ,cast(c as STRING) c
          -- partition by window and city so the top-N is taken within each city
          ,cast(row_number() over(partition by window_start, city_code order by c desc) as STRING) as r
      from
      (
          with temp as (
              select * from cars_kafka_event_time
          )
          SELECT
              county_code
              ,city_code
              ,window_start
              ,count(1) as c
          FROM
          TABLE(
              HOP(TABLE temp, DESCRIPTOR(ts_ltz), INTERVAL '5' SECOND, INTERVAL '15' SECOND))
          group by county_code,city_code,window_start
      ) as b ) as h
  where r <= 2;
  -- Querying the result back in MySQL (a clumsy approach: split the concatenated key)
  select
      SUBSTRING_INDEX(country_city_r_count,'_',1) as county,
      SUBSTRING_INDEX(SUBSTRING_INDEX(country_city_r_count,'_',2),'_',-1) as city,
      SUBSTRING_INDEX(SUBSTRING_INDEX(country_city_r_count,'_',3),'_',-1) as topn,
      SUBSTRING_INDEX(country_city_r_count,'_',-1) as count_car,
      window_start
  from city_top_2;
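
For completeness, a minimal sketch of the MySQL DDL for the sink table referenced above; the VARCHAR widths are an assumption, anything wide enough for the concatenated key will do:

  -- Run in MySQL before starting the insert job
  CREATE TABLE city_top_2 (
      country_city_r_count VARCHAR(100),
      window_start VARCHAR(100),
      PRIMARY KEY (country_city_r_count)
  );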
