赞
踩
在大数据技术栈的探索中,我们曾讨论了离线计算的Spark,而当谈到实时计算,就不得不提Flink。本文将集中讨论Flink,旨在详尽展示其核心概念,从而助力你在大数据旅程中向前迈进。
值得注意的是,Flink和Spark有许多相似的概念。因此,在深入学习Flink之前,建议先浏览我之前关于Spark的文章,这将为你提供扎实的基础,并帮助在学习Flink时能更好地举一反三,加深对其理解。
话不多说,开启我们的Flink学习之旅。
在我们深入探讨Flink之前,首先要掌握一些流计算的基础概念。
事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流。
在 Flink 的视角里,一切数据都可以认为是流,流数据是无界流,而批数据则是有界流
无界流有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。
我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。
处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
有界流有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算,有界流所有数据可以被排序,所以并不需要有序摄取。
有界流处理通常被称为批处理。所以在Flink里批计算其实指的就是有界流。
Flink具有如下特点和优势:
Spark 和 Flink 在不同的应用领域上表现会有差别。
一般来说,Spark 基于微批处理的方式做同步总有一个“攒批”的过程,所以会有额外开销,因此无法在流处理的低延迟上做到极致。
在低延迟流处理场景,Flink 已经有明显的优势。而在海量数据的批处理领域,Spark 能够处理的吞吐量更大
另外,Spark Streaming中的流计算其实是微批计算,实时性不如Flink,还有一点很重要的是Spark Streaming不适合有状态的计算,得借助一些存储如:Redis,才能实现。而Flink天然支持有状态的计算。
Flink 本身提供了多层 API:
所有的 Flink 程序都可以归纳为由三部分构成:Source
、Transformation
和 Sink
。
Source 表示“源算子”,负责读取数据源。
Transformation 表示“转换算子”,利用各种算子进行处理加工。
Sink 表示“下沉算子”,负责数据的输出。
Source数据源会源源不断的产生数据,Transformation将产生的数据进行各种业务逻辑的数据处理,最终由Sink输出到外部(console、kafka、redis、DB…)。
所有基于Flink开发的程序都能够映射成一个Dataflows(数据流图):
当Source数据源的数量比较大或计算逻辑相对比较复杂的情况下,需要提高并行度来处理数据,采用并行数据流。
通过设置不同算子的并行度,比如 Source并行度设置为2 ,map也是2。代表会启动2个并行的线程来处理数据:
Flink是一个典型的Master-Slave架构,架构中包含了两个重要角色,分别是「JobManager」和「TaskManager」。
JobManager相当于是Master,TaskManager相当于是Slave。
在Flink中,JobManager负责整个Flink集群任务的调度以及资源的管理。它从客户端中获取提交的应用,然后根据集群中TaskManager上TaskSlot的使用情况,为提交的应用分配相应的TaskSlot资源并命令TaskManager启动从客户端中获取的应用。
TaskManager则负责执行作业流的Task,并且缓存和交换数据流。
在TaskManager中资源调度的最小单位是Task slot。TaskManager中Task slot的数量表示并发处理Task的数量。
一台机器节点可以运行多个TaskManager,TaskManager工作期间会向JobManager发送心跳保持连接
Flink支持多种部署模式,包括本地模式、Standalone模式、YARN模式、Mesos模式和Kubernetes模式。
每种部署模式都有其优缺点,选择哪种部署模式取决于具体的应用场景和需求。
Flink 有三种不同的运行模式:Session、Per-Job 和 Application。
以上所述的部署环境可以与任何一种运行模式结合使用。例如,你可以在本地模式、Standalone 模式或 YARN 模式下运行 Session、Per-Job 或 Application 模式的 Flink 作业。
Flink在不同运行模型下的作业提交和执行流程大致如下:
Session 模式:
当你的作业完成运行后,该作业的JobManager会被停止,但是Flink集群(包括Dispatcher和其余的TaskManager)仍然处于运行状态,等待新的作业提交。这就是所谓的Session模式,它允许在同一个Flink集群上连续运行多个作业。
Per-Job 模式:
在Per-Job模式下,每个作业都有自己的资源隔离,互不干扰,资源利用率较高,但是如果作业数量大,则可能会因为每个作业都需要单独申请、释放资源导致效率较低。
Application 模式:
每个 Flink 应用都需要依赖一组 Flink 类库。Flink 应用至少需要依赖 Flink APIs。许多应用还会额外依赖连接器类库(比如 Kafka、Cassandra 等)。 当用户运行 Flink 应用时(无论是在 IDEA 环境下进行测试,还是部署在分布式环境下),运行时类库都必须可用。
开发工具:IntelliJ IDEA
以Java语言为例,配置开发Maven依赖:
<properties> <flink.version>1.13.6</flink.version> <scala.binary.version>2.12</scala.binary.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
注意点:
provided
。compile
而不是 provided
。 否则 IntelliJ 不会添加这些依赖到 classpath,会导致应用运行时抛出 NoClassDefFountError
异常。添加打包插件:
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes> <exclude>org.apache.flink:force-shading</exclude> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> <exclude>log4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
配置好开发环境之后我们来写一个简单的Flink程序。
需求:统计单词出现的次数
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行度为 1 env.setParallelism(1); // 构建输入数据流 DataStream<String> text = env.fromElements( "Hello World", "Hello Flink", "Hello Java"); // 对输入数据进行操作,包括分割、映射和聚合 DataStream<Tuple2<String, Integer>> counts = text .flatMap(new Tokenizer()) //keyBy(0) 操作按照元组的第一个字段(索引为0)进行分组。在这个例子中,这就表示根据每个单词(字符串)进行分组。 .keyBy(0) //sum(1) 是一个聚合操作,它对每个分组内的元素进行求和。在这个例子中,对元组的第二个字段(索引为1)进行求和,表示每个单词的出现次数。 .sum(1); // 输出结果 counts.print(); // 执行任务 env.execute("Flink Streaming Java WordCount"); } public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // 规范化并分割行 String[] words = value.toLowerCase().split("\\W+"); // 为每个单词发出 Tuple2<String, Integer>(word, 1) for (String word : words) { if (word.length() > 0) { out.collect(new Tuple2<>(word, 1)); } } } }
输出如下:
(hello,1)
(world,1)
(hello,2)
(flink,1)
(hello,3)
(java,1)
对代码简要解析一下:
这是一个基本的单词计数程序,它使用Apache Flink的流处理环境。这个程序读入一系列的字符串,然后把每个字符串分割成单词,对每个单词进行计数,并且输出计数结果。
在提供的例子中,有三个输入字符串:“Hello World”, “Hello Flink”, “Hello Java”,'Hello’这个单词出现了三次,其余单词 (‘World’, ‘Flink’, ‘Java’) 各出现了一次。
由于Flink是一个流处理框架,它实时地处理数据,所以它会在每一次遇到一个新的单词时就更新和输出计数。因此,每当 ‘Hello’ 出现时,都会更新和输出其计数。
对于这个例子:
这段代码已在本地运行和测试过,且相关部分已添加注释,大家可以实际运行感受一下。
特定算子的子任务(subtask)的个数称之为并行度(parallel),并行度是几,这个task内部就有几个subtask
怎样实现算子并行呢?其实也很简单,我们把一个算子操作,“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一来,一个算子任务就被拆分成了多个并行的“子任务”(subtasks),再将它们分发到不同节点,就真正实现了并行计算。
整个流处理程序的并行度,理论上是所有算子并行度中最大的那个,这代表了运行程序需要的 slot 数量
如果我们将上面WordCount程序的并行度设置为3
env.setParallelism(3);
就会看到如下输出:
2> (world,1)
3> (flink,1)
1> (hello,1)
1> (hello,2)
1> (java,1)
1> (hello,3)
前面的数字代表线程,Flink会将相同的 key 分配到不同的 slot 进行处理。
在 Flink 中,可以用不同的方法来设置并行度,它们的有效范围和优先级别也是不同的。
代码中设置
setParallelism()
方法,来设置当前算子的并行度: stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
这种方式设置的并行度,只针对当前算子有效。setParallelism()
方法,全局设定并行度:env.setParallelism(2);
这样代码中所有算子,默认的并行度就都为2了。提交应用时设置
在使用 flink run 命令提交应用时,可以增加 -p
参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置。如果我们直接在 Web UI 上提交作业,也可以在对应输入框中直接添加并行度。
配置文件中设置
我们还可以直接在集群的配置文件 flink-conf.yaml 中直接更改默认并行度:parallelism.default: 2
(初始值为 1)
这个设置对于整个集群上提交的所有作业有效。
-p
参数指定的并行度。-p
参数,那么采用集群配置文件中的默认并行度。这里需要说明的是,算子的并行度有时会受到自身具体实现的影响。比如读取 socket 文本流的算子 socketTextStream,它本身就是非并行的 Source 算子,所以无论怎么设置,它在运行时的并行度都是 1
在 Flink 中,Task 是一个阶段多个功能相同 subTask 的集合,Flink 会尽可能地将 operator 的 subtask 链接(chain)在一起形成 task。每个 task 在一个线程中执行。将 operators 链接成 task 是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。
要是之前学过Spark,这里可以用Spark的思想来看,Flink的Task就好比Spark中的Stage,而我们知道Spark的Stage是根据宽依赖来拆分的。所以我们也可以认为Flink的Task也是根据宽依赖拆分的(尽管Flink中并没有宽依赖的概念),这样会更好理解
如下图:
在Flink中,为了分布式执行,Flink会将算子子任务链接在一起形成任务。每个任务由一个线程执行。将算子链接在一起形成任务是一种有用的优化:它减少了线程间切换和缓冲的开销,并增加了整体吞吐量,同时降低了延迟
举个例子,假设我们有一个简单的Flink流处理程序,它从一个源读取数据,然后应用map
和filter
操作,最后将结果写入到一个接收器。这个程序可能看起来像这样:
DataStream<String> data = env.addSource(new CustomSource());
data.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value.toUpperCase();
}
})
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.startsWith("A");
}
})
.addSink(new CustomSink());
在这个例子中,map
和filter
操作可以被链接在一起形成一个任务,被优化为算子链,这意味着它们将在同一个线程中执行,而不是在不同的线程中执行并通过网络进行数据传输
Task Slots即是任务槽,slot 在 Flink 里面可以认为是资源组,Flink 将每个任务分成子任务并且将这些子任务分配到 slot 来并行执行程序,我们可以通过集群的配置文件来设定 TaskManager 的 slot 数量: taskmanager.numberOfTaskSlots : 8
。
例如,如果 Task Manager 有2个 slot,那么它将为每个 slot 分配 50% 的内存。 可以在一个 slot 中运行一个或多个线程。 同一 slot 中的线程共享相同的 JVM。
需要注意的是,slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。在具体应用时,可以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争。这也是开发环境默认并行度设为机器 CPU 数量的原因
如果希望某个算子对应的任务完全独占一个 slot,或者只有某一部分算子共享 slot,在Flink中,可以通过在代码中使用slotSharingGroup
方法来设置slot共享组。
Flink会将具有相同slot共享组的操作放入同一个slot中,同时保持不具有slot共享组的操作在其他slot中。这可以用来隔离slot。
例如,你可以这样设置:
dataStream.map(...).slotSharingGroup("group1");
默认情况下,所有操作都被分配相同的SlotSharingGroup。
这样,只有属于同一个 slot 共享组的子任务,才会开启 slot 共享,不同组之间的任务是完全隔离的,必须分配到不同的 slot 上。
听了上面并行度和Slots的理论,可能还是有点疑惑,下面通过例子解释说明下:
假设一共有3个TaskManager,每一个TaskManager中的slot数量设置为3个,那么一共有9个task slot,表示最多能并行执行9个任务。
假设我们写了一个WordCount程序,有四个转换算子:source —> flatMap —> reduce —> sink
当所有算子并行度相同时,很容易看出source和flatMap可以优化合并算子链,于是最终有三个任务节点:source & flatMap,reduce 和sink。
如果我们没有任何并行度设置,而配置文件中默认parallelism.default:1
,那么默认并行度为1,总共有3个任务。由于不同算子的任务可以共享任务槽,所以最终占用的slot只有1个。9个slot只用了1个,有8个空闲
如图所示:
我们可以直接把并行度设置为 9,这样所有 3*9=27 个任务就会完全占用 9 个 slot。这是当前集群资源下能执行的最大并行度,计算资源得到了充分的利用。
另外再考虑对于某个算子单独设置并行度的场景。例如,如果我们考虑到输出可能是写入文件,那会希望不要并行写入多个文件,就需要设置 sink 算子的并行度为 1。这时其他的算子并行度依然为 9,所以总共会有 19 个子任务。
根据 slot 共享的原则,它们最终还是会占用全部的 9 个 slot,而 sink 任务只在其中一个 slot 上执行,通过这个例子也可以明确地看到,整个流处理程序的并行度,就应该是所有算子并行度中最大的那个,这代表了运行程序需要的 slot 数量
Flink内嵌支持的数据源非常多,比如HDFS、Socket、Kafka、Collections。Flink也提供了addSource方式,可以自定义数据源,下面介绍一些常用的数据源。
如果读取的是HDFS上的文件,那么需要导入Hadoop依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.1</version>
</dependency>
代码示例:每隔10s去读取HDFS指定目录下的新增文件内容,并且进行WordCount。
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String filePath = "hdfs://node01:9000/flink/data/"; FileInputFormat<String> textInputFormat = new TextInputFormat(new Path(filePath)); //PROCESS_CONTINUOUSLY模式时,Flink会持续监视给定的路径,并在发现新数据时将其引入流中进行处理。 DataStream<String> textStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000); DataStream<Tuple2<String, Integer>> result = textStream .flatMap(new WordSplitter()) .map(new WordMapper()) .keyBy(0) .sum(1); result.print(); env.execute(); } public static class WordSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] words = value.split(" "); for (String word : words) { out.collect(new Tuple2<>(word, 1)); } } } public static class WordMapper implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> { @Override public Tuple2<String, Integer> map(Tuple2<String, Integer> wordCountTuple) { //f0, f1 等是用来访问元组中的元素的字段。Tuple2<String, Integer> 表示这是一个大小为 2 的元组,其中 f0 是 String 类型,f1 是 Integer 类型。 // 在代码中,wordCountTuple.f0 表示的就是单词(即String类型的值),wordCountTuple.f1 则表示的是这个单词的计数(即 Integer 类型的值)。 return new Tuple2<>(wordCountTuple.f0, wordCountTuple.f1); } }
基于本地集合的数据源,一般用于测试场景,对于线上环境没有太大意义。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<String> data = Arrays.asList("hello word flink","hello java flink");
DataStream<String> text = env.fromCollection(data);
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
counts.print();
env.execute("WordCount Example");
}
接受Socket Server中的数据:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 连接 socket 获取输入数据
DataStream<String> text = env.socketTextStream("localhost", 9999);
// 解析数据、对数据进行分组、窗口处理和聚合计算
DataStream<Tuple2<String, Integer>> wordCount = text.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
wordCount.print();
env.execute("WordCount from Socket TextStream Example");
}
Flink想要接受Kafka中的数据,首先要配置flink与kafka的连接器依赖。
Maven依赖:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.6</version>
</dependency>
示例代码:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置Kafka的相关参数并从Kafka中读取数据
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("topic_name", new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(flinkKafkaConsumer);
// 对接收的每一行数据进行处理,分割出每个单词并初始化其数量为1
DataStream<Tuple2<String, Integer>> words = stream.flatMap(new Tokenizer());
DataStream<Tuple2<String, Integer>> wordCounts = words.keyBy(0).sum(1);
wordCounts.print().setParallelism(1);
env.execute("WordCountFromKafka");
}
Transformations算子可以将一个或者多个算子转换成一个新的数据流,使用Transformations算子组合可以处理复杂的业务处理。
DataStream → DataStream
遍历数据流中的每一个元素,产生一个新的元素。
DataStream → DataStream
遍历数据流中的每一个元素,产生N个元素 N=0,1,2…。
DataStream → DataStream
过滤算子,根据数据流的元素计算出一个boolean类型的值,true代表保留,false代表过滤掉。
DataStream → KeyedStream
根据数据流中指定的字段来分区,相同指定字段值的数据一定是在同一个分区中,内部分区使用的是HashPartitioner。
指定分区字段的方式有三种:
根据索引号指定。
通过匿名函数来指定。
通过实现KeySelector接口 指定分区字段。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> stream = env.fromSequence(1, 100);
stream.map((MapFunction<Long, Tuple2<Long, Integer>>) (Long x) -> new Tuple2<>(x % 3, 1), TypeInformation.of(new TypeHint<Tuple2<Long, Integer>>() {}))
//根据索引号来指定分区字段:.keyBy(0)
//通过传入匿名函数 指定分区字段:.keyBy(x=>x._1)
//通过实现KeySelector接口 指定分区字段
.keyBy((KeySelector<Tuple2<Long, Integer>, Long>) (Tuple2<Long, Integer> value) -> value.f0, BasicTypeInfo.LONG_TYPE_INFO)
.sum(1).print();
env.execute("Flink Job");
}
适用于KeyedStream
KeyedStream:根据key分组 → DataStream
注意,reduce是基于分区后的流对象进行聚合,也就是说,DataStream类型的对象无法调用reduce方法
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> dataStream = env.fromElements( new Tuple2<>("apple", 3), new Tuple2<>("banana", 1), new Tuple2<>("apple", 5), new Tuple2<>("banana", 2), new Tuple2<>("apple", 4) ); // 使用reduce操作,将input中的所有元素合并到一起 DataStream<Tuple2<String, Integer>> result = dataStream .keyBy(0) .reduce((ReduceFunction<Tuple2<String, Integer>>) (value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1)); result.print(); env.execute(); }
KeyedStream → DataStream
Aggregations代表的是一类聚合算子,上面说的reduce就属于Aggregations,以下是一些常用的:
sum()
: 计算数字类型字段的总和。min()
: 计算最小值。max()
: 计算最大值。count()
: 计数元素个数。avg()
: 计算平均值。另外,Flink 还支持自定义聚合函数,即使用 AggregateFunction
接口实现更复杂的聚合逻辑。
DataStream → DataStream
Union of two or more data streams creating a new stream containing all the elements from all the streams
合并两个或者更多的数据流产生一个新的数据流,这个新的数据流中包含了所合并的数据流的元素
注意:需要保证数据流中元素类型一致
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> ds1 = env.fromCollection(Arrays.asList(Tuple2.of("a",1),Tuple2.of("b",2),Tuple2.of("c",3)));
DataStream<Tuple2<String, Integer>> ds2 = env.fromCollection(Arrays.asList(Tuple2.of("d",4),Tuple2.of("e",5),Tuple2.of("f",6)));
DataStream<Tuple2<String, Integer>> ds3 = env.fromCollection(Arrays.asList(Tuple2.of("g",7),Tuple2.of("h",8)));
DataStream<Tuple2<String, Integer>> unionStream = ds1.union(ds2,ds3);
unionStream.print();
env.execute();
}
在 Flink 中,Union 操作被称为 “真合并” 是因为它将两个或多个数据流完全融合在一起,没有特定的顺序,并且不会去除重复项。这种操作方式类似于在数学概念中的集合联合(Union)操作,所以被称为 “真合并”。
请注意,与其他一些数据处理框架中的 Union 操作相比,例如 Spark 中的 Union 会根据某些条件去除重复的元素,Flink 的 Union 行为更接近于数学上的集合联合理论。
DataStream,DataStream → ConnectedStreams
合并两个数据流并且保留两个数据流的数据类型,能够共享两个流的状态
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> ds1 = env.socketTextStream("localhost", 8888); DataStream<String> ds2 = env.socketTextStream("localhost", 9999); DataStream<Tuple2<String, Integer>> wcStream1 = ds1 .flatMap(new Tokenizer()) .keyBy(value -> value.f0) .sum(1); DataStream<Tuple2<String, Integer>> wcStream2 = ds2 .flatMap(new Tokenizer()) .keyBy(value -> value.f0) .sum(1); ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectedStreams = wcStream1.connect(wcStream2); }
与union
不同,connect
只能连接两个流,并且这两个流的类型可以不同。connect
后的两个流会被看作是两个不同的流,可以使用CoMap
或者CoFlatMap
函数分别处理这两个流。
ConnectedStreams → DataStream
CoMap, CoFlatMap并不是具体算子名字,而是一类操作的名称
CoMap实现:
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 创建两个不同的数据流 DataStream<Integer> nums = env.fromElements(1, 2, 3, 4, 5); DataStream<String> text = env.fromElements("a", "b", "c"); // 连接两个数据流 ConnectedStreams<Integer, String> connected = nums.connect(text); // 使用 CoMap 处理连接的流 DataStream<String> result = connected.map(new CoMapFunction<Integer, String, String>() { @Override public String map1(Integer value) { return String.valueOf(value*2); } @Override public String map2(String value) { return "hello " + value; } }); result.print(); env.execute("CoMap example"); }
CoFlatMap实现方式:
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> nums = env.fromElements(1, 2, 3, 4, 5); DataStream<String> text = env.fromElements("a", "b", "c"); ConnectedStreams<Integer, String> connected = nums.connect(text); DataStream<String> result = connected.flatMap(new CoFlatMapFunction<Integer, String, String>() { @Override public void flatMap1(Integer value, Collector<String> out) { out.collect(String.valueOf(value*2)); out.collect(String.valueOf(value*3)); } @Override public void flatMap2(String value, Collector<String> out) { out.collect("hello " + value); out.collect("hi " + value); } }); result.print(); env.execute("CoFlatMap example"); }
DataStream → SplitStream
根据条件将一个流分成多个流,示例代码如下:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> data = env.generateSequence(0, 10);
SplitStream<Long> split = data.split((OutputSelector<Long>) value -> {
List<String> output = new ArrayList<>();
if (value % 2 == 0) {
output.add("even");
} else {
output.add("odd");
}
return output;
});
split.select("odd").print();
env.execute("Flink SplitStream Example");
}
select()
用于从SplitStream中选择一个或者多个数据流。
split.select("odd").print();
注意:在Flink 1.12 及之后的版本中,SplitStream 已经被弃用并移除,一般推荐使用 Side Outputs(侧输出流)来替代 Split和Select
示例代码如下:
private static final OutputTag<String> evenOutput = new OutputTag<String>("even"){}; private static final OutputTag<String> oddOutput = new OutputTag<String>("odd"){}; public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> input = env.fromElements("1", "2", "3", "4", "5"); SingleOutputStreamOperator<String> processed = input.process(new ProcessFunction<String, String>() { @Override public void processElement(String value, Context ctx, Collector<String> out){ int i = Integer.parseInt(value); if (i % 2 == 0) { ctx.output(evenOutput, value); } else { ctx.output(oddOutput, value); } } }); DataStream<String> evenStream = processed.getSideOutput(evenOutput); DataStream<String> oddStream = processed.getSideOutput(oddOutput); evenStream.print("even"); oddStream.print("odd"); env.execute("Side Output Example"); }
DataStream → IterativeStream → DataStream
Iterate算子提供了对数据流迭代的支持
一个数据集通过迭代运算符被划分为两部分:“反馈”部分(feedback)和“输出”部分(output)。反馈部分被反馈到迭代头(iteration head),从而形成下一次迭代。输出部分则构成该迭代的结果:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Long> input = env.fromElements(10L); // 定义迭代流,最大迭代10次 IterativeStream<Long> iteration = input.iterate(10000L); // 定义迭代逻辑 DataStream<Long> minusOne = iteration.map((MapFunction<Long, Long>) value -> value - 1); // 定义反馈流(满足条件继续迭代)和输出流(不满足条件的结果) DataStream<Long> stillGreaterThanZero = minusOne.filter(value -> value > 0).setParallelism(1);; DataStream<Long> lessThanZero = minusOne.filter(value -> value <= 0); // 关闭迭代,定义反馈流 iteration.closeWith(stillGreaterThanZero); // 打印结果 lessThanZero.print(); env.execute("Iterative Stream Example"); }
Apache Flink 中有两种类型的函数: 「普通函数(Regular Functions)」和 「富函数(Rich Functions)」。主要区别在于富函数相比普通函数提供了更多生命周期方法和上下文信息。
MapFunction
需要实现 map()
方法。它们没有生命周期方法,也不能访问执行环境的上下文。open()
和 close()
方法以便在函数启动前和关闭后做一些设置或清理工作。getRuntimeContext()
方法获取并行任务的信息,如当前子任务的索引等。简而言之,如果你需要在函数中使用 Flink 的高级功能,如状态管理或访问运行时上下文,则需要使用富函数。如果不需要这些功能,使用普通函数即可。
普通函数类 | 富函数类 |
---|---|
MapFunction | RichMapFunction |
FlatMapFunction | RichFlatMapFunction |
FilterFunction | RichFilterFunction |
… | … |
普通函数:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); List<String> words = Arrays.asList("hello", "world", "flink", "hello", "world"); env.fromCollection(words) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) { return new Tuple2<>(value, 1); } }) .keyBy(0) .sum(1) .print(); env.execute("Word Count Example"); }
富函数:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); List<String> words = Arrays.asList("hello", "world", "flink", "hello", "world"); env.fromCollection(words) .map(new RichMapFunction<String, Tuple2<String, Integer>>() { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 可以在这里设置相关的配置或者资源,如数据库连接等 } @Override public Tuple2<String, Integer> map(String value) throws Exception { return new Tuple2<>(value, 1); } @Override public void close() throws Exception { super.close(); // 可以在这里完成资源的清理工作 } }) .keyBy(0) .sum(1) .print(); env.execute("Word Count Example"); }
ProcessFunction属于低层次的API,在类继承关系上属于富函数。
我们前面讲的map
、filter
、flatMap
等算子都是基于这层封装出来的。
越低层次的API,功能越强大,用户能够获取的信息越多,比如可以拿到元素状态信息、事件时间、设置定时器等
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> dataStream = env.socketTextStream("localhost", 9999) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) { return new Tuple2<>(value, 1); } }) .keyBy(0) .process(new AlertFunction()); dataStream.print(); env.execute("Process Function Example"); } public static class AlertFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, String> { private transient ValueState<Integer> countState; @Override public void open(Configuration config) { ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>( "countState", // state name TypeInformation.of(new TypeHint<Integer>() {}), // type information 0); // default value countState = getRuntimeContext().getState(descriptor); } @Override public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception { Integer currentCount = countState.value(); currentCount += 1; countState.update(currentCount); if (currentCount >= 3) { out.collect("Warning! The key '" + value.f0 + "' has been seen " + currentCount + " times."); } } }
这里,我们创建一个名为AlertFunction
的处理函数类,并继承KeyedProcessFunction
。其中,ValueState
用于保存状态信息,每个键会有其自己的状态实例。当计数达到或超过三次时,该系统将发出警告。这个例子主要展示了处理函数与其他运算符相比的两个优点:访问键控状态和生命周期管理方法(例如open()
)。
注意:上述示例假设你已经在本地的9999端口上设置了一个socket服务器,用于流式传输文本数据。如果没有,你需要替换这部分以适应你的输入源。
在Flink中,"Sink"是数据流计算的最后一步。它代表了一个输出端点,在那里计算结果被发送或存储。换句话说,Sink是数据流处理过程中的结束节点,负责将处理后的数据输出到外部系统,如数据库、文件、消息队列等。
Flink内置了大量Sink,可以将Flink处理后的数据输出到HDFS、kafka、Redis、ES、MySQL等。
Flink处理的数据可以存储到Redis中,以便实时查询。
首先,需要导入Flink和Redis的连接器依赖:
<!-- Flink Redis connector -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
<version>1.1.0</version>
</dependency>
下面的代码展示了"Word Count"(词频统计)操作,并将结果存储到Redis数据库中:
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.fromElements( "Hello World", "Hello Flink", "Hello Java"); DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()) .keyBy(value -> value.f0) .sum(1); FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build(); counts.addSink(new RedisSink<>(conf, new RedisExampleMapper())); env.execute("Word Count Example"); } public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] words = value.toLowerCase().split("\\W+"); for (String word : words) { if (word.length() > 0) { out.collect(new Tuple2<>(word, 1)); } } } } public static final class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> { @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HSET); } @Override public String getKeyFromData(Tuple2<String, Integer> data) { return data.f0; } @Override public String getValueFromData(Tuple2<String, Integer> data) { return data.f1.toString(); } }
处理结果写入到kafka topic中,Flink也是支持的,需要添加连接器依赖,跟读取kafka数据用的连接器依赖相同,之前添加过就不需要再添加了。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.6</version>
</dependency>
还是用上面词频统计的例子:
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.fromElements( "Hello World", "Hello Flink", "Hello Java"); DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()) .keyBy(value -> value.f0) .sum(1); // Define Kafka properties Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); // Write the data stream to Kafka counts.map(new MapFunction<Tuple2<String,Integer>, String>() { @Override public String map(Tuple2<String,Integer> value) throws Exception { return value.f0 + "," + value.f1.toString(); } }) .addSink(new FlinkKafkaProducer<>("my-topic", new SimpleStringSchema(), properties)); env.execute("Word Count Example"); }
Flink处理结果写入到MySQL中,这并不是Flink默认支持的,需要添加MySQL的驱动依赖:
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
因为不是内嵌支持的,所以需要基于SinkFunction自定义Sink。
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.fromElements( "Hello World", "Hello Flink", "Hello Java"); DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()) .keyBy(value -> value.f0) .sum(1); // Transform the Tuple2<String, Integer> to a format acceptable by MySQL DataStream<String> mysqlData = counts.map(new MapFunction<Tuple2<String, Integer>, String>() { @Override public String map(Tuple2<String, Integer> value) throws Exception { return "'" + value.f0 + "'," + value.f1.toString(); } }); // Write the data stream to MySQL mysqlData.addSink(new MySqlSink()); env.execute("Word Count Example"); } public static class MySqlSink implements SinkFunction<String> { private Connection connection; private PreparedStatement preparedStatement; @Override public void invoke(String value, Context context) throws Exception { if(connection == null) { connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "username", "password"); preparedStatement = connection.prepareStatement("INSERT INTO my_table(word, count) VALUES("+ value +")"); } preparedStatement.executeUpdate(); } } }
需要导入HBase的依赖:
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.5.2</version>
</dependency>
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.fromElements( "Hello World", "Hello Flink", "Hello Java"); DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()) .keyBy(value -> value.f0) .sum(1); counts.addSink(new HBaseSink()); env.execute("Word Count Example"); } public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] words = value.toLowerCase().split("\\W+"); for (String word : words) { if (word.length() > 0) { out.collect(new Tuple2<>(word, 1)); } } } } public static class HBaseSink extends RichSinkFunction<Tuple2<String, Integer>> { private org.apache.hadoop.conf.Configuration config; private org.apache.hadoop.hbase.client.Connection connection; private Table table; @Override public void invoke(Tuple2<String, Integer> value, Context context) throws IOException { Put put = new Put(Bytes.toBytes(value.f0)); put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(value.f1)); table.put(put); } @Override public void open(Configuration parameters) throws Exception { config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum", "localhost"); config.set("hbase.zookeeper.property.clientPort", "2181"); connection = ConnectionFactory.createConnection(config); table = connection.getTable(TableName.valueOf("my-table")); } @Override public void close() throws Exception { table.close(); connection.close(); } }
HBaseSink
类是RichSinkFunction
的实现,用于将结果写入HBase数据库。在invoke
方法中,它将接收到的每个二元组(单词和计数)写入HBase。在open
方法中,它创建了与HBase的连接,并指定了要写入的表。在close
方法中,它关闭了与HBase的连接和表。
在 Apache Flink 中,分区(Partitioning)是将数据流按照一定的规则划分成多个子数据流或分片,以便在不同的并行任务或算子中并行处理数据。分区是实现并行计算和数据流处理的基础机制。Flink 的分区决定了数据在作业中的流动方式,以及在并行任务之间如何分配和处理数据。
在 Flink 中,数据流可以看作是一个有向图,图中的节点代表算子(Operators),边代表数据流(Data Streams)。数据从源算子流向下游算子,这些算子可能并行地处理输入数据,而分区就是决定数据如何从一个算子传递到另一个算子的机制。
下面介绍Flink中常用的几种分区策略。
场景:增大分区、提高并行度,解决数据倾斜。
DataStream → DataStream
分区元素随机均匀分发到下游分区,网络开销比较大
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> stream = env.fromSequence(1, 10).setParallelism(1);
System.out.println(stream.getParallelism());
stream.shuffle().print();
env.execute();
}
输出结果:上游数据比较随意地分发到下游
1> 7
7> 1
2> 8
4> 5
8> 3
1> 9
8> 4
8> 10
6> 2
6> 6
场景:增大分区、提高并行度,解决数据倾斜
DataStream → DataStream
轮询分区元素,均匀的将元素分发到下游分区,下游每个分区的数据比较均匀,在发生数据倾斜时非常有用,网络开销比较大
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> stream = env.fromSequence(1, 10).setParallelism(1);
System.out.println(stream.getParallelism());
stream.rebalance().print();
env.execute();
}
输出:上游数据比较均匀的分发到下游
2> 2
1> 1
8> 8
5> 5
7> 7
4> 4
3> 3
6> 6
1> 9
2> 10
场景:减少分区,防止发生大量的网络传输,不会发生全量的重分区
DataStream → DataStream
通过轮询分区元素,将一个元素集合从上游分区发送给下游分区,发送单位是集合,而不是一个个元素
和其他重分区策略(如 rebalance、forward、broadcast 等)不同的是,rescale 在运行时不会改变并行度,而且它只在本地(同一个 TaskManager 内)进行数据交换,所以它比其他重分区策略更加高效
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> dataStream = env.fromElements("1", "2", "3", "4", "5"); // 使用MapFunction将元素转换为整数类型 DataStream<Integer> intStream = dataStream.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return Integer.parseInt(value); } }); // 使用rescale()进行重分区 DataStream<Integer> rescaledStream = intStream.rescale(); rescaledStream.print(); env.execute("Rescale Example"); }
在这个例子中,我们创建了一个字符串类型的DataStream然后通过map()
将每一个元素转换为整数。然后,我们对结果DataStream应用rescale()
操作来重分区数据。
值得注意的是,rescale()
的实际影响取决于你的并行度和集群环境,如果不同的并行实例都在同一台机器上,或者并行度只有1,那么可能不会看到rescale()
的效果。而在大规模并行处理的情况下,使用rescale()
操作可以提高数据处理的效率。
此外,我们不能直接在打印结果中看到rescale
的影响,因为它改变的是内部数据分布和处理方式,而不是输出的结果。如果想观察rescale
的作用,需要通过Flink的Web UI或者日志来查看任务执行情况,如数据流的分布、各个子任务的运行状态等信息。
场景:需要使用映射表、并且映射表会经常发生变动的场景
DataStream → DataStream
上游中每一个元素内容广播到下游每一个分区中
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5); DataStream<String> broadcastStream = env.fromElements("2", "4"); MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>( "RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); BroadcastStream<String> broadcastData = broadcastStream.broadcast(descriptor); dataStream.connect(broadcastData) .process(new BroadcastProcessFunction<Integer, String, String>() { @Override public void processElement(Integer value, ReadOnlyContext ctx, Collector<String> out) throws Exception { if (ctx.getBroadcastState(descriptor).contains(String.valueOf(value))) { out.collect("Value " + value + " matches with a broadcasted rule"); } } @Override public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception { ctx.getBroadcastState(descriptor).put(rule, rule); } }).print(); env.execute("Broadcast State Example"); }
上述代码首先定义了一个主流和一个要广播的流。然后,我们创建了一个MapStateDescriptor
,用于存储广播数据。接着,我们将广播流转换为BroadcastStream
。
最后,我们使用connect()
方法连接主流和广播流,并执行process()
方法。在这个process()
方法中,我们定义了两个处理函数:processElement()
和processBroadcastElement()
。processElement()
用于处理主流中的每个元素,并检查该元素是否存在于广播状态中。如果是,则输出一个字符串,表明匹配成功。而processBroadcastElement()
则用于处理广播流中的每个元素,并将其添加到广播状态中。
注意:在分布式计算环境中,每个并行实例都会接收广播流中的所有元素。因此,广播状态对于所有的并行实例都是一样的。不过,在Flink 1.13版本中,广播状态尚未在故障恢复中提供完全的保障。所以在事件出现故障时,广播状态可能会丢失数据。
场景:并行度降为1
DataStream → DataStream
在 Apache Flink 中,Global 分区策略意味着所有数据都被发送到下游算子的同一个分区中。这种情况下,下游算子只有一个任务处理全部数据。这是一种特殊的分区策略,只有在下游算子能够很快地处理所有数据,或者需要全局排序或全局聚合时才会使用。
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 创建一个从1到100的数字流 DataStream<Long> numberStream = env.fromSequence(1, 100); // 对流应用 map function DataStream<Long> result = numberStream.global() .map(new MapFunction<Long, Long>() { @Override public Long map(Long value) { System.out.println("Processing " + value); return value * 2; } }); result.print(); env.execute("Global Partition Example"); }
以上代码创建了一个顺序生成 1-100 的数字流,并应用了 Global Partition,然后对每个数字进行乘2的操作。实际运行此代码时,你会观察到所有的数字都由同一任务处理,打印出来的处理顺序是连续的。这就是 Global Partition 的作用:所有数据都被发送到下游算子的同一实例进行处理。
需要注意的是,此示例只是为了演示 Global Partition 的工作原理,实际上并不推荐在负载均衡很重要的应用场景中使用这种分区策略,因为它可能导致严重的性能问题。
场景:一对一的数据分发,默认的分区策略,数据在各个算子之间不会重新分配。map、flatMap、filter 等都是这种分区策略
DataStream → DataStream
上游分区数据分发到下游对应分区中
partition1->partition1;partition2->partition2
注意:必须保证上下游分区数(并行度)一致,不然会有如下异常:
Forward partitioning does not allow change of parallelism. Upstream operation: Source: Socket Stream-1 parallelism: 1, downstream operation: Map-3 parallelism: 8 You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).setParallelism(1);
DataStream<Integer> forwardStream = dataStream.forward().map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return value * value;
}
}).setParallelism(1);
forwardStream.print();
env.execute("Flink Forward Example");
}
此代码首先创建一个从1到10的数据流。然后,它使用 Forward 策略将这个数据流送入一个 MapFunction 中,该函数将每个数字平方。然后,它打印出结果。注意:以上代码中的forward调用实际上并没有改变任何分区策略,因为forward是默认分区策略。这里添加forward调用主要是为了说明其存在和使用方法。
场景:与业务场景匹配
DataStream → DataStream
根据上游分区元素的Hash值与下游分区数取模计算出,将当前元素分发到下游哪一个分区
MathUtils.murmurHash(keyHash)(每个元素的Hash值) % maxParallelism(下游分区数)
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<Integer, Integer>> dataStream = env.fromElements( new Tuple2<>(1, 3), new Tuple2<>(1, 5), new Tuple2<>(2, 4), new Tuple2<>(2, 6), new Tuple2<>(3, 7) ); // 使用 keyBy 对流进行分区操作 DataStream<Tuple2<Integer, Integer>> keyedStream = dataStream .keyBy(0) // 根据元组的第一个字段进行分区 .sum(1); // 对每个键对应的第二个字段求和 keyedStream.print(); env.execute("KeyBy example"); }
以上程序首先创建了一个包含五个元组的流,然后使用 keyBy
方法根据元组的第一个字段进行分区,并对每个键对应的第二个字段求和。执行结果中,每个键的值集合都被映射成了一个新的元组,其第一个字段是键,第二个字段是相应的和。
注意:在以上代码中,keyBy(0)
表示根据元组的第一个字段(索引从0开始)进行分区操作。另外,无论什么情况,都需要确保你的 Flink 集群是正常运行的,否则程序可能无法执行成功。
DataStream → DataStream
通过自定义的分区器,来决定元素是如何从上游分区分发到下游分区
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> data = env.fromElements(1,2,3,4,5,6,7,8,9,10);
// 使用自定义分区器进行分区
data.partitionCustom(new MyPartitioner(), i -> i).print();
env.execute("Custom partition example");
}
public static class MyPartitioner implements Partitioner<Integer> {
@Override
public int partition(Integer key, int numPartitions) {
return key % numPartitions;
}
}
这个程序将创建一个数据流,其中包含从1到10的整数。然后,它使用了一个自定义的分区器MyPartitioner
来对这个数据流进行分区。这个分区器根据元素的值对numPartitions
取模来决定数据去到哪个分区。
由于篇幅限制,我们将在此结束本篇内容。稍微整理一下,下篇马上发。 。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。