当前位置:   article > 正文

Flink State 二三事_flink中包含map的类的序列化与反序列化

flink中包含map的类的序列化与反序列化

一、State 基础

State 作为 Flink 的基础设施,用于存储流计算计算节点的中间结果。如一个聚合计算的任务,先将历史的计算结果写入 state,当来了新数据时,再读取状态中的数据,更新后再写入 state。

对于一个能在生产环境稳定使用的 State,它需要解决以下问题:
1. 低延时地读写;
2. 可靠、高可用,提供数据 Exactly Once 的语义,失败可恢复;
3. 数据可以分配在 subtask 中,任务中断修改并行度后,任务依然可以恢复,并能重新分配至新的 subtask 中;
4. 可扩展,大状态场景下,依然能够稳定运行。

State 类型在这里插入图片描述

  • Keyed State:只能用在 KeyedStream 上,算子只能访问当前 Key 的数据。
    • 在用户任务中比较常用,其包括 ValueState、ListState、ReducingState、AggregatingState、MapState 等,FoldingState 已废弃。
    • 使用方式:重写 RichFunction,通过 RuntimeContext 访问
  • Operator State:可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态。
    • operator state 一般用在 source、sink 上。如记录 Kafka 的 offset。
    • 使用方式:实现 CheckpointedFunction 接口,通过 FunctionInitializationContext 访问
      在这里插入图片描述

一般认为 operator state 的数据规模是比较小的;keyed state 规模相对比较大。在实现上 flink 也是这么做的,operator state backend 仅有一种 on-heap 的实现,因此 operator state 过大容易导致任务 OOM。

每种状态都有托管状态(managed state)和原始状态(raw state)两种。

  • 托管状态:用户不用关心状态的实现细节,包括正反序列化、checkpoint 等。
  • 原始状态:用户需要自行实现

状态横向扩展

在任务因为流量变更,需要调整资源时,用户可以通过 savepoint 停止任务,修改并行度后,再重新启动恢复任务。如下图所示:
在这里插入图片描述

  • Keyed State:会把状态同步至指定的 subtask 中。
    • 用 KeyGroup(见下文) 映射 Key,KeyGroupRange 指定了一个 KeyGroup 的最大最小值的范围,将其分配给 subtask。
  • Operator State:
    • 简单 hash 后,均匀分配到 subtask 中
    • 将所有状态合并,再分发到所有实例

KeyGroup

在这里插入图片描述

上图分别展示了任务变更并行度时,无 KeyGroup 和有 KeyGroup 时,状态如何在子任务中横向扩展。
其中一共有key:0~19 ,任务并行度从3改为4。

  • A图:没有KeyGroup的情况。
    1. SubtaskNum = hash(key) % parallelism。根据该公式取得 key 对应的 subtask 的编号,将该 key 分配到该 subtask 上。
    2. 初始时,subtask 分配到 {0,3,6,9,12,15,18},其余如图所示。
    3. savepoint 时如图所示顺序存储各 subtask 分配的 key。
    4. 任务恢复时,需要将 key 重新分配,有两种实现方案:
      1. 每个 subtask 读取所有数据,过滤出分配给当前 subtask 的 key 数据。该方法顺序读、效率高,但需要读取很多无用数据,当状态大、并行度高时,性能差;
      2. 对所有 key 创建索引,每个 subtask 针对分配到的 key 向外部存储系统(HDFS)发起多个请求获取数据。该方法缺点在于:要发起很多随机读请求,给外部存储系统造成很大压力,随机IO性能差;同时需要将所有 key 的索引序列化存储。

上述问题可以通过 KeyGroup 来缓解。

  • B图:假设 KeyGroupNum 为10,一般为最大并行度。
    1. KeyGroup = hash(key) % KeyGroupNum。根据该公式将 key 分配到 10 个组,再将这 10 个组平均分配给三个 subtask。
    2. savepoint 时,KeyGroup 按顺序写入外部存储系统(HDFS)。
    3. 修改并行度恢复后,KeyGroup 仍然在 subtask 中平均分配。如图所示,每个 subtask 可以顺序从存储系统中读取分配到的 KeyGroup 数据。

KeyGroup 在一定程度上缓解了上述问题,但 KeyGroupNum 变化时,问题依然存在。

