当前位置:   article > 正文

2、Flink作业转换-StreamGraph_flink 转换 transformation 为 streamgraph

flink 转换 transformation 为 streamgraph

1 StreamExecutionEnvironment

2 StreamTransformation

3 StreamOperator

4 StreamGraph源码分析

4.1 遍历 & 递归转换

4.2 添加物理节点

4.3 添加虚拟节点

4.4 添加边

5 实例讲解

1 StreamExecutionEnvironment


StreamExecutionEnvironment 是 Flink 在流模式下任务执行的上下文,也是我们编写 Flink 程序的入口。根据具体的执行环境不同,StreamExecutionEnvironment 有不同的具体实现类:LocalStreamEnvironmentRemoteStreamEnvironmentStreamContextEnvironment等。StreamExecutionEnvironment 也提供了用来配置默认并行度、Checkpointing 等机制的方法,这些配置主要都保存在 ExecutionConfig 和 CheckpointConfig 中。现在先只关注拓扑结构的产生过程。通常一个 Flink 任务是按照下面的流程来编写处理逻辑的:

  1. env.addSource(XXX)
  2. .map(XXX)
  3. .filter(XXX)
  4. .addSink(XXX)
  5. env.execute("job-name");

添加数据源后获得 DataStream, 之后通过不同的算子不停地在 DataStream 上实现转换过滤等逻辑,最终将结果输出到 sink 中。通过env.execute()执行作业,这里首先会生成一张StreaGraph:

  1. public JobExecutionResult execute(String jobName) throws Exception {
  2. return execute(getStreamGraph(jobName));
  3. }
  4. @Internal
  5. public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
  6. StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
  7. if (clearTransformations) {
  8. // transformations是一个list集合,存放所有的Transformation
  9. this.transformations.clear();
  10. }
  11. return streamGraph;
  12. }

在 StreamExecutionEnvironment 内部使用一个 List<StreamTransformation<?>> transformations 来保留生成 DataStream 的所有转换。

2 StreamTransformation


StreamTransformation代表了从一个或多个DataStream生成新DataStream的操作。DataStream的底层其实就是一个 StreamTransformation,描述了这个DataStream是怎么来的。

DataStream 上常见的 transformation 有 map、flatmap、filter等(更多)。这些transformation会构造出一棵 StreamTransformation 树,通过这棵树转换成 StreamGraph。比如 DataStream.map源码如下:

  1. // SingleOutputStreamOperator为DataStream的子类
  2. public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
  3. // 通过java reflection抽出mapper的返回值类型
  4. TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
  5. Utils.getCallLocationName(), true);
  6. // 返回一个新的DataStream,SteramMap 为 StreamOperator 的实现类
  7. return transform("Map", outType, new StreamMap<>(clean(mapper)));
  8. }
  9. public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
  10. // read the output type of the input Transform to coax out errors about MissingTypeInfo
  11. transformation.getOutputType();
  12. // 新的transformation会连接上当前DataStream中的transformation,从而构建成一棵树
  13. OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
  14. this.transformation,
  15. operatorName,
  16. operator,
  17. outTypeInfo,
  18. environment.getParallelism());
  19. @SuppressWarnings({ "unchecked", "rawtypes" })
  20. SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
  21. // 所有的transformation都会存到 env 中,调用execute时遍历该list生成StreamGraph
  22. getExecutionEnvironment().addOperator(resultTransform);
  23. return returnStream;
  24. }

从上方代码可以了解到,map转换将用户自定义的函数MapFunction包装到StreamMap这个Operator中,再将StreamMap包装到OneInputTransformation,最后将该transformation存到env中,当调用env.execute时,遍历其中的transformation集合构造出StreamGraph。其分层实现如下图所示:

另外,并不是每一个 StreamTransformation 都会转换成 runtime 层中物理操作。有一些只是逻辑概念,比如 union、split/select、partition等。如下图所示的转换树,在运行时会优化成下方的操作图。

