当前位置:   article > 正文

Flink 1.13 源码解析——Graph的转化以及StreamGraph的构建_flink graph

flink graph

Flink 1.13 源码解析 目录汇总

Flink 1.13 源码解析——Flink 作业提交流程 下

Flink 1.13 源码解析——Flink 作业提交流程 上

Flink 1.13 源码解析——Graph的转化以及JobGraph的构建

Flink 1.13 源码解析——Graph的转化以及ExecutionGraph的构建

目录

前言

一、Graph的重要概念

二、StreamGraph的构建

总结


前言

        Flink中Graph的构建贯穿了整个作业的生命周期,从最初的解析代码中的算子、计算逻辑,到后期的资源申请、资源分配,都有Graph的身影,在接下来几节分析中,我们来看看Flink中StreamGraph的构建,以及StreamGraph到JobGraph的转化,JobGraph到ExecutionGraph的转化。

一、Graph的重要概念

        首先我们来看FLink中Graph的演化过程,Flink1.13相比于Flink1.12的Graph演化,主要在JobGraph到ExecutionGraph处做了优化,这个我们在接下来的内容中细聊,先上图:

         Flink中的Graph概念有四层,分别为StreamGraph、JobGraph、ExecutionGraph和物理执行图。由于物理执行图涉及到Flink资源的分配和调度,这个我们后续再聊,本次主要聊前三层图。其中,StreamGraph和JobGraph是在Client端完成的,或者说是在org.apache.flink.client.cli.CliFrontend类反射执行我们逻辑代码的main方法时完成的,在完成JobGraph的构建后,再将JobGraph以文件形式发送给JobManager的Dispatcher组件,并开始接下来ExecutionGraph的转化工作。

        首先来看StreamGraph,StreamGraph中的每一个顶点都是一个StreamNode,这个StreamNode其实就是一个Operator,连接两个StreamNode的是StreamEdge对象。

        在StreamGraph向JobGraph转化过程中,会对StreamNode进行相应的优化,根据一些条件(看源码的时候将)进行StreamNode的优化合并,合并后就成为了一个JobVertex,而每一个JobVertex就是JobGraph中的端点。JobGraph的输出对象是IntermediateDataSet,存储这JobGraph的输出内容,在JobGraph中,连接上游端点输出和下游端点的边对象叫做JobEdge。

        在JobGraph向ExecutionGraph转化的过程中,主要的工作内容为根据Operator的并行度来拆分JobVertex,每一个JobGraph中的JobVertex对应的ExecutionGraph中的一个ExecutionJonVertex,而每一个JobVertex根据自身并行度会拆分成多个ExecutionVertex。同时会有一个IntermediateResultPartition对象来接收ExecutionVertex的输出。对于同一个ExecutionJobVertex中的多个ExecutionVertex的多个输出IntermediateResultPartition对象组成了一个IntermediateResult对象。但是在Flink1.13版本中,ExecutionGraph不再有ExecutionEdge的概念,取而代之的是ConsumedPartitionGroup和ConsumedVertexGroup。

我们来看看Flink1.12的ExecutionGraph的样子:

 这是Flink1.13的ExecutionGraph的样子:

        在Flink的ExecutionGraph中,有两种分布模式,一对一和多对多,当上下游节点处于多对多模式时,遍历所有edge的时间复杂度为 O(n 平方 ),这意味着随着规模的增加,时间复杂度也会迅速增加。

        在 Flink 1.12 中,ExecutionEdge类用于存储任务之间的连接信息。这意味着对于 all-to-all 分布模式,会有 O(n 平方 )的 ExecutionEdges,这将占用大量内存用于大规模作业。对于两个连接一个 all-to-all 边缘和 10K 并行度的JobVertices,存储 100M ExecutionEdges 将需要超过 4 GiB 的内存。由于生产作业中的顶点之间可能存在多个全对全连接,因此所需的内存量将迅速增加。

        由于同一ExecutionJobVertex中的ExecutionVertex都是由同一个JobVertex根据并行度划分而来,所以承接他们输出的IntermediateResultPartition的结构是相同的,同理,IntermediateResultPartition所连接的下游的ExecutionJobVertex内的所有ExecutionVertex也都是同结构的。因此Flink根据上述条件将ExecutionVertex和IntermediateResultPartiton进行的分组:对于属于同一个ExecutionJobVertex的所有ExecutionVertex构成了一个ConsumerVertexGroup,所有对此ExecutionJobVertex的输入IntermediateResultPartition构成了一个ConsumerPartitionGroup,如下图:

 在调度任务时,Flink需要遍历所有IntermediateResultPartition和所有的ExecutionVertex之间的所有连接,过去由于总共有O(n平方)条边,因此迭代的整体复杂度为O(n平方)。在Flink1.13以后,由于ExecutionEdge被替换为ConsumerPartitionGroup和ConsumedVertexGroup,由于所有同构结果分区都连接到同一个下游ConsumedVertexGroup,当调度器遍历所有连接时,它只需要遍历组一次,计算复杂度从O(n平方)降低到O(n)。