State 实践建议

  • Operator State:

    • 谨慎使用长ListState:由于 Operator State 中没有 KeyGroup 的概念,所以为了实现改并发恢复的功能,需要一个 offset 数组记录 List 中所有元素序列化后的位置偏移 offset。这部分数据会发往 JM,当 operator 的并发数很大时,容易导致 JM 的 OOM。
    • 正确使用 UnionListState:Union 拿到的是全量数据,要是只使用一部分数据,切记恢复的 task 只取其中的一部分进行处理和用于下一次 snapshot,否则会导致state越来越大。
  • Keyed State:

    • 正确清空State:在 KeyedProcessFunction 的单个 key 处理中只能清空自己 key 的数据,如果想清理所有 key 数据,需要 applyToAllKeys 或设 TTL。
    • RocksDB 中考虑 value 值很大的极限场景:Rocksdb 的 value 受 JNI 的限制,单个 value 只支持 4G,如果是 List 或 value 可以改成 map,因为 map 会拆分写入。

二、State Backend

StateBackend 分类

  • 下图为 1.13 版本以前的 API:
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LHDElPfc-1646308335969)(http://pfp.ps.netease.com/kmspvt/file/616d4c132dcade2101516b9dPS5L0OF601?sign=a_WQQqw-yNgmWqIgnsV1W45M3Pk=&expire=1646326081)]

其中:绿色表示所创建的operator/keyed state backend 是 on-heap 的,黄色则表示是 off-heap 的

  • 1.13 API 作了变更:
    • State Backend 的概念变窄,只描述状态访问和存储,即任务运行时状态读写的位置。
    • 新增 Checkpoint storage,描述的是 Checkpoint 行为,如 Checkpoint 数据是发回给 JM 内存还是上传到远程。
      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aaPDzMZ3-1646308335970)(http://pfp.ps.netease.com/kmspvt/file/61736fb86158bc1580d4d237COp0S6f901?sign=XTkne2BFKOpejhN2sZgoFmzrCqs=&expire=1646326081)]
env.setStateBackend(StateBackend)

env.getCheckpointConfig()
    .setCheckpointStorage(CheckpointStorage)
  • 1
  • 2
  • 3
  • 4
  • 1.13 前后 API 对比
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Fu6dxOKK-1646308335970)(http://pfp.ps.netease.com/kmspvt/file/6173712168d8645601b80667LpGwYdt001?sign=uyxvtpXLl4tmEh52Fre6VsxZCdE=&expire=1646326081)]

RocksDB StateBackend

在这里插入图片描述

图中展示了 RocksDB StateBackend 在 Flink 引擎中的位置。
它是一个本地数据库,数据由 RocksDB 负责在内存和磁盘中调度。
它是基于 Google 开源的 LevelDB 开发的,底层为 LSM Tree 的 KV 存储系统。以 Append Only 的形式写入数据,即使是修改或删除数据。

在这里插入图片描述

如图所示:
数据写入流程:WAL -> memtable -> SST(Sorted String Table) -> 下沉
- WAL: 数据库系统常用的预定日志,保证数据的原子性。
- memtable: 可以选择 hash table 或跳表,写满后变成一个个不可修改的 memtable。
- memtable 中的数据会定时或达到一定量时,flush 到 L0 的 SST。
- SST(Sorted String Table) : SST 文件中的数据是有序存储的,其中包含了真实数据、索引数据、Bloom Filter 数据。它是持久化到磁盘的数据。
- L0 的多个 SST 间数据范围存在重叠,例如:SST-1中数据范围为 0~20, SST-2 可能为 10~30。但 L1 及以下层 SST 间的数据范围是不会有交集的,这保证了可以在单层全部 SST 间进行二分查找。
- SST 文件大小会随层数的递增逐渐变大。由异步任务按一定策略执行压缩任务,将上层的数据压缩合并、分配至下层的 SST 中,相同的 key 会合并、更新。
数据读取流程: memtable -> immutable memtable -> block cache -> L0 -> 下沉
- 读 L0 SST 时,因为 SST 间 key 范围存在重叠,所以需要遍历所有 SST 进行查找。而下层数据只需在整体 SST 和单个 SST 中二分查找即可定位数据。Bloom Filter用于加速查找速度。

在这里插入图片描述

由于 RocksDB 的 LSM 结构,很适合作增量 Checkpoint。
如上图所示,保留两个 Checkpoint 的 SST 数据。第四列记录了 SST 文件在有效 Checkpoint 中被引用的次数,当 SST 文件不再被当前状态使用时,将引用计数减 1,值为 0 时,清除该 SST。

优缺点

  • 优点:
    • 管理大状态:可将数据存储至磁盘,控制内存的使用
    • 增量 Checkpoint,提高运行时的 Checkpoint 速度
  • 缺点:
    • 读写放大: 读一条数据,可能需要读 memtable、immutable-memtable、block cache、分层读、索引、bloom filter,导致 IO 放大;写数据,则有 WAL,还会有异步的数据下沉合并。memtable 的大小越大,写放大越小。
    • 空间放大: Append Only 的方式、cache、索引、过滤器等。
    • 应对突发流量的时候削峰能力不足: 当写高峰时,L0 的 sst 过多,compaction不过来会导致 L0 的文件积压,使读性能恶化。这种现象在 Flink 任务追数据时容易出现。
    • 增量 Checkpoint 降低任务恢复速度: 需要合并 Checkpoint 文件来恢复状态。 通过配置任务本地恢复(execution.checkpointing.prefer-checkpoint-for-recovery true),可适当提高速度。
    • 增量 Checkpoint 增加运维成本: Checkpoint 文件的清理作业多、并发大时会有很多 SST 文件发至 HDFS,出现小文件问题
    • 序列化反序列化降低读写速度: 每次对数据的读写都需要正反序列化,增加额外开销。
    • 性能受配置影响大,复杂的调优: 比如 memetable 大小和数据结构的选择、block cache 大小、SST 文件大小、层数等,都会影响 RocskDB 的读写性能。

Heap StateBackend

Heap StateBackend 底层由 StateTable 实现,有两种实现:CopyOnWriteStateTable、 NestedMapsStateTable。

  • NestedMapsStateTable: 同步 snapshot,已不再使用,未来将被废弃
  • CopyOnWriteStateTable: 异步 snapshot。具体实现

优缺点

  • 优点:性能好。无需排序、压缩;不用每次读写都序列化、反序列化;没有 IO 放大、JNI 限制;更短的 checkpoint 同步阶段
  • 缺点:
    • 内存空间有限,容易出现 OOM
    • 缺少增量快照

三、State 正反序列化

State Snapshot

这里仅简单介绍 RocksDB StateBackend 的两种策略。

在这里插入图片描述

  • RocksDB Backend 每次读写数据时,都会对数据进行正反序列化。使用如上图所示的序列化器。

  • snapshot 分为两个阶段:syncPrepareResources、asyncSnapshot

  • RocksFullSnapshotStrategy:

    • 同步阶段:收集元数据信息
    • 异步阶段:createCheckpointStreamSupplier(为了取不同的 stream: local/local+hdfs)
      -> FullSnapshotAsyncWriter#writeSnapshotToOutputStream (1. 写元数据信息;2. 读 RocksDB 全量数据,按下图格式写入到 hdfs。为了统一 Savepoint 格式
      在这里插入图片描述

其中 State 的数据如下图所示:
在这里插入图片描述

  • RocksIncrementalSnapshotStrategy:
    • 同步阶段:触发 RocksDB Checkpoint,收集元数据信息
    • 异步阶段:将 RocksDB 的 SST 文件上传至 hdfs

State Restore

状态恢复由实现 RestoreOperation 接口来完成

  • RocksDBFullRestoreOperation
    1. openDB
    2. 通过 StateHandle 从 HDFS 读出数据,写入本地 DB 中
  • RocksDBIncrementalRestoreOperation
    1. 从 HDFS 下载相关 SST 文件
    2. openDB

四、State Evolution

这部分工作主要为了应对同一个 Flink 任务状态发生变更、演进时,如何保证任务的正常运行。主要包括:

  • State Schema Migration
  • Savepoint Management
  • Upgradability Dry-Runs:用于离线检查作业的版本兼容性,以帮助用户提前发现兼容性问题。常见兼容性问题有:作业拓扑的变更、State Schema 的变更

这是2018 年 Flink Forward 大会分享的三个规划,目前前两个已实现,还有细节有待完善。

State Schema Migration


在这里插入图片描述

目的在于当任务更新时,利用 Savepoint 来迁移作业状态,保证兼容性。有两种 Schema 的兼容问题:

  • State Schema 更新:如新增、移除、重命名字段。这种版本升级主要需要考虑序列化器的兼容性,比如 Java 默认的对象序列化器,可以兼容新增字段但不兼容删除、重命名。如果变更是可兼容的,那么无需额外操作即可迁移,否则可能需要考虑更换序列化器。然而 Flink 默认情况下会使用 POJO 序列化器(PojoSerializer),目前支持增删字段,但字段类型不能变更,pojo 的类名不能变更。不兼容的 State Schema 的更新就等同于 State 序列化器的更新
  • State 序列化器更新:
    1. HeapStateBackend:将上个版本序列化器序列化存储,读取 Savepoint 时用旧的序列化器把数据全部反序列化到内存中。
    2. RocksDBStateBackend:多版本并存,此时保存所有序列化器是不可取的。可用用以下两种方法解决:
      i. 保证序列化器向后兼容: 自定义的序列化器需要考虑这点。Flink 默认的 PojoSerializer 不支持。
      ii. State Migration Process: 在恢复作业状态时用上次的序列化器反序列化所有数据。这在 State 大的情况,可能会导致作业启动需要很长的恢复时间,因此该方法需要结合实际慎重使用。遇到这些情况可以使用下一节的方法,先离线处理。

目前 Flink 的 schema migration 不支持:key 的 schema 更新;使用 kryo 序列化

Savepoint Management

提供离线读写 State 的能力。这种对 State 的管理能力对于生产级应用是至关重要的,因为程序 bug 是不能完全避免的,而当 bug 导致作业 State 错误时我们需要有可以修复这种错误的能力。
详细使用可参照官方文档

五、Queryable State

在这里插入图片描述

存在三种角色:

  • Client: 查询的客户端
  • ClientProxy: 代理负责接收 client 请求,首先会在本地查找,找不到再向 JM 查询状态的真实位置,之后请求真实节点的 StateServer 查询状态数据
  • StateServer: 状态的真实存储位置

目前存在限制: 与任务生命周期绑定,任务停止就不可查

使用方法

  1. 修改 flink-conf.yaml 配置:queryable-state.enable: true
  2. 添加jar包:将 flink-queryable-state-runtime_2.12-1.13.0.jar 从 Flink 的 opt 目录下复制到 lib/ 目录
  3. 让状态可查:
    • QueryableStateStream:asQueryableState() 。该方法类似 print,只能作为流的最终输出,不适合用于生产环境。
    AggregatingStateDescriptor<BumpEvent, Long, Long> countingState = new AggregatingStateDescriptor<>(
        "item_counts",
        new AggregateFunction<BumpEvent, Long, Long>(){.....},
        Long.class);
    
    inputs.keyBy(BumpEvent::getItemId).asQueryableState("item_counts", countingState);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • stateDescriptor.setQueryable(String queryableStateName)
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<String, Long>> descriptor = new  ValueStateDescriptor<Tuple2<String, Long>>("key-count",
            TypeInformation.of(new TypeHint<Tuple2<String, Long>>(){}));
    
        descriptor.setQueryable("query-name");
        keyCount = getRuntimeContext().getState(descriptor);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  4. 创建 client 查询
    • 添加依赖
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-queryable-state-client-java</artifactId>
        <version>1.13.0</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 代码实现
      在这里插入图片描述

六、社区关于状态的工作

对比 Flink 1.12 与 1.14:

  • StateBackend API 变更,如上文所述
  • State 访问的性能监控:State 访问的性能监控开启后,对不同的 State Backend 性能损失影响不同,默认关闭。
    • 对于 RocksDB State Backend, 性能损失大概在 1% 左右;
    • 对于 Heap State Backend, 性能损失最多可达 10%。
  • 统一 Savepoint 格式:如上文介绍的 RocksDB StateBackend 全量 Snapshot/Restore 策略,将 RocksDB 中的数据全部读出并按一定格式写出到 HDFS
  • RocksDB partitioned Index & filter
  • RocksDB state-backend 内存管控优化
  • Rocksdb 版本升级: 从5.17 -> 6.20,相差上千 commit;会导致 10% 以下的性能倒退。社区通过对比 RocksDB 的 commit 找到一些有问题的 commit,但还不全。权衡利弊后,最终决定升级
    • https://issues.apache.org/jira/browse/FLINK-14482
    • https://issues.apache.org/jira/browse/FLINK-19710

RocksDB partitioned Index & filter

在这里插入图片描述

RocksDB Block Cache 中存储的数据包含三部分:Data Block (真实数据)、Index Block (每条数据的索引)、Filter Block (Bloom Filter 数据)
从图中可以看出,Filter Block 和 Index 是明显大于 Data 的。
以 256M SSD 文件为例,Index Block 大概是 0.5M,Filter Block 大概是 5M,Data Block 则默认是 4KB。当 Cache Block 是几百 MB 的时候,如果文件数特别多,Index 和 Filter 不断的替出换入,性能会非常差,尤其是在默认开启了内存管控后。比较明显的现象是,IO 特别频繁,性能始终上不去。
在这里插入图片描述

1.13 中,复用了 RocksDB 的 partitioned Index & filter 功能,简单来说就是对 RocksDB 的 partitioned Index 做了多级索引。也就是将内存中的最上层常驻,下层根据需要再 load 回来,这样就大大降低了数据 Swap 竞争。线上测试中,相对于内存比较小的场景中,性能提升 10 倍左右

RocksDB state-backend 内存管控优化

  • Flink 申明一个 state 时,对应 RocksDB 的一个 column family(独立内存): 如果声明两个 state,会各自使用自己的 Write Buffer 和 Cache 内存
  • Flink 不会限制用户在一个 operator 内申明的 state 数量: 用户可以设置几千个、几万个 state,可能导致容器内存撑爆
  • Flink 在 slot-sharing 时,一个 slot 内可以存在多个包含 keyed state 的 operator
    在这里插入图片描述

多个 RocksDB 可以共享一个 Write Buffer Manager 。如上图所示,以单个 Write Buffer Manager 为例,它将自己的内存 reserve 到 Block Cache 中,根据自己的内存管控逻辑来实现记账,Block Cache 内有 LRU Handle,超出预算时,会被踢出。

1.13 中设置非严格模式,可能存在一定程度的内存超用。在Rocksdb升级后解决该问题,1.13 推荐通过调整 JVM OverHead 来解决内存超用的问题。

未来工作

FLIP-151、50 针对 heap-based state backend 的两个缺点来优化,而 151 与 158 存在一定程度上的相似之处,详细可查阅FLIP-151

FLIP-151: Incremental snapshots for heap-based state backend

该 Feature 主要针对 Heap 没有增量 Checkpoint 的缺点。

  • 实现上主要有以下工作,可以复用 StateTable 的功能:
    • 计算增量数据,包括:
      • 发生变更的Key
      • 变更Key对应的数据变更
    • 写快照:遍历数据、序列化
    • 关联旧版本
    • 恢复:遍历快照,并合并diff
    • 压缩,清理过期数据

FLIP-50: Spill-able Heap Keyed State Backend

该 Feature 主要针对 Heap 只能处理有限数据,容易 OOM 的缺点。
在这里插入图片描述

  • 由堆状态监测器来监控堆内存的使用情况,当使用量或 GC 时长超过一定阈值时,将内存中一部分 KeyGroup 的状态转移到磁盘上;
  • 当堆内存空闲或者读取的数据不在时,再将对应的状态 load 进内存

FLIP-158: Generalized incremental checkpoints

在这里插入图片描述

独立于 StateBackend,额外维护一份 State ChangeLog,该 log 会持续持久化到外部存储系统中。
在这里插入图片描述

定期对 StateTable 进行持久化,一旦持久化完成,即可将 ChangLog 中包含的数据截断,减少冗余数据。
在这里插入图片描述

Checkpoint 时,只需记录 offset,可以快速响应;在状态恢复时,只需要取最新持久化的 StateTable,结合到 offset 为止的 Changlog。

优点

  • 可以有效降低 Checkpoint 时间,TB 级状态都能秒级完成;
  • 可以预判 Checkpoint 时长,不受状态大小影响。如果是 RocksDB StateBackend,当状态变更的是 L0 时,速度会很快,但如果是压缩 L4、L5 的大 SST,就会变慢。
  • 避免了 RocksDB 出现的小文件问题

Reference

深度解析-State
Flink State 最佳实践
官方文档:Working with state
Rocksdb github wiki
Rocksdb 调优
万字长文详解 Flink 中的 CopyOnWriteStateTable
Queryable State
官方文档:State Schema Evolution
Flink 1.13,state backend 优化及生产实践分享
1.14 Release management
什么是 Flink State Evolution

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/413157
推荐阅读
相关标签
  

闽ICP备14008679号