赞
踩
Flink中的每个函数和运算符都可以是有状态的(如果是Keyed Stream
,使用ValueState、ListState
等状态,如果是Operator State
,实现CheckpointedFunction或CheckpointedList接口
。使用ListState、MapState
获取状态)。有状态的函数在各个元素/事件处理中存储数据。使状态成为任何类型的更复杂操作的关键构建块。
为了使状态容错,Flink需要检查状态。Checkpoint(检查点)允许Flink恢复流中的状态和位置,从而为应用程序提供与无故障执行相同的语义。
一般来说,checkpoint机制和流的状态持久存储 需要满足:
可重放地数据源(Source)。例如Apache Kafka、RabbitMQ、文件系统(HDFS、S3)等。
可靠地文件系统,因为State(状态)需要持久存储,通常是需要分布式文件系统,如HDFS、S3等。
默认情况下,Checkpoint是被禁用的。如需使用Checkpoint
机制,需要调用env.enableCheckpointing(long interval)
。需要设置一个间隔时间,每个间隔时间发出一次Checkpoint。
Checkpoint参数设置
下面直接以代码的方式展示描述:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置启用 Checkpoint,默认 CheckpointMode为 EXACTLY_ONCE(精准一次) env.enableCheckpointing(6000); // enableCheckpointing方法还有2个参数的方法,后面可以修改默认的Checkpoint Mode,改为AT_LEAST_ONCE(至少一次) env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE); // 修改 Checkpoint Mode,Checkpoint Mode有2个枚举值,分别为:EXACTLY_ONCE,AT_LEAST_ONCE env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); // 修改Checkpoint的间隔时间 env.getCheckpointConfig().setCheckpointInterval(6000); // 设置checkpoint timeout,如果当前checkpoint超过设置的时间,则本次checkpoint终止,默认10分钟 env.getCheckpointConfig().setCheckpointTimeout(6000); // 设置Checkpoint的最大并发度,默认1。 // 如果设置为n,则n个线程同时进行checkpoint,其中只要一个线程写入成功或所有线程都超时,才会进行下一个checkpoint,否则一直尝试。 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 设置检查点之间的最小暂停时间。一个检查点写入成功或超时后 + 检查点之间的设置时间 = 下一次checkpoint的触发时间 // 如果设置检查点并发度为1时,则使用此设置效率高 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6000); // 当job取消时,是否删除外部储存的checkpoint数据,有2个参数,分别为: // RETAIN_ON_CANCELLATION:当job取消时,保存checkponit // DELETE_ON_CANCELLATION:当job取消时,删除checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
flink-conf.yaml
文件配置选项如下:
key | value(默认) | 描述 |
---|---|---|
state.backend | 没有 | 状态后端,用于储存checkpoint的状态。值有jobmanager、filesystem、rocksdb。具体可见State Backend文档。 |
state.backend.async | true | 是否异步快照保存,默认true。某些状态后端可能不支持异步快照,或仅支持异步快照,并忽略此选项。详情可见State Backend文档。 |
state.backend.fs.memory-threshold | 1024 | 保存状态的文件最小大小1024。所有小于的状态块都以内联方式存储在检查点元数据文件(jobmanager)中。 |
state.backend.incremental | false | 是否支持创建增量检查点。如果状态后端支持则使用增量存储,如果不支持则忽略这个参数。 |
state.backend.local-recovery | false | 此参数是对于本地恢复的。默认false。local recovery当前仅仅支持keyed state。MemoryStateBackend也是不支持的,会忽略这个参数。 |
state.checkpoints.dir | 没有 | 用于存储checkpoint的数据以及元数据的可靠文件系统目录,必须JobManager和TaskManager能同时访问到。 |
state.checkpoints.num-retained | 1 | 保留已完成检查点的最大数目 |
state.savepoints.dir | 没有 | 保存点目录。用于将保存点写入文件系统(MemoryStateBackend、FsStateBackend、RocksDBStateBackend)的状态后端。 |
taskmanager.state.local.root-dirs | 没有 | 定义根目录的配置参数,用于存储用于本地恢复的基于文件的状态。本地恢复目前只覆盖键控状态后端。目前,MemoryStateBackend不支持本地恢复并忽略此选项。 |
状态后端有3种,分别是MemoryStateBackend、FsStateBackend、RocksDBStateBackend
,选择什么状态后端,需要在flink-conf.yaml
中配置指定,或在程序代码中env.setStateBackend()
设置。
在默认情况下,选择MemoryStateBackend
。状态保存在TaskManager的内存中,检查点保存在JobManager内存中。
关于State Backend(状态后端)更多的内容。可参考:[官方文档][https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html],[Flink DataStream State Backend(状态后端)][https://blog.csdn.net/qq_33689414/article/details/94399204]
Flink目前仅为没有迭代的作业提供处理保证。在迭代作业上启用检查点会导致异常。为了强制在迭代程序上检查点,用户在启用检查点时需要设置一个特殊标志force
。
// 参数3:true表示暴力(force)启动checkpoint,该方法已经过时
env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE,true)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。