当前位置:   article > 正文

Flink中关于Window API 演示以及时间窗口详解_flink datastream api代对数据流设置时间滚动口,窗口大小为1分钟val windo

flink datastream api代对数据流设置时间滚动口,窗口大小为1分钟val windowstream
1. Flink中窗口的API分类
  • 在Flink流计算中,提供Window窗口API分为两种

1)、针对KeyedStream窗口API:window

  • 第一步、数据流DataStream调用keyBy函数分组,获取KeyedStream
  • 第二步、KeyedStream.window设置窗口
  • 第三步、聚合操作,对窗口中数据进行聚合统计
    • 函数:reduce、fold、aggregate函数
    • apply() 函数

也就是调用了keyBy算子的,使用window
在这里插入图片描述

// TODO: KeyedStream窗口操作,先分组,再窗口,最后聚合
SingleOutputStreamOperator<String> windowDataStream = tupleStream
// 设置Key进行分组
.keyBy(0)
// 设置窗口,每5秒中统计最近5秒的数据,滚动窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
// 聚合操作
.apply(new WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>() {
    @Override
    public void apply(Tuple tuple, 
                      TimeWindow window, 
                      Iterable<Tuple2<String, Integer>> input, 
                      Collector<String> out) throws Exception {

    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

2)、非KeyedStream窗口API:windowAll

  • 直接调用窗口函数:windowAll,对窗口所有数据进行处理,未进行分组
  • 聚合操作,对窗口中数据进行聚合统计
    • 函数:reduce、fold、aggregate函数
    • apply() 函数

也就是没有使用keyBy算子的,使用windowAll
在这里插入图片描述

// TODO: 未进行分组,直接窗口window,再聚合
SingleOutputStreamOperator<String> allWindowDataStream = tupleStream
	.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
	.apply(new AllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>() {
		@Override
		public void apply(TimeWindow window,
		                  Iterable<Tuple2<String, Integer>> values,
		                  Collector<String> out) throws Exception {

		}
	});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
2. 关于滚动时间窗口的案例分析

在这里插入图片描述

在这里插入图片描述

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author liu a fu
 * @version 1.0
 * @date 2021/3/7 0007
 * @DESC   窗口统计案例演示:滚动时间窗口(Tumbling Time Window),实时交通卡口车流量统计
 * TODO: 滚动窗口数据不会重复  想成水流在流动
 *       滑动窗口数据会重复 想成窗口在走动
 */
public class StreamTumblingTimeWindow {
    public static void main(String[] args) throws Exception {
        //1-环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2-数据源source
        DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999);

        //3-数据的transformation
/*
数据:
a,3
a,2
a,7
d,9
b,6
a,5
b,3
e,7
e,4
 */
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapDataStream = inputDataStream
                .filter(line -> line != null && line.trim().split(",").length == 2)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String line) throws Exception {
                        String[] split = line.trim().split(",");
                        return new Tuple2<String, Integer>(split[0], Integer.parseInt(split[1]));
                    }
                });
        // TODO: 先按照卡口分组,再进行窗口操作,最后聚合累加  使用了keyBy  所以使用window
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = mapDataStream
                .keyBy(0)   // 下标索引,卡口编号
                .timeWindow(Time.seconds(5))    // 滚动时间窗口,仅仅设置窗口大小即可
                .sum(1);
        //4-数据的sink
        resultDataStream.printToErr();
        //5-execute
        env.execute(StreamTumblingTimeWindow.class.getSimpleName()) ;

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

在这里插入图片描述

3. 关于滑动时间窗口的案例分析

在这里插入图片描述
在这里插入图片描述

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * 窗口统计案例演示:滑动时间窗口(Sliding Time Window),实时交通卡口车流量统计
 */
public class StreamSlidingTimeWindow {

	public static void main(String[] args) throws Exception {
		// 1. 执行环境-env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);

		// 2. 数据源-source
		DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999);

		// 3. 数据转换-transformation
/*
数据:
a,3
a,2
a,7
d,9
b,6
a,5
b,3
e,7
e,4
 */
		SingleOutputStreamOperator<Tuple2<String, Integer>> mapDataStream = inputDataStream
			.filter(line -> null != line && line.trim().split(",").length == 2)
			.map(new MapFunction<String, Tuple2<String, Integer>>() {
				@Override
				public Tuple2<String, Integer> map(String line) throws Exception {
					String[] split = line.trim().split(",");
					return new Tuple2<String, Integer>(split[0], Integer.parseInt(split[1]));
				}
			});

		// TODO: 先按照卡口分组,再进行窗口操作,最后聚合累加
		SingleOutputStreamOperator<Tuple2<String, Integer>> sumDataStream = mapDataStream
			.keyBy(0) // 下标索引,卡口编号
			// public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide)
			.timeWindow(Time.seconds(10), Time.seconds(5))  // 滑动时间窗口 大小10s  滑动时间5秒
			.sum(1);

		// 4. 数据终端-sink
		sumDataStream.printToErr();

		// 5. 触发执行-execute
		env.execute(StreamSlidingTimeWindow.class.getSimpleName()) ;
	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/544077
推荐阅读
相关标签
  

闽ICP备14008679号