Flink's execution-graph model has four layers:
StreamGraph -> JobGraph -> ExecutionGraph -> physical execution graph
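To make the four layers concrete, here is a minimal job to keep in mind throughout (a sketch of my own, not from the original post): the client turns it into a StreamGraph and then a JobGraph, the JobManager expands that into the ExecutionGraph, and the deployed tasks form the physical execution graph.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GraphLayersDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.fromElements("a", "b", "c")                            // source
           .map((MapFunction<String, String>) String::toUpperCase) // one StreamNode in the StreamGraph
           .print();                                               // sink
        // execute() builds the StreamGraph/JobGraph on the client and submits the job
        env.execute("graph-layers-demo");
    }
}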
StreamGraph
The initial graph, generated from the user code written against the Stream API. It represents the topology of the program and is generated on the client.
JobGraph
The StreamGraph is optimized into the JobGraph, the data structure submitted to the JobManager. The main optimization is chaining: multiple eligible nodes are chained together into a single node, which reduces the serialization, deserialization, and transfer costs of moving data between nodes. It is generated on the client.
ExecutionGraph
Generated by the JobManager from the JobGraph. The ExecutionGraph is the parallelized version of the JobGraph and the core data structure of the scheduling layer. The JobGraph-to-ExecutionGraph translation happens inside the JobManager.
ExecutionJobVertex
This object corresponds one-to-one with a JobVertex in the JobGraph. It also contains a set of ExecutionVertex objects, as many as the parallelism of the StreamNode contained in that JobVertex: if the StreamNode's parallelism is 5, the ExecutionJobVertex contains 5 ExecutionVertex instances.
ExecutionJobVertex wraps a JobVertex and in turn creates the ExecutionVertex, Execution, IntermediateResult, and IntermediateResultPartition objects that flesh out the ExecutionGraph.
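To make these containment relationships easier to picture, here is a rough structural sketch of my own; the field names are hypothetical and heavily simplified compared to the real classes:
class ExecutionJobVertexSketch {
    Object jobVertex;                            // 1:1 with a JobVertex in the JobGraph
    ExecutionVertexSketch[] taskVertices;        // length == parallelism (5 entries if parallelism is 5)
    IntermediateResultSketch[] producedDataSets; // one per outgoing JobEdge
}

class ExecutionVertexSketch {
    int subtaskIndex;        // which parallel instance this is
    Object currentExecution; // the current attempt (an Execution, identified by its ExecutionAttemptID)
}

class IntermediateResultSketch {
    Object[] partitions;     // one IntermediateResultPartition per producing subtask
}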
ExecutionVertex
ExecutionJobVertex parallelizes the job, constructing instances that can run in parallel; each such parallel instance is an ExecutionVertex.
IntermediateResult
IntermediateResult, also called the intermediate result set, is a logical concept representing the output of an ExecutionJobVertex. It corresponds one-to-one with an IntermediateDataSet in the JobGraph; likewise, one ExecutionJobVertex can have multiple intermediate results, depending on how many outgoing edges (JobEdge) the JobVertex has.
IntermediateResultPartition
IntermediateResultPartition, also called the intermediate result partition, represents the output of one ExecutionVertex and is associated with an ExecutionEdge.
ExecutionEdge
Represents the input of an ExecutionVertex, connecting it to the IntermediateResultPartition produced upstream. One ExecutionEdge corresponds to exactly one IntermediateResultPartition and one ExecutionVertex; one ExecutionVertex can have multiple ExecutionEdges.
Execution
An ExecutionVertex is effectively a template for each Task. At execution time, the information in the ExecutionVertex is wrapped into an Execution, which represents one attempt to execute that ExecutionVertex.
When a failure occurs, or data needs to be recomputed, an ExecutionVertex may accumulate multiple ExecutionAttemptIDs; each Execution is uniquely identified by its ExecutionAttemptID.
Task deployment and task-status updates between the JobManager and TaskManagers use the ExecutionAttemptID to identify the message recipient.
Physical execution graph
The graph formed after the JobManager schedules the job based on its Executions and deploys the Tasks onto the TaskManagers. It is not a concrete data structure.
Generating the StreamGraph
In per-job mode, this graph is generated on the client. Open StreamExecutionEnvironment.java, locate the execute method, and look at the following source:
public JobExecutionResult execute() throws Exception {
// build the stream graph and hand it to execute
return execute(getStreamGraph());
}
This is the getter of the {@link StreamGraph} of the streaming job. Passing true clears previously registered {@link Transformation transformations}. The return value is the stream graph representing the transformations.
@Internal
public StreamGraph getStreamGraph() {
return getStreamGraph(true);
}
Getter of the {@link StreamGraph} of the streaming job, with an option to clear previously registered {@link Transformation transformations}. Clearing the transformations allows, for example, calling {@link #execute()} multiple times without re-executing the same operations.
@Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {
final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
if (clearTransformations) {
transformations.clear();
}
return streamGraph;
}
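To see why clearing matters, here is a hedged example of my own: each execute() builds a StreamGraph from whatever is registered in transformations at that moment, so clearing after the first call keeps the second job from re-running the first pipeline.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoExecutesDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3).print();
        env.execute("first");   // graph built from pipeline #1; transformations cleared afterwards

        env.fromElements(4, 5, 6).print();
        env.execute("second");  // graph built from pipeline #2 only, thanks to the clearing
    }
}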
Where does the transformations list passed in above come from?
In StreamExecutionEnvironment.java we find this field, declared protected final:
protected final List<Transformation<?>> transformations = new ArrayList<>();
Clicking through on the variable name transformations leads to lines 2276, 2353, and 2668 (line 2668 is a getter for transformations, which we will skip for now):
@Internal
public void addOperator(Transformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}
So far we only know that elements end up in this transformations list, but who puts them there? Who calls this method?
The method is internal, meaning it is not meant to be called by users; API methods that create an operator must call it.
In the call-hierarchy figure above, every circled class calls addOperator; the rest are either abstract classes or test classes, so we skip them.
You can step into each circled class and see that its method body calls addOperator(transformation); this matches the user code exactly, and the transformations get added one after another.
There are a few remaining classes I did not go through; take a look yourself if you have time.
That covers where a transformation actually comes from. Later, when we get to each category of operator, we will look at how its transformation is generated, how operators are classified, and so on.
getStreamGraph breaks into two calls: first getStreamGraphGenerator(transformations), then generate(). Let's take them one at a time. The source:
private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
// Newcomers run into this check all the time: the jar was submitted, so why does it
// complain about having no operators? It is usually a packaging problem
if (transformations.size() <= 0) {
throw new IllegalStateException(
"No operators defined in streaming topology. Cannot execute.");
}
// We copy the transformation so that newly added transformations cannot intervene with the
// stream graph generation.
return new StreamGraphGenerator(
new ArrayList<>(transformations), config, checkpointCfg, configuration)
.setStateBackend(defaultStateBackend)
.setChangelogStateBackendEnabled(changelogStateBackendEnabled)
.setSavepointDir(defaultSavepointDirectory)
.setChaining(isChainingEnabled)
.setUserArtifacts(cacheFile)
.setTimeCharacteristic(timeCharacteristic)
.setDefaultBufferTimeout(bufferTimeout)
.setSlotSharingGroupResource(slotSharingGroupResources);
}
The new StreamGraphGenerator(...) above copies the transformations so that newly added transformations cannot interfere with generating the stream graph.
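The defensive-copy idiom in isolation, as a minimal sketch of my own:
import java.util.ArrayList;
import java.util.List;

public class DefensiveCopyDemo {
    public static void main(String[] args) {
        List<String> transformations = new ArrayList<>(List.of("source", "map"));

        // the generator works on a snapshot ...
        List<String> snapshot = new ArrayList<>(transformations);

        // ... so later additions cannot interfere with the generation in progress
        transformations.add("late-arriving-sink");
        System.out.println(snapshot);        // [source, map]
        System.out.println(transformations); // [source, map, late-arriving-sink]
    }
}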
Next, let's look at the generate() method.
public StreamGraph generate() {
// initialize the stream graph object
streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
streamGraph.setEnableCheckpointsAfterTasksFinish(
configuration.get(
ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
shouldExecuteInBatchMode = shouldExecuteInBatchMode();
configureStreamGraph(streamGraph);
// initialize the identity hash map; the transformation object itself is the key
alreadyTransformed = new IdentityHashMap<>();
// the user code's operators were added to transformations in order; transform each one
for (Transformation<?> transformation : transformations) {
transform(transformation);
}
streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);
setFineGrainedGlobalStreamExchangeMode(streamGraph);
for (StreamNode node : streamGraph.getStreamNodes()) {
if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
for (StreamEdge edge : node.getInEdges()) {
edge.setSupportsUnalignedCheckpoints(false);
}
}
}
final StreamGraph builtStreamGraph = streamGraph;
alreadyTransformed.clear();
alreadyTransformed = null;
streamGraph = null;
return builtStreamGraph;
}
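A side note on alreadyTransformed being an IdentityHashMap rather than a plain HashMap (a small self-contained demo of my own): identity maps compare keys with ==, not equals(), so every transformation instance gets translated exactly once no matter how its class implements equals().
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class IdentityMapDemo {
    public static void main(String[] args) {
        Map<String, Integer> plain = new HashMap<>();
        Map<String, Integer> identity = new IdentityHashMap<>();

        String k1 = new String("transform");
        String k2 = new String("transform"); // equal but a distinct instance

        plain.put(k1, 1);
        plain.put(k2, 2);
        identity.put(k1, 1);
        identity.put(k2, 2);

        System.out.println(plain.size());    // 1 -- equals()-based, the keys collide
        System.out.println(identity.size()); // 2 -- ==-based, each instance is its own key
    }
}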
The heart of generate() is this loop, which calls transform() on every registered transformation:
for (Transformation<?> transformation : transformations) {
transform(transformation);
}
Before stepping into transform(), let's first see how a transformation gets created. The user code uses the Map operator, whose source is as follows:
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
TypeInformation<R> outType =
TypeExtractor.getMapReturnTypes(
clean(mapper), getType(), Utils.getCallLocationName(), true);
return map(mapper, outType);
}
This calls another map overload:
public <R> SingleOutputStreamOperator<R> map(
MapFunction<T, R> mapper, TypeInformation<R> outputType) {
return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
At this point transform is called with three arguments. The first is operatorName; the map operator's name defaults to "Map" but can be customized by calling name() in the user code. The second is the output type. The third is a StreamMap operator wrapping the function passed in from the user code.
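As a quick illustration of that first parameter (a sketch of my own, not from the original post), overriding the default name from user code looks like this:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NamedMapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
           .map(value -> value * 2) // operatorName defaults to "Map" ...
           .name("double-it")       // ... unless overridden with name()
           .print();
        env.execute("named-map-demo");
    }
}
With that in mind, here is transform's source: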
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(
String operatorName,
TypeInformation<R> outTypeInfo,
OneInputStreamOperator<T, R> operator) {
return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
This in turn calls doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator)):
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
// the new transformation is linked to the current DataStream's transformation, building up a tree
OneInputTransformation<T, R> resultTransform =
new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream =
new SingleOutputStreamOperator(environment, resultTransform);
// call addOperator
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
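To visualize the tree being built, consider a pipeline like the following (a sketch of my own; the transformation names in the comments are what I would expect, so treat them as assumptions rather than verified facts):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationTreeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "ab")        // source transformation
           .map(s -> s + "!")              // OneInputTransformation whose input is the source
           .filter(s -> s.startsWith("a")) // OneInputTransformation whose input is the map
           .print();                       // sink hangs off the filter
        // Each doTransform() call linked the new transformation to its input,
        // so the transformations list plus these input pointers form a DAG.
        env.execute("transformation-tree-demo");
    }
}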
That covers how the elements of the transformations list come to exist.
Now back to the generate() method:
// transformations is a list holding the user code's operators in order
for (Transformation<?> transformation : transformations) {
transform(transformation);
}
private Collection<Integer> transform(Transformation<?> transform) {
// if this transformation was already translated, return the cached IDs
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
LOG.debug("Transforming " + transform);
if (transform.getMaxParallelism() <= 0) {
// if the max parallelism hasn't been set, then first use the job wide max parallelism
// from the ExecutionConfig.
int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
if (globalMaxParallelismFromConfig > 0) {
transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
}
transform
.getSlotSharingGroup()
.ifPresent(
slotSharingGroup -> {
final ResourceSpec resourceSpec =
SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {
slotSharingGroupResources.compute(
slotSharingGroup.getName(),
(name, profile) -> {
if (profile == null) {
return ResourceProfile.fromResourceSpec(
resourceSpec, MemorySize.ZERO);
} else if (!ResourceProfile.fromResourceSpec(
resourceSpec, MemorySize.ZERO)
.equals(profile)) {
throw new IllegalArgumentException(
"The slot sharing group "
+ slotSharingGroup.getName()
+ " has been configured with two different resource spec.");
} else {
return profile;
}
});
}
});
// call at least once to trigger exceptions about MissingTypeInfo
transform.getOutputType();
@SuppressWarnings("unchecked")
final TransformationTranslator<?, Transformation<?>> translator =
(TransformationTranslator<?, Transformation<?>>)
translatorMap.get(transform.getClass());
Collection<Integer> transformedIds;
// the main logic
if (translator != null) {
transformedIds = translate(translator, transform);
} else {
transformedIds = legacyTransform(transform);
}
// need this check because the iterate transformation adds itself before
// transforming the feedback edges
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
return transformedIds;
}
// jump straight to the last line of this method
private Collection<Integer> translate(
final TransformationTranslator<?, Transformation<?>> translator,
final Transformation<?> transform) {
checkNotNull(translator);
checkNotNull(transform);
final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());
// the recursive call might have already transformed this
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
final String slotSharingGroup =
determineSlotSharingGroup(
transform.getSlotSharingGroup().isPresent()
? transform.getSlotSharingGroup().get().getName()
: null,
allInputIds.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList()));
final TransformationTranslator.Context context =
new ContextImpl(this, streamGraph, slotSharingGroup, configuration);
// choose streaming or batch translation
return shouldExecuteInBatchMode
? translator.translateForBatch(transform, context)
: translator.translateForStreaming(transform, context);
}
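Which branch runs is decided by shouldExecuteInBatchMode, which follows the configured execution mode. From user code that looks like this (a sketch of my own, assuming a Flink version that has RuntimeExecutionMode, i.e. 1.12 or later):
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setRuntimeExecutionMode(RuntimeExecutionMode.STREAMING); // -> translateForStreaming
        // env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);  // -> translateForBatch
        // RuntimeExecutionMode.AUTOMATIC picks based on whether all sources are bounded
    }
}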
Step into SimpleTransformationTranslator.java:
@Override
public final Collection<Integer> translateForStreaming(
final T transformation, final Context context) {
checkNotNull(transformation);
checkNotNull(context);
final Collection<Integer> transformedIds =
translateForStreamingInternal(transformation, context);
configure(transformation, context);
return transformedIds;
}
It really is a nesting doll; lose focus for a moment and you have to start over. Then again, an architecture at this enterprise scale has to put the design patterns to full use, and it is a pleasure to read Flink's source: old code is still being actively maintained, whereas many frameworks never touch existing code as long as there is no logic or performance problem. Flink is rare in that respect.
protected abstract Collection<Integer> translateForStreamingInternal(
final T transformation, final Context context);
This is an abstract method; pressing Ctrl+H (type hierarchy) shows its many implementations.
What exactly are these implementations?
Operator types differ: map and flatMap manipulate data in a fundamentally different way from keyBy, union, and connect, so each operator type gets a matching translator.
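The dispatch itself is just a class-keyed lookup table; here is a minimal sketch of my own with hypothetical types, not Flink's real signatures:
import java.util.HashMap;
import java.util.Map;

public class TranslatorDispatchDemo {
    interface Translator {
        void translate(Object transformation);
    }

    private final Map<Class<?>, Translator> translatorMap = new HashMap<>();

    void register(Class<?> transformationType, Translator translator) {
        translatorMap.put(transformationType, translator);
    }

    void translate(Object transformation) {
        // look up the translator by the transformation's concrete class, as the real code does
        Translator translator = translatorMap.get(transformation.getClass());
        if (translator != null) {
            translator.translate(transformation);
        } else {
            // the real code falls back to legacyTransform(transform) here
            throw new IllegalStateException("No translator for " + transformation.getClass());
        }
    }
}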
For map-like operators, let's start with OneInputTransformationTranslator.java. It overrides two methods, translateForBatchInternal and translateForStreamingInternal; the former is obviously for batch, so we will look at the latter.
@Override
public Collection<Integer> translateForStreamingInternal(
final OneInputTransformation<IN, OUT> transformation, final Context context) {
return translateInternal(
transformation,
transformation.getOperatorFactory(),
transformation.getInputType(),
transformation.getStateKeySelector(),
transformation.getStateKeyType(),
context);
}
More nesting:
protected Collection<Integer> translateInternal(
final Transformation<OUT> transformation,
final StreamOperatorFactory<OUT> operatorFactory,
final TypeInformation<IN> inputType,
@Nullable final KeySelector<IN, ?> stateKeySelector,
@Nullable final TypeInformation<?> stateKeyType,
final Context context) {
checkNotNull(transformation);
checkNotNull(operatorFactory);
checkNotNull(inputType);
checkNotNull(context);
final StreamGraph streamGraph = context.getStreamGraph();
final String slotSharingGroup = context.getSlotSharingGroup();
final int transformationId = transformation.getId();
final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
// add a StreamNode (a vertex); step into this method to see how the vertex ID is handled
streamGraph.addOperator(
transformationId,
slotSharingGroup,
transformation.getCoLocationGroupKey(),
operatorFactory,
inputType,
transformation.getOutputType(),
transformation.getName());
if (stateKeySelector != null) {
TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
}
int parallelism =
transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
? transformation.getParallelism()
: executionConfig.getParallelism();
streamGraph.setParallelism(transformationId, parallelism);
streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
final List<Transformation<?>> parentTransformations = transformation.getInputs();
checkState(
parentTransformations.size() == 1,
"Expected exactly one input transformation but found "
+ parentTransformations.size());
// add edges here, one per stream node ID of the (single) parent transformation
for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
streamGraph.addEdge(inputId, transformationId, 0);
}
return Collections.singleton(transformationId);
}
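Note the parallelism fallback in the middle of translateInternal: the transformation's own parallelism wins if it was set, otherwise the job-wide default from the ExecutionConfig applies. From user code (a sketch of my own):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismFallbackDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);                // executionConfig.getParallelism() == 4

        env.fromElements(1, 2, 3)
           .map(v -> v + 1)                   // no explicit setting -> this StreamNode gets 4
           .map(v -> v * 2).setParallelism(2) // explicit setting -> this StreamNode gets 2
           .print();

        env.execute("parallelism-fallback-demo");
    }
}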
From addOperator() we can see that addNode() is called in turn:
private <IN, OUT> void addOperator(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName,
Class<? extends TaskInvokable> invokableClass) {
// addNode is called here
addNode(
vertexID,
slotSharingGroup,
coLocationGroup,
invokableClass,
operatorFactory,
operatorName);
setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));
if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
// sets the output type which must be known at StreamGraph creation time
operatorFactory.setOutputType(outTypeInfo, executionConfig);
}
if (operatorFactory.isInputTypeConfigurable()) {
operatorFactory.setInputType(inTypeInfo, executionConfig);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex: {}", vertexID);
}
}
Next, look at streamGraph.addEdge(inputId, transformationId, 0), which delegates to addEdgeInternal:
private void addEdgeInternal(
Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames,
OutputTag outputTag,
StreamExchangeMode exchangeMode) {
// when the upstream is a side output, recurse with the side-output info filled in
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
partitioner,
null,
outputTag,
exchangeMode);
}
// when the upstream is a partition, recurse with the partitioner info filled in
else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
exchangeMode = virtualPartitionNodes.get(virtualId).f2;
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
partitioner,
outputNames,
outputTag,
exchangeMode);
} else {
// actually build the StreamEdge
createActualEdge(
upStreamVertexID,
downStreamVertexID,
typeNumber,
partitioner,
outputTag,
exchangeMode);
}
}
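For context on the two recursive branches: virtual side-output and virtual partition nodes come from user-facing calls like the ones below (my own sketch). They never become StreamNodes of their own; their information is folded into the real edge right here instead.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class VirtualNodesDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};

        SingleOutputStreamOperator<Integer> evens =
                env.fromElements(1, 2, 3, 4)
                   .rebalance() // registers a virtual partition node (no StreamNode of its own)
                   .process(new ProcessFunction<Integer, Integer>() {
                       @Override
                       public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                           if (value % 2 == 0) {
                               out.collect(value);        // main output
                           } else {
                               ctx.output(oddTag, value); // side output
                           }
                       }
                   });

        // getSideOutput registers a virtual side-output node upstream of whatever consumes it
        DataStream<Integer> odds = evens.getSideOutput(oddTag);
        evens.print();
        odds.print();
        env.execute("virtual-nodes-demo");
    }
}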
Nesting dolls again, layer after layer, and recursive on top of it. Only when we reach createActualEdge is an edge truly created:
private void createActualEdge(
Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
OutputTag outputTag,
StreamExchangeMode exchangeMode) {
// upstream node
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
// downstream node
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// If no partitioner was specified and the parallelism of upstream and downstream
// operator matches use forward partitioning, use rebalance otherwise.
// if no partitioner was specified, forward or rebalance is chosen
if (partitioner == null
&& upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner =
executionConfig.isDynamicGraph()
? new ForwardForUnspecifiedPartitioner<>()
: new ForwardPartitioner<>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}
if (partitioner instanceof ForwardPartitioner) {
if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
throw new UnsupportedOperationException(
"Forward partitioning does not allow "
+ "change of parallelism. Upstream operation: "
+ upstreamNode
+ " parallelism: "
+ upstreamNode.getParallelism()
+ ", downstream operation: "
+ downstreamNode
+ " parallelism: "
+ downstreamNode.getParallelism()
+ " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
}
}
if (exchangeMode == null) {
exchangeMode = StreamExchangeMode.UNDEFINED;
}
/**
* Just make sure that {@link StreamEdge} connecting same nodes (for example as a result of
* self unioning a {@link DataStream}) are distinct and unique. Otherwise it would be
* difficult on the {@link StreamTask} to assign {@link RecordWriter}s to correct {@link
* StreamEdge}.
*/
int uniqueId = getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();
// the edge is created here
StreamEdge edge =
new StreamEdge(
upstreamNode,
downstreamNode,
typeNumber,
partitioner,
outputTag,
exchangeMode,
uniqueId);
// the upstream and downstream vertices are connected by the edge
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}