当前位置:   article > 正文

Flink源码剖析:flink-streaming-java 之 StreamGraph

flink-streaming-java


本文主要围绕 Flink 源码中 flink-streaming-java 模块。介绍如何使用 DataStream API 进行 Flink 流任务开发, flink-streaming-java 模块中的一些重要类,贯穿着介绍下从 DataStream
API 到 StreamGraph 的构建过程。

1. DataStream API使用一览

使用 DataStream API 通常有以下步骤:

  1. 如何创建 Environment(Local、Remote) 并设置属性
  • setParallelism(int):StreamExecutionEnvironment
  • setMaxParallelism(int):StreamExecutionEnvironment
  • setBufferTimeout(long):StreamExecutionEnvironment
  • enableCheckpointing(long,CheckpointingMode):StreamExecutionEnvironment
  • setStateBackend(StateBackend):StreamExecutionEnvironment
  • setStreamTimeCharacteristic(TimeCharacteristic):void
  1. 如何读取数据?添加 Source 数据源获得 DataStream
  • fromElements(OUT …): DataStreamSource …
  • readTextFile(String): DataStreamSource …
  • readFile(FileInputFormat,String): DataStreamSource …
  • socketTextStream(String ,int ,String ,long): DataStreamSource …
  • createInput(InputFormat<OUT,?>,TypeInformation): DataStreamSource …
  • addSource(SourceFunction,TypeInformation): DataStreamSource …
  1. 如何操作转换数据?

在这里插入图片描述

图1: DataStream API操作概览
  • Basic Transformations
    map、filter、flatMap
  • KeyedStream Transformations
    keyBy、aggregations、reduce
  • MultiStream Transformations
    union、connect、coMap、coFlatMap、split、select

在这里插入图片描述

图2: DataStream基本转换
  • Distribution Transformations
    物理分组:
关系 表示 图示
global 全部发往第1个task 在这里插入图片描述
broadcast 广播,复制上游的数据发送到所有下游节点 在这里插入图片描述
forward 上下游并发度一样时一对一发送 在这里插入图片描述
shuffle 随机均匀分配 在这里插入图片描述
reblance Round-Robin(轮流分配) 在这里插入图片描述
rescale Local Round-Robin (本地轮流分配),只会看到本机的实例 在这里插入图片描述
partitionCustom 自定义单播
  1. 如何输出数据?添加 Sink
  • writeAsText(String path): DataStreamSink …
  • writeAsCsv(String path): DataStreamSink …
  • addSink(SinkFunction sinkFunction): DataStreamSink
  1. 如何提交执行?
    DataStream 通过不同的算子不停地在 DataStream 上实现转换过滤等逻辑,最终将结果输出到 DataSink 中。
    在 StreamExecutionEnvironment 内部使用一个 List<StreamTransformation<?>> transformations 来保留生成 DataStream 的所有转换。
  • execute():JobExecutionResult

1.1 自带 WordCount 代码示例

我们看下基于 Flink DataStream API 的自带 WordCount 示例:实时统计单词数量,每来一个计算一次并输出一次。

public class WordCount {
   

	// *************************************************************************
	// PROGRAM
	// *************************************************************************

	public static void main(String[] args) throws Exception {
   

		final ParameterTool params = ParameterTool.fromArgs(args);
		// 1. 设置运行环境
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.getConfig().setGlobalJobParameters(params);
		env.setParallelism(2);

		// 2. 配置数据源读取数据
		DataStream<String> text;
		if (params.has("input")) {
   
			// read the text file from given input path
			text = env.readTextFile(params.get("input"));
		} else {
   
			// get default test text data
			text = env.fromElements(new String[] {
   
				"miao,She is a programmer",
				"wu,He is a programmer",
				"zhao,She is a programmer"
			});
		}

        // 3. 进行一系列转换
		DataStream<Tuple2<String, Integer>> counts =
			// split up the lines in pairs (2-tuples) containing: (word,1)
			text.flatMap(new Tokenizer())
			// group by the tuple field "0" and sum up tuple field "1"
			.keyBy(0).sum(1);

		// 4. 配置数据汇写出数据
		if (params.has("output")) {
   
			counts.writeAsText(params.get("output"));
		} else {
   
			System.out.println("Printing result to stdout. Use --output to specify output path.");
			counts.print();
		}

		// 5. 提交执行
		env.execute("Streaming WordCount");
	}

