赞
踩
首先,创建一个 Maven 项目,在pom.xml中增加所需的 Flink 依赖:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.12.1</version>
</dependency>
</dependencies>
创建一个WordCount.java文件:
package com.flink;
public class WordCount {
public static void main(String[] args) throws Exception {
}
}
接着第一步是创建一个执行环境 ExecutionEnvironment类,用来设置参数和创建数据源以及提交任务的操作:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
下一步创建一个数据集DataSet,存放的类型是String类型,并初始化了三个英文句子:
DataSet<String> text = env.fromElements("this a book", "i love china", "i am chinese");
我们把数据再转化一下为Tuple2类型的数据,TupleN: 代表有N个元素,通过查看源码可以看到在flink-core-1.12.1.jar中,N的最大值为25。这里Tuple2第一个元素是String类型,用来存放单词,第二个元素是Integer类型,表示出现次数。goupBy(0)表示按第一个元素分组,sum(1)表示将第二个元素加起来。最后实现一个 flatMap 类来做解析字符串的工作,如下所示:
DataSet<Tuple2<String, Integer>> ds = text.flatMap(new LineSplitter()).groupBy(0).sum(1);
定义一个实现FlatMap函数相关的类来实现解析字符串,把字符串按照空格分割开,然后把每个单词次数计数一次,装配进Colloector,以提供给程序最后分组及计算。
static class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>> {
@Override
public void flatMap(String string, Collector<Tuple2<String, Integer>> collector) throws Exception {
for (String word: string.split(" ")) {
collector.collect(new Tuple2<String, Integer>(word,1));
}
}
}
完整的程序如下,可以直接执行main函数,在控制台可以看到打印出来的结果。
package com.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.fromElements("a chinese", "china", "i am chinese");
DataSet<Tuple2<String, Integer>> ds=text.flatMap(new LineSplitter()).groupBy(0).sum(1);
// 输出数据到目的端
ds.print();
}
static class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>> {
@Override
public void flatMap(String string, Collector<Tuple2<String, Integer>> collector) throws Exception {
for (String word:string.split(" ")) {
collector.collect(new Tuple2<String, Integer>(word,1));
}
}
}
}
程序运行结果:
此程序连接到服务器Socket读取字符串作为数据源,在这里我们使用netcat工具作为服务器Socket进行测试。
首先定义一个数据流,读取字符串类型的数据,以换行符为一次输入结果:
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
我们定义一个解析类,用来存放解析过程中的结果,变量word存放解析出来的单词,count存放单词的统计个数:
public static class WordWithCount {
public String word;
public long count;
}
然后通过flatMap来解析源数据,使用keyBy函数按照WordWithCount 中word的值进行分组,并定义以每隔5秒为一个处理时间窗口,统计5秒内输入的单词的个数,最后使用reduce方法把筛选出相同的单词,把他们的count值相加,然后返回传递给下次调用,完整的代码如下:
package com.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。