当前位置:   article > 正文

Flink-Window_flink windowall

flink windowall

streaming流式计算是⼀种被设计用于处理⽆限数据集的数据处理引擎,而⽆限数据集是指一种不断增长的本质上无限数据集,⽽window是一种切割无限数据为有限块进行处理的手段。Window是无限数据流处理的核心,Window将⼀个⽆限stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。

1.Time(Flink中涉及的时间)

在这里插入图片描述

  • Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每⼀条⽇志都会记录⾃己的生成时间,Flink通过时间戳分配器访问事件时间戳。
  • Ingestion Time:是数据进入Flink的时间。
  • Processing Time:是每⼀个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。

2.window类型

2.1 TimeWindow(按照时间生成Window)

TimeWindow是将指定时间范围内的所有数据组成⼀个window,⼀次对一个window⾥面的所有数据进行计算。

2.1.1 滚动窗口

Flink默认的时间窗⼝根据Processing Time 进⾏窗⼝的划分,将Flink获取到的数据根据进入Flink的时间划分到不同的窗口中。

将数据依据固定的窗⼝⻓度对数据进行切片。

特点:时间对⻬,窗口⻓度固定,没有重叠。

滚动窗⼝分配器将每个元素分配到⼀个指定窗⼝⼤小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个5分钟大小的滚动窗口,如下图所示:
图片
适用场景:适合做BI统计等(做每个时间段的聚合计算)。

2.1.1.1 timeWindowAll(全局数据,默认Processing Time)

package com.wedoctor.flink;

import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class TumblingTimeWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999);
        //默认的CountWindow是⼀个滚动窗⼝,只需要指定窗⼝⼤小即可,当元素数量达到窗口⼤小时,就会触发窗⼝的执⾏。
        SingleOutputStreamOperator<Integer> num = lines.map(Integer::parseInt);
        //划分窗口
        AllWindowedStream<Integer, TimeWindow> timeWindowAll = num.timeWindowAll(Time.seconds(5));
        //对窗口数据进行计算
        SingleOutputStreamOperator<Integer> sum = timeWindowAll.sum(0);
        sum.print();
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

2.1.1.2 timeWindow(窗口滚动的时候,所有组都要执行,并行处理,默认Processing Time)

package com.wedoctor.flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class TumblingTimeWindow2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999);
        //默认的CountWindow是⼀个滚动窗⼝,只需要指定窗⼝⼤小即可,当元素数量达到窗口⼤小时,就会触发窗⼝的执⾏。
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(line -> {
            String[] fileds = line.split(",");
            return Tuple2.of(fileds[0], Integer.parseInt(fileds[1]));
        }).returns(Types.TUPLE(Types.STRING,Types.INT));
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(5));
        timeWindow.sum(1).print();
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

2.1.1.3 timeWindowAll(全局数据,使用Event Time)

package com.wedoctor.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class EventTimeTumbingWindwAllDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置EventTime作为时间标准
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //创建一个DataStream
        //2020-11-08 18:22:43,1
        DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999);
        //提取数据中的时间
        SingleOutputStreamOperator<String> watermarksDataStream = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            @Override
            public long extractTimestamp(String element) {
                long timestamp = 0;
                try {
                    timestamp = sdf.parse(element.split(",")[0]).getTime();
                } catch (ParseException e) {
                    timestamp = System.currentTimeMillis();
                }
                return timestamp;
            }
        });
        SingleOutputStreamOperator<Integer> nums = watermarksDataStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value.split(",")[1]);
            }
        });
        AllWindowedStream<Integer, TimeWindow> windowed = nums.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<Integer> summed = windowed.sum(0);
        summed.print();
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

2.1.1.4 timeWindow(分组数据,使用Event Time)

package com.wedoctor.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeTumblingWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //设置EventTime作为时间标准
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //1000,hadoop,1
        DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999);
        SingleOutputStreamOperator<String> watermarksDataStream = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = watermarksDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fileds = value.split(",");
                String word = fileds[1];
                int count = Integer.parseInt(fileds[2]);
                return Tuple2.of(word, count);
            }
        });
        //先分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);
        //划分窗口
          WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window = keyed.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = window.sum(1);
        summed.print();
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

2.1.2 滑动窗口

滑动窗⼝是固定窗口的更⼴义的⼀种形式,滑动窗口由固定的窗口长度和滑动间隔组成。

特点:时间对齐,窗口长度固定,有重叠

该滑动窗口分配器分配元件以固定长度的窗口。与翻滚窗口分配器类似,窗口大小由窗口大小参数配置。附加的窗口滑动参数控制滑动窗口的启动频率。因此,如果幻灯片小于窗口大小,则滑动窗口可以重叠。在这种情况下,元素被分配给多个窗口。

例如,您可以将大小为10分钟的窗口滑动5分钟。有了这个,你每隔5分钟就会得到一个窗口,其中包含过去10分钟内到达的事件,如下图所示。
图片
适⽤场景:对最近⼀个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。

2.1.2.1 全局滑动(默认Processing Time)

package com.wedoctor.flink;

