赞
踩
Flink项目依赖于Jdk8和maven环境,我们可以通过flink官方提供的maven archetype模板快速生成第一个项目
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.11.0 \
-DgroupId=first-flink-project \
-DartifactId=first-flink-project \
-Dversion=0.1 \
-Dpackage=com.first.flink \
-DinteractiveMode=false \
-DarchetypeCatalog=internal
mvn archetype:generate 构建模板项目速度慢得令人发指,增加-X显示debug级别的调试信息
[INFO] Generating project in Batch mode
[DEBUG] Searching for remote catalog: https://repo.maven.apache.org/maven2/archetype-catalog.xml
加上-DarchetypeCatalog=internal 运行参数,archetype-catalog.xml从本地获取就能解决这个问题
通过上诉命令生成的项目结构如下:
192:first-flink-project root$ tree
.
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── first
│ └── flink
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j2.properties
7 directories, 4 files
pom文件中已包含flink项目所需版本的依赖Jar包,打开Intellij IEDA编辑器根据提示导入项目:
在第一个项目中我们以FLINK提供的wordcount为例,可以学习到Flink 核心API提供的流式Stream和批式Batch编程的基本结构。
在目录 src/main/com.first.flink.examples 下创建 WordCountBatchByJava 类,首先创建执行环境ExecutionEnvironment,执行环境提供了续作的方法用于控制任务的执行,比如设置parallelism等
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
下一步创建数据源,Flink 提供了DataSet API 用于处理批量数据,
DataSet<String> text = env.fromElements("i live flink", "i love java", "i love scala");
接下来将DataSet通过flink提供的各种算子如flatMap、grouby、sum等进行转换
DataSet<Tuple2<String, Integer>> ds = text.flatMap(new LineSplitter()).groupBy(0).sum(1);
LineSplitter 实现了 Flink 提供的 FlatMapFunction 接口。本例中我们统计每个单词出现的次数,因此需要将输入的字符串解析成单词和次数(Tuple2<String, Integer>),Tuple2 的第一个字段是单词,第二个字段是次数。通过 flatMap 算子来解析是因为输入的每一行字符串可能包含多个单词,字符串按照空格(这里比较简单)进行分解。接着grouBy(0)算子按照第一个字段(0是索引)也就是单词进行分组,sum(1)是按照第二个字段求和统计次数
static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String line,
Collector<Tuple2<String, Integer>> collector) throws Exception {
for (String word : line.split(" ")) {
collector.collect(new Tuple2<>(word, 1));
}
}
}
最后将数据输出到目的地控制台
ds.print();
输出结果如下:
(scala,1)
(flink,1)
(love,2)
(live,1)
(i,3)
(java,1)
在目录 src/main/com.first.flink.examples 下创建 WordCountStreamByJava 类,首先创建流式Stream任务的执行环境StreamExecutionEnvironment,执行环境提供了续作的方法用于控制任务的执行,比如设置parallelism、容错机制配置、checkpoint配置等
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
下一步创建流式数据源,Flink 提供了DataStream API 用于处理流式数据,这里我们设置socket数据源,从9999端口读取数据,每一条数据以 "\n"为结尾
DataStreamSource<String> source = env.socketTextStream("127.0.0.1", 9999, "\n"); // 转化处理数据 DataStream<WordWithCount> dataStream = source .flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String line, Collector<WordWithCount> collector) throws Exception { for (String word : line.split(" ")) { collector.collect(new WordWithCount(word, 1)); } } }).keyBy(event -> event.getWord())//以key分组统计 .timeWindow(Time.seconds(5))//设置一个窗口函数,模拟数据流动 .sum("count");//计算时间窗口内的词语个数 // 输出数据到目的端 dataStream.print(); // 执行任务操作 env.execute("Flink Streaming Word Count By Java"); public static class WordWithCount { public String word; public int count; //省略getter/setter
keyBy(0)按照WordWithCount类的word字段也就是单词字段进行分组,timeWindow用于设置了一个2S的固定窗口,sum(“count”)聚合函数对单词出现的次数进行求和。
最后,输出的结果是每隔5秒一次性输出5秒内每个单词出现的次数。
如果要运行程序,首先要通过netcat监听9999端口:nc -lk 9999。然后执行main函数后,在监听的端口上输入:
i love java
输出如下结果:
2> WordWithCount{word='i', count=1}
6> WordWithCount{word='love', count=1}
2> WordWithCount{word='java', count=1}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。