This section briefly introduces the basic concepts and the process behind Flink's stream graphs; for more background see the Flink fundamentals article. Ordered by when each graph is generated, there are four layers: StreamGraph --> JobGraph --> ExecutionGraph --> physical execution graph. The steps are as follows.
The StreamGraph is Flink's topmost logical layer and can be understood as the result of translating the user API: the Transformations written by the user are converted into StreamNodes, StreamEdges pointing to their upstream and downstream nodes are generated, and both are loaded into the StreamGraph. The rest of this article uses YARN mode as the example.
StreamNode and StreamEdge are the core data structures of the StreamGraph.
A StreamNode is a node in the StreamGraph, i.e., an operator of the streaming program. One StreamNode represents one operator; even sources and sinks are represented as StreamNodes, they simply carry special names because they represent input and output. A StreamNode also encapsulates the operator's other key properties, such as its parallelism, partitioning information, and the serializers for its input and output types.
StreamNodes come in two kinds, physical and virtual. StreamNodes are produced from transformations, but not every transformation has a physical meaning (i.e., corresponds to a concrete operator at runtime): partitioning (Partition), split/select (Select), and union (Union), for example, do not create real nodes in the StreamGraph. Instead they create virtual nodes that carry specific properties. The information of a virtual StreamNode is not shown in the StreamGraph itself; it is stored on the corresponding StreamEdge.
A StreamEdge connects two StreamNodes; one StreamNode can have multiple incoming and outgoing edges. The StreamEdge stores information such as the partitioner and side outputs.
A StreamEdge references its source StreamNode (passed in as sourceVertex) and its target StreamNode (passed in as targetVertex). Each StreamNode keeps the sets of incoming and outgoing edges connected to it, in inEdges and outEdges.
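To make this concrete, here is a minimal sketch of my own (not from the Flink sources quoted below; host, port and job name are arbitrary) that builds a small pipeline and prints the resulting StreamGraph plan. The source, map, aggregation and sink each become a StreamNode, the connections become StreamEdges, and keyBy only contributes a virtual partition node whose partitioner is recorded on an edge:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamGraphDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each API call registers a Transformation; together they are later translated
        // into StreamNodes connected by StreamEdges.
        env.socketTextStream("localhost", 9999)                 // source StreamNode
                .map(line -> Tuple2.of(line, 1))                // map StreamNode
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)                               // virtual partition node only
                .sum(1)                                         // aggregation StreamNode
                .print();                                       // sink StreamNode

        // Inspect the StreamGraph without submitting the job: the JSON plan lists the
        // nodes and the edges (including each edge's ship strategy / partitioner).
        System.out.println(env.getStreamGraph("demo").getStreamingPlanAsJSON());
    }
}
```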
StreamNode source code:
```java
/**
 * Class representing the operators in the streaming programs, with all their properties.
 */
@Internal
public class StreamNode {

    private final int id;
    private int parallelism;
    /**
     * Maximum parallelism for this stream node. The maximum parallelism is the upper limit for
     * dynamic scaling and the number of key groups used for partitioned state.
     */
    private int maxParallelism;
    private ResourceSpec minResources = ResourceSpec.DEFAULT;
    private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
    private final Map<ManagedMemoryUseCase, Integer> managedMemoryOperatorScopeUseCaseWeights = new HashMap<>();
    private final Set<ManagedMemoryUseCase> managedMemorySlotScopeUseCases = new HashSet<>();
    private long bufferTimeout;
    private final String operatorName;
    private @Nullable String slotSharingGroup;
    private @Nullable String coLocationGroup;
    private KeySelector<?, ?>[] statePartitioners = new KeySelector[0];
    private TypeSerializer<?> stateKeySerializer;

    private StreamOperatorFactory<?> operatorFactory;
    private TypeSerializer<?>[] typeSerializersIn = new TypeSerializer[0];
    private TypeSerializer<?> typeSerializerOut;

    // sets of incoming and outgoing edges
    private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
    private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();

    private final Class<? extends AbstractInvokable> jobVertexClass;

    private InputFormat<?, ?> inputFormat;
    private OutputFormat<?> outputFormat;

    private String transformationUID;
    private String userHash;
    private boolean sortedInputs = false;

    // ...
    // Adding an incoming/outgoing edge to a StreamNode simply appends it to the
    // corresponding edge list.

    public void addInEdge(StreamEdge inEdge) {
        if (inEdge.getTargetId() != getId()) {
            throw new IllegalArgumentException("Destination id doesn't match the StreamNode id");
        } else {
            inEdges.add(inEdge);
        }
    }

    public void addOutEdge(StreamEdge outEdge) {
        if (outEdge.getSourceId() != getId()) {
            throw new IllegalArgumentException("Source id doesn't match the StreamNode id");
        } else {
            outEdges.add(outEdge);
        }
    }

    // ...

}
```
StreamEdge source code:
```java
/**
 * An edge in the streaming topology. One edge like this does not necessarily
 * gets converted to a connection between two job vertices (due to
 * chaining/optimization).
 */
@Internal
public class StreamEdge implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final long ALWAYS_FLUSH_BUFFER_TIMEOUT = 0L;

    private final String edgeId;
    // ids of the source and target nodes
    private final int sourceId;
    private final int targetId;

    /**
     * The type number of the input for co-tasks.
     */
    private final int typeNumber;
    /**
     * The side-output tag (if any) of this {@link StreamEdge}.
     */
    // side-output tag
    private final OutputTag outputTag;

    /**
     * The {@link StreamPartitioner} on this {@link StreamEdge}.
     */
    // partitioner
    private StreamPartitioner<?> outputPartitioner;

    /**
     * The name of the operator in the source vertex.
     */
    private final String sourceOperatorName;

    /**
     * The name of the operator in the target vertex.
     */
    private final String targetOperatorName;
    // the shuffle mode defines how data is exchanged between operators
    private final ShuffleMode shuffleMode;

    private long bufferTimeout;

    public StreamEdge(
            StreamNode sourceVertex,
            StreamNode targetVertex,
            int typeNumber,
            long bufferTimeout,
            StreamPartitioner<?> outputPartitioner,
            OutputTag outputTag,
            ShuffleMode shuffleMode) {

        this.sourceId = sourceVertex.getId();
        this.targetId = targetVertex.getId();
        this.typeNumber = typeNumber;
        this.bufferTimeout = bufferTimeout;
        this.outputPartitioner = outputPartitioner;
        this.outputTag = outputTag;
        this.sourceOperatorName = sourceVertex.getOperatorName();
        this.targetOperatorName = targetVertex.getOperatorName();
        this.shuffleMode = checkNotNull(shuffleMode);
        this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + outputPartitioner;
    }

}
```
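Because side-output information lives on the edge rather than on a node of its own, here is a small sketch of my own (not from the article; names such as errorTag are arbitrary) showing user-side code whose OutputTag ends up in the outputTag field of a StreamEdge:

```java
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Tag identifying the side output; it ends up in the outputTag field of the
        // StreamEdge that feeds the side-output consumer.
        final OutputTag<String> errorTag = new OutputTag<String>("errors") {};

        SingleOutputStreamOperator<Integer> parsed =
                env.socketTextStream("localhost", 9999)
                        .process(new ProcessFunction<String, Integer>() {
                            @Override
                            public void processElement(String value, Context ctx, Collector<Integer> out) {
                                try {
                                    out.collect(Integer.parseInt(value));
                                } catch (NumberFormatException e) {
                                    ctx.output(errorTag, value); // routed via the side output
                                }
                            }
                        });

        parsed.print();                            // main output edge
        parsed.getSideOutput(errorTag).print();    // edge carrying the OutputTag

        env.execute("side-output-demo");
    }
}
```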
After the client's submit script uploads the jar, Flink picks up that jar and invokes the user's main method via reflection.
```java
// There are quite a few steps here; I'll try to go through them in detail.
// Submission entry point: CliFrontend is where the program is submitted; the key call is cli.parseAndRun(args).
public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1. find the configuration directory
    // 1. get the conf directory, e.g. /opt/tools/flink-1.12.2/conf
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. load the global configuration
    // 2. load the global conf, e.g.:
    //    "taskmanager.memory.process.size" -> "1728m"
    //    "parallelism.default" -> "1"
    //    "jobmanager.execution.failover-strategy" -> "region"
    //    "jobmanager.rpc.address" -> "localhost"
    //    "taskmanager.numberOfTaskSlots" -> "1"
    //    "jobmanager.memory.process.size" -> "1600m"
    //    "jobmanager.rpc.port" -> "6123"
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 3. load the custom command lines
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);

    try {
        // build the CliFrontend: GenericCLI > FlinkYarnSessionCli > DefaultCLI
        final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

        SecurityUtils.install(new SecurityConfiguration(cli.configuration));

        // submit the command via parseAndRun
        int retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
        System.exit(retCode);

    } catch (Throwable t) {
        final Throwable strippedThrowable =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", strippedThrowable);
        strippedThrowable.printStackTrace();
        System.exit(31);
    }
}


// parseAndRun(args) then dispatches to a different method depending on the requested action,
// e.g. run, stop, and so on.
// Supported actions:
/**
 *
 * // actions
 * private static final String ACTION_RUN = "run";
 * private static final String ACTION_RUN_APPLICATION = "run-application";
 * private static final String ACTION_INFO = "info";
 * private static final String ACTION_LIST = "list";
 * private static final String ACTION_CANCEL = "cancel";
 * private static final String ACTION_STOP = "stop";
 * private static final String ACTION_SAVEPOINT = "savepoint";
 */

// Since we are submitting a job, CliFrontend.run is called. It mainly works out how the program
// will be executed (execution target, environment, packaged program, and so on), then calls
// CliFrontend.executeProgram(effectiveConfiguration, program), which hands the job over to the
// ClientUtils utility class.
protected void executeProgram(final Configuration configuration, final PackagedProgram program)
        throws ProgramInvocationException {
    ClientUtils.executeProgram(
            new DefaultExecutorServiceLoader(), configuration, program, false, false);
}

// ClientUtils.executeProgram (same method name as above, just a different class) sets up the
// program's execution environment and class loader and prepares to run the user code.
public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout)
        throws ProgramInvocationException {
    checkNotNull(executorServiceLoader);

    // get the user code class loader, e.g. FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader@3439
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();

    // cache the current context class loader
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();

    try {
        // switch the context class loader to the user code class loader
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);
        // log info: Starting program (detached: false)
        LOG.info(
                "Starting program (detached: {})",
                !configuration.getBoolean(DeploymentOptions.ATTACHED));

        // set up the environments that the user code will pick up via getExecutionEnvironment
        ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        try {
            // invoke the user program's main method via reflection
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}

// Finally, PackagedProgram.invokeInteractiveModeForExecution calls the underlying callMainMethod,
// which invokes main via reflection. mainMethod.invoke(null, (Object) args) is where execution
// really starts.
/**
 * This method assumes that the context environment is prepared, or the execution will be a
 * local execution by default.
 */
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {

    // mainClass: class org.apache.flink.streaming.examples.socket.SocketWindowWordCount
    // args
    //   0 = "--port"
    //   1 = "9999"

    callMainMethod(mainClass, args);
}


// class org.apache.flink.streaming.examples.socket.SocketWindowWordCount  args: --port 9999
private static void callMainMethod(Class<?> entryClass, String[] args)
        throws ProgramInvocationException {
    Method mainMethod;
    if (!Modifier.isPublic(entryClass.getModifiers())) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " must be public.");
    }
    // public static void org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(java.lang.String[]) throws java.lang.Exception
    try {
        mainMethod = entryClass.getMethod("main", String[].class);
    } catch (NoSuchMethodException e) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " has no main(String[]) method.");
    } catch (Throwable t) {
        throw new ProgramInvocationException(
                "Could not look up the main(String[]) method from the class "
                        + entryClass.getName()
                        + ": "
                        + t.getMessage(),
                t);
    }

    if (!Modifier.isStatic(mainMethod.getModifiers())) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " declares a non-static main method.");
    }
    if (!Modifier.isPublic(mainMethod.getModifiers())) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " declares a non-public main method.");
    }
    // execution starts here!
    try {
        mainMethod.invoke(null, (Object) args);
    } catch (IllegalArgumentException e) {
        throw new ProgramInvocationException(
                "Could not invoke the main method, arguments are not matching.", e);
    } catch (IllegalAccessException e) {
        throw new ProgramInvocationException(
                "Access to the main method was denied: " + e.getMessage(), e);
    } catch (InvocationTargetException e) {
        Throwable exceptionInMethod = e.getTargetException();
        if (exceptionInMethod instanceof Error) {
            throw (Error) exceptionInMethod;
        } else if (exceptionInMethod instanceof ProgramParametrizationException) {
            throw (ProgramParametrizationException) exceptionInMethod;
        } else if (exceptionInMethod instanceof ProgramInvocationException) {
            throw (ProgramInvocationException) exceptionInMethod;
        } else {
            throw new ProgramInvocationException(
                    "The main method caused an error: " + exceptionInMethod.getMessage(),
                    exceptionInMethod);
        }
    } catch (Throwable t) {
        throw new ProgramInvocationException(
                "An error occurred while invoking the program's main method: " + t.getMessage(),
                t);
    }
}
```
Once mainMethod.invoke starts running, each operator produces its corresponding Transformation (and related wrapper objects), but nothing is executed until StreamExecutionEnvironment.execute() is reached: execution is lazy, similar to action operators in Spark, and only then does the job actually run.
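A tiny sketch of my own (not from the Flink sources; the job name is arbitrary) illustrating this laziness:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LazyExecutionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // These calls only record Transformations in the environment's
        // transformations list; no task is started yet.
        env.fromElements(1, 2, 3)
                .map(i -> i * 2)
                .print();

        // Only here is the StreamGraph generated (execute -> getStreamGraph)
        // and the job actually submitted.
        env.execute("lazy-execution-demo");
    }
}
```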
```java
// execute(String) first builds the StreamGraph via getStreamGraph
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
    // generate the StreamGraph
    return execute(getStreamGraph(jobName));
}

// The StreamGraph is generated here, using StreamGraphGenerator.generate
/**
 * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job. This call
 * clears previously registered {@link Transformation transformations}.
 *
 * @param jobName Desired name of the job
 * @return The streamgraph representing the transformations
 */
@Internal
public StreamGraph getStreamGraph(String jobName) {
    return getStreamGraph(jobName, true);
}

/**
 * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph StreamGraph} of the streaming job
 * with the option to clear previously registered {@link Transformation transformations}. Clearing the
 * transformations allows, for example, to not re-execute the same operations when calling
 * {@link #execute()} multiple times.
 *
 * @param jobName Desired name of the job
 * @param clearTransformations Whether or not to clear previously registered transformations
 * @return The streamgraph representing the transformations
 */
@Internal
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {
        this.transformations.clear();
    }
    return streamGraph;
}


public StreamGraph generate() {
    // create the StreamGraph instance
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    // determine the execution mode
    shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
    // configure the StreamGraph
    configureStreamGraph(streamGraph);

    alreadyTransformed = new HashMap<>();
    // walk over all registered transformations
    for (Transformation<?> transformation : transformations) {
        // generate the StreamNodes and StreamEdges
        transform(transformation);
    }
    // ...
}

// transform(transformation) is what ultimately fills in the StreamGraph: it calls
// translateInternal to create the instances. (In later versions the StreamEdges are
// linked by calling addEdge from transformFeedback.)
private Collection<Integer> translateInternal(
        final OneInputTransformation<IN, OUT> transformation,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    // create a StreamNode and add it to the StreamGraph's streamNodes map
    streamGraph.addOperator(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            transformation.getOperatorFactory(),
            transformation.getInputType(),
            transformation.getOutputType(),
            transformation.getName());

    // ...

    for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
        // create the edge and register it with the upstream and downstream StreamNodes
        streamGraph.addEdge(inputId, transformationId, 0);
    }
}
```
A note on where the transformations come from: when the StreamGraph is generated, the transformations are the list handed to the StreamGraphGenerator (in older versions passed directly via StreamGraphGenerator.generate(this, transformations)). They are collected in the field protected final List<Transformation<?>> transformations = new ArrayList<>(); on the execution environment. Each operator call produces a SingleOutputStreamOperator, which internally calls transform(); transform() in turn calls addOperator(resultTransform), adding that operator's transformation to the transformations list.
```java
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operator,
            outTypeInfo,
            environment.getParallelism());
    // ...
    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

public <IN, OUT> void addOperator(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {
    // Later, when the Task is created, this class is instantiated reflectively via its
    // parameterized constructor, e.g. a map function corresponds to OneInputStreamTask.class.
    Class<? extends AbstractInvokable> invokableClass =
            operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
    addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo,
            outTypeInfo, operatorName, invokableClass);
}

protected StreamNode addNode(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        Class<? extends AbstractInvokable> vertexClass,
        StreamOperatorFactory<?> operatorFactory,
        String operatorName) {

    if (streamNodes.containsKey(vertexID)) {
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }

    // Create the StreamNode. Core data: slotSharingGroup and operatorFactory (for common
    // user-defined operators this is e.g. SimpleUdfStreamOperatorFactory, which wraps the
    // user's userFunction).
    StreamNode vertex = new StreamNode(
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorFactory,
            operatorName,
            vertexClass);

    streamNodes.put(vertexID, vertex);
    // ...
}
```
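To see this bookkeeping from the user side, here is a small sketch of my own (the job name "check" is arbitrary): map and print each register a Transformation with the environment, and the generated StreamGraph ends up with one StreamNode per operator, including the source.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationCountDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // map and print each register a Transformation with the environment.
        env.fromElements("a", "b")
                .map(String::toUpperCase)
                .print();

        // The generated StreamGraph has one StreamNode per operator:
        // source, map and sink.
        System.out.println(env.getStreamGraph("check").getStreamNodes().size()); // 3
    }
}
```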
addEdge delegates to addEdgeInternal, which resolves virtual (side-output and partition) nodes, picks the partitioner, and wires the new StreamEdge into both endpoint StreamNodes:

```java
private void addEdgeInternal(Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames,
        OutputTag outputTag,
        ShuffleMode shuffleMode) {

    // For a side-output (virtual) transformation, recurse into addEdgeInternal
    // with the upstream transformation id.
    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        // the outputTag identifies a side-output stream
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
    // partition-type (virtual) transformations are handled the same way
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        shuffleMode = virtualPartitionNodes.get(virtualId).f2;
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else {
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        // The partitioner is chosen based on whether the upstream and downstream parallelism match.
        // ForwardPartitioner and RebalancePartitioner mainly differ in selectChannel: the former
        // always returns channel index 0, while the latter starts from a random channel and then
        // advances round-robin ((current + 1) % numberOfChannels); the other partitioners
        // implement selectChannel differently as well.
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = new ForwardPartitioner<Object>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }

        if (partitioner instanceof ForwardPartitioner) {
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow " +
                        "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                        ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                        " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }

        // The shuffle mode decides whether operators can be chained (not in batch mode) and the
        // ResultPartitionType. A transformation's shuffleMode is usually UNDEFINED (including
        // partition-type transformations); in that case the ResultPartitionType is decided by the
        // GlobalDataExchangeMode (in non-batch mode ALL_EDGES_PIPELINED -> ResultPartitionType = PIPELINED_BOUNDED).
        if (shuffleMode == null) {
            shuffleMode = ShuffleMode.UNDEFINED;
        }

        // Create the StreamEdge; its core attributes are the upstream/downstream nodes,
        // the partitioner and the shuffle mode.
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber,
                partitioner, outputTag, shuffleMode);

        // register the edge with its upstream and downstream StreamNodes
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}
```
To summarize the key StreamGraph-building methods:
addOperator: builds the streamNodes collection
addEdge: builds the edges
addEdgeInternal: builds the edges; this method decides the partitioning strategy: if no partitioner is specified, whether the upstream and downstream operators share the same parallelism determines forward (local) distribution versus rebalanced (round-robin) distribution (a sketch follows this list)
getJobGraph: generates the JobGraph
getStreamingPlanAsJSON: the string (JSON) representation of the StreamGraph
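As a closing illustration, a sketch of my own (parallelism values and names are arbitrary; getStreamGraph is an internal API used here only for inspection) showing how addEdgeInternal's default choice plays out and how an explicit repartitioning call overrides it:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionerSelectionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        env.fromElements("a", "b", "c")      // non-parallel source, parallelism 1
                .map(String::toUpperCase)    // parallelism 4; 1 != 4, so RebalancePartitioner is chosen
                .filter(s -> !s.isEmpty())   // parallelism 4; 4 == 4, so ForwardPartitioner is chosen
                .rebalance()                 // explicit partitioner: a virtual partition node whose
                                             // RebalancePartitioner is stored on the edge to the sink
                .print();

        // The ship strategy of each edge (FORWARD / REBALANCE) shows up in the JSON plan.
        System.out.println(env.getStreamGraph("partitioners").getStreamingPlanAsJSON());
    }
}
```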