赞
踩
滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)。
// 滚动 event-time 窗口 input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // 滚动 processing-time 窗口 input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。 input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>);
与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。
比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。
// 滑动 event-time 窗口 input .keyBy(<key selector>) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>); // 滑动 processing-time 窗口 input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>); // 滑动 processing-time 窗口,偏移量为 -8 小时 input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) .<windowed transformation>(<window function>);
会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。
DataStream<T> input = ...; // 设置了固定间隔的 event-time 会话窗口 input .keyBy(<key selector>) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>); // 设置了动态间隔的 event-time 会话窗口 input .keyBy(<key selector>) .window(EventTimeSessionWindows.withDynamicGap((element) -> { // 决定并返回会话间隔 })) .<windowed transformation>(<window function>); // 设置了固定间隔的 processing-time session 窗口 input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>); // 设置了动态间隔的 processing-time 会话窗口 input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> { // 决定并返回会话间隔 })) .<windowed transformation>(<window function>);
全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
求每10秒内用户平均访问次数,即avgPV
package com.hpsk.flink.beans; import java.sql.Timestamp; public class Event { public String user; // 用户 public String url; // 访问的url public Long timestamp; // 访问时间 public Event(){ } public Event(String user, String url, Long timestamp) { this.user = user; this.url = url; this.timestamp = timestamp; } @Override public String toString() { return "Event{" + "user='" + user + '\'' + ", url='" + url + '\'' + ", timestamp=" + new Timestamp(timestamp) + '}'; } }
package com.hpsk.flink.source; import com.hpsk.flink.beans.Event; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import java.util.Calendar; import java.util.Random; public class EventWithWatermarkSource implements ParallelSourceFunction<Event> { private boolean isRunning = true; String[] users = new String[]{ "Alice", "Bob", "Mary", "Tom"}; String[] urls = new String[]{ "./home", "./cart", "./prod?id=1", "./prod?id=10"}; @Override public void run(SourceContext<Event> ctx) throws Exception { Random random = new Random(); while (isRunning) { String user = users[random.nextInt(users.length)]; String url = urls[random.nextInt(urls.length)]; long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳 Event event = new Event(user, url, currTs); // 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段 ctx.collectWithTimestamp(event, currTs); // 发送水位线 ctx.emitWatermark(new Watermark(event.timestamp - 1L)); // 睡眠1秒 Thread.sleep(1000L); } } @Override public void cancel() { isRunning = false; } }
package com.hpsk.flink.window; import com.hpsk.flink.beans.Event; import com.hpsk.flink.source.EventWithWatermarkSource; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.util.HashSet; /** * 求每10秒内用户平均访问次数,即avgPV */
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。