当前位置:   article > 正文

Flink|《Flink 官方文档 - 概念透析 - 有状态流处理》学习笔记_flink官方文档学习

flink官方文档学习

学习文档:概念透析 - 有状态流处理

学习笔记如下:


状态(State)

有状态流:有些 operator 会存储多个事件的信息(例如窗口运算符),这些操作被称为有状态的。

有状态操作的样例:

  • 在搜索满足某些模式的事件时,存储目前为止已经遇到的事件序列;
  • 在计算一定时间范围内的聚合统计时,存储之前的聚合统计结果;
  • 在进行机器学习模型训练时,存储当前的模型参数;
  • 当需要管理历史数据时,使用状态访问之前已发生过的事件。

通过 checkpoints 和 savepoints 机制,允许:

  • 状态的容错机制
  • 在并行实例之间重新分配状态

键控状态(Keyed State)

Key State
  • 键控状态被保存在键 / 值存储中
  • 键对应的状态和 Operator 的分区严格对齐,从而确保所有状态更新都是本地操作,进而保证了一致性且没有额外开销
  • 在运行中,只能获取当前键对应的值
Key Group
  • Keyed State 会被进一步组织为所谓的 Key Group
  • Key Group 是 Flink 可以重新分配 Keyed State 的原子单位
  • Key Group 的数量与定义的最大并行度完全相同
  • 在执行过程中,每个 Keyed Operator 的并行实例都使用一个或多个 Key Group

在这里插入图片描述

状态持久化(State Persistence)

Flink 使用流重放(stream replay)和检查点(checkpointing)来实现容错。检查点会记录每个输入流的特定点以及每个操作器的相应状态。通过恢复操作器的状态并从检查点位置重放记录,流数据可以实现从检查点恢复并保持一致性(Exactly-once)。

检查点间隔(checkpoint interval):是从检查点恢复时的容错开销。

当运行出现问题时,Flink 会执行如下操作:

  • 停止分布式流
  • 重启 operator,并将 operator 重置到最近一次成功的 checkpoint
  • 将输入流重置到最近一次 checkpoint 快照的位置

通过以上操作,Flink 重启前已处理的记录不会影响重启后的状态。

Checkpointing

Flink 生成快照的过程是异步的,在生成快照时不需要加锁。

  • Flink 生成快照的方法称为:Lightweight Asynchronous Snapshots for Distributed Dataflows
  • Flink 生成快照的方法基于 standard Chandy-Lamport algorithm 算法
Barriers

Flink 分布式快照的核心是流屏障(stream barriers)。这些流屏障被插入到并行的数据流中,并随着数据流一起流动,流屏障始终严格保持在固定的位置,不会超过记录。流屏障将数据流中的记录分为当前快照记录集和下一个快照的记录集。

流屏障非常轻量级,不会中断流的流动。

因为每个流屏障都带有快照的 ID,所以不同快照的多个屏障可以同时存在于流中,这意味着不同快照可能同时发生。

在这里插入图片描述

对于快照 n 来说:

首先,在数据流中插入的流屏障的位置,就是快照能够包含的数据位置。例如,对于 Kafka 来说,这个位置就是各个分区的最后一条记录的偏移量。这个流屏障的位置将被报告给 checkpoint coordinator(JobManager)。

然后,流屏障向下游流动。当中间算子从其所有输入流中收到快照 n 的屏障时,它会向其所有输出流中发出快照 n 的屏障。一旦 sink operator(流 DAG 的末尾)在其所有输入流中都收到快照 n 的屏障,它就会向 checkpoint coordinator 确认快照 n。当所有接收器都确认了快照 n 后,该快照将被视为已完成。

一旦完成快照 n,该作业将永远不会再向源请求快照 n 的流屏障之前的记录,因为到那时这些记录(及其后代记录)将已经通过整个数据流拓扑。

在这里插入图片描述

对于接受多个输入流的 Operator,需要对齐各个输入流的流屏障。如上图。

  • Operator 一旦的任意一个输入流中接收到了快照 n 的流屏障,那么该 Operator 将不再处理来自该输入流的任何记录,直到该 Operator 接收到其他所有输入流中的快照 n 的流屏障;否则,它将会混合快照 n 的记录和快照 n + 1 的记录。
  • 当最后一个流接收到快照 n 的流屏障后,Operator 将首先向下游发出挂起的输出记录(快照 n 的结果记录),然后再向下游发出快照 n 的流屏障,接着,它将会对自身的 State 进行快照并恢复从输入流中处理数据。在开始恢复处理消息时,它首先会处理输入缓冲区(input buffer)中的消息,然后再处理流中的消息。
  • 最后,Operator 将会异步地将 state 写到 state backend。

需要注意的是,以下 Operator 都需要进行对齐:

  • 具有多个输入流的 Operator
  • 在 shuffle 后接收多个上游子任务的输出流的 Operator
Operator State 快照

如果 Operator 包含任意 State,那么这个 State 也必须成为快照的一部分。

Operator 快照的生成时机:

  • Operator 的所有的输入流都已经接收到流屏障
  • 当前 Operator 尚未将流屏障添加到自己的输出流中

