
Flink 1.12 Source Code Walkthrough: How the StreamGraph Is Built


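Apache Flink is one of the most popular big-data compute engines in China. It supports high throughput, low latency, exactly-once semantics, and stateful stream processing. Reading the source code helps deepen your understanding of the framework.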

Flink describes a job with four layers of execution graph. This chapter covers how the StreamGraph is generated.

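This chapter and the following ones assume a production-style setup: run mode on YARN, HA backed by ZooKeeper, execution mode Streaming. Key logic is explained in comments inside the code listings, so do not skip them.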

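The StreamGraph is Flink's topmost logical representation; think of it as the logical layer that user API calls are translated into. Like the JobGraph, it is first built on the client side (in cluster mode), but unlike the JobGraph it undergoes no further processing such as operator chaining. Building it mostly means converting the Transformations written by the user into StreamNodes, creating StreamEdges that point to the upstream and downstream nodes, and loading both into the StreamGraph. The process is not complicated.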

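Taking on-YARN as the example, submitting a job with the submit script in a cluster environment enters the function below.

The main flow: Flink obtains the user's jar, then uses reflection to load and invoke the user's main function.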

    // CliFrontend#main
    /**
     * Submits the job based on the arguments.
     */
    public static void main(final String[] args) {
        ........
        try {
            final CliFrontend cli = new CliFrontend(
                configuration,
                customCommandLines);
            SecurityUtils.install(new SecurityConfiguration(cli.configuration));
            int retCode = SecurityUtils.getInstalledContext()
                // pick the execution path that matches the arguments
                .runSecured(() -> cli.parseAndRun(args));
            System.exit(retCode);
        }
        ...

    // ClientUtils#executeProgram
    public static void executeProgram(
            PipelineExecutorServiceLoader executorServiceLoader,
            Configuration configuration,
            PackagedProgram program,
            boolean enforceSingleJobExecution,
            boolean suppressSysout) throws ProgramInvocationException {
        ...
        try { // run the user's main function
            program.invokeInteractiveModeForExecution();
        ...

    // PackagedProgram#callMainMethod
    private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
        ...
        try { // run the user's main function
            mainMethod.invoke(null, (Object) args);
        ...

Once execution reaches mainMethod.invoke, control enters the user's program and the logic in the user's main function starts to run.
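To make the reflective call concrete, here is a minimal sketch using only the JDK. `UserJob` and `ReflectiveMainSketch` are hypothetical names invented for illustration; this is not Flink's implementation, only the same `Method.invoke(null, (Object) args)` pattern in isolation.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a user job class; not part of Flink.
class UserJob {
    static final List<String> SEEN = new ArrayList<>();

    public static void main(String[] args) {
        for (String arg : args) {
            SEEN.add(arg);
        }
    }
}

class ReflectiveMainSketch {
    // Looks up `public static void main(String[])` on entryClass and invokes it.
    static void callMain(Class<?> entryClass, String[] args) throws Exception {
        Method mainMethod = entryClass.getMethod("main", String[].class);
        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            throw new IllegalArgumentException("main must be static");
        }
        // Casting to Object passes the array as ONE argument instead of
        // spreading its elements as varargs.
        mainMethod.invoke(null, (Object) args);
    }

    public static void main(String[] args) throws Exception {
        callMain(UserJob.class, new String[] {"--input", "words.txt"});
        System.out.println(UserJob.SEEN); // prints [--input, words.txt]
    }
}
```

The first argument to `invoke` is `null` because a static method has no receiver object.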

Like Spark, Flink evaluates lazily: the user's processing logic only starts running after Flink has wrapped it and built all of the execution graphs.

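Take WordCount as an example. Each operator produces a corresponding Transformation (for example, map produces a OneInputTransformation) or another logical instance wrapped by Flink, but no user code actually runs at that point. Only when the program reaches StreamExecutionEnvironment.execute() does the lazy execution start.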
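The idea of "record first, run on execute()" can be shown with a toy environment. Everything in this sketch (`LazyEnvSketch` and its methods) is a hypothetical simplification, not Flink's API: declaring an operator only appends to a list, and the user functions are called only inside `execute`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical miniature environment; names are illustrative, not Flink's API.
class LazyEnvSketch {
    private final List<String> transformations = new ArrayList<>();
    private final List<Function<String, String>> functions = new ArrayList<>();

    // Declaring an operator only records it; the user function is not called here.
    LazyEnvSketch map(String name, Function<String, String> fn) {
        transformations.add(name);
        functions.add(fn);
        return this;
    }

    List<String> recordedTransformations() {
        return transformations;
    }

    // Only execute() pushes data through the recorded functions, in order.
    List<String> execute(List<String> input) {
        List<String> out = new ArrayList<>();
        for (String value : input) {
            String current = value;
            for (Function<String, String> fn : functions) {
                current = fn.apply(current);
            }
            out.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        LazyEnvSketch env = new LazyEnvSketch()
            .map("Trim", s -> { calls.add("trim"); return s.trim(); })
            .map("Upper", s -> { calls.add("upper"); return s.toUpperCase(); });
        System.out.println(calls.isEmpty());                        // true: nothing ran yet
        System.out.println(env.execute(Arrays.asList(" a ", "b"))); // [A, B]
    }
}
```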

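The map function: the core logic is wrapped into a OneInputTransformation, including StreamMap, the operator that runs the user's map logic (its processElement mainly handles the handoff of data from input to output; later chapters cover it in detail). Each Transformation added to the StreamExecutionEnvironment's List<Transformation<?>> transformations is later turned into a corresponding StreamNode, in order.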

    // DataStream#map
    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
        // create the StreamMap operator
        return transform("Map", outputType, new StreamMap<>(clean(mapper)));
    }

    // StreamMap
    public class StreamMap<IN, OUT>
            extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
            implements OneInputStreamOperator<IN, OUT> {

        private static final long serialVersionUID = 1L;

        public StreamMap(MapFunction<IN, OUT> mapper) {
            super(mapper);
            chainingStrategy = ChainingStrategy.ALWAYS;
        }

        // processing logic
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            output.collect(element.replace(userFunction.map(element.getValue())));
        }
    }

    // DataStream#doTransform
    protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        // operatorFactory: e.g. common user-defined operators (map, filter, ...)
        // produce a SimpleUdfStreamOperatorFactory
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operatorFactory,
            outTypeInfo,
            environment.getParallelism());

        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }

When execution reaches StreamExecutionEnvironment.execute(), Flink's own execution flow is triggered.

    // StreamExecutionEnvironment#execute
    public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
        // build the StreamGraph
        return execute(getStreamGraph(jobName));
    }

    // StreamGraphGenerator#generate
    public StreamGraph generate() {
        // create the StreamGraph instance
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
        // determine the execution mode
        shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
        configureStreamGraph(streamGraph);

        alreadyTransformed = new HashMap<>();

        for (Transformation<?> transformation: transformations) {
            // create the StreamNodes and StreamEdges
            transform(transformation);
        }
        ...

    // translation of a OneInputTransformation
    private Collection<Integer> translateInternal(
            final OneInputTransformation<IN, OUT> transformation,
            final Context context) {
        checkNotNull(transformation);
        checkNotNull(context);

        final StreamGraph streamGraph = context.getStreamGraph();
        final String slotSharingGroup = context.getSlotSharingGroup();
        final int transformationId = transformation.getId();
        final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

        // create the StreamNode and add it to the StreamGraph's streamNodes map
        streamGraph.addOperator(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            transformation.getOperatorFactory(),
            transformation.getInputType(),
            transformation.getOutputType(),
            transformation.getName());
        .......
        for (Integer inputId: context.getStreamNodeIds(parentTransformations.get(0))) {
            // create the edge and add it to both its upstream and downstream StreamNode
            streamGraph.addEdge(inputId, transformationId, 0);
        }
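The loop over transformations, the alreadyTransformed cache, and the "node first, then edges from each input" order can be condensed into a small stand-alone sketch. All types here (`MiniTransformation`, `MiniGraphGenerator`) are hypothetical simplifications: for instance, the real transform step returns a collection of node ids, while this version returns a single id. In the usage example the source is deliberately left out of the list so that it is only reached through recursion on its consumer's inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-in for a Transformation: an id, a name, and its inputs.
class MiniTransformation {
    final int id;
    final String name;
    final List<MiniTransformation> inputs;

    MiniTransformation(int id, String name, MiniTransformation... inputs) {
        this.id = id;
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }
}

class MiniGraphGenerator {
    final Map<Integer, String> nodes = new LinkedHashMap<>(); // node id -> operator name
    final List<String> edges = new ArrayList<>();             // "upstream->downstream"
    private final Map<Integer, Integer> alreadyTransformed = new HashMap<>();

    void generate(List<MiniTransformation> transformations) {
        for (MiniTransformation t : transformations) {
            transform(t);
        }
    }

    private int transform(MiniTransformation t) {
        Integer known = alreadyTransformed.get(t.id);
        if (known != null) {
            return known; // each transformation is translated at most once
        }
        // Translate the inputs first so that upstream nodes exist before the edge is added.
        List<Integer> inputIds = new ArrayList<>();
        for (MiniTransformation input : t.inputs) {
            inputIds.add(transform(input));
        }
        if (nodes.containsKey(t.id)) {
            throw new IllegalStateException("Duplicate vertexID " + t.id);
        }
        nodes.put(t.id, t.name);
        for (int inputId : inputIds) {
            edges.add(inputId + "->" + t.id);
        }
        alreadyTransformed.put(t.id, t.id);
        return t.id;
    }

    public static void main(String[] args) {
        MiniTransformation source = new MiniTransformation(1, "Source");
        MiniTransformation map = new MiniTransformation(2, "Map", source);
        MiniTransformation sink = new MiniTransformation(3, "Sink", map);

        MiniGraphGenerator generator = new MiniGraphGenerator();
        generator.generate(Arrays.asList(map, sink));
        System.out.println(generator.nodes); // {1=Source, 2=Map, 3=Sink}
        System.out.println(generator.edges); // [1->2, 2->3]
    }
}
```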

Creating the StreamNode

    // StreamGraph#addOperator
    public <IN, OUT> void addOperator(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName) {
        // Later, when the Task is created, this class is used to reflectively call
        // its parameterized constructor and initialize the Task.
        // For example, a map function corresponds to OneInputStreamTask.class.
        Class<? extends AbstractInvokable> invokableClass =
            operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
        addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo,
            outTypeInfo, operatorName, invokableClass);
    }

    // StreamGraph#addNode
    protected StreamNode addNode(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            Class<? extends AbstractInvokable> vertexClass,
            StreamOperatorFactory<?> operatorFactory,
            String operatorName) {

        if (streamNodes.containsKey(vertexID)) {
            throw new RuntimeException("Duplicate vertexID " + vertexID);
        }

        // Create the StreamNode. Core data: slotSharingGroup and operatorFactory
        // (for common user-defined operators a SimpleUdfStreamOperatorFactory,
        // which wraps the user's userFunction).
        StreamNode vertex = new StreamNode(
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorFactory,
            operatorName,
            vertexClass);

        streamNodes.put(vertexID, vertex);
        ...
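The comment in addOperator says the stored class is later used to create the Task through a parameterized constructor via reflection. A rough JDK-only sketch of that pattern follows. All class names are hypothetical, and a plain `String` stands in for the environment object that a real task constructor would receive.

```java
import java.lang.reflect.Constructor;

// Hypothetical task hierarchy; a String stands in for the runtime environment.
abstract class MiniTask {
    final String environment;

    MiniTask(String environment) {
        this.environment = environment;
    }

    abstract String describe();
}

class MiniSourceTask extends MiniTask {
    public MiniSourceTask(String environment) {
        super(environment);
    }

    @Override
    String describe() {
        return "source@" + environment;
    }
}

class MiniOneInputTask extends MiniTask {
    public MiniOneInputTask(String environment) {
        super(environment);
    }

    @Override
    String describe() {
        return "one-input@" + environment;
    }
}

// The node only remembers WHICH class to instantiate; no task object exists yet.
class MiniNode {
    final int id;
    final Class<? extends MiniTask> taskClass;

    MiniNode(int id, boolean isSource) {
        this.id = id;
        this.taskClass = isSource ? MiniSourceTask.class : MiniOneInputTask.class;
    }

    // Later, at deployment time, the stored class is instantiated through its
    // one-argument constructor.
    MiniTask instantiate(String environment) throws ReflectiveOperationException {
        Constructor<? extends MiniTask> ctor = taskClass.getDeclaredConstructor(String.class);
        return ctor.newInstance(environment);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new MiniNode(1, true).instantiate("tm-1").describe());  // source@tm-1
        System.out.println(new MiniNode(2, false).instantiate("tm-1").describe()); // one-input@tm-1
    }
}
```

Storing a `Class` object rather than an instance keeps the graph description lightweight, since the task itself is only needed where the job actually runs.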

Creating the StreamEdge

    // StreamGraph#addEdgeInternal
    private void addEdgeInternal(Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            List<String> outputNames,
            OutputTag outputTag,
            ShuffleMode shuffleMode) {

        // For a side-output transformation, resolve the real upstream transformation id
        // and call addEdgeInternal again with it.
        if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
            // the outputTag identifies a side-output stream
            if (outputTag == null) {
                outputTag = virtualSideOutputNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
        // partition transformations are handled the same way
        } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            shuffleMode = virtualPartitionNodes.get(virtualId).f2;
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
        } else {
            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
            StreamNode downstreamNode = getStreamNode(downStreamVertexID);

            // If no partitioner was specified and the parallelism of upstream and downstream
            // operator matches use forward partitioning, use rebalance otherwise.
            // The partitioner is chosen by whether upstream and downstream parallelism match.
            // ForwardPartitioner, RebalancePartitioner and the others differ mainly in
            // selectChannel: the former always returns channel index 0; the latter starts
            // from a random channel and then advances by one, modulo the number of channels
            // (the other partitioners implement selectChannel in their own ways).
            if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                partitioner = new ForwardPartitioner<Object>();
            } else if (partitioner == null) {
                partitioner = new RebalancePartitioner<Object>();
            }

            if (partitioner instanceof ForwardPartitioner) {
                if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                    throw new UnsupportedOperationException("Forward partitioning does not allow " +
                        "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                        ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                        " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                }
            }

            // Decides whether the operator can be chained (!= batch) and which ResultPartitionType is used.
            // A transformation's shuffleMode is usually UNDEFINED (including partition transformations);
            // the ResultPartitionType is then decided by GlobalDataExchangeMode
            // (in non-batch mode: ALL_EDGES_PIPELINED -> ResultPartitionType = PIPELINED_BOUNDED).
            if (shuffleMode == null) {
                shuffleMode = ShuffleMode.UNDEFINED;
            }

            // Create the StreamEdge. Core attributes: upstream and downstream node, partitioner, shuffleMode.
            StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber,
                partitioner, outputTag, shuffleMode);

            // add the edge to both its upstream and downstream StreamNode
            getStreamNode(edge.getSourceId()).addOutEdge(edge);
            getStreamNode(edge.getTargetId()).addInEdge(edge);
        }
    }
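The partitioner choice and the two selectChannel behaviours described in the comments above can be tried out in isolation. The classes below are hypothetical miniatures, not Flink's partitioners; in particular, the number of channels is passed to the constructor here purely to keep the sketch short.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical miniature of a channel-selecting partitioner.
interface MiniPartitioner {
    int selectChannel();
}

// Forward: always the single local channel, index 0.
class MiniForward implements MiniPartitioner {
    @Override
    public int selectChannel() {
        return 0;
    }
}

// Rebalance: random starting point, then round-robin over all channels.
class MiniRebalance implements MiniPartitioner {
    private final int numberOfChannels;
    private int next;

    MiniRebalance(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
        this.next = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    @Override
    public int selectChannel() {
        next = (next + 1) % numberOfChannels;
        return next;
    }
}

class PartitionerChoiceSketch {
    // Mirrors the decision in the listing: default to forward when the parallelism
    // matches, rebalance otherwise, and reject forward across a parallelism change.
    static MiniPartitioner choose(MiniPartitioner requested, int upstreamParallelism, int downstreamParallelism) {
        MiniPartitioner partitioner = requested;
        if (partitioner == null) {
            partitioner = upstreamParallelism == downstreamParallelism
                ? new MiniForward()
                : new MiniRebalance(downstreamParallelism);
        }
        if (partitioner instanceof MiniForward && upstreamParallelism != downstreamParallelism) {
            throw new UnsupportedOperationException(
                "Forward partitioning does not allow change of parallelism.");
        }
        return partitioner;
    }

    public static void main(String[] args) {
        System.out.println(choose(null, 2, 2).getClass().getSimpleName()); // MiniForward
        System.out.println(choose(null, 2, 4).getClass().getSimpleName()); // MiniRebalance
    }
}
```

Because of the random starting point, the first channel a `MiniRebalance` instance returns differs from run to run; only the step of one between consecutive calls is fixed.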

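At this point the StreamGraph is complete. Its construction is clear and uncomplicated: it is merely Flink's description of the user's API calls and part of the configuration, stored in Flink's own data structures. These structures are read back later when the JobGraph is built; for example, Flink iterates over the stored StreamEdges and chains the operators that meet the conditions into a single JobVertex. The data held by the JobGraph is in turn read when the ExecutionGraph is built, and so on until the physical execution graph exists. From the client side to the cluster side the process gradually becomes more complex: it involves various external dependencies and creates the components responsible for scheduling, monitoring, and physical execution. Later chapters walk through all of this.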

Technical discussion of any kind is welcome. Thanks.
