赞
踩
本文的主要目的是学会flink window窗口编程,本文章不会对:滑动窗口,滚动窗口,sessoin窗口作深入的讲解,那不是本文的目的。本文在假设你对窗口有一定了解的基础的前提下,带你看看flink窗口编程。 在读本篇文章的基础上,你需要先阅读:flink 水位线彻底站起来
内容较多,无法一一道来, 欢迎留言讨论。
回忆下批处理吧, 我们以日志数据来举例。一般来说我们的日志都是按照日期进行分类的, 比如:/log/2022/03/02/, 如果我想分析2022年三月二号的日志的话,我只需要加载上面的目录就行了, 而实际上这不就是一个窗口嘛? 这个窗口就是我们定义好的文件夹中的数据。 所以窗口不是流处理独有的概念,他并没有那么高深。 我们来给窗口下个定义: 窗口就是对数据精确划分的一个集合。
至于你怎么划分那是你的事了, 比如常见的根据时间划分, 也有的是根据数量划分, 甚至根据某些特性划分,划分的方式多了去了。 所有这些划分最终的目的,就是把聚集的数据 按照业务层需要的方式 去分割成一个个的小集合, 而这每个小集合都是一个窗口。
我为什么会将理解窗口作为一个小节,那是因为我也是小白过来的,明白小白的心态。理解了窗口的本质,会对后面的学习事半功倍。
flink的窗口对象有两个实现类,见下图:
官网这么说的:A global windows assigner assigns all elements with the same key to the same single global window. This windowing scheme is only useful if you also specify a custom trigger. Otherwise, no computation will be performed, as the global window does not have a natural end at which we could process the aggregated elements.
flink有一个全局窗口分配器(后面会讲分配器),该分配器将相同key(指的是keyedStream)流数据全部分配到一个全局窗口GlabalWindow中,GlabalWindow窗口必须指定触发器trigger,因为全局窗口分配器没有默认的触发器,所以必须指定触发器trigger。
看个例子,keyBy数据流有个方法。countWindow()大家知道countWindow不属于时间窗口,那么其只可能是GlobalWindow结合触发器trigger实现的,看源码:
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
//发现其底层就是调用了window方法,
GlobalWindows.create()创建了全局窗口分配器
然后指定trigger
这个代码看不懂没关系,看了后面的分配器就懂了,此时只需要知道GlobalWindow没有触发器即可。要用GlobalWindow的时候开发者必须指定一个触发器,否则的话GlobalWindow永远不会被触发。
时间窗口,可以说这是做常用的了吧,不管是滑动窗口,滚动窗口又或是sessoin窗口内部都是TimeWindow对象。
TimeWindow 对象有三个属性
在看下面的介绍之前,其实flink的窗口可能和大部分的人理解的不一样,他并不是先构建一个携带时间范围的窗口对象,然后流数据过来的时候根据流数据的时间去判断其属于哪个窗口。 你是不是这么理解的,此处请对号入座。 网上很多人也是这么说的,为什么大家都这样说,因为这样说好理解,但是实际上flink内部不是这样实现的。 请往下看…
当流数据到来的时候flink通过窗口分配器WindowAssigner为每个数据建立一个TimeWindow对象,同时每条数据再注册一个触发器trigger。
flink会根据数据中的event time 和用户子自定义的窗口宽度size 生成TimeWindow对象的窗口范围[startTime, endTime] .
假如现在时间是15:01, 窗口宽度size=5, 然后连续过来三个流数据:
e1(event time=15:02),e2(event time=15:02),e3(event time=15:02)。然后这三个流数据会生成三个一一对应的TimeWindow对象。
注意了,flink根据e1,e2,e3的event time和 窗口宽度size=5 计算出这三个TimeWindow对象的窗口范围都为[15:00, 15:05], 这种计算方法是flink计算的,这个计算才是核心,有兴趣的建议看看源码。
这三个窗口对象的窗口时间范围都是[15:00, 15:05], 在此基础上flink会为这三个流数据都注册一个trigger定时触发器。 因为这三个元素的窗口时间范围一样, 因此这三个trigger触发器的时间也是一样的,然后等时间一到, 这三个对象就会同时参与计算, 这样的话这三个对象逻辑上就属于同一个窗口。下图是滑动窗口在event time下的trigger定时触发器源码:
这块比较拗口,我们再来理一理:
1.流中的每个元素到来, 都会分配一个TimeWindow对象和一个触发器Trigger
2.TimeWindow对象根据当前元素的event time以及窗口的宽度 来确定 窗口范围[starttime,endtime]
3.随着流元素的逐个到来, 紧挨着到来的流元素生成的TimeWindow对象的[starttime,endtime]可能一样
4.[starttime,endtime]一样的TimeWindow对象逻辑上属于一个窗口,这个逻辑上的窗口才是
大众理解的窗口
5.属于同一个逻辑窗口的流数据,对应的trigger也是一样的。为什么呢?因为这些元素的TimeWindow对象的窗口范围[starttime,endtime]是一样的, 而trigger对象需要的时间就是从maxtimestamp 其实(maxtimestamp=endtime-1)
触发逻辑
如果你2.3小节没看懂建议不要往下看了。到这里我假设2.3小节你已经明白。 每个流数据都有一个
TimeWindow对象和一个Trigger触发器,
当不同流元素TimeWindow的窗口时间范围[start,end]一样的时候,其触发器Trigger也是一样的,
既然Trigger一样那么时间一到就会同时开始处理。
此时他们属于同一个逻辑窗口, 不严谨的说法就是他们是属于一个窗口的。
flink时间窗口根据时间的特性分为:event time和process time,不管哪种窗口上面的触发逻辑都是一摸一样。不同的是 event time类型的Trigger触发器多了一个判断水位线的逻辑,process time的触发其实就是我上面说的触发逻辑,接下来聊聊event time的触发.
event time 模式下:Trigger对象解析数据的时候,除了注册了触发器之外,还加了了watermar>=TimeWindow.maxtimestamp 的判断。 步骤1:就是Trigger对象根据TimeWindow的 maxtimestamp注册的定时器,即便2不发生,窗口最终也会被触发。此步骤就是本小节开头说的触发逻辑。
步骤2:仔细看红色圈圈的代码,我们可以看到和水位线条进行比较的是窗口的maxtimestamp时间,而触发器的用的时间也是maxtimestamp, 如果watermar>=TimeWindow.maxtimestamp 成立的话,也会触发窗口计算。
有些读者迷茫了,会什么会计算两次。 那是因为2 是为了处理迟到数据的时候触发的计算, 而1则是正常的窗口计算.
默认情况下:迟到数据会被丢弃,当配置迟到时间:
window(TumblingEventTimeWindows.of(Time.seconds(timeWindowSize)))
.allowedLateness(Time.seconds(timeWindowAllowLater))//允许迟到时间
的时候,1 才会生效。 此时的逻辑是:触发器 触发一次窗口计算, 水位线时间越过 window.maxTimestamp+allowedLateness 的时候再重新计算一次。 这意味着计算了两次, 因此下游需要 将window starttime 和window endtime设置在主键中 以修正之前的窗口结果。
此时窗口的状态数据才会被真的清空,如果往后再来迟到数据,则 默认会丢弃。 为了处理再迟到的数据,此时可以采用侧流收集然后留着后续处理。
// UserSlotGame 是我自定义的类,用于接收kafka数据。
OutputTag<UserSlotGame> outputTag = new OutputTag<>("late", TypeInformation.of(UserSlotGame.class));
.window(TumblingEventTimeWindows.of(Time.seconds(timeWindowSize)))
.allowedLateness(Time.seconds(timeWindowAllowLater))//允许迟到时间
.sideOutputLateData(outputTag) //侧流收集
.process(省略)
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(WindowAssigner) <- 必须:窗口分配器,决定了流数据被分配到哪个窗口中
.trigger(Trigger) <- 可选: "触发器" (若不设置会选择默认值)
.evictor(Evictor) <- 可选: "清除器" (else no evictor)
.allowedLateness(Time) <- 可选: 允许元素迟到多久,默认为0意思是不允许迟到
.sideOutputLateData(OutputTag)<- 可选: 迟到的数据输出到OutputTag管道中
.reduce/aggregate/apply() <- 必须: 落到窗口中的数据怎么处理
.getSideOutput(OutputTag) <- 可选: 获取迟到的数据
stream
.windowAll(WindowAssigner) ->必须
.trigger(Trigger) ->可选
.evictor(Evictor) ->可选
.allowedLateness(Time) ->可选
.sideOutputLateData(OutputTag) ->可选
.reduce/aggregate/apply() ->必须
.getSideOutput(OutputTag) ->可选
说明:可以看到键控流和非键控流的最大的区别就是是否对流数据 调用了keyBy算子。 经过keyBy处理的数据流,会根据key分成多条task并行任务, 这些task任务可以并行执行。 而不经过keyBy算子的话,那所有的数据都将被一个窗口处理,也就是只能调用windowAll, 这样的话所有的数据都会被一个task处理, 也就是说所有的数据都将发送到下游算子的单个task中–>并行度为1.
WindowAssigner是最顶层接口,其余的都是它的实现类,其实现类都称为窗口分配器,可以拿来直接用。 其实现类我不会都说一遍,下文会说一些常用的,至于其他的感兴趣可以自己去看。
public abstract class WindowAssigner<T, W extends Window> implements Serializable { private static final long serialVersionUID = 1L; /** *每个流数据 element都会经过这个方法,其作用就是根据timestamp 生成TimeWindow或者GlobalWindow,实际开发中大部分都是TimeWindow TimeWindow包含starttime,endtime,maxtimestamp */ public abstract Collection<W> assignWindows( T element, long timestamp, WindowAssignerContext context); /** 该方法为到来的流元素element定义了一个Trigger对象,该Trigger对象决定了窗口以何种方式触发 */ public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env); ... }
对应的WindowAssigner实现类有两个:
- TumblingEventTimeWindows
- TumblingProcessingTimeWindows
常用场景:每隔五分钟统计一次
下图是一个每隔五秒划分一个窗口的图解。滑动窗口是不会重叠的。
仔细看上图,Tumbling在英文中是滑动的意思,因此上图中:
TumblingEventTimeWindows和TumblingProcessingTimeWindows都是滑动窗口。
TumblingEventTimeWindows.of(Time.minutes(5),Time.seconds(10))
第一个参数指的是窗口的大小,第二个参数指的是窗口的起始偏移量。 窗口是按照自然时间划分的,比如Time.minutes(5)
划分的窗口是:[00:00:00,00:05:00],[00:05:00,00:10:00]。TumblingEventTimeWindows.of(Time.minutes(5),Time.seconds(10))意思是在自然划分的窗口的基础上起始偏移量为十秒,此时划分的窗口就是:
[00:00:10, 00:05:10], [00:05:10,00:10:10]
TumblingProcessingTimeWindows 参数也是一样的就不多说了
stream1
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.minutes(5),Time.seconds(10)))
.<windowFunction>
windowFunction指的是具体的窗口执行函数比如:reduce/aggregate/apply()
对应的WindowAssigner实现类有两个:
- SlidingEventTimeWindows
- SlidingProcessingTimeWindows
常用场景:每隔一分钟统计过去五分钟的数据量
滚动窗口有两个必要属性:1.步长slide 2.窗口宽度size
意思是每隔slide 滑动一次, 一般slide比size小, 你仔细想想,当slide=size的时候滚动窗口就变成了滑动窗口。 滚动窗口会有重叠,意思是一个流元素可能会属于多个滑动窗口中, slide越小窗口分配的就越多。
SlidingEventTimeWindows.of(Time.minutes(10),Time.seconds(5),Time.se conds(1))第一个参数是窗口大小size,第二个参数是步长,第三个参数是偏移量。窗口划分也是基于自然时间划分的。
stream1
.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.minutes(10),Time.seconds(5),Time.seconds(1))).<windowFunction>
windowFunction指的是具体的窗口执行函数比如:reduce/aggregate/apply()
对应的WindowAssigner实现类有两个:
- EventTimeSessionWindows //静态事件时间
- DynamicEventTimeSessionWindows //动态事件时间
- ProcessingTimeSessionWindows //静态处理时间
- DynamicProcessingTimeSessionWindows //动态处理时间
常用于:十分钟内没有新的数据到来则开启新的窗口。
会话窗口其实逻辑也很简单, 它有有一个时间间隔在这里为了下文方便我们把这个时间间隔记作sessionTimeout。在sessionTimeout内如果没有新的数据流到窗口则该窗口关闭, 然后开启新的窗口。 那如果一直有数据过来则该窗口永远不会关闭。 这也意味着会话窗口的endTime是不确定的,话句话说窗口的大小是不确定的。
sessoin窗口按照类型分为静态和动态窗口
上面关于动态和静态的sessoin很难理解,因为sessoin窗口和普通的窗口不一样,sessoin窗口涉及到窗口的合并,如果不了解窗口分配很难理解,下面我将尽量讲清楚窗口的分配逻辑。
窗口对象:TimeWindow(timestamp, timestamp + sessionTimeout)
timestamp是流数据的时间,可能是event time 也可能是process time,和你用的窗口分配器类型有关(如果你用EventTimeSessionWindows则timestamp表示event time )
sessoin窗口没办法很简单的举例子来说明窗口的分配,我们必须结合源码来看,我将尽量讲解清除。
上面你可能知道了sessoin窗口根据sessionTimeout是自动生成还是动态提取被分为:静态sessoin窗口和动态sessoin窗口。
sessoin窗口和滑动/滚动窗口最大的不同就在于sessoin窗口有一个合并操作。不管是动态还是静态,通过源码可以看到sessoin每到来一个元素都会创建一个时间窗口对象:TimeWindow(timestamp, timestamp + sessionTimeout)
基于时间timestamp 和sessoinTimeout确定一个窗口范围,我们说过sessoin窗口范围是在不断变化的,如果在[timestamp, timestamp + sessionTimeout]范围内到达了新的流数据,该流数据对应的窗口为[timestamp2, timestamp2 + sessionTimeout], 则合并后二者实际上属于一个窗口[timestamp,timestamp2 + sessionTimeout],看下图。
stream1
.keyBy(0)
.window(EventTimeSessionWindows.withGap(Time.minutes(5)))
.<windowFunction>
windowFunction指的是具体的窗口执行函数比如:reduce/aggregate/apply()
stream1.keyBy(0).window( DynamicEventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>(){
@Override
public long extract(Tuple2<String, Long> element) {
return element.f1;
}
}));
SessionWindowTimeGapExtractor用于返回当前流数据
构建窗口的sessoinTimeout参数。
每个流数据都会调用一次,因为
每个流数据都会构建一个TimeWindow(timestamp, timestamp + sessionTimeout)
关于sessoin只能尽量说成这样了。 另外值得注意的就是,上面我讲解的WindowAssigner都是flink已经实现好的,其内部包含了Trigger所以不需要再次指定,内置的WindowAssigner都有一个默认的Trigger. 你可以点开源码看下。
滚动窗口和滑动窗口用法基本一样,我们挑滑动窗口来做两个例子,例子分为event time 和process time两种模式。 记住process time不需要水位线。
public class Order { private String name;//订单名称 private int price;//订单价格 private long timestamp; //订单产生时间,用于event time public Order() { } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; } public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } @Override public String toString() { return "Order{" + "name='" + name + '\'' + ", price=" + price + ", timestamp=" + timestamp + '}'; } }
import org.apache.flink.streaming.api.functions.source.SourceFunction; //模拟订单产生的时间 public class SourceOrder implements SourceFunction<Order> { private volatile boolean flag=true; @Override public void run(SourceContext<Order> ctx) throws Exception { long timestamp = System.currentTimeMillis(); while (flag) { Order order_0 = new Order(); Order order_1 = new Order(); order_0.setName("order_0"); order_0.setPrice(1); order_0.setTimestamp(timestamp); order_1.setName("order_1"); order_1.setPrice(2); order_1.setTimestamp(timestamp); ctx.collect(order_0); ctx.collect(order_1); Thread.sleep(1000); timestamp += 1000;//订单产生间隔一秒 } } @Override public void cancel() { flag = false; } }
import org.apache.flink.api.common.eventtime.TimestampAssigner; import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.*; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.OutputTag; import java.text.SimpleDateFormat; public class WindowsEventTimeMain { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //定义两条流 DataStreamSource<Order> ds = env.addSource(new SourceOrder()); // ds.print("源数据:"); // ds.print(); env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期 KeyedStream<Order, String> keyedStream = ds.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier<Order>(){ @Override public TimestampAssigner<Order> createTimestampAssigner(Context context) { return (e,timestamp) -> e.getTimestamp(); } })).keyBy((KeySelector<Order, String>) value -> value.getName()); OutputTag<Order> outputTag = new OutputTag<Order>("sideout"){}; SingleOutputStreamOperator<Order> single= keyedStream .window(TumblingEventTimeWindows.of(Time.seconds(2))) .sideOutputLateData(outputTag) .reduce(new ReduceFunction<Order>() { @Override public Order reduce(Order value1, Order value2) throws Exception { SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String str_time01 = sdf.format(value1.getTimestamp()); String str_time02 = sdf.format(value2.getTimestamp()); Order order = new Order(); order.setName(value1.getName()+">>"+str_time01+":"+str_time02); order.setPrice(value1.getPrice()+value2.getPrice()); order.setTimestamp(9999999); return order; } }); single.getSideOutput(outputTag).print("迟到数据:"); single.print("真实窗口输出:"); env.execute("event time demo."); } }
基于process time就很简单了,不需要水位线条,代码很简单那。
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.text.SimpleDateFormat; public class WindowsProcessTimeMain { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //定义两条流 DataStreamSource<Order> ds = env.addSource(new SourceOrder()); // ds.print(); SingleOutputStreamOperator<Order> outStream = ds .keyBy("name") .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) .reduce(new ReduceFunction<Order>() { @Override public Order reduce(Order value1, Order value2) throws Exception { SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String str_time01 = sdf.format(value1.getTimestamp()); String str_time02 = sdf.format(value2.getTimestamp()); Order order = new Order(); order.setName(value1.getName()+">>"+str_time01+":"+str_time02); order.setPrice(value1.getPrice()+value2.getPrice()); return order; } }); outStream.print(); env.execute("prcocess time demo"); } }
结果:
对了,你可以把时间TumblingProcessingTimeWindows.of(Time.seconds(2))改成:
TumblingProcessingTimeWindows.of(Time.seconds(1)) 结果如下:
你会发现,这不是reduce的结果,reduce根本没有被调用。 因为设置一秒的话,由于数据源是一秒生成一条数据, 那么也就意味着每个窗口大概率只有一条数据, 那么reduce在只有一条数据的时候不会生效。 咳咳,感觉有点坑。 希望对此有研究的小伙伴给我留言。
reduce:对窗口中的每条数据做逐个处理,最后整个窗口输出一条数据。
@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {
/**
* @param value1 The first value to combine.
* @param value2 The second value to combine.
* @return The combined value of both input values.
* @throws Exception This method may throw exceptions. Throwing an exception will cause the
* operation to fail and may trigger recovery.
*/
T reduce(T value1, T value2) throws Exception;
}
通过源码可以看到:reduce最后的返回值类型和流数据的类型必须保持一致,
而这也是开发中需要主义的地方。
上一节的实战部分展示了reduce函数,我们想一下,reduce函数发出的结果有一个小的问题,就是这个结果是属于哪个窗口的我们并不知道。 如果我可以获取到结果所属的窗口的时间那就很有用了,这样的话结果更加清晰。 你想每条reduce的结果我都知道其计算的是哪个窗口,把窗口时间附加到结果中很明确,方便后续我们做一些基于时间的过滤。 但是有些人可能会说了,我不需要知道窗口的时间啊, 我只需要用System.currentTimeMillis()获取当前系统时间,然后把这个时间赋予到结果中不也行嘛? 问题已经出现,请大家好好思考一下,对event time来说其窗口时间 范围i是根据event time来计算的,如果我今天去kafka拿到昨天产生的基于event time的数据,明显窗口时间是昨天的,而你用System.currentTimeMillis()获取的时间就是今天的啊,这样肯定有问题。 有人就说了既然如此我随便取一个 流元素的event time不也行嘛? 答案是可以的,但是不严谨。 flink的reduce 有一个重载函数方法reduce(ReduceFunction, WindowFunction)
windowFunction用于处理reduce发出的结果。
下面代码实现了,对reduce结果添加窗口时间范围的功能。
import org.apache.flink.api.common.eventtime.TimestampAssigner; import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.text.SimpleDateFormat; import java.util.function.Consumer; public class WindowsEventTimeReducetionMain { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //定义两条流 DataStreamSource<Order> ds = env.addSource(new SourceOrder()); // ds.print("源数据:"); // ds.print(); env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期 KeyedStream<Order, String> keyedStream = ds.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier<Order>(){ @Override public TimestampAssigner<Order> createTimestampAssigner(Context context) { return (e,timestamp) -> e.getTimestamp(); } })).keyBy((KeySelector<Order, String>) value -> value.getName()); OutputTag<Order> outputTag = new OutputTag<Order>("sideout"){}; SingleOutputStreamOperator<Order> single= keyedStream .window(TumblingEventTimeWindows.of(Time.seconds(2))) .sideOutputLateData(outputTag) .reduce(new ReduceFunction<Order>() { @Override public Order reduce(Order value1, Order value2) throws Exception { Order order = new Order(); order.setName(value1.getName()); order.setPrice(value1.getPrice() + value2.getPrice());//订单价格求和 return order; } }, new WindowFunction<Order, Order, String, TimeWindow>() { //由于窗口的数据已经被reduceFunction处理并且做了累加,数据到WindowFunction //的时候就是那个reduceFunction处理后的数据,处理后的数据都在Iterable //中,直接遍历出来加上 窗口时间即可 @Override public void apply(String s, TimeWindow window, Iterable<Order> input, Collector<Order> out) throws Exception { input.forEach(new Consumer<Order>() { @Override public void accept(Order order) { SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String window_start_time =sdf.format( window.getStart());//当前窗口的起始时间 String window_end_time =sdf.format( window.getEnd());//endtime是下一个窗口的初始时间 order.setTimestamp(window.maxTimestamp());//maxTimestamp=endtime-1 order.setName(order.getName()+">>>>window_time_range[ "+window_start_time+" , "+window_end_time+" ]"); out.collect(order); } }); } }); single.getSideOutput(outputTag).print("迟到数据:"); single.print("真实输出:"); env.execute("sdaaf"); } }
结果:
真实输出::8> Order{name=‘order_0>>>>window_time_range[ 2022-08-24 15:35:32 , 2022-08-24 15:35:34 ]’, price=2, timestamp=1661326533999}
真实输出::7> Order{name=‘order_1>>>>window_time_range[ 2022-08-24 15:35:32 , 2022-08-24 15:35:34 ]’, price=4, timestamp=1661326533999}
真实输出::7> Order{name=‘order_1>>>>window_time_range[ 2022-08-24 15:35:34 , 2022-08-24 15:35:36 ]’, price=4, timestamp=1661326535999}
真实输出::8> Order{name=‘order_0>>>>window_time_range[ 2022-08-24 15:35:34 , 2022-08-24 15:35:36 ]’, price=2, timestamp=1661326535999}
真实输出::7> Order{name=‘order_1>>>>window_time_range[ 2022-08-24 15:35:36 , 2022-08-24 15:35:38 ]’, price=4, timestamp=1661326537999}
真实输出::8> Order{name=‘order_0>>>>window_time_range[ 2022-08-24 15:35:36 , 2022-08-24 15:35:38 ]’, price=2, timestamp=1661326537999}
真实输出::8> Order{name=‘order_0>>>>window_time_range[ 2022-08-24 15:35:38 , 2022-08-24 15:35:40 ]’, price=2, timestamp=1661326539999}
真实输出::7> Order{name=‘order_1>>>>window_time_range[ 2022-08-24 15:35:38 , 2022-08-24 15:35:40 ]’, price=4, timestamp=1661326539999}
reduce是一个的局限性有点大,输入类型和输出类型必须保持一致,对于window窗口内的聚合行为有一个比较高级的聚合方法aggregate,aggregate方法接收一个聚合器接口参数,以及洽谈的一些参数,所有的重载方法都在下图。
除了红色箭头是AggregationFunction ,其他的都是 AggregateFunction<T, ACC, V>
先看红色箭头AggregationFunction的接口源码:
private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) { return reduce(aggregator); } public abstract class AggregationFunction<T> implements ReduceFunction<T> { private static final long serialVersionUID = 1L; /** Aggregation types that can be used on a windowed stream or keyed stream. */ public enum AggregationType { SUM, MIN, MAX, MINBY, MAXBY, } }
请仔细看上面的代码,你会发现
aggregate(AggregationFunction aggregator) { return reduce(aggregator);
这个聚合器竟然调用了reduce,当你点开AggregationFunction的时候发现了这家伙继承了Reducetion,且里面有一个枚举对象,枚举对象中的值为: SUM, MIN,MAX, MINBY, MAXBY。
。AggregationFunction有两个实现类:
具体我就不展开了,这个聚合器你基本不会重新的,flink基本上已经实现好了:
可以直接用,用法就是: window后面直接调用".sum" “.max” “.min” “.minBy” “maxBy”
这也是这一小节的重点,它和reduce最大的不同就是输入类型和输出类型可以不一致
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable { /** * 创建一个累加器,可以是任意flink支持的可序列化的类型 * 用于缓存数据 * 最终的结果是从这个缓存中拿出来的。 */ ACC createAccumulator(); /** * 每来一条数据都要和缓存中的数据做同样的逻辑计算 * 然后返回一个新的缓存对象 (缓存指的是累加器) */ ACC add(IN value, ACC accumulator); /** * 当窗口出发的时候会调用这个方法,从累加器中获取结果 * 即发出窗口的计算结果。 */ OUT getResult(ACC accumulator); /** * 我之前说过,flink的窗口只有sessoin窗口的大小是不固定的 * 这个不固定是通过窗口合并实现的, 所以 * 当你的窗口是sessoin窗口的时候,可能会有窗口合并 * 而累加器是属于窗口的属性,当窗口合并的时候必须在这里 * 手动合并两个窗口的累加器缓存。 * 换言之如果我们不用sessoin窗口,此方法不需要实现就行。 * 无论是滑动窗口还是滚动窗口他们的窗口大小时不变的,也不需要合并。 */ ACC merge(ACC a, ACC b); }
下面我们还是原来的reduce代码稍加改动实现和reduce一样的功能,我自定义了HashMap累加器存储订单号和 价格,方会该订单的所有价格之和,最终结果定义成String类型。
package com.pg.flink.dataStreame.window; import org.apache.flink.api.common.eventtime.TimestampAssigner; import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.OutputTag; import java.util.HashMap; public class WindowsEventTimeAggregateMain { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //定义两条流 DataStreamSource<Order> ds = env.addSource(new SourceOrder()); // ds.print("源数据:"); // ds.print(); env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期 KeyedStream<Order, String> keyedStream = ds.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier<Order>(){ @Override public TimestampAssigner<Order> createTimestampAssigner(Context context) { return (e,timestamp) -> e.getTimestamp(); } })).keyBy((KeySelector<Order, String>) value -> value.getName()); OutputTag<Order> outputTag = new OutputTag<Order>("sideout"){}; SingleOutputStreamOperator<String> single= keyedStream .window(TumblingEventTimeWindows.of(Time.seconds(2))) .sideOutputLateData(outputTag) .aggregate(new AggregateFunction<Order, HashMap<String,Integer>, String>() { @Override //初始化累加器(其实就是一个缓存) public HashMap<String, Integer> createAccumulator() { return new HashMap<>();//key 存储order的name,value 存储其价格 } @Override public HashMap<String, Integer> add(Order value, HashMap<String, Integer> accumulator) { String name = value.getName(); int price = value.getPrice(); HashMap<String, Integer> new_accumulator = new HashMap<>(); //当name不存在的时候表示是第一个数据此时累加器没有数据,我们给个默认值0即可 new_accumulator.put(name, price + accumulator.getOrDefault(name,0)); return new_accumulator; } @Override public String getResult(HashMap<String, Integer> accumulator) { return accumulator.toString(); } @Override public HashMap<String, Integer> merge(HashMap<String, Integer> a, HashMap<String, Integer> b) { //因为不是sessoin窗口,返回一个空的map即可 return new HashMap<>(); } }); single.getSideOutput(outputTag).print("迟到数据:"); single.print("真实输出:"); env.execute("sdaaf"); } }
结果如下:
真实输出::7> {order_1=4}
真实输出::8> {order_0=2}
真实输出::7> {order_1=4}
真实输出::8> {order_0=2}
真实输出::7> {order_1=4}
真实输出::8> {order_0=2}
多说一句:我自定义的sourec每隔一秒中产生一个order_0,和一个order_1
窗口长度两秒, 所以每一次的结果都是两条数据相加,所以结果是准确的哦。
aggregate(AggregateFunction<IN, ACC, OUT>,WindowFunction<IN, OUT, KEY, W extends Window>)
window中的数据结果就是AggregateFunction处理后的结果, 然后此结果会再次流入WindowFunction再次处理,最后返回最终的处理结果,WindowFunction最主要的功能就是提供的访问窗口的时间的方法,能让我们知道 当前结果的计算时间信息。
import com.pg.flink.dataStreame.window.Order; import com.pg.flink.dataStreame.window.SourceOrder; import org.apache.flink.api.common.eventtime.TimestampAssigner; import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.function.Consumer; public class WindowsEventTimeAggregateWindowFunctionMain { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //定义两条流 DataStreamSource<Order> ds = env.addSource(new SourceOrder()); // ds.print("源数据:"); // ds.print(); env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期 KeyedStream<Order, String> keyedStream = ds.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier<Order>(){ @Override public TimestampAssigner<Order> createTimestampAssigner(Context context) { return (e,timestamp) -> e.getTimestamp(); } })).keyBy((KeySelector<Order, String>) value -> value.getName()); OutputTag<Order> outputTag = new OutputTag<Order>("sideout"){}; SingleOutputStreamOperator<String> single= keyedStream .window(TumblingEventTimeWindows.of(Time.seconds(2))) .sideOutputLateData(outputTag) .aggregate(new AggregateFunction<Order, HashMap<String, Integer>, String>() { @Override //初始化累加器(其实就是一个缓存) public HashMap<String, Integer> createAccumulator() { return new HashMap<>();//key 存储order的name,value 存储其价格 } @Override public HashMap<String, Integer> add(Order value, HashMap<String, Integer> accumulator) { String name = value.getName(); int price = value.getPrice(); HashMap<String, Integer> new_accumulator = new HashMap<>(); //当name不存在的时候表示是第一个数据此时累加器没有数据,我们给个默认值0即可 new_accumulator.put(name, price + accumulator.getOrDefault(name, 0)); return new_accumulator; } @Override public String getResult(HashMap<String, Integer> accumulator) { return accumulator.toString(); } @Override public HashMap<String, Integer> merge(HashMap<String, Integer> a, HashMap<String, Integer> b) { //因为不是sessoin窗口,返回一个空的map即可 return new HashMap<>(); } }, new WindowFunction<String, String, String, TimeWindow>() { @Override //第一个参数是key的类型 // 第二个参数是当前数据是被TimeWindow对象包装的 //第三个获取的窗口中的所有数据,因为窗口数据已经被AggregateTion处理过来,所以迭代器此时只有一条数据,此数据就是AggregateTion中getResult返回的结果 //第四个参数用来发出最终结果 public void apply(String s, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception { // input.forEach(new Consumer<String>() { @Override public void accept(String order) { SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String window_start_time =sdf.format( window.getStart());//当前窗口的起始时间 String window_end_time =sdf.format( window.getEnd());//endtime是下一个窗口的初始时间 out.collect(order+">>window_time_range["+window_start_time+" , "+window_end_time+" ]"); } }); } }); single.getSideOutput(outputTag).print("迟到数据:"); single.print("真实输出:"); env.execute("sdaaf"); } }
结果:
真实输出::8> {order_0=2}>>window_time_range[2022-08-25 19:10:36 , 2022-08-25 19:10:38 ]
真实输出::7> {order_1=4}>>window_time_range[2022-08-25 19:10:36 , 2022-08-25 19:10:38 ]
真实输出::8> {order_0=2}>>window_time_range[2022-08-25 19:10:38 , 2022-08-25 19:10:40 ]
真实输出::7> {order_1=4}>>window_time_range[2022-08-25 19:10:38 , 2022-08-25 19:10:40 ]
真实输出::7> {order_1=4}>>window_time_range[2022-08-25 19:10:40 , 2022-08-25 19:10:42 ]
aggregate(AggregateFunction<IN, ACC, OUT>,ProcessWindowFunction<IN,OUT, KEY,W>
和6.23比较类似,也可以获取到窗口的时间,但是ProcessWindowFunction有自己的特点:
- 可以访问算子的processTime
- 可以访问算子的水位线时钟
- 可以使用定时器(尤其注意只有keyedStream可以使用此功能)
- 可以访问更新以及初始化状态,此状态是你自定义的状态,我们可以基于这些状态去发出数据,这样甚至可以直接更改算子的行为。
更多有关processFunction的介绍参考:flink processFunction
代码实例暂时先不展示状态相关的操作,后续会讲解,就展示一下简单的获取窗口时间吧。
import com.pg.flink.dataStreame.window.Order; import com.pg.flink.dataStreame.window.SourceOrder; import org.apache.flink.api.common.eventtime.TimestampAssigner; import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.function.Consumer; public class WindowsEventTimeAggregateProcessFunctionMain { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //定义两条流 DataStreamSource<Order> ds = env.addSource(new SourceOrder()); // ds.print("源数据:"); // ds.print(); env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期 KeyedStream<Order, String> keyedStream = ds.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier<Order>(){ @Override public TimestampAssigner<Order> createTimestampAssigner(Context context) { return (e,timestamp) -> e.getTimestamp(); } })).keyBy((KeySelector<Order, String>) value -> value.getName()); OutputTag<Order> outputTag = new OutputTag<Order>("sideout"){}; SingleOutputStreamOperator<String> single= keyedStream .window(TumblingEventTimeWindows.of(Time.seconds(2))) .sideOutputLateData(outputTag) .aggregate(new AggregateFunction<Order, HashMap<String, Integer>, String>() { @Override //初始化累加器(其实就是一个缓存) public HashMap<String, Integer> createAccumulator() { return new HashMap<>();//key 存储order的name,value 存储其价格 } @Override public HashMap<String, Integer> add(Order value, HashMap<String, Integer> accumulator) { String name = value.getName(); int price = value.getPrice(); HashMap<String, Integer> new_accumulator = new HashMap<>(); //当name不存在的时候表示是第一个数据此时累加器没有数据,我们给个默认值0即可 new_accumulator.put(name, price + accumulator.getOrDefault(name, 0)); return new_accumulator; } @Override public String getResult(HashMap<String, Integer> accumulator) { return accumulator.toString(); } @Override public HashMap<String, Integer> merge(HashMap<String, Integer> a, HashMap<String, Integer> b) { //因为不是sessoin窗口,返回一个空的map即可 return new HashMap<>(); } }, new ProcessWindowFunction<String, String, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception { elements.forEach(new Consumer<String>() { @Override public void accept(String s) { SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String window_start_time =sdf.format( context.window().getStart());//当前窗口的起始时间 String window_end_time =sdf.format( context.window().getEnd());//endtime是下一个窗口的初始时间 out.collect(s+">>window_time_range["+window_start_time+" , "+window_end_time+" ]"); } }); } }); single.getSideOutput(outputTag).print("迟到数据:"); single.print("真实输出:"); env.execute("sdaaf"); } }
真实输出::7> {order_1=4}>>window_time_range[2022-08-26 00:35:08 , 2022-08-26 00:35:10 ]
真实输出::8> {order_0=2}>>window_time_range[2022-08-26 00:35:08 , 2022-08-26 00:35:10 ]
真实输出::7> {order_1=4}>>window_time_range[2022-08-26 00:35:10 , 2022-08-26 00:35:12 ]
process(ProcessWindowFunction<T, R, K, W> function) {…}
此函数接收一个ProcessWindowFunction函数,process函数是最强大也是最难用的函数,可以这么说无论是reduce还是aggregate都定义了窗口的个性化操作, reduce是前后数据的迭代处理,aggregate定义通过累加器自由可以实现很多的功能,但是不管是reduce韩式aggregate中处理数据的时候数据元素都是一个个过来的, 是逐个过来的。 最容易看到的就是aggregate中的累加器,每次数据到来都会更新累加器,这明显就是逐个的行为。 但是process就不一样的,process是直接拿到当前窗口的所有数据,其内部有一个迭代器Iterable, 包含了当前窗口的所有数据。
除此之外,ProcessWindowFunction还有个内部类, 可以访问水位线,processTime, window对象,和状态。 可以说, reduce和aggregate能实现的功能, process都能实现。 往深了说reduce和aggregate不过是process的特殊形式, 它们不过是把常用的功能抽取出来了而已。 而比较复杂的功能reduce和aggregate是无法实现的, reduce限制最严格,要求输入类型和输出类型一致,且reduce无法访问窗口的结束和开始时间, aggregate好一点,其内部有一个累加器相对而言也更灵活。 重要的来了,无论是reduce还是aggregate其参数都是继承了Function,而process函数的参数继承的是RichFunction,这也就意味着 processFunction是可以自定义以及访问状态的, 这也是proces最大的特性。 关于状态我之后会单独出一篇文章,状态是flink的核心,也是很难理解的地方。
我们已经说了,process可以实现reduce和aggregate的所有功能,以及可以实现一些更高级的功能,为了方便理解,我们下面用process实现reduce功能,并为每条输出添加所属的窗口的时间。
import com.pg.flink.dataStreame.window.Order; import com.pg.flink.dataStreame.window.SourceOrder; import org.apache.flink.api.common.eventtime.TimestampAssigner; import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.text.SimpleDateFormat; public class WindowsEventTimeProcessFunctionMain { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //定义两条流 DataStreamSource<Order> ds = env.addSource(new SourceOrder()); // ds.print("源数据:"); // ds.print(); env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期 KeyedStream<Order, String> keyedStream = ds.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier<Order>() { @Override public TimestampAssigner<Order> createTimestampAssigner(Context context) { return (e, timestamp) -> e.getTimestamp(); } })).keyBy((KeySelector<Order, String>) value -> value.getName()); OutputTag<Order> outputTag = new OutputTag<Order>("sideout") { }; SingleOutputStreamOperator<String> single = keyedStream .window(TumblingEventTimeWindows.of(Time.seconds(2))) .sideOutputLateData(outputTag) .process(new ProcessWindowFunction<Order, String, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<Order> elements, Collector<String> out) throws Exception { int sum = 0; for (Order element : elements) { sum += element.getPrice(); } SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String window_start_time = sdf.format(context.window().getStart());//当前窗口的起始时间 String window_end_time = sdf.format(context.window().getEnd());//endtime是下一个窗口的初始时间 out.collect("key=" + s + ", sum=" + sum + ">>>" + "window_time_range[" + window_start_time + "," + window_end_time + "]"); } }); single.getSideOutput(outputTag).print("迟到数据:"); single.print("真实输出:"); env.execute("sdaaf"); } }
结果:
真实输出::7> key=order_1, sum=4>>>window_time_range[2022-08-26 01:34:12,2022-08-26 01:34:14]
真实输出::8> key=order_0, sum=2>>>window_time_range[2022-08-26 01:34:12,2022-08-26 01:34:14]
真实输出::7> key=order_1, sum=4>>>window_time_range[2022-08-26 01:34:14,2022-08-26 01:34:16]
真实输出::8> key=order_0, sum=2>>>window_time_range[2022-08-26 01:34:14,2022-08-26 01:34:16]
总结:process是万能的,窗口的一切你想要的操作在这里都能实现。
我们再来看看窗口的代码结构:
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(WindowAssigner) <- 必须:窗口分配器,决定了流数据被分配到哪个窗口中
.trigger(Trigger) <- 可选: "触发器" (若不设置会选择默认值)
.evictor(Evictor) <- 可选: "清除器" (else no evictor)
.allowedLateness(Time) <- 可选: 允许元素迟到多久,默认为0意思是不允许迟到
.sideOutputLateData(OutputTag)<- 可选: 迟到的数据输出到OutputTag管道中
.reduce/aggregate/apply() <- 必须: 落到窗口中的数据怎么处理
.getSideOutput(OutputTag) <- 可选: 获取迟到的数据
读到这里请回头看看我们的代码中用到的部分:
...省略。。。
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.sideOutputLateData(outputTag)
.process(new ProcessWindowFunction<Order, String, String, TimeWindow>() {...省略...};
请仔细阅读上面两处的代码,你会发现:
.trigger(Trigger) <- 可选: "触发器" (若不设置会选择默认值)
.evictor(Evictor) <- 可选: "清除器" (else no evictor)
这两个地方我们根本没用到,我们用的是TumblingEventTimeWindows,这是flink内置的窗口分配器,其内部源码自己点开你会发现里面有个trigger,也就是说flink内置的窗口分配器其内部有一个默认的触发器trigger. 还拿TumblingEventTimeWindows举例子,其内部的触发器是:EventTimeTrigger,这个触发器有时候并不能满足我们的需要。 具体请看下一节。
短窗口的计算由于其窗口期较短,那么很快就能获取到结果,但是对于长窗口来说窗口时间比较长,如果等窗口期结束才能看到结果,那么这份数据就不具备实时性,大多数情况我们希望能够看到一个长窗口的结果不断变动的情况。比如TumblingEventTimeWindows.of(Time.days(1)), 意思是按天划分窗口,也就是说必须是以天的维度触发一次计算,这种统计历史数据没问题,但是不适合实时看板统计。 窗口虽然是按照天划分,但是我想每分钟都看一下当前天的实时结果,那么此时TumblingEventTimeWindows内置的默认的trigger就不适合了,此时我们需要一个新的trigger取代其默认的trigger,
对此Flink提供了ContinuousEventTimeTrigger连续事件时间触发器与ContinuousProcessingTimeTrigger连续处理时间触发器,指定一个固定时间间隔interval,不需要等到窗口结束才能获取结果,能够在固定的interval获取到窗口的中间结果。
场景:求每个区域的每小时的商品销售额, 要求每隔1min能能够看到销售额变动情况。核心代码实现如下:
.keyBy(...)
.timeWindow(Time.hours(1))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。