当前位置:   article > 正文

快速灵敏的Flink2:SlidingProcessingTimeWindows parameters must satisfy abs(offset) < slide and size > 0

SlidingProcessingTimeWindows parameters must satisfy abs(offset) < slide and size > 0

flink基础知识

TumblingEventTimeWindows 滚动开窗

  1. package org.apache.flink.streaming.api.windowing.assigners;
  2. import org.apache.flink.annotation.PublicEvolving;
  3. import org.apache.flink.api.common.ExecutionConfig;
  4. import org.apache.flink.api.common.typeutils.TypeSerializer;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.streaming.api.windowing.time.Time;
  7. import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
  8. import org.apache.flink.streaming.api.windowing.triggers.Trigger;
  9. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  10. import java.util.Collection;
  11. import java.util.Collections;
  12. /**
  13. * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
  14. * elements. Windows cannot overlap.
  15. *
  16. * <p>For example, in order to window into windows of 1 minute:
  17. *
  18. * <pre>{@code
  19. * DataStream<Tuple2<String, Integer>> in = ...;
  20. * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
  21. * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
  22. * keyed.window(TumblingEventTimeWindows.of(Time.minutes(1)));
  23. * }</pre>
  24. */
  25. @PublicEvolving
  26. public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
  27. private static final long serialVersionUID = 1L;
  28. private final long size;
  29. private final long globalOffset;
  30. private Long staggerOffset = null;
  31. private final WindowStagger windowStagger;
  32. protected TumblingEventTimeWindows(long size, long offset, WindowStagger windowStagger) {
  33. if (Math.abs(offset) >= size) {
  34. throw new IllegalArgumentException(
  35. "TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
  36. }
  37. this.size = size;
  38. this.globalOffset = offset;
  39. this.windowStagger = windowStagger;
  40. }
  41. @Override
  42. public Collection<TimeWindow> assignWindows(
  43. Object element, long timestamp, WindowAssignerContext context) {
  44. if (timestamp > Long.MIN_VALUE) {
  45. if (staggerOffset == null) {
  46. staggerOffset =
  47. windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
  48. }
  49. // Long.MIN_VALUE is currently assigned when no timestamp is present
  50. long start =
  51. TimeWindow.getWindowStartWithOffset(
  52. timestamp, (globalOffset + staggerOffset) % size, size);
  53. return Collections.singletonList(new TimeWindow(start, start + size));
  54. } else {
  55. throw new RuntimeException(
  56. "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
  57. + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
  58. + "'DataStream.assignTimestampsAndWatermarks(...)'?");
  59. }
  60. }
  61. @Override
  62. public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
  63. return EventTimeTrigger.create();
  64. }
  65. @Override
  66. public String toString() {
  67. return "TumblingEventTimeWindows(" + size + ")";
  68. }
  69. /**
  70. * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns elements
  71. * to time windows based on the element timestamp.
  72. *
  73. * @param size The size of the generated windows.
  74. * @return The time policy.
  75. */
  76. public static TumblingEventTimeWindows of(Time size) {
  77. return new TumblingEventTimeWindows(size.toMilliseconds(), 0, WindowStagger.ALIGNED);
  78. }
  79. /**
  80. * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns elements
  81. * to time windows based on the element timestamp and offset.
  82. *
  83. * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes of
  84. * each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get time
  85. * windows start at 0:15:00,1:15:00,2:15:00,etc.
  86. *
  87. * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time, such as
  88. * China which is using UTC+08:00,and you want a time window with size of one day, and window
  89. * begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
  90. * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than
  91. * UTC time.
  92. *
  93. * @param size The size of the generated windows.
  94. * @param offset The offset which window start would be shifted by.
  95. */
  96. public static TumblingEventTimeWindows of(Time size, Time offset) {
  97. return new TumblingEventTimeWindows(
  98. size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED);
  99. }
  100. /**
  101. * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns elements
  102. * to time windows based on the element timestamp, offset and a staggering offset, depending on
  103. * the staggering policy.
  104. *
  105. * @param size The size of the generated windows.
  106. * @param offset The globalOffset which window start would be shifted by.
  107. * @param windowStagger The utility that produces staggering offset in runtime.
  108. */
  109. @PublicEvolving
  110. public static TumblingEventTimeWindows of(Time size, Time offset, WindowStagger windowStagger) {
  111. return new TumblingEventTimeWindows(
  112. size.toMilliseconds(), offset.toMilliseconds(), windowStagger);
  113. }
  114. @Override
  115. public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
  116. return new TimeWindow.Serializer();
  117. }
  118. @Override
  119. public boolean isEventTime() {
  120. return true;
  121. }
  122. }

