赞
踩
State 作为 Flink 的基础设施,用于存储流计算计算节点的中间结果。如一个聚合计算的任务,先将历史的计算结果写入 state,当来了新数据时,再读取状态中的数据,更新后再写入 state。
对于一个能在生产环境稳定使用的 State,它需要解决以下问题:
1. 低延时地读写;
2. 可靠、高可用,提供数据 Exactly Once 的语义,失败可恢复;
3. 数据可以分配在 subtask 中,任务中断修改并行度后,任务依然可以恢复,并能重新分配至新的 subtask 中;
4. 可扩展,大状态场景下,依然能够稳定运行。
一般认为 operator state 的数据规模是比较小的;keyed state 规模相对比较大。在实现上 flink 也是这么做的,operator state backend 仅有一种 on-heap 的实现,因此 operator state 过大容易导致任务 OOM。
每种状态都有托管状态(managed state)和原始状态(raw state)两种。
在任务因为流量变更,需要调整资源时,用户可以通过 savepoint 停止任务,修改并行度后,再重新启动恢复任务。如下图所示:
上图分别展示了任务变更并行度时,无 KeyGroup 和有 KeyGroup 时,状态如何在子任务中横向扩展。
其中一共有key:0~19 ,任务并行度从3改为4。
上述问题可以通过 KeyGroup 来缓解。
KeyGroup 在一定程度上缓解了上述问题,但 KeyGroupNum 变化时,问题依然存在。
Operator State:
Keyed State:
其中:绿色表示所创建的operator/keyed state backend 是 on-heap 的,黄色则表示是 off-heap 的
env.setStateBackend(StateBackend)
env.getCheckpointConfig()
.setCheckpointStorage(CheckpointStorage)
图中展示了 RocksDB StateBackend 在 Flink 引擎中的位置。
它是一个本地数据库,数据由 RocksDB 负责在内存和磁盘中调度。
它是基于 Google 开源的 LevelDB 开发的,底层为 LSM Tree 的 KV 存储系统。以 Append Only 的形式写入数据,即使是修改或删除数据。
如图所示:
数据写入流程:WAL -> memtable -> SST(Sorted String Table) -> 下沉
- WAL: 数据库系统常用的预定日志,保证数据的原子性。
- memtable: 可以选择 hash table 或跳表,写满后变成一个个不可修改的 memtable。
- memtable 中的数据会定时或达到一定量时,flush 到 L0 的 SST。
- SST(Sorted String Table) : SST 文件中的数据是有序存储的,其中包含了真实数据、索引数据、Bloom Filter 数据。它是持久化到磁盘的数据。
- L0 的多个 SST 间数据范围存在重叠,例如:SST-1中数据范围为 0~20, SST-2 可能为 10~30。但 L1 及以下层 SST 间的数据范围是不会有交集的,这保证了可以在单层全部 SST 间进行二分查找。
- SST 文件大小会随层数的递增逐渐变大。由异步任务按一定策略执行压缩任务,将上层的数据压缩合并、分配至下层的 SST 中,相同的 key 会合并、更新。
数据读取流程: memtable -> immutable memtable -> block cache -> L0 -> 下沉
- 读 L0 SST 时,因为 SST 间 key 范围存在重叠,所以需要遍历所有 SST 进行查找。而下层数据只需在整体 SST 和单个 SST 中二分查找即可定位数据。Bloom Filter用于加速查找速度。
由于 RocksDB 的 LSM 结构,很适合作增量 Checkpoint。
如上图所示,保留两个 Checkpoint 的 SST 数据。第四列记录了 SST 文件在有效 Checkpoint 中被引用的次数,当 SST 文件不再被当前状态使用时,将引用计数减 1,值为 0 时,清除该 SST。
优缺点:
Heap StateBackend 底层由 StateTable 实现,有两种实现:CopyOnWriteStateTable、 NestedMapsStateTable。
优缺点:
这里仅简单介绍 RocksDB StateBackend 的两种策略。
RocksDB Backend 每次读写数据时,都会对数据进行正反序列化。使用如上图所示的序列化器。
snapshot 分为两个阶段:syncPrepareResources、asyncSnapshot
RocksFullSnapshotStrategy:
其中 State 的数据如下图所示:
状态恢复由实现 RestoreOperation 接口来完成
这部分工作主要为了应对同一个 Flink 任务状态发生变更、演进时,如何保证任务的正常运行。主要包括:
这是2018 年 Flink Forward 大会分享的三个规划,目前前两个已实现,还有细节有待完善。
目的在于当任务更新时,利用 Savepoint 来迁移作业状态,保证兼容性。有两种 Schema 的兼容问题:
目前 Flink 的 schema migration 不支持:key 的 schema 更新;使用 kryo 序列化
提供离线读写 State 的能力。这种对 State 的管理能力对于生产级应用是至关重要的,因为程序 bug 是不能完全避免的,而当 bug 导致作业 State 错误时我们需要有可以修复这种错误的能力。
详细使用可参照官方文档
存在三种角色:
目前存在限制: 与任务生命周期绑定,任务停止就不可查
queryable-state.enable: true
AggregatingStateDescriptor<BumpEvent, Long, Long> countingState = new AggregatingStateDescriptor<>(
"item_counts",
new AggregateFunction<BumpEvent, Long, Long>(){.....},
Long.class);
inputs.keyBy(BumpEvent::getItemId).asQueryableState("item_counts", countingState);
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Tuple2<String, Long>> descriptor = new ValueStateDescriptor<Tuple2<String, Long>>("key-count",
TypeInformation.of(new TypeHint<Tuple2<String, Long>>(){}));
descriptor.setQueryable("query-name");
keyCount = getRuntimeContext().getState(descriptor);
}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-client-java</artifactId>
<version>1.13.0</version>
</dependency>
对比 Flink 1.12 与 1.14:
RocksDB Block Cache 中存储的数据包含三部分:Data Block (真实数据)、Index Block (每条数据的索引)、Filter Block (Bloom Filter 数据)
从图中可以看出,Filter Block 和 Index 是明显大于 Data 的。
以 256M SSD 文件为例,Index Block 大概是 0.5M,Filter Block 大概是 5M,Data Block 则默认是 4KB。当 Cache Block 是几百 MB 的时候,如果文件数特别多,Index 和 Filter 不断的替出换入,性能会非常差,尤其是在默认开启了内存管控后。比较明显的现象是,IO 特别频繁,性能始终上不去。
1.13 中,复用了 RocksDB 的 partitioned Index & filter 功能,简单来说就是对 RocksDB 的 partitioned Index 做了多级索引。也就是将内存中的最上层常驻,下层根据需要再 load 回来,这样就大大降低了数据 Swap 竞争。线上测试中,相对于内存比较小的场景中,性能提升 10 倍左右
多个 RocksDB 可以共享一个 Write Buffer Manager 。如上图所示,以单个 Write Buffer Manager 为例,它将自己的内存 reserve 到 Block Cache 中,根据自己的内存管控逻辑来实现记账,Block Cache 内有 LRU Handle,超出预算时,会被踢出。
1.13 中设置非严格模式,可能存在一定程度的内存超用。在Rocksdb升级后解决该问题,1.13 推荐通过调整 JVM OverHead 来解决内存超用的问题。
FLIP-151、50 针对 heap-based state backend 的两个缺点来优化,而 151 与 158 存在一定程度上的相似之处,详细可查阅FLIP-151
该 Feature 主要针对 Heap 没有增量 Checkpoint 的缺点。
该 Feature 主要针对 Heap 只能处理有限数据,容易 OOM 的缺点。
独立于 StateBackend,额外维护一份 State ChangeLog,该 log 会持续持久化到外部存储系统中。
定期对 StateTable 进行持久化,一旦持久化完成,即可将 ChangLog 中包含的数据截断,减少冗余数据。
Checkpoint 时,只需记录 offset,可以快速响应;在状态恢复时,只需要取最新持久化的 StateTable,结合到 offset 为止的 Changlog。
优点:
深度解析-State
Flink State 最佳实践
官方文档:Working with state
Rocksdb github wiki
Rocksdb 调优
万字长文详解 Flink 中的 CopyOnWriteStateTable
Queryable State
官方文档:State Schema Evolution
Flink 1.13,state backend 优化及生产实践分享
1.14 Release management
什么是 Flink State Evolution
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。