当前位置:   article > 正文

flink--state状态管理_flink state包含几个状态

flink state包含几个状态

前提:

1 flink state分为三种,1)operator state是跟算子关联的,粒度是task,即便相同的算子的其他并行的task也不能互相访问各自的状态。 2)keyed state是跟key stream关联的。粒度是key,相同的task不同key的数据状态不共享,只有相同key才可以共享状态。, 3)broadcast state, 分为批Set的广播状态和流stream的流合并,一个广播流,一个事实流。

当Flink左右从checkpoint恢复,或者从savepoint中重启的时候,就回涉及到状态的重新分配,尤其是当并行度发生改变的时候。

Operator State Redistribute

当operator改变并行度的时候(Rescale),会触发状态的Redistribute,即Operator State里的数据会重新分配到Operator的Task实例。这里有三种方式,举例说明平行度由3改为2

1 这个是普通的ListState, 原先所有State中的元素均匀划分给新的Task

2 这个是UnionList State,所有的State中的元素全部分配给新的Task

3 还有一张是BroadState State,所有Task上的State都是一样的,新的Task获得State的一个备份。

选择方式:

由于是operate state所以只能在operate中使用,并且由于是state从checkpoint中的重分配,所以对应的类必须实现CheckpointedFunction接口。如下图方式应用,这样就会初始化ListState了,就可以不用在open方法里初始化了。

  1. public class BufferingSink
  2. implements SinkFunction<Tuple2<String, Integer>>,
  3. CheckpointedFunction {
  4. private final int threshold;
  5. private transient ListState<Tuple2<String, Integer>> checkpointedState;
  6. private List<Tuple2<String, Integer>> bufferedElements;
  7. public BufferingSink(int threshold) {
  8. this.threshold = threshold;
  9. this.bufferedElements = new ArrayList<>();
  10. }
  11. @Override
  12. public void invoke(Tuple2<String, Integer> value) throws Exception {
  13. bufferedElements.add(value);
  14. if (bufferedElements.size() == threshold) {
  15. for (Tuple2<String, Integer> element: bufferedElements) {
  16. // send it to the sink
  17. }
  18. bufferedElements.clear();
  19. }
  20. }
  21. @Override
  22. public void snapshotState(FunctionSnapshotContext context) throws Exception {
  23. checkpointedState.clear();
  24. for (Tuple2<String, Integer> element : bufferedElements) {
  25. checkpointedState.add(element);
  26. }
  27. }
  28. @Override
  29. public void initializeState(FunctionInitializationContext context) throws Exception {
  30. ListStateDescriptor<Tuple2<String, Integer>> descriptor =
  31. new ListStateDescriptor<>(
  32. "buffered-elements",
  33. TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
  34. checkpointedState = context.getOperatorStateStore().getListState(descriptor);
  35. //在这里选择模式,getListState或者getUnionListState
  36. if (context.isRestored()) {
  37. for (Tuple2<String, Integer> element : checkpointedState.get()) {
  38. bufferedElements.add(element);
  39. }
  40. }
  41. }
  42. }

Keyed State Redistribute

对于Keyed State就比较好处理了,对应的Key被Redistribute到哪个task,对应的keyed state就被Redistribute到哪个Task

Keyed state redistribute是基于Key Group来做分配的:

    将key分为group
    每个key分配到唯一的group
    将group分配给task实例
    Keygroup由最大并行度的大小所决定

Keyed State最终被分配到哪个Task,需要经过一下三个步骤:

    hash = hash(key)
    KeyGroup = hash%numOfKeyGroups
    SubTask=KeyGroup*parallelism/numOfKeyGroups
 

小结

  • flink有两种基本的state,分别是Keyed State以及Operator State(non-keyed state);其中Keyed State只能在KeyedStream上的functions及operators上使用;每个operator state会跟parallel operator中的一个实例绑定;Operator State支持parallelism变更时进行redistributing
  • Keyed State及Operator State都分别有managed及raw两种形式,managed由flink runtime来管理,由runtime负责encode及写入checkpoint;raw形式的state由operators自己管理,flink runtime无法了解该state的数据结构,将其视为raw bytes;所有的datastream function都可以使用managed state,而raw state一般仅限于自己实现operators来使用
  • stateful function可以通过CheckpointedFunction接口或者ListCheckpointed接口来使用managed operator state;CheckpointedFunction定义了snapshotState、initializeState两个方法;每当checkpoint执行的时候,snapshotState会被调用;而initializeState方法在每次用户定义的function初始化的时候(第一次初始化或者从前一次checkpoint recover的时候)被调用,该方法不仅可以用来初始化state,还可以用于处理state recovery的逻辑
  • 对于manageed operator state,目前仅仅支持list-style的形式,即要求state是serializable objects的List结构,方便在rescale的时候进行redistributed;关于redistribution schemes的模式目前有两种,分别是Even-split redistribution(在restore/redistribution的时候每个operator仅仅得到整个state的sublist)及Union redistribution(在restore/redistribution的时候每个operator得到整个state的完整list)
  • FunctionSnapshotContext继承了ManagedSnapshotContext接口,它定义了getCheckpointId、getCheckpointTimestamp方法;FunctionInitializationContext继承了ManagedInitializationContext接口,它定义了isRestored、getOperatorStateStore、getKeyedStateStore方法,可以用来判断是否是在前一次execution的snapshot中restored,以及获取OperatorStateStore、KeyedStateStore对象

 

 

 

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/891477
推荐阅读
相关标签
  

闽ICP备14008679号