当前位置:   article > 正文

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(1) - Keyed State_flink keystate

flink keystate

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文介绍了Flink State中的keyed state 基本功能及示例,其中包含详细的验证步骤与验证结果。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本专题分为以下几篇文章:

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(1) - Keyed State

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例 - 完整版

关于Flink state的更多介绍参考文章:

8、Flink四大基石之State概念、使用场景、持久化、批处理的详解与keyed state和operator state、broadcast state使用和详细示例

一、maven依赖

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
</dependencies>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

二、Keyed State

1、Keyed State 介绍及示例

keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 KeyedStream 上使用,可以通过 stream.keyBy(…) 得到 KeyedStream.

所有支持的状态类型如下所示:

  • ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。

  • ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。

  • ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。

  • AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

  • MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。

所有类型的状态还有一个clear() 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。

这些状态对象仅用于与状态交互。状态本身不一定存储在内存中,还可能在磁盘或其他位置。 另外从状态中获取的值取决于输入元素所代表的 key。 因此,在不同 key 上调用同一个接口,可能得到不同的值。

你必须创建一个 StateDescriptor,才能得到对应的状态句柄。 这保存了状态名称(可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们), 状态所持有值的类型,并且可能包含用户指定的函数,例如ReduceFunction。

根据不同的状态类型,可以创建ValueStateDescriptor,ListStateDescriptor, ReducingStateDescriptor 或 MapStateDescriptor。

状态通过 RuntimeContext 进行访问,因此只能在 rich functions 中使用。RichFunction 中 RuntimeContext 提供如下方法:

ValueState<T> getState(ValueStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
  • 1
  • 2
  • 3
  • 4
  • 5

下面是一个 FlatMapFunction 的例子,展示了如何将这些部分组合起来:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

2、Keyed State状态有效期 (TTL)

任何类型的 keyed state 都可以有 有效期 (TTL)。如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值。
所有状态类型都支持单元素的 TTL, 这意味着列表元素和映射元素将独立到期。
在使用状态 TTL 前,需要先构建一个StateTtlConfig 配置对象。 然后把配置传递到 state descriptor 中启用 TTL 功能:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

TTL 配置有以下几个选项: newBuilder 的第一个参数表示数据的有效期,是必选项。

  • TTL 的更新策略(默认是 OnCreateAndWrite):
    StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新
    StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新
  • 数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired):
    StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据
    StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据
    NeverReturnExpired 情况下,过期数据就像不存在一样,不管是否被物理删除。这对于不能访问过期数据的场景下非常有用,比如敏感数据。 ReturnExpiredIfNotCleanedUp 在数据被物理删除前都会返回。

1)、过期数据的清理

默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。

可以通过 StateTtlConfig 配置关闭后台清理:

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground()
    .build();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

可以按照如下所示配置更细粒度的后台清理策略。截至Flink 1.17版本的实现中 HeapStateBackend 依赖增量数据清理,RocksDBStateBackend 利用压缩过滤器进行后台清理。

2)、全量快照时进行清理

可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。截至Flink 1.17版本实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。

该策略可以通过 StateTtlConfig 配置进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。

3)、增量数据清理

可以选择增量式清理状态数据,在状态访问或/和处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时,从迭代器中选择已经过期的数进行清理。

该特性可以通过 StateTtlConfig 进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;

 StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally(10, true)
    .build();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

该策略有两个参数。

第一个是每次清理时检查状态的条目数,在每个状态访问时触发。

第二个参数表示是否在处理每条记录时触发清理。

Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。

4)、在 RocksDB 压缩时清理

如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。

该特性可以通过 StateTtlConfig 进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 可以通过StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定处理状态的条数。 时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。

RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。

你还可以通过配置开启 RocksDB 过滤器的 debug 日志: log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

3、keyed state示例:实现地铁站哪个进站口人数最多

实际生产中,一般不需要自己实现state,除非特殊情况。

本示例仅仅用于展示state的工作过程。

实现地铁站哪个进站口人数最多,可以统计最近一段时间内的,也可以统计某一时刻的,简单起见,本处示例模糊该概念,就以输入数据的进行分组,有兴趣的读者可以自己基于前一篇的watermaker进行实现,也比较的简单。

本示例是模拟maxBy的state实现。

1)、java bean

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
	private Long enterTime;

	public Subway(String sNo, Integer userCount) {
		this.sNo = sNo;
		this.userCount = userCount;
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

2)、实现

import java.util.Random;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.watermaker.Subway;

/**
 * @author alanchan
 *
 */