union、split/select、partition中的信息会被写入到 Source –> Map 的边中。通过源码也可以发现,UnionTransformation,SplitTransformation,SelectTransformation,PartitionTransformation由于不包含具体的操作所以都没有StreamOperator成员变量,而其他StreamTransformation的子类基本上都有。

3 StreamOperator


DataStream 上的每一个 Transformation 都对应了一个 StreamOperator,StreamOperator是运行时的具体实现,会决定UDF(User-Defined Funtion)的调用方式。StreamOperator 类图如下:

所有实现类都继承了AbstractStreamOperator。另外除了 project 操作,其他所有可以执行UDF代码的实现类都继承自AbstractUdfStreamOperator,该类是封装了UDF的StreamOperator。UDF就是实现了Function接口的类,如MapFunction、FilterFunction

4 StreamGraph源码分析


StreamGraphGenerator 会基于 StreamExecutionEnvironment 的 transformations 列表来生成 StreamGraph

4.1 遍历 & 递归转换

在遍历 List<StreamTransformation> 生成 StreamGraph 的时候,会递归调用StreamGraphGenerator.transform方法。对于每一个 StreamTransformation, 确保当前其上游已经完成转换StreamTransformations 被转换为 StreamGraph 中的节点 StreamNode,并为上下游节点添加边 StreamEdge

  1. public StreamGraph generate() {
  2. streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
  3. ......
  4. // 遍历transformations列表,进行转换
  5. for (Transformation<?> transformation : transformations) {
  6. transform(transformation);
  7. }
  8. final StreamGraph builtStreamGraph = streamGraph;
  9. alreadyTransformed.clear();
  10. alreadyTransformed = null;
  11. streamGraph = null;
  12. return builtStreamGraph;
  13. }
  14. // 对具体的一个transformation进行转换,转换成 StreamGraph 中的 StreamNode 和 StreamEdge
  15. // 返回值为该transform的id集合,通常大小为1个(除FeedbackTransformation)
  16. private Collection<Integer> transform(StreamTransformation<?> transform) {
  17. // 跳过已经转换过的transformation
  18. if (alreadyTransformed.containsKey(transform)) {
  19. return alreadyTransformed.get(transform);
  20. }
  21. LOG.debug("Transforming " + transform);
  22. // 为了触发 MissingTypeInfo 的异常
  23. transform.getOutputType();
  24. Collection<Integer> transformedIds;
  25. if (transform instanceof OneInputTransformation<?, ?>) {
  26. transformedIds = transformOnInputTransform((OneInputTransformation<?, ?>) transform);
  27. } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
  28. transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
  29. } else if (transform instanceof SourceTransformation<?>) {
  30. transformedIds = transformSource((SourceTransformation<?>) transform);
  31. } else if (transform instanceof SinkTransformation<?>) {
  32. transformedIds = transformSink((SinkTransformation<?>) transform);
  33. } else if (transform instanceof UnionTransformation<?>) {
  34. transformedIds = transformUnion((UnionTransformation<?>) transform);
  35. } else if (transform instanceof SplitTransformation<?>) {
  36. transformedIds = transformSplit((SplitTransformation<?>) transform);
  37. } else if (transform instanceof SelectTransformation<?>) {
  38. transformedIds = transformSelect((SelectTransformation<?>) transform);
  39. } else if (transform instanceof FeedbackTransformation<?>) {
  40. transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
  41. } else if (transform instanceof CoFeedbackTransformation<?>) {
  42. transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
  43. } else if (transform instanceof PartitionTransformation<?>) {
  44. transformedIds = transformPartition((PartitionTransformation<?>) transform);
  45. } else {
  46. throw new IllegalStateException("Unknown transformation: " + transform);
  47. }
  48. // need this check because the iterate transformation adds itself before
  49. // transforming the feedback edges
  50. if (!alreadyTransformed.containsKey(transform)) {
  51. alreadyTransformed.put(transform, transformedIds);
  52. }
  53. if (transform.getBufferTimeout() > 0) {
  54. streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
  55. }
  56. if (transform.getUid() != null) {
  57. streamGraph.setTransformationId(transform.getId(), transform.getUid());
  58. }
  59. return transformedIds;
  60. }

