当前位置:   article > 正文

Flink1.15源码阅读——StreamGraph流图_flink streamgraph

flink streamgraph

Graph的概念

Flink中的执行图可以分成四层:

StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图
  • 1

在这里插入图片描述

StreamGraph

是根据用户代码通过Stream API编写的代码生成的最初的图。用来表示程序的拓扑结构。
在客户端生成流图
在这里插入图片描述

  • StreamNode: 用来代表operator的类,并具有所有相关的属性,如并发度、入边、出边等。
  • StreamEdge: 表示连接两个StreamNode的边。

JobGraph

StreamGraph 经过优化后生成了JobGraph,提交给Jobmanager的数据结构,主要的优化为,将多个符合条件的节点 chain在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。

在客户端生成作业图
在这里插入图片描述

  • JobVertex: 经过优化后符合条件的多个StreamNode可能会chain在一起生成一个Vertex,即一个JobVertex包含一个或者多个Operator, JobVertex 的输入是 JobEdge. 输出是 IntermediateDataSet。
  • JobEdge: 表示 JobGraph 中的一 个数据流转通道, 其上游数据源是 IntermediateDataSet ,下游消费者是 JobVertex 。即数据通过 JobEdge 由IntermediateDataSet 传递给目标 JobVertex; JobEdge 中的数据分发模式,会直接影响执行时 Task 之间的数据连接关系,是点对点连接还是全连接。
  • IntermediateDataSet: 中间数据集 IntermediateDataSet 是一种逻辑结构,用来表示 JobVertex 的输出,即经过算子处理产生的数据集。不同的执行模式下,其对应的结果分区类型不同,决 定了在执行时刻数据交换的模式。

ExecutionGraph

JobManager根据JobGraph生成的。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
从jobgraph生成ExecutionGraph是在JobManager中完成的。
在这里插入图片描述

  • ExecutionJobVertex
    该对象和 JobGraph 中的 JobVertex 一 一对应。该对象还包含一组 ExecutionVertex, 数量 与该 JobVertex 中所包含的StreamNode 的并行度一致,假设 StreamNode 的并行度为5 ,那么ExecutionJobVertex中也会包含 5个ExecutionVertex。
    ExecutionJobVertex用来将一个JobVertex 封装成 ExecutionJobVertex,并依次创建 ExecutionVertex、Execution、IntermediateResult 和 IntermediateResultPartition,用于丰富ExecutionGraph。

  • ExecutionVertex
    ExecutionJobVertex会对作业进行并行化处理,构造可以并行执行的实例,每一个并行执行的实例就是 ExecutionVertex。

  • IntermediateResult
    IntermediateResult 又叫作中间结果集,该对象是个逻辑概念 表示 ExecutionJobVertex输出,和 JobGraph 中的IntermediateDalaSet 一 一对应,同样 一个ExecutionJobVertex 可以有多个中间结果,取决于当前 JobVertex 有几个出边(JobEdge)。

  • IntermediateResultPartition
    IntermediateResultPartition 又叫作中间结果分区。表示一个 ExecutionVertex分区的输出结果,与 Execution Edge 相关联。

  • ExecutionEdge
    表示ExecutionVertex 的输入,连接到上游产生的IntermediateResultPartition,一个Execution对应唯一的一个IntermediateResultPartition 和一个ExecutionVertex,一个ExecutionVertex 可以有多个ExecutionEdge。

  • Execution
    ExecutionVertex 相当于每个 Task 的模板,在真正执行的时候,会将ExecutionVertex中的信息包装为一个Execution,执行一个ExecutionVertex的一次尝试。
    当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过 ExecutionAttemptID 来唯一标识。
    JM和TM之间关于 task 的部署和 task status 的更新都是通过 ExecutionAttemptID 来确定消息接受者。

物理执行图

