The concept of Graph:
In Flink, the execution graph has four layers: StreamGraph -> JobGraph -> ExecutionGraph -> physical execution graph.
StreamGraph: the original graph generated from the code the user writes with the Stream API; it represents the topology of the program.
JobGraph: the StreamGraph is optimized into the JobGraph, the data structure submitted to the JobManager. The main optimization is chaining, which merges operators into operator chains and thereby reduces the cost of serialization, deserialization and network transfer between nodes.
ExecutionGraph: the JobManager generates the ExecutionGraph from the JobGraph. The ExecutionGraph is the parallelized version of the JobGraph and is the core data structure of the scheduling layer.
Physical execution graph: the "graph" that emerges after the JobManager schedules the job based on the ExecutionGraph and deploys the tasks onto the TaskManagers; it is not a concrete data structure.
Each vertex in the StreamGraph is a StreamNode; a StreamNode is essentially an operator, and two StreamNodes are connected by a StreamEdge object.
During the conversion from StreamGraph to JobGraph, StreamNodes are optimized: where the conditions allow, StreamNodes are merged into a JobVertex, and each JobVertex is a vertex of the JobGraph. The output of a JobVertex is an IntermediateDataSet, which stores what that vertex produces; in the JobGraph, the edge object connecting an upstream vertex's output to a downstream vertex is called a JobEdge.
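As a user-side illustration of the chaining optimization, chaining can simply be switched off, after which every operator keeps its own JobVertex. This is a minimal sketch; the pipeline contents and class name are illustrative assumptions, not part of the original article.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Disable chaining globally: each operator then stays in its own JobVertex,
        // so the StreamGraph-to-JobGraph optimization no longer merges StreamNodes.
        env.disableOperatorChaining();

        env.fromElements("a", "b", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .print();

        env.execute("chaining-demo");
    }
}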
Take the WordCount code as an example:
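The walkthrough below assumes a standard streaming WordCount along the following lines (a minimal sketch; the socket host/port, class name and job name are illustrative assumptions):
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)
                // split each line into (word, 1) tuples
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.split("\\s+")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                // each API call above produces a Transformation that the StreamGraphGenerator
                // later turns into StreamNodes and StreamEdges
                .keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute("WordCount");
    }
}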
In StreamGraphGenerator#generate(), the configureStreamGraph() method configures the StreamGraph, including chaining, the state backend and the checkpoint settings:
private void configureStreamGraph(final StreamGraph graph) {
    checkNotNull(graph);

    graph.setChaining(chaining);
    graph.setChainingOfOperatorsWithDifferentMaxParallelism(
            chainingOfOperatorsWithDifferentMaxParallelism);
    graph.setUserArtifacts(userArtifacts);
    graph.setTimeCharacteristic(timeCharacteristic);
    graph.setVertexDescriptionMode(configuration.get(PipelineOptions.VERTEX_DESCRIPTION_MODE));
    graph.setVertexNameIncludeIndexPrefix(
            configuration.get(PipelineOptions.VERTEX_NAME_INCLUDE_INDEX_PREFIX));
    graph.setAutoParallelismEnabled(
            configuration.get(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED));
    graph.setEnableCheckpointsAfterTasksFinish(
            configuration.get(
                    ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
    setDynamic(graph);

    if (shouldExecuteInBatchMode) {
        configureStreamGraphBatch(graph);
        setDefaultBufferTimeout(-1);
    } else {
        configureStreamGraphStreaming(graph);
    }
}
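On the user side, these settings originate from calls like the following (a minimal sketch; the checkpoint interval, backend choice and class name are illustrative assumptions):
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // These values are picked up when the StreamGraph is generated
        // (configureStreamGraph / configureStreamGraphStreaming).
        env.enableCheckpointing(10_000L);                 // checkpoint every 10s
        env.setStateBackend(new HashMapStateBackend());   // keyed state kept on the JVM heap

        env.fromElements(1, 2, 3).print();
        env.execute("checkpoint-config-demo");
    }
}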
How is a concrete Transformation converted into StreamNodes and StreamEdges? The generator loops over all transformations and calls transform() on each:
// enter the transform() method
for (Transformation<?> transformation : transformations) {
    transform(transformation);
}
private Collection<Integer> transform(Transformation<?> transform) {
    // first check whether this transformation has already been transformed
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    LOG.debug("Transforming " + transform);

    if (transform.getMaxParallelism() <= 0) {

        // if the max parallelism hasn't been set, then first use the job wide max parallelism
        // from the ExecutionConfig.
        int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
        if (globalMaxParallelismFromConfig > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
    }

    transform
            .getSlotSharingGroup()
            .ifPresent(
                    slotSharingGroup -> {
                        final ResourceSpec resourceSpec =
                                SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
                        if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {
                            slotSharingGroupResources.compute(
                                    slotSharingGroup.getName(),
                                    (name, profile) -> {
                                        if (profile == null) {
                                            return ResourceProfile.fromResourceSpec(
                                                    resourceSpec, MemorySize.ZERO);
                                        } else if (!ResourceProfile.fromResourceSpec(
                                                        resourceSpec, MemorySize.ZERO)
                                                .equals(profile)) {
                                            throw new IllegalArgumentException(
                                                    "The slot sharing group "
                                                            + slotSharingGroup.getName()
                                                            + " has been configured with two different resource spec.");
                                        } else {
                                            return profile;
                                        }
                                    });
                        }
                    });

    // call at least once to trigger exceptions about MissingTypeInfo
    transform.getOutputType();

    @SuppressWarnings("unchecked")
    final TransformationTranslator<?, Transformation<?>> translator =
            (TransformationTranslator<?, Transformation<?>>)
                    // look up the TransformationTranslator for this transformation type in
                    // translatorMap; its job is to convert the Transformation into StreamNodes
                    translatorMap.get(transform.getClass());

    // convert the current transformation into StreamNodes and StreamEdges for the StreamGraph
    Collection<Integer> transformedIds;
    if (translator != null) {
        // enter the translate method to see the concrete translation flow
        transformedIds = translate(translator, transform);
    } else {
        transformedIds = legacyTransform(transform);
    }

    // need this check because the iterate transformation adds itself before
    // transforming the feedback edges
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }

    return transformedIds;
}
The translate method:
1. Gets all upstream transform nodes whose output the current transformation consumes.
2. Sets the slot sharing group.
3. Decides between streaming and batch mode and calls translateForStreaming; the concrete implementation goes through SimpleTransformationTranslator.
The translation here only covers the current operator's node; the downstream operators are not known at this point, so no StreamEdge is built yet. Execution then enters translateForStreamingInternal; since the debug session is currently in the translate call of the split (flatMap) step, the concrete implementation is OneInputTransformationTranslator.
public Collection<Integer> translateForStreamingInternal(
        final OneInputTransformation<IN, OUT> transformation, final Context context) {
    // enter the translateInternal method
    return translateInternal(
            transformation,
            transformation.getOperatorFactory(),
            transformation.getInputType(),
            transformation.getStateKeySelector(),
            transformation.getStateKeyType(),
            context);
}
protected Collection<Integer> translateInternal(
        final Transformation<OUT> transformation,
        final StreamOperatorFactory<OUT> operatorFactory,
        final TypeInformation<IN> inputType,
        @Nullable final KeySelector<IN, ?> stateKeySelector,
        @Nullable final TypeInformation<?> stateKeyType,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(operatorFactory);
    checkNotNull(inputType);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    // a StreamNode is added to the StreamGraph here
    streamGraph.addOperator(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            operatorFactory,
            inputType,
            transformation.getOutputType(),
            transformation.getName());

    if (stateKeySelector != null) {
        TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
    }

    int parallelism =
            transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                    ? transformation.getParallelism()
                    : executionConfig.getParallelism();
    streamGraph.setParallelism(
            transformationId, parallelism, transformation.isParallelismConfigured());
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

    // get all inputs
    final List<Transformation<?>> parentTransformations = transformation.getInputs();
    checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found "
                    + parentTransformations.size());

    // create the StreamEdges between the current StreamNode and all upstream StreamNodes
    for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
        // add the edge to the StreamGraph:
        // transformationId is the id of the current vertex, inputId the id of the upstream vertex
        streamGraph.addEdge(inputId, transformationId, 0);
    }

    if (transformation instanceof PhysicalTransformation) {
        streamGraph.setSupportsConcurrentExecutionAttempts(
                transformationId,
                ((PhysicalTransformation<OUT>) transformation)
                        .isSupportsConcurrentExecutionAttempts());
    }

    return Collections.singleton(transformationId);
}
Summary: StreamGraph.addOperator is called first to convert the current transform into a StreamNode and add it to the StreamGraph.
Then the ids of all upstream input nodes of the current transform are collected, and StreamGraph.addEdge builds the StreamEdges and adds them to the StreamGraph.
Step into the StreamGraph.addOperator() method to see how the StreamNode is created and added to the StreamGraph.
public <IN, OUT> void addOperator(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {
    // determine the invokable type: SourceStreamTask or OneInputStreamTask
    // result of the ternary expression in this debug run:
    // invokableClass = class org.apache.flink.streaming.runtime.tasks.OneInputStreamTask
    Class<? extends TaskInvokable> invokableClass =
            operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
    // enter the overloaded addOperator method to see how addNode is implemented
    addOperator(
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorFactory,
            inTypeInfo,
            outTypeInfo,
            operatorName,
            invokableClass);
}
Inside that overloaded addOperator, the addNode() method:
1. First calls new StreamNode(), creating one StreamNode for each StreamOperator.
2. Puts that StreamNode into the StreamGraph via streamNodes.put(); the operator's processing logic is wrapped step by step as user function -> StreamOperator -> Transformation -> StreamNode.
When the StreamNode is built, the invokable class has already been determined: a source operator gets SourceStreamTask, any other operator gets OneInputStreamTask.
With that, addNode has finished converting the operator into a StreamNode.
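The article does not reproduce addNode() itself; the following is a simplified sketch of the logic just described, not the verbatim Flink source (the parameter list and StreamNode constructor arguments are abbreviated for illustration):
// Simplified sketch of StreamGraph.addNode(): create one StreamNode per operator
// and register it in the streamNodes map keyed by vertex id.
protected StreamNode addNode(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        Class<? extends TaskInvokable> vertexClass,   // SourceStreamTask or OneInputStreamTask
        @Nullable StreamOperatorFactory<?> operatorFactory,
        String operatorName) {

    if (streamNodes.containsKey(vertexID)) {
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }

    StreamNode vertex =
            new StreamNode(
                    vertexID,
                    slotSharingGroup,
                    coLocationGroup,
                    operatorFactory,
                    operatorName,
                    vertexClass);

    // register the node; the StreamGraph looks nodes up by id when building edges
    streamNodes.put(vertexID, vertex);

    return vertex;
}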
Next, StreamGraph.addEdge(), which delegates to addEdgeInternal():
private void addEdgeInternal(
        Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames,
        OutputTag outputTag,
        StreamExchangeMode exchangeMode,
        IntermediateDataSetID intermediateDataSetId) {

    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(
                upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                partitioner,
                null,
                outputTag,
                exchangeMode,
                intermediateDataSetId);
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        exchangeMode = virtualPartitionNodes.get(virtualId).f2;
        addEdgeInternal(
                upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                partitioner,
                outputNames,
                outputTag,
                exchangeMode,
                intermediateDataSetId);
    } else {
        // enter createActualEdge()
        createActualEdge(
                upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                partitioner,
                outputTag,
                exchangeMode,
                intermediateDataSetId);
    }
}
private void createActualEdge(
        Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        OutputTag outputTag,
        StreamExchangeMode exchangeMode,
        IntermediateDataSetID intermediateDataSetId) {
    // look up the upstream StreamNode by its id
    StreamNode upstreamNode = getStreamNode(upStreamVertexID);
    // look up the current vertex's StreamNode; for this StreamEdge it is the downstream node
    StreamNode downstreamNode = getStreamNode(downStreamVertexID);

    // If no partitioner was specified and the parallelism of upstream and downstream
    // operator matches use forward partitioning, use rebalance otherwise.
    // In other words: if the upstream and downstream StreamNodes have the same parallelism,
    // a ForwardPartitioner is used; otherwise a RebalancePartitioner is used.
    if (partitioner == null
            && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
        partitioner =
                dynamic ? new ForwardForUnspecifiedPartitioner<>() : new ForwardPartitioner<>();
    } else if (partitioner == null) {
        partitioner = new RebalancePartitioner<Object>();
    }

    if (partitioner instanceof ForwardPartitioner) {
        if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
            if (partitioner instanceof ForwardForConsecutiveHashPartitioner) {
                partitioner =
                        ((ForwardForConsecutiveHashPartitioner<?>) partitioner)
                                .getHashPartitioner();
            } else {
                throw new UnsupportedOperationException(
                        "Forward partitioning does not allow "
                                + "change of parallelism. Upstream operation: "
                                + upstreamNode
                                + " parallelism: "
                                + upstreamNode.getParallelism()
                                + ", downstream operation: "
                                + downstreamNode
                                + " parallelism: "
                                + downstreamNode.getParallelism()
                                + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }
    }

    if (exchangeMode == null) {
        exchangeMode = StreamExchangeMode.UNDEFINED;
    }

    /*
     * Just make sure that {@link StreamEdge} connecting same nodes (for example as a result of
     * self unioning a {@link DataStream}) are distinct and unique. Otherwise it would be
     * difficult on the {@link StreamTask} to assign {@link RecordWriter}s to correct {@link
     * StreamEdge}.
     */
    int uniqueId = getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();

    // create the StreamEdge
    StreamEdge edge =
            new StreamEdge(
                    upstreamNode,
                    downstreamNode,
                    typeNumber,
                    partitioner,
                    outputTag,
                    exchangeMode,
                    uniqueId,
                    intermediateDataSetId);

    // register the StreamEdge as an output edge of the upstream StreamNode
    getStreamNode(edge.getSourceId()).addOutEdge(edge);
    // register the StreamEdge as an input edge of the downstream StreamNode
    getStreamNode(edge.getTargetId()).addInEdge(edge);
}
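A user-facing illustration of that partitioner check (a minimal sketch; the parallelism values, class name and MapFunctions are illustrative assumptions):
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> upstream =
                env.fromElements("a", "b", "c")
                        .map(new MapFunction<String, String>() {
                            @Override
                            public String map(String value) {
                                return value.toUpperCase();
                            }
                        })
                        .setParallelism(2);

        // Parallelism changes from 2 to 4 and no partitioner is specified, so
        // createActualEdge falls back to RebalancePartitioner for this edge.
        // With equal parallelism it would pick ForwardPartitioner instead, and forcing
        // upstream.forward() across the parallelism change would trigger the
        // UnsupportedOperationException shown above.
        upstream.map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value;
                    }
                })
                .setParallelism(4)
                .print();

        env.execute("partitioner-demo");
    }
}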
In the process above, the operators the user calls first produce StreamOperators; each StreamOperator is wrapped in a Transformation, and each Transformation is then turned into a StreamNode. Once a StreamNode is built it is put into the StreamGraph object. Then, based on the node's type and its relationship to its upstream and downstream StreamNodes, the StreamEdges are built, and each StreamEdge connects a pair of StreamNodes that have an output-input relationship. When all StreamEdges have been connected, the StreamGraph is complete.
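To see the result of this whole translation for a concrete job, the generated plan can be printed; this is a minimal sketch (the class name and sample data are illustrative assumptions), relying on getExecutionPlan(), which returns a JSON description of the StreamGraph:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintPlanDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("to", "be", "or", "not", "to", "be")
                .keyBy(word -> word)
                .print();

        // Triggers the Transformation -> StreamNode/StreamEdge translation described above
        // and returns the resulting graph as JSON (it can be pasted into the Flink plan visualizer).
        System.out.println(env.getExecutionPlan());
    }
}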