
Flink Source Code (1): StreamGraph

Background: I recently needed to support dynamically loading CEP rules on top of an existing Flink CEP setup, so I decided to implement rule updates through a custom dynamic CEP operator plus a custom operator coordinator that loads patterns dynamically.

While developing this, I am also documenting how Flink processes data along the way.

Version: Flink 1.14.3

1.1. StreamExecutionEnvironment

Every Flink program starts with a line like this:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
```

Let's step into the source to see what it actually does.

```java
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
            .map(factory -> factory.createExecutionEnvironment(configuration))
            .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
```

Here a factory is used to obtain the StreamExecutionEnvironment:

    • When running in a local IDE, a LocalEnvironment is created.
    • When launched from the command line, CliFrontend initializes the context first.
```java
StreamExecutionEnvironmentFactory factory =
        conf -> {
            Configuration mergedConfiguration = new Configuration();
            mergedConfiguration.addAll(configuration);
            mergedConfiguration.addAll(conf);
            return new StreamContextEnvironment(
                    executorServiceLoader,
                    mergedConfiguration,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout);
        };
initializeContextEnvironment(factory);

protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
    contextEnvironmentFactory = ctx;
    threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
}
```
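The resolution order (thread-local factory first, then the static one, then a local fallback) can be sketched with plain JDK types. This is a minimal sketch: `resolveFactory` below is a simplified stand-in, not Flink's actual `Utils` implementation.

```java
import java.util.Optional;

// Minimal sketch of the Utils.resolveFactory idea: prefer the
// thread-local factory, fall back to the static one, else empty
// (the caller then falls back to a local environment).
public class FactoryResolution {
    static <T> Optional<T> resolveFactory(ThreadLocal<T> threadLocalFactory, T staticFactory) {
        T tl = threadLocalFactory.get();
        return Optional.ofNullable(tl != null ? tl : staticFactory);
    }

    public static void main(String[] args) {
        ThreadLocal<String> threadLocal = new ThreadLocal<>();
        // No factory registered anywhere -> empty, so the caller falls back.
        System.out.println(resolveFactory(threadLocal, null).orElse("local"));
        // A static factory (e.g. registered by CliFrontend) wins over the fallback.
        System.out.println(resolveFactory(threadLocal, "context").orElse("local"));
        // A thread-local registration takes precedence over the static one.
        threadLocal.set("thread-local-context");
        System.out.println(resolveFactory(threadLocal, "context").orElse("local"));
    }
}
```

This is why running in the IDE yields a LocalEnvironment (nothing registered) while a CLI submission yields a StreamContextEnvironment (factory registered before the user's main runs).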

1.2. env.addSource

At the start of a program, addSource is called to obtain the source data, returning a DataStreamSource<?>.

```java
private <OUT> DataStreamSource<OUT> addSource(
        final SourceFunction<OUT> function,
        final String sourceName,
        @Nullable final TypeInformation<OUT> typeInfo,
        final Boundedness boundedness) {
    ......
    TypeInformation<OUT> resolvedTypeInfo =
            getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
    ......
    final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
    return new DataStreamSource<>(
            this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
}
```

The main purpose here is to create a StreamSource as the operator (handing the function to the parent class AbstractUdfStreamOperator), and to construct and return a DataStreamSource (which initializes the environment and transformation fields of its parent class DataStream).

A transformation is a conversion object; every subsequent operator is eventually turned into a transformation.

1.3. transform

Calling operators such as map, filter, and keyBy for processing logic is, under the hood, a Flink transform operation.

Take map as an example:

```java
public <R> SingleOutputStreamOperator<R> map(
        MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
```

outputType carries the type information of the current operator's output.

StreamMap is one of the commonly used operator classes; when you pass a custom MapFunction, it is still userFunction.map inside the corresponding operator class that ends up being invoked.

```java
public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
```
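The wrapping pattern above, an operator holding a user function and applying it to each element before forwarding the result downstream, can be sketched without any Flink dependencies. The types below (`SimpleStreamMap`, `Collector`) are hypothetical stand-ins for illustration only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch of the AbstractUdfStreamOperator/StreamMap pattern: the operator
// owns the user function and applies it per element, forwarding the
// result to a downstream collector.
public class MapOperatorSketch {
    interface Collector<T> { void collect(T value); }

    static class SimpleStreamMap<IN, OUT> {
        private final Function<IN, OUT> userFunction; // the "UDF" held by the operator

        SimpleStreamMap(Function<IN, OUT> userFunction) {
            this.userFunction = userFunction;
        }

        void processElement(IN element, Collector<OUT> output) {
            output.collect(userFunction.apply(element));
        }
    }

    public static void main(String[] args) {
        List<Integer> results = new ArrayList<>();
        SimpleStreamMap<Integer, Integer> op = new SimpleStreamMap<>(x -> x * 2);
        for (int i : new int[] {1, 2, 3}) {
            op.processElement(i, results::add);
        }
        System.out.println(results); // [2, 4, 6]
    }
}
```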

Other commonly used operator classes follow the same pattern. The transform call that map delegates to looks like this:

```java
public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        OneInputStreamOperator<T, R> operator) {
    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
```

SimpleOperatorFactory.of(operator) applies the static factory pattern to create a factory for a Simple operator; later, during stream translation, the corresponding operator is obtained from this factory.
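The static factory idea here, wrapping an already-constructed operator in a factory object that later hands it back, can be illustrated with hypothetical simplified types (these are not Flink's real classes):

```java
// Sketch of the SimpleOperatorFactory.of(...) idea: a static factory
// method wraps an existing operator instance, and the graph builder
// later asks the factory for the operator it holds.
public class OperatorFactorySketch {
    interface Operator { String name(); }

    static class SimpleFactory {
        private final Operator operator;

        private SimpleFactory(Operator operator) {
            this.operator = operator;
        }

        static SimpleFactory of(Operator operator) {
            return new SimpleFactory(operator);
        }

        Operator createOperator() {
            return operator; // simply hand back the wrapped instance
        }
    }

    public static void main(String[] args) {
        Operator map = () -> "Map";
        SimpleFactory factory = SimpleFactory.of(map);
        System.out.println(factory.createOperator().name()); // Map
    }
}
```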

```java
protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {
    transformation.getOutputType();
    // Create a OneInputTransformation describing the input transformation and output operation
    OneInputTransformation<T, R> resultTransform =
            new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    operatorFactory,
                    outTypeInfo,
                    environment.getParallelism());
    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator(environment, resultTransform);
    getExecutionEnvironment().addOperator(resultTransform);
    return returnStream;
}
```

Finally a OneInputTransformation instance is created, and addOperator appends the transformation for the current operator to List<Transformation<?>> transformations inside StreamExecutionEnvironment.

All other operators go through the same flow: operator call -> operator object -> operator factory -> Transformation -> appended to the list.
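That flow, each API call producing a Transformation that remembers its input, with all of them collected into a flat list, can be sketched with hypothetical simplified types (none of these are Flink's real classes):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "operator call -> Transformation -> list" flow: each
// transformation keeps a reference to its upstream input, and the
// environment just accumulates transformations in call order.
public class TransformationPipeline {
    static class Transformation {
        final String name;
        final Transformation input; // upstream transformation, null for sources

        Transformation(String name, Transformation input) {
            this.name = name;
            this.input = input;
        }
    }

    static class Environment {
        final List<Transformation> transformations = new ArrayList<>();

        Transformation addOperator(String name, Transformation input) {
            Transformation t = new Transformation(name, input);
            transformations.add(t);
            return t;
        }
    }

    public static void main(String[] args) {
        Environment env = new Environment();
        Transformation source = new Transformation("Source", null);
        Transformation map = env.addOperator("Map", source);
        Transformation filter = env.addOperator("Filter", map);
        env.addOperator("Sink", filter);
        for (Transformation t : env.transformations) {
            System.out.println(t.name + " <- " + (t.input == null ? "none" : t.input.name));
        }
    }
}
```

Note that at this stage there is no graph yet, only a list of transformations whose input references implicitly encode the topology; the StreamGraph is built from this list later.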

1.4. dataStream.addSink

Most programs use addSink to send data to other systems. When a stream calls addSink, it is actually the addSink method of the parent class DataStream that runs.

Among DataStream and its subclasses, DataStream, KeyedStream, and SingleOutputStreamOperator are the common stream types.

```java
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
    ......
    StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
    DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
    getExecutionEnvironment().addOperator(sink.getLegacyTransformation());
    return sink;
}
```

As you can see, addSink serves the same purpose as doTransform: it builds a transformation from the current operation and puts it into the list, recording every conversion to be executed. Take OneInputTransformation, instantiated during map, as an example:

```java
public OneInputTransformation(
        Transformation<IN> input,
        String name,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<OUT> outputType,
        int parallelism) {
    super(name, outputType, parallelism);
    this.input = input;
    this.operatorFactory = operatorFactory;
}
```

This records the upstream Transformation of the current Transformation, along with the current Transformation's operator factory.

```java
protected static Integer idCounter = 0;

public static int getNewNodeId() {
    idCounter++;
    return idCounter;
}

public Transformation(String name, TypeInformation<T> outputType, int parallelism) {
    this.id = getNewNodeId();
    this.name = Preconditions.checkNotNull(name);
    this.outputType = outputType;
    this.parallelism = parallelism;
    this.slotSharingGroup = Optional.empty();
}
```

It also fills in some fields of the parent class Transformation. The node id counter starts at 0 and is incremented for every new Transformation, so the first assigned id is 1; this id later becomes the node id in the StreamGraph.

In short, by the time a Flink program has been written, the stream environment and the basic initialization of every operator are already complete.
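The id assignment can be reproduced in isolation. A minimal sketch (hypothetical class, not Flink's Transformation) showing that ids are consecutive and start at 1:

```java
// Sketch of Transformation's id assignment: a static counter shared by
// all instances, incremented once per constructed transformation.
public class NodeIdSketch {
    private static int idCounter = 0;

    static int getNewNodeId() {
        idCounter++; // increment first, so the first returned id is 1
        return idCounter;
    }

    final int id;
    final String name;

    NodeIdSketch(String name) {
        this.id = getNewNodeId();
        this.name = name;
    }

    public static void main(String[] args) {
        NodeIdSketch a = new NodeIdSketch("Source");
        NodeIdSketch b = new NodeIdSketch("Map");
        System.out.println(a.id + " " + b.id); // 1 2
    }
}
```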

1.5. env.execute()

env.execute is the main method used to launch a Flink program; it consists of two steps:

```java
public JobExecutionResult execute() throws Exception {
    return execute(getStreamGraph());
}
```

getStreamGraph(): builds the StreamGraph.

execute: executes that StreamGraph.

1.6. StreamGraph

```java
public StreamGraph getStreamGraph(boolean clearTransformations) {
    // Generate a StreamGraph from the list of registered transformations
    final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
    // Clear the transformation list if requested
    if (clearTransformations) {
        transformations.clear();
    }
    return streamGraph;
}
```

Let's step into generate.

```java
public StreamGraph generate() {
    // Initialize the StreamGraph with the execution, checkpoint, and savepoint configuration
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    streamGraph.setEnableCheckpointsAfterTasksFinish(
            configuration.get(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
    // Whether to execute in batch mode
    shouldExecuteInBatchMode = shouldExecuteInBatchMode();
    configureStreamGraph(streamGraph);
    // Transformations that have already been translated
    alreadyTransformed = new HashMap<>();
    // Translate every registered transformation
    for (Transformation<?> transformation : transformations) {
        transform(transformation);
    }
    streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);
    // Set the fine-grained global stream exchange mode
    setFineGrainedGlobalStreamExchangeMode(streamGraph);
    // Disable unaligned checkpoints on stream edges that do not support them
    for (StreamNode node : streamGraph.getStreamNodes()) {
        if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
            for (StreamEdge edge : node.getInEdges()) {
                edge.setSupportsUnalignedCheckpoints(false);
            }
        }
    }
    final StreamGraph builtStreamGraph = streamGraph;
    // Clean up
    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;
    return builtStreamGraph;
}
```

This initializes the StreamGraph and applies checkpointing and related configuration. The main thing to look at is how transform handles each conversion.

```java
private Collection<Integer> transform(Transformation<?> transform) {
    ...... // some resource-configuration steps, skipped here; we focus on how the transform is handled
    // Look up the translator for this specific transformation type
    final TransformationTranslator<?, Transformation<?>> translator =
            (TransformationTranslator<?, Transformation<?>>) translatorMap.get(transform.getClass());
    Collection<Integer> transformedIds;
    // If a suitable translator exists, use it; otherwise fall back to the legacy path
    if (translator != null) {
        transformedIds = translate(translator, transform);
    } else {
        transformedIds = legacyTransform(transform);
    }
    // Record the transformation in the already-transformed map if it is not there yet
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }
    return transformedIds;
}
```

translatorMap is initialized in a static block; for map, the lookup yields a OneInputTransformationTranslator. Let's look directly at what translator.translateForStreaming(transform, context) does internally.
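The dispatch-plus-memoization pattern, a class-keyed translator map combined with an already-transformed cache, can be sketched with hypothetical simplified types (none of these are Flink's real classes, and the legacy path is reduced to a stand-in):

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of the translator dispatch in StreamGraphGenerator: a map from
// transformation class to translator, plus a cache of transformations
// that have already been translated, so each one is translated once.
public class TranslatorDispatch {
    interface Translator { Collection<Integer> translate(Object transformation); }

    static class Generator {
        final Map<Class<?>, Translator> translatorMap = new HashMap<>();
        final Map<Object, Collection<Integer>> alreadyTransformed = new HashMap<>();
        int nextNodeId = 1;

        Collection<Integer> transform(Object transformation) {
            // Reuse the cached result if this transformation was translated before
            if (alreadyTransformed.containsKey(transformation)) {
                return alreadyTransformed.get(transformation);
            }
            Translator translator = translatorMap.get(transformation.getClass());
            Collection<Integer> ids = (translator != null)
                    ? translator.translate(transformation)
                    : Collections.singleton(nextNodeId++); // stand-in for the legacy path
            alreadyTransformed.put(transformation, ids);
            return ids;
        }
    }

    public static void main(String[] args) {
        Generator gen = new Generator();
        // Register a translator keyed by class, as the static block does in Flink
        gen.translatorMap.put(String.class, t -> Collections.singleton(gen.nextNodeId++));
        Object mapLikeTransformation = "map";
        Collection<Integer> first = gen.transform(mapLikeTransformation);
        Collection<Integer> second = gen.transform(mapLikeTransformation);
        System.out.println(first.equals(second)); // translated only once
    }
}
```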

```java
protected Collection<Integer> translateInternal(
        final Transformation<OUT> transformation,
        final StreamOperatorFactory<OUT> operatorFactory,
        final TypeInformation<IN> inputType,
        @Nullable final KeySelector<IN, ?> stateKeySelector,
        @Nullable final TypeInformation<?> stateKeyType,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(operatorFactory);
    checkNotNull(inputType);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    streamGraph.addOperator(
            transformationId,                  // node id
            slotSharingGroup,                  // slot sharing group
            transformation.getCoLocationGroupKey(),
            operatorFactory,                   // operator factory
            inputType,                         // input type of the transformation
            transformation.getOutputType(),    // output type of the transformation
            transformation.getName());         // operator name

    if (stateKeySelector != null) {
        TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
    }

    int parallelism =
            transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                    ? transformation.getParallelism()
                    : executionConfig.getParallelism();
    streamGraph.setParallelism(transformationId, parallelism);
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

    final List<Transformation<?>> parentTransformations = transformation.getInputs();
    checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found "
                    + parentTransformations.size());

    // Look up the node ids of the upstream (already translated) transformation
    for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
        // Add an edge from each parent id to the current transformation
        streamGraph.addEdge(inputId, transformationId, 0);
    }
    // Return the current node id
    return Collections.singleton(transformationId);
}
```

Here streamGraph.addOperator adds each Transformation to the StreamGraph: internally it creates a StreamNode and adds it to the set of StreamNodes.

Then streamGraph.addEdge looks up the node ids of the upstream Transformation in the already-translated map and adds the edge between the upstream id and the current id to the StreamGraph. Finally the Transformation and its node id are recorded in the already-translated collection.

To summarize, generate materializes every Transformation as a StreamNode and every upstream-downstream relation between two nodes as a StreamEdge, while also setting each node's resource configuration, the serialization of its inputs and outputs, checkpointing, and other options.
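The node-and-edge bookkeeping can be sketched with hypothetical simplified types: just ids, names, and adjacency, with none of Flink's real configuration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of StreamGraph construction: addOperator creates a node keyed
// by the transformation id, and addEdge links an upstream node id to a
// downstream node id.
public class StreamGraphSketch {
    static class Node {
        final int id;
        final String name;
        final List<Integer> inEdges = new ArrayList<>();

        Node(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    final Map<Integer, Node> nodes = new HashMap<>();

    void addOperator(int id, String name) {
        nodes.put(id, new Node(id, name));
    }

    void addEdge(int upstreamId, int downstreamId) {
        nodes.get(downstreamId).inEdges.add(upstreamId);
    }

    public static void main(String[] args) {
        StreamGraphSketch graph = new StreamGraphSketch();
        graph.addOperator(1, "Source");
        graph.addOperator(2, "Map");
        graph.addOperator(3, "Sink");
        graph.addEdge(1, 2);
        graph.addEdge(2, 3);
        System.out.println(graph.nodes.get(2).inEdges); // [1]
    }
}
```

Because the Transformation id becomes the node id directly, the topology recorded in the transformation list maps one-to-one onto the StreamGraph's nodes and edges.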

This is my personal summary, and there may be places where my understanding falls short; corrections are welcome~
