赞
踩
flink-streaming-java
模块。介绍如何使用 DataStream API 进行 Flink 流任务开发,
flink-streaming-java
模块中的一些重要类,贯穿着介绍下从 DataStream
使用 DataStream API 通常有以下步骤:
关系 | 表示 | 图示 |
---|---|---|
global | 全部发往第1个task | |
broadcast | 广播,复制上游的数据发送到所有下游节点 | |
forward | 上下游并发度一样时一对一发送 | |
shuffle | 随机均匀分配 | |
reblance | Round-Robin(轮流分配) | |
rescale | Local Round-Robin (本地轮流分配),只会看到本机的实例 | |
partitionCustom | 自定义单播 |
List<StreamTransformation<?>> transformations
来保留生成 DataStream 的所有转换。我们看下基于 Flink DataStream API 的自带 WordCount 示例:实时统计单词数量,每来一个计算一次并输出一次。
public class WordCount { // ************************************************************************* // PROGRAM // ************************************************************************* public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); // 1. 设置运行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); env.setParallelism(2); // 2. 配置数据源读取数据 DataStream<String> text; if (params.has("input")) { // read the text file from given input path text = env.readTextFile(params.get("input")); } else { // get default test text data text = env.fromElements(new String[] { "miao,She is a programmer", "wu,He is a programmer", "zhao,She is a programmer" }); } // 3. 进行一系列转换 DataStream<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .keyBy(0).sum(1); // 4. 配置数据汇写出数据 if (params.has("output")) { counts.writeAsText(params.get("output")); } else { System.out.println("Printing result to stdout. Use --output to specify output path."); counts.print(); } // 5. 提交执行 env.execute("Streaming WordCount"); } // ************************************************************************* // USER FUNCTIONS // ************************************************************************* public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<>(token, 1)); } } } } }
StreamExecutionEnvironment 是 Flink 流处理任务执行的上下文,是我们编写 Flink 程序的入口。根据执行环境的不同,选择不同的 StreamExecutionEnvironment 类,
有 LocalStreamEnvironment、RemoteStreamEnvironment 等。如下图:
StreamExecutionEnvironment 依赖 ExecutionConfig 类来设置并行度等,依赖 CheckpointConfig 设置 Checkpointing 等相关属性。
Transformation 代表了从一个或多个 DataStream 生成新 DataStream 的操作。在 DataStream 上通过 map 等算子不断进行转换,就得到了由 Transformation
构成的图。当需要执行的时候,底层的这个图就会被转换成 StreamGraph 。
Transformation 有很多子类,如 SourceTransformation、OneInputTransformation、TwoInputTransformation、SideOutputTransformation 等,分别对应了 DataStream 上的不同转换操作。
每一个 Transformation 都有一个关联 id,这个 id 是全局递增的,还有 uid、slotSharingGroup、parallelism 等信息。
查看 Transformation 的其中两个子类 OneInputTransformation、TwoInputTransformation 的实现,都对应有输入 Transformation,也正是基于此才能还原出 DAG 的拓扑结构。
Transformation 在运行时并不对应着一个物理转换操作,有一些操作只是逻辑层面上的,比如 split/select/partitioning 等。
Transformations 组成的 graph ,也就是我们写代码时的图结构如下:
Source Source + + | | v v Rebalance HashPartition + + | | | | +------>Union<------+ + | v Split + | v Select + v Map + | v Sink
但是,在运行时将生成如下操作图,split/select/partitioning 等转换操作会被编码到边中,这个边连接 sources 和 map 操作:
Source Source
+ +
| |
| |
+------->Map<-------+
+
|
v
Sink
通过源码也可以发现,UnionTransformation、SplitTransformation、SelectTransformation、PartitionTransformation 由于不包含具体的操作,
所以都没有 StreamOperator 成员变量,而其他的 Transformation 子类基本都有。
一个 DataStream 就代表了同一种类型元素构成的数据流。通过对 DataStream 应用 map/filter 等操作,就可以将一个 DataStream 转换成另一个 DataStream 。
这个转换的过程就是根据不同的操作生成不同的 Transformation ,并将其加入到 StreamExecutionEnvironment 的 transformations 列表中。
DataStream 的子类包括 DataStreamSource、KeyedStream、IterativeStream、SingleOutputStreamOperator。
除了 DataStream 及其子类以外,其它的表征数据流的类还有 ConnectedStreams、WindowedStream、AllWindowedStream,这些会在后续的文章中陆续介绍。
DataStream 类中的重要属性和方法:
下面我们看下 map 操作是如何被添加进来的:
public class DataStream {
public <R> SingleOutputStreamOperator
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。