到此,FlinkGraph前三次图的相关重要概念已经介绍完毕,物理执行图的相关内容我们在后续章节中再分析,接下来我们来看代码。

二、StreamGraph的构建

        首先我们回到Flink的样例程序 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java,来看env.execute方法,我们点进来:

  1. public JobExecutionResult execute(String jobName) throws Exception {
  2. Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
  3. // TODO 获取到StreamGraph,并执行StreamGraph
  4. return execute(getStreamGraph(jobName));
  5. }

可以看到,在这里我们执行StreamGraph,我们继续点进getStreamGraph方法:

  1. @Internal
  2. public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
  3. // TODO
  4. StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
  5. // TODO 清空所有的算子
  6. // TODO 当StreamGraph生成好之后,之前各种算子转换得到的DataStream就没用了
  7. if (clearTransformations) {
  8. this.transformations.clear();
  9. }
  10. return streamGraph;
  11. }

在这段方法里,构建出了StreamGraph,并且清空了transformations。在构建StreamGraph时先构建了一个StreamGraphGenerator对象,并调用该对象的generate()方法完成了StreamGraph的构建,我们来看generate方法:

  1. public StreamGraph generate() {
  2. // TODO 构建了一个空的StreamGraph对象,目前里面没有StreamNode也没有Edge
  3. streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
  4. shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
  5. // TODO 设置StateBackend和Checkpoint
  6. configureStreamGraph(streamGraph);
  7. // TODO 初始化一个容器用来存储已经转换过的Transformation
  8. alreadyTransformed = new HashMap<>();
  9. /*
  10. TODO 在之前做算子转换时已经将各个算子转化为Transformation,并添加到了Transformations集合中
  11. */
  12. for (Transformation<?> transformation : transformations) {
  13. // TODO 遍历所有Transformation,然后转换成StreamNode
  14. transform(transformation);
  15. }
  16. for (StreamNode node : streamGraph.getStreamNodes()) {
  17. if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
  18. for (StreamEdge edge : node.getInEdges()) {
  19. edge.setSupportsUnalignedCheckpoints(false);
  20. }
  21. }
  22. }
  23. final StreamGraph builtStreamGraph = streamGraph;
  24. alreadyTransformed.clear();
  25. alreadyTransformed = null;
  26. streamGraph = null;
  27. return builtStreamGraph;
  28. }

在这段方法里,做了以下工作:

1、构建了一个空的StreamGraph对象,

2、设置StateBackend和Checkpoint

3、初始化一个容器来存储之前已经转换过的Transformation,

4、在之前做算子转换时已经将各个算子转化为Transformation,并添加到了Transformations集合中,这里将Transformation从集合中拿出来,逐一转换成StreamNode。