import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class TumblingTimeWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999);
        //默认的CountWindow是⼀个滚动窗⼝,只需要指定窗⼝⼤小即可,当元素数量达到窗口⼤小时,就会触发窗⼝的执⾏。
        SingleOutputStreamOperator<Integer> num = lines.map(Integer::parseInt);
        //划分滑动窗口
        AllWindowedStream<Integer, TimeWindow> timeWindowAll = num.timeWindowAll(Time.seconds(10),Time.seconds(5));
        //对窗口数据进行计算
        SingleOutputStreamOperator<Integer> sum = timeWindowAll.sum(0);
        sum.print();
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

2.1.2.2 分组滑动(默认Processing Time)

package com.wedoctor.flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class TumblingTimeWindow2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999);
        //默认的CountWindow是⼀个滚动窗⼝,只需要指定窗⼝⼤小即可,当元素数量达到窗口⼤小时,就会触发窗⼝的执⾏。
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(line -> {
            String[] fileds = line.split(",");
            return Tuple2.of(fileds[0], Integer.parseInt(fileds[1]));
        }).returns(Types.TUPLE(Types.STRING,Types.INT));
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10),Time.seconds(5));
        timeWindow.sum(1).print();
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

2.1.3 会话窗口

由⼀系列事件组合⼀个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

特点:时间⽆对⻬。

在会话窗口中按活动会话分配器组中的元素。会话窗口不重叠,没有固定的开始和结束时间,与翻滚窗口和滑动窗口相反。相反,当会话窗口在一段时间内没有接收到元素时,即当发生不活动的间隙时,会关闭会话窗口。会话窗口分配器可以配置静态会话间隙或 会话间隙提取器功能,该功能定义不活动时间段的长度。当此期限到期时,当前会话将关闭,后续元素将分配给新的会话窗口。
图片
2.1.3.1 不分组(默认Processing Time)

package com.wedoctor.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class ProcessingTimeSessionWindowAllDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999);
        SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);
        //不分组,划分会话窗口
        AllWindowedStream<Integer, TimeWindow> windowed = nums.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
        //划分完窗口要调用WindowFunction对窗口内的数据进行计算
        SingleOutputStreamOperator<Integer> summed = windowed.sum(0);
        summed.print();
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

2.1.3.2 分组(单个组出发,不是全部触发,默认Processing Time)

package com.wedoctor.flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class ProcessingTimeSessionWindwDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //spark,3
        //hadoop,2
        //flink,1
        DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(line -> {
            String[] fields = line.split(",");
            return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));
        //先分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed = keyed.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = windowed.sum(1);
        summed.print();
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

2.1.3.3 不分组(使用Event time)

package com.wedoctor.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeSessionWindowAllDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //1000,1
        DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999);
        //提取数据中的时间
        SingleOutputStreamOperator<String> watermarksDataStream = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });
        SingleOutputStreamOperator<Integer> nums = watermarksDataStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value.split(",")[1]);
            }
        });
        //不分组划分窗口
        AllWindowedStream<Integer, TimeWindow> windowed = nums.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
        windowed.sum(0).print();
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

2.1.3.3 分组(使用Event time)

package com.wedoctor.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeSessionWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //设置EventTime作为时间标准
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //1000,spark,1
        DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999);
        SingleOutputStreamOperator<String> watermarksDataStream = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = watermarksDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fileds = value.split(",");
                String word = fileds[1];
                int count = Integer.parseInt(fileds[2]);
                return Tuple2.of(word, count);
            }
        });
        //先分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);
        //划分窗口
        //keyed.timeWindow(Time.seconds(5));
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window = keyed.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = window.sum(1);
        summed.print();
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

2.2 GlobalWindow(CountWindow)
按照指定的数据条数生成⼀个Window,与时间无关

2.2.1 countWindowAll

全部数据发送到一个task里面 并不是分布式执行

package com.wedoctor.flink;

import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999);
        //默认的CountWindow是⼀个滚动窗⼝,只需要指定窗⼝⼤小即可,当元素数量达到窗口⼤小时,就会触发窗⼝的执⾏。
        SingleOutputStreamOperator<Integer> num = lines.map(Integer::parseInt);
        //划分窗口
        AllWindowedStream<Integer, GlobalWindow> windowd = num.countWindowAll(5);
        //对窗口数据进行计算
        SingleOutputStreamOperator<Integer> sum = windowd.sum(0);
        sum.print();
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

2.2.2 countWindow

分组满足触发条件即可,并不是触发后每个分区都会执行

package com.wedoctor.flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class CountWindow2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999);
        //默认的CountWindow是⼀个滚动窗⼝,只需要指定窗⼝⼤小即可,当元素数量达到窗口⼤小时,就会触发窗⼝的执⾏。
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(line -> {
            String[] fileds = line.split(",");
            return Tuple2.of(fileds[0], Integer.parseInt(fileds[1]));
        }).returns(Types.TUPLE(Types.STRING,Types.INT));
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);
        WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow = keyedStream.countWindow(5);
        countWindow.sum(1).print();
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

参考
Flink Window窗口机制
flink 彻底理解 window(窗口)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/1019191
推荐阅读
相关标签
  

闽ICP备14008679号