赞
踩
前提:
1 flink state分为三种,1)operator state是跟算子关联的,粒度是task,即便相同的算子的其他并行的task也不能互相访问各自的状态。 2)keyed state是跟key stream关联的。粒度是key,相同的task不同key的数据状态不共享,只有相同key才可以共享状态。, 3)broadcast state, 分为批Set的广播状态和流stream的流合并,一个广播流,一个事实流。
当Flink左右从checkpoint恢复,或者从savepoint中重启的时候,就回涉及到状态的重新分配,尤其是当并行度发生改变的时候。
当operator改变并行度的时候(Rescale),会触发状态的Redistribute,即Operator State里的数据会重新分配到Operator的Task实例。这里有三种方式,举例说明平行度由3改为2
1 这个是普通的ListState, 原先所有State中的元素均匀划分给新的Task
2 这个是UnionList State,所有的State中的元素全部分配给新的Task
3 还有一张是BroadState State,所有Task上的State都是一样的,新的Task获得State的一个备份。
选择方式:
由于是operate state所以只能在operate中使用,并且由于是state从checkpoint中的重分配,所以对应的类必须实现CheckpointedFunction接口。如下图方式应用,这样就会初始化ListState了,就可以不用在open方法里初始化了。
- public class BufferingSink
- implements SinkFunction<Tuple2<String, Integer>>,
- CheckpointedFunction {
-
- private final int threshold;
-
- private transient ListState<Tuple2<String, Integer>> checkpointedState;
-
- private List<Tuple2<String, Integer>> bufferedElements;
-
- public BufferingSink(int threshold) {
- this.threshold = threshold;
- this.bufferedElements = new ArrayList<>();
- }
-
- @Override
- public void invoke(Tuple2<String, Integer> value) throws Exception {
- bufferedElements.add(value);
- if (bufferedElements.size() == threshold) {
- for (Tuple2<String, Integer> element: bufferedElements) {
- // send it to the sink
- }
- bufferedElements.clear();
- }
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- checkpointedState.clear();
- for (Tuple2<String, Integer> element : bufferedElements) {
- checkpointedState.add(element);
- }
- }
-
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- ListStateDescriptor<Tuple2<String, Integer>> descriptor =
- new ListStateDescriptor<>(
- "buffered-elements",
- TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
-
- checkpointedState = context.getOperatorStateStore().getListState(descriptor);
- //在这里选择模式,getListState或者getUnionListState
-
- if (context.isRestored()) {
- for (Tuple2<String, Integer> element : checkpointedState.get()) {
- bufferedElements.add(element);
- }
- }
- }
- }

对于Keyed State就比较好处理了,对应的Key被Redistribute到哪个task,对应的keyed state就被Redistribute到哪个Task
Keyed state redistribute是基于Key Group来做分配的:
将key分为group
每个key分配到唯一的group
将group分配给task实例
Keygroup由最大并行度的大小所决定
Keyed State最终被分配到哪个Task,需要经过一下三个步骤:
hash = hash(key)
KeyGroup = hash%numOfKeyGroups
SubTask=KeyGroup*parallelism/numOfKeyGroups
non-keyed state
);其中Keyed State只能在KeyedStream上的functions及operators上使用;每个operator state会跟parallel operator中的一个实例绑定;Operator State支持parallelism变更时进行redistributing第一次初始化或者从前一次checkpoint recover的时候
)被调用,该方法不仅可以用来初始化state,还可以用于处理state recovery的逻辑在restore/redistribution的时候每个operator仅仅得到整个state的sublist
)及Union redistribution(在restore/redistribution的时候每个operator得到整个state的完整list
)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。