赞
踩
Flink 是一个流式计算引擎。既支持实时的 Streaming 模式对进来的数据进行逐一处理,也适合对批量的数据做 Batch 处理。 一句话,对实时/离线的数据处理做到了批流合一。
Flink 对于数据和数据流做了非常好的抽象,在大数据处理里面得到非常广泛的应用。
一个典型的场景是对实时输入的数据做分析处理后, 得到分析的结果。
以接收从 Socket 传入的数据, 统计每5秒钟不同单词出现的次数为例, 分享如何开发第一个 Flink Job。 以下省去创建 Maven 项目的过程(建议使用 IntelliJ IDEA 创建 Java 应用,可以使用不收费的开源社区版本)。
在 Maven 项目的 pom.xml 中添加相应的。flink 的依赖,里面的 flink.version 需要替换成实际的版本,比如 1.13, 或者 1.5 。
注意下面的 Transformer 里面 指定了程序的入口类 WindowWordCount。 就是说达成 Jar 包之后,执行 Jar 包时会
运行这个类。
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <flink.version>1.15.0</flink.version>
- <target.java.version>1.8</target.java.version>
-
- <maven.compiler.source>${target.java.version}</maven.compiler.source>
- <maven.compiler.target>${target.java.version}</maven.compiler.target>
-
- <log4j.version>2.12.1</log4j.version>
- </properties>
-
- <dependencies>
- <!-- Apache Flink dependencies -->
- <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <!-- Add connector dependencies here. They must be in the default scope (compile). -->
-
- <!-- Example:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- -->
-
- <!-- Add logging framework, to produce console output when running in the IDE. -->
- <!-- These dependencies are excluded from the application JAR by default. -->
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- <version>${log4j.version}</version>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- <version>${log4j.version}</version>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <version>${log4j.version}</version>
- <scope>runtime</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
-
- <!-- Java Compiler -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>${target.java.version}</source>
- <target>${target.java.version}</target>
- </configuration>
- </plugin>
-
- <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
- <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.1.1</version>
- <executions>
- <!-- Run shade goal on package phase -->
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <artifactSet>
- <excludes>
- <exclude>org.apache.flink:force-shading</exclude>
- <exclude>com.google.code.findbugs:jsr305</exclude>
- <exclude>org.slf4j:*</exclude>
- <exclude>org.apache.logging.log4j:*</exclude>
- </excludes>
- </artifactSet>
- <filters>
- <filter>
- <!-- Do not copy the signatures in the META-INF folder.
- Otherwise, this might cause SecurityExceptions when using the JAR. -->
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>com.flink.demo.WindowWordCount</mainClass>
- </transformer>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
-
- <pluginManagement>
- <plugins>
- </plugins>
- </pluginManagement>
- </build>
在 src/main/java/com/flink/demo
中添加一个 WindowWordCount.java
类。
- package com.flink.demo;
-
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.util.Collector;
-
- /** 从 Socket 中读取数据(即单词),然后每 5s 统计一次所有单词出现的次数,然后输出。 */
- public class WindowWordCount {
-
- public static void main(String[] args) throws Exception {
-
- // 创建执行环境。执行环境可以用来定义任务属性(如并发度)、创建数据源以及启动任务。
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // 定义 Source, 创建数据源,数据源的作用是从外部系统如 Kafka、Rabbit MQ 或日志服务等系统中接收数据,然后将数据传输到 Flink 任务中。
- // 这里从本地端口号为 9999 的 socket 中读取数据。env.socketTextStream 数据源默认是按行读取输入的数据。
- // 可以通过 netcat 来连接本地的 9999 端口,并发送数据过来。
- // nc -lk 9999, 然后按行输入数据,每一行回车之后(即 \n 字符), env.socketTextStream 就会读取这一行数据。
- DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
-
- // 定义 Source 的处理过程
- // Flink 提供了大量的 算子(operators) 用来处理数据,比如 Map、FlatMap、KeyBy、Reduce、Window 等等。
- // 这里使用 Flink 的 flatMap 算子处理输入的数据。使用了自定义的 WordSplitter 类将输入的一行数据按空格拆分为多个单词,每个单词的出现次数是1,把 (单词,1) 的 Tuple 元组保存到集合中,
- // 使用 flatMap 处理了输入的数据后,得到是输入的所有单词,以及出现次数1。
- // 接下来对单词进行分组,这时就使用 flink 的 keyBy 算子,以元组的第一个属性 f0 作为 key 进行汇总统计。 value -> value.f0 是 lambda 函数的写法。实际上这里实现了 KeySelector.
- // 接着需要 5s 统计一次单词次数,这里用到 Flink 的窗口函数TumblingProcessingTimeWindows 来做窗口统计, 窗口按照 5s 的时间窗口来统计。
- // 最后,对每个窗口中每个分组的单词进行聚合,就是统计每一个 Window 中的数据。即计算出 5s 内每个单词出现的次数。
- DataStream<Tuple2<String, Integer>> dataStream = source
- .flatMap(new WordSplitter())
- .keyBy(value -> value.f0)
- .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
- .sum(1);
-
- // 通常我们会将数据输出到另一个外部系统,比如 Kafka、Rabbit MQ 或日志服务等。
- // 这里直接打印出结果 (代码运行到这里实际并没有真正执行,只是构建了执行图)
- dataStream.print();
-
- // Flink 任务只有在 execute() 被调用后,才会提交到集群
- env.execute("Window WordCount");
- }
-
- public static class WordSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
- @Override
- public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
- for (String word : sentence.split(" ")) {
- out.collect(new Tuple2<String, Integer>(word, 1));
- }
- }
- }
-
- }
在 IDEA 的开发环境中可以直接运行 Flink 程序, 运行的时候如果出现这个错误:
java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction
说明一些依赖的类没有找到。 原因在 pom.xml 中, flink-streaming-java
和 flink-clients
这两个依赖的 scope 是 provided
,它们不需要被打包到 jar 中。这些依赖是 flink 运行时环境中的,因此不需要把它们打包到 jar 中。那么在 IDEA 中直接运行,如何加载这些 Jar 包?
解决这个报错的办法就是在 IntelliJ IDEA 运行配置中勾选 Include dependencies with "Provided" scope
。
然后控制台里面(Mac/Linux 的 Terminal,或者 Windows 的 Command 命令行中),使用 netcat 启动一个 Socket 输入流:
$ nc -lk 9999
如果是在 Windows 下面,需要自己安装 nc 命令行。
在 启动 nc 命令之后,按行输入文本即可。 每行回车之后,Flink 应用将收到输入的数据。统计每5秒钟收到的各个单词的出现次数。
接下来我们在本地启动 Flink 集群。如果你已经有可以使用的集群,就不用执行该步骤了。
启动本地的开发集群步骤可以参考:First steps | Apache Flink
下载 Flink 集群的压缩包,你可以在这个页面 Flink Downloads 找到,然后解压之后,执行启动命令即可:
./bin/start-cluster.sh
mvn clean package
编译成功后,会在 target 目录下生成 wordcount-0.0.1-SNAPSHOT.jar
文件
Flink 提供了命令行工具 bin/flink 来跟 flink 集群交互。
./bin/flink run ~/workspace/wordcount/target/wordcount-0.0.1-SNAPSHOT.jar
任务启动之后,可以在 flink 的 logs 目录看到运行日志。 也可以通过 Web UI 来管理集群,在浏览器打开 localhost:8081
可以看到管理界面。
停止集群:任务运行结束后,你可以通过 bin/stop-cluster.sh 这个脚本来停止 flink 集群。
到这里 WordCount 应用开发并部署完成。
Flink 涉及的概念比较多,比如 Source, Sink, Transformer, 开发的模式也比较多,可以使用 DataSet/DataStream API,可以使用 Table API,可以直接写 SQL,前面的例子是用的 DataStream API ,部署的模式也比较多可以开发机器单机部署,也可以部署到 Kubernetes,或者 Yarn 之上。 如果你对这些有疑问,请关注后续博文。
参考:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。