当前位置:   article > 正文

flink sql clinet 实战:窗口函数----flink-1.13.6_hop定义滑动窗口

hop定义滑动窗口

1、造模拟数据

2、创建kafka

详细的kafka connector 详见官网

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    `ts` TIMESTAMP(3),
	proctime as PROCTIME(),   -- 处理时间列
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列
) WITH (
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behavior',  -- kafka topic
    'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取
	'properties.bootstrap.servers' = 'chb1:9092',
	'properties.group.id' = 'testGroup',
	'format' = 'csv'
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

三、窗口函数 Windowing table-valued functions (Windowing TVFs)

官网: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-tvf/

窗口函数分类:

  • Tumble Windows
  • Hop Windows
  • Cumulate Windows
  • Session Windows (will be supported soon)

窗口 TVF 的返回值是一个新的关系,它包括原关系的所有列,以及另外3个列“window_start”、“window_end”、“window_time”,以指示分配的窗口。

3.1、滚动窗口 (TUMBLE)

每个窗口没有重叠,窗口长度固定
在这里插入图片描述

滚动窗口的函数格式:

TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
- data 表名
- timecol 时间属性字段
- 时间窗口的大小  INTERVAL '10' MINUTES
  • 1
  • 2
  • 3
  • 4

测试案例
表结构

Flink SQL> desc user_log;
+-------------+-----------------------------+-------+-----+---------------+----------------------------+
|        name |                        type |  null | key |        extras |                  watermark |
+-------------+-----------------------------+-------+-----+---------------+----------------------------+
|     user_id |                      STRING |  true |     |               |                            |
|     item_id |                      STRING |  true |     |               |                            |
| category_id |                      STRING |  true |     |               |                            |
|    behavior |                      STRING |  true |     |               |                            |
|          ts |      TIMESTAMP(3) *ROWTIME* |  true |     |               | `ts` - INTERVAL '5' SECOND |
|    proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false |     | AS PROCTIME() |                            |
+-------------+-----------------------------+-------+-----+---------------+----------------------------+
6 rows in set
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

1、窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区

Flink SQL>  SELECT * FROM TABLE(
   TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '1' MINUTES));
   
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Currently Flink doesn't support individual window table-valued function TUMBLE(time_col=[ts], size=[1 min]).
 Please use window table-valued function with aggregate together using window_start and window_end as group keys.


-- 按照 window_start、window_end 分区

SELECT window_start, window_end, count(user_id) 
	FROM TABLE(TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '1' MINUTES))
	GROUP BY window_start, window_end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

在这里插入图片描述

3.2、滑动窗口(HOP)

相对于滚动窗口,滑动窗口多一个 slide 参数,用于表示每个窗口滑动大小
在这里插入图片描述
滑动窗口函数表达式:

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
- data- timecol 时间字段
- slide 滑动长度
- size 窗口大小
  • 1
  • 2
  • 3
  • 4
  • 5

测试案例:
一个滑动窗口,窗口长度1分钟,滑动距离30秒

SELECT window_start, window_end, count(1)
  FROM TABLE(
    HOP(TABLE user_log, DESCRIPTOR(ts), INTERVAL '30' SECONDS, INTERVAL '1' MINUTES))
  GROUP BY window_start, window_end;
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

3.3、累计窗口(CUMULATE )

Cumulate window 就是累计窗口,简单来说,以下图里面时间轴上的一个区间为窗口步长(step)。
第一个 window 统计的是一个区间的数据;

第二个 window 统计的是第一区间和第二个区间的数据;

第三个 window 统计的是第一区间,第二个区间和第三个区间的数据。

累积计算在业务场景中非常常见,如累积 UV 场景。在 UV 大盘曲线中:我们每隔 10 分钟统计一次当天累积用户 UV。

在这里插入图片描述

累计窗口表达式:

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
- data- timecol 时间字段
- step 步长
- size 窗口大小
  • 1
  • 2
  • 3
  • 4
  • 5

每30秒,统一最近一分钟的pv

SELECT window_start, window_end, count(1)
  FROM TABLE(
    CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '30' SECONDS, INTERVAL '1' MINUTES))
  GROUP BY window_start, window_end;
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

累加窗口:(1 MINUTE,1 DAY) 按照1分钟划分窗口,每分钟计算当前分钟的数据 merge 当前分钟的前一分钟的数据结果
按照 订单数据事件时间+水位线 进行窗口触发执行

SELECT window_start, window_end, count(1)
  FROM TABLE(
    CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '30' SECONDS, INTERVAL '1' DAYS))
  GROUP BY window_start, window_end;
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号