
A Look at Flink's Tumbling Window


This article takes a look at how Flink implements the Tumbling Window.

WindowAssigner

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java

    @PublicEvolving
    public abstract class WindowAssigner<T, W extends Window> implements Serializable {
        private static final long serialVersionUID = 1L;

        /**
         * Returns a {@code Collection} of windows that should be assigned to the element.
         *
         * @param element The element to which windows should be assigned.
         * @param timestamp The timestamp of the element.
         * @param context The {@link WindowAssignerContext} in which the assigner operates.
         */
        public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

        /**
         * Returns the default trigger associated with this {@code WindowAssigner}.
         */
        public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

        /**
         * Returns a {@link TypeSerializer} for serializing windows that are assigned by
         * this {@code WindowAssigner}.
         */
        public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

        /**
         * Returns {@code true} if elements are assigned to windows based on event time,
         * {@code false} otherwise.
         */
        public abstract boolean isEventTime();

        /**
         * A context provided to the {@link WindowAssigner} that allows it to query the
         * current processing time.
         *
         * <p>This is provided to the assigner by its containing
         * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
         * which, in turn, gets it from the containing
         * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
         */
        public abstract static class WindowAssignerContext {
            /**
             * Returns the current processing time.
             */
            public abstract long getCurrentProcessingTime();
        }
    }
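  • WindowAssigner declares the abstract methods assignWindows, getDefaultTrigger, getWindowSerializer, and isEventTime, together with the abstract static class WindowAssignerContext. It has two type parameters: T is the element type and W is the window type.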

Window

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/Window.java

    @PublicEvolving
    public abstract class Window {
        /**
         * Gets the largest timestamp that still belongs to this window.
         *
         * @return The largest timestamp that still belongs to this window.
         */
        public abstract long maxTimestamp();
    }
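  • A Window represents one finite bucket carved out of an unbounded stream. Its maxTimestamp method returns the largest timestamp that still belongs to the window. It has two subclasses, GlobalWindow and TimeWindow.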

TimeWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java

    @PublicEvolving
    public class TimeWindow extends Window {
        private final long start;
        private final long end;

        public TimeWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /**
         * Gets the starting timestamp of the window. This is the first timestamp that belongs
         * to this window.
         *
         * @return The starting timestamp of this window.
         */
        public long getStart() {
            return start;
        }

        /**
         * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it
         * is the first timestamp that does not belong to this window any more.
         *
         * @return The exclusive end timestamp of this window.
         */
        public long getEnd() {
            return end;
        }

        /**
         * Gets the largest timestamp that still belongs to this window.
         *
         * <p>This timestamp is identical to {@code getEnd() - 1}.
         *
         * @return The largest timestamp that still belongs to this window.
         *
         * @see #getEnd()
         */
        @Override
        public long maxTimestamp() {
            return end - 1;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            TimeWindow window = (TimeWindow) o;
            return end == window.end && start == window.start;
        }

        @Override
        public int hashCode() {
            return MathUtils.longToIntWithBitMixing(start + end);
        }

        @Override
        public String toString() {
            return "TimeWindow{" +
                    "start=" + start +
                    ", end=" + end +
                    '}';
        }

        /**
         * Returns {@code true} if this window intersects the given window.
         */
        public boolean intersects(TimeWindow other) {
            return this.start <= other.end && this.end >= other.start;
        }

        /**
         * Returns the minimal window covers both this window and the given window.
         */
        public TimeWindow cover(TimeWindow other) {
            return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
        }

        // ------------------------------------------------------------------------
        // Serializer
        // ------------------------------------------------------------------------

        //......

        // ------------------------------------------------------------------------
        // Utilities
        // ------------------------------------------------------------------------

        /**
         * Merge overlapping {@link TimeWindow}s. For use by merging
         * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}.
         */
        public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
            // sort the windows by the start time and then merge overlapping windows
            List<TimeWindow> sortedWindows = new ArrayList<>(windows);
            Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
                @Override
                public int compare(TimeWindow o1, TimeWindow o2) {
                    return Long.compare(o1.getStart(), o2.getStart());
                }
            });

            List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
            Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;

            for (TimeWindow candidate: sortedWindows) {
                if (currentMerge == null) {
                    currentMerge = new Tuple2<>();
                    currentMerge.f0 = candidate;
                    currentMerge.f1 = new HashSet<>();
                    currentMerge.f1.add(candidate);
                } else if (currentMerge.f0.intersects(candidate)) {
                    currentMerge.f0 = currentMerge.f0.cover(candidate);
                    currentMerge.f1.add(candidate);
                } else {
                    merged.add(currentMerge);
                    currentMerge = new Tuple2<>();
                    currentMerge.f0 = candidate;
                    currentMerge.f1 = new HashSet<>();
                    currentMerge.f1.add(candidate);
                }
            }

            if (currentMerge != null) {
                merged.add(currentMerge);
            }

            for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
                if (m.f1.size() > 1) {
                    c.merge(m.f1, m.f0);
                }
            }
        }

        /**
         * Method to get the window start for a timestamp.
         *
         * @param timestamp epoch millisecond to get the window start.
         * @param offset The offset which window start would be shifted by.
         * @param windowSize The size of the generated windows.
         * @return window start
         */
        public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
            return timestamp - (timestamp - offset + windowSize) % windowSize;
        }
    }
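  • TimeWindow has start and end fields; start is inclusive and end is exclusive, so maxTimestamp returns end - 1. It also overrides equals and hashCode.
  • TimeWindow provides intersects, which reports whether this window and the given window overlap (the comparisons are inclusive, so two windows that merely touch also count as intersecting), and cover, which returns the smallest window that covers both this window and the given one.
  • TimeWindow also provides the static methods mergeWindows and getWindowStartWithOffset. The former merges overlapping time windows; the latter computes the window start for a given timestamp, offset, and windowSize.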
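
The arithmetic behind these methods is easier to follow with concrete numbers. The following is a minimal plain-Java sketch that copies the formulas from the listing above into a stand-in class so it runs without Flink on the classpath. The class name TimeWindowSketch and the sample values are assumptions made for illustration; they are not part of Flink.

```java
/** Plain-Java stand-in (hypothetical name) mirroring the TimeWindow arithmetic quoted above. */
final class TimeWindowSketch {
    final long start; // inclusive
    final long end;   // exclusive

    TimeWindowSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /** Largest timestamp that still belongs to the window. */
    long maxTimestamp() {
        return end - 1;
    }

    /** Same inclusive comparison as in the listing: touching windows intersect. */
    boolean intersects(TimeWindowSketch other) {
        return this.start <= other.end && this.end >= other.start;
    }

    /** Smallest window covering both this window and the other one. */
    TimeWindowSketch cover(TimeWindowSketch other) {
        return new TimeWindowSketch(Math.min(start, other.start), Math.max(end, other.end));
    }

    /** Same formula as TimeWindow.getWindowStartWithOffset in the listing. */
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        // 5-second windows, no offset: 7500 ms falls into [5000, 10000)
        System.out.println(getWindowStartWithOffset(7_500L, 0L, 5_000L));     // 5000
        // 5-second windows shifted by 1 second: 7500 ms falls into [6000, 11000)
        System.out.println(getWindowStartWithOffset(7_500L, 1_000L, 5_000L)); // 6000

        TimeWindowSketch a = new TimeWindowSketch(0L, 5_000L);
        TimeWindowSketch b = new TimeWindowSketch(5_000L, 10_000L);
        System.out.println(a.intersects(b));                                   // true
        TimeWindowSketch c = a.cover(b);
        System.out.println(c.start + " " + c.end);                             // 0 10000
    }
}
```

The sample inputs are deliberately non-negative epoch milliseconds; the sketch makes no claim about how the formula behaves for other inputs.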

TumblingEventTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java

    @PublicEvolving
    public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private final long size;
        private final long offset;

        protected TumblingEventTimeWindows(long size, long offset) {
            if (offset < 0 || offset >= size) {
                throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
            }
            this.size = size;
            this.offset = offset;
        }

        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            if (timestamp > Long.MIN_VALUE) {
                // Long.MIN_VALUE is currently assigned when no timestamp is present
                long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
                return Collections.singletonList(new TimeWindow(start, start + size));
            } else {
                throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                        "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                        "'DataStream.assignTimestampsAndWatermarks(...)'?");
            }
        }

        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }

        @Override
        public String toString() {
            return "TumblingEventTimeWindows(" + size + ")";
        }

        public static TumblingEventTimeWindows of(Time size) {
            return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
        }

        public static TumblingEventTimeWindows of(Time size, Time offset) {
            return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
        }

        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }

        @Override
        public boolean isEventTime() {
            return true;
        }
    }
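  • TumblingEventTimeWindows extends WindowAssigner, with Object as the element type and TimeWindow as the window type. Its constructor takes size and offset and requires 0 <= offset < size (which also implies size > 0).
  • assignWindows returns the single window [start, start + size), where start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size); a record whose timestamp is Long.MIN_VALUE (no timestamp assigned) causes a RuntimeException. getDefaultTrigger returns an EventTimeTrigger, getWindowSerializer returns a TimeWindow.Serializer, and isEventTime returns true.
  • TumblingEventTimeWindows provides the static factory method of, in two overloads: one takes only size (offset defaults to 0) and the other takes both size and offset.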
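
To see the constructor check and the assignment rule working together, here is a minimal plain-Java sketch. It is not the Flink class: TumblingEventTimeSketch is a hypothetical stand-in that returns the window as a {start, end} pair instead of a TimeWindow, and it reuses the window-start formula from the TimeWindow listing.

```java
/** Hypothetical stand-in for the event-time assignment logic quoted above (no Flink dependency). */
final class TumblingEventTimeSketch {
    private final long size;
    private final long offset;

    TumblingEventTimeSketch(long size, long offset) {
        // Same precondition as in the listing: 0 <= offset < size
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < size");
        }
        this.size = size;
        this.offset = offset;
    }

    /** Returns {start, end} of the single window the timestamp falls into; end is exclusive. */
    long[] assignWindow(long timestamp) {
        if (timestamp == Long.MIN_VALUE) {
            // In the listing, Long.MIN_VALUE marks a record without a timestamp
            throw new IllegalStateException("record has no timestamp");
        }
        long start = timestamp - (timestamp - offset + size) % size;
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        TumblingEventTimeSketch assigner = new TumblingEventTimeSketch(5_000L, 0L);
        for (long ts : new long[] {4_999L, 5_000L, 7_500L}) {
            long[] w = assigner.assignWindow(ts);
            System.out.println(ts + " -> [" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With 5-second windows and no offset, timestamp 4999 lands in [0, 5000) while 5000 and 7500 both land in [5000, 10000), which illustrates the inclusive start and exclusive end.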

TumblingProcessingTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java

    public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private final long size;
        private final long offset;

        private TumblingProcessingTimeWindows(long size, long offset) {
            if (offset < 0 || offset >= size) {
                throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy 0 <= offset < size");
            }
            this.size = size;
            this.offset = offset;
        }

        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            final long now = context.getCurrentProcessingTime();
            long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        }

        public long getSize() {
            return size;
        }

        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return ProcessingTimeTrigger.create();
        }

        @Override
        public String toString() {
            return "TumblingProcessingTimeWindows(" + size + ")";
        }

        public static TumblingProcessingTimeWindows of(Time size) {
            return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
        }

        public static TumblingProcessingTimeWindows of(Time size, Time offset) {
            return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
        }

        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }

        @Override
        public boolean isEventTime() {
            return false;
        }
    }
  • TumblingProcessingTimeWindows extends WindowAssigner, with Object as the element type and TimeWindow as the window type. Its constructor takes size and offset and requires 0 <= offset < size.
  • assignWindows returns the single window [start, start + size), where start = TimeWindow.getWindowStartWithOffset(now, offset, size) and now is context.getCurrentProcessingTime(). This is where it differs from TumblingEventTimeWindows: TumblingProcessingTimeWindows ignores the timestamp parameter and uses now instead. getDefaultTrigger returns a ProcessingTimeTrigger, and isEventTime returns false.
  • TumblingProcessingTimeWindows also provides the static factory method of, which accepts size and optionally offset.
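
The point that the element timestamp plays no role here can be checked with a small plain-Java sketch. TumblingProcessingTimeSketch is a hypothetical stand-in, not the Flink class, and the LongSupplier clock is an assumption that takes the place of WindowAssignerContext.getCurrentProcessingTime() so the example runs without Flink.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the processing-time assignment logic quoted above. */
final class TumblingProcessingTimeSketch {
    private final long size;
    private final long offset;
    private final LongSupplier clock; // assumption: replaces context.getCurrentProcessingTime()

    TumblingProcessingTimeSketch(long size, long offset, LongSupplier clock) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < size");
        }
        this.size = size;
        this.offset = offset;
        this.clock = clock;
    }

    /** Returns {start, end}; the element timestamp is accepted but deliberately unused. */
    long[] assignWindow(long elementTimestamp) {
        final long now = clock.getAsLong();
        long start = now - (now - offset + size) % size;
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        // A fixed clock makes the result reproducible; a real job would read the wall clock
        TumblingProcessingTimeSketch assigner =
                new TumblingProcessingTimeSketch(5_000L, 0L, () -> 12_345L);
        long[] w1 = assigner.assignWindow(1L);
        long[] w2 = assigner.assignWindow(Long.MIN_VALUE);
        System.out.println("[" + w1[0] + ", " + w1[1] + ")"); // [10000, 15000)
        System.out.println("[" + w2[0] + ", " + w2[1] + ")"); // [10000, 15000)
    }
}
```

Both calls yield the same window because only the clock value 12345 enters the calculation, and even a record carrying Long.MIN_VALUE as its timestamp is assigned normally.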

Summary

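  • Flink's Tumbling Window comes in two flavors, TumblingEventTimeWindows and TumblingProcessingTimeWindows. Both extend WindowAssigner, with Object as the element type and TimeWindow as the window type, and both take size and offset parameters that must satisfy 0 <= offset < size.
  • WindowAssigner declares the abstract methods assignWindows, getDefaultTrigger, getWindowSerializer, and isEventTime, together with the abstract static class WindowAssignerContext; its type parameter T is the element type and W is the window type. The window type used by both tumbling assigners is TimeWindow, which has start (inclusive) and end (exclusive) fields, so maxTimestamp returns end - 1. TimeWindow also provides the static methods mergeWindows, which merges overlapping time windows, and getWindowStartWithOffset, which computes the window start for a given timestamp, offset, and windowSize.
  • TumblingEventTimeWindows and TumblingProcessingTimeWindows differ in assignWindows, getDefaultTrigger, and isEventTime. The former computes the window from the timestamp parameter, while the latter uses the current processing time (now); the former's getDefaultTrigger returns an EventTimeTrigger, while the latter's returns a ProcessingTimeTrigger; the former's isEventTime returns true, while the latter's returns false.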

