赞
踩
Flink 是一个分布式的流处理引擎,而流处理的其中一个特点就是 7X24。那么,如何保障 Flink 作业的持续运行呢?Flink 的内部会将应用状态(state)存储到本地内存或者嵌入式的 kv 数据库(RocksDB)中,由于采用的是分布式架构,Flink 需要对本地生成的状态进行持久化存储,以避免因应用或者节点机器故障等原因导致数据的丢失,Flink 是通过 checkpoint(检查点)的方式将状态写入到远程的持久化存储,从而就可以实现不同语义的结果保障。通过本文,你可以了解到什么是 Flink 的状态,Flink 的状态是怎么存储的,Flink 可选择的状态后端(statebackend)有哪些,什么是全局一致性检查点,Flink 内部如何通过检查点实现 Exactly Once 的结果保障。
Flink 是一个分布式的流处理引擎,而流处理的其中一个特点就是 7X24。那么,如何保障 Flink 作业的持续运行呢?Flink 的内部会将应用状态(state)存储到本地内存或者嵌入式的 kv 数据库(RocksDB)中,由于采用的是分布式架构,Flink 需要对本地生成的状态进行持久化存储,以避免因应用或者节点机器故障等原因导致数据的丢失,Flink 是通过 checkpoint(检查点)的方式将状态写入到远程的持久化存储,从而就可以实现不同语义的结果保障。通过本文,你可以了解到什么是 Flink 的状态,Flink 的状态是怎么存储的,Flink 可选择的状态后端(statebackend)有哪些,什么是全局一致性检查点,Flink 内部如何通过检查点实现 Exactly Once 的结果保障。另外,本文内容较长,建议关注加收藏。
关于什么是状态,我们先不做过多的分析。首先看一个代码案例,其中案例 1 是 Spark 的 WordCount 代码,案例 2 是 Flink 的 WorkCount 代码。
object WordCount { def main(args:Array[String]){ val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(5)) val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination()}}
输入:
C:\WINDOWS\system32>nc -lp 9999hello sparkhello spark
输出:
public class WordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999); SingleOutputStreamOperator<Tuple2<String,Integer>> words = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception { String[] splits = value.split("\\s"); for (String word : splits) { out.collect(Tuple2.of(word, 1)); } } }); words.keyBy(0).sum(1).print(); env.execute("WC"); }}
输入:
C:\WINDOWS\system32>nc -lp 9999hello Flinkhello Flink
输出:从上面的两个例子可以看出,在使用 Spark 进行词频统计时,当前的统计结果不受历史统计结果的影响,只计算接收的当前数据的结果,这个就可以理解为无状态的计算。再来看一下 Flink 的例子,可以看出当第二次词频统计时,把第一次的结果值也统计在了一起,即 Flink 把上一次的计算结果保存在了状态里,第二次计算的时候会先拿到上一次的结果状态,然后结合新到来的数据再进行计算,这就可以理解成有状态的计算,如下图所示。
Flink 提供了两种基本类型的状态:分别是 Keyed State
和Operator State
。根据不同的状态管理方式,每种状态又有两种存在形式,分别为:managed(托管状态)
和raw(原生状态)
。具体如下表格所示。需要注意的是,由于 Flink 推荐使用 managed state,所以下文主要讨论 managed state,对于 raw state,本文不会做过多的讨论。
Keyed State 只能由作用在 KeyedStream 上面的函数使用,该状态与某个 key 进行绑定,即每一个 key 对应一个 state。Keyed State 按照 key 进行维护和访问的,Flink 会为每一个 Key 都维护一个状态实例,该状态实例总是位于处理该 key 记录的算子任务上,因此同一个 key 的记录可以访问到一样的状态。如下图所示,可以通过在一条流上使用 keyBy()方法来生成一个 KeyedStream。Flink 提供了很多种 keyed state,具体如下:
用于保存类型为 T 的单个值。用户可以通过 ValueState.value()来获取该状态值,通过 ValueState.update()来更新该状态。使用ValueStateDescriptor
来获取状态句柄。
用于保存类型为 T 的元素列表,即 key 的状态值是一个列表。用户可以使用 ListState.add()或者 ListState.addAll()将新元素添加到列表中,通过 ListState.get()访问状态元素,该方法会返回一个可遍历所有元素的 Iterable<T>对象,注意 ListState 不支持删除单个元素,但是用户可以使用 update(List<T> values)来更新整个列表。使用 ListStateDescriptor
来获取状态句柄。
调用 add()方法添加值时,会立即返回一个使用 ReduceFunction 聚合后的值,用户可以使用 ReducingState.get()来获取该状态值。使用 ReducingStateDescriptor
来获取状态句柄。
与 ReducingState<T>类似,不同的是它使用的是 AggregateFunction 来聚合内部的值,AggregatingState.get()方法会计算最终的结果并将其返回。使用 AggregatingStateDescriptor
来获取状态句柄
用于保存一组 key、value 的映射,类似于 java 的 Map 集合。用户可以通过 get(UK key)方法获取 key 对应的状态,可以通过 put(UK k,UV value)方法添加一个键值,可以通过 remove(UK key)删除给定 key 的值,可以通过 contains(UK key)判断是否存在对应的 key。使用 MapStateDescriptor
来获取状态句柄。
在 Flink 1.4 的版本中标记过时,在未来的版本中会被移除,使用 AggregatingState 进行代替。
值得注意的是,上面的状态原语都支持通过 State.clear()方法来进行清除状态。另外,上述的状态原语仅用于与状态进行交互,真正的状态是存储在状态后端(后面会介绍状态后端)的,通过该状态原语相当于持有了状态的句柄(handle)。
下面给出一个 MapState 的使用案例,关于 ValueState 的使用情况可以参考官网,具体如下:
public class MapStateExample { //统计每个用户每种行为的个数 public static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, String, Integer>> { //定义一个 MapState 句柄 private transient MapState<String, Integer> behaviorCntState; // 初始化状态 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>( "userBehavior", // 状态描述符的名称 TypeInformation.of(new TypeHint<String>() {}), // MapState 状态的 key 的数据类型 TypeInformation.of(new TypeHint<Integer>() {}) // MapState 状态的 value 的数据类型 ); behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 获取状态 } @Override public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, String, Integer>> out) throws Exception { Integer behaviorCnt = 1; // 如果当前状态包括该行为,则+1 if (behaviorCntState.contains(value.f1)) { behaviorCnt = behaviorCntState.get(value.f1) + 1; } // 更新状态 behaviorCntState.put(value.f1, behaviorCnt); out.collect(Tuple3.of(value.f0, value.f1, behaviorCnt)); } } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); // 模拟数据源[userId,behavior,product] DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements( Tuple3.of(1L, "buy", "iphone"), Tuple3.of(1L, "cart", "huawei"), Tuple3.of(1L, "buy", "logi"), Tuple3.of(1L, "fav", "oppo"), Tuple3.of(2L, "buy", "huawei"), Tuple3.of(2L, "buy", "onemore"), Tuple3.of(2L, "fav", "iphone")); userBehaviors .keyBy(0) .flatMap(new UserBehaviorCnt()) .print(); env.execute("MapStateExample"); }}
结果输出:
对于任何类型 Keyed State 都可以设定状态的生命周期(TTL),即状态的存活时间,以确保能够在规定时间内及时地清理状态数据。如果配置了状态的 TTL,那么当状态过期时,存储的状态会被清除。状态生命周期功能可以通过 StateTtlConfig 配置,然后将 StateTtlConfig 配置传入 StateDescriptor 中的 enableTimeToLive 方法中即可。代码示例如下:
StateTtlConfig ttlConfig = StateTtlConfig // 指定 TTL 时长为 10S .newBuilder(Time.seconds(10)) // 只对创建和写入操作有效 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 不返回过期的数据 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); // 初始化状态 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>( "userBehavior", // 状态描述符的名称 TypeInformation.of(new TypeHint<String>() {}), // MapState 状态的 key 的数据类型 TypeInformation.of(new TypeHint<Integer>() {}) // MapState 状态的 value 的数据类型 ); // 设置 stateTtlConfig userBehaviorMapStateDesc.enableTimeToLive(ttlConfig); behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 获取状态 }
在 StateTtlConfig 创建时,newBuilder 方法是必须要指定的,newBuilder 中设定过期时间的参数。对于其他参数都是可选的或使用默认值。其中 setUpdateType 方法中传入的类型有三种:
public enum UpdateType { //禁用 TTL,永远不会过期 Disabled, // 创建和写入时更新 TTL OnCreateAndWrite, // 与 OnCreateAndWrite 类似,但是在读操作时也会更新 TTL OnReadAndWrite }
值得注意的是,过期的状态数据根据 UpdateType 参数进行配置,只有被写入或者读取的时间才会更新 TTL,也就是说如果某个状态指标一直不被使用或者更新,则永远不会触发对该状态数据的清理操作,这种情况可能会导致系统中的状态数据越来越大。目前用户可以使用 StateTtlConfig.cleanupFullSnapshot 设定当触发 State Snapshot 的时候清理状态数据,但是改配置不适合用于 RocksDB 做增量 Checkpointing 的操作。
上面的 StateTtlConfig 创建时,可以指定 setStateVisibility,用于状态的可见性配置,根据过期数据是否被清理来确定是否返回状态数据。
/** * 是否返回过期的数据 */ public enum StateVisibility { //如果数据没有被清理,就可以返回 ReturnExpiredIfNotCleanedUp, //永远不返回过期的数据,默认值 NeverReturnExpired }
Operator State 的作用于是某个算子任务,这意味着所有在同一个并行任务之内的记录都能访问到相同的状态 。算子状态不能通过其他任务访问,无论该任务是相同的算子。如下图所示。Operator State 是一种 non-keyed state,与并行的操作算子实例相关联,例如在 Kafka Connector 中,每个 Kafka 消费端算子实例都对应到 Kafka 的一个分区中,维护 Topic 分区和 Offsets 偏移量作为算子的 Operator State。在 Flink 中可以实现 ListCheckpointed<T extends Serializable>接口或者 CheckpointedFunction 接口来实现一个 Operator State。
首先,我们先看一下这两个接口的具体实现,然后再给出这两种接口的具体使用案例。先看一下 ListCheckpointed 接口的源码,如下:
public interface ListCheckpointed<T extends Serializable> { /** * 获取某个算子实例的当前状态,该状态包括该算子实例之前被调用时的所有结果 * 以列表的形式返回一个函数状态的快照 * Flink 触发生成检查点时调用该方法 * @param checkpointId checkpoint 的 ID,是一个唯一的、单调递增的值 * @param timestamp Job Manager 触发 checkpoint 时的时间戳 * @return 返回一个 operator state list,如果为 null 时,返回空 list * @throws Exception */ List<T> snapshotState(long checkpointId, long timestamp) throws Exception; /** * 初始化函数状态时调用,可能是在作业启动时或者故障恢复时 * 根据提供的列表恢复函数状态 * 注意:当实现该方法时,需要在 RichFunction#open()方法之前调用该方法 * @param state 被恢复算子实例的 state 列表 ,可能为空 * @throws Exception */ void restoreState(List<T> state) throws Exception;}
使用 Operator ListState 时,在进行扩缩容时,重分布的策略(状态恢复的模式)如下图所示:上面的重分布策略为Even-split Redistribution,即每个算子实例中含有部分状态元素的 List 列表,整个状态数据是所有 List 列表的合集。当触发 restore/redistribution 动作时,通过将状态数据平均分配成与算子并行度相同数量的 List 列表,每个 task 实例中有一个 List,其可以为空或者含有多个元素。
我们再来看一下 CheckpointedFunction 接口,源码如下:
public interface CheckpointedFunction { /** * 会在生成检查点之前调用 * 该方法的目的是确保检查点开始之前所有状态对象都已经更新完毕 * @param context 使用 FunctionSnapshotContext 作为参数 * 从 FunctionSnapshotContext 可以获取 checkpoint 的元数据信息, * 比如 checkpoint 编号,JobManager 在初始化 checkpoint 时的时间戳 * @throws Exception */ void snapshotState(FunctionSnapshotContext context) throws Exception; /** * 在创建 checkpointedFunction 的并行实例时被调用, * 在应用启动或者故障重启时触发该方法的调用 * @param context 传入 FunctionInitializationContext 对象, * 可以使用该对象访问 OperatorStateStore 和 KeyedStateStore 对象, * 这两个对象可以获取状态的句柄,即通过 Flink runtime 来注册函数状态并返回 state 对象 * 比如:ValueState、ListState 等 * @throws Exception */ void initializeState(FunctionInitializationContext context) throws Exception;}
CheckpointedFunction 接口是用于指定有状态函数的最底层的接口,该接口提供了用于注册和维护 keyed state 与 operator state 的 hook(即可以同时使用 keyed state 和 operator state),另外也是唯一支持使用 list union state。关于 Union List State,使用的是 Flink 为 Operator state 提供的另一种重分布的策略:Union Redistribution,即每个算子实例中含有所有状态元素的 List 列表,当触发 restore/redistribution 动作时,每个算子都能够获取到完整的状态元素列表。具体如下图所示:
ListCheckpointed 接口和 CheckpointedFunction 接口相比在灵活性上相对弱一些,只能支持 List 类型的状态,并且在数据恢复的时候仅支持even-redistribution策略。该接口不像 Flink 提供的 Keyed State(比如 Value State、ListState)那样直接在状态后端(state backend)注册,需要将 operator state 实现为成员变量,然后通过接口提供的回调函数与状态后端进行交互。使用代码案例如下:
public class ListCheckpointedExample { private static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple2<String, Long>> implements ListCheckpointed<Long> { private Long userBuyBehaviorCnt = 0L; @Override public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple2<String, Long>> out) throws Exception { if(value.f1.equals("buy")){ userBuyBehaviorCnt ++; out.collect(Tuple2.of("buy",userBuyBehaviorCnt)); } } @Override public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { //返回单个元素的 List 集合,该集合元素是用户购买行为的数量 return Collections.singletonList(userBuyBehaviorCnt); } @Override public void restoreState(List<Long> state) throws Exception { // 在进行扩缩容之后,进行状态恢复,需要把其他 subtask 的状态加在一起 for (Long cnt : state) { userBuyBehaviorCnt += 1; } } } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); // 模拟数据源[userId,behavior,product] DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements( Tuple3.of(1L, "buy", "iphone"), Tuple3.of(1L, "cart", "huawei"), Tuple3.of(1L, "buy", "logi"), Tuple3.of(1L, "fav", "oppo"), Tuple3.of(2L, "buy", "huawei"), Tuple3.of(2L, "buy", "onemore"), Tuple3.of(2L, "fav", "iphone")); userBehaviors .flatMap(new UserBehaviorCnt()) .print(); env.execute("ListCheckpointedExample"); }}
CheckpointedFunction 接口提供了更加丰富的操作,比如支持 Union list state,可以访问 keyedState,关于重分布策略,如果使用 Even-split Redistribution 策略,则通过 context. getListState(descriptor)获取 Operator State;如果使用 UnionRedistribution 策略,则通过 context. getUnionList State(descriptor)来获取。使用案例如下:
public class CheckpointFunctionExample { private static class UserBehaviorCnt implements CheckpointedFunction, FlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, Long, Long>> { // 统计每个 operator 实例的用户行为数量的本地变量 private Long opUserBehaviorCnt = 0L; // 每个 key 的 state,存储 key 对应的相关状态 private ValueState<Long> keyedCntState; // 定义 operator state,存储算子的状态 private ListState<Long> opCntState; @Override public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, Long, Long>> out) throws Exception { if (value.f1.equals("buy")) { // 更新算子状态本地变量值 opUserBehaviorCnt += 1; Long keyedCount = keyedCntState.value(); // 更新 keyedstate 的状态 ,判断状态是否为 null,否则空指针异常 keyedCntState.update(keyedCount == null ? 1L : keyedCount + 1 ); // 结果输出 out.collect(Tuple3.of(value.f0, keyedCntState.value(), opUserBehaviorCnt)); } } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { // 使用 opUserBehaviorCnt 本地变量更新 operator state opCntState.clear(); opCntState.add(opUserBehaviorCnt); } @Override public void initializeState(FunctionInitializationContext context) throws Exception { // 通过 KeyedStateStore,定义 keyedState 的 StateDescriptor 描述符 ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("keyedCnt", TypeInformation.of(new TypeHint<Long>() { })); // 通过 OperatorStateStore,定义 OperatorState 的 StateDescriptor 描述符 ListStateDescriptor opStateDescriptor = new ListStateDescriptor("opCnt", TypeInformation.of(new TypeHint<Long>() { })); // 初始化 keyed state 状态值 keyedCntState = context.getKeyedStateStore().getState(valueStateDescriptor); // 初始化 operator state 状态 opCntState = context.getOperatorStateStore().getListState(opStateDescriptor); // 初始化本地变量 operator state for (Long state : opCntState.get()) { opUserBehaviorCnt += state; } } } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); // 模拟数据源[userId,behavior,product] DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements( Tuple3.of(1L, "buy", "iphone"), Tuple3.of(1L, "cart", "huawei"), Tuple3.of(1L, "buy", "logi"), Tuple3.of(1L, "fav", "oppo"), Tuple3.of(2L, "buy", "huawei"), Tuple3.of(2L, "buy", "onemore"), Tuple3.of(2L, "fav", "iphone")); userBehaviors .keyBy(0) .flatMap(new UserBehaviorCnt()) .print(); env.execute("CheckpointFunctionExample"); }}
上面使用的状态都需要存储到状态后端(StateBackend),然后在 checkpoint 触发时,将状态持久化到外部存储系统。Flink 提供了三种类型的状态后端,分别是基于内存的状态后端(MemoryStateBackend、基于文件系统的状态后端(FsStateBackend)以及基于 RockDB 作为存储介质的RocksDB StateBackend。这三种类型的 StateBackend 都能够有效地存储 Flink 流式计算过程中产生的状态数据,在默认情况下 Flink 使用的是 MemoryStateBackend,区别见下表。下面分别对每种状态后端的特点进行说明。
MemoryStateBackend 将状态数据全部存储在 JVM 堆内存中,包括用户在使用 DataStream API 中创建的 Key/Value State,窗口中缓存的状态数据,以及触发器等数据。MemoryStateBackend 具有非常快速和高效的特点,但也具有非常多的限制,最主要的就是内存的容量限制,一旦存储的状态数据过多就会导致系统内存溢出等问题,从而影响整个应用的正常运行。同时如果机器出现问题,整个主机内存中的状态数据都会丢失,进而无法恢复任务中的状态数据。因此从数据安全的角度建议用户尽可能地避免在生产环境中使用 MemoryStateBackend。Flink 将 MemoryStateBackend 作为默认状态后端。
MemoryStateBackend 比较适合用于测试环境中,并用于本地调试和验证,不建议在生产环境中使用。但如果应用状态数据量不是很大,例如使用了大量的非状态计算算子,也可以在生产环境中使 MemoryStateBackend.
FsStateBackend 是基于文件系统的一种状态后端,这里的文件系统可以是本地文件系统,也可以是 HDFS 分布式文件系统。创建 FsStateBackend 的构造函数如下:
FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots)
其中 path 如果为本地路径,其格式为“file:///data/flink/checkpoints”,如果 path 为 HDFS 路径,其格式为“hdfs://nameservice/flink/checkpoints”。FsStateBackend 中第二个 Boolean 类型的参数指定是否以同步的方式进行状态数据记录,默认采用异步的方式将状态数据同步到文件系统中,异步方式能够尽可能避免在 Checkpoint 的过程中影响流式计算任务。如果用户想采用同步的方式进行状态数据的检查点数据,则将第二个参数指定为 True 即可。
相比于 MemoryStateBackend, FsStateBackend 更适合任务状态非常大的情况,例如应用中含有时间范围非常长的窗口计算,或 Key/value State 状态数据量非常大的场景,这时系统内存不足以支撑状态数据的存储。同时 FsStateBackend 最大的好处是相对比较稳定,在 checkpoint 时,将状态持久化到像 HDFS 分布式文件系统中,能最大程度保证状态数据的安全性。
与前面的状态后端不同,RocksDBStateBackend 需要单独引入相关的依赖包。RocksDB 是一个 key/value 的内存存储系统,类似于 HBase,是一种内存磁盘混合的 LSM DB。当写数据时会先写进 write buffer(类似于 HBase 的 memstore),然后在 flush 到磁盘文件,当读取数据时会现在 block cache(类似于 HBase 的 block cache),所以速度会很快。
RocksDBStateBackend 在性能上要比 FsStateBackend 高一些,主要是因为借助于 RocksDB 存储了最新热数据,然后通过异步的方式再同步到文件系统中,但 RocksDBStateBackend 和 MemoryStateBackend 相比性能就会较弱一些。
需要注意 RocksDB 不支持同步的 Checkpoint,构造方法中没有同步快照这个选项。不过 RocksDB 支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味着并不需要把所有 sst 文件上传到 Checkpoint 目录,仅需要上传新生成的 sst 文件即可。它的 Checkpoint 存储在外部文件系统(本地或 HDFS),其容量限制只要单个 TaskManager 上 State 总量不超过它的内存+磁盘,单 Key 最大 2G,总大小不超过配置的文件系统容量即可。对于超大状态的作业,例如天级窗口聚合等场景下可以使会用该状态后端。
Flink 默认使用的状态后端是 MemoryStateBackend,所以不需要显示配置。对于其他的状态后端,都需要进行显性配置。在 Flink 中包含了两种级别的 StateBackend 配置:一种是在程序中进行配置,该配置只对当前应用有效;另外一种是通过 flink-conf.yaml
进行全局配置,一旦配置就会对整个 Flink 集群上的所有应用有效。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
如果使用 RocksDBStateBackend 则需要单独引入 rockdb 依赖库,如下:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>1.10.0</version> <scope>provided</scope></dependency>
使用方式与 FsStateBackend 类似,如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
具体的配置项在 flink-conf.yaml 文件中,如下代码所示,参数 state.backend 指明 StateBackend 类型,state.checkpoints.dir 配置具体的状态存储路径,代码中使用 filesystem 作为 StateBackend,然后指定相应的 HDFS 文件路径作为 state 的 checkpoint 文件夹。
# 使用 filesystem 存储state.backend: filesystem# checkpoint 存储路径state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
如果想用 RocksDBStateBackend 配置集群级别的状态后端,可以使用下面的配置:
# 操作 RocksDBStateBackend 的线程数量,默认值为 1state.backend.rocksdb.checkpoint.transfer.thread.num: 1# 指定 RocksDB 存储状态数据的本地文件路径state.backend.rocksdb.localdir: /var/rockdb/checkpoints# 用于指定定时器服务的工厂类实现类,默认为“HEAP”,也可以指定为“RocksDB”state.backend.rocksdb.timer-service.factory: HEAP
上面讲解了 Flink 的状态以及状态后端,状态是存储在状态后端。为了保证 state 容错,Flink 提供了处理故障的措施,这种措施称之为 checkpoint(一致性检查点)。checkpoint 是 Flink 实现容错的核心功能,主要是周期性地触发 checkpoint,将 state 生成快照持久化到外部存储系统(比如 HDFS)。这样一来,如果 Flink 程序出现故障,那么就可以从上一次 checkpoint 中进行状态恢复,从而提供容错保障。另外,通过 checkpoint 机制,Flink 可以实现 Exactly-once 语义(Flink 内部的 Exactly-once,关于端到端的 exactly_once,Flink 是通过两阶段提交协议实现的)。下面将会详细分析 Flink 的 checkpoint 机制。
如上图,输入流是用户行为数据,包括购买(buy)和加入购物车(cart)两种,每种行为数据都有一个偏移量,统计每种行为的个数。
第一步:JobManager checkpoint coordinator 触发 checkpoint。
第二步:假设当消费到[cart,3]这条数据时,触发了 checkpoint。那么此时数据源会把消费的偏移量 3 写入持久化存储。
第三步:当写入结束后,source 会将 state handle(状态存储路径)反馈给 JobManager 的 checkpoint coordinator。
第四步:接着算子 count buy 与 count cart 也会进行同样的步骤
第五步:等所有的算子都完成了上述步骤之后,即当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件,那么整个 checkpoint 也就完成了,如果中间有一个不成功,那么本次 checkpoin 就宣告失败。
通过上面的分析,或许你已经对 Flink 的 checkpoint 有了初步的认识。那么接下来,我们看一下是如何从检查点恢复的。
重启作业
恢复检查点
继续处理数据上述过程具体总结如下:
第一步:重启作业
第二步:从上一次检查点恢复状态数据
第三步:继续处理新的数据
Flink 提供了精确一次的处理语义,精确一次的处理语义可以理解为:数据可能会重复计算,但是结果状态只有一个。Flink 通过 Checkpoint 机制实现了精确一次的处理语义,Flink 在触发 Checkpoint 时会向 Source 端插入 checkpoint barrier,checkpoint barriers 是从 source 端插入的,并且会向下游算子进行传递。checkpoint barriers 携带一个 checkpoint ID,用于标识属于哪一个 checkpoint,checkpoint barriers 将流逻辑是哪个分为了两部分。对于双流的情况,通过 barrier 对齐的方式实现精确一次的处理语义。
关于什么是 checkpoint barrier,可以看一下 CheckpointBarrier 类的源码描述,如下:
/** * Checkpoint barriers 用来在数据流中实现 checkpoint 对齐的. * Checkpoint barrier 由 JobManager 的 checkpoint coordinator 插入到 Source 中, * Source 会把 barrier 广播发送到下游算子,当一个算子接收到了其中一个输入流的 Checkpoint barrier 时, * 它就会知道已经处理完了本次 checkpoint 与上次 checkpoint 之间的数据. * * 一旦某个算子接收到了所有输入流的 checkpoint barrier 时, * 意味着该算子的已经处理完了截止到当前 checkpoint 的数据, * 可以触发 checkpoint,并将 barrier 向下游传递 * * 根据用户选择的处理语义,在 checkpoint 完成之前会缓存后一次 checkpoint 的数据, * 直到本次 checkpoint 完成(exactly once) * * checkpoint barrier 的 id 是严格单调递增的 * */ public class CheckpointBarrier extends RuntimeEvent {...}
可以看出 checkpoint barrier 主要功能是实现 checkpoint 对齐的,从而可以实现 Exactly-Once 处理语义。
下面将会对 checkpoint 过程进行分解,具体如下:
图 1,包括两个流,每个任务都会消费一条用户行为数据(包括购买(buy)和加购(cart)),数字代表该数据的偏移量,count buy 任务统计购买行为的个数,coun cart 统计加购行为的个数。图 2,触发 checkpoint,JobManager 会向每个数据源发送一个新的 checkpoint 编号,以此来启动检查点生成流程。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// checkpoint 的时间间隔,如果状态比较大,可以适当调大该值env.enableCheckpointing(1000);// 配置处理语义,默认是 exactly-onceenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 两个 checkpoint 之间的最小时间间隔,防止因 checkpoint 时间过长,导致 checkpoint 积压env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// checkpoint 执行的上限时间,如果超过该阈值,则会中断 checkpointenv.getCheckpointConfig().setCheckpointTimeout(60000);// 最大并行执行的检查点数量,默认为 1,可以指定多个,从而同时出发多个 checkpoint,提升效率env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设定周期性外部检查点,将状态数据持久化到外部系统中,// 使用该方式不会在任务正常停止的过程中清理掉检查点数据env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// allow job recovery fallback to checkpoint when there is a more recent savepointenv.getCheckpointConfig().setPreferCheckpointForRecovery(true);
本文首先从 Flink 的状态入手,通过 Spark 的 WordCount 和 Flink 的 Work Count 进行说明什么是状态。接着对状态的分类以及状态的使用进行了详细说明。然后对 Flink 提供的三种状态后端进行讨论,并给出了状态后端的使用说明。最后,以图解加文字的形式详细解释了 Flink 的 checkpoint 机制,并给出了使用 Checkpoint 时的程序配置。
阅读全文: http://gitbook.cn/gitchat/activity/5ea99e1beca1907bf8645b72
您还可以下载 CSDN 旗下精品原创内容社区 GitChat App ,阅读更多 GitChat 专享技术内容哦。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。