SlidingEventTimeWindows 滑动开窗

  1. package org.apache.flink.streaming.api.windowing.assigners;
  2. import org.apache.flink.annotation.PublicEvolving;
  3. import org.apache.flink.api.common.ExecutionConfig;
  4. import org.apache.flink.api.common.typeutils.TypeSerializer;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.streaming.api.windowing.time.Time;
  7. import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
  8. import org.apache.flink.streaming.api.windowing.triggers.Trigger;
  9. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  10. import java.util.ArrayList;
  11. import java.util.Collection;
  12. import java.util.List;
  13. /**
  14. * A {@link WindowAssigner} that windows elements into sliding windows based on the timestamp of the
  15. * elements. Windows can possibly overlap.
  16. *
  17. * <p>For example, in order to window into windows of 1 minute, every 10 seconds:
  18. *
  19. * <pre>{@code
  20. * DataStream<Tuple2<String, Integer>> in = ...;
  21. * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
  22. * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
  23. * keyed.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)));
  24. * }</pre>
  25. */
  26. @PublicEvolving
  27. public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
  28. private static final long serialVersionUID = 1L;
  29. private final long size;
  30. private final long slide;
  31. private final long offset;
  32. protected SlidingEventTimeWindows(long size, long slide, long offset) {
  33. if (Math.abs(offset) >= slide || size <= 0) {
  34. throw new IllegalArgumentException(
  35. "SlidingEventTimeWindows parameters must satisfy "
  36. + "abs(offset) < slide and size > 0");
  37. }
  38. this.size = size;
  39. this.slide = slide;
  40. this.offset = offset;
  41. }
  42. @Override
  43. public Collection<TimeWindow> assignWindows(
  44. Object element, long timestamp, WindowAssignerContext context) {
  45. if (timestamp > Long.MIN_VALUE) {
  46. List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
  47. long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
  48. for (long start = lastStart; start > timestamp - size; start -= slide) {
  49. windows.add(new TimeWindow(start, start + size));
  50. }
  51. return windows;
  52. } else {
  53. throw new RuntimeException(
  54. "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
  55. + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
  56. + "'DataStream.assignTimestampsAndWatermarks(...)'?");
  57. }
  58. }
  59. public long getSize() {
  60. return size;
  61. }
  62. public long getSlide() {
  63. return slide;
  64. }
  65. @Override
  66. public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
  67. return EventTimeTrigger.create();
  68. }
  69. @Override
  70. public String toString() {
  71. return "SlidingEventTimeWindows(" + size + ", " + slide + ")";
  72. }
  73. /**
  74. * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns elements to
  75. * sliding time windows based on the element timestamp.
  76. *
  77. * @param size The size of the generated windows.
  78. * @param slide The slide interval of the generated windows.
  79. * @return The time policy.
  80. */
  81. public static SlidingEventTimeWindows of(Time size, Time slide) {
  82. return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
  83. }
  84. /**
  85. * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns elements to
  86. * time windows based on the element timestamp and offset.
  87. *
  88. * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes of
  89. * each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get time
  90. * windows start at 0:15:00,1:15:00,2:15:00,etc.
  91. *
  92. * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time, such as
  93. * China which is using UTC+08:00,and you want a time window with size of one day, and window
  94. * begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
  95. * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than
  96. * UTC time.
  97. *
  98. * @param size The size of the generated windows.
  99. * @param slide The slide interval of the generated windows.
  100. * @param offset The offset which window start would be shifted by.
  101. * @return The time policy.
  102. */
  103. public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
  104. return new SlidingEventTimeWindows(
  105. size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
  106. }
  107. @Override
  108. public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
  109. return new TimeWindow.Serializer();
  110. }
  111. @Override
  112. public boolean isEventTime() {
  113. return true;
  114. }
  115. }

