赞
踩
之前已经记录了在Mac上安装Flink1.8
https://blog.csdn.net/zhangvalue/article/details/93166895
1️⃣、开始创建一个项目名为flink_begin的maven项目
2️⃣、pom.xml文件添加如下dependency:
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>1.8.0</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.12</artifactId>
- <version>1.8.0</version>
- <scope>provided</scope>
- </dependency>
-
- <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.12</artifactId>
- <version>1.8.0</version>
- </dependency>
- </dependencies>

wordcount.java 文件如下:
- /**
- * @ Author zhangsf
- * @CreateTime 2019-06-20 - 18:58
- */
-
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.utils.ParameterTool;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.util.Collector;
-
-
- public class WordCount {
- public static void main(String[] args) throws Exception {
- //定义socket的端口号
- int port;
- try {
- ParameterTool parameterTool = ParameterTool.fromArgs(args);
- port = parameterTool.getInt("port");
- } catch (Exception e) {
- System.err.println("指定port参数,默认值为9000");
- port = 9000;
- }
-
- //获取运行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- //连接socket获取输入的数据
- DataStreamSource<String> text = env.socketTextStream("127.0.0.1", port, "\n");
-
- //计算数据
- DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
- public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
- String[] splits = value.split("\\s");
- for (String word : splits) {
- out.collect(new WordWithCount(word, 1L));
- }
- }
- })//打平操作,把每行的单词转为<word,count>类型的数据
- .keyBy("word")//针对相同的word数据进行分组
- .timeWindow(Time.seconds(2), Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小
- .sum("count");
-
- //把数据打印到控制台
- windowCount.print()
- .setParallelism(1);//使用一个并行度
- //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
- env.execute("streaming word count");
-
- }
-
- /**
- * 主要为了存储单词以及单词出现的次数
- */
- public static class WordWithCount {
- public String word;
- public long count;
-
- public WordWithCount() {
- }
-
- public WordWithCount(String word, long count) {
- this.word = word;
- this.count = count;
- }
-
- @Override
- public String toString() {
- return "WordWithCount{" +
- "word='" + word + '\'' +
- ", count=" + count +
- '}';
- }
- }
-
-
- }

此时需要将本机的终端上开一个端口号为9000的监听
nc -l 9000
准备就绪就可以开始run 起来了
可能会出现问题
java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/datastream/DataStream
https://blog.csdn.net/zhangvalue/article/details/93165357 解决
启动起来之后在
输入hello hello world world world world flink flink flink flink flink,回车。在IDEA的控制台会显示如下单词和词频的信息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。