我们继续来看StreamNode的转换过程,点进transform(transformation)里:

  1. // TODO 对具体的一个transformation进行转换,转换成StreamGraph中的StreamNode和StreamEdge
  2. private Collection<Integer> transform(Transformation<?> transform) {
  3. // TODO 先判断是否已经被transform了
  4. if (alreadyTransformed.containsKey(transform)) {
  5. return alreadyTransformed.get(transform);
  6. }
  7. LOG.debug("Transforming " + transform);
  8. if (transform.getMaxParallelism() <= 0) {
  9. // if the max parallelism hasn't been set, then first use the job wide max parallelism
  10. // from the ExecutionConfig.
  11. int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
  12. if (globalMaxParallelismFromConfig > 0) {
  13. transform.setMaxParallelism(globalMaxParallelismFromConfig);
  14. }
  15. }
  16. // call at least once to trigger exceptions about MissingTypeInfo
  17. transform.getOutputType();
  18. // TODO 将transformation和transformationTranslator放入map
  19. // TODO transformationTranslator是用来将transformation转换成StreamNode的
  20. @SuppressWarnings("unchecked")
  21. final TransformationTranslator<?, Transformation<?>> translator =
  22. (TransformationTranslator<?, Transformation<?>>)
  23. translatorMap.get(transform.getClass());
  24. // TODO 根据不同类型的transform,做相应的不同的转换
  25. // TODO 将当前transformation转换成StreamNode和StreamEdge,用于构建StreamGraph
  26. Collection<Integer> transformedIds;
  27. if (translator != null) {
  28. transformedIds = translate(translator, transform);
  29. } else {
  30. transformedIds = legacyTransform(transform);
  31. }
  32. // need this check because the iterate transformation adds itself before
  33. // transforming the feedback edges
  34. if (!alreadyTransformed.containsKey(transform)) {
  35. alreadyTransformed.put(transform, transformedIds);
  36. }
  37. return transformedIds;
  38. }

在这段方法里,构建处理StreamGraph中的StreamNode和StreamGraph,我们来看详细步骤:

1、首先判断拿到的transform是否已经被转换

2、从map里拿出transformation和transformationTranslator,transformationTranslator的作用就是将Transformation转换为StreamNode。

3、接下来就是将Transformation转换为StreamNode和StreamEdge。

我们继续看StreamEdge和StreamNode的构建方法,我们点进translate(translator, transform)方法:

  1. private Collection<Integer> translate(
  2. final TransformationTranslator<?, Transformation<?>> translator,
  3. final Transformation<?> transform) {
  4. checkNotNull(translator);
  5. checkNotNull(transform);
  6. // TODO 获取所有输入
  7. final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());
  8. // the recursive call might have already transformed this
  9. if (alreadyTransformed.containsKey(transform)) {
  10. return alreadyTransformed.get(transform);
  11. }
  12. // TODO Slot共享,如果没有设置,就是default
  13. final String slotSharingGroup =
  14. determineSlotSharingGroup(
  15. transform.getSlotSharingGroup(),
  16. allInputIds.stream()
  17. .flatMap(Collection::stream)
  18. .collect(Collectors.toList()));
  19. final TransformationTranslator.Context context =
  20. new ContextImpl(this, streamGraph, slotSharingGroup, configuration);
  21. return shouldExecuteInBatchMode
  22. // TODO 批处理
  23. ? translator.translateForBatch(transform, context)
  24. // TODO 流处理
  25. : translator.translateForStreaming(transform, context);
  26. }

在这段代码里完成了以下工作:

1、获取当前算子转换成的transform的所接收的所有上游输出的transform节点

2、Slot共享的相关设置(后面讲)

3、做了一个执行模式的判断

我们直接进流处理模式,点进translator.translateForStreaming,选择SimpleTransformationTranslator实现:

  1. @Override
  2. public final Collection<Integer> translateForStreaming(
  3. final T transformation, final Context context) {
  4. checkNotNull(transformation);
  5. checkNotNull(context);
  6. // TODO 这个地方可以是任意类型的算子transformation
  7. // TODO Source类型算子作为StreamGraph的顶点,在进行StreamNode转换时是无法得到下游算子信息的,
  8. // 所以Source类型算子在转换StreamNode的过程中不会构建StreamEdge
  9. final Collection<Integer> transformedIds =
  10. translateForStreamingInternal(transformation, context);
  11. configure(transformation, context);
  12. return transformedIds;
  13. }

由于当前的转换只针对当前的算子节点,此处是无法得到下游算子的信息,所以在这里不会进行StreamEdge 的构建,我们点进translateForStreamingInternal方法,此处我们选哪个算子类型都行,我们此处以OneInputTransformationTranslator举例,我们点进来:

  1. @Override
  2. public Collection<Integer> translateForStreamingInternal(
  3. final OneInputTransformation<IN, OUT> transformation, final Context context) {
  4. // TODO
  5. return translateInternal(
  6. transformation,
  7. transformation.getOperatorFactory(),
  8. transformation.getInputType(),
  9. transformation.getStateKeySelector(),
  10. transformation.getStateKeyType(),
  11. context);
  12. }