assignTimestampsAndWatermarks 水印

  /**
   * Assigns timestamps to the elements in the data stream and generates watermarks to signal
   * event time progress. The given [[WatermarkStrategy]] is used to create a [[TimestampAssigner]]
   * and [[org.apache.flink.api.common.eventtime.WatermarkGenerator]].
   *
   * For each event in the data stream, the [[TimestampAssigner#extractTimestamp(Object, long)]]
   * method is called to assign an event timestamp.
   *
   * For each event in the data stream, the
   * [[WatermarkGenerator#onEvent(Object, long, WatermarkOutput)]] will be called.
   *
   * Periodically (defined by the [[ExecutionConfig#getAutoWatermarkInterval()]]), the
   * [[WatermarkGenerator#onPeriodicEmit(WatermarkOutput)]] method will be called.
   *
   * Common watermark generation patterns can be found as static methods in the
   * [[org.apache.flink.api.common.eventtime.WatermarkStrategy]] class.
   *
   * @param watermarkStrategy strategy that creates the timestamp assigner and watermark generator
   * @return a new Scala `DataStream` wrapping the Java stream returned by the delegate call
   */
  def assignTimestampsAndWatermarks(watermarkStrategy: WatermarkStrategy[T]): DataStream[T] = {
    // `clean` is defined on the enclosing class (not visible here); presumably Flink's closure
    // cleaning so the strategy can be serialized — confirm.
    val cleanedStrategy = clean(watermarkStrategy)
    asScalaStream(stream.assignTimestampsAndWatermarks(cleanedStrategy))
  }

allowedLateness 允许迟到(应对分布式环境下的数据乱序)

在分布式环境下,数据可能乱序到达:窗口按水印触发计算时,属于该窗口的部分数据可能还没到。allowedLateness 让窗口在水印越过窗口结束时间之后继续保留一段时间(即允许的延迟),这段时间内到达的迟到数据仍会被计入该窗口,从而解决数据迟到的问题。

  /**
   * Sets the allowed lateness to a user-specified value.
   * If not explicitly set, the allowed lateness is `0L`.
   * Setting the allowed lateness is only valid for event-time windows.
   * If a value different than 0 is provided with a processing-time
   * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]],
   * then an exception is thrown.
   *
   * @param lateness how long after the end of a window (as measured by the watermark) late
   *                 elements are still accepted into that window
   * @return this Scala `WindowedStream`, so further configuration calls can be chained
   */
  @PublicEvolving
  def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {
    // The Java stream's return value is discarded; the Scala wrapper itself is returned.
    javaStream.allowedLateness(lateness)
    this
  }

sideOutputLateData  侧输出流

即使设置了 allowedLateness(),仍可能有数据在允许的延迟之后才到达,即水印已越过"窗口结束时间 + 允许延迟"。这类数据不再进入窗口计算,sideOutputLateData 会把它们发送到侧输出流,之后可用相同的 OutputTag 通过 getSideOutput 取出。

  /**
   * Routes elements that arrive too late for their window to the side output named by `outputTag`.
   *
   * An element counts as late once the watermark has passed the end of its window plus the
   * lateness configured via [[allowedLateness(Time)]]. Those elements can be read back by calling
   * `getSideOutput` with the same [[OutputTag]] on the [[DataStream]] produced by the windowed
   * operation.
   *
   * @param outputTag identifies the side output that receives late elements
   * @return this `WindowedStream`, for chaining
   */
  @PublicEvolving
  def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
    javaStream.sideOutputLateData(outputTag) // delegate; the Java result is intentionally dropped
    this // hand back the Scala wrapper to keep the fluent API
  }

flink实操