对于不同类型的 StreamTransformation,分别调用对应的转换方法,该方法首先会对该transform的上游transform进行递归转换,确保上游的都已经完成了转化。然后通过transform构造出StreamNode,最后与上游的transform进行连接,构造出StreamEdge。以 最典型的 transformOneInputTransform 为例:

  1. private <IN, OUT> Collection<Integer> transformOnInputTransform(OneInputTransformation<IN, OUT> transform) {
  2. // 递归对该transform的直接上游transform进行转换,获得直接上游id集合
  3. Collection<Integer> inputIds = transform(transform.getInput());
  4. // 递归调用可能已经处理过该transform了
  5. if (alreadyTransformed.containsKey(transform)) {
  6. return alreadyTransformed.get(transform);
  7. }
  8. String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
  9. // 添加 StreamNode
  10. streamGraph.addOperator(transform.getId(),
  11. slotSharingGroup,
  12. transform.getOperator(),
  13. transform.getInputType(),
  14. transform.getOutputType(),
  15. transform.getName());
  16. if (transform.getStateKeySelector() != null) {
  17. TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
  18. streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
  19. }
  20. streamGraph.setParallelism(transform.getId(), transform.getParallelism());
  21. // 依次连接到上游节点,创建 StreamEdge
  22. for (Integer inputId: inputIds) {
  23. streamGraph.addEdge(inputId, transform.getId(), 0);
  24. }
  25. return Collections.singleton(transform.getId());
  26. }

4.2 添加物理节点

接着看一看 StreamGraph 中添加节点和边的方法,首先是添加节点:

  1. protected StreamNode addNode(Integer vertexID,
  2. String slotSharingGroup,
  3. @Nullable String coLocationGroup,
  4. Class<? extends AbstractInvokable> vertexClass,
  5. StreamOperator<?> operatorObject,
  6. String operatorName) {
  7. if (streamNodes.containsKey(vertexID)) {
  8. throw new RuntimeException("Duplicate vertexID " + vertexID);
  9. }
  10. StreamNode vertex = new StreamNode(environment,
  11. vertexID,
  12. slotSharingGroup,
  13. coLocationGroup,
  14. operatorObject,
  15. operatorName,
  16. new ArrayList<OutputSelector<?>>(),
  17. vertexClass);
  18. //创建 StreamNode,这里保存了 StreamOperator 和 vertexClass 信息
  19. streamNodes.put(vertexID, vertex);
  20. return vertex;
  21. }

4.3 添加虚拟节点

对于一些不包含物理转换操作的 StreamTransformation,如 Partitioning, split/select, union,并不会生成 StreamNode,而是生成一个带有特定属性的虚拟节点。当添加一条有虚拟节点指向下游节点的边时,会找到虚拟节点上游的物理节点,在两个物理节点之间添加边,并把虚拟转换操作的属性附着上去。以 PartitionTansformation 为例, PartitionTansformation 是 KeyedStream 对应的转换:

  1. private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
  2. StreamTransformation<T> input = partition.getInput();
  3. List<Integer> resultIds = new ArrayList<>();
  4. //递归地转换上游节点
  5. Collection<Integer> transformedIds = transform(input);
  6. for (Integer transformedId: transformedIds) {
  7. int virtualId = StreamTransformation.getNewNodeId();
  8. //添加虚拟的 Partition 节点
  9. streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());
  10. resultIds.add(virtualId);
  11. }
  12. return resultIds;
  13. }
  14. // StreamGraph.addVirtualPartitionNode()
  15. public void addVirtualPartitionNode(Integer originalId, Integer virtualId, StreamPartitioner<?> partitioner) {
  16. if (virtualPartitionNodes.containsKey(virtualId)) {
  17. throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
  18. }
  19. //添加一个虚拟节点,后续添加边的时候会连接到实际的物理节点
  20. virtualPartitionNodes.put(virtualId,
  21. new Tuple2<Integer, StreamPartitioner<?>>(originalId, partitioner));
  22. }