再进入translateInternal方法

  1. protected Collection<Integer> translateInternal(
  2. final Transformation<OUT> transformation,
  3. final StreamOperatorFactory<OUT> operatorFactory,
  4. final TypeInformation<IN> inputType,
  5. @Nullable final KeySelector<IN, ?> stateKeySelector,
  6. @Nullable final TypeInformation<?> stateKeyType,
  7. final Context context) {
  8. checkNotNull(transformation);
  9. checkNotNull(operatorFactory);
  10. checkNotNull(inputType);
  11. checkNotNull(context);
  12. final StreamGraph streamGraph = context.getStreamGraph();
  13. final String slotSharingGroup = context.getSlotSharingGroup();
  14. final int transformationId = transformation.getId();
  15. final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
  16. // TODO 添加一个Operator(StreamGraph端会添加一个StreamNode)
  17. streamGraph.addOperator(
  18. transformationId,
  19. slotSharingGroup,
  20. transformation.getCoLocationGroupKey(),
  21. operatorFactory,
  22. inputType,
  23. transformation.getOutputType(),
  24. transformation.getName());
  25. if (stateKeySelector != null) {
  26. TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
  27. streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
  28. }
  29. int parallelism =
  30. transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
  31. ? transformation.getParallelism()
  32. : executionConfig.getParallelism();
  33. streamGraph.setParallelism(transformationId, parallelism);
  34. streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
  35. // TODO 获取所有输入
  36. final List<Transformation<?>> parentTransformations = transformation.getInputs();
  37. checkState(
  38. parentTransformations.size() == 1,
  39. "Expected exactly one input transformation but found "
  40. + parentTransformations.size());
  41. // TODO 设置当前StreamNode和上游所有StreamNode之间的StreamEdge
  42. for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
  43. // TODO 设置StreamGraph的边
  44. // TODO transformationId 为当前顶点ID
  45. // TODO inputId 为上游顶点ID
  46. streamGraph.addEdge(inputId, transformationId, 0);
  47. }
  48. return Collections.singleton(transformationId);
  49. }

可以看到此处:

1、先调用streamGraph.addOperator将当前这个transform转为StreamNode并添加到StreamGraph内,

2、然后获取当前transform的所有上游输出节点的id,通过streamGraph.addEdge来构建StreamEdge,并将StreamEdge添加入StreamGraph中。

我们首先来看StreamNode的构建和添加过程,我们点进streamGraph.addOperator方法:

  1. public <IN, OUT> void addOperator(
  2. Integer vertexID,
  3. @Nullable String slotSharingGroup,
  4. @Nullable String coLocationGroup,
  5. StreamOperatorFactory<OUT> operatorFactory,
  6. TypeInformation<IN> inTypeInfo,
  7. TypeInformation<OUT> outTypeInfo,
  8. String operatorName) {
  9. // TODO 此时会选择当前 invokableClass类型
  10. Class<? extends AbstractInvokable> invokableClass =
  11. operatorFactory.isStreamSource()
  12. ? SourceStreamTask.class
  13. : OneInputStreamTask.class;
  14. // TODO
  15. addOperator(
  16. vertexID,
  17. slotSharingGroup,
  18. coLocationGroup,
  19. operatorFactory,
  20. inTypeInfo,
  21. outTypeInfo,
  22. operatorName,
  23. invokableClass);
  24. }

我们在进入addOperator:

  1. private <IN, OUT> void addOperator(
  2. Integer vertexID,
  3. @Nullable String slotSharingGroup,
  4. @Nullable String coLocationGroup,
  5. StreamOperatorFactory<OUT> operatorFactory,
  6. TypeInformation<IN> inTypeInfo,
  7. TypeInformation<OUT> outTypeInfo,
  8. String operatorName,
  9. Class<? extends AbstractInvokable> invokableClass) {
  10. // TODO 一个StreamOperator对应一个StreamNode
  11. addNode(
  12. vertexID,
  13. slotSharingGroup,
  14. coLocationGroup,
  15. invokableClass,
  16. operatorFactory,
  17. operatorName);
  18. setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));
  19. if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
  20. // sets the output type which must be know at StreamGraph creation time
  21. operatorFactory.setOutputType(outTypeInfo, executionConfig);
  22. }
  23. if (operatorFactory.isInputTypeConfigurable()) {
  24. operatorFactory.setInputType(inTypeInfo, executionConfig);
  25. }
  26. if (LOG.isDebugEnabled()) {
  27. LOG.debug("Vertex: {}", vertexID);
  28. }
  29. }

