当前位置:   article > 正文

深入理解Flink StreamGraph构建流程_flink steamgraph

flink steamgraph


StreamGraph和JobGraph的构建是在Client完成的,准确的说是Client包下的CliFrontend类反射执行我们逻辑代码的main()方法时完成的。

在开发Flink任务时,在逻辑代码的最后要调用以下代码保证逻辑执行:

// 创建流式作业的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 执行
env.execute();
  • 1
  • 2
  • 3
  • 4
  • 5

调用StreamExecutionEnvironment#execute()这一步实际上就是要利用“Transformation树”去生成StreamGraph(“藏”虚拟,“显”物理),只有物理Transformation才会被添加到Transformation树。

public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

    // 基于StreamGraphGenerator(StreamGraph的生成器),生成StreamGraph,并清空Transformations集合。
    // 将StreamGraph作为参数传入
    return execute(getStreamGraph(jobName));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

StreamGraph是利用StreamGraphGenerator生成的,以下过程会解析Transformation树、生成StreamNode和StreamEdge并建立连接关系,从而形成StreamGraph。

/**
 * 基于StreamGraphGenerator生成StreamGraph,完成后会清空List<Transformation>集合。
 * 核心:从SinkTransformation向前追溯到SourceTransformation,一边遍历、一边构建StreamGraph
 */
@Internal
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    // 使用StreamGraphGenerator(StreamGraph的生成器)生成StreamGraph:包括利用Transformation生成对应的StreamNode和StreamEdge
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    // 传入的参数为true,清空Transformation树
    if (clearTransformations) {
        this.transformations.clear();
    }
    // 返回生成的StreamGraph
    return streamGraph;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

遍历Transformation树

StreamGraphGenerator创建StreamGraph的主要逻辑是:遍历Transformation树,逐个解析Transformation生成物理StreamNode和StreamEdge,以此来形成StreamGraph结构。最后把转换过程中产生的数据清理掉。

/**
 * 创建StreamGraph的主要逻辑
 * 核心:让StreamOperator作为StreamNode,并根据Transformation之间的关联关系创造StreamEdge。
 * 		然后给StreamEdge指定上下游连接的2个StreamNode,让它作为上游StreamNode的“输出边”、下游StreamNode的“输入边”
 */
public StreamGraph generate() {
    // 构建一个空的StreamGraph对象(目前这个StreamGraph里既没有StreamNode,也没有StreamEdge)
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    // 设定StreamGraph的相关参数
    streamGraph.setStateBackend(stateBackend);
    // 设置是否链化:默认true
    streamGraph.setChaining(chaining);
    streamGraph.setScheduleMode(scheduleMode);
    streamGraph.setUserArtifacts(userArtifacts);
    streamGraph.setTimeCharacteristic(timeCharacteristic);
    streamGraph.setJobName(jobName);
    streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);

    // 初始化一个HashMap存储“已经转换过的Transformation”:转换过程涉及递归调用,这是为了避免重复转换
    alreadyTransformed = new HashMap<>();

    // 解析物理Transformation:遍历List<Transformation>集合(也就是Transformation树),生成对应的StreamGraph。
    for (Transformation<?> transformation: transformations) {
        // 利用Transformation逐个创建出StreamGraph中的StreamNode和StreamEdge(Transformation类型不同,解析逻辑也不同)
        transform(transformation);
    }

    final StreamGraph builtStreamGraph = streamGraph;

    // 清理转换过程中产生的数据
    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    // 返回StreamGraph
    return builtStreamGraph;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

物理Transformation经过解析会生成StreamNode(物理节点)和StreamEdge,而虚拟Transformation只会添加一个虚拟节点。整个Transformation解析过程从Transformation树开始入手,因为Transformation之间有连接关系,因此虚拟Transformation也会被解析。

首先就是要遍历Transformation树,递归解析每个Transformation:

/**
 * 遍历Transformation树,对每个“物理Transformation”进行解析,当然也会一并解析虚拟Transformation。据此生成对应的StreamNode和StreamEdge。
 * 有了StreamNode和StreamEdge,就逐渐形成了DAG
 */