(1)

  package nj.zb.kb23.api.windows

  import java.time.Duration

  import nj.zb.kb23.source.SensorReading
  import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
  import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
  import org.apache.flink.streaming.api.windowing.time.Time
  import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  import org.apache.flink.util.Collector

  /**
   * Event-time window demo (version 1).
   *
   * Reads comma-separated lines `id,timestamp,temperature` from a socket and assigns event-time
   * timestamps with a 3-second bounded-out-of-orderness watermark. Records are grouped into
   * 15-second tumbling windows per sensor id, and [[MyEventProcessWindowFunction]] runs on each
   * window. Records arriving after the 1-minute allowed lateness are printed from a side output.
   *
   * Usage: `WindowEventTimeTest [host] [port]` (defaults: 192.168.91.11 and 7777).
   */
  object WindowEventTimeTest {
    def main(args: Array[String]): Unit = {
      // Optional CLI overrides; with no arguments the original hard-coded endpoint is used.
      val host: String = args.lift(0).getOrElse("192.168.91.11")
      val port: Int = args.lift(1).map(_.trim.toInt).getOrElse(7777)

      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)

      val inputStream: DataStream[String] = env.socketTextStream(host, port)
      val dataStream: DataStream[SensorReading] = inputStream
        // A blank line would otherwise fail on fields(1) and take the whole job down.
        .filter(line => line.trim.nonEmpty)
        .map { line =>
          // Trim each field so input such as "sensor_1, 1547718199, 35.8" parses as well.
          val fields: Array[String] = line.split(",").map(_.trim)
          SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
        }
      // dataStream.setParallelism()

      // Use event time as the time semantics: timestamps come from the record itself, and
      // watermarks tolerate up to 3 seconds of out-of-orderness.
      val dataStream2: DataStream[SensorReading] = dataStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
          // Explicit type argument so Scala does not infer Nothing for the strategy's element type.
          .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(3))
          .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
            // The event-time field. The * 1000 suggests SensorReading.timestamp is in seconds and
            // is converted to milliseconds here — confirm against the data source.
            override def extractTimestamp(element: SensorReading, recordTimestamp: Long): Long =
              element.timestamp * 1000
          })
      )

      // Side output receiving records that arrive after the allowed lateness has expired.
      val lateTag = new OutputTag[SensorReading]("lastdata")
      val windowStream: WindowedStream[SensorReading, String, TimeWindow] = dataStream2
        .keyBy(_.id)
        .window(TumblingEventTimeWindows.of(Time.seconds(15))) // tumbling windows
        // .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(3))) // sliding windows
        .allowedLateness(Time.minutes(1)) // keep accepting late records for up to one minute
        .sideOutputLateData(lateTag) // records later than that go to the side output

      /* Alternative aggregation: minimum temperature per window via reduce.
      val resultStream: DataStream[SensorReading] = windowStream.reduce(
        (curReduce, newReduce) => {
          SensorReading(curReduce.id, curReduce.timestamp, curReduce.temperature.min(newReduce.temperature))
        }
      )*/
      val resultStream: DataStream[SensorReading] = windowStream.process(new MyEventProcessWindowFunction)
      resultStream.print("result:")
      resultStream.getSideOutput(lateTag).print("late value:") //.addSink()
      env.execute("windowEventTime")
    }
  }
  /**
   * Window function used by version 1 of `WindowEventTimeTest`.
   *
   * For every fired window it prints the window's `(start,end)` to stdout. It then emits a single
   * fixed placeholder `SensorReading(key, 1, 100.0)`; the window's elements are not inspected.
   */
  class MyEventProcessWindowFunction
      extends ProcessWindowFunction[SensorReading, SensorReading, String, TimeWindow] {

    override def process(
        key: String,
        context: Context,
        elements: Iterable[SensorReading],
        out: Collector[SensorReading]): Unit = {
      val window: TimeWindow = context.window
      // Explicit tuple instead of println(a, b), which relies on deprecated argument
      // auto-tupling; the printed text "(start,end)" is unchanged.
      println((window.getStart, window.getEnd))
      // Placeholder output. NOTE(review): replace with a real aggregate over `elements` if needed.
      val placeholderTimestamp: Long = 1L
      val placeholderTemperature: Double = 100.0
      out.collect(SensorReading(key, placeholderTimestamp, placeholderTemperature))
    }
  }