	// *************************************************************************
	// USER FUNCTIONS
	// *************************************************************************
	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
   

		@Override
		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
   
			// normalize and split the line
			String[] tokens = value.toLowerCase().split("\\W+");

			// emit the pairs
			for (String token : tokens) {
   
				if (token.length() > 0) {
   
					out.collect(new Tuple2<>(token, 1));
				}
			}
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

2. 源码剖析

2.1 StreamExecutionEnvironment

StreamExecutionEnvironment 是 Flink 流处理任务执行的上下文,是我们编写 Flink 程序的入口。根据执行环境的不同,选择不同的 StreamExecutionEnvironment 类,
有 LocalStreamEnvironment、RemoteStreamEnvironment 等。如下图:
在这里插入图片描述

图3: StreamExecutionEnvironment子类

StreamExecutionEnvironment 依赖 ExecutionConfig 类来设置并行度等,依赖 CheckpointConfig 设置 Checkpointing 等相关属性。

在这里插入图片描述

图4: StreamExecutionEnvironment类图

在这里插入图片描述

图5: StreamExecutionEnvironment类中的重要属性和方法

2.2 Transformation

Transformation 代表了从一个或多个 DataStream 生成新 DataStream 的操作。在 DataStream 上通过 map 等算子不断进行转换,就得到了由 Transformation
构成的图。当需要执行的时候,底层的这个图就会被转换成 StreamGraph 。

Transformation 有很多子类,如 SourceTransformation、OneInputTransformation、TwoInputTransformation、SideOutputTransformation 等,分别对应了 DataStream 上的不同转换操作。

在这里插入图片描述

图6: Transformation子类

每一个 Transformation 都有一个关联 id,这个 id 是全局递增的,还有 uid、slotSharingGroup、parallelism 等信息。

在这里插入图片描述

图7: Transformation类中的重要属性

查看 Transformation 的其中两个子类 OneInputTransformation、TwoInputTransformation 的实现,都对应有输入 Transformation,也正是基于此才能还原出 DAG 的拓扑结构。

Transformation 在运行时并不对应着一个物理转换操作,有一些操作只是逻辑层面上的,比如 split/select/partitioning 等。
Transformations 组成的 graph ,也就是我们写代码时的图结构如下:

    Source              Source
       +                   +
       |                   |
       v                   v
   Rebalance          HashPartition
       +                   +
       |                   |
       |                   |
       +------>Union<------+
                 +
                 |
                 v
               Split
                 +
                 |
                 v
               Select
                 +
                 v
                Map
                 +
                 |
                 v
               Sink
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

但是,在运行时将生成如下操作图,split/select/partitioning 等转换操作会被编码到边中,这个边连接 sources 和 map 操作:

  Source              Source
     +                   +
     |                   |
     |                   |
     +------->Map<-------+
               +
               |
               v
              Sink
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

通过源码也可以发现,UnionTransformation、SplitTransformation、SelectTransformation、PartitionTransformation 由于不包含具体的操作,
所以都没有 StreamOperator 成员变量,而其他的 Transformation 子类基本都有。

2.3 DataStream

一个 DataStream 就代表了同一种类型元素构成的数据流。通过对 DataStream 应用 map/filter 等操作,就可以将一个 DataStream 转换成另一个 DataStream 。
这个转换的过程就是根据不同的操作生成不同的 Transformation ,并将其加入到 StreamExecutionEnvironment 的 transformations 列表中。

DataStream 的子类包括 DataStreamSource、KeyedStream、IterativeStream、SingleOutputStreamOperator。

在这里插入图片描述

图8: DataStream子类

除了 DataStream 及其子类以外,其它的表征数据流的类还有 ConnectedStreams、WindowedStream、AllWindowedStream,这些会在后续的文章中陆续介绍。

DataStream 类中的重要属性和方法:

在这里插入图片描述

图9: DataStream类中的重要属性和方法

下面我们看下 map 操作是如何被添加进来的:

public class DataStream {
   

    public <R> SingleOutputStreamOperator
  • 1
  • 2
  • 3
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/794405
推荐阅读
相关标签
  

闽ICP备14008679号