private Collection<Integer> transform(Transformation<?> transform) {

    /**如果Map包含当前Transformation,说明它已经转换过了,就直接return了,避免递归调用时产生“重复解析”*/
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    LOG.debug("Transforming " + transform);

    // 如果某个StreamOperator的MaxParallelism设为了负数或未设置,这里会进行纠正:当前StreamOperator的并行度 = 整个Job的最大并行度
    if (transform.getMaxParallelism() <= 0) {

        // 如果当前Transformation没有设置MaxParallelism,那就获取整个Job的MaxParallelism,作为本次Transformation的最大并行度
        int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
        if (globalMaxParallelismFromConfig > 0) {
            // 默认:当前算子转换的并行度为当前Job的最大并行度
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
    }

    /**获取当前Transformation的输出类型,保证当前Transformation的输出正常*/
    transform.getOutputType();

    /**
	 * 因为在整个递归转换过程中,会将当前Transformation的全部上游(且已转换完毕的)Transformation的唯一ID return。
	 * 像union操作这种Transformation的解析,会将它上游所有已完成解析的Transformation的唯一ID以集合形式返回,
	 * 因此这里需要使用Collection集合接收
	 */
    Collection<Integer> transformedIds;
    /**
	 * 对不同类型的Transformation,执行不同的转换/解析操作:
     * 		(递归调用)对上游的Transformation进行转换,确保当前Transformation的所有上游的Transformation都已经转换完毕;
	 * 		转换完成一个物理Transformation,就对应生成一个StreamNode,并用StreamEdge将StreamNode连接起来;
	 */
    if (transform instanceof OneInputTransformation<?, ?>) {
        // 对具体的转换类型进行解析,如map、filter等
        transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
    } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
        transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
    } else if (transform instanceof SourceTransformation<?>) {
        transformedIds = transformSource((SourceTransformation<?>) transform);
    } else if (transform instanceof SinkTransformation<?>) {
        transformedIds = transformSink((SinkTransformation<?>) transform);
    } else if (transform instanceof UnionTransformation<?>) {
        /**解析union操作对应的Transformation,会将union操作的所有(已完成解析的)上游Transformation的唯一ID以集合的形式返回*/
        transformedIds = transformUnion((UnionTransformation<?>) transform);
    } else if (transform instanceof SplitTransformation<?>) {
        transformedIds = transformSplit((SplitTransformation<?>) transform);
    } else if (transform instanceof SelectTransformation<?>) {
        transformedIds = transformSelect((SelectTransformation<?>) transform);
    } else if (transform instanceof FeedbackTransformation<?>) {
        transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
    } else if (transform instanceof CoFeedbackTransformation<?>) {
        transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
    } else if (transform instanceof PartitionTransformation<?>) {
        // 虚拟Transformation处理(添加虚拟节点):不涉及数据处理,仅仅是描述上下游数据之间的传输关系
        transformedIds = transformPartition((PartitionTransformation<?>) transform);
    } else if (transform instanceof SideOutputTransformation<?>) {
        transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
    } else {
        throw new IllegalStateException("Unknown transformation: " + transform);
    }

    /**将刚刚转换好的Transformation按照“Transformation:Transformation ID集合”的映射关系,存储到Map集合中*/
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }

    // 省略部分代码...

    return transformedIds;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

对于“Transformation树”的解析过程涉及到了递归调用,目的就是为了确保当前Transformation的所有的上游Transformation全部解析完毕,这样才能进一步构建StreamNode和StreamEdge。为了避免因递归而造成的重复解析,解析完成的Transformation会保存到Map集合中。如果哪个Transformation已经解析过了,那就直接return,避免重复解析。

解析“虚拟Transformation”

“虚拟Transformation”由非物理转换操作而来,如shuffle、union等,解析虚拟Transformation,并不会对应生成StreamNode,而是会生成对应数量的“虚拟节点”(因为这个虚拟Transformation可能会有N个直接上游物理Transformation)。

可以看出,解析虚拟Transformation的过程中同样也涉及到了递归调用,就是为了保证它的所有上游Transformation全都解析完毕。解析完毕后会得到它上游Transformation对应的唯一ID,用来生成对应数量的虚拟节点。

