
A Look at Flink's Window Operations


This article takes a look at Flink's window operations.

window

DataStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return windowAll(TumblingProcessingTimeWindows.of(size));
        } else {
            return windowAll(TumblingEventTimeWindows.of(size));
        }
    }

    public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return windowAll(SlidingProcessingTimeWindows.of(size, slide));
        } else {
            return windowAll(SlidingEventTimeWindows.of(size, slide));
        }
    }

    public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
        return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

    public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
        return windowAll(GlobalWindows.create())
                .evictor(CountEvictor.of(size))
                .trigger(CountTrigger.of(slide));
    }

    @PublicEvolving
    public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
        return new AllWindowedStream<>(this, assigner);
    }

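  • A non-keyed DataStream offers the timeWindowAll, countWindowAll, and windowAll operations. The core one is windowAll, which timeWindowAll and countWindowAll both delegate to. It takes a WindowAssigner, returns an AllWindowedStream, and always runs with a parallelism of 1.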
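
To make the two timeWindowAll variants more concrete, here is a minimal plain-Java sketch of how a tumbling and a sliding assigner could map an element timestamp to windows. It does not use Flink's classes: WindowAssignSketch, tumbling, and sliding are hypothetical names, windows are plain [start, end) pairs in milliseconds, and a window offset of zero is assumed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not Flink's implementation.
final class WindowAssignSketch {

    /** Tumbling: each timestamp falls into exactly one aligned window [start, start + size). */
    static long[] tumbling(long timestamp, long size) {
        long start = timestamp - Math.floorMod(timestamp, size);
        return new long[] {start, start + size};
    }

    /** Sliding: each timestamp falls into every window of length size whose start is a multiple of slide. */
    static List<long[]> sliding(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards from the latest window that contains the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] w = tumbling(12_500L, 5_000L);
        System.out.println("tumbling: [" + w[0] + ", " + w[1] + ")"); // [10000, 15000)
        for (long[] s : sliding(12_500L, 10_000L, 5_000L)) {
            System.out.println("sliding:  [" + s[0] + ", " + s[1] + ")"); // [10000, 20000) then [5000, 15000)
        }
    }
}
```

With size equal to slide, the sliding variant yields a single window per element, which is the tumbling case.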

KeyedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(TumblingProcessingTimeWindows.of(size));
        } else {
            return window(TumblingEventTimeWindows.of(size));
        }
    }

    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(SlidingProcessingTimeWindows.of(size, slide));
        } else {
            return window(SlidingEventTimeWindows.of(size, slide));
        }
    }

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
        return window(GlobalWindows.create())
                .evictor(CountEvictor.of(size))
                .trigger(CountTrigger.of(slide));
    }

    @PublicEvolving
    public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
        return new WindowedStream<>(this, assigner);
    }

  • KeyedStream inherits the window-related operations of DataStream, but the ones mainly used on it are timeWindow, countWindow, and window. The core one is window, which likewise takes a WindowAssigner and returns a WindowedStream.
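
The countWindow(size, slide) method above combines GlobalWindows with a CountTrigger on slide and a CountEvictor on size. The following plain-Java sketch simulates the behavior that combination would be expected to produce per key: a firing every slide elements, over at most the last size elements. It is an assumption-laden simulation, not Flink code; CountWindowSketch and add are hypothetical names, and keys and values are simplified to String and int.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of a sliding count window; not Flink's implementation.
final class CountWindowSketch {
    private final int size;   // evictor: keep at most this many elements per key
    private final int slide;  // trigger: fire after this many new elements per key
    private final Map<String, Deque<Integer>> buffers = new HashMap<>();
    private final Map<String, Integer> sinceLastFire = new HashMap<>();

    CountWindowSketch(int size, int slide) {
        this.size = size;
        this.slide = slide;
    }

    /** Adds one element; returns the window contents if this element causes a firing, else an empty list. */
    List<Integer> add(String key, int value) {
        Deque<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayDeque<>());
        buffer.addLast(value);
        int count = sinceLastFire.merge(key, 1, Integer::sum);
        if (count < slide) {
            return Collections.emptyList();
        }
        sinceLastFire.put(key, 0);
        // Evict the oldest elements so that only the last `size` remain.
        while (buffer.size() > size) {
            buffer.removeFirst();
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        CountWindowSketch window = new CountWindowSketch(4, 2);
        for (int i = 1; i <= 6; i++) {
            System.out.println(i + " -> " + window.add("a", i));
        }
        // Firings happen on 2, 4 and 6: [1, 2], [1, 2, 3, 4], [3, 4, 5, 6]
    }
}
```

Counting is per key here, which matches the keyed nature of countWindow; the non-keyed countWindowAll would correspond to using a single key.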

WindowedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/WindowedStream.java

    @Public
    public class WindowedStream<T, K, W extends Window> {

        /** The keyed data stream that is windowed by this stream. */
        private final KeyedStream<T, K> input;

        /** The window assigner. */
        private final WindowAssigner<? super T, W> windowAssigner;

        /** The trigger that is used for window evaluation/emission. */
        private Trigger<? super T, ? super W> trigger;

        /** The evictor that is used for evicting elements before window evaluation. */
        private Evictor<? super T, ? super W> evictor;

        /** The user-specified allowed lateness. */
        private long allowedLateness = 0L;

        /**
         * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
         * dropped.
         */
        private OutputTag<T> lateDataOutputTag;

        @PublicEvolving
        public WindowedStream(KeyedStream<T, K> input,
                WindowAssigner<? super T, W> windowAssigner) {
            this.input = input;
            this.windowAssigner = windowAssigner;
            this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
        }

        @PublicEvolving
        public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
            if (windowAssigner instanceof MergingWindowAssigner && !trigger.canMerge()) {
                throw new UnsupportedOperationException("A merging window assigner cannot be used with a trigger that does not support merging.");
            }
            if (windowAssigner instanceof BaseAlignedWindowAssigner) {
                throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName() + " with a custom trigger.");
            }
            this.trigger = trigger;
            return this;
        }

        @PublicEvolving
        public WindowedStream<T, K, W> allowedLateness(Time lateness) {
            final long millis = lateness.toMilliseconds();
            checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
            this.allowedLateness = millis;
            return this;
        }

        @PublicEvolving
        public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
            Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
            this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
            return this;
        }

        @PublicEvolving
        public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
            if (windowAssigner instanceof BaseAlignedWindowAssigner) {
                throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName() + " with an Evictor.");
            }
            this.evictor = evictor;
            return this;
        }

        // ------------------------------------------------------------------------
        // Operations on the keyed windows
        // ------------------------------------------------------------------------

        //......
    }

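  • WindowedStream has several parameters. The constructor requires input and windowAssigner; Trigger, Evictor, allowedLateness, and OutputTag are optional. In addition, an operation function must be set, chiefly one of ReduceFunction, AggregateFunction, FoldFunction (deprecated), or ProcessWindowFunction.
  • The windowAssigner decides how elements are assigned to windows. The main assigners are TumblingEventTimeWindows/TumblingProcessingTimeWindows, SlidingEventTimeWindows/SlidingProcessingTimeWindows, EventTimeSessionWindows/ProcessingTimeSessionWindows, and GlobalWindows.
  • Trigger decides when a window is emitted. Evictor removes elements from a window when it is emitted. allowedLateness specifies how long after the watermark has passed the end of a window late elements are still accepted; elements that arrive later than that are dropped (this only applies to event-time windows). OutputTag routes such late data to a side output instead, which can be read with SingleOutputStreamOperator.getSideOutput(OutputTag).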
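
The interplay of watermark, allowedLateness, and the late-data OutputTag can be sketched in plain Java as below. This is a simplified model rather than Flink's operator code: LatenessSketch, isWindowLate, and route are hypothetical names, a tumbling event-time window with zero offset is assumed, and the rule "a window expires once the watermark reaches its last timestamp plus the allowed lateness" is my reading of the behavior, so treat it as an assumption.

```java
// Hypothetical model of late-element handling; not Flink's implementation.
final class LatenessSketch {

    enum Outcome { ASSIGNED, SIDE_OUTPUT, DROPPED }

    /** Assumption: window [start, end) expires when watermark >= (end - 1) + allowedLateness. */
    static boolean isWindowLate(long windowEnd, long allowedLateness, long watermark) {
        long cleanupTime = (windowEnd - 1) + allowedLateness;
        return cleanupTime <= watermark;
    }

    /** Decides what happens to an element with the given event timestamp. */
    static Outcome route(long timestamp, long size, long allowedLateness,
                         long watermark, boolean hasLateDataTag) {
        long start = timestamp - Math.floorMod(timestamp, size);
        if (!isWindowLate(start + size, allowedLateness, watermark)) {
            return Outcome.ASSIGNED;
        }
        // Late element: goes to the side output if a tag was set, otherwise it is dropped.
        return hasLateDataTag ? Outcome.SIDE_OUTPUT : Outcome.DROPPED;
    }

    public static void main(String[] args) {
        // Element at t=7000 belongs to window [0, 10000).
        System.out.println(route(7_000L, 10_000L, 0L, 12_000L, false));     // DROPPED
        System.out.println(route(7_000L, 10_000L, 5_000L, 12_000L, false)); // ASSIGNED
        System.out.println(route(7_000L, 10_000L, 5_000L, 15_000L, true));  // SIDE_OUTPUT
    }
}
```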

The fields and operations of AllWindowedStream are largely the same as those of WindowedStream, so they are not covered in detail here.

Summary

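  • Window operations are central to processing unbounded data streams: they split the stream into finite-sized buckets, on which computations can then be performed. Flink's window operations fall into two groups: window operations on a KeyedStream and windowAll operations on a non-keyed stream.
  • A window operation takes several parameters. WindowAssigner is mandatory; the main assigners are TumblingEventTimeWindows/TumblingProcessingTimeWindows, SlidingEventTimeWindows/SlidingProcessingTimeWindows, EventTimeSessionWindows/ProcessingTimeSessionWindows, and GlobalWindows. An operation function must also be set, chiefly one of ReduceFunction, AggregateFunction, FoldFunction (deprecated), or ProcessWindowFunction.
  • Trigger, Evictor, allowedLateness, and OutputTag are optional. Trigger decides when a window is emitted. Evictor removes elements from a window when it is emitted. allowedLateness specifies how long after the watermark has passed the end of a window late elements are still accepted; elements that arrive later than that are dropped (this only applies to event-time windows). OutputTag routes such late data to a side output instead, which can be read with SingleOutputStreamOperator.getSideOutput(OutputTag).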
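
Session windows are the one assigner family listed above whose windows are merged rather than aligned to fixed boundaries. As a rough illustration, the plain-Java sketch below gives every element its own window [t, t + gap) and merges windows that overlap. SessionMergeSketch and sessions are hypothetical names, and the choice to also merge windows that merely touch is an assumption, not a statement about Flink's exact boundary handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of session-window merging; not Flink's implementation.
final class SessionMergeSketch {

    /** Returns merged session windows as [start, end) pairs for the given event timestamps. */
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> merged = new ArrayList<>();
        for (long ts : sorted) {
            long[] candidate = {ts, ts + gap};
            if (!merged.isEmpty()) {
                long[] last = merged.get(merged.size() - 1);
                // Assumption: windows that overlap or touch are merged into one session.
                if (candidate[0] <= last[1]) {
                    last[1] = Math.max(last[1], candidate[1]);
                    continue;
                }
            }
            merged.add(candidate);
        }
        return merged;
    }

    public static void main(String[] args) {
        for (long[] s : sessions(new long[] {1_000L, 3_000L, 20_000L, 4_000L}, 5_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")"); // [1000, 9000) then [20000, 25000)
        }
    }
}
```

This need to merge is why the trigger(...) method shown earlier rejects a trigger whose canMerge() returns false when the assigner is a MergingWindowAssigner.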