再点入addNode方法:

  1. protected StreamNode addNode(
  2. Integer vertexID,
  3. @Nullable String slotSharingGroup,
  4. @Nullable String coLocationGroup,
  5. Class<? extends AbstractInvokable> vertexClass,
  6. StreamOperatorFactory<?> operatorFactory,
  7. String operatorName) {
  8. if (streamNodes.containsKey(vertexID)) {
  9. throw new RuntimeException("Duplicate vertexID " + vertexID);
  10. }
  11. // TODO 对于每一个StreamOperator,初始化了一个StreamNode
  12. StreamNode vertex =
  13. new StreamNode(
  14. vertexID,
  15. slotSharingGroup,
  16. coLocationGroup,
  17. operatorFactory,
  18. operatorName,
  19. vertexClass);
  20. // TODO 将该StreamNode加入到StreamGraph中
  21. // TODO 编写算子处理逻辑(UserFunction) ==> StreamOperator ==> Transformation ==> StreamNode
  22. // TODO 构建StreamNode的时候,会多做一件事,指定InvokableClass
  23. // TODO 判断是否是Source算子,如果是则InvokableClass = SourceStreamTask,如果不是则为OneInputStreamTask或Two...等等
  24. streamNodes.put(vertexID, vertex);
  25. return vertex;
  26. }

        到这里,开始真正构建StreamNode,每一个StreamOperator对应一个StreamNode。在完成StreamNode的构建之后,会将StreamNode加入到StreamGraph之中。结合前面章节所分析的,可以看出StreamNode的构建流程为:

(UserFunction) ==> StreamOperator ==> Transformation ==> StreamNode

        在构建StreamNode的过程中,会指定InvokableClass。此时会判断当前transform是否为Source算子,如果是则

  • InvokableClass = SourceStreamTask,
  • 如果不是则InvokableClass = OneInputStreamTask或其他。

        到此StreamNode就构建完成了,我们继续看StreamEdge的构建,我们回到streamGraph.addEdge方法:

  1. public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
  2. // TODO
  3. addEdgeInternal(
  4. upStreamVertexID,
  5. downStreamVertexID,
  6. typeNumber,
  7. null,
  8. new ArrayList<String>(),
  9. null,
  10. null);
  11. }

再点进addEdgeInternal方法:

  1. private void addEdgeInternal(
  2. Integer upStreamVertexID,
  3. Integer downStreamVertexID,
  4. int typeNumber,
  5. StreamPartitioner<?> partitioner,
  6. List<String> outputNames,
  7. OutputTag outputTag,
  8. ShuffleMode shuffleMode) {
  9. if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
  10. int virtualId = upStreamVertexID;
  11. upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
  12. if (outputTag == null) {
  13. outputTag = virtualSideOutputNodes.get(virtualId).f1;
  14. }
  15. addEdgeInternal(
  16. upStreamVertexID,
  17. downStreamVertexID,
  18. typeNumber,
  19. partitioner,
  20. null,
  21. outputTag,
  22. shuffleMode);
  23. } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
  24. int virtualId = upStreamVertexID;
  25. upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
  26. if (partitioner == null) {
  27. partitioner = virtualPartitionNodes.get(virtualId).f1;
  28. }
  29. shuffleMode = virtualPartitionNodes.get(virtualId).f2;
  30. addEdgeInternal(
  31. upStreamVertexID,
  32. downStreamVertexID,
  33. typeNumber,
  34. partitioner,
  35. outputNames,
  36. outputTag,
  37. shuffleMode);
  38. } else {
  39. // TODO
  40. createActualEdge(
  41. upStreamVertexID,
  42. downStreamVertexID,
  43. typeNumber,
  44. partitioner,
  45. outputTag,
  46. shuffleMode);
  47. }
  48. }

