赞
踩
创建一个空的 Maven 工程,并在 pom.xml 中添加以下依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.6.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.6.1</version> <scope>provided</scope> </dependency>
手工通过 socket 实时产生一些单词,使用 Flink 实时接收数据,对指定时间窗口内的数据进行聚合统计(本例为每隔 1 秒统计最近 2 秒内每个单词出现的次数)。
execution
触发执行程序;execute()
方法的时候才会真正触发执行程序;
package com.tzb.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Sliding-window word count over a socket text stream.
 *
 * <p>Reads newline-delimited text from {@code hostname:port}, splits every line into
 * whitespace-separated words and, every 1 s, prints the per-word counts of the last 2 s.
 *
 * <p>Optional arguments: {@code --hostname <host>} (default {@code master}) and
 * {@code --port <port>} (default 9000).
 */
public class SocketWindowWordCountJava {

    /** Host used when no {@code --hostname} argument is given. */
    private static final String DEFAULT_HOSTNAME = "master";

    /** Port used when {@code --port} is missing or cannot be parsed. */
    private static final int DEFAULT_PORT = 9000;

    public static void main(String[] args) throws Exception {
        String hostname = DEFAULT_HOSTNAME;
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            // Read the optional host before the port, so a missing --port does not discard it.
            hostname = parameterTool.get("hostname", DEFAULT_HOSTNAME);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            port = DEFAULT_PORT;
            e.printStackTrace();
            System.err.println("没有指定端口,使用默认的端口号9000");
        }

        // Obtain the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String delimiter = "\n";

        // Connect to the socket; every record is one line of text.
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // Example: the line "a a c" arriving within one window yields (a, 2) and (c, 1).
        SingleOutputStreamOperator<WordWithCount> windowCount = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String line, Collector<WordWithCount> out) throws Exception {
                        // Split on runs of whitespace and skip empty tokens, so repeated spaces,
                        // leading whitespace or blank lines do not emit a counted "" word.
                        for (String word : line.split("\\s+")) {
                            if (!word.isEmpty()) {
                                out.collect(new WordWithCount(word, 1L));
                            }
                        }
                    }
                })
                .keyBy("word")
                // Sliding window: 2 s long, evaluated every 1 s.
                .timeWindow(Time.seconds(2), Time.seconds(1))
                .sum("count");
                // Either sum or reduce works; equivalent alternative to sum("count"):
                // .reduce(new ReduceFunction<WordWithCount>() {
                //     public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                //         return new WordWithCount(a.word, a.count + b.count);
                //     }
                // });

        // Print the results to stdout with sink parallelism 1.
        windowCount.print().setParallelism(1);

        // Building the pipeline above only defines it; nothing runs until execute() is called.
        env.execute("socket window count");
    }

    /**
     * A word together with its count.
     *
     * <p>Public fields plus a public no-arg constructor are kept, presumably so Flink can treat
     * this as a POJO for the field-name based keyBy("word") / sum("count"); confirm before
     * changing its shape.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" + "word='" + word + '\'' + ", count=" + count + '}';
        }
    }
}
provided
只在编译时发挥作用,但是后期打包时要加上,这里运行时先注释掉
package com.tzb.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Sliding-window word count over a socket text stream.
 *
 * <p>Reads newline-delimited text from {@code hostname:port}, splits every line into
 * whitespace-separated words and, every 1 s, prints the per-word counts of the last 2 s.
 *
 * <p>Optional arguments: {@code --hostname <host>} (default {@code master}) and
 * {@code --port <port>} (default 9000).
 */
public class SocketWindowWordCountJava {

    /** Host used when no {@code --hostname} argument is given. */
    private static final String DEFAULT_HOSTNAME = "master";

    /** Port used when {@code --port} is missing or cannot be parsed. */
    private static final int DEFAULT_PORT = 9000;