4.4 添加边

前面提到,在每一个物理节点的转换上,会调用 StreamGraph.addEdge 在输入节点和当前节点之间建立边的连接:

  1. private void addEdgeInternal(Integer upStreamVertexID,
  2. Integer downStreamVertexID,
  3. int typeNumber,
  4. StreamPartitioner<?> partitioner,
  5. List<String> outputNames,
  6. OutputTag outputTag) {
  7. //先判断是不是虚拟节点上的边,如果是,则找到虚拟节点上游对应的物理节点
  8. //在两个物理节点之间添加边,并把对应的 StreamPartitioner,或者 OutputTag 等补充信息添加到StreamEdge中
  9. if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
  10. ......
  11. } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
  12. int virtualId = upStreamVertexID;
  13. upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
  14. if (partitioner == null) {
  15. partitioner = virtualPartitionNodes.get(virtualId).f1;
  16. }
  17. addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
  18. } else {
  19. //两个物理节点
  20. StreamNode upstreamNode = getStreamNode(upStreamVertexID);
  21. StreamNode downstreamNode = getStreamNode(downStreamVertexID);
  22. // If no partitioner was specified and the parallelism of upstream and downstream
  23. // operator matches use forward partitioning, use rebalance otherwise.
  24. if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
  25. partitioner = new ForwardPartitioner<Object>();
  26. } else if (partitioner == null) {
  27. partitioner = new RebalancePartitioner<Object>();
  28. }
  29. if (partitioner instanceof ForwardPartitioner) {
  30. if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
  31. throw new UnsupportedOperationException("Forward partitioning does not allow " +
  32. "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
  33. ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
  34. " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
  35. }
  36. }
  37. //创建 StreamEdge,保留了 StreamPartitioner 等属性
  38. StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);
  39. //分别将StreamEdge添加到上游节点和下游节点
  40. getStreamNode(edge.getSourceId()).addOutEdge(edge);
  41. getStreamNode(edge.getTargetId()).addInEdge(edge);
  42. }
  43. }

这样通过 StreamNode 和 SteamEdge,就得到了 DAG 中的所有节点和边,以及它们之间的连接关系,拓扑结构也就建立了。

5 实例讲解


如下程序,是一个从 Source 中按行切分成单词并过滤输出的简单流程序,其中包含了逻辑转换:随机分区shuffle。我们会分析该程序是如何生成StreamGraph的。

  1. DataStream<String> text = env.socketTextStream(hostName, port);
  2. text.flatMap(new LineSplitter())
  3. .shuffle()
  4. .filter(new HelloFilter())
  5. .print();

首先会在env中生成一棵transformation树,用List<StreamTransformation<?>>保存。其结构图如下:

其中符号*为input指针,指向上游的transformation,从而形成了一棵transformation树。然后,通过调用StreamGraphGenerator.generate(env, transformations)来生成StreamGraph。自底向上递归调用每一个transformation,也就是说处理顺序是Source->FlatMap->Shuffle->Filter->Sink。

如上图所示:

  1. 首先处理Source,生成了Source的StreamNode。
  2. 然后处理FlatMap,生成了FlatMap的StreamNode,并生成StreamEdge连接上游Source和FlatMap。由于上下游的并发度不一样(1:4),所以此处是Rebalance分区。
  3. 然后处理Shuffle,由于是逻辑转换,并不会生成实际的节点。将partitioner信息暂存在virtuaPartitionNodes中。
  4. 在处理Filter时,生成了Filter的StreamNode。发现上游是shuffle,找到shuffle的上游FlatMap,创建StreamEdge与Filter相连。并把ShufflePartitioner的信息写到StreamEdge中。
  5. 最后处理Sink,创建Sink的StreamNode,并生成StreamEdge与上游Filter相连。由于上下游并发度一样(4:4),所以此处选择 Forward 分区。

最后可以通过 UI可视化 来观察得到的 StreamGraph。

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号