当前位置:   article > 正文

Flink 流式数据处理(一): Hello Flink_org.apache.flink:force-shading

org.apache.flink:force-shading

Flink 是一个流式计算引擎。既支持实时的 Streaming 模式对进来的数据进行逐一处理,也适合对批量的数据做 Batch 处理。 一句话,对实时/离线的数据处理做到了批流合一。

Flink 对于数据和数据流做了非常好的抽象,在大数据处理里面得到非常广泛的应用。

一个典型的场景是对实时输入的数据做分析处理后, 得到分析的结果。

以接收从 Socket 传入的数据, 统计每5秒钟不同单词出现的次数为例, 分享如何开发第一个 Flink Job。 以下省去创建 Maven 项目的过程(建议使用 IntelliJ IDEA 创建 Java 应用,可以使用不收费的开源社区版本)。

1.Maven Project 中导入依赖包

在 Maven 项目的 pom.xml 中添加相应的。flink 的依赖,里面的 flink.version 需要替换成实际的版本,比如 1.13, 或者 1.5 。

注意下面的 Transformer 里面 指定了程序的入口类 WindowWordCount。 就是说达成 Jar 包之后,执行 Jar 包时会运行这个类。

  1. <properties>
  2. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  3. <flink.version>1.15.0</flink.version>
  4. <target.java.version>1.8</target.java.version>
  5. <maven.compiler.source>${target.java.version}</maven.compiler.source>
  6. <maven.compiler.target>${target.java.version}</maven.compiler.target>
  7. <log4j.version>2.12.1</log4j.version>
  8. </properties>
  9. <dependencies>
  10. <!-- Apache Flink dependencies -->
  11. <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
  12. <dependency>
  13. <groupId>org.apache.flink</groupId>
  14. <artifactId>flink-java</artifactId>
  15. <version>${flink.version}</version>
  16. <scope>provided</scope>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.apache.flink</groupId>
  20. <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  21. <version>${flink.version}</version>
  22. <scope>provided</scope>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.apache.flink</groupId>
  26. <artifactId>flink-clients_${scala.binary.version}</artifactId>
  27. <version>${flink.version}</version>
  28. <scope>provided</scope>
  29. </dependency>
  30. <!-- Add connector dependencies here. They must be in the default scope (compile). -->
  31. <!-- Example:
  32. <dependency>
  33. <groupId>org.apache.flink</groupId>
  34. <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  35. <version>${flink.version}</version>
  36. </dependency>
  37. -->
  38. <!-- Add logging framework, to produce console output when running in the IDE. -->
  39. <!-- These dependencies are excluded from the application JAR by default. -->
  40. <dependency>
  41. <groupId>org.apache.logging.log4j</groupId>
  42. <artifactId>log4j-slf4j-impl</artifactId>
  43. <version>${log4j.version}</version>
  44. <scope>runtime</scope>
  45. </dependency>
  46. <dependency>
  47. <groupId>org.apache.logging.log4j</groupId>
  48. <artifactId>log4j-api</artifactId>
  49. <version>${log4j.version}</version>
  50. <scope>runtime</scope>
  51. </dependency>
  52. <dependency>
  53. <groupId>org.apache.logging.log4j</groupId>
  54. <artifactId>log4j-core</artifactId>
  55. <version>${log4j.version}</version>
  56. <scope>runtime</scope>
  57. </dependency>
  58. </dependencies>
  59. <build>
  60. <plugins>
  61. <!-- Java Compiler -->
  62. <plugin>
  63. <groupId>org.apache.maven.plugins</groupId>
  64. <artifactId>maven-compiler-plugin</artifactId>
  65. <version>3.1</version>
  66. <configuration>
  67. <source>${target.java.version}</source>
  68. <target>${target.java.version}</target>
  69. </configuration>
  70. </plugin>
  71. <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
  72. <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
  73. <plugin>
  74. <groupId>org.apache.maven.plugins</groupId>
  75. <artifactId>maven-shade-plugin</artifactId>
  76. <version>3.1.1</version>
  77. <executions>
  78. <!-- Run shade goal on package phase -->
  79. <execution>
  80. <phase>package</phase>
  81. <goals>
  82. <goal>shade</goal>
  83. </goals>
  84. <configuration>
  85. <artifactSet>
  86. <excludes>
  87. <exclude>org.apache.flink:force-shading</exclude>
  88. <exclude>com.google.code.findbugs:jsr305</exclude>
  89. <exclude>org.slf4j:*</exclude>
  90. <exclude>org.apache.logging.log4j:*</exclude>
  91. </excludes>
  92. </artifactSet>
  93. <filters>
  94. <filter>
  95. <!-- Do not copy the signatures in the META-INF folder.
  96. Otherwise, this might cause SecurityExceptions when using the JAR. -->
  97. <artifact>*:*</artifact>
  98. <excludes>
  99. <exclude>META-INF/*.SF</exclude>
  100. <exclude>META-INF/*.DSA</exclude>
  101. <exclude>META-INF/*.RSA</exclude>
  102. </excludes>
  103. </filter>
  104. </filters>
  105. <transformers>
  106. <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  107. <mainClass>com.flink.demo.WindowWordCount</mainClass>
  108. </transformer>
  109. </transformers>
  110. </configuration>
  111. </execution>
  112. </executions>
  113. </plugin>
  114. </plugins>
  115. <pluginManagement>
  116. <plugins>
  117. </plugins>
  118. </pluginManagement>
  119. </build>

2.实现单词统计类

src/main/java/com/flink/demo 中添加一个 WindowWordCount.java 类。

  1. package com.flink.demo;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  8. import org.apache.flink.streaming.api.windowing.time.Time;
  9. import org.apache.flink.util.Collector;
  10. /** 从 Socket 中读取数据(即单词),然后每 5s 统计一次所有单词出现的次数,然后输出。 */
  11. public class WindowWordCount {
  12. public static void main(String[] args) throws Exception {
  13. // 创建执行环境。执行环境可以用来定义任务属性(如并发度)、创建数据源以及启动任务。
  14. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15. // 定义 Source, 创建数据源,数据源的作用是从外部系统如 Kafka、Rabbit MQ 或日志服务等系统中接收数据,然后将数据传输到 Flink 任务中。
  16. // 这里从本地端口号为 9999 的 socket 中读取数据。env.socketTextStream 数据源默认是按行读取输入的数据。
  17. // 可以通过 netcat 来连接本地的 9999 端口,并发送数据过来。
  18. // nc -lk 9999, 然后按行输入数据,每一行回车之后(即 \n 字符), env.socketTextStream 就会读取这一行数据。
  19. DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
  20. // 定义 Source 的处理过程
  21. // Flink 提供了大量的 算子(operators) 用来处理数据,比如 Map、FlatMap、KeyBy、Reduce、Window 等等。
  22. // 这里使用 Flink 的 flatMap 算子处理输入的数据。使用了自定义的 WordSplitter 类将输入的一行数据按空格拆分为多个单词,每个单词的出现次数是1,把 (单词,1) 的 Tuple 元组保存到集合中,
  23. // 使用 flatMap 处理了输入的数据后,得到是输入的所有单词,以及出现次数1
  24. // 接下来对单词进行分组,这时就使用 flink 的 keyBy 算子,以元组的第一个属性 f0 作为 key 进行汇总统计。 value -> value.f0 是 lambda 函数的写法。实际上这里实现了 KeySelector.
  25. // 接着需要 5s 统计一次单词次数,这里用到 Flink 的窗口函数TumblingProcessingTimeWindows 来做窗口统计, 窗口按照 5s 的时间窗口来统计。
  26. // 最后,对每个窗口中每个分组的单词进行聚合,就是统计每一个 Window 中的数据。即计算出 5s 内每个单词出现的次数。
  27. DataStream<Tuple2<String, Integer>> dataStream = source
  28. .flatMap(new WordSplitter())
  29. .keyBy(value -> value.f0)
  30. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  31. .sum(1);
  32. // 通常我们会将数据输出到另一个外部系统,比如 Kafka、Rabbit MQ 或日志服务等。
  33. // 这里直接打印出结果 (代码运行到这里实际并没有真正执行,只是构建了执行图)
  34. dataStream.print();
  35. // Flink 任务只有在 execute() 被调用后,才会提交到集群
  36. env.execute("Window WordCount");
  37. }
  38. public static class WordSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
  39. @Override
  40. public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
  41. for (String word : sentence.split(" ")) {
  42. out.collect(new Tuple2<String, Integer>(word, 1));
  43. }
  44. }
  45. }
  46. }

