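Apache Flink is one of the most popular big-data compute engines in China. It supports high throughput, low latency, exactly-once semantics and stateful stream processing, and reading its source code deepens one's understanding of the framework.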
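Flink uses four kinds of execution graphs (StreamGraph, JobGraph, ExecutionGraph and the physical execution graph). This chapter analyzes how the StreamGraph is generated.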
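The source walkthrough in this and later chapters assumes a production-style setup: deployment mode On YARN, HA mode ZooKeeper, execution mode Streaming. Explanations of the key logic are written as comments in the code, so don't skip them.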
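The StreamGraph is Flink's top-level logical representation: the logical layer into which the user's API calls are translated. Like the JobGraph, it is first generated on the client side (in cluster mode), but unlike the JobGraph it does not go through further processing such as operator chaining. Its main job is to turn the Transformations written by the user into StreamNodes, create StreamEdges that link each node to its upstream and downstream neighbors, and load both into the StreamGraph. The process is not complicated.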
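Taking On YARN as an example, when we submit a job to the cluster with the submit script, the entry point is CliFrontend's main method. The main flow is: Flink obtains the user's jar, then uses reflection to call the user's main method.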
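```java
// CliFrontend
/**
 * Submits the job based on the arguments.
 */
public static void main(final String[] args) {
    // ...

    try {
        final CliFrontend cli = new CliFrontend(
                configuration,
                customCommandLines);

        SecurityUtils.install(new SecurityConfiguration(cli.configuration));
        int retCode = SecurityUtils.getInstalledContext()
                // Dispatch to the matching action based on the command-line arguments
                .runSecured(() -> cli.parseAndRun(args));
        System.exit(retCode);
    } catch (Throwable t) {
        // ... (error handling elided)
    }
}

// ClientUtils: reached from parseAndRun for the "run" action (intermediate calls elided)
public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout) throws ProgramInvocationException {
    // ...

    try {
        // Run the user program's main method; this eventually calls callMainMethod below
        program.invokeInteractiveModeForExecution();
    } finally {
        // ... (cleanup elided)
    }
}

// PackagedProgram
private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
    // ... (lookup of mainMethod and validity checks elided)

    try {
        // Invoke the static main method of the user's entry class via reflection
        mainMethod.invoke(null, (Object) args);
    } catch (Exception e) {
        // ... (error handling elided)
    }
}
```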
Once execution reaches mainMethod.invoke above, control enters the user program and the logic of the user's main method starts running.
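The reflective call at the heart of callMainMethod can be tried in isolation. The following is a minimal sketch, not Flink's code: `UserJob` is a hypothetical stand-in for the class that Flink would load from the user jar, and error handling is reduced to a single check. It also shows why the `(Object) args` cast matters: without it, `Method.invoke` would treat the `String[]` as its varargs array and try to pass each string as a separate argument.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class ReflectiveMainDemo {

    // Hypothetical stand-in for the user's entry class; in Flink it would come from the user jar.
    public static class UserJob {
        static String[] lastArgs; // records what main() received, for demonstration only

        public static void main(String[] args) {
            lastArgs = args;
        }
    }

    // Looks up public static main(String[]) and invokes it (error handling simplified).
    static void callMain(Class<?> entryClass, String[] args) throws Exception {
        Method mainMethod = entryClass.getMethod("main", String[].class);
        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            throw new IllegalStateException("main method is not static");
        }
        // Static method, so the receiver is null. The cast makes the whole String[]
        // a single argument instead of being spread out as varargs.
        mainMethod.invoke(null, (Object) args);
    }

    public static void main(String[] args) throws Exception {
        callMain(UserJob.class, new String[] {"--input", "words.txt"});
        System.out.println(String.join(" ", UserJob.lastArgs)); // --input words.txt
    }
}
```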
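As in Spark, Flink executes lazily: the user's operator logic does not actually run until Flink has wrapped it and built all the execution graphs.
Taking WordCount as an example, each operator produces a corresponding Transformation (for example, map produces a OneInputTransformation) or a similar Flink wrapper object. At this point no user code has actually run; lazy execution is only triggered when StreamExecutionEnvironment.execute() is reached.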
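The lazy-execution idea can be illustrated with a toy environment. This is a minimal sketch under heavy simplification: `MiniEnv` is a hypothetical class, not Flink's StreamExecutionEnvironment, and it runs everything locally in one thread. API calls only record the wrapped user functions in a list (much like Flink's `transformations` list), and nothing runs until `execute()` is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class LazyPipelineDemo {

    // Hypothetical, heavily simplified stand-in for StreamExecutionEnvironment.
    static class MiniEnv {
        final List<Function<Object, Object>> transformations = new ArrayList<>();
        final List<String> log = new ArrayList<>(); // records when user code actually runs

        MiniEnv map(String name, Function<Object, Object> fn) {
            // Like DataStream.map(): wrap and register the user function, but do not call it.
            transformations.add(value -> {
                log.add("running " + name);
                return fn.apply(value);
            });
            return this;
        }

        Object execute(Object input) {
            // Only now does the recorded user logic run, in declaration order.
            Object value = input;
            for (Function<Object, Object> t : transformations) {
                value = t.apply(value);
            }
            return value;
        }
    }

    public static void main(String[] args) {
        MiniEnv env = new MiniEnv();
        env.map("toUpper", v -> ((String) v).toUpperCase())
           .map("length", v -> ((String) v).length());
        System.out.println(env.log.size());       // 0: nothing has run yet
        System.out.println(env.execute("flink")); // 5
        System.out.println(env.log);              // [running toUpper, running length]
    }
}
```

In real Flink, execute() does not run the functions directly; it builds the graphs and submits the job, and the operators run on the cluster.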
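The map function: as the code shows, the core logic is wrapped in a OneInputTransformation, including StreamMap, which handles the user's map logic (its processElement method applies the user function to each record and passes the result to the output; the data exchange between output and input is covered in later chapters). Each Transformation is added to the List<Transformation<?>> transformations field of StreamExecutionEnvironment, and these Transformations are later turned into StreamNodes one by one.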
```java
// DataStream
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    // Wrap the user function in a StreamMap operator; transform() wraps the operator
    // in an operator factory and delegates to doTransform()
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}

public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    // Per-record processing: apply the user's map function and emit the result
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

// DataStream
protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    // operatorFactory example: common user-defined operators (map, filter, etc.)
    // produce a SimpleUdfStreamOperatorFactory
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operatorFactory,
            outTypeInfo,
            environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

    // Register the transformation in the environment's transformations list
    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}
```
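The shape of StreamMap (an operator that holds the user function and, for each record, applies it and forwards the result to an output) can be sketched without any Flink dependency. The `MapFunction` and `Output` interfaces below are simplified, hypothetical stand-ins rather than Flink's real types, and the sketch passes plain values instead of reusing a `StreamRecord` via `replace()`.

```java
import java.util.ArrayList;
import java.util.List;

public class MapOperatorDemo {

    // Simplified, hypothetical stand-ins for Flink's MapFunction and Output.
    interface MapFunction<IN, OUT> { OUT map(IN value) throws Exception; }
    interface Output<T> { void collect(T record); }

    // Mirrors the shape of StreamMap: hold the user function, apply it per record, emit downstream.
    static class SimpleStreamMap<IN, OUT> {
        private final MapFunction<IN, OUT> userFunction;
        private final Output<OUT> output;

        SimpleStreamMap(MapFunction<IN, OUT> userFunction, Output<OUT> output) {
            this.userFunction = userFunction;
            this.output = output;
        }

        void processElement(IN element) throws Exception {
            output.collect(userFunction.map(element));
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> collected = new ArrayList<>();
        // The "downstream" here is just a list that collects the mapped values.
        SimpleStreamMap<String, Integer> op = new SimpleStreamMap<>(String::length, collected::add);
        for (String word : new String[] {"to", "be", "or", "not"}) {
            op.processElement(word);
        }
        System.out.println(collected); // [2, 2, 2, 3]
    }
}
```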
When StreamExecutionEnvironment.execute() is reached, Flink's execution flow is triggered:
```java
// StreamExecutionEnvironment
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
    // Build the StreamGraph (getStreamGraph uses a StreamGraphGenerator and calls generate())
    return execute(getStreamGraph(jobName));
}

// StreamGraphGenerator
public StreamGraph generate() {
    // Create the StreamGraph instance
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    // Determine the execution mode
    shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
    configureStreamGraph(streamGraph);

    alreadyTransformed = new HashMap<>();

    for (Transformation<?> transformation: transformations) {
        // Create StreamNodes and StreamEdges; transform() dispatches to a translator for each
        // Transformation type (e.g. translateInternal below for OneInputTransformation)
        transform(transformation);
    }
    // ... (cleanup elided; the finished StreamGraph is returned)
}

// Translator for OneInputTransformation
private Collection<Integer> translateInternal(
        final OneInputTransformation<IN, OUT> transformation,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    // Create the StreamNode and add it to the StreamGraph's streamNodes map
    streamGraph.addOperator(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            transformation.getOperatorFactory(),
            transformation.getInputType(),
            transformation.getOutputType(),
            transformation.getName());

    // ... (elided; parentTransformations holds this transformation's input transformations)

    for (Integer inputId: context.getStreamNodeIds(parentTransformations.get(0))) {
        // Create the edge and register it with its upstream and downstream StreamNodes
        streamGraph.addEdge(inputId, transformationId, 0);
    }
    // ... (returns the id of the new StreamNode)
}
```
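The overall generate() → transform() → translateInternal() flow can be modelled with a few plain classes. This is a hypothetical sketch, not Flink's implementation: `Transformation`, `StreamNode`, `Edge` and `Generator` are simplified stand-ins. Inputs are translated first (a source need not be in the list, because it is reached through its consumer's inputs), each id is translated only once, and every input produces one edge that is registered on both endpoints.

```java
import java.util.*;

public class StreamGraphSketch {

    // Hypothetical, simplified model of a transformation: id, name and input transformations.
    record Transformation(int id, String name, List<Transformation> inputs) {}

    record Edge(int sourceId, int targetId) {}

    static class StreamNode {
        final int id;
        final String name;
        final List<Edge> inEdges = new ArrayList<>();
        final List<Edge> outEdges = new ArrayList<>();
        StreamNode(int id, String name) { this.id = id; this.name = name; }
    }

    static class Generator {
        final Map<Integer, StreamNode> streamNodes = new LinkedHashMap<>();
        final Set<Integer> alreadyTransformed = new HashSet<>();

        Map<Integer, StreamNode> generate(List<Transformation> transformations) {
            for (Transformation t : transformations) {
                transform(t);
            }
            return streamNodes;
        }

        // Translate inputs first, then add this node and one edge per input.
        private void transform(Transformation t) {
            if (!alreadyTransformed.add(t.id())) {
                return; // already translated, e.g. reached through another path
            }
            for (Transformation input : t.inputs()) {
                transform(input);
            }
            streamNodes.put(t.id(), new StreamNode(t.id(), t.name()));
            for (Transformation input : t.inputs()) {
                Edge edge = new Edge(input.id(), t.id());
                streamNodes.get(input.id()).outEdges.add(edge); // upstream side
                streamNodes.get(t.id()).inEdges.add(edge);      // downstream side
            }
        }
    }

    public static void main(String[] args) {
        Transformation source = new Transformation(1, "Source", List.of());
        Transformation flatMap = new Transformation(2, "FlatMap", List.of(source));
        Transformation sink = new Transformation(3, "Sink", List.of(flatMap));
        // The source is not in the list; it is reached through flatMap's inputs.
        Map<Integer, StreamNode> graph = new Generator().generate(List.of(flatMap, sink));
        for (StreamNode n : graph.values()) {
            System.out.println(n.id + " " + n.name + " in=" + n.inEdges.size() + " out=" + n.outEdges.size());
        }
        // 1 Source in=0 out=1
        // 2 FlatMap in=1 out=1
        // 3 Sink in=1 out=0
    }
}
```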
Generating the StreamNode:
```java
// StreamGraph
public <IN, OUT> void addOperator(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {
    // Later, when the Task is created, this class is instantiated reflectively through
    // its constructor, e.g. OneInputStreamTask.class for a map function
    Class<? extends AbstractInvokable> invokableClass =
            operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
    // This overload eventually calls addNode(...) below
    addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo,
            outTypeInfo, operatorName, invokableClass);
}

protected StreamNode addNode(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        Class<? extends AbstractInvokable> vertexClass,
        StreamOperatorFactory<?> operatorFactory,
        String operatorName) {

    if (streamNodes.containsKey(vertexID)) {
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }

    // Create the StreamNode. Core data: slotSharingGroup and operatorFactory (for common
    // user-defined operators e.g. SimpleUdfStreamOperatorFactory, which wraps the userFunction)
    StreamNode vertex = new StreamNode(
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorFactory,
            operatorName,
            vertexClass);

    streamNodes.put(vertexID, vertex);
    return vertex;
}
```
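The two ideas in addOperator/addNode (choose the task class from the operator kind, store only the Class object, and instantiate it reflectively much later) can be sketched as follows. All names here are hypothetical stand-ins; in particular, Flink's real task classes take the task's runtime environment in their constructor, whereas this sketch uses a single `String` parameter for simplicity.

```java
import java.util.HashMap;
import java.util.Map;

public class AddNodeSketch {

    // Hypothetical stand-ins for AbstractInvokable and two task classes.
    public abstract static class Invokable {
        final String operatorName;
        protected Invokable(String operatorName) { this.operatorName = operatorName; }
    }
    public static class SourceTask extends Invokable {
        public SourceTask(String operatorName) { super(operatorName); }
    }
    public static class OneInputTask extends Invokable {
        public OneInputTask(String operatorName) { super(operatorName); }
    }

    record Node(int id, String operatorName, Class<? extends Invokable> vertexClass) {}

    final Map<Integer, Node> streamNodes = new HashMap<>();

    // Pick the task class from the operator kind, reject duplicate ids, register the node.
    Node addOperator(int vertexId, String operatorName, boolean isStreamSource) {
        Class<? extends Invokable> invokableClass = isStreamSource ? SourceTask.class : OneInputTask.class;
        if (streamNodes.containsKey(vertexId)) {
            throw new IllegalStateException("Duplicate vertexID " + vertexId);
        }
        Node node = new Node(vertexId, operatorName, invokableClass);
        streamNodes.put(vertexId, node);
        return node;
    }

    // Much later (at deployment time in Flink), the stored class is instantiated reflectively.
    static Invokable instantiate(Node node) throws ReflectiveOperationException {
        return node.vertexClass().getConstructor(String.class).newInstance(node.operatorName());
    }

    public static void main(String[] args) throws Exception {
        AddNodeSketch graph = new AddNodeSketch();
        graph.addOperator(1, "Source: words", true);
        Node map = graph.addOperator(2, "Map", false);
        System.out.println(instantiate(map).getClass().getSimpleName()); // OneInputTask
        try {
            graph.addOperator(2, "Map again", false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Duplicate vertexID 2
        }
    }
}
```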
Generating the StreamEdge:
```java
// StreamGraph
private void addEdgeInternal(Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames,
        OutputTag outputTag,
        ShuffleMode shuffleMode) {

    // If the upstream id is a virtual side-output node, replace it with the real
    // upstream node id and call addEdgeInternal again
    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        // outputTag identifies a side-output stream
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
    // Virtual partition nodes (from partition transformations) are resolved the same way
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        shuffleMode = virtualPartitionNodes.get(virtualId).f2;
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else {
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        // ForwardPartitioner and RebalancePartitioner (like the other partitioners) differ
        // mainly in selectChannel: the former always returns channel index 0, the latter
        // starts at a random channel and then cycles round-robin, (previous + 1) % numberOfChannels
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = new ForwardPartitioner<Object>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }

        if (partitioner instanceof ForwardPartitioner) {
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow " +
                        "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                        ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                        " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }

        // shuffleMode decides whether operators can be chained (not when BATCH) and the
        // ResultPartitionType. A transformation's shuffleMode is usually UNDEFINED (partition
        // transformations included); the ResultPartitionType is then decided by the
        // GlobalDataExchangeMode (outside batch mode: ALL_EDGES_PIPELINED -> PIPELINED_BOUNDED)
        if (shuffleMode == null) {
            shuffleMode = ShuffleMode.UNDEFINED;
        }

        // Create the StreamEdge; its core attributes are the upstream and downstream nodes,
        // the partitioner and the shuffleMode
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber,
                partitioner, outputTag, shuffleMode);

        // Register the edge with its upstream and downstream StreamNodes
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}
```
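The order of decisions in addEdgeInternal (resolve virtual partition nodes, default to forward or rebalance based on parallelism, reject forward when the parallelism differs) and the two selectChannel behaviors can be sketched together. Every class here is a simplified, hypothetical stand-in: the partitioners model only channel selection, the rebalance channel count is fixed at construction (Flink sets it up later), and side outputs and shuffle modes are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

public class EdgePartitionerSketch {

    // Simplified, hypothetical partitioners; only selectChannel() is modelled.
    interface Partitioner { int selectChannel(); }

    static final class ForwardPartitioner implements Partitioner {
        public int selectChannel() { return 0; } // a forward edge has exactly one target channel
    }

    static final class RebalancePartitioner implements Partitioner {
        private final int numberOfChannels;
        private int next;

        RebalancePartitioner(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
            this.next = ThreadLocalRandom.current().nextInt(numberOfChannels); // random start
        }

        public int selectChannel() {
            next = (next + 1) % numberOfChannels; // then cycle round-robin
            return next;
        }
    }

    // A virtual partition node remembers the real upstream id and the requested partitioner.
    record VirtualNode(int upstreamId, Partitioner partitioner) {}

    final Map<Integer, VirtualNode> virtualPartitionNodes = new HashMap<>();
    final Map<Integer, Integer> parallelism = new HashMap<>(); // real node id -> parallelism

    // Returns the partitioner chosen for the edge, following addEdgeInternal's order of checks.
    Partitioner addEdge(int upstreamId, int downstreamId, Partitioner partitioner) {
        VirtualNode virtual = virtualPartitionNodes.get(upstreamId);
        if (virtual != null) {
            // Skip the virtual node: connect the real upstream node, keeping its partitioner.
            return addEdge(virtual.upstreamId(), downstreamId,
                    partitioner != null ? partitioner : virtual.partitioner());
        }
        int up = parallelism.get(upstreamId);
        int down = parallelism.get(downstreamId);
        if (partitioner == null) {
            partitioner = (up == down) ? new ForwardPartitioner() : new RebalancePartitioner(down);
        }
        if (partitioner instanceof ForwardPartitioner && up != down) {
            throw new UnsupportedOperationException("Forward partitioning does not allow change of parallelism.");
        }
        return partitioner;
    }

    public static void main(String[] args) {
        EdgePartitionerSketch g = new EdgePartitionerSketch();
        g.parallelism.put(1, 2);
        g.parallelism.put(2, 2);
        g.parallelism.put(3, 4);
        System.out.println(g.addEdge(1, 2, null).getClass().getSimpleName()); // ForwardPartitioner
        Partitioner p = g.addEdge(2, 3, null); // parallelism 2 -> 4, so rebalance
        for (int i = 0; i < 4; i++) {
            System.out.print(p.selectChannel() + " "); // four consecutive channels, wrapping at 4
        }
        System.out.println();
        g.virtualPartitionNodes.put(10, new VirtualNode(1, new RebalancePartitioner(2)));
        System.out.println(g.addEdge(10, 2, null).getClass().getSimpleName()); // RebalancePartitioner
    }
}
```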
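At this point the StreamGraph is complete. Its construction is clear and not complicated: Flink merely describes the user's API calls and part of the configuration and stores them in its own data structures. These structures are read back when the JobGraph is built; for example, the cached StreamEdges are traversed and operators that meet the chaining conditions are chained together into a JobVertex. The data stored in the JobGraph is in turn read when the ExecutionGraph is built, and so on until the physical execution graph is constructed.

From the client side to the cluster side, the process gradually becomes more complex: it involves various external dependencies and creates the components responsible for scheduling, monitoring and physical execution. This whole process is analyzed in later chapters.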
Any technical discussion is welcome. Thanks!