当前位置:   article > 正文

Flink实例(129):状态管理(十八)Table API 和 SQL 模块状态管理(三) Flink SQL空闲状态保留时间(idle state retention time)实现原理_setidlestateretention

setidlestateretention

为什么要设置

  如果我们在数据流上进行分组查询,分组处理产生的结果(不仅仅是聚合结果)会作为中间状态存储下来。随着分组key的不断增加,状态自然也会不断膨胀。但是这些状态数据基本都有时效性,不必永久保留。例如,使用Top-N语法进行去重,重复数据的出现一般都位于特定区间内(例如一小时或一天内),过了这段时间之后,对应的状态就不再需要了。Flink SQL提供的idle state retention time特性可以保证当状态中某个key对应的数据未更新的时间达到阈值时,该条状态被自动清理。设置方法是:

stenv.getConfig().setIdleStateRetentionTime(Time.hours(24), Time.hours(36))

注意setIdleStateRetentionTime()方法需要传入两个参数:状态的最小保留时间minRetentionTime和最大保留时间maxRetentionTime(根据实际业务决定),且两者至少相差5分钟。为什么会有这种限制呢?看一下源码就知道了。

如何实现的

idle state retention time特性在底层以o.a.f.table.runtime.functions.CleanupState接口来表示,代码如下。

  1. public interface CleanupState {
  2. default void registerProcessingCleanupTimer(
  3. ValueState<Long> cleanupTimeState,
  4. long currentTime,
  5. long minRetentionTime,
  6. long maxRetentionTime,
  7. TimerService timerService)
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/462283?site
推荐阅读
相关标签
  

闽ICP备14008679号