A beginner-level program
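package frame.flink.learn;

import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountJava {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String host = "localhost";
        int port = 9000;
        String delimiter = "\n";
        // Connect to the socket and read the input line by line
        DataStreamSource<String> text = env.socketTextStream(host, port, delimiter);
        SingleOutputStreamOperator<WordCount> wcs = text
                .flatMap(new FlatMapFunction<String, WordCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordCount> out) throws Exception {
                        // Split each line on whitespace and emit (word, 1) for every word
                        String[] vs = value.split("\\s");
                        for (String word : vs) {
                            out.collect(new WordCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                // Sliding window: 2 seconds long, evaluated every 1 second
                .timeWindow(Time.seconds(2), Time.seconds(1))
                .sum("count");
        wcs.print().setParallelism(1);
        // The next line is essential; without it the program does not run
        env.execute(" word count...");
    }

    @Data
    @NoArgsConstructor
    public static class WordCount {
        String word;
        long count;

        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
    }
}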
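The call timeWindow(Time.seconds(2), Time.seconds(1)) defines a sliding window that is 2 seconds long and advances every second, so each word should be counted in two overlapping windows. The following is a minimal plain-Java sketch of that idea. It does not use Flink classes, and the class name SlidingWindowSketch and the method assignWindows are hypothetical. It assumes non-negative millisecond timestamps and windows aligned to time 0.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: plain Java, not Flink's own window assigner.
class SlidingWindowSketch {

    /**
     * Returns the [start, end) range of every sliding window that contains
     * the given timestamp. All values are in milliseconds.
     */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that begins at or before the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Same sizes as timeWindow(Time.seconds(2), Time.seconds(1)).
        for (long[] w : assignWindows(3500L, 2000L, 1000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints:
        // [3000, 5000)
        // [2000, 4000)
    }
}
```

Under these assumptions, a word typed at 3.5 s falls into two windows, which is why the same word can show up in two consecutive results.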
1. In a terminal, run nc -l 9000
2. Start the program (it connects to the port opened in step 1)
3. Type some words in the terminal
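If nc is not available on your machine, a small Java program can stand in for it in step 1. The sketch below is an assumption-laden substitute, not part of the original setup: the class LineServerSketch and its serve method are hypothetical names, and it accepts a single client and forwards each line from standard input, terminated by "\n" to match the delimiter used in the job.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for "nc -l 9000": serves typed lines to one client.
class LineServerSketch {

    /** Waits for one client, then forwards every line read from input to it. */
    static void serve(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(new OutputStreamWriter(
                     client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = input.readLine()) != null) {
                out.print(line + "\n"); // "\n" matches the job's delimiter
                out.flush();            // send each line immediately
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int port = 9000; // same port the word count program connects to
        try (ServerSocket server = new ServerSocket(port);
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            System.out.println("Listening on port " + port + "; start the job, then type words.");
            serve(server, stdin);
        }
    }
}
```

As with nc, start this before the word count program, because the program connects to the port as a client.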
How do you package the program and run it on Flink?
1. Package
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-resources-plugin</artifactId>
            <version>2.7</version>
            <configuration>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <archive>
                    <manifestEntries>
                        <Main-Class>frame.flink.learn.SocketWindowWordCountJava</Main-Class>
                    </manifestEntries>
                </archive>
            </configuration>
        </plugin>
    </plugins>
</build>
2. Run the program on Flink
Download the Flink distribution, flink-1.6.1-bin-hadoop27-scala_2.11.tgz, and extract it.
Change into the bin directory and run the following commands:
./start-cluster.sh
./flink run ~/Desktop/hadoop-stream-frame-0.0.1-SNAPSHOT.jar --port 9000
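Note that the program as listed hardcodes port 9000, so the --port 9000 argument only takes effect if main reads it from args. Flink provides the ParameterTool class for this purpose. To stay dependency-free, the sketch below shows the same idea in plain Java; the class PortArgSketch and the method parsePort are hypothetical names used for illustration.

```java
// Hypothetical helper: reads "--port <n>" from the command line in plain Java.
class PortArgSketch {

    /** Returns the integer following "--port", or defaultPort if the flag is absent. */
    static int parsePort(String[] args, int defaultPort) {
        for (int i = 0; i < args.length - 1; i++) {
            if ("--port".equals(args[i])) {
                return Integer.parseInt(args[i + 1]);
            }
        }
        return defaultPort;
    }

    public static void main(String[] args) {
        // Falls back to 9000, the value hardcoded in the word count program.
        int port = parsePort(args, 9000);
        System.out.println("Would connect to port " + port);
    }
}
```

In the word count program, a value obtained this way could replace the hardcoded int port=9000 before the call to socketTextStream.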
3. View the results
After typing some text in the terminal, run the following command to view the log:
tail -500f …/log/flink-yuzhihao-taskexecutor-0-xxx.out
You can also check the job's status on the Flink web page:
http://localhost:8081/#/running-jobs
You can also upload the JAR to Flink and then view the results.