在这个时候,流屏障之前的记录对 State 的更新都已经发生,而流屏障之后的记录对 State 的更新完全没有发生。

因为快照中的 State 可能很大,所以存储到可配置的 State backend 中。在默认情况下,它会存储到 JobManager 的内存中,但是在生产环境下,也可以配置到分布式存储中(例如 HDFS)。

在 State 存储完成后,Operator 确认这个 checkpoint,向它的输出流中添加流屏障,并继续处理输入流中的记录。

快照中包含的信息

最终,快照中将会包含如下信息:

  • 并行数据流,将记录快照开始时的偏移量或位置
  • Operator,将记录指向该快照状态的指针

在这里插入图片描述

Recovery
  1. 当错误发生后,Flink 选择最近的一次完成的 checkpoint k
  2. 系统重建分布式 dataflow,并将 checkpoint k 中快照的 state 给到每个 Operator
  3. 将输入流设置到 checkpoint 对应的位置 Sk(例如,对于 Kafka 来说,就是将偏移量重置到 Sk 的位置)

如果 State 是增量快照的,那么 Operator 将首先读取最近的一次全量快照,然后逐个读取一系列增量快照以更新该全量快照。

Unaligned Checkpointing

checkpoint 也可以使用非对齐的模式。其基本思想是:只要正在传输的数据作为 Operator State 的一部分,那么 checkpoint 久可以覆盖所有正在传输的数据。

非对齐的 checkpoint 也使用类似 Chandy-Lamport 算法实现,但是 Flink 仍然会向输入流中插入流屏障以避免使 checkpoint coordinator 负担过重。

在这里插入图片描述

上图描述了 Operator 如何处理未对齐 checkpoint 流屏障:

  • 当前 Operator 在其输入流中接收到第一个流屏障后开始处理
  • Operator 立即在自己的输出流的 output buffers 的末尾插入流屏障以通知后续节点
  • Operator 异步地存储所有越过的记录,并创建其自身 State 的快照

因此,Operator 只需要短暂地停止输入流的处理,以完成缓冲区标记,转发流屏障,创建其自身 State 的快照即可。

不对齐的 checkpoint 保证了流屏障可以最快地传输到 sink。它非常适合于处理,存在至少一个流非常缓慢,导致对齐需要耗费数小时的情况。但是,因为它产生了额外的 IO 压力,所以对于瓶颈是存储 state backends 的 IO 时并无帮助。

需要注意的是,savepoint 永远要求对齐。

Unaligned Recovery

Operator 在恢复 Unaliged checkpoint 时,会优先处理之前处理中的数据,然后再处理上游算子中的数据。除此以外,其他 recovery 逻辑与对齐的 checkpoint 一致。

State Backends

存储 key / value 索引的数据结构取决于选择的 state backend。一种 state backend 将数据存储到内存中的哈希映射中,另一种状态后端使用 RocksDB 存储。

除了定义存储 State 的数据结构外,state backends 还实现了构造 key / value state 的 point-in-time 快照的逻辑,并可以将该快照作为 checkpoint 的一部分存储。

state backends 可以在不修改应用逻辑的情况下配置。

在这里插入图片描述

Savepoints

任何使用 checkpoint 的程序,都可以通过 savepoint 恢复执行。savepoint 支持在发生以下操作时不会丢失任何 State 数据:

  • 更新应用程序
  • 更新 Flink 集群

savepoint 是手动触发的 checkpoint,将会生成一个快照并写入到一个 state backend 中。这依赖于标准的 checkpoint 组件。

相较于 checkpoint,savepoint 具有以下差异:

  • 由用户手动触发
  • 在新的检查点完成时不会自动过期

Exactly Once vs. At Least Once

对齐步骤可能会给流程序带来延迟。通常,这种额外的延迟在几毫秒,但是我们也看到过一些延迟明显增加的情况。对于要求所有记录具有一致的超低延迟(几毫秒)的应用程序,Flink 提供了一个开关用于跳过 checkpoint 的流对齐,Operator 将在接收到每个输入流中的流屏障后,就会立即开始 checkpoint 快照。

当流对齐被跳过时,Operator 将会继续处理输入流的消息,即使一些流屏障可能已经到达。因此,在完成 checkpoint n 的快照开始前,Operator 可能已经处理了属于 checkpoint n + 1 的记录。当 restore 时,这些记录可能会被重复计算,因为它们既在 checkpoint n 的快照中,又会作为 checkpoint 之后的数据被重放。

批处理程序的状态(State)及容错机制(Fault Tolerance)

Flink 将批处理程序(batch program)作为流处理程序(streaming program)的 BATCH 执行模式(ExecutionMode)的一种特殊情况,在该特殊情况下流式有界的(即元素数量时有限的)。因此,与流处理程序一样,上述概念也适用于批处理程序,但也有如下差异:

  • 批处理程序的容错机制不适用 checkpoint,而是通过完全重放流来实现的。因为输入是有界的,所以这是可能的。因为不在使用 checkpoint,所以恢复的成本更高,但是在常规处理中的成本更低。
  • 在批处理模式中,State backend 使用简化的 in-memory/out-of-core 数据结构,而不是 key/value 索引。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/921720
推荐阅读
相关标签
  

闽ICP备14008679号