赞
踩
Maven 3.0.4 (or higher)
Java 11
进入flink下载页面
https://flink.apache.org/zh/downloads.html
笔者选择的版本是1.15.1
若不想打开页面,可以直接使用下载链接
https://dlcdn.apache.org/flink/flink-1.15.1/flink-1.15.1-bin-scala_2.12.tgz
文件大小 435.6MB 需要等待一段时间…
选择 Apache Flink 1.15.1 for Scala 2.12 下载
注:这篇文章写时最新版本是 Apache Flink 1.15.1
解压
$ tar -xzf flink-1.15.1-bin-scala_2.12.tgz
$ cd flink-1.15.1
启动
$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.
查看 flink 运行状态
http://localhost:8081/
能看到管理界面说明启动成功了
$ ./bin/stop-cluster.sh
运行以下程序时,Flink需是运行状态
直接使用指令创建maven项目(推荐)
mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.15.0
-DgroupId=flink-project
-DartifactId=flink-project
-Dversion=0.1
-Dpackage=myflink
-DinteractiveMode=false
得到 flink-project/
如下
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>flink-project</groupId> <artifactId>flink-project</artifactId> <version>0.1</version> <packaging>jar</packaging> <name>Flink Quickstart Job</name> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.15.0</flink.version> <target.java.version>1.8</target.java.version> <scala.binary.version>2.12</scala.binary.version> <maven.compiler.source>${target.java.version}</maven.compiler.source> <maven.compiler.target>${target.java.version}</maven.compiler.target> <log4j.version>2.17.1</log4j.version> </properties> <repositories> <repository> <id>apache.snapshots</id> <name>Apache Development Snapshot Repository</name> <url>https://repository.apache.org/content/repositories/snapshots/</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> <dependencies> <!-- Apache Flink dependencies --> <!-- These dependencies are provided, because they should not be packaged into the JAR file. --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- Add connector dependencies here. They must be in the default scope (compile). --> <!-- Example: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>${flink.version}</version> </dependency> --> <!-- Add logging framework, to produce console output when running in the IDE. --> <!-- These dependencies are excluded from the application JAR by default. --> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>${log4j.version}</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>${log4j.version}</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>${log4j.version}</version> <scope>runtime</scope> </dependency> </dependencies> <build> <plugins> <!-- Java Compiler --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>${target.java.version}</source> <target>${target.java.version}</target> </configuration> </plugin> <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --> <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.1</version> <executions> <!-- Run shade goal on package phase --> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes> <exclude>org.apache.flink:flink-shaded-force-shading</exclude> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> <exclude>org.apache.logging.log4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. --> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>myflink.DataStreamJob</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> <pluginManagement> <plugins> <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --> <plugin> <groupId>org.eclipse.m2e</groupId> <artifactId>lifecycle-mapping</artifactId> <version>1.0.0</version> <configuration> <lifecycleMappingMetadata> <pluginExecutions> <pluginExecution> <pluginExecutionFilter> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <versionRange>[3.1.1,)</versionRange> <goals> <goal>shade</goal> </goals> </pluginExecutionFilter> <action> <ignore/> </action> </pluginExecution> <pluginExecution> <pluginExecutionFilter> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <versionRange>[3.1,)</versionRange> <goals> <goal>testCompile</goal> <goal>compile</goal> </goals> </pluginExecutionFilter> <action> <ignore/> </action> </pluginExecution> </pluginExecutions> </lifecycleMappingMetadata> </configuration> </plugin> </plugins> </pluginManagement> </build> </project>
添加 一个简单的单词计数程序
WordCount.class
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; /** * @author ximu * @date 2022/7/24 * @description */ public class WordCount { // // Program // public static void main(String[] args) throws Exception { // set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // get input data DataSet<String> text = env.fromElements( "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles," ); DataSet<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new LineSplitter()) // group by the tuple field "0" and sum up tuple field "1" .groupBy(0) //(i,1) (am,1) (chinese,1) .sum(1); // execute and print result counts.print(); } // // User Functions // /** * Implements the string tokenizer that splits sentences into words as a user-defined * FlatMapFunction. The function takes a line (String) and splits it into * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>). */ public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } } }
运行得到结果
如果运行报错
错误: 无法初始化主类 myflink.WordCount
修改pom.xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
修改为
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
即可
添加 一个滑动窗口的单词计数程序
WindowWordCount.class
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class WindowWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> dataStream = env .socketTextStream("localhost", 9999) .flatMap(new Splitter()) .keyBy(value -> value.f0) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .sum(1); dataStream.print(); env.execute("Window WordCount"); } public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word: sentence.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } }
请勿直接运行,因为本程序监听了本地端口9999所输入的数据,所以在运行程序之前,需要先开启端口9999
在Terminal输入
nc -lk 9999
进入输入模式,此时启动程序 WindowWordCount
程序启动完成后
在Terminal输入一些数据
程序输出单词与出现的次数
笔者使用的是5s的滑窗,可根据参数调整时间
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
mvn clean package -Dmaven.test.skip=true
在flink目录下执行
bin/flink run -c ${类相对路径} ${Jar绝对路径}
如
bin/flink run -c myflink.WordCount /Users/ximu/Project/Java/flink-project/target/flink-project-0.1.jar
运行结果
也可在Flink控制台查看任务运行状态
好了,Flink的探索先告一段落了,欢迎给我留言~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。