JobManager根据Execution对job进行调度后,在各个TaskManager上部署Task后形成的图,并不是一个具体的数据结构。
在这里插入图片描述

  • Task: Execution被调度后在分配的TaskManager中启动对应的Task。Task包裹了具有用户执行逻辑的operator。其实Task实际上就是代码执行片段。
  • ResultPartition:代表由一个Task生成的数据,和ExecutionGraph中的IntermediateResultPartition一一对应。
  • ResultSubPartition:是ResultPartition的一个子分区。每一个ResultPartition包含多个ResultSubPartition,其数目要由下游消费Task数和DistributionPattern来决定。
  • InputGate:代表Task的输入封装,和JobGraph中的JobEdge一一对应,每个InputGate消费了一个或多个的ResultPartition。
  • InputChannel: 每个InputGate会包含一个以上的InputChannel,和ExecutionGraph中的ExecutionEdge一一对应,也和ResultSubPartition一对一地相连,即一个InputChannel接收一个ResultSubParition的输出。

StreamGraph源码阅读

在per job模式下,该图是在客户端生成的,找到StreamExecutionEnvironment.java,找到execute方法,查看如下源码

execute()

public JobExecutionResult execute() throws Exception {
        // 获取流图 并将流图传入execute
        return execute(getStreamGraph());
    }
  • 1
  • 2
  • 3
  • 4

getStreamGraph()

流作业的{@link StreamGraph}的Getter方法。
传入的true表示调用清除以前注册的{@link Transformation转换}。
返回结果表示转换的流图

@Internal
    public StreamGraph getStreamGraph() {
        return getStreamGraph(true);
    }
  • 1
  • 2
  • 3
  • 4
getStreamGraph(true)

流作业的{@link StreamGraph}的Getter方法,带有清除以前注册的{@link Transformation转换}的选项。例如,清除转换允许在多次调用{@link #execute()}时不重新执行相同的操作。

@Internal
    public StreamGraph getStreamGraph(boolean clearTransformations) {
        final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
        if (clearTransformations) {
            transformations.clear();
        }
        return streamGraph;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
transformations

上面传入的 transformations是怎么来的呢?
在StreamExecutionEnvironment.java类中,看到了这个全局变量,并且protected final

protected final List<Transformation<?>> transformations = new ArrayList<>();
  • 1

点击变量名transformations,分别找到2276行、2353行和2668行(2668行是一个获取transformations的方法,暂时不介绍)
在这里插入图片描述

  • 2276行: 是调用了clearTransformations标识,如果为true就会把transformations集合清空;
  • 2353行:调用了一个addOperator方法
@Internal
    public void addOperator(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }
  • 1
  • 2
  • 3
  • 4
  • 5

那么现在咱们只知道添加到这个transformations集合体了,但是源头是放进去的,谁调用了这个方法呢?
该方法是一个内部方法,并不意味着用户可以使用。创建operator的API方法必须调用此方法。
在这里插入图片描述
上面图中圈住都是调用了addOperator这个方法的,其它要么是抽象类,那么是测试类,直接跳过。
上面圈中的类中,可以挨个点进去看下,方法体里面都执行了addOperator(transformation)这个方法,其实这个正好和咱们用户代码对应上,transformations也是依次被添加的。

当然上面还有剩余的类没有查看,如果有时间你们可以自己查看下。
上面介绍了trasnformation到底是从哪里来的,后面会讲每个种类的算子,到那个时候,会讲到transformation是怎么生成的,算子分哪些类等等问题。

getStreamGraphGenerator(transformations).generate()

分两个方法 第一是:getStreamGraphGenerator(transformations); 第二是generate()。我们一个一个来看下是怎么回事。

getStreamGraphGenerator(transformations)

源码如下:

private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
		// 这个判断对于新手而已是经常遇到的问题,明明提交了jar,但是为什么会报这个没有算子呢?一般都是打包的时候出的问题
        if (transformations.size() <= 0) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }

        // We copy the transformation so that newly added transformations cannot intervene with the
        // stream graph generation.
        return new StreamGraphGenerator(
                new ArrayList<>(transformations), config, checkpointCfg, configuration)
                .setStateBackend(defaultStateBackend)
                .setChangelogStateBackendEnabled(changelogStateBackendEnabled)
                .setSavepointDir(defaultSavepointDirectory)
                .setChaining(isChainingEnabled)
                .setUserArtifacts(cacheFile)
                .setTimeCharacteristic(timeCharacteristic)
                .setDefaultBufferTimeout(bufferTimeout)
                .setSlotSharingGroupResource(slotSharingGroupResources);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

上面new StreamGraphGenerator复制转换,以便新添加的转换不能干预流图的生成。
下面我们接着查看generate()方法。

generate()
public StreamGraph generate() {
// 初始化流图对象
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
        streamGraph.setEnableCheckpointsAfterTasksFinish(
                configuration.get(
                        ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
        shouldExecuteInBatchMode = shouldExecuteInBatchMode();
        configureStreamGraph(streamGraph);
// 初始化唯一身份hashMap,这个transform作为key
        alreadyTransformed = new IdentityHashMap<>();

// 用户代码里面的算子依次被添加到transforms集合中
        for (Transformation<?> transformation : transformations) {
            transform(transformation);
        }

        streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);

        setFineGrainedGlobalStreamExchangeMode(streamGraph);

        for (StreamNode node : streamGraph.getStreamNodes()) {
            if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
                for (StreamEdge edge : node.getInEdges()) {
                    edge.setSupportsUnalignedCheckpoints(false);
                }
            }
        }

        final StreamGraph builtStreamGraph = streamGraph;

        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;

        return builtStreamGraph;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
transformations是怎么来的?
for (Transformation<?> transformation : transformations) {
            transform(transformation);
        }
  • 1
  • 2
  • 3

代码中使用到Map算子,源码如下:

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

        TypeInformation<R> outType =
                TypeExtractor.getMapReturnTypes(
                        clean(mapper), getType(), Utils.getCallLocationName(), true);

        return map(mapper, outType);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

又调用了一层map

public <R> SingleOutputStreamOperator<R> map(
            MapFunction<T, R> mapper, TypeInformation<R> outputType) {
        return transform("Map", outputType, new StreamMap<>(clean(mapper)));
    }
  • 1
  • 2
  • 3
  • 4

这个时候调用transform方法 并且传入了三个参数,第一个是operatorName,map算子的名字默认是Map,当然也可以自定义,在用户代码里调用name()方法就行;第二个参数是输出的类型;第三个参数是用户代码传入的匿名函数

@PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            OneInputStreamOperator<T, R> operator) {

        return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

上面又调用了doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));

protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        // 读取输入Transform的输出类型,找出关于MissingTypeInfo的错误
        transformation.getOutputType();

		// 新的transformation会连接上当前的DataStream中的transformation,从而构建成一棵树
        OneInputTransformation<T, R> resultTransform =
                new OneInputTransformation<>(
                        this.transformation,
                        operatorName,
                        operatorFactory,
                        outTypeInfo,
                        environment.getParallelism());

        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream =
                new SingleOutputStreamOperator(environment, resultTransform);

		// 调用addOperator
        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

上面介绍了 transformations 集合中的元素是怎么来的。

接下来回到generate()方法中,

// transforms是一个list, 依次存放了 用户代码里的 算子
for (Transformation<?> transformation : transformations) {
            transform(transformation);
        }
  • 1
  • 2
  • 3
  • 4

transform(transformation)

    private Collection<Integer> transform(Transformation<?> transform) {
        // 判断,如果包含
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        transform
                .getSlotSharingGroup()
                .ifPresent(
                        slotSharingGroup -> {
                            final ResourceSpec resourceSpec =
                                    SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
                            if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {
                                slotSharingGroupResources.compute(
                                        slotSharingGroup.getName(),
                                        (name, profile) -> {
                                            if (profile == null) {
                                                return ResourceProfile.fromResourceSpec(
                                                        resourceSpec, MemorySize.ZERO);
                                            } else if (!ResourceProfile.fromResourceSpec(
                                                            resourceSpec, MemorySize.ZERO)
                                                    .equals(profile)) {
                                                throw new IllegalArgumentException(
                                                        "The slot sharing group "
                                                                + slotSharingGroup.getName()
                                                                + " has been configured with two different resource spec.");
                                            } else {
                                                return profile;
                                            }
                                        });
                            }
                        });

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        @SuppressWarnings("unchecked")
        final TransformationTranslator<?, Transformation<?>> translator =
                (TransformationTranslator<?, Transformation<?>>)
                        translatorMap.get(transform.getClass());

        Collection<Integer> transformedIds;
        // 主要逻辑
        if (translator != null) {
            transformedIds = translate(translator, transform);
        } else {
            transformedIds = legacyTransform(transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        return transformedIds;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

transformedIds = translate(translator, transform)

// 跳到最后一行
private Collection<Integer> translate(
            final TransformationTranslator<?, Transformation<?>> translator,
            final Transformation<?> transform) {
        checkNotNull(translator);
        checkNotNull(transform);

        final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());

        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        final String slotSharingGroup =
                determineSlotSharingGroup(
                        transform.getSlotSharingGroup().isPresent()
                                ? transform.getSlotSharingGroup().get().getName()
                                : null,
                        allInputIds.stream()
                                .flatMap(Collection::stream)
                                .collect(Collectors.toList()));

        final TransformationTranslator.Context context =
                new ContextImpl(this, streamGraph, slotSharingGroup, configuration);

// 选择流模式
        return shouldExecuteInBatchMode
                ? translator.translateForBatch(transform, context)
                : translator.translateForStreaming(transform, context);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

进入到SimpleTransformationTranslator.java类中
在这里插入图片描述

@Override
    public final Collection<Integer> translateForStreaming(
            final T transformation, final Context context) {
        checkNotNull(transformation);
        checkNotNull(context);

        final Collection<Integer> transformedIds =
                translateForStreamingInternal(transformation, context);
        configure(transformation, context);

        return transformedIds;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

真的是套娃啊,套的都懵了,笔者只要不专注就得重新来, 不过到了这种超企业级得架构就得把各种模式运用得淋漓尽致,也庆幸看到了flink得源码 ,真的老代码还在持续得更新,有很多框架只要没有逻辑和性能方面得问题,都不会再动改之前得代码,flink在这方面很难得。

protected abstract Collection<Integer> translateForStreamingInternal(
            final T transformation, final Context context);

  • 1
  • 2
  • 3

这是一个接口,使用快捷键 ctrl+H可以看到好多实现类
在这里插入图片描述
这些实现类到底是什么呢?
由于算子类型不一样,比如map,flatmap这些和keyby,union,connect本质上对数据得操作是有区别得,所以根据使用算子得类型差异使用对应得转换器。

基于map等类型算子,我们先看OnInputTransformationTranslator.java类中,有两个重写方法,translateForBatchInternal和translateForStreamingInternal,前者很明显,是关于批得,我还是看后者吧。

@Override
    public Collection<Integer> translateForStreamingInternal(
            final OneInputTransformation<IN, OUT> transformation, final Context context) {
        return translateInternal(
                transformation,
                transformation.getOperatorFactory(),
                transformation.getInputType(),
                transformation.getStateKeySelector(),
                transformation.getStateKeyType(),
                context);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

继续套娃

protected Collection<Integer> translateInternal(
            final Transformation<OUT> transformation,
            final StreamOperatorFactory<OUT> operatorFactory,
            final TypeInformation<IN> inputType,
            @Nullable final KeySelector<IN, ?> stateKeySelector,
            @Nullable final TypeInformation<?> stateKeyType,
            final Context context) {
        checkNotNull(transformation);
        checkNotNull(operatorFactory);
        checkNotNull(inputType);
        checkNotNull(context);

        final StreamGraph streamGraph = context.getStreamGraph();
        final String slotSharingGroup = context.getSlotSharingGroup();
        final int transformationId = transformation.getId();
        final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
// 添加StreamNode 即顶点,点到这个方法里面可以到 顶点ID
        streamGraph.addOperator(
                transformationId,
                slotSharingGroup,
                transformation.getCoLocationGroupKey(),
                operatorFactory,
                inputType,
                transformation.getOutputType(),
                transformation.getName());

        if (stateKeySelector != null) {
            TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
            streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
        }

        int parallelism =
                transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                        ? transformation.getParallelism()
                        : executionConfig.getParallelism();
        streamGraph.setParallelism(transformationId, parallelism);
        streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

        final List<Transformation<?>> parentTransformations = transformation.getInputs();
        checkState(
                parentTransformations.size() == 1,
                "Expected exactly one input transformation but found "
                        + parentTransformations.size());
// 这里添加边Edge,这里根据
        for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
            streamGraph.addEdge(inputId, transformationId, 0);
        }

        return Collections.singleton(transformationId);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

从addOperator()可以看到又调用了addNode()

private <IN, OUT> void addOperator(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName,
            Class<? extends TaskInvokable> invokableClass) {

// 此处调用了addNode
        addNode(
                vertexID,
                slotSharingGroup,
                coLocationGroup,
                invokableClass,
                operatorFactory,
                operatorName);
        setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));

        if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
            // sets the output type which must be know at StreamGraph creation time
            operatorFactory.setOutputType(outTypeInfo, executionConfig);
        }

        if (operatorFactory.isInputTypeConfigurable()) {
            operatorFactory.setInputType(inTypeInfo, executionConfig);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Vertex: {}", vertexID);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

在看下方法streamGraph.addEdge(inputId, transformationId, 0);

private void addEdgeInternal(
            Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            List<String> outputNames,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode) {

        // 当上游是侧输出时,递归调用,并传入侧输出信息
        if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
            if (outputTag == null) {
                outputTag = virtualSideOutputNodes.get(virtualId).f1;
            }
            addEdgeInternal(
                    upStreamVertexID,
                    downStreamVertexID,
                    typeNumber,
                    partitioner,
                    null,
                    outputTag,
                    exchangeMode);
        }
        //  当上游是partition时,递归调用,并传入partitioner信息
        else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            exchangeMode = virtualPartitionNodes.get(virtualId).f2;
            addEdgeInternal(
                    upStreamVertexID,
                    downStreamVertexID,
                    typeNumber,
                    partitioner,
                    outputNames,
                    outputTag,
                    exchangeMode);
        } else {
            // 真正构建StreamEdge
            createActualEdge(
                    upStreamVertexID,
                    downStreamVertexID,
                    typeNumber,
                    partitioner,
                    outputTag,
                    exchangeMode);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

真的是在套娃啊,,一层一层的套,而且还递归,然后看到createActualEdge方法,才真是创建边

private void createActualEdge(
            Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode) {
        // 上游节点
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        // 下游节点
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        // 如果没有指定分区器,会为其选择forward 或 rebalance
        if (partitioner == null
                && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner =
                    executionConfig.isDynamicGraph()
                            ? new ForwardForUnspecifiedPartitioner<>()
                            : new ForwardPartitioner<>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }

        if (partitioner instanceof ForwardPartitioner) {
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException(
                        "Forward partitioning does not allow "
                                + "change of parallelism. Upstream operation: "
                                + upstreamNode
                                + " parallelism: "
                                + upstreamNode.getParallelism()
                                + ", downstream operation: "
                                + downstreamNode
                                + " parallelism: "
                                + downstreamNode.getParallelism()
                                + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }

        if (exchangeMode == null) {
            exchangeMode = StreamExchangeMode.UNDEFINED;
        }

        /**
         * Just make sure that {@link StreamEdge} connecting same nodes (for example as a result of
         * self unioning a {@link DataStream}) are distinct and unique. Otherwise it would be
         * difficult on the {@link StreamTask} to assign {@link RecordWriter}s to correct {@link
         * StreamEdge}.
         */
        int uniqueId = getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();

// 此处是创建边
        StreamEdge edge =
                new StreamEdge(
                        upstreamNode,
                        downstreamNode,
                        typeNumber,
                        partitioner,
                        outputTag,
                        exchangeMode,
                        uniqueId);

// 此处是将前后顶点通过边连在了一起
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/794383
推荐阅读
相关标签
  

闽ICP备14008679号