赞
踩
https://archive.apache.org/dist/flink/flink-1.9.0/
我们选择下载flink-1.9.0-bin-scala_2.11.tgz 。从名字上可以看出,需要scala_2.11的环境。
如果没有,需要在下面地址下载
https://www.scala-lang.org/download/
参考下面文档进行安装
https://www.jianshu.com/p/d7c94372020c
flink支持如下环境
这个本质上也不需要安装flink,只要使用flink依赖就好
public static void main(String[] args) throws Exception { //定义socket的端口号 int port; try{ ParameterTool parameterTool = ParameterTool.fromArgs(args); port = parameterTool.getInt("port"); }catch (Exception e){ System.err.println("没有指定port参数,使用默认值9000"); port = 9000; } //获取运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //连接socket获取输入的数据 DataStreamSource<String> text = env.socketTextStream("127.0.0.1", port, "\n"); //计算数据 DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() { public void flatMap(String value, Collector<WordWithCount> out) throws Exception { String[] splits = value.split("\\s"); for (String word:splits) { out.collect(new WordWithCount(word,1L)); } } })//打平操作,把每行的单词转为<word,count>类型的数据 .keyBy("word")//针对相同的word数据进行分组 .timeWindow(Time.seconds(2),Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小 .sum("count"); //把数据打印到控制台 windowCount.print() .setParallelism(1);//使用一个并行度 //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行 env.execute("streaming word count"); } /** * 主要为了存储单词以及单词出现的次数 */ public static class WordWithCount{ public String word; public long count; public WordWithCount(){} public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { return "WordWithCount{" + "word='" + word + '\'' + ", count=" + count + '}'; } } }
这个其实不需要安装flink,只要在maven中引入flink的依赖就可以
public class LocalTestJob { public static void main(String[] args) throws Exception{ // initialize a new Collection-based execution environment final ExecutionEnvironment env =new CollectionEnvironment(); DataSet<String> users = env.fromCollection(Arrays.asList("AAA","BBB","CCC")); /* Data Set transformations ... */ // retrieve the resulting Tuple2 elements into a ArrayList. List<String> result =new ArrayList(); users.output(new LocalCollectionOutputFormat<String>(result)); // kick off execution. env.execute(); // Do some work with the resulting ArrayList (=Collection). for(String t : result){ System.err.println("Result = "+t); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。