上面进行了一些判断,我们直接来看StreamEdge的新建过程,点进createActualEdge方法:

  1. private void createActualEdge(
  2. Integer upStreamVertexID,
  3. Integer downStreamVertexID,
  4. int typeNumber,
  5. StreamPartitioner<?> partitioner,
  6. OutputTag outputTag,
  7. ShuffleMode shuffleMode) {
  8. // TODO 通过上游顶点拿到上游StreamNodeId
  9. StreamNode upstreamNode = getStreamNode(upStreamVertexID);
  10. // TODO 其实就是当前顶点的StreamNodeId,对StreamEdge来说,该StreamNode为这条边的下游
  11. StreamNode downstreamNode = getStreamNode(downStreamVertexID);
  12. // If no partitioner was specified and the parallelism of upstream and downstream
  13. // operator matches use forward partitioning, use rebalance otherwise.
  14. /* TODO 如果没有设置partitioner
  15. 1.如果上游StreamNode和下游StreamNode并行度一样,则使用ForwardPartitioner数据分发策略
  16. 2.如果上游StreamNode和下游StreamNode并行度不一样,则使用RebalancePartitioner数据分发策略
  17. */
  18. if (partitioner == null
  19. && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
  20. partitioner = new ForwardPartitioner<Object>();
  21. } else if (partitioner == null) {
  22. partitioner = new RebalancePartitioner<Object>();
  23. }
  24. if (partitioner instanceof ForwardPartitioner) {
  25. if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
  26. throw new UnsupportedOperationException(
  27. "Forward partitioning does not allow "
  28. + "change of parallelism. Upstream operation: "
  29. + upstreamNode
  30. + " parallelism: "
  31. + upstreamNode.getParallelism()
  32. + ", downstream operation: "
  33. + downstreamNode
  34. + " parallelism: "
  35. + downstreamNode.getParallelism()
  36. + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
  37. }
  38. }
  39. if (shuffleMode == null) {
  40. shuffleMode = ShuffleMode.UNDEFINED;
  41. }
  42. /**
  43. * Just make sure that {@link StreamEdge} connecting same nodes (for example as a result of
  44. * self unioning a {@link DataStream}) are distinct and unique. Otherwise it would be
  45. * difficult on the {@link StreamTask} to assign {@link RecordWriter}s to correct {@link
  46. * StreamEdge}.
  47. */
  48. int uniqueId = getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();
  49. // TODO 构建StreamEdge对象
  50. StreamEdge edge =
  51. new StreamEdge(
  52. upstreamNode,
  53. downstreamNode,
  54. typeNumber,
  55. partitioner,
  56. outputTag,
  57. shuffleMode,
  58. uniqueId);
  59. // TODO 将当前的StreamEdge对象设置为上游StreamNode的输出边
  60. getStreamNode(edge.getSourceId()).addOutEdge(edge);
  61. // TODO 将当前的StreamEdge对象设置为下游StreamNode的输入边
  62. getStreamNode(edge.getTargetId()).addInEdge(edge);
  63. }

在这个方法里,首先会去拿上游StreamNode的Id,然后去拿下游StreamNode的Id。然后会判断一下并行度的设置:

1.如果上游StreamNode和下游StreamNode并行度一样,则使用ForwardPartitioner数据分发策略
2.如果上游StreamNode和下游StreamNode并行度不一样,则使用RebalancePartitioner数据分发策略

然后new StreamEdge来构建StreamEdge,然后将当前的StreamEdge与上下游StreamNode连接起来,当期StreamEdge为上游StreamNode的输出边,为下游StreamNode的输入边。

 到这里,StreamGraph的构建就已经完成

总结

        在上面的过程中,首先根据用户调用的算子,生成StreamOperator,然后将StreamOperator转化为Transformation,最后再将Transformation转化为StreamNode,在StreamNode构建完成之后先将StreamNode放入StreamGraph对象,再根据StreamNode的类型以及上下游StreamNode的关系开始构建StreamEdge,构建完成后使用StreamEdge将上下游有输出输入关系的StreamNode连接起来,在所有的StreamEdge连接完成后,StreamGraph就构建完成了。

        在下一章我们来分析StreamGraph到JobGraph的转化以及JobGraph向ExecutionGraph的转化。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号