赞
踩
StreamExecutionEnvironment
是 Flink 在流模式下任务执行的上下文,也是我们编写 Flink 程序的入口。根据具体的执行环境不同,StreamExecutionEnvironment
有不同的具体实现类:LocalStreamEnvironment
、RemoteStreamEnvironment
、StreamContextEnvironment
等。StreamExecutionEnvironment
也提供了用来配置默认并行度、Checkpointing 等机制的方法,这些配置主要都保存在 ExecutionConfig
和 CheckpointConfig
中。现在先只关注拓扑结构的产生过程。通常一个 Flink 任务是按照下面的流程来编写处理逻辑的:
- env.addSource(XXX)
- .map(XXX)
- .filter(XXX)
- .addSink(XXX)
-
- env.execute("job-name");
添加数据源后获得 DataStream
, 之后通过不同的算子不停地在 DataStream
上实现转换过滤等逻辑,最终将结果输出到 sink
中。通过env.execute()执行作业,这里首先会生成一张StreaGraph:
- public JobExecutionResult execute(String jobName) throws Exception {
- return execute(getStreamGraph(jobName));
- }
-
- @Internal
- public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
- StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
- if (clearTransformations) {
- // transformations是一个list集合,存放所有的Transformation
- this.transformations.clear();
- }
- return streamGraph;
- }
在 StreamExecutionEnvironment 内部使用一个 List<StreamTransformation<?>> transformations
来保留生成 DataStream
的所有转换。
StreamTransformation
代表了从一个或多个DataStream
生成新DataStream
的操作。DataStream
的底层其实就是一个 StreamTransformation
,描述了这个DataStream
是怎么来的。
DataStream 上常见的 transformation 有 map、flatmap、filter等(更多)。这些transformation会构造出一棵 StreamTransformation 树,通过这棵树转换成 StreamGraph。比如 DataStream.map
源码如下:
- // SingleOutputStreamOperator为DataStream的子类
- public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
- // 通过java reflection抽出mapper的返回值类型
- TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
- Utils.getCallLocationName(), true);
-
- // 返回一个新的DataStream,SteramMap 为 StreamOperator 的实现类
- return transform("Map", outType, new StreamMap<>(clean(mapper)));
- }
-
- public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
- // read the output type of the input Transform to coax out errors about MissingTypeInfo
- transformation.getOutputType();
-
- // 新的transformation会连接上当前DataStream中的transformation,从而构建成一棵树
- OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
- this.transformation,
- operatorName,
- operator,
- outTypeInfo,
- environment.getParallelism());
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
-
- // 所有的transformation都会存到 env 中,调用execute时遍历该list生成StreamGraph
- getExecutionEnvironment().addOperator(resultTransform);
-
- return returnStream;
- }
从上方代码可以了解到,map转换将用户自定义的函数MapFunction
包装到StreamMap
这个Operator中,再将StreamMap
包装到OneInputTransformation
,最后将该transformation存到env中,当调用env.execute
时,遍历其中的transformation集合构造出StreamGraph。其分层实现如下图所示:
另外,并不是每一个 StreamTransformation 都会转换成 runtime 层中物理操作。有一些只是逻辑概念,比如 union、split/select、partition等。如下图所示的转换树,在运行时会优化成下方的操作图。
union、split/select、partition中的信息会被写入到 Source –> Map 的边中。通过源码也可以发现,UnionTransformation
,SplitTransformation
,SelectTransformation
,PartitionTransformation
由于不包含具体的操作所以都没有StreamOperator成员变量,而其他StreamTransformation的子类基本上都有。
DataStream 上的每一个 Transformation 都对应了一个 StreamOperator,StreamOperator是运行时的具体实现,会决定UDF(User-Defined Funtion)的调用方式。StreamOperator
类图如下:
所有实现类都继承了AbstractStreamOperator
。另外除了 project 操作,其他所有可以执行UDF代码的实现类都继承自AbstractUdfStreamOperator
,该类是封装了UDF的StreamOperator。UDF就是实现了Function
接口的类,如MapFunction、
FilterFunction
。
StreamGraphGenerator
会基于 StreamExecutionEnvironment
的 transformations
列表来生成 StreamGraph
。
在遍历 List<StreamTransformation>
生成 StreamGraph
的时候,会递归调用StreamGraphGenerator.transform
方法。对于每一个 StreamTransformation
, 确保当前其上游已经完成转换。StreamTransformations
被转换为 StreamGraph
中的节点 StreamNode
,并为上下游节点添加边 StreamEdge
。
- public StreamGraph generate() {
- streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
- ......
- // 遍历transformations列表,进行转换
- for (Transformation<?> transformation : transformations) {
- transform(transformation);
- }
-
- final StreamGraph builtStreamGraph = streamGraph;
-
- alreadyTransformed.clear();
- alreadyTransformed = null;
- streamGraph = null;
-
- return builtStreamGraph;
- }
-
- // 对具体的一个transformation进行转换,转换成 StreamGraph 中的 StreamNode 和 StreamEdge
- // 返回值为该transform的id集合,通常大小为1个(除FeedbackTransformation)
- private Collection<Integer> transform(StreamTransformation<?> transform) {
- // 跳过已经转换过的transformation
- if (alreadyTransformed.containsKey(transform)) {
- return alreadyTransformed.get(transform);
- }
-
- LOG.debug("Transforming " + transform);
-
- // 为了触发 MissingTypeInfo 的异常
- transform.getOutputType();
-
- Collection<Integer> transformedIds;
- if (transform instanceof OneInputTransformation<?, ?>) {
- transformedIds = transformOnInputTransform((OneInputTransformation<?, ?>) transform);
- } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
- transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
- } else if (transform instanceof SourceTransformation<?>) {
- transformedIds = transformSource((SourceTransformation<?>) transform);
- } else if (transform instanceof SinkTransformation<?>) {
- transformedIds = transformSink((SinkTransformation<?>) transform);
- } else if (transform instanceof UnionTransformation<?>) {
- transformedIds = transformUnion((UnionTransformation<?>) transform);
- } else if (transform instanceof SplitTransformation<?>) {
- transformedIds = transformSplit((SplitTransformation<?>) transform);
- } else if (transform instanceof SelectTransformation<?>) {
- transformedIds = transformSelect((SelectTransformation<?>) transform);
- } else if (transform instanceof FeedbackTransformation<?>) {
- transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
- } else if (transform instanceof CoFeedbackTransformation<?>) {
- transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
- } else if (transform instanceof PartitionTransformation<?>) {
- transformedIds = transformPartition((PartitionTransformation<?>) transform);
- } else {
- throw new IllegalStateException("Unknown transformation: " + transform);
- }
-
- // need this check because the iterate transformation adds itself before
- // transforming the feedback edges
- if (!alreadyTransformed.containsKey(transform)) {
- alreadyTransformed.put(transform, transformedIds);
- }
-
- if (transform.getBufferTimeout() > 0) {
- streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
- }
- if (transform.getUid() != null) {
- streamGraph.setTransformationId(transform.getId(), transform.getUid());
- }
-
- return transformedIds;
- }
对于不同类型的 StreamTransformation
,分别调用对应的转换方法,该方法首先会对该transform的上游transform进行递归转换,确保上游的都已经完成了转化。然后通过transform构造出StreamNode,最后与上游的transform进行连接,构造出StreamEdge。以 最典型的 transformOneInputTransform
为例:
- private <IN, OUT> Collection<Integer> transformOnInputTransform(OneInputTransformation<IN, OUT> transform) {
- // 递归对该transform的直接上游transform进行转换,获得直接上游id集合
- Collection<Integer> inputIds = transform(transform.getInput());
-
- // 递归调用可能已经处理过该transform了
- if (alreadyTransformed.containsKey(transform)) {
- return alreadyTransformed.get(transform);
- }
-
- String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
-
- // 添加 StreamNode
- streamGraph.addOperator(transform.getId(),
- slotSharingGroup,
- transform.getOperator(),
- transform.getInputType(),
- transform.getOutputType(),
- transform.getName());
-
- if (transform.getStateKeySelector() != null) {
- TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
- streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
- }
-
- streamGraph.setParallelism(transform.getId(), transform.getParallelism());
-
- // 依次连接到上游节点,创建 StreamEdge
- for (Integer inputId: inputIds) {
- streamGraph.addEdge(inputId, transform.getId(), 0);
- }
-
- return Collections.singleton(transform.getId());
- }
接着看一看 StreamGraph
中添加节点和边的方法,首先是添加节点:
- protected StreamNode addNode(Integer vertexID,
- String slotSharingGroup,
- @Nullable String coLocationGroup,
- Class<? extends AbstractInvokable> vertexClass,
- StreamOperator<?> operatorObject,
- String operatorName) {
-
- if (streamNodes.containsKey(vertexID)) {
- throw new RuntimeException("Duplicate vertexID " + vertexID);
- }
-
- StreamNode vertex = new StreamNode(environment,
- vertexID,
- slotSharingGroup,
- coLocationGroup,
- operatorObject,
- operatorName,
- new ArrayList<OutputSelector<?>>(),
- vertexClass);
-
- //创建 StreamNode,这里保存了 StreamOperator 和 vertexClass 信息
- streamNodes.put(vertexID, vertex);
-
- return vertex;
- }
对于一些不包含物理转换操作的 StreamTransformation
,如 Partitioning, split/select, union,并不会生成 StreamNode,而是生成一个带有特定属性的虚拟节点。当添加一条有虚拟节点指向下游节点的边时,会找到虚拟节点上游的物理节点,在两个物理节点之间添加边,并把虚拟转换操作的属性附着上去。以 PartitionTansformation
为例, PartitionTansformation
是 KeyedStream
对应的转换:
- private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
- StreamTransformation<T> input = partition.getInput();
- List<Integer> resultIds = new ArrayList<>();
-
- //递归地转换上游节点
- Collection<Integer> transformedIds = transform(input);
-
- for (Integer transformedId: transformedIds) {
- int virtualId = StreamTransformation.getNewNodeId();
- //添加虚拟的 Partition 节点
- streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());
- resultIds.add(virtualId);
- }
-
- return resultIds;
- }
-
- // StreamGraph.addVirtualPartitionNode()
- public void addVirtualPartitionNode(Integer originalId, Integer virtualId, StreamPartitioner<?> partitioner) {
-
- if (virtualPartitionNodes.containsKey(virtualId)) {
- throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
- }
-
- //添加一个虚拟节点,后续添加边的时候会连接到实际的物理节点
- virtualPartitionNodes.put(virtualId,
- new Tuple2<Integer, StreamPartitioner<?>>(originalId, partitioner));
- }
前面提到,在每一个物理节点的转换上,会调用 StreamGraph.addEdge
在输入节点和当前节点之间建立边的连接:
- private void addEdgeInternal(Integer upStreamVertexID,
- Integer downStreamVertexID,
- int typeNumber,
- StreamPartitioner<?> partitioner,
- List<String> outputNames,
- OutputTag outputTag) {
-
- //先判断是不是虚拟节点上的边,如果是,则找到虚拟节点上游对应的物理节点
- //在两个物理节点之间添加边,并把对应的 StreamPartitioner,或者 OutputTag 等补充信息添加到StreamEdge中
- if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
- ......
- } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
- int virtualId = upStreamVertexID;
- upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
- if (partitioner == null) {
- partitioner = virtualPartitionNodes.get(virtualId).f1;
- }
- addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
- } else {
- //两个物理节点
- StreamNode upstreamNode = getStreamNode(upStreamVertexID);
- StreamNode downstreamNode = getStreamNode(downStreamVertexID);
-
- // If no partitioner was specified and the parallelism of upstream and downstream
- // operator matches use forward partitioning, use rebalance otherwise.
- if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
- partitioner = new ForwardPartitioner<Object>();
- } else if (partitioner == null) {
- partitioner = new RebalancePartitioner<Object>();
- }
-
- if (partitioner instanceof ForwardPartitioner) {
- if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
- throw new UnsupportedOperationException("Forward partitioning does not allow " +
- "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
- ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
- " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
- }
- }
-
- //创建 StreamEdge,保留了 StreamPartitioner 等属性
- StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);
-
- //分别将StreamEdge添加到上游节点和下游节点
- getStreamNode(edge.getSourceId()).addOutEdge(edge);
- getStreamNode(edge.getTargetId()).addInEdge(edge);
- }
- }
这样通过 StreamNode 和 SteamEdge,就得到了 DAG 中的所有节点和边,以及它们之间的连接关系,拓扑结构也就建立了。
如下程序,是一个从 Source 中按行切分成单词并过滤输出的简单流程序,其中包含了逻辑转换:随机分区shuffle。我们会分析该程序是如何生成StreamGraph的。
-
- DataStream<String> text = env.socketTextStream(hostName, port);
- text.flatMap(new LineSplitter())
- .shuffle()
- .filter(new HelloFilter())
- .print();
首先会在env中生成一棵transformation树,用List<StreamTransformation<?>>
保存。其结构图如下:
其中符号*
为input指针,指向上游的transformation,从而形成了一棵transformation树。然后,通过调用StreamGraphGenerator.generate(env, transformations)
来生成StreamGraph。自底向上递归调用每一个transformation,也就是说处理顺序是Source->FlatMap->Shuffle->Filter->Sink。
如上图所示:
virtuaPartitionNodes
中。最后可以通过 UI可视化 来观察得到的 StreamGraph。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。