public class KeyedStateDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStreamSource<Subway> subwayDS = env.addSource(new SourceFunction<Subway>() {
			private boolean flag = true;

			@Override
			public void run(SourceContext<Subway> ctx) throws Exception {
				Random random = new Random();
				while (flag) {
					String sNo = "No" + random.nextInt(3);
					int userCount = random.nextInt(100);
					long eventTime = System.currentTimeMillis();
					Subway subway = new Subway(sNo, userCount, eventTime);
					System.err.println(subway + " ,格式化后时间 " + df.format(subway.getEnterTime()));

					ctx.collect(subway);
					Thread.sleep(1000);
				}
			}

			@Override
			public void cancel() {
				flag = false;
			}
		});

		// transformation
		// 实际中使用maxBy即可
		DataStream<Subway> maxByResult = subwayDS.keyBy(subway -> subway.getSNo()).maxBy("userCount");

		// 使用KeyState中的ValueState来实现maxBy的功能
		DataStream<Tuple3<String, Integer, Integer>> stateResult =
				// RichMapFunction<IN, OUT>
				subwayDS.keyBy(subway -> subway.getSNo()).map(new RichMapFunction<Subway, Tuple3<String, Integer, Integer>>() {
					// 定义一个状态用来存放最大值
					private ValueState<Integer> maxValueStateData;

					// 状态初始化
					@Override
					public void open(Configuration parameters) throws Exception {
						// 创建状态描述器
						ValueStateDescriptor stateDescriptor = new ValueStateDescriptor("maxValueState", Integer.class);
						// 根据状态描述器获取/初始化状态
						maxValueStateData = getRuntimeContext().getState(stateDescriptor);
					}

					@Override
					public Tuple3<String, Integer, Integer> map(Subway inValue) throws Exception {
						Integer currentValue = inValue.getUserCount();
						Tuple3<String, Integer, Integer> tuple3 = null;
						Integer historyValue = maxValueStateData.value();
						// 判断状态
						if (historyValue == null || currentValue > historyValue) {
							historyValue = currentValue;
							// 更新状态
							maxValueStateData.update(historyValue);

						}
						tuple3 = Tuple3.of(inValue.getSNo(), currentValue, historyValue);
						return tuple3;
					}
				});

		// sink
		maxByResult.print("maxBy");
		stateResult.print("stateResult");

		// execute
		env.execute();
	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105

3)、验证

此处验证比较简单,比较一下maxby的运行结果与自己实现的maxby运行结果是否一致即可。

maxby采用的是subway输出,自己实现使用的tuple3。

Subway(sNo=No1, userCount=33, enterTime=1689227364582) ,格式化后时间 13:49:24
maxBy:10> Subway(sNo=No1, userCount=33, enterTime=1689227364582)
stateResult:10> (No1,33,33)
Subway(sNo=No1, userCount=10, enterTime=1689227365613) ,格式化后时间 13:49:25
stateResult:10> (No1,10,33)
maxBy:10> Subway(sNo=No1, userCount=33, enterTime=1689227364582)
Subway(sNo=No0, userCount=20, enterTime=1689227366627) ,格式化后时间 13:49:26
stateResult:10> (No0,20,20)
maxBy:10> Subway(sNo=No0, userCount=20, enterTime=1689227366627)
Subway(sNo=No0, userCount=66, enterTime=1689227367633) ,格式化后时间 13:49:27
maxBy:10> Subway(sNo=No0, userCount=66, enterTime=1689227367633)
stateResult:10> (No0,66,66)
Subway(sNo=No2, userCount=2, enterTime=1689227368649) ,格式化后时间 13:49:28
stateResult:3> (No2,2,2)
maxBy:3> Subway(sNo=No2, userCount=2, enterTime=1689227368649)
Subway(sNo=No1, userCount=87, enterTime=1689227369662) ,格式化后时间 13:49:29
stateResult:10> (No1,87,87)
maxBy:10> Subway(sNo=No1, userCount=87, enterTime=1689227369662)
Subway(sNo=No1, userCount=96, enterTime=1689227370675) ,格式化后时间 13:49:30
maxBy:10> Subway(sNo=No1, userCount=96, enterTime=1689227370675)
stateResult:10> (No1,96,96)
Subway(sNo=No1, userCount=58, enterTime=1689227371680) ,格式化后时间 13:49:31
maxBy:10> Subway(sNo=No1, userCount=96, enterTime=1689227370675)
stateResult:10> (No1,58,96)
Subway(sNo=No1, userCount=24, enterTime=1689227372681) ,格式化后时间 13:49:32
maxBy:10> Subway(sNo=No1, userCount=96, enterTime=1689227370675)
stateResult:10> (No1,24,96)
Subway(sNo=No2, userCount=20, enterTime=1689227373695) ,格式化后时间 13:49:33
stateResult:3> (No2,20,20)
maxBy:3> Subway(sNo=No2, userCount=20, enterTime=1689227373695)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

以上,本文介绍了Flink State中的keyed state 基本功能及示例,其中包含详细的验证步骤与验证结果。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为以下几篇文章:

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(1) - Keyed State

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state

【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例 - 完整版

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/891472
推荐阅读
相关标签
  

闽ICP备14008679号