(2)

  package nj.zb.kb23.api.windows

  import java.time.Duration

  import nj.zb.kb23.source.SensorReading
  import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
  import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
  import org.apache.flink.streaming.api.windowing.time.Time
  import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  import org.apache.flink.util.Collector

  /**
   * Event-time window demo (version 2).
   *
   * Reads comma-separated lines `id,timestamp,temperature` from a socket and assigns event-time
   * timestamps with a 3-second bounded-out-of-orderness watermark. Records are grouped into
   * 15-second tumbling windows per sensor id. Each window emits
   * `(id, minimum temperature, window start, window end)`. Records arriving after the 1-minute
   * allowed lateness are printed from a side output.
   *
   * Usage: `WindowEventTimeTest [host] [port]` (defaults: 192.168.91.11 and 7777).
   */
  object WindowEventTimeTest {
    def main(args: Array[String]): Unit = {
      // Optional CLI overrides; with no arguments the original hard-coded endpoint is used.
      val host: String = args.lift(0).getOrElse("192.168.91.11")
      val port: Int = args.lift(1).map(_.trim.toInt).getOrElse(7777)

      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)

      val inputStream: DataStream[String] = env.socketTextStream(host, port)
      val dataStream: DataStream[SensorReading] = inputStream
        // A blank line would otherwise fail on fields(1) and take the whole job down.
        .filter(line => line.trim.nonEmpty)
        .map { line =>
          // Trim each field so input such as "sensor_1, 1547718199, 35.8" parses as well.
          val fields: Array[String] = line.split(",").map(_.trim)
          SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
        }
      // dataStream.setParallelism()

      // Use event time as the time semantics: timestamps come from the record itself, and
      // watermarks tolerate up to 3 seconds of out-of-orderness.
      val dataStream2: DataStream[SensorReading] = dataStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
          // Explicit type argument so Scala does not infer Nothing for the strategy's element type.
          .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(3))
          .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
            // The event-time field. The * 1000 suggests SensorReading.timestamp is in seconds and
            // is converted to milliseconds here — confirm against the data source.
            override def extractTimestamp(element: SensorReading, recordTimestamp: Long): Long =
              element.timestamp * 1000
          })
      )

      // Side output receiving records that arrive after the allowed lateness has expired.
      val lateTag = new OutputTag[SensorReading]("lastdata")
      val windowStream: WindowedStream[SensorReading, String, TimeWindow] = dataStream2
        .keyBy(_.id)
        .window(TumblingEventTimeWindows.of(Time.seconds(15))) // tumbling windows
        // .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(3))) // sliding windows
        .allowedLateness(Time.minutes(1)) // keep accepting late records for up to one minute
        .sideOutputLateData(lateTag) // records later than that go to the side output

      val resultStream: DataStream[(String, Double, Long, Long)] =
        windowStream.process(new MyEventProcessWindowFunction)
      resultStream.print("result:")
      resultStream.getSideOutput(lateTag).print("late value:") //.addSink()
      env.execute("windowEventTime")
    }
  }
  /**
   * For every fired window, emits `(key, minimum temperature, window start, window end)`.
   *
   * The minimum is taken over all elements of the window. If the element Iterable is empty,
   * nothing is emitted.
   */
  class MyEventProcessWindowFunction
      extends ProcessWindowFunction[SensorReading, (String, Double, Long, Long), String, TimeWindow] {

    override def process(
        key: String,
        context: Context,
        elements: Iterable[SensorReading],
        out: Collector[(String, Double, Long, Long)]): Unit = {
      val window: TimeWindow = context.window
      // True minimum over the window. The previous 100.0 seed silently capped the result at 100.0
      // whenever every reading in the window was hotter than that.
      // reduceOption avoids throwing on an empty Iterable; nothing is emitted in that case.
      elements.iterator
        .map(_.temperature)
        .reduceOption(_ min _)
        .foreach(minTemp => out.collect((key, minTemp, window.getStart, window.getEnd)))
    }
  }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/1019226
推荐阅读
相关标签
  

闽ICP备14008679号