- /**
- * Class representing the operators in the streaming programs, with all their properties.
- */
- @Internal
- public class StreamNode {
- private final int id;
- private int parallelism;
- /**
- * Maximum parallelism for this stream node. The maximum parallelism is the upper limit for
- * dynamic scaling and the number of key groups used for partitioned state.
- */
- private int maxParallelism;
- private ResourceSpec minResources = ResourceSpec.DEFAULT;
- private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
- private final Map<ManagedMemoryUseCase, Integer> managedMemoryOperatorScopeUseCaseWeights = new HashMap<>();
- private final Set<ManagedMemoryUseCase> managedMemorySlotScopeUseCases = new HashSet<>();
- private long bufferTimeout;
- private final String operatorName;
- private @Nullable String slotSharingGroup;
- private @Nullable String coLocationGroup;
- private KeySelector<?, ?>[] statePartitioners = new KeySelector[0];
- private TypeSerializer<?> stateKeySerializer;
- private StreamOperatorFactory<?> operatorFactory;
- private TypeSerializer<?>[] typeSerializersIn = new TypeSerializer[0];
- private TypeSerializer<?> typeSerializerOut;
- // Collections of incoming edges and outgoing edges of this node
- private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
- private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
- private final Class<? extends AbstractInvokable> jobVertexClass;
- private InputFormat<?, ?> inputFormat;
- private OutputFormat<?> outputFormat;
- private String transformationUID;
- private String userHash;
- private boolean sortedInputs = false;
- .....
- // Add an in-edge / out-edge to this StreamNode, i.e. append it directly to the in-edge / out-edge list
- public void addInEdge(StreamEdge inEdge) {
- if (inEdge.getTargetId() != getId()) { // an in-edge must point at this node
- throw new IllegalArgumentException("Destination id doesn't match the StreamNode id");
- } else {
- inEdges.add(inEdge);
- }
- }
- public void addOutEdge(StreamEdge outEdge) {
- if (outEdge.getSourceId() != getId()) { // an out-edge must originate from this node
- throw new IllegalArgumentException("Source id doesn't match the StreamNode id");
- } else {
- outEdges.add(outEdge);
- }
- }
- ....
- }
- /**
- * An edge in the streaming topology. One edge like this does not necessarily
- * get converted to a connection between two job vertices (due to
- * chaining/optimization).
- */
- @Internal
- public class StreamEdge implements Serializable {
- private static final long serialVersionUID = 1L;
- private static final long ALWAYS_FLUSH_BUFFER_TIMEOUT = 0L;
- private final String edgeId;
- // Ids of the source node and the target node
- private final int sourceId;
- private final int targetId;
- /**
- * The type number of the input for co-tasks.
- */
- private final int typeNumber;
- /**
- * The side-output tag (if any) of this {@link StreamEdge}.
- */
- // Side-output stream tag
- private final OutputTag outputTag;
- /**
- * The {@link StreamPartitioner} on this {@link StreamEdge}.
- */
- // Partitioner
- private StreamPartitioner<?> outputPartitioner;
- /**
- * The name of the operator in the source vertex.
- */
- private final String sourceOperatorName;
- /**
- * The name of the operator in the target vertex.
- */
- private final String targetOperatorName;
- // The shuffle mode defines how data is exchanged between operators
- private final ShuffleMode shuffleMode;
- private long bufferTimeout;
- public StreamEdge(
- StreamNode sourceVertex,
- StreamNode targetVertex,
- int typeNumber,
- long bufferTimeout,
- StreamPartitioner<?> outputPartitioner,
- OutputTag outputTag,
- ShuffleMode shuffleMode) {
- this.sourceId = sourceVertex.getId();
- this.targetId = targetVertex.getId();
- this.typeNumber = typeNumber;
- this.bufferTimeout = bufferTimeout;
- this.outputPartitioner = outputPartitioner;
- this.outputTag = outputTag;
- this.sourceOperatorName = sourceVertex.getOperatorName();
- this.targetOperatorName = targetVertex.getOperatorName();
- this.shuffleMode = checkNotNull(shuffleMode); // the only argument that is null-checked here
- this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + outputPartitioner; // concatenates the StreamNode objects themselves (their string form), not their ids
- }
- }
- // There are quite a few steps in this flow; I will try to describe them in as much detail as possible...
- // This is the submission function: CliFrontend is the entry point for submitting a program, and the key method is cli.parseAndRun(args)
- public static void main(final String[] args) {
- EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
- // 1. find the configuration directory
- // 1. Resolve the conf directory, e.g. /opt/tools/flink-1.12.2/conf
- final String configurationDirectory = getConfigurationDirectoryFromEnv();
- // 2. load the global configuration
- // 2. Load the global configuration; example contents:
- // "taskmanager.memory.process.size" -> "1728m"
- // "parallelism.default" -> "1"
- // "jobmanager.execution.failover-strategy" -> "region"
- // "jobmanager.rpc.address" -> "localhost"
- // "taskmanager.numberOfTaskSlots" -> "1"
- // "jobmanager.memory.process.size" -> "1600m"
- // "jobmanager.rpc.port" -> "6123"
- final Configuration configuration =
- GlobalConfiguration.loadConfiguration(configurationDirectory);
- // 3. load the custom command lines
- // 3. Load the custom command-line implementations
- final List<CustomCommandLine> customCommandLines =
- loadCustomCommandLines(configuration, configurationDirectory);
- try {
- // Build the CliFrontend : GenericCLI > flinkYarnSessionCLI > DefaultCLI
- final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
- SecurityUtils.install(new SecurityConfiguration(cli.configuration));
- // Submit the command through parseAndRun, executed inside the installed security context
- int retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
- System.exit(retCode);
- } catch (Throwable t) {
- final Throwable strippedThrowable =
- ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
- LOG.error("Fatal error while running command line interface.", strippedThrowable);
- strippedThrowable.printStackTrace();
- System.exit(31); // any failure exits the process with code 31
- }
- }
- // Next, inside parseAndRun(args), a different method is invoked depending on the requested command, e.g. run, stop, and so on
- // Supported commands:
- /**
- *
- * // actions
- * private static final String ACTION_RUN = "run";
- * private static final String ACTION_RUN_APPLICATION = "run-application";
- * private static final String ACTION_INFO = "info";
- * private static final String ACTION_LIST = "list";
- * private static final String ACTION_CANCEL = "cancel";
- * private static final String ACTION_STOP = "stop";
- * private static final String ACTION_SAVEPOINT = "savepoint";
- */
- // Because we are submitting a job, the function that gets called is CliFrontend.run
- // That function mainly determines how/where the Flink program is executed (method, environment, program, etc.),
- // then goes through CliFrontend.executeProgram(effectiveConfiguration, program)
- // which hands the job over to the ClientUtils utility class for submission
- protected void executeProgram(final Configuration configuration, final PackagedProgram program)
- throws ProgramInvocationException {
- ClientUtils.executeProgram(
- new DefaultExecutorServiceLoader(), configuration, program, false, false); // the two flags are enforceSingleJobExecution and suppressSysout
- }
- // ClientUtils.executeProgram (same method name as the previous one, but in a different class) sets up the program's execution environment / class loader and gets ready to execute...
- // Execute the program code
- public static void executeProgram(
- PipelineExecutorServiceLoader executorServiceLoader,
- Configuration configuration,
- PackagedProgram program,
- boolean enforceSingleJobExecution,
- boolean suppressSysout)
- throws ProgramInvocationException {
- checkNotNull(executorServiceLoader);
- // Get the user-code class loader, e.g. FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader@3439
- final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
- // Remember the current context class loader so it can be restored afterwards...
- final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
- try {
- // Switch the context class loader to the user-code class loader..
- Thread.currentThread().setContextClassLoader(userCodeClassLoader);
- //log info : Starting program (detached: false)
- LOG.info(
- "Starting program (detached: {})",
- !configuration.getBoolean(DeploymentOptions.ATTACHED));
- // Install the context environments that the user code will pick up....
- // (author's note: via getExecutionEnvironment)
- ContextEnvironment.setAsContext(
- executorServiceLoader,
- configuration,
- userCodeClassLoader,
- enforceSingleJobExecution,
- suppressSysout);
- StreamContextEnvironment.setAsContext(
- executorServiceLoader,
- configuration,
- userCodeClassLoader,
- enforceSingleJobExecution,
- suppressSysout);
- try {
- // Invoke the user program's main method via reflection...
- program.invokeInteractiveModeForExecution();
- } finally {
- ContextEnvironment.unsetAsContext();
- StreamContextEnvironment.unsetAsContext();
- }
- } finally {
- Thread.currentThread().setContextClassLoader(contextClassLoader); // always restore the original context class loader
- }
- }
- // Finally, execution goes through PackagedProgram.invokeInteractiveModeForExecution
- // which calls the underlying callMainMethod to invoke the main method via reflection.
- // Only at mainMethod.invoke(null, (Object) args) does the user program actually start executing.
- /**
- * This method assumes that the context environment is prepared, or the execution will be a
- * local execution by default.
- */
- public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
- // mainClass: class org.apache.flink.streaming.examples.socket.SocketWindowWordCount
- // args
- // 0 = "--port"
- // 1 = "9999"
- callMainMethod(mainClass, args);
- }
- // Example values: entryClass = class org.apache.flink.streaming.examples.socket.SocketWindowWordCount, args : --port 9999
- private static void callMainMethod(Class<?> entryClass, String[] args)
- throws ProgramInvocationException {
- Method mainMethod;
- if (!Modifier.isPublic(entryClass.getModifiers())) {
- throw new ProgramInvocationException(
- "The class " + entryClass.getName() + " must be public.");
- }
- // Example result: public static void org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(java.lang.String[]) throws java.lang.Exception
- try {
- mainMethod = entryClass.getMethod("main", String[].class);
- } catch (NoSuchMethodException e) {
- throw new ProgramInvocationException(
- "The class " + entryClass.getName() + " has no main(String[]) method.");
- } catch (Throwable t) {
- throw new ProgramInvocationException(
- "Could not look up the main(String[]) method from the class "
- + entryClass.getName()
- + ": "
- + t.getMessage(),
- t);
- }
- if (!Modifier.isStatic(mainMethod.getModifiers())) {
- throw new ProgramInvocationException(
- "The class " + entryClass.getName() + " declares a non-static main method.");
- }
- if (!Modifier.isPublic(mainMethod.getModifiers())) {
- throw new ProgramInvocationException(
- "The class " + entryClass.getName() + " declares a non-public main method.");
- }
- // Execution starts here !!!!!!!!!
- try {
- mainMethod.invoke(null, (Object) args); // null receiver: main was verified to be static above
- } catch (IllegalArgumentException e) {
- throw new ProgramInvocationException(
- "Could not invoke the main method, arguments are not matching.", e);
- } catch (IllegalAccessException e) {
- throw new ProgramInvocationException(
- "Access to the main method was denied: " + e.getMessage(), e);
- } catch (InvocationTargetException e) {
- Throwable exceptionInMethod = e.getTargetException(); // unwrap what the user's main method actually threw
- if (exceptionInMethod instanceof Error) {
- throw (Error) exceptionInMethod;
- } else if (exceptionInMethod instanceof ProgramParametrizationException) {
- throw (ProgramParametrizationException) exceptionInMethod;
- } else if (exceptionInMethod instanceof ProgramInvocationException) {
- throw (ProgramInvocationException) exceptionInMethod;
- } else {
- throw new ProgramInvocationException(
- "The main method caused an error: " + exceptionInMethod.getMessage(),
- exceptionInMethod);
- }
- } catch (Throwable t) {
- throw new ProgramInvocationException(
- "An error occurred while invoking the program's main method: " + t.getMessage(),
- t);
- }
- }
- // Calls getStreamGraph
- public JobExecutionResult execute(String jobName) throws Exception {
- Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
- // Generate the StreamGraph and pass it to the execute overload that takes a graph
- return execute(getStreamGraph(jobName));
- }
- // This part mainly generates the StreamGraph, using the StreamGraphGenerator.generate function
- /**
- * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job. This call
- * clears previously registered {@link Transformation transformations}.
- *
- * @param jobName Desired name of the job
- * @return The streamgraph representing the transformations
- */
- @Internal
- public StreamGraph getStreamGraph(String jobName) {
- return getStreamGraph(jobName, true); // true = clear the registered transformations afterwards
- }
- /**
- * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph StreamGraph} of the streaming job
- * with the option to clear previously registered {@link Transformation transformations}. Clearing the
- * transformations allows, for example, to not re-execute the same operations when calling
- * {@link #execute()} multiple times.
- *
- * @param jobName Desired name of the job
- * @param clearTransformations Whether or not to clear previously registered transformations
- * @return The streamgraph representing the transformations
- */
- @Internal
- public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
- StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate(); // the graph is generated before the transformations are (optionally) cleared
- if (clearTransformations) {
- this.transformations.clear();
- }
- return streamGraph;
- }
- public StreamGraph generate() {
- // Create the StreamGraph instance
- streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
- // Determine the execution mode
- shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
- // Configure the StreamGraph
- configureStreamGraph(streamGraph);
- alreadyTransformed = new HashMap<>();
- // Iterate over all transformations
- for (Transformation<?> transformation: transformations) {
- // Generate the StreamNodes and StreamEdges
- transform(transformation);
- }
- .........
- }
- // Ultimately the StreamGraph is built by transform(transformation)
- // The transform function ends up calling translateInternal to create the instances. (Author's note: in later versions, addEdge is called from the transformFeedback function to link the StreamEdges.)
- private Collection<Integer> translateInternal(
- final OneInputTransformation<IN, OUT> transformation,
- final Context context) {
- checkNotNull(transformation);
- checkNotNull(context);
- final StreamGraph streamGraph = context.getStreamGraph();
- final String slotSharingGroup = context.getSlotSharingGroup();
- final int transformationId = transformation.getId();
- final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
- // Create the StreamNode and add it to the StreamGraph's streamNodes map
- streamGraph.addOperator(
- transformationId,
- slotSharingGroup,
- transformation.getCoLocationGroupKey(),
- transformation.getOperatorFactory(),
- transformation.getInputType(),
- transformation.getOutputType(),
- transformation.getName());
- .......
- for (Integer inputId: context.getStreamNodeIds(parentTransformations.get(0))) {
- // Create the edge and add it to both its upstream and downstream StreamNode
- streamGraph.addEdge(inputId, transformationId, 0);
- }
- }
这里说明一下:在生成StreamGraph的时候会用到transformations参数,这个参数是在调用StreamGraphGenerator.generate(this, transformations)时传递进去的,它由执行环境中的字段 protected final List<StreamTransformation<?>> transformations = new ArrayList<>(); 产生(上文代码所用的版本中,元素类型已是Transformation)。每一个operator算子都会对应一个SingleOutputStreamOperator,对应的算子方法会调用下面的transform函数,并在其中执行getExecutionEnvironment().addOperator(resultTransform),把该算子的transformation添加到transformations列表中,完成赋值。
- public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
- OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
- this.transformation, // this stream's own transformation is passed as the first argument of the new one
- operatorName,
- operator,
- outTypeInfo,
- environment.getParallelism());
- ...
- getExecutionEnvironment().addOperator(resultTransform); // hand the new transformation to the execution environment
- return returnStream;
- }
- public <IN, OUT> void addOperator(
- Integer vertexID,
- @Nullable String slotSharingGroup,
- @Nullable String coLocationGroup,
- StreamOperatorFactory<OUT> operatorFactory,
- TypeInformation<IN> inTypeInfo,
- TypeInformation<OUT> outTypeInfo,
- String operatorName) {
- // Author's note: later, when the Task is created, this Class is used to reflectively call its parameterized constructor to initialize the Task
- // For example, a Map function corresponds to OneInputStreamTask.class
- Class<? extends AbstractInvokable> invokableClass =
- operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
- addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo,
- outTypeInfo, operatorName, invokableClass);
- }
- protected StreamNode addNode(
- Integer vertexID,
- @Nullable String slotSharingGroup,
- @Nullable String coLocationGroup,
- Class<? extends AbstractInvokable> vertexClass,
- StreamOperatorFactory<?> operatorFactory,
- String operatorName) {
- if (streamNodes.containsKey(vertexID)) { // each vertex id may be registered only once
- throw new RuntimeException("Duplicate vertexID " + vertexID);
- }
- // Create the StreamNode. Core data: slotSharingGroup and operatorFactory (e.g. the commonly used SimpleUdfStreamOperatorFactory for user-defined operators,
- // which, per the author's note, wraps the user's userFunction)
- StreamNode vertex = new StreamNode(
- vertexID,
- slotSharingGroup,
- coLocationGroup,
- operatorFactory,
- operatorName,
- vertexClass);
- streamNodes.put(vertexID, vertex);
- .....
- }
- private void addEdgeInternal(Integer upStreamVertexID,
- Integer downStreamVertexID,
- int typeNumber,
- StreamPartitioner<?> partitioner,
- List<String> outputNames,
- OutputTag outputTag,
- ShuffleMode shuffleMode) {
- // For a side-output transformation (a virtual node), resolve the upstream transformation id and call addEdgeInternal again with it
- if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
- int virtualId = upStreamVertexID;
- upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
- // The outputTag identifies a side-output stream; an explicitly passed tag takes precedence over the virtual node's tag
- if (outputTag == null) {
- outputTag = virtualSideOutputNodes.get(virtualId).f1;
- }
- addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
- // Partition-type (virtual) transformations are handled the same way
- } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
- int virtualId = upStreamVertexID;
- upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
- if (partitioner == null) {
- partitioner = virtualPartitionNodes.get(virtualId).f1;
- }
- shuffleMode = virtualPartitionNodes.get(virtualId).f2; // unlike the partitioner, the shuffle mode is always overwritten by the virtual node's value
- addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
- } else {
- StreamNode upstreamNode = getStreamNode(upStreamVertexID);
- StreamNode downstreamNode = getStreamNode(downStreamVertexID);
- // If no partitioner was specified and the parallelism of upstream and downstream
- // operator matches use forward partitioning, use rebalance otherwise.
- // The default partitioner is decided by whether upstream and downstream parallelism are equal
- // Author's note: ForwardPartitioner, RebalancePartitioner, etc. differ mainly in selectChannel:
- // the former directly returns channel index 0, the latter takes a random value over the channel count, adds 1, then takes it modulo the channel count (the other partitioners implement selectChannel differently as well)
- if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
- partitioner = new ForwardPartitioner<Object>();
- } else if (partitioner == null) {
- partitioner = new RebalancePartitioner<Object>();
- }
- if (partitioner instanceof ForwardPartitioner) {
- if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
- throw new UnsupportedOperationException("Forward partitioning does not allow " +
- "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
- ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
- " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
- }
- }
- // Author's note: the shuffle mode decides whether operators can be chained (!= batch) and which ResultPartitionType is used
- // Usually a transformation's shuffleMode = UNDEFINED (including partition-type transformations)
- // In that case the ResultPartitionType is decided by GlobalDataExchangeMode (in non-batch mode = ALL_EDGES_PIPELINED -> ResultPartitionType = PIPELINED_BOUNDED)
- if (shuffleMode == null) {
- shuffleMode = ShuffleMode.UNDEFINED;
- }
- // Create the StreamEdge; its core attributes are the upstream/downstream nodes, the partitioner and the shuffleMode
- StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber,
- partitioner, outputTag, shuffleMode);
- // Add the edge to both its upstream and its downstream StreamNode
- getStreamNode(edge.getSourceId()).addOutEdge(edge);
- getStreamNode(edge.getTargetId()).addInEdge(edge);
- }
- }