3.运行应用

在 IDEA 的开发环境中可以直接运行 Flink 程序, 运行的时候如果出现这个错误:

java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction

说明一些依赖的类没有找到。 原因在 pom.xml 中, flink-streaming-javaflink-clients 这两个依赖的 scope 是 provided,它们不需要被打包到 jar 中。这些依赖是 flink 运行时环境中的,因此不需要把它们打包到 jar 中。那么在 IDEA 中直接运行,如何加载这些 Jar 包?

解决这个报错的办法就是在  IntelliJ IDEA  运行配置中勾选 Include dependencies with "Provided" scope

然后控制台里面(Mac/Linux 的 Terminal,或者 Windows 的 Command 命令行中),使用 netcat 启动一个 Socket 输入流:

$ nc -lk 9999

如果是在 Windows 下面,需要自己安装 nc 命令行。

在 启动 nc 命令之后,按行输入文本即可。 每行回车之后,Flink 应用将收到输入的数据。统计每5秒钟收到的各个单词的出现次数。

4.启动 Flink 集群

接下来我们在本地启动 Flink 集群。如果你已经有可以使用的集群,就不用执行该步骤了。

启动本地的开发集群步骤可以参考:First steps | Apache Flink

下载 Flink 集群的压缩包,你可以在这个页面 Flink Downloads 找到,然后解压之后,执行启动命令即可:

./bin/start-cluster.sh

5.打包应用,部署到 Flink 集群中

mvn  clean package

编译成功后,会在 target 目录下生成 wordcount-0.0.1-SNAPSHOT.jar 文件

Flink 提供了命令行工具 bin/flink 来跟 flink 集群交互。

./bin/flink run  ~/workspace/wordcount/target/wordcount-0.0.1-SNAPSHOT.jar

任务启动之后,可以在 flink 的  logs 目录看到运行日志。 也可以通过 Web UI 来管理集群,在浏览器打开 localhost:8081 可以看到管理界面。

停止集群:任务运行结束后,你可以通过 bin/stop-cluster.sh 这个脚本来停止 flink 集群。

到这里 WordCount 应用开发并部署完成。

Flink 涉及的概念比较多,比如 Source, Sink, Transformer, 开发的模式也比较多,可以使用 DataSet/DataStream API,可以使用 Table API,可以直接写 SQL,前面的例子是用的 DataStream API ,部署的模式也比较多可以开发机器单机部署,也可以部署到 Kubernetes,或者 Yarn 之上。 如果你对这些有疑问,请关注后续博文。

参考:

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/594864
推荐阅读
相关标签
  

闽ICP备14008679号