/**
 * 虚拟Transformation处理:不会转换出StreamNode,而是添加为虚拟节点。
 * 当这个虚拟Transformation的下游Transformation(非虚拟Transformation)转换StreamNode、生成StreamEdge时,
 * 会找到这个“虚拟节点”的上游节点,直至找到“物理节点”为止,通过刚刚生成的StreamEdge将“虚拟节点”的上游和下游“物理节点”连接起来,
 * 并将这个虚拟Transformation的Partitioner分区器等属性一并封装到这个StreamEdge中
 *
 * 例如:物理StreamNode--StreamEdge(虚拟Transformation的相关属性)--物理StreamNode
 */
private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
    // 获取当前Transformation的“上一个”Transformation
    Transformation<T> input = partition.getInput();
    List<Integer> resultIds = new ArrayList<>();

    // 递归调用,对当前虚拟Transformation的所有上游Transformation进行解析,完成后会返回Transformation唯一ID
    Collection<Integer> transformedIds = transform(input);

    /**
     * 当前虚拟Transformation的所有上游Transformation均已解析处理完毕,现在要为它生成并添加“虚拟节点”。
     */
    for (Integer transformedId: transformedIds) {
        // 虚拟节点id:就是一个“累加递增”的数字编号
        int virtualId = Transformation.getNewNodeId();
        /**
         * 在StreamGraph中添加一个“虚拟节点”--VirtualPartitionNode,此处不会产生物理操作,仅仅表示数据的流转方向。
         * 本质上就是以virtualId(虚拟节点id)为Key,将配置信息封装成Tuple3并作为Value,存储到Map中。
         * 这些配置信息将会帮助Task实例构建上下游算子之间的数据传输策略
         */
        streamGraph.addVirtualPartitionNode(
            // 参数:上游Transformation ID,虚拟节点id,当前虚拟Transformation对应的Partitioner和ShuffleMode
            transformedId, virtualId, partition.getPartitioner(), partition.getShuffleMode());
        // 将虚拟节点id保存到List中
        resultIds.add(virtualId);
    }

    return resultIds;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

添加虚拟节点的核心逻辑:将当前虚拟Transformation和它上游Transformation建立起某种联系(包装对象),保存到StreamGraph中的Map集合中

/**
 * 向StreamGraph中添加一个“虚拟节点”,本质就是将当前虚拟Transformation和它上游Transformation建立起某种联系,
 * 保存到StreamGraph内的Map集合中
 */
