赞
踩
Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。这里,我们简单理解一下Flink Checkpoint机制,如官网下图所示:
Checkpoint指定触发生成时间间隔后,每当需要触发Checkpoint时,会向Flink程序运行时的多个分布式的Stream Source中插入一个Barrier标记,这些Barrier会根据Stream中的数据记录一起流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录。因为一个Operator可能存在多个输入的Stream,而每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待所有Barrier到达的过程中,Operator的Buffer中可能已经缓存了一些比Barrier早到达Operator的数据记录(Outgoing Records),这时该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入,最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据。
开启Checkpoint功能,有两种方式。其一是在conf/flink_conf.yaml中做系统设置;其二是针对任务再代码里灵活配置。但是我个人推荐第二种方式,针对当前任务设置,设置代码如下所示:
1 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2 // 设置保存点的保存路径,这里是保存在hdfs中
3 env.setStateBackend(new FsStateBackend("hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints"));
4 CheckpointConfig config = env.getCheckpointConfig();
5 // 任务流取消和故障应保留检查点
6 config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
7 // 保存点模式:exactly_once
8 config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
9 // 触发保存点的时间间隔
10 config.setCheckpointInterval(60000);
上面调用enableExternalizedCheckpoints设置为ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint处理。上面代码配置了执行Checkpointing的时间间隔为1分钟。
默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前。
Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:
state.checkpoints.num-retained: 20
这样设置以后,运行Flink Job,并查看对应的Checkpoint在HDFS上存储的文件目录,示例如下所示:
hdfs dfs -ls /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/ Found 22 items drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:23 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-858 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:24 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-859 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:25 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:26 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-861 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:27 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-862 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:28 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-863 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:29 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-864 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:30 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-865 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:31 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-866 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:32 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-867 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:33 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-868 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:34 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-869 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:35 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-870 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:36 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-871 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:37 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-872 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:38 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-873 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:39 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-874 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:40 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-875 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:41 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-876 drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:42 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-877 drwxr-xr-x - hadoop supergroup 0 2018-08-31 20:05 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/shared drwxr-xr-x - hadoop supergroup 0 2018-08-31 20:05 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/taskowned
可见,我们配置了state.checkpoints.num-retained的值为20,只保留了最近的20个Checkpoint。如果希望会退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现。
如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点,比如chk-860进行回放,执行如下命令:
bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar
程序正常运行后,还会按照Checkpoint配置进行运行,继续生成Checkpoint数据,如下所示:
hdfs dfs -ls /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e
Found 6 items
drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:56 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-861
drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:57 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-862
drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:58 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-863
drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:59 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-864
drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:55 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/shared
drwxr-xr-x - hadoop supergroup 0 2018-09-01 10:55 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/taskowned
从上面我们可以看到,前面Flink Job的ID为582e17d2cc343e6c56255d111bae0191,所有的Checkpoint文件都在以Job ID为名称的目录里面,当Job停掉后,重新从某个Checkpoint点(chk-860)进行恢复时,重新生成Job ID(这里是11bbc5d9933e4ff7e25198a760e9792e),而对应的Checkpoint编号会从该次运行基于的编号继续连续生成:chk-861、chk-862、chk-863等等。
checkpoint是Flink容错的核心机制。它可以定期的将各个Operator处理的数据进行快照存储(Snapshot)。
如果Flink程序出现宕机,可以重新从这些快照中恢复数据。
Flink容错机制的核心就是持续创建分布式数据流及其状态的一致快照。Flink的checkpoint是通过分布式快照实现的,
所以在flink中这两个词是一个意思。
checkpoint用来保证任务的错误恢复。任务失败可以从最新的checkpoint恢复。
checkpoint机制需要一个可靠的可以回放数据的数据源(kafka,RabbitMQ,HDFS…)和一个存放state的持久存储(hdfs,S3)
通过调用StreamExecutionEnvironment.enableCheckpointing(internal,mode) 启用checkpoint 。
internal 默认是 -1,表示checkpoint不开启,mode默认是EXACTLY_ONCE模式。
可设置checkpoint timeout,超过这个时间 checkpoint 没有成功,checkpoint 终止。默认 10分钟。
可设置 chekpoint 失败任务是否任务也失败,默认是true
可设置同时进行的checkpoint数量,默认是1.
将barrier插入到数据流中,作为数据流的一部分和数据一起向下流动。Barrier不会干扰正常数据,数据流严格有序。
一个barrier把数据流分割成两部分:一部分进入到当前快照,另一部分进入到下一个快照。
每一个barrier都带有快照ID,并且barrier之前的数据都进入了此快照。Barrier不会干扰数据流处理,所以非常轻量。
多个不同快照的多个barrier会在流中同时出现,即多个快照可能同时创建。
Barrier在数据源端插入,当 snapshot 的barrier 插入后,系统会记录当前snashot 位置值n (用Sn表示。)
例如:在 Apache Kafka中,这个变量表示某个分区中最后一条数据的偏移量。这个位置值Sn 会被发送到一个称为 checkpoint coordinator的模块。
然后 barrier 继续向下移动,当一个 operator 从其输入流接收到所有标识 snapshot n 的 barrier时,它会向其所有输入流插入一个标识 snapshotn 的barrier。当sink operator (DAG流的终点) 从其输入流接收到所有 barrier n时,它向 the checkpoint coordinator 确认 snapshot n 已完成。
当所有 sink 都确认了这个快照,快照就被标识为完成。
Savepoint会在Flink Job之外存储自包含(self-contained)结构的Checkpoint,它使用Flink的Checkpoint机制来创建一个非增量的Snapshot,里面包含Streaming程序的状态,并将Checkpoint的数据存储到外部存储系统中。
Flink程序中包含两种状态数据,一种是用户定义的状态(User-defined State),他们是基于Flink的Transformation函数来创建或者修改得到的状态数据;另一种是系统状态(System State),他们是指作为Operator计算一部分的数据Buffer等状态数据,比如在使用Window Function时,在Window内部缓存Streaming数据记录。为了能够在创建Savepoint过程中,唯一识别对应的Operator的状态数据,Flink提供了API来为程序中每个Operator设置ID,这样可以在后续更新/升级程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。当然,如果我们不指定Operator ID,Flink也会我们自动生成对应的Operator状态ID。
而且,强烈建议手动为每个Operator设置ID,即使未来Flink应用程序可能会改动很大,比如替换原来的Operator实现、增加新的Operator、删除Operator等等,至少我们有可能与Savepoint中存储的Operator状态对应上。另外,保存的Savepoint状态数据,毕竟是基于当时程序及其内存数据结构生成的,所以如果未来Flink程序改动比较大,尤其是对应的需要操作的内存数据结构都变化了,可能根本就无法从原来旧的Savepoint正确地恢复。
下面,我们以Flink官网文档中给定的例子,来看下如何设置Operator ID,代码如下所示:
DataStream<String> stream = env.
// 有状态的source ID (例如:Kafka)
.addSource(new StatefulSource())
.uid("source-id") // source操作的ID
.shuffle()
// 有状态的mapper ID
.map(new StatefulMapper())
.uid("mapper-id") // mapper的ID
// 无状态sink打印
.print(); // 自动生成ID
创建一个Savepoint,需要指定对应Savepoint目录,有两种方式来指定:
一种是,需要配置Savepoint的默认路径,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,设置Savepoint存储目录,例如如下所示:
state.savepoints.dir: hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints
另一种是,在手动执行savepoint命令的时候,指定Savepoint存储目录,命令格式如下所示:
bin/flink savepoint :jobId [:targetDirectory]
例如,正在运行的Flink Job对应的ID为40dcc6d2ba90f13930abce295de8d038,使用默认state.savepoints.dir配置指定的Savepoint目录,执行如下命令:
bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038
可以看到,在目录hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints/savepoint-40dcc6-4790807da3b0下面生成了ID为40dcc6d2ba90f13930abce295de8d038的Job的Savepoint数据。
为正在运行的Flink Job指定一个目录存储Savepoint数据,执行如下命令:
bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints
可以看到,在目录 hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f下面生成了ID为40dcc6d2ba90f13930abce295de8d038的Job的Savepoint数据。
现在,我们可以停掉Job 40dcc6d2ba90f13930abce295de8d038,然后通过Savepoint命令来恢复Job运行,命令格式如下所示:
bin/flink run -s :savepointPath [:runArgs]
以上面保存的Savepoint为例,恢复Job运行,执行如下命令:
bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar
可以看到,启动一个新的Flink Job,ID为cdbae3af1b7441839e7c03bab0d0eefd。
下面,我们看一下Savepoint目录下面存储内容的结构,如下所示:
hdfs dfs -ls /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b
Found 5 items
-rw-r--r-- 3 hadoop supergroup 4935 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/50231e5f-1d05-435f-b288-06d5946407d6
-rw-r--r-- 3 hadoop supergroup 4599 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/7a025ad8-207c-47b6-9cab-c13938939159
-rw-r--r-- 3 hadoop supergroup 4976 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/_metadata
-rw-r--r-- 3 hadoop supergroup 4348 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/bd9b0849-aad2-4dd4-a5e0-89297718a13c
-rw-r--r-- 3 hadoop supergroup 4724 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/be8c1370-d10c-476f-bfe1-dd0c0e7d498a
如上面列出的HDFS路径中,11bbc5是Flink Job ID字符串前6个字符,后面bd967f90709b是随机生成的字符串,然后savepoint-11bbc5-bd967f90709b作为存储此次Savepoint数据的根目录,最后savepoint-11bbc5-bd967f90709b目录下面_metadata文件包含了Savepoint的元数据信息,其中序列化包含了savepoint-11bbc5-bd967f90709b目录下面其它文件的路径,这些文件内容都是序列化的状态信息。
flink web页面中提供了针对Job Checkpoint相关的监控信息。Checkpoint监控页面共有overview、history、summary和configuration四个页签,分别对Checkpoint从不同的角度进行了监控,每个页面中都包含了与Checkpointing相关的指标。
overview页签中宏观地记录了flink应用中Checkpoint的数量以及Checkpoint的最新记录,包括失败和完成的Checkpoint记录。
overview页签中包含了一下指标:
history页面记录了历史触发Checkpoint的详情,包括Checkpoint的ID、状态、触发时间,最后一次Acknowledgement信息等,通过点击More details对应的链接可以查看子task对应的Checkpoint数据
summary页面中记录了所有完成的Checkpoint统计指标的最大值、最小值,以及平均值等,指标中包含端对端的持续时间、状态大小,以及分配过程中缓冲的数据大小。
Flink 提供了三种可用的状态后端:MemoryStateBackend,FsStateBackend,和RocksDBStateBackend。
MemoryStateBackend 是将状态维护在 Java 堆上的一个内部状态后端。键值状态和窗口算子使用哈希表来存储数据(values)和定时器(timers)。当应用程序 checkpoint 时,此后端会在将状态发给 JobManager 之前快照下状态,JobManager 也将状态存储在 Java 堆上。默认情况下,MemoryStateBackend 配置成支持异步快照。异步快照可以避免阻塞数据流的处理,从而避免反压的发生。
使用 MemoryStateBackend 时的注意点:
默认情况下,每一个状态的大小限制为 5 MB。可以通过 MemoryStateBackend 的构造函数增加这个大小。
状态大小受到 akka 帧大小的限制,所以无论怎么调整状态大小配置,都不能大于 akka 的帧大小。也可以通过 akka.framesize 调整 akka 帧大小(通过配置文档了解更多)。
状态的总大小不能超过 JobManager 的内存。
何时使用 MemoryStateBackend:
本地开发或调试时建议使用 MemoryStateBackend,因为这种场景的状态大小的是有限的。
MemoryStateBackend 最适合小状态的应用场景。例如 Kafka consumer,或者一次仅一记录的函数 (Map, FlatMap,或 Filter)。
FsStateBackend 需要配置的主要是文件系统,如 URL(类型,地址,路径)。举个例子,比如可以是:
“hdfs://namenode:40010/flink/checkpoints” 或
“s3://flink/checkpoints”
当选择使用 FsStateBackend 时,正在进行的数据会被存在 TaskManager 的内存中。在 checkpoint 时,此后端会将状态快照写入配置的文件系统和目录的文件中,同时会在 JobManager 的内存中(在高可用场景下会存在 Zookeeper 中)存储极少的元数据。
默认情况下,FsStateBackend 配置成提供异步快照,以避免在状态 checkpoint 时阻塞数据流的处理。该特性可以实例化 FsStateBackend 时传入 false 的布尔标志来禁用掉,例如:
new FsStateBackend(path, false);
使用 FsStateBackend 时的注意点:
当前的状态仍然会先存在 TaskManager 中,所以状态的大小不能超过 TaskManager 的内存。
何时使用 FsStateBackend:
FsStateBackend 适用于处理大状态,长窗口,或大键值状态的有状态处理任务。
FsStateBackend 非常适合用于高可用方案。
RocksDBStateBackend 的配置也需要一个文件系统(类型,地址,路径),如下所示:
“hdfs://namenode:40010/flink/checkpoints” 或
“s3://flink/checkpoints”
RocksDB 是一种嵌入式的本地数据库。RocksDBStateBackend 将处理中的数据使用 RocksDB 存储在本地磁盘上。在 checkpoint 时,整个 RocksDB 数据库会被存储到配置的文件系统中,或者在超大状态作业时可以将增量的数据存储到配置的文件系统中。同时 Flink 会将极少的元数据存储在 JobManager 的内存中,或者在 Zookeeper 中(对于高可用的情况)。RocksDB 默认也是配置成异步快照的模式。
使用 RocksDBStateBackend 时的注意点:
RocksDB 支持的单 key 和单 value 的大小最大为每个 2^31 字节。这是因为 RocksDB 的 JNI API 是基于 byte[] 的。
我们需要强调的是,对于使用具有合并操作的状态的应用程序,例如 ListState,随着时间可能会累积到超过 2^31 字节大小,这将会导致在接下来的查询中失败。
何时使用 RocksDBStateBackend:
RocksDBStateBackend 最适合用于处理大状态,长窗口,或大键值状态的有状态处理任务。
RocksDBStateBackend 非常适合用于高可用方案。
RocksDBStateBackend 是目前唯一支持增量 checkpoint 的后端。增量 checkpoint 非常使用于超大状态的场景。
当使用 RocksDB 时,状态大小只受限于磁盘可用空间的大小。这也使得 RocksDBStateBackend 成为管理超大状态的最佳选择。使用 RocksDB 的权衡点在于所有的状态相关的操作都需要序列化(或反序列化)才能跨越 JNI 边界。与上面提到的堆上后端相比,这可能会影响应用程序的吞吐量。
本文主要研究下flink的checkpoint配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // start a checkpoint every 1000 ms env.enableCheckpointing(1000); // advanced options: // set mode to exactly-once (this is the default) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkpoints have to complete within one minute, or are discarded env.getCheckpointConfig().setCheckpointTimeout(60000); // make sure 500 ms of progress happen between checkpoints env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // allow only one checkpoint to be in progress at the same time env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // enable externalized checkpoints which are retained after job cancellation env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure. env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
使用StreamExecutionEnvironment.enableCheckpointing方法来设置开启checkpoint;具体可以使用enableCheckpointing(long interval),或者enableCheckpointing(long interval, CheckpointingMode mode);interval用于指定checkpoint的触发间隔(单位milliseconds),而CheckpointingMode默认是CheckpointingMode.EXACTLY_ONCE,也可以指定为CheckpointingMode.AT_LEAST_ONCE
也可以通过StreamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode来设置CheckpointingMode,一般对于超低延迟的应用(大概几毫秒)可以使用CheckpointingMode.AT_LEAST_ONCE,其他大部分应用使用CheckpointingMode.EXACTLY_ONCE就可以
checkpointTimeout用于指定checkpoint执行的超时时间(单位milliseconds),超时没完成就会被abort掉
minPauseBetweenCheckpoints用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1
maxConcurrentCheckpoints用于指定运行中的checkpoint最多可以有多少个,用于包装topology不会花太多的时间在checkpoints上面;如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数就不起作用了(大于1的值不起作用)
enableExternalizedCheckpoints用于开启checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state;ExternalizedCheckpointCleanup用于指定当job canceled的时候externalized checkpoint该如何清理,DELETE_ON_CANCELLATION的话,在job canceled的时候会自动删除externalized state,但是如果是FAILED的状态则会保留;RETAIN_ON_CANCELLATION则在job canceled的时候会保留externalized checkpoint state
failOnCheckpointingErrors用于指定在checkpoint发生异常的时候,是否应该fail该task,默认为true,如果设置为false,则task会拒绝checkpoint然后继续运行
#============================================================================== # Fault tolerance and checkpointing #============================================================================== # The backend that will be used to store operator state checkpoints if # checkpointing is enabled. # # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the # <class-name-of-factory>. # # state.backend: filesystem # Directory for checkpoints filesystem, when using any of the default bundled # state backends. # # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints # Default target directory for savepoints, optional. # # state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints # Flag to enable/disable incremental checkpoints for backends that # support incremental checkpoints (like the RocksDB state backend). # # state.backend.incremental: false
state.backend用于指定checkpoint state存储的backend,默认为none
state.backend.async用于指定backend是否使用异步snapshot(默认为true),有些不支持async或者只支持async的state backend可能会忽略这个参数
state.backend.fs.memory-threshold,默认为1024,用于指定存储于files的state大小阈值,如果小于该值则会存储在root checkpoint metadata file
state.backend.incremental,默认为false,用于指定是否采用增量checkpoint,有些不支持增量checkpoint的backend会忽略该配置
state.backend.local-recovery,默认为false
state.checkpoints.dir,默认为none,用于指定checkpoint的data files和meta data存储的目录,该目录必须对所有参与的TaskManagers及JobManagers可见
state.checkpoints.num-retained,默认为1,用于指定保留的已完成的checkpoints个数
state.savepoints.dir,默认为none,用于指定savepoints的默认目录
taskmanager.state.local.root-dirs,默认为none
可以通过使用StreamExecutionEnvironment.enableCheckpointing方法来设置开启checkpoint;具体可以使用enableCheckpointing(long interval),或者enableCheckpointing(long interval, CheckpointingMode mode)
checkpoint的高级配置可以配置checkpointTimeout(用于指定checkpoint执行的超时时间,单位milliseconds),minPauseBetweenCheckpoints(用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint),maxConcurrentCheckpoints(用于指定运行中的checkpoint最多可以有多少个,如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数大于1的值不起作用),enableExternalizedCheckpoints(用于开启checkpoints的外部持久化,在job failed的时候externalized checkpoint state无法自动清理,但是在job canceled的时候可以配置是删除还是保留state)
在flink-conf.yaml里头也有checkpoint的相关配置,主要是state backend的配置,比如state.backend.async、state.backend.incremental、state.checkpoints.dir、state.savepoints.dir等
来源:
Flink的Checkpoint和Savepoint介绍:https://www.shuzhiduo.com/A/q4zVy2D2dK/
Flink的CheckPoint:https://www.shuzhiduo.com/A/xl56Y1XxJr/
flink的checkpoint页面监控:https://www.shuzhiduo.com/A/Gkz13xY65R/
flink checkpoint状态储存三种方式选择:https://www.shuzhiduo.com/A/nAJv0qYQdr/
聊聊flink的checkpoint配置:https://www.shuzhiduo.com/A/kjdwp8NEzN/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。