当前位置:   article > 正文

FlinkSQL中的窗口_flinksql窗口函数怎么用

flinksql窗口函数怎么用

多维分析

需求:有一张test表,表的字段为:A, B, C, amount, 其中A, B, C为维度字段,求以三个维度任意组合,统计sum(amount)

  1. Union方案:
    • A, B, C的任意组合共有8种,分别为(A, B,C,AB,AC,BC,ABC,空集)
    • 然后每种类型的个数也不一样,需要补足空白的字段
    • 将每种类型进行group by+ sum 求和后Union在一起
  2. Flink方案
    • 前面的语法一样
    • group by grouping sets (A,B,C,(A,B),(A,C),(B,C),(A,B,C),())
    • group by cube(A,B,C)
    • roll up (A,B,C) ⇒ ((A,B,C), (A,B),(A),())
  3. Hive提供的Grouping Sets:

窗口

  1. 分组窗口 groupWindow

    • 分类
      • 滚动窗口
      • 滑动窗口Hop Windows
      • 会话窗口
  2. 窗口表值函数 window TVF(支持topN)

    • 滚动窗口
    • 滑动窗口Hop Windows
    • 累积窗口Cumulate Windows
    • 会话窗口Sesssion Windows(不支持)
  3. 开窗函数 over

  4. API的用法:

    • 计数窗口(SQL中不支持计数窗口)
      • 计数滚动:Tumble.over(rowInterval(5L)).on($("处理时间")).as("w")
      • 计数滑动(窗口的首次计算必须达到窗口大小):Slide.over(rowInterval(5L)).every(rowInterval(3L)).on($("pt")).as("w")
      • 使用窗口:table.window(w1).groupBy($("w"), $("id")).select($("id"), $("vc")).execute().print();
    • 时间窗口:
      • 滚动Tumble.over(lit(5).seconds()).on($(“pt”)).as(""w);
      • 滑动Slide.over(lit(5).seconds()).every(lit(3).seconds()).as(“w”);
      • 会话:Sessino.withGap(lit(3).seconds()).on($(“pt”)).as(“w”);
      • 使用窗口: table.window(w7).groupBy( ( " w " ) , ("w"), ("w"),(“id”)).select()
  5. SQL的用法:

//滚动时间窗口
select 
	id,
	tumble_start(pt,interval '5' second) as wStart,
	tumble_end(pt,interval '5' second) as wEnd,
	sum(vc) sumvc
from t1
group by 
	tumble(et,interval '5' second),id;
	
//滑动时间窗口
select
	id,
	hop_start(pt,interval '3' second, interval '5' second) as wStart,
	hop_end(pt,interval '3' second, interval '5' second) as wEnd,
	sum(vc) svc
from t1
group by 
	hop(et,interval '5' second,)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  1. WindowTVF窗口表值函数(只有SQL形式)
//滚动窗口
select 
	window_start,
	window_end,
	SUM(price)
From
	Table(
		tumble(table t1,descriptor(pt)),//事件时间改为et
		interval '5' second
	)
	group by window_start, window_end, id;

//滑动窗口(窗口大小必须是滑动步长的整数倍)
select 
	window_start,
	window_end,
	SUM(price)
From
	Table(
		hop(table t1,descriptor(pt)),
		interval '3' second,//滑动步长
		interval '6' second//窗口大小
	)
	group by window_start, window_end, id;


//累积窗口(统计类似0~1,0~2,0~3这样的窗口/)
select 
	window_start,
	window_end,
	SUM(price)
From
	Table(
		cumulate(table t1,descriptor(pt)),
		interval '2' second,//步长,一般为小时
		interval '10' second//每一轮的大小,一般为一天
	)
	group by window_start, window_end, id;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  1. Over聚合函数(划定一个范围,对窗口内的每条数据都做统计)
    • SQL语法:over(partition by t1 order by t2 )
    • API语法
      • 定义窗口(无法指定下无边界,流式数据无法明确下边界)Over.partitionBy($("id")).orderBy( $ ("pt")).preceding(unbounded_row).follow(current_row).as("w");
      • 定义上两行到当前行:Over.partitionBy( $ ("id")).orderBy( $ ("pt")).preceding(rowinterval(2L)).follow(current_row).as("w");
      • 基于时间,上无边界到当前时间:Over.partitionBy($("id")).orderBy( $ ("pt")).preceding(unbounded_range).follow(current_range).as("w");
      • 上两秒到当前时间:Over.partitionBy( $ ("id")).orderBy( $ ("pt")).preceding(lit(2).second()).follow(current_range).as("w");
      • 使用窗口sum().over( $ ("w1"))
    • SQL 语法
//上无边界到当前行
select
	id,
	vc,
	sum(vc) over (partition by id 
	order by pt 
	rows between unbounded preceding and
	current row ) sumvc
from t1;

//上两行到当前行

//上无边界到当前时间

//上两秒到当前时间

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

TopN

窗口表值函数 + over窗口实现

  1. 统计用来排名的数值(点击次数)和窗口时间信息
//统计每个user的点击次数
select 
	user,
	count(*) cnt,
	window_start,
	window_end
from Table(
	tumble(talbe t1, descriptor(et), interval '10' second)
)
group by window_start, window_end,user;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  1. 按照点击次数排名(按照窗口结束时间分区,再排名,目前Flink1.17只支持row_number函数)
    • 原本order by 后面只能是时间字段,且只能是升序
    • 如果FLink能够识别当前操作是TopN的情况下,支持在order by后面出现非时间字段
(select
	user,
	cnt,
	row_number() over(partition by window_start,window_end 
	order by cnt desc ) rk
from t2) t3

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  1. 取TopN,进行where过滤 where row_num <= N, 这段代码是识别为TopN查询的关键.
select
	user,
	cnt,
	rk
from t3
where rk <= 3;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

实际上,所有代码可以合并为一个整体:
在这里插入图片描述

去重

TopN的特殊写法,根据主键开窗,只取where row_num = 1的数据,即能达到对重复数据进行去重的效果。

需求:统计每个窗口中每个url最后到达的数据

(select 
	url,
	ts,
	window_start,
	window_end
from ) as t1

//按照窗口的开始时间和结束时间,url进行分区,通过时间排序,求排名
(select
	url,
	ts,
	window_start,
	window_end,
	row_number(partition by window_start, window_end, url order by ts desc) rk
from t1;) as t2

// 取rk = 1
select 
	url,
	ts,
	window_end
from t2
where rk = 1;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/639018
推荐阅读
相关标签
  

闽ICP备14008679号