Flink Source Code Walkthrough (1): StreamGraph

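Contents

Basic Concepts of Flink Graphs

StreamGraph Source Code

Core Objects of StreamGraph

StreamNode

StreamEdge

Relationship Between StreamNode and StreamEdge

Uploading the Jar

Generating the StreamGraph

Generating a StreamNode

Generating an Edge

Core Methods

References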


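Basic Concepts of Flink Graphs

This section briefly introduces the basic concepts behind Flink's graphs and how they are produced; see Flink's basic concepts for details. In order of generation there are four layers: StreamGraph --> JobGraph --> ExecutionGraph --> physical execution graph. The steps are as follows:

  1. The Client turns the job's application code into a StreamGraph (in batch mode an OptimizedPlan is generated instead). The StreamGraph is the data structure that represents the topology of a streaming program. It describes the logical topology between operators and holds the information needed to generate the JobGraph.
  2. The StreamGraph is converted into a JobGraph. A JobGraph is the low-level Flink dataflow program that the JobManager accepts; every program written against a higher-level API is translated into one. A JobGraph is a graph of vertices and intermediate results that are connected into a DAG. The JobGraph defines job-wide configuration, while each vertex and intermediate result defines the concrete operation and the characteristics of the intermediate data.
  3. The JobGraph is submitted to the Dispatcher. The Dispatcher accepts job submissions, recovers jobs after failures, monitors the state of the Flink session cluster, and so on.
  4. The Dispatcher creates and starts a JobManager for the job based on the JobGraph.
  5. The JobManager converts the JobGraph into an ExecutionGraph. The ExecutionGraph is the central data structure that coordinates the distributed execution of the dataflow. It keeps track of every parallel task, every intermediate stream, and the communication between them.
  6. The JobManager turns the ExecutionGraph into the physical execution graph, that is, the tasks that are actually deployed and run.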

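StreamGraph Source Code

As Flink's topmost logical representation, the StreamGraph can be seen as the logical layer that user API calls are translated into. Its main job is to convert the Transformations written by the user into StreamNodes, generate the StreamEdges that link each node to its upstream and downstream nodes, and load all of them into the StreamGraph. The walkthrough below uses YARN mode as the example.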

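Core Objects of StreamGraph

StreamNode and StreamEdge are the core data structures of the StreamGraph.

StreamNode

A StreamNode is a node in the StreamGraph, that is, an operator in the streaming program. One StreamNode represents one operator. Sources and sinks are StreamNodes too; they only carry special names because they represent input and output. A StreamNode also holds the operator's other key properties, such as its parallelism, partitioning information, and the serializers for its input and output types.

StreamNodes come in two kinds, physical and virtual. StreamNodes are derived from transformations, but not every transformation has a physical meaning (that is, corresponds to a concrete operator). Partitioning (Partition), split/select (Select), and merging (Union) do not create real nodes in the StreamGraph. They create virtual nodes that carry specific properties. The information of a virtual StreamNode does not show up as a node in the StreamGraph; it is stored on the corresponding StreamEdge instead.

StreamEdge

A StreamEdge connects two StreamNodes. A StreamNode can have multiple incoming and outgoing edges. A StreamEdge stores information such as the partitioner and the side-output tag.

Relationship Between StreamNode and StreamEdge

A StreamEdge contains a source StreamNode (the sourceVertex attribute) and a target StreamNode (the targetVertex attribute). A StreamNode stores the sets of incoming and outgoing edges connected to it, in inEdges and outEdges.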
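
To make this relationship concrete, here is a minimal, self-contained sketch in plain Java. It is not Flink code: MiniStreamGraph, Node, Edge, and connect are hypothetical names, and the partitioner is reduced to a string label. It only mirrors the idea that an edge records its source and target ids while each node keeps lists of its incoming and outgoing edges and rejects edges that do not belong to it.

```java
import java.util.ArrayList;
import java.util.List;

public class MiniStreamGraph {

    /** Simplified stand-in for StreamNode: an id, a name, and the two edge lists. */
    static final class Node {
        final int id;
        final String name;
        final List<Edge> inEdges = new ArrayList<>();
        final List<Edge> outEdges = new ArrayList<>();

        Node(int id, String name) {
            this.id = id;
            this.name = name;
        }

        void addInEdge(Edge edge) {
            // An incoming edge must point at this node.
            if (edge.targetId != id) {
                throw new IllegalArgumentException("Destination id doesn't match the node id");
            }
            inEdges.add(edge);
        }

        void addOutEdge(Edge edge) {
            // An outgoing edge must start at this node.
            if (edge.sourceId != id) {
                throw new IllegalArgumentException("Source id doesn't match the node id");
            }
            outEdges.add(edge);
        }
    }

    /** Simplified stand-in for StreamEdge: source id, target id, partitioner label. */
    static final class Edge {
        final int sourceId;
        final int targetId;
        final String partitioner;

        Edge(Node source, Node target, String partitioner) {
            this.sourceId = source.id;
            this.targetId = target.id;
            this.partitioner = partitioner;
        }
    }

    /** Creates an edge and registers it on both of its endpoints. */
    static Edge connect(Node source, Node target, String partitioner) {
        Edge edge = new Edge(source, target, partitioner);
        source.addOutEdge(edge);
        target.addInEdge(edge);
        return edge;
    }

    public static void main(String[] args) {
        Node source = new Node(1, "Source");
        Node map = new Node(2, "Map");
        Node sink = new Node(3, "Sink");
        connect(source, map, "FORWARD");
        connect(map, sink, "REBALANCE");
        System.out.println(map.name + " in=" + map.inEdges.size() + " out=" + map.outEdges.size());
    }
}
```

Running main prints one line showing that the middle node has exactly one incoming and one outgoing edge.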

StreamNode source:

    /**
     * Class representing the operators in the streaming programs, with all their properties.
     */
    @Internal
    public class StreamNode {
        private final int id;
        private int parallelism;
        /**
         * Maximum parallelism for this stream node. The maximum parallelism is the upper limit for
         * dynamic scaling and the number of key groups used for partitioned state.
         */
        private int maxParallelism;
        private ResourceSpec minResources = ResourceSpec.DEFAULT;
        private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
        private final Map<ManagedMemoryUseCase, Integer> managedMemoryOperatorScopeUseCaseWeights = new HashMap<>();
        private final Set<ManagedMemoryUseCase> managedMemorySlotScopeUseCases = new HashSet<>();
        private long bufferTimeout;
        private final String operatorName;
        private @Nullable String slotSharingGroup;
        private @Nullable String coLocationGroup;
        private KeySelector<?, ?>[] statePartitioners = new KeySelector[0];
        private TypeSerializer<?> stateKeySerializer;
        private StreamOperatorFactory<?> operatorFactory;
        private TypeSerializer<?>[] typeSerializersIn = new TypeSerializer[0];
        private TypeSerializer<?> typeSerializerOut;
        // Incoming and outgoing edges
        private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
        private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
        private final Class<? extends AbstractInvokable> jobVertexClass;
        private InputFormat<?, ?> inputFormat;
        private OutputFormat<?> outputFormat;
        private String transformationUID;
        private String userHash;
        private boolean sortedInputs = false;
        // ...

        // Add an incoming or outgoing edge to the StreamNode, i.e. append it
        // to the corresponding list after checking that it belongs to this node
        public void addInEdge(StreamEdge inEdge) {
            if (inEdge.getTargetId() != getId()) {
                throw new IllegalArgumentException("Destination id doesn't match the StreamNode id");
            } else {
                inEdges.add(inEdge);
            }
        }

        public void addOutEdge(StreamEdge outEdge) {
            if (outEdge.getSourceId() != getId()) {
                throw new IllegalArgumentException("Source id doesn't match the StreamNode id");
            } else {
                outEdges.add(outEdge);
            }
        }
        // ...
    }

StreamEdge source:

    /**
     * An edge in the streaming topology. One edge like this does not necessarily
     * gets converted to a connection between two job vertices (due to
     * chaining/optimization).
     */
    @Internal
    public class StreamEdge implements Serializable {
        private static final long serialVersionUID = 1L;
        private static final long ALWAYS_FLUSH_BUFFER_TIMEOUT = 0L;
        private final String edgeId;
        // Source node and target node
        private final int sourceId;
        private final int targetId;
        /**
         * The type number of the input for co-tasks.
         */
        private final int typeNumber;
        /**
         * The side-output tag (if any) of this {@link StreamEdge}.
         */
        private final OutputTag outputTag;
        /**
         * The {@link StreamPartitioner} on this {@link StreamEdge}.
         */
        private StreamPartitioner<?> outputPartitioner;
        /**
         * The name of the operator in the source vertex.
         */
        private final String sourceOperatorName;
        /**
         * The name of the operator in the target vertex.
         */
        private final String targetOperatorName;
        // The shuffle mode defines how data is exchanged between operators
        private final ShuffleMode shuffleMode;
        private long bufferTimeout;

        public StreamEdge(
                StreamNode sourceVertex,
                StreamNode targetVertex,
                int typeNumber,
                long bufferTimeout,
                StreamPartitioner<?> outputPartitioner,
                OutputTag outputTag,
                ShuffleMode shuffleMode) {
            this.sourceId = sourceVertex.getId();
            this.targetId = targetVertex.getId();
            this.typeNumber = typeNumber;
            this.bufferTimeout = bufferTimeout;
            this.outputPartitioner = outputPartitioner;
            this.outputTag = outputTag;
            this.sourceOperatorName = sourceVertex.getOperatorName();
            this.targetOperatorName = targetVertex.getOperatorName();
            this.shuffleMode = checkNotNull(shuffleMode);
            this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + outputPartitioner;
        }
    }

Uploading the Jar

After the client's submit script uploads the jar, Flink picks the jar up and invokes the user's main function through reflection.

    // There are quite a few steps, so this walkthrough tries to be detailed.
    // This is the submit entry point: CliFrontend is where program submission starts,
    // and the key call is cli.parseAndRun(args).
    public static void main(final String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1. find the configuration directory, e.g. /opt/tools/flink-1.12.2/conf
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. load the global configuration, for example:
        //    "taskmanager.memory.process.size" -> "1728m"
        //    "parallelism.default" -> "1"
        //    "jobmanager.execution.failover-strategy" -> "region"
        //    "jobmanager.rpc.address" -> "localhost"
        //    "taskmanager.numberOfTaskSlots" -> "1"
        //    "jobmanager.memory.process.size" -> "1600m"
        //    "jobmanager.rpc.port" -> "6123"
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);

        try {
            // Build the CliFrontend: GenericCLI > FlinkYarnSessionCli > DefaultCLI
            final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
            SecurityUtils.install(new SecurityConfiguration(cli.configuration));
            // Submit the command through parseAndRun
            int retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
            System.exit(retCode);
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", strippedThrowable);
            strippedThrowable.printStackTrace();
            System.exit(31);
        }
    }

    // parseAndRun(args) then dispatches to a different method depending on the
    // requested command, for example run or stop.
    // Supported commands:
    /**
     *
     * // actions
     * private static final String ACTION_RUN = "run";
     * private static final String ACTION_RUN_APPLICATION = "run-application";
     * private static final String ACTION_INFO = "info";
     * private static final String ACTION_LIST = "list";
     * private static final String ACTION_CANCEL = "cancel";
     * private static final String ACTION_STOP = "stop";
     * private static final String ACTION_SAVEPOINT = "savepoint";
     */

    // We are submitting a job, so CliFrontend.run is called.
    // That method mainly determines how Flink will execute the job: the
    // executor, the environment, the program, and so on.
    // It then goes through CliFrontend.executeProgram(effectiveConfiguration, program),
    // which hands the submission over to the ClientUtils helper class.
    protected void executeProgram(final Configuration configuration, final PackagedProgram program)
            throws ProgramInvocationException {
        ClientUtils.executeProgram(
                new DefaultExecutorServiceLoader(), configuration, program, false, false);
    }

    // ClientUtils.executeProgram (same method name as above, different class)
    // sets up the execution environment and class loader and gets ready to run.
    public static void executeProgram(
            PipelineExecutorServiceLoader executorServiceLoader,
            Configuration configuration,
            PackagedProgram program,
            boolean enforceSingleJobExecution,
            boolean suppressSysout)
            throws ProgramInvocationException {
        checkNotNull(executorServiceLoader);
        // Get the user code class loader, e.g.
        // FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader@3439
        final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
        // Remember the current context class loader so it can be restored later
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            // Switch the context class loader to the user code class loader
            Thread.currentThread().setContextClassLoader(userCodeClassLoader);

            // Log line: Starting program (detached: false)
            LOG.info(
                    "Starting program (detached: {})",
                    !configuration.getBoolean(DeploymentOptions.ATTACHED));

            // Install the context environments that the user code will obtain
            // through getExecutionEnvironment
            ContextEnvironment.setAsContext(
                    executorServiceLoader,
                    configuration,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout);

            StreamContextEnvironment.setAsContext(
                    executorServiceLoader,
                    configuration,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout);

            try {
                // Call the user program's main method through reflection
                program.invokeInteractiveModeForExecution();
            } finally {
                ContextEnvironment.unsetAsContext();
                StreamContextEnvironment.unsetAsContext();
            }
        } finally {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }
    }

    // Finally, PackagedProgram.invokeInteractiveModeForExecution calls the
    // underlying callMainMethod, which invokes the main method through reflection.
    // Execution of the user code really starts at mainMethod.invoke(null, (Object) args).
    /**
     * This method assumes that the context environment is prepared, or the execution will be a
     * local execution by default.
     */
    public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
        // mainClass: class org.apache.flink.streaming.examples.socket.SocketWindowWordCount
        // args:
        //   0 = "--port"
        //   1 = "9999"
        callMainMethod(mainClass, args);
    }

    // class org.apache.flink.streaming.examples.socket.SocketWindowWordCount args : --port 9999
    private static void callMainMethod(Class<?> entryClass, String[] args)
            throws ProgramInvocationException {
        Method mainMethod;
        if (!Modifier.isPublic(entryClass.getModifiers())) {
            throw new ProgramInvocationException(
                    "The class " + entryClass.getName() + " must be public.");
        }

        // public static void org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(java.lang.String[]) throws java.lang.Exception
        try {
            mainMethod = entryClass.getMethod("main", String[].class);
        } catch (NoSuchMethodException e) {
            throw new ProgramInvocationException(
                    "The class " + entryClass.getName() + " has no main(String[]) method.");
        } catch (Throwable t) {
            throw new ProgramInvocationException(
                    "Could not look up the main(String[]) method from the class "
                            + entryClass.getName()
                            + ": "
                            + t.getMessage(),
                    t);
        }

        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            throw new ProgramInvocationException(
                    "The class " + entryClass.getName() + " declares a non-static main method.");
        }
        if (!Modifier.isPublic(mainMethod.getModifiers())) {
            throw new ProgramInvocationException(
                    "The class " + entryClass.getName() + " declares a non-public main method.");
        }

        // Execution of the user program starts here
        try {
            mainMethod.invoke(null, (Object) args);
        } catch (IllegalArgumentException e) {
            throw new ProgramInvocationException(
                    "Could not invoke the main method, arguments are not matching.", e);
        } catch (IllegalAccessException e) {
            throw new ProgramInvocationException(
                    "Access to the main method was denied: " + e.getMessage(), e);
        } catch (InvocationTargetException e) {
            Throwable exceptionInMethod = e.getTargetException();
            if (exceptionInMethod instanceof Error) {
                throw (Error) exceptionInMethod;
            } else if (exceptionInMethod instanceof ProgramParametrizationException) {
                throw (ProgramParametrizationException) exceptionInMethod;
            } else if (exceptionInMethod instanceof ProgramInvocationException) {
                throw (ProgramInvocationException) exceptionInMethod;
            } else {
                throw new ProgramInvocationException(
                        "The main method caused an error: " + exceptionInMethod.getMessage(),
                        exceptionInMethod);
            }
        } catch (Throwable t) {
            throw new ProgramInvocationException(
                    "An error occurred while invoking the program's main method: " + t.getMessage(),
                    t);
        }
    }
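
The two ideas in this listing, swapping the thread's context class loader and calling main through reflection, can be tried out in isolation. The following is a minimal sketch in plain Java, not Flink's implementation: ReflectiveMainDemo, UserJob, and callMain are hypothetical names, and the error handling is reduced to a single wrapped exception.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class ReflectiveMainDemo {

    /** Hypothetical user program; it stands in for the entry class of a job jar. */
    public static class UserJob {
        static String lastArgs = "";

        public static void main(String[] args) {
            lastArgs = String.join(" ", args);
        }
    }

    /** Looks up main(String[]) on the entry class and invokes it under the given class loader. */
    static void callMain(Class<?> entryClass, String[] args, ClassLoader userLoader) {
        if (!Modifier.isPublic(entryClass.getModifiers())) {
            throw new IllegalStateException("The class " + entryClass.getName() + " must be public.");
        }
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        try {
            Method mainMethod = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(mainMethod.getModifiers())) {
                throw new IllegalStateException("main must be static.");
            }
            // Run the user code with its own class loader as the context class loader.
            thread.setContextClassLoader(userLoader);
            // The cast keeps the array from being expanded into varargs.
            mainMethod.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke main: " + e.getMessage(), e);
        } finally {
            // Always restore the previous context class loader.
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        callMain(UserJob.class, new String[] {"--port", "9999"}, UserJob.class.getClassLoader());
        System.out.println(UserJob.lastArgs);
    }
}
```

The cast to Object in the invoke call matters: without it, the String array would be treated as the varargs list itself and the call would fail with mismatched arguments.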

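Once mainMethod.invoke starts running the user code, each operator call only creates the corresponding Transformation (and related wrapper objects) that describes the logic. Nothing is executed yet. Evaluation is lazy: the job only starts for real when StreamExecutionEnvironment.execute() is reached, much like an action operator in Spark triggers the actual computation.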
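
The lazy behaviour can be illustrated with a small sketch in plain Java. This is not how Flink is implemented; LazyPipelineDemo and its methods are hypothetical names. The point is only that map records a function without running it, and that execute is what applies the recorded functions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class LazyPipelineDemo {
    // Recorded steps; plays the role of the environment's transformations list.
    private final List<Function<Integer, Integer>> transformations = new ArrayList<>();

    /** Records a step without running it. */
    LazyPipelineDemo map(Function<Integer, Integer> fn) {
        transformations.add(fn);
        return this;
    }

    int pendingTransformations() {
        return transformations.size();
    }

    /** Applies all recorded steps to every input element, then clears them. */
    List<Integer> execute(List<Integer> input) {
        List<Integer> output = new ArrayList<>();
        for (Integer value : input) {
            Integer current = value;
            for (Function<Integer, Integer> fn : transformations) {
                current = fn.apply(current);
            }
            output.add(current);
        }
        transformations.clear();
        return output;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        LazyPipelineDemo env = new LazyPipelineDemo();
        env.map(x -> { calls[0]++; return x + 1; }).map(x -> x * 2);
        System.out.println("calls before execute: " + calls[0]); // calls before execute: 0
        System.out.println(env.execute(Arrays.asList(1, 2, 3))); // [4, 6, 8]
    }
}
```

Clearing the recorded steps at the end of execute loosely mirrors what clearing the registered transformations does: a second call does not replay the same operations.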

Generating the StreamGraph

    // execute calls getStreamGraph
    public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
        // Generate the StreamGraph
        return execute(getStreamGraph(jobName));
    }

    // This is where the StreamGraph is generated, through StreamGraphGenerator.generate
    /**
     * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job. This call
     * clears previously registered {@link Transformation transformations}.
     *
     * @param jobName Desired name of the job
     * @return The streamgraph representing the transformations
     */
    @Internal
    public StreamGraph getStreamGraph(String jobName) {
        return getStreamGraph(jobName, true);
    }

    /**
     * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph StreamGraph} of the streaming job
     * with the option to clear previously registered {@link Transformation transformations}. Clearing the
     * transformations allows, for example, to not re-execute the same operations when calling
     * {@link #execute()} multiple times.
     *
     * @param jobName Desired name of the job
     * @param clearTransformations Whether or not to clear previously registered transformations
     * @return The streamgraph representing the transformations
     */
    @Internal
    public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
        StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
        if (clearTransformations) {
            this.transformations.clear();
        }
        return streamGraph;
    }

    public StreamGraph generate() {
        // Create the StreamGraph instance
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
        // Determine the execution mode
        shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
        // Configure the StreamGraph
        configureStreamGraph(streamGraph);
        alreadyTransformed = new HashMap<>();
        // Iterate over all transformations
        for (Transformation<?> transformation: transformations) {
            // Generate the StreamNodes and StreamEdges
            transform(transformation);
        }
        // ...
    }

    // The StreamGraph is ultimately built by transform(transformation).
    // transform ends up calling translateInternal to create the instances. In later
    // versions, addEdge is called in transformFeedback to link the StreamEdges.
    private Collection<Integer> translateInternal(
            final OneInputTransformation<IN, OUT> transformation,
            final Context context) {
        checkNotNull(transformation);
        checkNotNull(context);
        final StreamGraph streamGraph = context.getStreamGraph();
        final String slotSharingGroup = context.getSlotSharingGroup();
        final int transformationId = transformation.getId();
        final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
        // Create the StreamNode and add it to the StreamGraph's streamNodes map
        streamGraph.addOperator(
                transformationId,
                slotSharingGroup,
                transformation.getCoLocationGroupKey(),
                transformation.getOperatorFactory(),
                transformation.getInputType(),
                transformation.getOutputType(),
                transformation.getName());
        // ...
        for (Integer inputId: context.getStreamNodeIds(parentTransformations.get(0))) {
            // Create the edge and register it on its upstream and downstream StreamNodes
            streamGraph.addEdge(inputId, transformationId, 0);
        }
    }
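
The loop in generate and the alreadyTransformed map suggest a memoized traversal: each transformation is translated once, after its inputs. The following plain Java sketch shows that pattern under simplifying assumptions. It is not Flink's generator: TransformTraversalDemo and its Transformation class are hypothetical, every transformation maps to exactly one node, and edges are recorded as strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransformTraversalDemo {

    /** Hypothetical stand-in for a Transformation: an id, a name, and its inputs. */
    static final class Transformation {
        final int id;
        final String name;
        final List<Transformation> inputs;

        Transformation(int id, String name, Transformation... inputs) {
            this.id = id;
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    // Remembers which transformations were already turned into nodes.
    private final Map<Transformation, Integer> alreadyTransformed = new HashMap<>();
    final List<String> nodes = new ArrayList<>();
    final List<String> edges = new ArrayList<>();

    /** Translates every registered transformation, like the loop in generate(). */
    void generate(List<Transformation> registered) {
        for (Transformation transformation : registered) {
            transform(transformation);
        }
    }

    /** Transforms the inputs first, then creates the node and its incoming edges. */
    private int transform(Transformation transformation) {
        Integer cached = alreadyTransformed.get(transformation);
        if (cached != null) {
            return cached; // already translated, do not create a second node
        }
        List<Integer> inputIds = new ArrayList<>();
        for (Transformation input : transformation.inputs) {
            inputIds.add(transform(input));
        }
        nodes.add(transformation.name);
        for (int inputId : inputIds) {
            edges.add(inputId + "->" + transformation.id);
        }
        alreadyTransformed.put(transformation, transformation.id);
        return transformation.id;
    }

    public static void main(String[] args) {
        Transformation source = new Transformation(1, "source");
        Transformation map = new Transformation(2, "map", source);
        Transformation sink = new Transformation(3, "sink", map);

        TransformTraversalDemo generator = new TransformTraversalDemo();
        // In this sketch the source is only reachable through the inputs of map.
        generator.generate(Arrays.asList(map, sink));
        System.out.println(generator.nodes); // [source, map, sink]
        System.out.println(generator.edges); // [1->2, 2->3]
    }
}
```

Because of the cache, map is translated only once even though it is both a registered transformation and an input of sink.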

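A note on where the transformations used to build the StreamGraph come from. They are handed to the generator by the environment (in older versions through StreamGraphGenerator.generate(this, transformations)) and are held in the environment's list, declared in those versions as protected final List<StreamTransformation<?>> transformations = new ArrayList<>(); (newer versions use Transformation<?> as the element type). Each operator call returns a SingleOutputStreamOperator. Inside that call the transform function runs, and it calls addOperator(resultTransform) to append the operator's transformation to the list.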

    public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation,
                operatorName,
                operator,
                outTypeInfo,
                environment.getParallelism());
        ...
        getExecutionEnvironment().addOperator(resultTransform);
        return returnStream;
    }

Generating a StreamNode

    public <IN, OUT> void addOperator(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName) {
        // Later, when the Task is created, this Class is used to call its
        // parameterized constructor through reflection and initialize the Task.
        // A map function, for example, corresponds to OneInputStreamTask.class.
        Class<? extends AbstractInvokable> invokableClass =
                operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
        addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo,
                outTypeInfo, operatorName, invokableClass);
    }

    protected StreamNode addNode(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            Class<? extends AbstractInvokable> vertexClass,
            StreamOperatorFactory<?> operatorFactory,
            String operatorName) {
        if (streamNodes.containsKey(vertexID)) {
            throw new RuntimeException("Duplicate vertexID " + vertexID);
        }
        // Create the StreamNode. Key data: slotSharingGroup and operatorFactory
        // (commonly SimpleUdfStreamOperatorFactory and similar for user-defined
        // operators, which wrap the user's userFunction)
        StreamNode vertex = new StreamNode(
                vertexID,
                slotSharingGroup,
                coLocationGroup,
                operatorFactory,
                operatorName,
                vertexClass);
        streamNodes.put(vertexID, vertex);
        // ...
    }

Generating an Edge

    private void addEdgeInternal(Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            List<String> outputNames,
            OutputTag outputTag,
            ShuffleMode shuffleMode) {
        // For a side-output transformation, continue with the id of the real
        // upstream transformation and call addEdgeInternal again
        if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
            // The outputTag identifies a side-output stream
            if (outputTag == null) {
                outputTag = virtualSideOutputNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
        // A partition transformation is handled the same way
        } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            shuffleMode = virtualPartitionNodes.get(virtualId).f2;
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
        } else {
            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
            StreamNode downstreamNode = getStreamNode(downStreamVertexID);
            // If no partitioner was specified and the parallelism of upstream and downstream
            // operator matches use forward partitioning, use rebalance otherwise.
            // The difference between ForwardPartitioner, RebalancePartitioner, and the
            // others shows up mainly in selectChannel: the former always returns
            // channel index 0, while the latter starts from a random channel and then
            // advances by one modulo the number of channels (the other partitioners
            // implement selectChannel in their own ways).
            if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                partitioner = new ForwardPartitioner<Object>();
            } else if (partitioner == null) {
                partitioner = new RebalancePartitioner<Object>();
            }
            if (partitioner instanceof ForwardPartitioner) {
                if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                    throw new UnsupportedOperationException("Forward partitioning does not allow " +
                            "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                            ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                            " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                }
            }
            // The shuffle mode decides whether operators can be chained (!= batch) and
            // which ResultPartitionType is used.
            // A transformation usually has shuffleMode = UNDEFINED (this includes
            // partition transformations). In that case the ResultPartitionType is
            // decided by the GlobalDataExchangeMode (outside batch mode this is
            // ALL_EDGES_PIPELINED -> ResultPartitionType = PIPELINED_BOUNDED).
            if (shuffleMode == null) {
                shuffleMode = ShuffleMode.UNDEFINED;
            }
            // Create the StreamEdge; its key attributes are the upstream and
            // downstream nodes, the partitioner, and the shuffleMode
            StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber,
                    partitioner, outputTag, shuffleMode);
            // Register the edge on its upstream and downstream StreamNodes
            getStreamNode(edge.getSourceId()).addOutEdge(edge);
            getStreamNode(edge.getTargetId()).addInEdge(edge);
        }
    }
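
The two decisions made in this method, resolving a virtual node back to its real upstream node and choosing a default partitioner from the parallelism, can be sketched in plain Java. This is a simplified model and not Flink code: EdgeResolutionDemo and its members are hypothetical, partitioners are string labels, and side outputs and shuffle modes are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EdgeResolutionDemo {

    /** Hypothetical record of a virtual partition node: real upstream id plus partitioner label. */
    static final class VirtualPartition {
        final int upstreamId;
        final String partitioner;

        VirtualPartition(int upstreamId, String partitioner) {
            this.upstreamId = upstreamId;
            this.partitioner = partitioner;
        }
    }

    private final Map<Integer, Integer> parallelismByNode = new HashMap<>();
    private final Map<Integer, VirtualPartition> virtualPartitionNodes = new HashMap<>();
    final List<String> edges = new ArrayList<>();

    void addNode(int id, int parallelism) {
        parallelismByNode.put(id, parallelism);
    }

    void addVirtualPartitionNode(int virtualId, int upstreamId, String partitioner) {
        virtualPartitionNodes.put(virtualId, new VirtualPartition(upstreamId, partitioner));
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdgeInternal(upstreamId, downstreamId, null);
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, String partitioner) {
        VirtualPartition virtual = virtualPartitionNodes.get(upstreamId);
        if (virtual != null) {
            // Virtual node: carry its partitioner along and retry with the real upstream node.
            String resolved = partitioner == null ? virtual.partitioner : partitioner;
            addEdgeInternal(virtual.upstreamId, downstreamId, resolved);
            return;
        }
        int up = parallelismByNode.get(upstreamId);
        int down = parallelismByNode.get(downstreamId);
        if (partitioner == null) {
            // Default: forward when the parallelism matches, rebalance otherwise.
            partitioner = (up == down) ? "FORWARD" : "REBALANCE";
        }
        if (partitioner.equals("FORWARD") && up != down) {
            throw new UnsupportedOperationException(
                    "Forward partitioning does not allow change of parallelism");
        }
        edges.add(upstreamId + "-[" + partitioner + "]->" + downstreamId);
    }

    public static void main(String[] args) {
        EdgeResolutionDemo graph = new EdgeResolutionDemo();
        graph.addNode(1, 2);
        graph.addNode(2, 2);
        graph.addNode(3, 4);
        graph.addNode(5, 4);
        graph.addVirtualPartitionNode(4, 3, "HASH"); // e.g. a keyBy placed after node 3
        graph.addEdge(1, 2);
        graph.addEdge(2, 3);
        graph.addEdge(4, 5);
        System.out.println(graph.edges); // [1-[FORWARD]->2, 2-[REBALANCE]->3, 3-[HASH]->5]
    }
}
```

Note that the virtual node 4 never appears in the resulting edges. Its partitioner ends up on the edge between the real nodes 3 and 5, which matches the earlier description of virtual nodes being stored on edges.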

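Core Methods

addOperator: builds the streamNodes collection
addEdge: builds an edge
addEdgeInternal: builds an edge and decides the partitioning strategy. If no partitioner is specified, the choice between forward (local) distribution and rebalance (even, round-robin) distribution depends on whether the upstream and downstream operators have the same parallelism
getJobGraph: generates the JobGraph
getStreamingPlanAsJSON: string (JSON) representation of the StreamGraph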

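References

Flink 1.12 Source Code Walkthrough: How the StreamGraph Execution Graph Is Built, ws0owws0ow's blog, CSDN

A Detailed Explanation of Flink's Runtime Architecture, 程序员大本营

Source Code Analysis of StreamGraph Generation in Flink, 陪你一起捡蛋壳's blog, CSDN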