    public static void main(String[] args) throws Exception {
        String hostname = DEFAULT_HOSTNAME;
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            // Read the optional host before the port, so a missing --port does not discard it.
            hostname = parameterTool.get("hostname", DEFAULT_HOSTNAME);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            port = DEFAULT_PORT;
            e.printStackTrace();
            System.err.println("没有指定端口,使用默认的端口号9000");
        }

        // Obtain the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String delimiter = "\n";

        // Connect to the socket; every record is one line of text.
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // Example: the line "a a c" arriving within one window yields (a, 2) and (c, 1).
        SingleOutputStreamOperator<WordWithCount> windowCount = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String line, Collector<WordWithCount> out) throws Exception {
                        // Split on runs of whitespace and skip empty tokens, so repeated spaces,
                        // leading whitespace or blank lines do not emit a counted "" word.
                        for (String word : line.split("\\s+")) {
                            if (!word.isEmpty()) {
                                out.collect(new WordWithCount(word, 1L));
                            }
                        }
                    }
                })
                .keyBy("word")
                // Sliding window: 2 s long, evaluated every 1 s.
                .timeWindow(Time.seconds(2), Time.seconds(1))
                .sum("count");
                // Either sum or reduce works; equivalent alternative to sum("count"):
                // .reduce(new ReduceFunction<WordWithCount>() {
                //     public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                //         return new WordWithCount(a.word, a.count + b.count);
                //     }
                // });

        // Print the results to stdout with sink parallelism 1.
        windowCount.print().setParallelism(1);

        // Building the pipeline above only defines it; nothing runs until execute() is called.
        env.execute("socket window count");
    }

    /**
     * A word together with its count.
     *
     * <p>Public fields plus a public no-arg constructor are kept, presumably so Flink can treat
     * this as a POJO for the field-name based keyBy("word") / sum("count"); confirm before
     * changing its shape.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" + "word='" + word + '\'' + ", count=" + count + '}';
        }
    }
}
package com.tzb.scalademo

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time

import scala.util.Try

/**
 * Every 1 s, counts the words received on a socket text stream during the last 2 s.
 *
 * Optional arguments: `--hostname <host>` (default `master`) and `--port <port>` (default 9000).
 */
object SocketWindowWordCountScala {

  /** Host used when no `--hostname` argument is given. */
  private val DefaultHostname: String = "master"

  /** Port used when `--port` is missing or cannot be parsed. */
  private val DefaultPort: Int = 9000

  def main(args: Array[String]): Unit = {
    // Argument parsing is best-effort: any non-fatal failure falls back to the defaults.
    val params: Try[ParameterTool] = Try(ParameterTool.fromArgs(args))

    val hostname: String =
      params.map(_.get("hostname", DefaultHostname)).getOrElse(DefaultHostname)

    val port: Int = params.map(_.getInt("port")).getOrElse {
      System.err.println("没有设置端口,使用默认端口9000")
      DefaultPort
    }

    // Obtain the Flink execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Connect to the socket; every record is one line of text.
    val text = env.socketTextStream(hostname, port, '\n')

    // Parse, key by word, window, then aggregate per window.
    val windowCounts = text
      // Split on runs of whitespace and drop empty tokens, so repeated spaces,
      // leading whitespace or blank lines do not produce a counted "" word.
      .flatMap(_.split("\\s+").filter(_.nonEmpty))
      .map(word => WordWithCount(word, 1L))
      .keyBy("word")
      // Sliding window: 2 s long, evaluated every 1 s.
      .timeWindow(Time.seconds(2), Time.seconds(1))
      // Equivalent to .sum("count").
      .reduce((a, b) => WordWithCount(a.word, a.count + b.count))

    // Print the results to stdout with sink parallelism 1.
    windowCounts.print().setParallelism(1)

    // The pipeline above is only a definition; execute() actually runs the job.
    env.execute("Socket window count")
  }

  /** A word together with how many times it was seen. */
  case class WordWithCount(word: String, count: Long)
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。