赞
踩
在开发Flink任务时,在逻辑代码的最后要调用以下代码保证逻辑执行:
// 创建流式作业的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 执行
env.execute();
调用StreamExecutionEnvironment#execute()这一步实际上就是要利用“Transformation树”去生成StreamGraph(“藏”虚拟,“显”物理),只有物理Transformation才会被添加到Transformation树。
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
// 基于StreamGraphGenerator(StreamGraph的生成器),生成StreamGraph,并清空Transformations集合。
// 将StreamGraph作为参数传入
return execute(getStreamGraph(jobName));
}
StreamGraph是利用StreamGraphGenerator生成的,以下过程会解析Transformation树、生成StreamNode和StreamEdge并建立连接关系,从而形成StreamGraph。
/**
* 基于StreamGraphGenerator生成StreamGraph,完成后会清空List<Transformation>集合。
* 核心:从SinkTransformation向前追溯到SourceTransformation,一边遍历、一边构建StreamGraph
*/
@Internal
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
// 使用StreamGraphGenerator(StreamGraph的生成器)生成StreamGraph:包括利用Transformation生成对应的StreamNode和StreamEdge
StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
// 传入的参数为true,清空Transformation树
if (clearTransformations) {
this.transformations.clear();
}
// 返回生成的StreamGraph
return streamGraph;
}
StreamGraphGenerator创建StreamGraph的主要逻辑是:遍历Transformation树,逐个解析Transformation生成物理StreamNode和StreamEdge,以此来形成StreamGraph结构。最后把转换过程中产生的数据清理掉。
/** * 创建StreamGraph的主要逻辑 * 核心:让StreamOperator作为StreamNode,并根据Transformation之间的关联关系创造StreamEdge。 * 然后给StreamEdge指定上下游连接的2个StreamNode,让它作为上游StreamNode的“输出边”、下游StreamNode的“输入边” */ public StreamGraph generate() { // 构建一个空的StreamGraph对象(目前这个StreamGraph里既没有StreamNode,也没有StreamEdge) streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings); // 设定StreamGraph的相关参数 streamGraph.setStateBackend(stateBackend); // 设置是否链化:默认true streamGraph.setChaining(chaining); streamGraph.setScheduleMode(scheduleMode); streamGraph.setUserArtifacts(userArtifacts); streamGraph.setTimeCharacteristic(timeCharacteristic); streamGraph.setJobName(jobName); streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains); // 初始化一个HashMap存储“已经转换过的Transformation”:转换过程涉及递归调用,这是为了避免重复转换 alreadyTransformed = new HashMap<>(); // 解析物理Transformation:遍历List<Transformation>集合(也就是Transformation树),生成对应的StreamGraph。 for (Transformation<?> transformation: transformations) { // 利用Transformation逐个创建出StreamGraph中的StreamNode和StreamEdge(Transformation类型不同,解析逻辑也不同) transform(transformation); } final StreamGraph builtStreamGraph = streamGraph; // 清理转换过程中产生的数据 alreadyTransformed.clear(); alreadyTransformed = null; streamGraph = null; // 返回StreamGraph return builtStreamGraph; }
物理Transformation经过解析会生成StreamNode(物理节点)和StreamEdge,而虚拟Transformation只会添加一个虚拟节点。整个Transformation解析过程从Transformation树开始入手,因为Transformation之间有连接关系,因此虚拟Transformation也会被解析。
首先就是要遍历Transformation树,递归解析每个Transformation:
/** * 遍历Transformation树,对每个“物理Transformation”进行解析,当然也会一并解析虚拟Transformation。据此生成对应的StreamNode和StreamEdge。 * 有了StreamNode和StreamEdge,就逐渐形成了DAG */ private Collection<Integer> transform(Transformation<?> transform) { /**如果Map包含当前Transformation,说明它已经转换过了,就直接return了,避免递归调用时产生“重复解析”*/ if (alreadyTransformed.containsKey(transform)) { return alreadyTransformed.get(transform); } LOG.debug("Transforming " + transform); // 如果某个StreamOperator的MaxParallelism设为了负数或未设置,这里会进行纠正:当前StreamOperator的并行度 = 整个Job的最大并行度 if (transform.getMaxParallelism() <= 0) { // 如果当前Transformation没有设置MaxParallelism,那就获取整个Job的MaxParallelism,作为本次Transformation的最大并行度 int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism(); if (globalMaxParallelismFromConfig > 0) { // 默认:当前算子转换的并行度为当前Job的最大并行度 transform.setMaxParallelism(globalMaxParallelismFromConfig); } } /**获取当前Transformation的输出类型,保证当前Transformation的输出正常*/ transform.getOutputType(); /** * 因为在整个递归转换过程中,会将当前Transformation的全部上游(且已转换完毕的)Transformation的唯一ID return。 * 像union操作这种Transformation的解析,会将它上游所有已完成解析的Transformation的唯一ID以集合形式返回, * 因此这里需要使用Collection集合接收 */ Collection<Integer> transformedIds; /** * 对不同类型的Transformation,执行不同的转换/解析操作: * (递归调用)对上游的Transformation进行转换,确保当前Transformation的所有上游的Transformation都已经转换完毕; * 转换完成一个物理Transformation,就对应生成一个StreamNode,并用StreamEdge将StreamNode连接起来; */ if (transform instanceof OneInputTransformation<?, ?>) { // 对具体的转换类型进行解析,如map、filter等 transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform); } else if (transform instanceof TwoInputTransformation<?, ?, ?>) { transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform); } else if (transform instanceof SourceTransformation<?>) { transformedIds = transformSource((SourceTransformation<?>) transform); } else if (transform instanceof SinkTransformation<?>) { transformedIds = transformSink((SinkTransformation<?>) transform); } else if (transform instanceof UnionTransformation<?>) { /**解析union操作对应的Transformation,会将union操作的所有(已完成解析的)上游Transformation的唯一ID以集合的形式返回*/ transformedIds = transformUnion((UnionTransformation<?>) transform); } else if (transform instanceof SplitTransformation<?>) { transformedIds = transformSplit((SplitTransformation<?>) transform); } else if (transform instanceof SelectTransformation<?>) { transformedIds = transformSelect((SelectTransformation<?>) transform); } else if (transform instanceof FeedbackTransformation<?>) { transformedIds = transformFeedback((FeedbackTransformation<?>) transform); } else if (transform instanceof CoFeedbackTransformation<?>) { transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform); } else if (transform instanceof PartitionTransformation<?>) { // 虚拟Transformation处理(添加虚拟节点):不涉及数据处理,仅仅是描述上下游数据之间的传输关系 transformedIds = transformPartition((PartitionTransformation<?>) transform); } else if (transform instanceof SideOutputTransformation<?>) { transformedIds = transformSideOutput((SideOutputTransformation<?>) transform); } else { throw new IllegalStateException("Unknown transformation: " + transform); } /**将刚刚转换好的Transformation按照“Transformation:Transformation ID集合”的映射关系,存储到Map集合中*/ if (!alreadyTransformed.containsKey(transform)) { alreadyTransformed.put(transform, transformedIds); } // 省略部分代码... return transformedIds; }
对于“Transformation树”的解析过程涉及到了递归调用,目的就是为了确保当前Transformation的所有的上游Transformation全部解析完毕,这样才能进一步构建StreamNode和StreamEdge。为了避免因递归而造成的重复解析,解析完成的Transformation会保存到Map集合中。如果哪个Transformation已经解析过了,那就直接return,避免重复解析。
“虚拟Transformation”由非物理转换操作而来,如shuffle、union等,解析虚拟Transformation,并不会对应生成StreamNode,而是会生成对应数量的“虚拟节点”(因为这个虚拟Transformation可能会有N个直接上游物理Transformation)。
可以看出,解析虚拟Transformation的过程中同样也涉及到了递归调用,就是为了保证它的所有上游Transformation全都解析完毕。解析完毕后会得到它上游Transformation对应的唯一ID,用来生成对应数量的虚拟节点。
/** * 虚拟Transformation处理:不会转换出StreamNode,而是添加为虚拟节点。 * 当这个虚拟Transformation的下游Transformation(非虚拟Transformation)转换StreamNode、生成StreamEdge时, * 会找到这个“虚拟节点”的上游节点,直至找到“物理节点”为止,通过刚刚生成的StreamEdge将“虚拟节点”的上游和下游“物理节点”连接起来, * 并将这个虚拟Transformation的Partitioner分区器等属性一并封装到这个StreamEdge中 * * 例如:物理StreamNode--StreamEdge(虚拟Transformation的相关属性)--物理StreamNode */ private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) { // 获取当前Transformation的“上一个”Transformation Transformation<T> input = partition.getInput(); List<Integer> resultIds = new ArrayList<>(); // 递归调用,对当前虚拟Transformation的所有上游Transformation进行解析,完成后会返回Transformation唯一ID Collection<Integer> transformedIds = transform(input); /** * 当前虚拟Transformation的所有上游Transformation均已解析处理完毕,现在要为它生成并添加“虚拟节点”。 */ for (Integer transformedId: transformedIds) { // 虚拟节点id:就是一个“累加递增”的数字编号 int virtualId = Transformation.getNewNodeId(); /** * 在StreamGraph中添加一个“虚拟节点”--VirtualPartitionNode,此处不会产生物理操作,仅仅表示数据的流转方向。 * 本质上就是以virtualId(虚拟节点id)为Key,将配置信息封装成Tuple3并作为Value,存储到Map中。 * 这些配置信息将会帮助Task实例构建上下游算子之间的数据传输策略 */ streamGraph.addVirtualPartitionNode( // 参数:上游Transformation ID,虚拟节点id,当前虚拟Transformation对应的Partitioner和ShuffleMode transformedId, virtualId, partition.getPartitioner(), partition.getShuffleMode()); // 将虚拟节点id保存到List中 resultIds.add(virtualId); } return resultIds; }
添加虚拟节点的核心逻辑:将当前虚拟Transformation和它上游Transformation建立起某种联系(包装对象),保存到StreamGraph中的Map集合中
/** * 向StreamGraph中添加一个“虚拟节点”,本质就是将当前虚拟Transformation和它上游Transformation建立起某种联系, * 保存到StreamGraph内的Map集合中 */ public void addVirtualPartitionNode( Integer originalId, // 上游Transformation ID Integer virtualId, // 当前虚拟Transformation对应的虚拟ID StreamPartitioner<?> partitioner, ShuffleMode shuffleMode) { // 安全检查:如果保存有“虚拟节点”的Map结构中包含当前这个key(虚拟节点ID),那就抛异常 if (virtualPartitionNodes.containsKey(virtualId)) { throw new IllegalStateException("Already has virtual partition node with id " + virtualId); } // 按照“虚拟节点ID:虚拟Transformation的相关信息”的映射关系,保存到Map结构中,后续添加StreamEdge时会连接到实际的“物理节点“ virtualPartitionNodes.put(virtualId, new Tuple3<>(originalId, partitioner, shuffleMode)); }
与其说是添加虚拟节点,倒不如说就是把虚拟Transformation的相关信息保存起来。虚拟Transformation的相关信息会被封装到Tuple3中,并按照“虚拟节点ID:虚拟Transformation的相关信息”的映射关系,将它们保存到Map结构中,后续流程中生成StreamEdge时会有大用!
解析物理Transformation的本质就是生成StreamNode和StreamEdge,并让二者之间建立关联关系。当这个物理Transformation的所有上游全都解析完成后,接下来的2个核心操作就是addOperator和addEdge
/** * 以递归的方式,对上游Transformation进行转换,同时伴随着StreamNode以及StreamEdge的构建。 * 在对Transformation树进行遍历解析时,会优先保证它的”上一个Transformation“已经解析完毕。 * 由于Transformation中以成员变量(也就是input指针)的形式“合法持有”了它“上一次Transformation“, * 因此Transformation树之间的指向关系是从Sink向Source,而解析顺序则是从Source向Sink。 */ private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) { /** * 递归调用,解析Transformation:保证解析当前Transformation时,它前面的Transformation都已经解析完毕! * 所有解析过的Transformation,它们的唯一ID会保存起来,用来参与到“构建StreamEdge”中去。 */ Collection<Integer> inputIds = transform(transform.getInput()); /**避免递归调用过程中出现“重复转换”现象,将所有已经转换过的Transformation的唯一ID return*/ if (alreadyTransformed.containsKey(transform)) { return alreadyTransformed.get(transform); } // 获取Slot共享组:slotSharingGroup String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds); /** * 构建StreamGraph结构中的StreamNode:将StreamOperator添加到StreamNode中(相互映射、对应) * 在使用DataStream提供的转换方法时,Transformation持有OperatorFactory,OperatorFactory持有StreamOperator,StreamOperator持有自定义xxxFunction */ streamGraph.addOperator(transform.getId(), // 对应关系 ---> StreamNode:Transformation:StreamOperator // Slot共享组 slotSharingGroup, transform.getCoLocationGroupKey(), // 将Transformation中的OperatorFactory(StreamOperator的“代言人”),add到StreamGraph中 transform.getOperatorFactory(), // 本次Transformation的输入类型,即上一次Transformation的输出类型 transform.getInputType(), // 本次Transformation的输出类型 transform.getOutputType(), transform.getName()); if (transform.getStateKeySelector() != null) { TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig); // 设定KeySelector streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer); } // 获取Transformation的并行度 int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ? transform.getParallelism() : executionConfig.getParallelism(); // 为StreamGraph设置并行度 streamGraph.setParallelism(transform.getId(), parallelism); // 为StreamGraph设置最大并行度 streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism()); /** * 创建StreamEdge:当前Transformation对应的StreamNode已经准备好了,它上游的所有Transformation * 也解析完成了(对应的StreamNode也准备好了),可以用StreamEdge把它俩连起来了。 */ for (Integer inputId: inputIds) { // 将上一个Transformation和当前Transformation的Transformation唯一ID连接起来,构建成StreamGraph中的StreamEdge // 参数:上游Transformation ID、当前Transformation ID streamGraph.addEdge(inputId, transform.getId(), 0); } // 返回当前Transformation的唯一ID(递增整数)。作为已经解析完毕的Transformation,它会被用来构建StreamEdge return Collections.singleton(transform.getId()); }
解析物理Transformation的第一步,就是为当前Transformation构建对应的StreamNode:
/** * 构建StreamGraph的StreamNode(物理节点,等同于StreamOperator) */ public <IN, OUT> void addOperator( Integer vertexID, // 当前Transformation的唯一ID @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory<OUT> operatorFactory, // 持有StreamOperator的OperatorFactory TypeInformation<IN> inTypeInfo, // 本次Transformation的输入类型,也就是上一个Transformation的输出类型 TypeInformation<OUT> outTypeInfo, // 本次Transformation的输出类型 String operatorName) { // 区别在于StreamTask的子类不同 if (operatorFactory.isStreamSource()) { // Source节点 addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorFactory, operatorName); } else { // 非Source节点 addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorFactory, operatorName); } // 省略部分代码(将本次Transformation的输入类型、输出类型交给“持有”StreamOperator的OperatorFactory)... }
addNode的区别就在于这个StreamOperator是否为StreamSource
/** * 添加StreamNode的本质:按照“Transformation的唯一ID:StreamNode”的映射关系,将创建的StreamNode保存到Map集合中 */ protected StreamNode addNode(Integer vertexID, // 当前Transformation的唯一ID @Nullable String slotSharingGroup, @Nullable String coLocationGroup, Class<? extends AbstractInvokable> vertexClass, // OneInputStreamTas or SourceStreamTask StreamOperatorFactory<?> operatorFactory, String operatorName) { // 安全检查:理论上,当前Transformation所对应的StreamNode尚未生成。 if (streamNodes.containsKey(vertexID)) { throw new RuntimeException("Duplicate vertexID " + vertexID); } /** * 创建StreamGraph中的StreamNode:因为Transformation以成员变量的形式“合法持有”了StreamOperator, * 现在又要用Transformation的唯一ID来构建StreamNode,因此在逻辑上可以将三者“混为一谈”,也就是StreamNode = Transformation = StreamOperator */ StreamNode vertex = new StreamNode( vertexID, // 当前Transformation的唯一ID slotSharingGroup, coLocationGroup, operatorFactory, // OperatorFactory持有着StreamOperator operatorName, // 算子Name new ArrayList<OutputSelector<?>>(), vertexClass); // 将已创建好的StreamNode,按照“Transformation ID:StreamNode”的映射关系,保存到StreamGraph的Map集合中 streamNodes.put(vertexID, vertex); // 返回刚刚new出来的StreamNode return vertex; }
添加StreamNode,就是new一个StreamNode出来,将当前Transformation的唯一ID、OperatorFactory(通过成员变量持有着StreamOperator)、算子Name等保存到StreamNode中,说白了就是把StreamOperator包装成StreamNode。然后将StreamNode按照“Transformation ID:StreamNode”的映射关系,保存到StreamGraph内的Map集合中。
从这也能看出来,StreamOperator、StreamNode、Transformation在理解逻辑上是可以将它们三者“混为一谈”的。
在解析当前物理Transformation的过程中,它对应的StreamNode已经准备好了,并且它的所有上游Transformation均已解析完毕(它们的StreamNode也构建完成了)。
/**
* 创建StreamEdge:当前Transformation对应的StreamNode已经准备好了,它上游的所有Transformation
* 也解析完成了(对应的StreamNode也准备好了),可以用StreamEdge把它俩连起来了。
*/
for (Integer inputId: inputIds) {
// 将上一个Transformation和当前Transformation的Transformation唯一ID连接起来,构建成StreamGraph中的StreamEdge
// 参数:上游Transformation ID、当前Transformation ID
streamGraph.addEdge(inputId, transform.getId(), 0);
}
接下来就要为当前StreamNode(对应当前Transformation)和它所有上游StreamNode(对应上游Transformation)之间构建StreamEdge。
/**
* 生成StreamEdge(负责连接上、下StreamNode):将上一个Transformation和当前Transformation分别对应的StreamNode连接起来
*/
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
// 根据节点类型,创建StreamEdge将物理Transformation对应的StreamNode连接起来
addEdgeInternal(upStreamVertexID, // 上一个Transformation的唯一ID
downStreamVertexID, // 当前Transformation的唯一ID
typeNumber,
null,
new ArrayList<String>(),
null,
null);
}
根据当前Transformation的上游Transformation的节点类型,执行对应的处理逻辑。如果上游Transformation不是物理Transformation,那就会一直向上找,直至找到物理Transformation为止。
/** * 形成StreamEdge,将上一个和当前(物理的)Transformation ID对应的StreamNode关联起来(非虚拟节点)。 * 核心目的:生成StreamEdge,并将StreamNode和StreamEdge关联起来(将生成的StreamEdge指定为上游StreamNode的输出边、下游StreamNode的输入边) * 注意:如果是虚拟Transformation转换生成的虚拟节点,此时会将虚拟Transformation内保存的信息,封装到StreamEdge中 */ private void addEdgeInternal(Integer upStreamVertexID, // 上一个Transformation的唯一ID(物理 or 虚拟) Integer downStreamVertexID, // 当前Transformation的唯一ID(即物理StreamNode) int typeNumber, StreamPartitioner<?> partitioner, List<String> outputNames, OutputTag outputTag, ShuffleMode shuffleMode) { /** * 根据上一个Transformation对应的节点类型,执行对应的处理逻辑。只要up不是物理节点,就继续向上找,直至找到物理节点为止。 * 最终,虚拟节点的Transformation信息,一定会被封装到2个物理StreamNode之间的StreamEdge中。 * 例如:StreamNode---StreamEdge(封装了2个StreamNode之间的虚拟节点的相关虚拟转换属性信息)---StreamNode */ if (virtualSideOutputNodes.containsKey(upStreamVertexID)) { // 当上游是SlideOutput时 int virtualId = upStreamVertexID; upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0; if (outputTag == null) { outputTag = virtualSideOutputNodes.get(virtualId).f1; } addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode); } else if (virtualSelectNodes.containsKey(upStreamVertexID)) { // 当上游是Select时,递归调用,并传入select信息 int virtualId = upStreamVertexID; upStreamVertexID = virtualSelectNodes.get(virtualId).f0; if (outputNames.isEmpty()) { // selections that happen downstream override earlier selections outputNames = virtualSelectNodes.get(virtualId).f1; } // 将虚拟Transformation内的信息,保存到StreamEdge中 addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode); } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) { /** * 如果当前Transformation对应的StreamNode的上游是虚拟节点(虚拟Transformation转换而来),就将这个虚拟节点对应的Partitioner封装到StreamEdge中 */ // 上一个Transformation的唯一ID,也就是虚拟节点ID int virtualId = upStreamVertexID; /**从保存有“虚拟节点”的Map集合中,找到当前虚拟节点的上游节点,直至找到物理节点为止。*/ upStreamVertexID = virtualPartitionNodes.get(virtualId).f0; if (partitioner == null) { // 取出当前虚拟Transformation内保存的Partitioner partitioner = virtualPartitionNodes.get(virtualId).f1; } // 取出当前虚拟Transformation内保存的ShuffleMode shuffleMode = virtualPartitionNodes.get(virtualId).f2; /**将down作为down,将up的up作为新的up,带着旧up的Transformation信息,继续新一轮的逻辑判断*/ addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode); } else { /** * 以上几种情况都是逻辑转换,下面才是真正的构建StreamEdge */ // 通过上一个Transformation的唯一ID,从保存StreamNode的Map集合中拿到它对应的StreamNode,它将作为这个StreamEdge的上游 StreamNode upstreamNode = getStreamNode(upStreamVertexID); // 通过当前Transformation的唯一ID,从保存StreamNode的Map集合中拿到它对应的StreamNode,它作为这个StreamEdge的下游 StreamNode downstreamNode = getStreamNode(downStreamVertexID); /**根据并行度,决定使用哪种数据分发策略:ForwardPartitioner or RebalancePartitioner*/ if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) { // 如果这个StreamEdge的上游StreamNode和下游StreamNode的并行度相同,那就使用ForwardPartitioner的数据分发策略(直接向后分发) partitioner = new ForwardPartitioner<Object>(); } else if (partitioner == null) { // 如果这个StreamEdge的上游StreamNode和下游StreamNode的并行度不一样,那就使用RebalancePartitioner的数据分发策略(轮询向后分发) partitioner = new RebalancePartitioner<Object>(); } // 保险:已经确定了ForwardPartitioner,但StreamEdge上下连接的2个StreamNode的并行度不同,那就抛异常 if (partitioner instanceof ForwardPartitioner) { if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) { throw new UnsupportedOperationException("Forward partitioning does not allow " + "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() + ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global."); } } // 默认ShuffleMode为:未定义,交给框架决定(流 or 批)二选一 if (shuffleMode == null) { shuffleMode = ShuffleMode.UNDEFINED; } /**构建StreamEdge,参数为:up StreamNode、down StreamNode,以及中间夹杂的虚拟Transformation的相关信息 */ StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode); // 指定当前StreamEdge为up StreamNode的输出边 getStreamNode(edge.getSourceId()).addOutEdge(edge); // 指定当前StreamEdge为down StreamNode的输入边 getStreamNode(edge.getTargetId()).addInEdge(edge); } }
一旦找到上游节点为物理StreamNode,那就会创建StreamEdge,这也就意味着此时2个物理的StreamNode即将会被连接在一起,且中间还“隐藏”了虚拟节点。其中StreamEdge会作为这个up物理StreamNode的输出边、down物理StreamNode的输入边。
StreamNode和StreamEdge之间的连接关系的确立,是通过StreamNode内的List集合保存对应StreamEdge的方式完成的。
// 当前StreamNode的所有“输入边” private List<StreamEdge> inEdges = new ArrayList<StreamEdge>(); // 当前StreamNode的所有“输出边” private List<StreamEdge> outEdges = new ArrayList<StreamEdge>(); /** * 将指定的StreamEdge作为当前StreamNode的“输出边”:上游StreamNode --(StreamEdge)--> */ public void addOutEdge(StreamEdge outEdge) { if (outEdge.getSourceId() != getId()) { throw new IllegalArgumentException("Source id doesn't match the StreamNode id"); } else { // 有可能有N个输出 outEdges.add(outEdge); } }
如此一来,上、下游的2个StreamNode就通过StreamEdge连接起来了,而那些不涉及物理转换操作的虚拟Transformation,则被“隐藏”到了StreamEdge中。
基于“Transformation树”构建StreamGraph,就是利用Transformation之间的关联关系(每个Transformation都有一个input指针,指向前一个Transformation),构建DAG来描述计算逻辑之间的拓扑关系。其中物理Transformation会形成StreamNode,彼此之间用StreamEdge连接。而虚拟Transformation的相关信息会被添加到StreamEdge中“隐藏”起来。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。