赞
踩
需求描述:每隔5秒,计算最近10秒单词出现的次数。
/*** 每隔5秒计算最近10秒单词出现的次数 */ public class TimeWindowWordCount { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStream = env.socketTextStream("localhost", 8888); SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { String[] fields = line.split(","); for (String word : fields) { out.collect(new Tuple2<>(word, 1)); } } }).keyBy(0) .timeWindow(Time.seconds(10), Time.seconds(5)) .sum(1); result.print().setParallelism(1); env.execute("TimeWindowWordCount"); } }
/*** 每隔5秒计算最近10秒单词出现的次数 */ public class TimeWindowWordCount { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStream = env.socketTextStream("hadoop1", 8888); SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { String[] fields = line.split(","); for (String word : fields) { out.collect(new Tuple2<>(word, 1)); } } }).keyBy(0) .timeWindow(Time.seconds(10), Time.seconds(5)) .process(new SumProcessWindowFunction()); result.print().setParallelism(1); env.execute("TimeWindowWordCount"); } /** * IN, OUT, KEY, W * IN:输入的数据类型 * OUT:输出的数据类型 * Key:key的数据类型(在Flink里面,String用Tuple表示) * W:Window的数据类型 */ public static class SumProcessWindowFunction extends ProcessWindowFunction<Tuple2<String,Integer>,Tuple2<String,Integer>,Tuple,TimeW indow> { FastDateFormat dataFormat = FastDateFormat.getInstance("HH:mm:ss"); /*** 当一个window触发计算的时候会调用这个方法 * @param tuple key * @param context operator的上下文 * @param elements 指定window的所有元素 * @param out 用户输出 */ @Override public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) { System.out.println("当天系统的时 间:"+dataFormat.format(System.currentTimeMillis())); System.out.println("Window的处理时 间:"+dataFormat.format(context.currentProcessingTime())); System.out.println("Window的开始时 间:"+dataFormat.format(context.window().getStart())); System.out.println("Window的结束时 间:"+dataFormat.format(context.window().getEnd())); int sum = 0; for (Tuple2<String, Integer> ele : elements) { sum += 1; } // 输出单词出现的次数 out.collect(Tuple2.of(tuple.getField(0), sum)); } } }
先输入:
hive
然后输入hive,hbase
输出结果:
当天系统的时间:15:10:30 Window的处理时间:15:10:30 Window的开始时间:15:10:20 Window的结束时间:15:10:30 (hive,1) 当天系统的时间:15:10:35 Window的处理时间:15:10:35 Window的开始时间:15:10:25 Window的结束时间:15:10:35 当天系统的时间:15:10:35 Window的处理时间:15:10:35 Window的开始时间:15:10:25 Window的结束时间:15:10:35 (hbase,1) (hive,1)
根据每隔5秒执行最近10秒的数据,Flink划分的窗口
[00:00:00, 00:00:05) [00:00:05, 00:00:10) [00:00:10, 00:00:15) [00:00:15, 00:00:20) [00:00:20, 00:00:25) [00:00:25, 00:00:30) [00:00:30, 00:00:35) [00:00:35, 00:00:40) [00:00:40, 00:00:45) [00:00:45, 00:00:50) [00:00:50, 00:00:55) [00:00:55, 00:01:00) [00:01:00, 00:01:05) …
针对stream数据中的时间,可以分为以下三种:
日志数据:
Event Time:事件产生的时间,它通常由事件中的时间戳描述。
Ingestion time:事件进入Flink的时间(一般不用)
Processing Time:事件被处理时当前系统的时间
案例演示:
原始日志如下
2020-11-11 10:00:01,134 INFO executor.Executor: Finished task in state 0.0
这条数据进入Flink的时间是2020-11-11 20:00:00,102
到达window处理的时间为2020-11-11 20:00:01,100
2020-11-11 10:00:01,134 是Event time
2020-11-11 20:00:00,102 是Ingestion time
2020-11-11 20:00:01,100 是Processing time
思考:
如果我们想要统计每分钟内接口调用失败的错误日志个数,使用哪个时间才有意义?
需求:每隔5秒计算最近10秒的单词出现的次数(接口调用出错的次数)
自定义source,模拟:第 13 秒的时候连续发送 2 个事件,第 16 秒的时候再发送 1 个事件
/*** 每隔5秒计算最近10秒单词出现的次数 */ public class TimeWindowWordCount { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> dataStream = env.addSource(new TestSouce()); SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { String[] fields = line.split(","); for (String word : fields) { out.collect(new Tuple2<>(word, 1)); } } }).keyBy(0) .timeWindow(Time.seconds(10), Time.seconds(5)) .process(new SumProcessWindowFunction()); result.print().setParallelism(1); env.execute("TimeWindowWordCount"); } public static class TestSouce implements SourceFunction<String>{ FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss"); @Override public void run(SourceContext<String> ctx) throws Exception { // 控制大约在 10 秒的倍数的时间点发送事件 String currTime = String.valueOf(System.currentTimeMillis()); while (Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100) { currTime = String.valueOf(System.currentTimeMillis()); continue; }System.out.println("开始发送事件的时间:" + dateFormat.format(System.currentTimeMillis())); // 第 13 秒发送两个事件 TimeUnit.SECONDS.sleep(13); ctx.collect("hadoop," + System.currentTimeMillis()); // 产生了一个事件,但是由于网络原因,事件没有发送 ctx.collect("hadoop," + System.currentTimeMillis()); // 第 16 秒发送一个事件 TimeUnit.SECONDS.sleep(3); ctx.collect("hadoop," + System.currentTimeMillis()); TimeUnit.SECONDS.sleep(300); } @Override public void cancel() { } } /** * IN, OUT, KEY, W * IN:输入的数据类型 * OUT:输出的数据类型 * Key:key的数据类型(在Flink里面,String用Tuple表示) * W:Window的数据类型 */ * public static class SumProcessWindowFunction extends ProcessWindowFunction<Tuple2<String,Integer>,Tuple2<String,Integer>,Tuple,TimeW indow> { FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss"); /** * 当一个window触发计算的时候会调用这个方法 * @param tuple key * @param context operator的上下文 * @param elements 指定window的所有元素 * @param out 用户输出 */ @Override public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) { // System.out.println("当天系统的时 间:"+dateFormat.format(System.currentTimeMillis())); // // System.out.println("Window的处理时 间:"+dateFormat.format(context.currentProcessingTime())); // System.out.println("Window的开始时 间:"+dateFormat.format(context.window().getStart())); // System.out.println("Window的结束时 间:"+dateFormat.format(context.window().getEnd())); int sum = 0; for (Tuple2<String, Integer> ele : elements) { sum += 1; } // 输出单词出现的次数 out.collect(Tuple2.of(tuple.getField(0), sum)); } } }
输出结果:
开始发送事件的时间:16:16:40
(hadoop,2)
(hadoop,3)
(hadoop,1)
接口的调用失败的次数(时间段)
自定义source,模拟:正常情况下第 13 秒的时候连续发送 2 个事件,但是有一个事件确实在第13秒的
发送出去了,另外一个事件因为某种原因在19秒的时候才发送出去,第 16 秒的时候再发送 1 个事件
/*** 每隔5秒计算最近10秒单词出现的次数 */ public class TimeWindowWordCount { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> dataStream = env.addSource(new TestSouce()); SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { String[] fields = line.split(","); for (String word : fields) { out.collect(new Tuple2<>(word, 1)); } } }).keyBy(0) .timeWindow(Time.seconds(10), Time.seconds(5)) .process(new SumProcessWindowFunction()); result.print().setParallelism(1); env.execute("TimeWindowWordCount"); } /*** 模拟:第 13 秒的时候连续发送 2 个事件,第 16 秒的时候再发送 1 个事件 */ public static class TestSouce implements SourceFunction<String>{ FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss"); @Override public void run(SourceContext<String> ctx) throws Exception { // 控制大约在 10 秒的倍数的时间点发送事件 String currTime = String.valueOf(System.currentTimeMillis()); while (Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100) { currTime = String.valueOf(System.currentTimeMillis()); continue; } System.out.println("开始发送事件的时间:" + dateFormat.format(System.currentTimeMillis())); // 第 13 秒发送两个事件 TimeUnit.SECONDS.sleep(3); ctx.collect("hadoop," + System.currentTimeMillis()); // 产生了一个事件,但是由于网络原因,事件没有发送 String event = "hadoop," + System.currentTimeMillis(); // 第 16 秒发送一个事件 TimeUnit.SECONDS.sleep(3); ctx.collect("hadoop," + System.currentTimeMillis()); // 第 19 秒的时候发送 TimeUnit.SECONDS.sleep(3); ctx.collect(event); TimeUnit.SECONDS.sleep(300); } @Override public void cancel() { } } /** * IN, OUT, KEY, W * IN:输入的数据类型 * OUT:输出的数据类型 * Key:key的数据类型(在Flink里面,String用Tuple表示) * W:Window的数据类型 */ public static class SumProcessWindowFunction extends ProcessWindowFunction<Tuple2<String,Integer>,Tuple2<String,Integer>,Tuple,TimeW indow> { FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss"); /** * 当一个window触发计算的时候会调用这个方法 * @param tuple key * @param context operator的上下文 * @param elements 指定window的所有元素 * @param out 用户输出 */ * @Override public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) { // System.out.println("当天系统的时 间:"+dateFormat.format(System.currentTimeMillis())); // // System.out.println("Window的处理时间:"+dateFormat.format(context.currentProcessingTime())); // System.out.println("Window的开始时 间:"+dateFormat.format(context.window().getStart())); // System.out.println("Window的结束时 间:"+dateFormat.format(context.window().getEnd())); int sum = 0; for (Tuple2<String, Integer> ele : elements) { sum += 1; } // 输出单词出现的次数 out.collect(Tuple2.of(tuple.getField(0), sum)); } } }
处理结果:
开始发送事件的时间:16:18:50
(hadoop,1)
(1573287543001,1)
(1573287543001,1)
(hadoop,3)
(1573287546016,1)
(1573287543016,1)
(1573287546016,1)
(hadoop,2)
(1573287543016,1)
使用Event Time处理 /*** 每隔5秒计算最近10秒单词出现的次数 */ public class TimeWindowWordCount { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //步骤一:设置时间类型 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStreamSource<String> dataStream = env.addSource(new TestSouce()); dataStream.map(new MapFunction<String, Tuple2<String,Long>>() { @Override public Tuple2<String, Long> map(String line) throws Exception { String[] fields = line.split(","); return new Tuple2<>(fields[0],Long.valueOf(fields[1])); } //步骤二:获取数据里面的event Time }).assignTimestampsAndWatermarks(new EventTimeExtractor() ) .keyBy(0) .timeWindow(Time.seconds(10), Time.seconds(5)) .process(new SumProcessWindowFunction()) .print().setParallelism(1); env.execute("TimeWindowWordCount"); } public static class TestSouce implements SourceFunction<String>{ FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss"); @Override public void run(SourceContext<String> ctx) throws Exception { // 控制大约在 10 秒的倍数的时间点发送事件 String currTime = String.valueOf(System.currentTimeMillis()); while (Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100) { currTime = String.valueOf(System.currentTimeMillis()); continue; } System.out.println("开始发送事件的时间:" + dateFormat.format(System.currentTimeMillis())); // 第 13 秒发送两个事件 TimeUnit.SECONDS.sleep(13); ctx.collect("hadoop," + System.currentTimeMillis()); // 产生了一个事件,但是由于网络原因,事件没有发送 String event = "hadoop," + System.currentTimeMillis(); // 第 16 秒发送一个事件 TimeUnit.SECONDS.sleep(3); ctx.collect("hadoop," + System.currentTimeMillis()); // 第 19 秒的时候发送 TimeUnit.SECONDS.sleep(3); ctx.collect(event); TimeUnit.SECONDS.sleep(300); } @Override public void cancel() { } } /** * IN, OUT, KEY, W * * IN:输入的数据类型 * * OUT:输出的数据类型 * * Key:key的数据类型(在Flink里面,String用Tuple表示) * * W:Window的数据类型 */ public static class SumProcessWindowFunction extends ProcessWindowFunction<Tuple2<String,Long>,Tuple2<String,Integer>,Tuple,TimeWind ow> { FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss"); /** * 当一个window触发计算的时候会调用这个方法 * @param tuple key * @param context operator的上下文 * @param elements 指定window的所有元素 * @param out 用户输出 */ @Override public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Integer>> out) { // System.out.println("当天系统的时 间:"+dateFormat.format(System.currentTimeMillis())); // // System.out.println("Window的处理时 间:"+dateFormat.format(context.currentProcessingTime())); // System.out.println("Window的开始时 间:"+dateFormat.format(context.window().getStart())); // System.out.println("Window的结束时 间:"+dateFormat.format(context.window().getEnd())); int sum = 0; for (Tuple2<String, Long> ele : elements) { sum += 1; } // 输出单词出现的次数 out.collect(Tuple2.of(tuple.getField(0), sum)); } } private static class EventTimeExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> { FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss"); // 拿到每一个事件的 Event Time @Override public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) { return element.f1; } @Nullable @Override public Watermark getCurrentWatermark() { return new Watermark(System.currentTimeMillis()); } } }
计算结果:
开始发送事件的时间:16:44:10
(hadoop,1)
(hadoop,3)
(hadoop,1)
现在我们第三个window的结果已经计算准确了,但是我们还是没有彻底的解决问题。接下来就需要我
们使用WaterMark机制来解决了。
/*** 每隔5秒计算最近10秒单词出现的次数 */ public class TimeWindowWordCount { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //步骤一:设置时间类型 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStreamSource<String> dataStream = env.addSource(new TestSouce()); dataStream.map(new MapFunction<String, Tuple2<String,Long>>() { @Override public Tuple2<String, Long> map(String line) throws Exception { String[] fields = line.split(","); return new Tuple2<>(fields[0],Long.valueOf(fields[1])); } //步骤二:获取数据里面的event Time }).assignTimestampsAndWatermarks(new EventTimeExtractor() ) .keyBy(0) .timeWindow(Time.seconds(10), Time.seconds(5)) .process(new SumProcessWindowFunction()) .print().setParallelism(1); env.execute("TimeWindowWordCount"); } public static class TestSouce implements SourceFunction<String>{ FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss"); @Override public void run(SourceContext<String> ctx) throws Exception { // 控制大约在 10 秒的倍数的时间点发送事件 String currTime = String.valueOf(System.currentTimeMillis()); while (Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100) { currTime = String.valueOf(System.currentTimeMillis()); continue; } System.out.println("开始发送事件的时间:" + dateFormat.format(System.currentTimeMillis())); // 第 13 秒发送两个事件 TimeUnit.SECONDS.sleep(13); ctx.collect("hadoop," + System.currentTimeMillis()); // 产生了一个事件,但是由于网络原因,事件没有发送 String event = "hadoop," + System.currentTimeMillis(); // 第 16 秒发送一个事件 TimeUnit.SECONDS.sleep(3); ctx.collect("hadoop," + System.currentTimeMillis()); // 第 19 秒的时候发送 TimeUnit.SECONDS.sleep(3); ctx.collect(event); TimeUnit.SECONDS.sleep(300); } @Override public void cancel() { } } /** * IN, OUT, KEY, W * IN:输入的数据类型 * OUT:输出的数据类型 * Key:key的数据类型(在Flink里面,String用Tuple表示) * W:Window的数据类型 */ public static class SumProcessWindowFunction extends ProcessWindowFunction<Tuple2<String,Long>,Tuple2<String,Integer>,Tuple,TimeWind ow> { FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss"); /** * 当一个window触发计算的时候会调用这个方法 * @param tuple key * @param context operator的上下文 * @param elements 指定window的所有元素 * * @param out 用户输出 */ @Override public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Integer>> out) { // System.out.println("当天系统的时 间:"+dateFormat.format(System.currentTimeMillis())); // // System.out.println("Window的处理时 间:"+dateFormat.format(context.currentProcessingTime())); // System.out.println("Window的开始时 间:"+dateFormat.format(context.window().getStart())); // System.out.println("Window的结束时 间:"+dateFormat.format(context.window().getEnd())); int sum = 0; for (Tuple2<String, Long> ele : elements) { sum += 1; } // 输出单词出现的次数 out.collect(Tuple2.of(tuple.getField(0), sum)); } } private static class EventTimeExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> { FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss"); // 拿到每一个事件的 Event Time @Override public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) { return element.f1; } @Nullable @Override public Watermark getCurrentWatermark() { //window延迟5秒触发 return new Watermark(System.currentTimeMillis() - 5000); } } }
计算结果:
开始发送事件的时间:16:57:40
(hadoop,2)
(hadoop,3)
(hadoop,1)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。