赞
踩
- package org.apache.flink.streaming.api.windowing.assigners;
-
- import org.apache.flink.annotation.PublicEvolving;
- import org.apache.flink.api.common.ExecutionConfig;
- import org.apache.flink.api.common.typeutils.TypeSerializer;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
- import org.apache.flink.streaming.api.windowing.triggers.Trigger;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
- import java.util.Collection;
- import java.util.Collections;
-
- /**
-  * A {@link WindowAssigner} that groups elements into fixed-size, non-overlapping (tumbling)
-  * windows chosen from each element's timestamp.
-  *
-  * <p>Example: one-minute tumbling windows.
-  *
-  * <pre>{@code
-  * DataStream<Tuple2<String, Integer>> in = ...;
-  * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
-  * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
-  *     keyed.window(TumblingEventTimeWindows.of(Time.minutes(1)));
-  * }</pre>
-  */
- @PublicEvolving
- public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
-     private static final long serialVersionUID = 1L;
-
-     /** Window length; the static factories pass {@code Time#toMilliseconds()} here. */
-     private final long size;
-
-     /** User-supplied shift of the window start, validated to satisfy abs(offset) < size. */
-     private final long globalOffset;
-
-     /** Extra shift produced by {@link #windowStagger}; stays null until the first assignment. */
-     private Long staggerOffset = null;
-
-     /** Policy that supplies the stagger offset on first use. */
-     private final WindowStagger windowStagger;
-
-     /**
-      * @param size length of every window
-      * @param offset shift applied to window starts; its absolute value must be below {@code size}
-      * @param windowStagger policy used to derive the stagger offset at runtime
-      * @throws IllegalArgumentException if {@code abs(offset) >= size}
-      */
-     protected TumblingEventTimeWindows(long size, long offset, WindowStagger windowStagger) {
-         final boolean offsetWithinWindow = Math.abs(offset) < size;
-         if (!offsetWithinWindow) {
-             throw new IllegalArgumentException(
-                     "TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
-         }
-
-         this.size = size;
-         this.globalOffset = offset;
-         this.windowStagger = windowStagger;
-     }
-
-     /**
-      * Returns the single window that contains {@code timestamp}.
-      *
-      * @throws RuntimeException if the record carries {@code Long.MIN_VALUE} as its timestamp
-      */
-     @Override
-     public Collection<TimeWindow> assignWindows(
-             Object element, long timestamp, WindowAssignerContext context) {
-         // Long.MIN_VALUE is currently assigned when no timestamp is present.
-         if (timestamp <= Long.MIN_VALUE) {
-             throw new RuntimeException(
-                     "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
-                             + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
-                             + "'DataStream.assignTimestampsAndWatermarks(...)'?");
-         }
-
-         // The stagger offset is resolved once, from the processing time seen on the first call.
-         if (staggerOffset == null) {
-             staggerOffset =
-                     windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
-         }
-
-         final long effectiveOffset = (globalOffset + staggerOffset) % size;
-         final long windowStart =
-                 TimeWindow.getWindowStartWithOffset(timestamp, effectiveOffset, size);
-         final long windowEnd = windowStart + size;
-         return Collections.singletonList(new TimeWindow(windowStart, windowEnd));
-     }
-
-     /** Returns an {@link EventTimeTrigger} as the default trigger. */
-     @Override
-     public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
-         return EventTimeTrigger.create();
-     }
-
-     /** Renders the assigner as {@code TumblingEventTimeWindows(<size>)}. */
-     @Override
-     public String toString() {
-         final StringBuilder text = new StringBuilder("TumblingEventTimeWindows(");
-         text.append(size).append(")");
-         return text.toString();
-     }
-
-     /**
-      * Creates a {@code TumblingEventTimeWindows} assigner with no offset and aligned windows.
-      *
-      * @param size The size of the generated windows.
-      * @return The time policy.
-      */
-     public static TumblingEventTimeWindows of(Time size) {
-         final long sizeMillis = size.toMilliseconds();
-         return new TumblingEventTimeWindows(sizeMillis, 0, WindowStagger.ALIGNED);
-     }
-
-     /**
-      * Creates a {@code TumblingEventTimeWindows} assigner whose window starts are shifted by
-      * {@code offset}.
-      *
-      * <p>For example, hourly windows that begin at minute 15 of every hour are obtained with
-      * {@code of(Time.hours(1), Time.minutes(15))}; the windows then start at 0:15:00, 1:15:00,
-      * 2:15:00, etc.
-      *
-      * <p>Likewise, for a local time zone other than UTC±00:00, such as China (UTC+08:00), daily
-      * windows that begin at 00:00:00 local time are obtained with
-      * {@code of(Time.days(1), Time.hours(-8))}.
-      *
-      * @param size The size of the generated windows.
-      * @param offset The offset which window start would be shifted by.
-      */
-     public static TumblingEventTimeWindows of(Time size, Time offset) {
-         return of(size, offset, WindowStagger.ALIGNED);
-     }
-
-     /**
-      * Creates a {@code TumblingEventTimeWindows} assigner that applies both a global offset and a
-      * staggering offset determined by the given staggering policy.
-      *
-      * @param size The size of the generated windows.
-      * @param offset The globalOffset which window start would be shifted by.
-      * @param windowStagger The utility that produces staggering offset in runtime.
-      */
-     @PublicEvolving
-     public static TumblingEventTimeWindows of(Time size, Time offset, WindowStagger windowStagger) {
-         final long sizeMillis = size.toMilliseconds();
-         final long offsetMillis = offset.toMilliseconds();
-         return new TumblingEventTimeWindows(sizeMillis, offsetMillis, windowStagger);
-     }
-
-     /** Returns a new {@code TimeWindow.Serializer}. */
-     @Override
-     public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
-         return new TimeWindow.Serializer();
-     }
-
-     /** Always {@code true}: this assigner works on event time. */
-     @Override
-     public boolean isEventTime() {
-         return true;
-     }
- }
- package org.apache.flink.streaming.api.windowing.assigners;
-
- import org.apache.flink.annotation.PublicEvolving;
- import org.apache.flink.api.common.ExecutionConfig;
- import org.apache.flink.api.common.typeutils.TypeSerializer;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
- import org.apache.flink.streaming.api.windowing.triggers.Trigger;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.List;
-
- /**
-  * A {@link WindowAssigner} that places each element into every sliding window covering the
-  * element's timestamp. Windows may overlap.
-  *
-  * <p>Example: one-minute windows that start every 10 seconds.
-  *
-  * <pre>{@code
-  * DataStream<Tuple2<String, Integer>> in = ...;
-  * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
-  * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
-  *     keyed.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)));
-  * }</pre>
-  */
- @PublicEvolving
- public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
-     private static final long serialVersionUID = 1L;
-
-     /** Window length; the static factories pass {@code Time#toMilliseconds()} here. */
-     private final long size;
-
-     /** Distance between the starts of two consecutive windows. */
-     private final long slide;
-
-     /** Shift applied to window starts, validated to satisfy abs(offset) < slide. */
-     private final long offset;
-
-     /**
-      * @param size length of every window; must be positive
-      * @param slide interval between window starts
-      * @param offset shift applied to window starts; its absolute value must be below {@code slide}
-      * @throws IllegalArgumentException if {@code abs(offset) >= slide} or {@code size <= 0}
-      */
-     protected SlidingEventTimeWindows(long size, long slide, long offset) {
-         final boolean parametersValid = Math.abs(offset) < slide && size > 0;
-         if (!parametersValid) {
-             throw new IllegalArgumentException(
-                     "SlidingEventTimeWindows parameters must satisfy "
-                             + "abs(offset) < slide and size > 0");
-         }
-
-         this.size = size;
-         this.slide = slide;
-         this.offset = offset;
-     }
-
-     /**
-      * Returns all windows containing {@code timestamp}, ordered from the latest window start to
-      * the earliest.
-      *
-      * @throws RuntimeException if the record carries {@code Long.MIN_VALUE} as its timestamp
-      */
-     @Override
-     public Collection<TimeWindow> assignWindows(
-             Object element, long timestamp, WindowAssignerContext context) {
-         if (timestamp <= Long.MIN_VALUE) {
-             throw new RuntimeException(
-                     "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
-                             + "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
-                             + "'DataStream.assignTimestampsAndWatermarks(...)'?");
-         }
-
-         final List<TimeWindow> assigned = new ArrayList<>((int) (size / slide));
-         // Begin at the most recent window start and step back one slide at a time while the
-         // window [start, start + size) still covers the timestamp.
-         final long earliestExclusive = timestamp - size;
-         long windowStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
-         while (windowStart > earliestExclusive) {
-             assigned.add(new TimeWindow(windowStart, windowStart + size));
-             windowStart -= slide;
-         }
-         return assigned;
-     }
-
-     /** Returns the window size given to the constructor. */
-     public long getSize() {
-         return size;
-     }
-
-     /** Returns the slide interval given to the constructor. */
-     public long getSlide() {
-         return slide;
-     }
-
-     /** Returns an {@link EventTimeTrigger} as the default trigger. */
-     @Override
-     public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
-         return EventTimeTrigger.create();
-     }
-
-     /** Renders the assigner as {@code SlidingEventTimeWindows(<size>, <slide>)}. */
-     @Override
-     public String toString() {
-         final StringBuilder text = new StringBuilder("SlidingEventTimeWindows(");
-         text.append(size).append(", ").append(slide).append(")");
-         return text.toString();
-     }
-
-     /**
-      * Creates a {@code SlidingEventTimeWindows} assigner with no offset.
-      *
-      * @param size The size of the generated windows.
-      * @param slide The slide interval of the generated windows.
-      * @return The time policy.
-      */
-     public static SlidingEventTimeWindows of(Time size, Time slide) {
-         final long sizeMillis = size.toMilliseconds();
-         final long slideMillis = slide.toMilliseconds();
-         return new SlidingEventTimeWindows(sizeMillis, slideMillis, 0);
-     }
-
-     /**
-      * Creates a {@code SlidingEventTimeWindows} assigner whose window starts are shifted by
-      * {@code offset}.
-      *
-      * <p>For example, for a local time zone other than UTC±00:00, such as China (UTC+08:00), an
-      * offset of {@code Time.hours(-8)} lines the window starts up with local time rather than
-      * with UTC.
-      *
-      * @param size The size of the generated windows.
-      * @param slide The slide interval of the generated windows.
-      * @param offset The offset which window start would be shifted by.
-      * @return The time policy.
-      */
-     public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
-         final long sizeMillis = size.toMilliseconds();
-         final long slideMillis = slide.toMilliseconds();
-         final long offsetMillis = offset.toMilliseconds();
-         return new SlidingEventTimeWindows(sizeMillis, slideMillis, offsetMillis);
-     }
-
-     /** Returns a new {@code TimeWindow.Serializer}. */
-     @Override
-     public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
-         return new TimeWindow.Serializer();
-     }
-
-     /** Always {@code true}: this assigner works on event time. */
-     @Override
-     public boolean isEventTime() {
-         return true;
-     }
- }
- /**
-  * Assigns timestamps to the elements of this stream and generates watermarks that signal
-  * event-time progress. The given [[WatermarkStrategy]] is used to create a [[TimestampAssigner]]
-  * and a [[org.apache.flink.api.common.eventtime.WatermarkGenerator]].
-  *
-  * For each event in the data stream, the [[TimestampAssigner#extractTimestamp(Object, long)]]
-  * method is called to assign an event timestamp, and
-  * [[WatermarkGenerator#onEvent(Object, long, WatermarkOutput)]] is called as well.
-  *
-  * Periodically (defined by the [[ExecutionConfig#getAutoWatermarkInterval()]]), the
-  * [[WatermarkGenerator#onPeriodicEmit(WatermarkOutput)]] method is called.
-  *
-  * Common watermark generation patterns can be found as static methods in the
-  * [[org.apache.flink.api.common.eventtime.WatermarkStrategy]] class.
-  *
-  * The strategy is passed through `clean` before it is handed to the wrapped Java stream; the
-  * resulting Java stream is wrapped again with `asScalaStream`.
-  */
- def assignTimestampsAndWatermarks(watermarkStrategy: WatermarkStrategy[T]): DataStream[T] =
-   asScalaStream(stream.assignTimestampsAndWatermarks(clean(watermarkStrategy)))
在分布式架构下,数据可能乱序到达:窗口本该关闭的时候,属于该窗口的数据可能还没有到。allowedLateness() 让窗口在水位线越过窗口结束时间之后再保留一段时间,在这段时间内到达的迟到数据仍然会被计入窗口并再次触发计算,从而解决数据迟到的问题。
- /**
- * Sets the allowed lateness to a user-specified value.
- * If not explicitly set, the allowed lateness is `0L`.
- * Setting the allowed lateness is only valid for event-time windows.
- * If a value different than 0 is provided with a processing-time
- * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]],
- * then an exception is thrown.
- *
- * The value is forwarded to the wrapped `javaStream`.
- *
- * @param lateness the allowed lateness to set
- * @return this windowed stream, so that further calls can be chained
- */
- @PublicEvolving
- def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {
- javaStream.allowedLateness(lateness)
- this
- }
超过 allowedLateness() 允许的延迟之后才到达的数据不会再进入窗口;通过 sideOutputLateData() 可以把这些迟到的数据发送到侧输出流,而不是直接丢弃。
- /**
- * Send late arriving data to the side output identified by the given [[OutputTag]]. Data
- * is considered late after the watermark has passed the end of the window plus the allowed
- * lateness set using [[allowedLateness(Time)]].
- *
- * You can get the stream of late data using [[DataStream.getSideOutput()]] on the [[DataStream]]
- * resulting from the windowed operation with the same [[OutputTag]].
- *
- * The tag is forwarded to the wrapped `javaStream`.
- *
- * @param outputTag tag identifying the side output that receives the late data
- * @return this windowed stream, so that further calls can be chained
- */
- @PublicEvolving
- def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
- javaStream.sideOutputLateData(outputTag)
- this
- }
(1)
- package nj.zb.kb23.api.windows
-
- import java.time.Duration
-
- import nj.zb.kb23.source.SensorReading
- import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
- import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
- import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
- import org.apache.flink.streaming.api.windowing.time.Time
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow
- import org.apache.flink.util.Collector
-
- /**
-  * Demo job: reads comma-separated sensor readings from a socket, assigns event-time timestamps
-  * and watermarks, groups the readings by sensor id into 15-second tumbling event-time windows,
-  * and prints both the window results and the records that arrived too late for their window.
-  */
- object WindowEventTimeTest {
-   def main(args: Array[String]): Unit = {
-     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-     env.setParallelism(1)
-
-     // Every input line is split on "," into the three SensorReading constructor arguments
-     // (used below as id, timestamp and temperature).
-     val inputStream: DataStream[String] = env.socketTextStream("192.168.91.11", 7777)
-     val dataStream: DataStream[SensorReading] = inputStream.map(data => {
-       val arr: Array[String] = data.split(",")
-       SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
-     })
-
-     // Use event time as the time semantics, tolerating 3 seconds of out-of-orderness.
-     // The element type is given explicitly: nothing at this call site lets the compiler infer
-     // it, and without it the SensorReading timestamp assigner below may be rejected.
-     val dataStream2: DataStream[SensorReading] = dataStream.assignTimestampsAndWatermarks(
-       WatermarkStrategy
-         .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(3))
-         .withTimestampAssigner(
-           new SerializableTimestampAssigner[SensorReading] {
-             override def extractTimestamp(element: SensorReading, recordTimestamp: Long): Long = {
-               // Field that carries the event time; scaled by 1000
-               // (presumably seconds -> milliseconds; confirm against the input data).
-               element.timestamp * 1000
-             }
-           }
-         )
-     )
-
-     val lateTag = new OutputTag[SensorReading]("lastdata")
-     val windowStream: WindowedStream[SensorReading, String, TimeWindow] = dataStream2.keyBy(_.id)
-       .window(TumblingEventTimeWindows.of(Time.seconds(15))) // tumbling windows
-       // Sliding alternative: .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(3)))
-       .allowedLateness(Time.minutes(1)) // accept records that are up to one minute late
-       .sideOutputLateData(lateTag) // records later than that go to the side output
-
-     // Alternative aggregation: windowStream.reduce(...) keeping, per window, the id and timestamp
-     // of the current result together with the smaller of the two temperatures.
-     val resultStream: DataStream[SensorReading] = windowStream.process(new MyEventProcessWindowFunction)
-     resultStream.print("result:")
-     resultStream.getSideOutput(lateTag).print("late value:") // an addSink(...) could be used instead
-     env.execute("windowEventTime")
-   }
- }
-
- /**
-  * Window function for the first version of the demo: for every fired window it prints the
-  * window bounds and emits one placeholder reading for the key. The window's elements are not
-  * inspected; the emitted timestamp and temperature are fixed values.
-  */
- class MyEventProcessWindowFunction extends ProcessWindowFunction[SensorReading, SensorReading, String, TimeWindow] {
-   /**
-    * @param key      key of the window being evaluated
-    * @param context  gives access to the window that fired
-    * @param elements readings collected in the window (unused here)
-    * @param out      collector that receives the placeholder reading
-    */
-   override def process(
-       key: String,
-       context: Context,
-       elements: Iterable[SensorReading],
-       out: Collector[SensorReading]): Unit = {
-     val window: TimeWindow = context.window
-     // Explicit tuple: prints "(start,end)" without relying on argument auto-tupling.
-     println((window.getStart, window.getEnd))
-
-     val placeholderTemperature = 100.0
-     val placeholderTimestamp = 1L
-     out.collect(SensorReading(key, placeholderTimestamp, placeholderTemperature))
-   }
- }
(2)
- package nj.zb.kb23.api.windows
-
- import java.time.Duration
-
- import nj.zb.kb23.source.SensorReading
- import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
- import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
- import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
- import org.apache.flink.streaming.api.windowing.time.Time
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow
- import org.apache.flink.util.Collector
-
- /**
-  * Demo job (second version): same pipeline as before, but each fired window now yields a tuple
-  * of (sensor id, minimum temperature, window start, window end). Records that arrive too late
-  * for their window are printed from the side output.
-  */
- object WindowEventTimeTest {
-   def main(args: Array[String]): Unit = {
-     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-     env.setParallelism(1)
-
-     // Every input line is split on "," into the three SensorReading constructor arguments
-     // (used below as id, timestamp and temperature).
-     val inputStream: DataStream[String] = env.socketTextStream("192.168.91.11", 7777)
-     val dataStream: DataStream[SensorReading] = inputStream.map(data => {
-       val arr: Array[String] = data.split(",")
-       SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
-     })
-
-     // Use event time as the time semantics, tolerating 3 seconds of out-of-orderness.
-     // The element type is given explicitly: nothing at this call site lets the compiler infer
-     // it, and without it the SensorReading timestamp assigner below may be rejected.
-     val dataStream2: DataStream[SensorReading] = dataStream.assignTimestampsAndWatermarks(
-       WatermarkStrategy
-         .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(3))
-         .withTimestampAssigner(
-           new SerializableTimestampAssigner[SensorReading] {
-             override def extractTimestamp(element: SensorReading, recordTimestamp: Long): Long = {
-               // Field that carries the event time; scaled by 1000
-               // (presumably seconds -> milliseconds; confirm against the input data).
-               element.timestamp * 1000
-             }
-           }
-         )
-     )
-
-     val lateTag = new OutputTag[SensorReading]("lastdata")
-     val windowStream: WindowedStream[SensorReading, String, TimeWindow] = dataStream2.keyBy(_.id)
-       .window(TumblingEventTimeWindows.of(Time.seconds(15))) // tumbling windows
-       // Sliding alternative: .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(3)))
-       .allowedLateness(Time.minutes(1)) // accept records that are up to one minute late
-       .sideOutputLateData(lateTag) // records later than that go to the side output
-
-     val resultStream: DataStream[(String, Double, Long, Long)] = windowStream.process(new MyEventProcessWindowFunction)
-     resultStream.print("result:")
-     resultStream.getSideOutput(lateTag).print("late value:") // an addSink(...) could be used instead
-     env.execute("windowEventTime")
-   }
- }
- /**
-  * Window function for the second version of the demo: for every fired window it emits
-  * (key, minimum temperature in the window, window start, window end).
-  */
- class MyEventProcessWindowFunction extends ProcessWindowFunction[SensorReading, (String, Double, Long, Long), String, TimeWindow] {
-   /**
-    * @param key      key of the window being evaluated
-    * @param context  gives access to the window that fired
-    * @param elements readings collected in the window
-    * @param out      collector that receives one result tuple per non-empty window
-    */
-   override def process(
-       key: String,
-       context: Context,
-       elements: Iterable[SensorReading],
-       out: Collector[(String, Double, Long, Long)]): Unit = {
-     val window: TimeWindow = context.window
-     // The minimum is taken over the readings themselves rather than starting from a fixed
-     // 100.0, so windows whose temperatures are all above 100 report their real minimum.
-     // Nothing is emitted if the window holds no readings.
-     elements.iterator
-       .map(_.temperature)
-       .reduceOption(_ min _)
-       .foreach(minTemperature => out.collect((key, minTemperature, window.getStart, window.getEnd)))
-   }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。