Flink Source Code Series (Generating the StreamGraph), Part 3

Previous installment: Part 2

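In the previous installment we finished analyzing the user code. Its core effect is to add the relevant operators to transformations, and that list is central to generating the stream graph. The last step of any user program is env.execute(); without it, the program never actually runs. So we continue the source analysis from the execute function.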

1.StreamExecutionEnvironment#execute

    public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
        return execute(getStreamGraph(jobName));
    }

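Since this installment covers only the generation of the stream graph, it focuses on getStreamGraph. The next installment picks up the rest of execute once the stream graph has been obtained.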

2.StreamExecutionEnvironment#getStreamGraph(String) -> StreamExecutionEnvironment#getStreamGraph(String, boolean)

    public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
        StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
        if (clearTransformations) {
            this.transformations.clear();
        }
        return streamGraph;
    }

3.StreamGraphGenerator#generate

    public StreamGraph generate() {
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
        shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
        configureStreamGraph(streamGraph);
        alreadyTransformed = new HashMap<>();
        for (Transformation<?> transformation: transformations) {
            transform(transformation);
        }
        final StreamGraph builtStreamGraph = streamGraph;
        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;
        return builtStreamGraph;
    }

The key call is transform(transformation). Stepping into it:

    private Collection<Integer> transform(Transformation<?> transform) {
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }
        LOG.debug("Transforming " + transform);
        if (transform.getMaxParallelism() <= 0) {
            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }
        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();
        @SuppressWarnings("unchecked")
        final TransformationTranslator<?, Transformation<?>> translator =
                (TransformationTranslator<?, Transformation<?>>) translatorMap.get(transform.getClass());
        Collection<Integer> transformedIds;
        if (translator != null) {
            transformedIds = translate(translator, transform);
        } else {
            transformedIds = legacyTransform(transform);
        }
        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }
        return transformedIds;
    }

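generate loops over all transformations, and transform converts each one into StreamNodes and StreamEdges of the StreamGraph, returning the collection of ids that the transformation was translated into. Results are cached in alreadyTransformed, so a transformation reached more than once (for example, as the input of several others) is translated only once. If translatorMap holds a translator for the transformation's class, translate is used; otherwise the code falls back to legacyTransform.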
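
The caching and recursion described above can be sketched with a small, self-contained model. This is a hypothetical simplification, not Flink code: Node stands in for Transformation, and translationOrder is an illustrative field that records the order in which nodes get translated. It shows the three behaviors visible in transform and translate: the cache lookup, the fallback to the job-wide max parallelism, and inputs being translated before the node itself.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of StreamGraphGenerator#transform; these are not Flink classes.
final class MemoizedTransformSketch {

    // Stand-in for a Transformation: an id, an optional max parallelism and its inputs.
    static final class Node {
        final int id;
        int maxParallelism; // <= 0 means "not set"
        final List<Node> inputs;

        Node(int id, int maxParallelism, List<Node> inputs) {
            this.id = id;
            this.maxParallelism = maxParallelism;
            this.inputs = inputs;
        }
    }

    private final Map<Node, Collection<Integer>> alreadyTransformed = new HashMap<>();
    private final int globalMaxParallelism;
    final List<Integer> translationOrder = new ArrayList<>();

    MemoizedTransformSketch(int globalMaxParallelism) {
        this.globalMaxParallelism = globalMaxParallelism;
    }

    Collection<Integer> transform(Node node) {
        // A node that was already translated returns its cached ids.
        Collection<Integer> cached = alreadyTransformed.get(node);
        if (cached != null) {
            return cached;
        }
        // Fall back to the job-wide max parallelism if none was set on the node.
        if (node.maxParallelism <= 0 && globalMaxParallelism > 0) {
            node.maxParallelism = globalMaxParallelism;
        }
        // Inputs are translated first, recursively (compare getParentInputIds).
        for (Node input : node.inputs) {
            transform(input);
        }
        translationOrder.add(node.id);
        Collection<Integer> ids = Collections.singletonList(node.id);
        alreadyTransformed.put(node, ids);
        return ids;
    }

    public static void main(String[] args) {
        Node source = new Node(1, -1, List.of());
        Node flatMap = new Node(2, -1, List.of(source));
        Node sink = new Node(5, 8, List.of(flatMap));
        MemoizedTransformSketch generator = new MemoizedTransformSketch(128);
        // In this model the source is not in the list; it is reached only through recursion.
        for (Node n : List.of(flatMap, sink)) {
            generator.transform(n);
        }
        System.out.println(generator.translationOrder); // [1, 2, 5]
        System.out.println(source.maxParallelism + " " + sink.maxParallelism); // 128 8
    }
}
```

Even though flatMap is reached twice (directly from the loop and again as the sink's input), it appears only once in translationOrder.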

Next, step into translate(translator, transform):

    private Collection<Integer> translate(
            final TransformationTranslator<?, Transformation<?>> translator,
            final Transformation<?> transform) {
        checkNotNull(translator);
        checkNotNull(transform);
        final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());
        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }
        final String slotSharingGroup = determineSlotSharingGroup(
                transform.getSlotSharingGroup(),
                allInputIds.stream()
                        .flatMap(Collection::stream)
                        .collect(Collectors.toList()));
        final TransformationTranslator.Context context = new ContextImpl(
                this, streamGraph, slotSharingGroup, configuration);
        return shouldExecuteInBatchMode
                ? translator.translateForBatch(transform, context)
                : translator.translateForStreaming(transform, context);
    }

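getParentInputIds first transforms all inputs of the current transformation by calling transform on each of them recursively, which is why alreadyTransformed is checked a second time afterwards. Now look at the last few lines: we are running in streaming mode, so translator.translateForStreaming(transform, context) is executed. Stepping into it: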

    public Collection<Integer> translateForStreaming(final T transformation, final Context context) {
        checkNotNull(transformation);
        checkNotNull(context);
        final Collection<Integer> transformedIds =
                translateForStreamingInternal(transformation, context);
        configure(transformation, context);
        return transformedIds;
    }

What translateForStreamingInternal does depends on the transformation type: for keyBy, PartitionTransformationTranslator#translateForStreamingInternal is executed; for flatMap, OneInputTransformationTranslator#translateForStreamingInternal is executed.
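
The dispatch described above (a translator looked up by the transformation's exact class, falling back to the legacy path when none is registered) combined with the fixed "internal step, then configure" order of translateForStreaming is a template-method pattern. The following is a hedged sketch of that pattern only. Every class here is a hypothetical stand-in rather than Flink's TransformationTranslator API, the batch branch is omitted, and the " + configured" suffix merely marks that the shared configure step ran.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of class-keyed translator dispatch; these are not Flink's types.
final class TranslatorDispatchSketch {

    static class Transformation {
        final int id;
        Transformation(int id) { this.id = id; }
    }

    static final class PartitionTransformation extends Transformation {
        PartitionTransformation(int id) { super(id); }
    }

    static final class OneInputTransformation extends Transformation {
        OneInputTransformation(int id) { super(id); }
    }

    // A type with no registered translator, to exercise the legacy fallback.
    static final class UnregisteredTransformation extends Transformation {
        UnregisteredTransformation(int id) { super(id); }
    }

    // Template method: subclasses supply the internal step; the shared configure step always follows.
    abstract static class Translator {
        final String translateForStreaming(Transformation t) {
            String result = translateForStreamingInternal(t);
            return result + " + configured";
        }

        abstract String translateForStreamingInternal(Transformation t);
    }

    static final class PartitionTranslator extends Translator {
        @Override
        String translateForStreamingInternal(Transformation t) {
            return "virtual partition node for " + t.id;
        }
    }

    static final class OneInputTranslator extends Translator {
        @Override
        String translateForStreamingInternal(Transformation t) {
            return "operator node " + t.id;
        }
    }

    // Lookup by exact runtime class, like translatorMap.get(transform.getClass()).
    private final Map<Class<? extends Transformation>, Translator> translatorMap = new HashMap<>();

    TranslatorDispatchSketch() {
        translatorMap.put(PartitionTransformation.class, new PartitionTranslator());
        translatorMap.put(OneInputTransformation.class, new OneInputTranslator());
    }

    String transform(Transformation t) {
        Translator translator = translatorMap.get(t.getClass());
        return translator != null
                ? translator.translateForStreaming(t)
                : "legacy path for " + t.id;
    }

    public static void main(String[] args) {
        TranslatorDispatchSketch generator = new TranslatorDispatchSketch();
        System.out.println(generator.transform(new OneInputTransformation(2)));
        System.out.println(generator.transform(new PartitionTransformation(3)));
        System.out.println(generator.transform(new UnregisteredTransformation(7)));
    }
}
```

Because translateForStreaming is final in this sketch, no subclass can skip the configure step. That mirrors the intent of calling configure(transformation, context) right after translateForStreamingInternal.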

4.PartitionTransformationTranslator#translateForStreamingInternal

    protected Collection<Integer> translateForStreamingInternal(
            final PartitionTransformation<OUT> transformation,
            final Context context) {
        return translateInternal(transformation, context);
    }

Stepping into translateInternal:

    private Collection<Integer> translateInternal(
            final PartitionTransformation<OUT> transformation,
            final Context context) {
        checkNotNull(transformation);
        checkNotNull(context);
        final StreamGraph streamGraph = context.getStreamGraph();
        final List<Transformation<?>> parentTransformations = transformation.getInputs();
        checkState(
                parentTransformations.size() == 1,
                "Expected exactly one input transformation but found " + parentTransformations.size());
        final Transformation<?> input = parentTransformations.get(0);
        List<Integer> resultIds = new ArrayList<>();
        for (Integer inputId: context.getStreamNodeIds(input)) {
            final int virtualId = Transformation.getNewNodeId();
            streamGraph.addVirtualPartitionNode(
                    inputId,
                    virtualId,
                    transformation.getPartitioner(),
                    transformation.getShuffleMode());
            resultIds.add(virtualId);
        }
        return resultIds;
    }
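
The loop above means keyBy never adds a real StreamNode. Instead, for every node id of its input (there can be several, e.g. after a union), it registers a virtual node under a freshly generated id, remembering the upstream node and the partitioner. This is consistent with the final JSON in section 7, which contains no keyBy node. Below is a minimal, hypothetical model of that bookkeeping. VirtualPartition, realNodes and the starting id passed to the constructor are illustrative assumptions, and the shuffle mode is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of how a partition (keyBy) step is recorded as virtual nodes; not Flink code.
final class VirtualPartitionSketch {

    // What the graph remembers about a virtual node.
    static final class VirtualPartition {
        final int inputId;        // id of the real upstream node
        final String partitioner; // e.g. "HASH" for keyBy

        VirtualPartition(int inputId, String partitioner) {
            this.inputId = inputId;
            this.partitioner = partitioner;
        }
    }

    final List<Integer> realNodes = new ArrayList<>();
    final Map<Integer, VirtualPartition> virtualPartitionNodes = new HashMap<>();
    private int nextId; // plays the role of Transformation.getNewNodeId()

    VirtualPartitionSketch(int firstFreeId) {
        this.nextId = firstFreeId;
    }

    // One fresh virtual id per upstream node id; realNodes is left untouched.
    List<Integer> translatePartition(List<Integer> inputIds, String partitioner) {
        List<Integer> resultIds = new ArrayList<>();
        for (int inputId : inputIds) {
            int virtualId = nextId++;
            virtualPartitionNodes.put(virtualId, new VirtualPartition(inputId, partitioner));
            resultIds.add(virtualId);
        }
        return resultIds;
    }

    public static void main(String[] args) {
        VirtualPartitionSketch graph = new VirtualPartitionSketch(6);
        graph.realNodes.add(1);
        graph.realNodes.add(2);
        List<Integer> ids = graph.translatePartition(List.of(2), "HASH");
        System.out.println(ids);                                         // [6]
        System.out.println(graph.realNodes);                             // [1, 2]
        System.out.println(graph.virtualPartitionNodes.get(6).inputId);  // 2
    }
}
```

The ids returned here are what the downstream translator later passes to addEdge as the upstream id; resolving them back to the real node happens in StreamGraph#addEdgeInternal.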

5.OneInputTransformationTranslator#translateForStreamingInternal

    public Collection<Integer> translateForStreamingInternal(
            final OneInputTransformation<IN, OUT> transformation,
            final Context context) {
        return translateInternal(transformation,
                transformation.getOperatorFactory(),
                transformation.getInputType(),
                transformation.getStateKeySelector(),
                transformation.getStateKeyType(),
                context
        );
    }

Stepping into translateInternal:

    protected Collection<Integer> translateInternal(
            final Transformation<OUT> transformation,
            final StreamOperatorFactory<OUT> operatorFactory,
            final TypeInformation<IN> inputType,
            @Nullable final KeySelector<IN, ?> stateKeySelector,
            @Nullable final TypeInformation<?> stateKeyType,
            final Context context) {
        checkNotNull(transformation);
        checkNotNull(operatorFactory);
        checkNotNull(inputType);
        checkNotNull(context);
        final StreamGraph streamGraph = context.getStreamGraph();
        final String slotSharingGroup = context.getSlotSharingGroup();
        final int transformationId = transformation.getId();
        final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
        streamGraph.addOperator(
                transformationId,
                slotSharingGroup,
                transformation.getCoLocationGroupKey(),
                operatorFactory,
                inputType,
                transformation.getOutputType(),
                transformation.getName());
        if (stateKeySelector != null) {
            TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
            streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
        }
        int parallelism = transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                ? transformation.getParallelism()
                : executionConfig.getParallelism();
        streamGraph.setParallelism(transformationId, parallelism);
        streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
        final List<Transformation<?>> parentTransformations = transformation.getInputs();
        checkState(
                parentTransformations.size() == 1,
                "Expected exactly one input transformation but found " + parentTransformations.size());
        for (Integer inputId: context.getStreamNodeIds(parentTransformations.get(0))) {
            streamGraph.addEdge(inputId, transformationId, 0);
        }
        return Collections.singleton(transformationId);
    }
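
Unlike the partition case, this translator adds a real operator node under the transformation's own id. Two of its steps are easy to isolate: resolving the parallelism (an explicitly set value wins, otherwise the job-wide value from ExecutionConfig is used) and adding one edge from every node id of the single input. The sketch below reproduces just those two steps as plain static helpers. The class and method names are hypothetical, and PARALLELISM_DEFAULT is redeclared locally with the value -1 used by Flink's ExecutionConfig so the block stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers mirroring two steps of OneInputTransformationTranslator#translateInternal.
final class OneInputStepsSketch {

    // Same value as Flink's ExecutionConfig.PARALLELISM_DEFAULT, redeclared to stay self-contained.
    static final int PARALLELISM_DEFAULT = -1;

    // An explicitly set parallelism wins; otherwise the job-wide parallelism is used.
    static int resolveParallelism(int transformationParallelism, int jobParallelism) {
        return transformationParallelism != PARALLELISM_DEFAULT
                ? transformationParallelism
                : jobParallelism;
    }

    // One edge from every node id that the single input was translated into.
    static List<String> edgesFrom(List<Integer> inputNodeIds, int transformationId) {
        List<String> edges = new ArrayList<>();
        for (int inputId : inputNodeIds) {
            edges.add(inputId + "->" + transformationId);
        }
        return edges;
    }

    public static void main(String[] args) {
        System.out.println(resolveParallelism(PARALLELISM_DEFAULT, 4)); // 4
        System.out.println(resolveParallelism(1, 4));                   // 1
        // If the input was a keyBy, these ids are virtual ids, resolved later in addEdgeInternal.
        System.out.println(edgesFrom(List.of(6), 4));                   // [6->4]
    }
}
```

Note that at this point the edge may still point from a virtual id; the translator does not care whether its input was a real operator or a keyBy.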

6.StreamGraph#addEdge->StreamGraph#addEdgeInternal

    private void addEdgeInternal(Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            List<String> outputNames,
            OutputTag outputTag,
            ShuffleMode shuffleMode) {
        if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
            if (outputTag == null) {
                outputTag = virtualSideOutputNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
        } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            shuffleMode = virtualPartitionNodes.get(virtualId).f2;
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
        } else {
            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
            StreamNode downstreamNode = getStreamNode(downStreamVertexID);
            // If no partitioner was specified and the parallelism of upstream and downstream
            // operator matches use forward partitioning, use rebalance otherwise.
            if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                partitioner = new ForwardPartitioner<Object>();
            } else if (partitioner == null) {
                partitioner = new RebalancePartitioner<Object>();
            }
            if (partitioner instanceof ForwardPartitioner) {
                if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                    throw new UnsupportedOperationException("Forward partitioning does not allow " +
                            "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                            ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                            " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                }
            }
            if (shuffleMode == null) {
                shuffleMode = ShuffleMode.UNDEFINED;
            }
            StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber,
                    partitioner, outputTag, shuffleMode);
            getStreamNode(edge.getSourceId()).addOutEdge(edge);
            getStreamNode(edge.getTargetId()).addInEdge(edge);
        }
    }
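
addEdgeInternal recursively replaces a virtual upstream id with the real node it stands for, carrying the virtual node's partitioner along unless one was passed explicitly. Only then does it create a real StreamEdge, choosing forward when the parallelisms match and rebalance otherwise, and rejecting forward across a parallelism change. The sketch below is a simplified, hypothetical model of that logic, with side-output nodes and shuffle mode omitted and partitioners represented as strings chosen to match the ship_strategy labels in section 7. Its exampleEdges method rebuilds the edges of the example job there. The virtual id 6 and the parallelism values are taken from, or picked to be consistent with, that JSON, not read from Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of StreamGraph#addEdgeInternal (side outputs and shuffle mode omitted).
final class AddEdgeSketch {

    // A virtual partition node remembers its real upstream node and its partitioner.
    static final class VirtualPartition {
        final int inputId;
        final String partitioner;

        VirtualPartition(int inputId, String partitioner) {
            this.inputId = inputId;
            this.partitioner = partitioner;
        }
    }

    final Map<Integer, Integer> parallelism = new HashMap<>(); // real node id -> parallelism
    final Map<Integer, VirtualPartition> virtualPartitionNodes = new HashMap<>();
    final List<String> edges = new ArrayList<>();

    void addEdge(int upId, int downId, String partitioner) {
        VirtualPartition virtual = virtualPartitionNodes.get(upId);
        if (virtual != null) {
            // Replace the virtual id with the real upstream id; an explicit partitioner wins.
            addEdge(virtual.inputId, downId, partitioner != null ? partitioner : virtual.partitioner);
            return;
        }
        int upParallelism = parallelism.get(upId);
        int downParallelism = parallelism.get(downId);
        // No partitioner given: forward if the parallelisms match, rebalance otherwise.
        if (partitioner == null) {
            partitioner = upParallelism == downParallelism ? "FORWARD" : "REBALANCE";
        }
        if (partitioner.equals("FORWARD") && upParallelism != downParallelism) {
            throw new UnsupportedOperationException(
                    "Forward partitioning does not allow change of parallelism");
        }
        edges.add(upId + "->" + downId + " " + partitioner);
    }

    // Rebuilds the edges of the example job in section 7.
    static List<String> exampleEdges() {
        AddEdgeSketch graph = new AddEdgeSketch();
        graph.parallelism.put(1, 1); // Source: Socket Stream
        graph.parallelism.put(2, 4); // Flat Map
        graph.parallelism.put(4, 4); // Window
        graph.parallelism.put(5, 1); // Sink
        graph.virtualPartitionNodes.put(6, new VirtualPartition(2, "HASH")); // keyBy
        graph.addEdge(1, 2, null);
        graph.addEdge(6, 4, null);
        graph.addEdge(4, 5, null);
        return graph.edges;
    }

    public static void main(String[] args) {
        System.out.println(exampleEdges()); // [1->2 REBALANCE, 2->4 HASH, 4->5 REBALANCE]
    }
}
```

The three resulting edges match the predecessors and ship strategies listed in the JSON: REBALANCE from the source (parallelism 1) to Flat Map (4), HASH from Flat Map to Window via the collapsed keyBy node, and REBALANCE from Window (4) to the sink (1).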

7.The resulting StreamGraph

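The node data is shown below. Note that keyBy does not appear as a node: its virtual partition node is resolved away when edges are added, so the Window node's predecessor is Flat Map (id 2) with ship strategy HASH.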

    {
      "nodes": [
        {
          "id": 1,
          "type": "Source: Socket Stream",
          "pact": "Data Source",
          "contents": "Source: Socket Stream",
          "parallelism": 1
        },
        {
          "id": 2,
          "type": "Flat Map",
          "pact": "Operator",
          "contents": "Flat Map",
          "parallelism": 4,
          "predecessors": [
            {
              "id": 1,
              "ship_strategy": "REBALANCE",
              "side": "second"
            }
          ]
        },
        {
          "id": 4,
          "type": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)",
          "pact": "Operator",
          "contents": "Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)",
          "parallelism": 4,
          "predecessors": [
            {
              "id": 2,
              "ship_strategy": "HASH",
              "side": "second"
            }
          ]
        },
        {
          "id": 5,
          "type": "Sink: Print to Std. Out",
          "pact": "Data Sink",
          "contents": "Sink: Print to Std. Out",
          "parallelism": 1,
          "predecessors": [
            {
              "id": 4,
              "ship_strategy": "REBALANCE",
              "side": "second"
            }
          ]
        }
      ]
    }

[Figure: diagram of the StreamGraph above]

Overview

[Figure: flowchart overview of the code covered in this installment]

See you in the next installment!

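Disclaimer: This article was contributed by a user and does not represent the position of the wpsshop blog (【wpsshop博客】); copyright belongs to the original author, and the site assumes no legal liability for it. If you find infringing content, please contact the site. When reposting, please cite the source: https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/659842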