public void addVirtualPartitionNode(
    Integer originalId, // 上游Transformation ID
    Integer virtualId, // 当前虚拟Transformation对应的虚拟ID
    StreamPartitioner<?> partitioner,
    ShuffleMode shuffleMode) {

    // 安全检查:如果保存有“虚拟节点”的Map结构中包含当前这个key(虚拟节点ID),那就抛异常
    if (virtualPartitionNodes.containsKey(virtualId)) {
        throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
    }

    // 按照“虚拟节点ID:虚拟Transformation的相关信息”的映射关系,保存到Map结构中,后续添加StreamEdge时会连接到实际的“物理节点“
    virtualPartitionNodes.put(virtualId, new Tuple3<>(originalId, partitioner, shuffleMode));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

与其说是添加虚拟节点,倒不如说就是把虚拟Transformation的相关信息保存起来。虚拟Transformation的相关信息会被封装到Tuple3中,并按照“虚拟节点ID:虚拟Transformation的相关信息”的映射关系,将它们保存到Map结构中,后续流程中生成StreamEdge时会有大用!

解析“物理Transformation”

解析物理Transformation的本质就是生成StreamNode和StreamEdge,并让二者之间建立关联关系。当这个物理Transformation的所有上游全都解析完成后,接下来的2个核心操作就是addOperator和addEdge

/**
 * 以递归的方式,对上游Transformation进行转换,同时伴随着StreamNode以及StreamEdge的构建。
 * 在对Transformation树进行遍历解析时,会优先保证它的”上一个Transformation“已经解析完毕。
 * 由于Transformation中以成员变量(也就是input指针)的形式“合法持有”了它“上一次Transformation“,
 * 因此Transformation树之间的指向关系是从Sink向Source,而解析顺序则是从Source向Sink。
 */
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

    /**
	 * 递归调用,解析Transformation:保证解析当前Transformation时,它前面的Transformation都已经解析完毕!
	 * 所有解析过的Transformation,它们的唯一ID会保存起来,用来参与到“构建StreamEdge”中去。
	 */
    Collection<Integer> inputIds = transform(transform.getInput());

    /**避免递归调用过程中出现“重复转换”现象,将所有已经转换过的Transformation的唯一ID return*/
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    // 获取Slot共享组:slotSharingGroup
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

    /**
	 * 构建StreamGraph结构中的StreamNode:将StreamOperator添加到StreamNode中(相互映射、对应)
	 * 在使用DataStream提供的转换方法时,Transformation持有OperatorFactory,OperatorFactory持有StreamOperator,StreamOperator持有自定义xxxFunction
	 */
    streamGraph.addOperator(transform.getId(), // 对应关系 ---> StreamNode:Transformation:StreamOperator
                            // Slot共享组
                            slotSharingGroup,
                            transform.getCoLocationGroupKey(),
                            // 将Transformation中的OperatorFactory(StreamOperator的“代言人”),add到StreamGraph中
                            transform.getOperatorFactory(),
                            // 本次Transformation的输入类型,即上一次Transformation的输出类型
                            transform.getInputType(),
                            // 本次Transformation的输出类型
                            transform.getOutputType(),
                            transform.getName());

    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
        // 设定KeySelector
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }

    // 获取Transformation的并行度
    int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
        transform.getParallelism() : executionConfig.getParallelism();
    // 为StreamGraph设置并行度
    streamGraph.setParallelism(transform.getId(), parallelism);
    // 为StreamGraph设置最大并行度
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

    /**
	 * 创建StreamEdge:当前Transformation对应的StreamNode已经准备好了,它上游的所有Transformation
	 * 也解析完成了(对应的StreamNode也准备好了),可以用StreamEdge把它俩连起来了。
	 */
    for (Integer inputId: inputIds) {
        // 将上一个Transformation和当前Transformation的Transformation唯一ID连接起来,构建成StreamGraph中的StreamEdge
        // 参数:上游Transformation ID、当前Transformation ID
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }

    // 返回当前Transformation的唯一ID(递增整数)。作为已经解析完毕的Transformation,它会被用来构建StreamEdge
    return Collections.singleton(transform.getId());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

a. 添加StreamNode

解析物理Transformation的第一步,就是为当前Transformation构建对应的StreamNode:

/**
 * 构建StreamGraph的StreamNode(物理节点,等同于StreamOperator)
 */
public <IN, OUT> void addOperator(
    Integer vertexID, // 当前Transformation的唯一ID
    @Nullable String slotSharingGroup,
    @Nullable String coLocationGroup,
    StreamOperatorFactory<OUT> operatorFactory, // 持有StreamOperator的OperatorFactory
    TypeInformation<IN> inTypeInfo, // 本次Transformation的输入类型,也就是上一个Transformation的输出类型
    TypeInformation<OUT> outTypeInfo, // 本次Transformation的输出类型
    String operatorName) {

    // 区别在于StreamTask的子类不同
    if (operatorFactory.isStreamSource()) {
        // Source节点
        addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorFactory, operatorName);
    } else {
        // 非Source节点
        addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorFactory, operatorName);
    }

    
    // 省略部分代码(将本次Transformation的输入类型、输出类型交给“持有”StreamOperator的OperatorFactory)...
}



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

addNode的区别就在于这个StreamOperator是否为StreamSource

/**
 * 添加StreamNode的本质:按照“Transformation的唯一ID:StreamNode”的映射关系,将创建的StreamNode保存到Map集合中
 */
