赞
踩
转换和滚动聚合一次处理一个事件产生输出事件并可能更新状态。但是,有些操作必须收集并缓冲数据以计算其结果。
例如,考虑不同流之间的连接或整体聚合这样的操作,例如中值函数。为了在无界流上高效运行这 些操作符,我们需要限制 这些操作维护的数据量。
窗口还可以在语义上实现关于流的比较复杂的查询。
我们已经看到了滚动聚合的方式,以聚合值编码整个流的历史数据来为每个事件提供低延迟的结果。
在实际案例中Keyed Window 使用最多,所以我们需要掌握Keyed Window的算子,
code: stream.keyBy(...)是Keyed类型数据集 .window(...)//指定窗口分配器类型 [.trigger(...)]//指定触发器类型(可选) [.evictor(...)] // 指定evictor或者不指定(可选) [.allowedLateness(...)] //指定是否延迟处理数据(可选) [.sideOutputLateData(...)] // 指定Output lag(可选) .reduce/aggregate/fold/apply() //指定窗口计算函数 [.getSideOutput(...)] //根据Tag输出数据(可选) intro: Windows Assigner : 指定窗口的类型,定义如何将数据流分配到一个或多个窗口 Windows Trigger : 指定窗口触发的时机,定义窗口满足什么样的条件触发计算 Evictor : 用于数据剔除 allowedLateness : 标记是否处理迟到数据,当迟到数据达到窗口是否触发计算 Output Tag: 标记输出标签,然后在通过getSideOutput将窗口中的数据根据标签输出 Windows Function: 定义窗口上数据处理的逻辑,例如对数据进行Sum操作
Flink要操作窗口,先得将StreamSource 转成WindowedStream.
方法名: | 描述 |
---|---|
window KeyedStream → WindowedStream | 可以在已经分区的KeyedStream上定义 Windows,即K,V格式的数据。 |
WindowAll DataStream → AllWindowedStream | 对常规的DataStream上定义Window,即非 K,V格式的数据 |
Window Apply WindowedStream → DataStream AllWindowedStream → DataStream | 将函数应用于整个窗口中的数据。 |
Window Reduce WindowedStream → DataStream | 对窗口里的数据进行”reduce”减少聚合统计 |
Aggregations on windows WindowedStream → DataStream | 对窗口里的数据进行聚合操作: windowedStream.sum(0); windowedStream.sum(“key”); |
//基于事件驱动,每100个事件,划分一个窗口
dataStream.keyBy(0)
.countWindow(100)
.sum(1)
.printToErr();
//基于时间驱动,每隔1分钟划分一个窗口
dataStream.keyBy(0)
.timeWindow(Time.minutes(1))
.sum(1)
.printToErr();
滑动窗口是固定窗口的更广义的一种形式;
滑动窗口将事件分配到固定大小的可重叠的窗口中去。
特点:时间对齐,窗口长度固定,可以有重叠。
通过提供窗口的长度和滑动距离来定义滑动窗口。滑动距离定义了创建新窗口的间隔。
//基于时间驱动,每隔30s计算一下最近一分钟的数据
mapStream
.keyBy(0)
.timeWindow(Time.minutes(1),Time.seconds(30))
.sum(1)
.printToErr();
//基于事件驱动,每10个元素触发一次计算,窗口里的事件数据最多为100个
mapStream
.keyBy(0)
.countWindow(100,10)
.sum(1)
.printToErr();
//基于会话驱动,通过会话Session Gap来区分
source
.keyBy(0)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
.sum(1)
.print(System.currentTimeMillis() + ":");
//.windowAll() //.timeWindowAll() //.countWindowAll() //简单的字符串--每5个操作男生女生各有多少人 streamSource.countWindowAll(5).apply(new AllWindowFunction<String, String, GlobalWindow>() { @Override public void apply(GlobalWindow globalWindow, Iterable<String> iterable, Collector<String> collector) throws Exception { int man = 0; int woman = 0; Iterator<String> iterator = iterable.iterator(); while (iterator.hasNext()) { if (iterator.next().equals("w")) { woman++; } else { man++; } } collector.collect("man:" + man); collector.collect("woman:" + woman); } }).print();
包括:ReduceFunction、AggregateFunction和FoldFunction
streamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
return Tuple2.of(s, 1);
}
}).keyBy(0).countWindow(5).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> all, Tuple2<String, Integer> each) throws Exception {
System.out.println("Hello06ReduceFunction.reduce[" + all + "][" + each + "]");
all.setField(all.f1 + each.f1, 1);
return all;
}
}).print();
streamSource.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { return Tuple2.of(s, 1); } }).keyBy(0).countWindow(5).process(new ProcessWindowFunction<Tuple2<String, Integer>, Object, Tuple, GlobalWindow>() { @Override public void process(Tuple key, Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Object> collector) throws Exception { System.out.println("Hello07ProcessFunction.process[" + key + "]"); //计算平均结果 int sum = 0; int count = 0; Iterator<Tuple2<String, Integer>> iterator = iterable.iterator(); while (iterator.hasNext()) { Tuple2<String, Integer> tuple2 = iterator.next(); sum += tuple2.f1; count++; } //计算平均值并进行收集 collector.collect(key + "--" + (sum * 1.0 / count)); } }).print();
.trigger()
——触发器
定义window 什么时候关闭,触发计算并输出结果
.evitor()
——移除器
定义移除某些数据的逻辑
.allowedLateness()
——允许处理迟到的数据
.sideOutputLateData()
——将迟到的数据放入侧输出流
.getSideOutput()
——获取侧输出流
Event Time:事件时间 ;
Ingestion Time:数据进入Flink的时间
Processing Time:处理时间;
乱序数据的影响
Event Time
模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子。watermark 是一条特殊的数据记录 。
watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退 。
watermark 与数据的时间戳相关。
在 Flink 的窗口处理过程中,如果数据没有全部到达,则继续等待该窗口中的数据全部到达才开始处理。
这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处理进度(表达数据到 达的完整性),保证事件数据(全部)到达 Flink 系统,或者在乱序及延迟到达时,也能够像预期一样计算出正确并且连续的结果。
当任何 Event 进入到 Flink系统时,会根据当前最大事件时间产生 Watermarks 时间戳。
如果有 窗口的结束时间 <= WaterMark(maxEventTime – t(设置的延迟时间))
,那么这个窗口被触发执行。
Flink内部传播水位线的策略可以归纳为3点:
首先,水位线是以广播的形式在算子之间进行传播
Long.MAX_VALUE表示事件时间的结束,即未来不会有数据到来了
/**
* 当一个source关闭时,会输出一个Long.MAX_VALUE的水位线,当一个算子接收到该水
位线时,
* 相当于接收到一个信号:未来不会再有数据输入了
*/
@PublicEvolving
public final class Watermark extends StreamElement {
//表示事件时间的结束
public static final Watermark MAX_WATERMARK = new Watermark(9223372036854775807L);
}
单个分区的输入取最大值,多个分区的输入取最小值
本来有序的Stream中的Watermark
乱序事件中的Watermark
并行数据流中的Watermark
一种方式为在数据源完成的,即利用SourceFunction在应用读入数据流的时候分配时间戳与水位线。
通过实现接口的自定义函数,该方式又包括两种实现方式:
//给源数据添加水位线
andWatermarks = dataStream.assignTimestampsAndWatermarks(new PunctuatedWaterMark()).setParallelism(1);
周期性生成水位线,即实现AssignerWithPeriodicWatermarks接口,
ExecutionConfig.setAutoWatermarkInterval()
方法进行设置val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 每隔 5 秒产生一个 env.getConfig.setAutoWatermarkInterval(5000) //给源数据添加水位线 SingleOutputStreamOperator<String> andWatermarks = dataStream.assignTimestampsAndWatermarks(new PeriodicWaterMark()).setParallelism(1); class PeriodicWaterMark implements AssignerWithPeriodicWatermarks<String> { //数据允许的延迟情况 long maxLateTime = 5000; //当前系统最大的时间 long currentMaxTimestamp = Long.MIN_VALUE; //水印产生,周期性产生,默认200ms,基于自己业务的时间容忍度去产生水印,因为要通过水印来解决数据的延迟/乱序问题 @Override public Watermark getCurrentWatermark() { long watermarkTimeStamp = System.currentTimeMillis() - maxLateTime; System.out.println("PeriodicWaterMark.getCurrentWatermark[" + long2date(watermarkTimeStamp) + "]"); //本次水位线的位置 Watermark waterMark = new Watermark(watermarkTimeStamp); return waterMark; } /** * 从事件中抽取时间,假设数据格式为 hello,1630034287000 * * @param element * @param previousElementTimestamp * @return */ @Override public long extractTimestamp(String element, long previousElementTimestamp) { long eventTimestamp = Long.valueOf(element.split(",")[1]); System.out.println("PeriodicWaterMark.extractTimestamp事件时间[" + long2date(eventTimestamp) + "]"); return eventTimestamp; } private String long2date(long time) { return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss - SSS").format(new Date(time)); } }
//给源数据添加水位线 SingleOutputStreamOperator<String> andWatermarks = dataStream.assignTimestampsAndWatermarks(new PunctuatedWaterMark()).setParallelism(1); class PunctuatedWaterMark implements AssignerWithPunctuatedWatermarks<String> { @Override public Watermark checkAndGetNextWatermark(String line, long l) { if (line != null && "hello".equals(line)) { return new Watermark(System.currentTimeMillis()); } else { return null; } } @Override public long extractTimestamp(String line, long previousElementTimestamp) { long timestamp = System.currentTimeMillis(); System.out.println("[" + line + "][" + timestamp + "]"); return timestamp; } }
水位线可能会大于后来数据的时间戳,这就意味着数据有延迟,关于延迟数据的处理,Flink提供了一些机制,具体如下:
//给源数据添加水位线
SingleOutputStreamOperator<String> andWatermarks = dataStream.assignTimestampsAndWatermarks(new HelloPeriodicWaterMark()).setParallelism(1);
//开始处理数据
andWatermarks.map(word -> Tuple2.of(word.split(",")[0], (int) (Long.parseLong(word.split(",")[1]) % 1000)))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.timeWindow(Time.seconds(3))
//设置allowedLateness()方法 迟到的数据也可以计算
.allowedLateness(Time.seconds(1))
.sum(1)
.print();
//需要提前声明侧输出的容器 OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late") {}; //给源数据添加水位线 SingleOutputStreamOperator<String> andWatermarks = dataStream.assignTimestampsAndWatermarks(new HelloPeriodicWaterMark()).setParallelism(1); //开始处理数据 SingleOutputStreamOperator<Tuple2<String, Integer>> sum = andWatermarks.map(word -> Tuple2.of(word.split(",")[0], (int) (Long.parseLong(word.split(",")[1]) % 1000))) .returns(Types.TUPLE(Types.STRING, Types.INT)) .keyBy(0) .timeWindow(Time.seconds(3)) .allowedLateness(Time.seconds(1)) //使用sideOutputLateData()方法 .sideOutputLateData(lateOutputTag) .sum(1); sum.print("sum:"); sum.getSideOutput(lateOutputTag).print("side:");
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。