protected StreamNode addNode(Integer vertexID, // 当前Transformation的唯一ID
                             @Nullable String slotSharingGroup,
                             @Nullable String coLocationGroup,
                             Class<? extends AbstractInvokable> vertexClass, // OneInputStreamTas or SourceStreamTask
                             StreamOperatorFactory<?> operatorFactory,
                             String operatorName) {

    // 安全检查:理论上,当前Transformation所对应的StreamNode尚未生成。
    if (streamNodes.containsKey(vertexID)) {
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }

    /**
	 * 创建StreamGraph中的StreamNode:因为Transformation以成员变量的形式“合法持有”了StreamOperator,
	 * 现在又要用Transformation的唯一ID来构建StreamNode,因此在逻辑上可以将三者“混为一谈”,也就是StreamNode = Transformation = StreamOperator
	 */
    StreamNode vertex = new StreamNode(
        vertexID, // 当前Transformation的唯一ID
        slotSharingGroup,
        coLocationGroup,
        operatorFactory, // OperatorFactory持有着StreamOperator
        operatorName, // 算子Name
        new ArrayList<OutputSelector<?>>(),
        vertexClass);

    // 将已创建好的StreamNode,按照“Transformation ID:StreamNode”的映射关系,保存到StreamGraph的Map集合中
    streamNodes.put(vertexID, vertex);

    // 返回刚刚new出来的StreamNode
    return vertex;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

添加StreamNode,就是new一个StreamNode出来,将当前Transformation的唯一ID、OperatorFactory(通过成员变量持有着StreamOperator)、算子Name等保存到StreamNode中,说白了就是把StreamOperator包装成StreamNode。然后将StreamNode按照“Transformation ID:StreamNode”的映射关系,保存到StreamGraph内的Map集合中。

从这也能看出来,StreamOperator、StreamNode、Transformation在理解逻辑上是可以将它们三者“混为一谈”的

b.生成StreamEdge

在解析当前物理Transformation的过程中,它对应的StreamNode已经准备好了,并且它的所有上游Transformation均已解析完毕(它们的StreamNode也构建完成了)。

/**
 * 创建StreamEdge:当前Transformation对应的StreamNode已经准备好了,它上游的所有Transformation
 * 也解析完成了(对应的StreamNode也准备好了),可以用StreamEdge把它俩连起来了。
 */
for (Integer inputId: inputIds) {
    // 将上一个Transformation和当前Transformation的Transformation唯一ID连接起来,构建成StreamGraph中的StreamEdge
    // 参数:上游Transformation ID、当前Transformation ID
    streamGraph.addEdge(inputId, transform.getId(), 0);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

接下来就要为当前StreamNode(对应当前Transformation)和它所有上游StreamNode(对应上游Transformation)之间构建StreamEdge。

/**
 * 生成StreamEdge(负责连接上、下StreamNode):将上一个Transformation和当前Transformation分别对应的StreamNode连接起来
 */
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
    // 根据节点类型,创建StreamEdge将物理Transformation对应的StreamNode连接起来
    addEdgeInternal(upStreamVertexID, // 上一个Transformation的唯一ID
                    downStreamVertexID, // 当前Transformation的唯一ID
                    typeNumber,
                    null,
                    new ArrayList<String>(),
                    null,
                    null);

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

根据当前Transformation的上游Transformation的节点类型,执行对应的处理逻辑。如果上游Transformation不是物理Transformation,那就会一直向上找,直至找到物理Transformation为止。

/**
 * 形成StreamEdge,将上一个和当前(物理的)Transformation ID对应的StreamNode关联起来(非虚拟节点)。
 * 核心目的:生成StreamEdge,并将StreamNode和StreamEdge关联起来(将生成的StreamEdge指定为上游StreamNode的输出边、下游StreamNode的输入边)
 * 注意:如果是虚拟Transformation转换生成的虚拟节点,此时会将虚拟Transformation内保存的信息,封装到StreamEdge中
 */
private void addEdgeInternal(Integer upStreamVertexID, // 上一个Transformation的唯一ID(物理 or 虚拟)
                             Integer downStreamVertexID, // 当前Transformation的唯一ID(即物理StreamNode)
                             int typeNumber,
                             StreamPartitioner<?> partitioner,
                             List<String> outputNames,
                             OutputTag outputTag,
                             ShuffleMode shuffleMode) {

    /**
	 * 根据上一个Transformation对应的节点类型,执行对应的处理逻辑。只要up不是物理节点,就继续向上找,直至找到物理节点为止。
	 * 最终,虚拟节点的Transformation信息,一定会被封装到2个物理StreamNode之间的StreamEdge中。
	 * 例如:StreamNode---StreamEdge(封装了2个StreamNode之间的虚拟节点的相关虚拟转换属性信息)---StreamNode
	 */
    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        // 当上游是SlideOutput时
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
    } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
        // 当上游是Select时,递归调用,并传入select信息
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
        if (outputNames.isEmpty()) {
            // selections that happen downstream override earlier selections
            outputNames = virtualSelectNodes.get(virtualId).f1;
        }
        // 将虚拟Transformation内的信息,保存到StreamEdge中
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        /**
		 * 如果当前Transformation对应的StreamNode的上游是虚拟节点(虚拟Transformation转换而来),就将这个虚拟节点对应的Partitioner封装到StreamEdge中
		 */

        // 上一个Transformation的唯一ID,也就是虚拟节点ID
        int virtualId = upStreamVertexID;
        /**从保存有“虚拟节点”的Map集合中,找到当前虚拟节点的上游节点,直至找到物理节点为止。*/ 
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            // 取出当前虚拟Transformation内保存的Partitioner
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        // 取出当前虚拟Transformation内保存的ShuffleMode
        shuffleMode = virtualPartitionNodes.get(virtualId).f2;
        /**将down作为down,将up的up作为新的up,带着旧up的Transformation信息,继续新一轮的逻辑判断*/ 
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else {
        /**
		 * 以上几种情况都是逻辑转换,下面才是真正的构建StreamEdge
		 */

        // 通过上一个Transformation的唯一ID,从保存StreamNode的Map集合中拿到它对应的StreamNode,它将作为这个StreamEdge的上游
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        // 通过当前Transformation的唯一ID,从保存StreamNode的Map集合中拿到它对应的StreamNode,它作为这个StreamEdge的下游
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        /**根据并行度,决定使用哪种数据分发策略:ForwardPartitioner or RebalancePartitioner*/
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            // 如果这个StreamEdge的上游StreamNode和下游StreamNode的并行度相同,那就使用ForwardPartitioner的数据分发策略(直接向后分发)
            partitioner = new ForwardPartitioner<Object>();
        } else if (partitioner == null) {
            // 如果这个StreamEdge的上游StreamNode和下游StreamNode的并行度不一样,那就使用RebalancePartitioner的数据分发策略(轮询向后分发)
            partitioner = new RebalancePartitioner<Object>();
        }

        // 保险:已经确定了ForwardPartitioner,但StreamEdge上下连接的2个StreamNode的并行度不同,那就抛异常
        if (partitioner instanceof ForwardPartitioner) {
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow " +
                                                        "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                                                        ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                                                        " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }

        // 默认ShuffleMode为:未定义,交给框架决定(流 or 批)二选一
        if (shuffleMode == null) {
            shuffleMode = ShuffleMode.UNDEFINED;
        }

        /**构建StreamEdge,参数为:up StreamNode、down StreamNode,以及中间夹杂的虚拟Transformation的相关信息 */ 
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);

        // 指定当前StreamEdge为up StreamNode的输出边
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        // 指定当前StreamEdge为down StreamNode的输入边
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96

一旦找到上游节点为物理StreamNode,那就会创建StreamEdge,这也就意味着此时2个物理的StreamNode即将会被连接在一起,且中间还“隐藏”了虚拟节点。其中StreamEdge会作为这个up物理StreamNode的输出边、down物理StreamNode的输入边。

StreamNode和StreamEdge之间的连接关系的确立,是通过StreamNode内的List集合保存对应StreamEdge的方式完成的。

// 当前StreamNode的所有“输入边”
private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
// 当前StreamNode的所有“输出边”
private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();



/**
 * 将指定的StreamEdge作为当前StreamNode的“输出边”:上游StreamNode --(StreamEdge)-->
 */
public void addOutEdge(StreamEdge outEdge) {
    if (outEdge.getSourceId() != getId()) {
        throw new IllegalArgumentException("Source id doesn't match the StreamNode id");
    } else {
        // 有可能有N个输出
        outEdges.add(outEdge);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

如此一来,上、下游的2个StreamNode就通过StreamEdge连接起来了,而那些不涉及物理转换操作的虚拟Transformation,则被“隐藏”到了StreamEdge中。

总结

基于“Transformation树”构建StreamGraph,就是利用Transformation之间的关联关系(每个Transformation都有一个input指针,指向前一个Transformation),构建DAG来描述计算逻辑之间的拓扑关系。其中物理Transformation会形成StreamNode,彼此之间用StreamEdge连接。而虚拟Transformation的相关信息会被添加到StreamEdge中“隐藏”起来。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/794408
推荐阅读
相关标签
  

闽ICP备14008679号