
08_Flink Streaming window


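Flink provides four kinds of windows: sliding and tumbling windows, each driven either by time or by events (element counts). Let's look at how the sliding time window is implemented.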

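Flink supports three time semantics:

1: System time (ProcessingTime): the clock of the machine at the moment the operator processes the data. It has the best performance.

2: Ingestion time (IngestionTime): the time at which Flink first ingests the data; it is fixed as soon as the record enters. Performance is moderate, because the timestamp assigned at the source has to be passed downstream.

3: Event time (EventTime): a timestamp extraction logic is specified for each record. This makes true business time possible, but it also has the highest performance cost.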
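
As a rough illustration of the three semantics, the sketch below picks the timestamp that would decide a record's window under each one. It is plain Java rather than Flink code; the `Reading` class, the `TimeSemantics` enum, and the sample values are hypothetical names introduced only for this example, and timestamps are assumed to be non-negative.

```java
// Illustrative only: plain Java, not the Flink API. All names are hypothetical.
class TimeSemanticsSketch {

    enum TimeSemantics { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

    /** A record plus the two timestamps it can carry (milliseconds). */
    static final class Reading {
        final long eventTimestamp;      // extracted from the record itself (business time)
        final long ingestionTimestamp;  // stamped once, when the source first saw the record

        Reading(long eventTimestamp, long ingestionTimestamp) {
            this.eventTimestamp = eventTimestamp;
            this.ingestionTimestamp = ingestionTimestamp;
        }
    }

    /** Returns the timestamp that decides which window the reading falls into. */
    static long timestampFor(TimeSemantics semantics, Reading reading, long operatorClock) {
        switch (semantics) {
            case PROCESSING_TIME:
                return operatorClock;               // whatever the operator's clock says right now
            case INGESTION_TIME:
                return reading.ingestionTimestamp;  // fixed at the source
            case EVENT_TIME:
                return reading.eventTimestamp;      // taken from the data
            default:
                throw new IllegalArgumentException(String.valueOf(semantics));
        }
    }

    /** Start of the tumbling window of the given size that contains the timestamp. */
    static long tumblingWindowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    public static void main(String[] args) {
        Reading reading = new Reading(9_500L, 10_200L);
        long operatorClock = 11_050L;
        for (TimeSemantics semantics : TimeSemantics.values()) {
            long ts = timestampFor(semantics, reading, operatorClock);
            System.out.println(semantics + " -> window starting at " + tumblingWindowStart(ts, 1_000L));
        }
    }
}
```

With one-second tumbling windows, the same record lands in three different windows (starting at 11000, 10000 and 9000) depending on which semantics is chosen.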

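A trigger describes the specific firing behavior needed in more complex situations. Each trigger callback returns one of several outcomes: the four values of TriggerResult (CONTINUE, FIRE, PURGE, and FIRE_AND_PURGE).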
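
The four outcomes can be pictured as two independent flags: whether the window is evaluated, and whether its contents are discarded. The enum below mirrors the constant names of Flink's `TriggerResult` in plain Java. The `CountTriggerSketch` class is a hypothetical count-based trigger written for illustration; it is not Flink's own implementation.

```java
// Illustrative only: a plain-Java mirror of the four trigger outcomes.
class TriggerSketch {

    enum TriggerResult {
        CONTINUE(false, false),       // do nothing
        FIRE(true, false),            // evaluate the window, keep its contents
        PURGE(false, true),           // drop the contents without evaluating
        FIRE_AND_PURGE(true, true);   // evaluate, then drop the contents

        private final boolean fire;
        private final boolean purge;

        TriggerResult(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }

        boolean isFire() { return fire; }

        boolean isPurge() { return purge; }
    }

    /** Hypothetical trigger that fires and clears the window after every maxCount elements. */
    static final class CountTriggerSketch {
        private final int maxCount;
        private int seen;

        CountTriggerSketch(int maxCount) {
            if (maxCount < 1) {
                throw new IllegalArgumentException("maxCount must be at least 1");
            }
            this.maxCount = maxCount;
        }

        /** Called once per arriving element. */
        TriggerResult onElement() {
            seen++;
            if (seen >= maxCount) {
                seen = 0;
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }
    }

    public static void main(String[] args) {
        CountTriggerSketch trigger = new CountTriggerSketch(3);
        for (int i = 1; i <= 4; i++) {
            System.out.println("element " + i + " -> " + trigger.onElement());
        }
    }
}
```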

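The three semantics are configured on the streaming program (the Javadoc below refers to StreamExecutionEnvironment#setStreamTimeCharacteristic). An AssignerWithPeriodicWatermarks is injected into the DataStream; it specifies how a timestamp is assigned to each event and how that timestamp is then used.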
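
A periodic watermark assigner usually does two jobs: it extracts a timestamp from each element and, when asked at regular intervals, reports a watermark that trails the largest timestamp seen so far. The sketch below shows that idea in plain Java. It does not implement Flink's `AssignerWithPeriodicWatermarks` interface; the class name, the method names, and the fixed out-of-orderness bound are assumptions made for the example.

```java
import java.util.function.ToLongFunction;

// Illustrative only: plain Java, not the Flink interface. All names are hypothetical.
class BoundedLatenessAssignerSketch<T> {

    private final ToLongFunction<T> timestampExtractor;
    private final long maxOutOfOrderness;           // how late an element may be, in ms
    private long maxTimestampSeen = Long.MIN_VALUE; // nothing seen yet

    BoundedLatenessAssignerSketch(ToLongFunction<T> timestampExtractor, long maxOutOfOrderness) {
        if (maxOutOfOrderness < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness must not be negative");
        }
        this.timestampExtractor = timestampExtractor;
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Called per element: extracts the event timestamp and remembers the largest one. */
    long extractTimestamp(T element) {
        long timestamp = timestampExtractor.applyAsLong(element);
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
        return timestamp;
    }

    /** Called periodically: everything at or before the returned time is assumed to have arrived. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no elements yet, so no progress to report
        }
        return maxTimestampSeen - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedLatenessAssignerSketch<Long> assigner =
                new BoundedLatenessAssignerSketch<>(Long::longValue, 2_000L);
        long[] arrivals = {10_000L, 12_000L, 11_000L}; // the last element arrives out of order
        for (long arrival : arrivals) {
            assigner.extractTimestamp(arrival);
            System.out.println("saw " + arrival + ", watermark " + assigner.currentWatermark());
        }
    }
}
```

The out-of-order element (11000) does not move the watermark backwards; the watermark only follows the maximum timestamp.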

/**
 * Windows this {@code DataStream} into sliding time windows.
 *
 * <p>
 * This is a shortcut for either {@code .window(SlidingEventTimeWindows.of(size, slide))} or
 * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
 * set using
 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
 *
 * <p>
 * Note: This operation can be inherently non-parallel since all elements have to pass through
 * the same operator instance. (Only for special cases, such as aligned time windows is
 * it possible to perform this operation in parallel).
 *
 * @param size The size of the window.
 */
public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return windowAll(SlidingProcessingTimeWindows.of(size, slide));
    } else {
        return windowAll(SlidingEventTimeWindows.of(size, slide));
    }
}
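
Whichever branch is taken, a sliding assigner places each element in several overlapping windows, roughly size / slide of them. The sketch below computes those windows for a single timestamp in plain Java. The arithmetic follows the usual approach (align the newest window start to the slide, then step backwards), but the class and method names are hypothetical, and the code assumes non-negative timestamps and no window offset.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: plain Java, not Flink's sliding window assigners.
class SlidingWindowSketch {

    /** Half-open time window [start, end), in milliseconds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Returns every sliding window that contains the timestamp, newest first. */
    static List<Window> assignWindows(long timestamp, long size, long slide) {
        if (size <= 0 || slide <= 0) {
            throw new IllegalArgumentException("size and slide must be positive");
        }
        List<Window> windows = new ArrayList<>();
        // Start of the newest window that contains the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        // size = 10, slide = 5: timestamp 7 belongs to [5, 15) and [0, 10).
        System.out.println(assignWindows(7L, 10L, 5L));
    }
}
```

When size equals slide, the loop yields exactly one window, which is the tumbling case.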

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.api;

import org.apache.flink.annotation.PublicEvolving;

/**
 * The time characteristic defines how the system determines time for time-dependent
 * order and operations that depend on time (such as time windows).
 */
@PublicEvolving
public enum TimeCharacteristic {

    /**
     * Processing time for operators means that the operator uses the system clock of the machine
     * to determine the current time of the data stream. Processing-time windows trigger based
     * on wall-clock time and include whatever elements happen to have arrived at the operator at
     * that point in time.
     * <p>
     * Using processing time for window operations results in general in quite non-deterministic results,
     * because the contents of the windows depends on the speed in which elements arrive. It is, however,
     * the cheapest method of forming windows and the method that introduces the least latency.
     */
    ProcessingTime,

    /**
     * Ingestion time means that the time of each individual element in the stream is determined
     * when the element enters the Flink streaming data flow. Operations like windows group the
     * elements based on that time, meaning that processing speed within the streaming dataflow
     * does not affect windowing, but only the speed at which sources receive elements.
     * <p>
     * Ingestion time is often a good compromise between processing time and event time.
     * It does not need any special manual form of watermark generation, and events are typically
     * not too much out-of-order when they arrive at operators; in fact, out-of-orderness can
     * only be introduced by streaming shuffles or split/join/union operations. The fact that elements
     * are not very much out-of-order means that the latency increase is moderate, compared to event
     * time.
     */
    IngestionTime,

    /**
     * Event time means that the time of each individual element in the stream (also called event)
     * is determined by the event's individual custom timestamp. These timestamps either exist in the
     * elements from before they entered the Flink streaming dataflow, or are user-assigned at the sources.
     * The big implication of this is that it allows for elements to arrive in the sources and in
     * all operators out of order, meaning that elements with earlier timestamps may arrive after
     * elements with later timestamps.
     * <p>
     * Operators that window or order data with respect to event time must buffer data until they can
     * be sure that all timestamps for a certain time interval have been received. This is handled by
     * the so called "time watermarks".
     * <p>
     * Operations based on event time are very predictable - the result of windowing operations
     * is typically identical no matter when the window is executed and how fast the streams operate.
     * At the same time, the buffering and tracking of event time is also costlier than operating
     * with processing time, and typically also introduces more latency. The amount of extra
     * cost depends mostly on how much out of order the elements arrive, i.e., how long the time span
     * between the arrival of early and late elements is. With respect to the "time watermarks", this
     * means that the cost typically depends on how early or late the watermarks can be generated
     * for their timestamp.
     * <p>
     * In relation to {@link #IngestionTime}, the event time is similar, but refers to the event's
     * original time, rather than the time assigned at the data source. Practically, that means that
     * event time has generally more meaning, but also that it takes longer to determine that all
     * elements for a certain time have arrived.
     */
    EventTime
}


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * {@link StreamOperator} for streaming sources.
 *
 * @param <OUT> Type of the output elements
 * @param <SRC> Type of the source function of this stream source operator
 */
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

    private static final long serialVersionUID = 1L;

    private transient SourceFunction.SourceContext<OUT> ctx;

    private transient volatile boolean canceledOrStopped = false;

    public StreamSource(SRC sourceFunction) {
        super(sourceFunction);
        this.chainingStrategy = ChainingStrategy.HEAD;
    }

    public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
        final SourceFunction.SourceContext<OUT> ctx;

        // the time characteristic decides which kind of source context is used
        switch (timeCharacteristic) {
            case EventTime:
                ctx = new ManualWatermarkContext<>(this, lockingObject, collector);
                break;
            case IngestionTime:
                ctx = new AutomaticWatermarkContext<>(this, lockingObject, collector,
                        getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
                break;
            case ProcessingTime:
                ctx = new NonTimestampContext<>(this, lockingObject, collector);
                break;
            default:
                throw new Exception(String.valueOf(timeCharacteristic));
        }

        // copy to a field to give the 'cancel()' method access
        this.ctx = ctx;

        try {
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        } finally {
            // make sure that the context is closed in any case
            ctx.close();
        }
    }

    public void cancel() {
        // important: marking the source as stopped has to happen before the function is stopped.
        // the flag that tracks this status is volatile, so the memory model also guarantees
        // the happens-before relationship
        markCanceledOrStopped();
        userFunction.cancel();

        // the context may not be initialized if the source was never running.
        if (ctx != null) {
            ctx.close();
        }
    }

    /**
     * Marks this source as canceled or stopped.
     *
     * <p>This indicates that any exit of the {@link #run(Object, Output)} method
     * cannot be interpreted as the result of a finite source.
     */
    protected void markCanceledOrStopped() {
        this.canceledOrStopped = true;
    }

    /**
     * Checks whether the source has been canceled or stopped.
     * @return True, if the source is canceled or stopped, false if not.
     */
    protected boolean isCanceledOrStopped() {
        return canceledOrStopped;
    }

    /**
     * Checks whether any asynchronous thread (checkpoint trigger, timer, watermark generator, ...)
     * has caused an exception. If one of these threads caused an exception, this method will
     * throw that exception.
     */
    void checkAsyncException() {
        getContainingTask().checkTimerException();
    }

    // ------------------------------------------------------------------------
    //  Source contexts for various stream time characteristics
    // ------------------------------------------------------------------------

    /**
     * A source context that attached {@code -1} as a timestamp to all records, and that
     * does not forward watermarks.
     */
    public static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {

        private final StreamSource<?, ?> owner;
        private final Object lockingObject;
        private final Output<StreamRecord<T>> output;
        private final StreamRecord<T> reuse;

        public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
            this.owner = owner;
            this.lockingObject = lockingObject;
            this.output = output;
            this.reuse = new StreamRecord<T>(null);
        }

        @Override
        public void collect(T element) {
            owner.checkAsyncException();
            synchronized (lockingObject) {
                output.collect(reuse.replace(element));
            }
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            // ignore the timestamp
            collect(element);
        }

        @Override
        public void emitWatermark(Watermark mark) {
            owner.checkAsyncException();
            // do nothing else
        }

        @Override
        public Object getCheckpointLock() {
            return lockingObject;
        }

        @Override
        public void close() {}
    }

    /**
     * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
     * and watermark emission.
     */
    public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {

        private final StreamSource<?, ?> owner;
        private final Object lockingObject;
        private final Output<StreamRecord<T>> output;
        private final StreamRecord<T> reuse;

        private final ScheduledExecutorService scheduleExecutor;
        private final ScheduledFuture<?> watermarkTimer;
        private final long watermarkInterval;

        private volatile long nextWatermarkTime;

        public AutomaticWatermarkContext(
                final StreamSource<?, ?> owner,
                final Object lockingObjectParam,
                final Output<StreamRecord<T>> outputParam,
                final long watermarkInterval) {

            if (watermarkInterval < 1L) {
                throw new IllegalArgumentException("The watermark interval cannot be smaller than one.");
            }

            this.owner = owner;
            this.lockingObject = lockingObjectParam;
            this.output = outputParam;
            this.watermarkInterval = watermarkInterval;
            this.reuse = new StreamRecord<T>(null);

            this.scheduleExecutor = Executors.newScheduledThreadPool(1);

            this.watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    final long currentTime = System.currentTimeMillis();

                    if (currentTime > nextWatermarkTime) {
                        // align the watermarks across all machines. this will ensure that we
                        // don't have watermarks that creep along at different intervals because
                        // the machine clocks are out of sync
                        final long watermarkTime = currentTime - (currentTime % watermarkInterval);

                        synchronized (lockingObjectParam) {
                            if (currentTime > nextWatermarkTime) {
                                outputParam.emitWatermark(new Watermark(watermarkTime));
                                nextWatermarkTime += watermarkInterval;
                            }
                        }
                    }
                }
            }, 0, watermarkInterval, TimeUnit.MILLISECONDS);
        }

        @Override
        public void collect(T element) {
            owner.checkAsyncException();

            synchronized (lockingObject) {
                final long currentTime = System.currentTimeMillis();
                output.collect(reuse.replace(element, currentTime));

                if (currentTime > nextWatermarkTime) {
                    // in case we jumped some watermarks, recompute the next watermark time
                    final long watermarkTime = currentTime - (currentTime % watermarkInterval);
                    nextWatermarkTime = watermarkTime + watermarkInterval;
                    output.emitWatermark(new Watermark(watermarkTime));
                }
            }
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            collect(element);
        }

        @Override
        public void emitWatermark(Watermark mark) {
            owner.checkAsyncException();

            if (mark.getTimestamp() == Long.MAX_VALUE) {
                // allow it since this is the special end-watermark that for example the Kafka source emits
                synchronized (lockingObject) {
                    nextWatermarkTime = Long.MAX_VALUE;
                    output.emitWatermark(mark);
                }

                // we can shutdown the timer now, no watermarks will be needed any more
                watermarkTimer.cancel(true);
                scheduleExecutor.shutdownNow();
            }
        }

        @Override
        public Object getCheckpointLock() {
            return lockingObject;
        }

        @Override
        public void close() {
            watermarkTimer.cancel(true);
            scheduleExecutor.shutdownNow();
        }
    }

    /**
     * A SourceContext for event time. Sources may directly attach timestamps and generate
     * watermarks, but if records are emitted without timestamps, no timestamps are automatically
     * generated and attached. The records will simply have no timestamp in that case.
     *
     * Streaming topologies can use timestamp assigner functions to override the timestamps
     * assigned here.
     */
    public static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {

        private final StreamSource<?, ?> owner;
        private final Object lockingObject;
        private final Output<StreamRecord<T>> output;
        private final StreamRecord<T> reuse;

        public ManualWatermarkContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
            this.owner = owner;
            this.lockingObject = lockingObject;
            this.output = output;
            this.reuse = new StreamRecord<T>(null);
        }

        @Override
        public void collect(T element) {
            owner.checkAsyncException();
            synchronized (lockingObject) {
                output.collect(reuse.replace(element));
            }
        }

        @Override
        public void collectWithTimestamp(T element, long timestamp) {
            owner.checkAsyncException();
            synchronized (lockingObject) {
                output.collect(reuse.replace(element, timestamp));
            }
        }

        @Override
        public void emitWatermark(Watermark mark) {
            owner.checkAsyncException();
            synchronized (lockingObject) {
                output.emitWatermark(mark);
            }
        }

        @Override
        public Object getCheckpointLock() {
            return lockingObject;
        }

        @Override
        public void close() {}
    }
}
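
The alignment step in `AutomaticWatermarkContext` is plain integer arithmetic: the current time is rounded down to a multiple of the watermark interval, so machines whose clocks differ slightly still emit watermarks on the same grid. A minimal stand-alone sketch of that step, with a hypothetical class name and made-up sample values:

```java
// Illustrative only: isolates the rounding arithmetic used for ingestion-time watermarks.
class WatermarkAlignmentSketch {

    /** Rounds the current time down to the nearest multiple of the watermark interval. */
    static long alignedWatermark(long currentTimeMillis, long watermarkInterval) {
        if (watermarkInterval < 1L) {
            throw new IllegalArgumentException("The watermark interval cannot be smaller than one.");
        }
        return currentTimeMillis - (currentTimeMillis % watermarkInterval);
    }

    /** The earliest time at which the following watermark becomes due. */
    static long nextWatermarkTime(long currentTimeMillis, long watermarkInterval) {
        return alignedWatermark(currentTimeMillis, watermarkInterval) + watermarkInterval;
    }

    public static void main(String[] args) {
        long interval = 200L;
        // Two machines whose clocks disagree by 143 ms still produce the same watermark.
        System.out.println(alignedWatermark(1_234L, interval)); // prints 1200
        System.out.println(alignedWatermark(1_377L, interval)); // prints 1200
        System.out.println(nextWatermarkTime(1_234L, interval)); // prints 1400
    }
}
```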

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.api.datastream;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.FoldingWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer;

/**
 * A {@code AllWindowedStream} represents a data stream where the stream of
 * elements is split into windows based on a
 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
 * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
 *
 * <p>
 * If an {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} is specified it will be
 * used to evict elements from the window after
 * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
 * When using an evictor window performance will degrade significantly, since
 * pre-aggregation of window results cannot be used.
 *
 * <p>
 * Note that the {@code AllWindowedStream} is purely an API construct, during runtime
 * the {@code AllWindowedStream} will be collapsed together with the
 * operation over the window into one single operation.
 *
 * @param <T> The type of elements in the stream.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
 */
@Public
public class AllWindowedStream<T, W extends Window> {

    /** The data stream that is windowed by this stream */
    private final DataStream<T> input;

    /** The window assigner */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** The trigger that is used for window evaluation/emission. */
    private Trigger<? super T, ? super W> trigger;

    /** The evictor that is used for evicting elements before window evaluation. */
    private Evictor<? super T, ? super W> evictor;

    @PublicEvolving
    public AllWindowedStream(DataStream<T> input,
            WindowAssigner<? super T, W> windowAssigner) {
        this.input = input;
        this.windowAssigner = windowAssigner;
        this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
    }

    /**
     * Sets the {@code Trigger} that should be used to trigger window emission.
     */
    @PublicEvolving
    public AllWindowedStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
        this.trigger = trigger;
        return this;
    }

    /**
     * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
     *
     * <p>
     * Note: When using an evictor window performance will degrade significantly, since
     * pre-aggregation of window results cannot be used.
     */
    @PublicEvolving
    public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
        this.evictor = evictor;
        return this;
    }

    // ------------------------------------------------------------------------
    //  Operations on the keyed windows
    // ------------------------------------------------------------------------

    /**
     * Applies a reduce function to the window. The window function is called for each evaluation
     * of the window for each key individually. The output of the reduce function is interpreted
     * as a regular non-windowed stream.
     * <p>
     * This window will try and pre-aggregate data as much as the window policies permit. For example,
     * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
     * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
     * so a few elements are stored per key (one per slide interval).
     * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
     * aggregation tree.
     *
     * @param function The reduce function.
     * @return The data stream that is the result of applying the reduce function to the window.
     */
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " +
                    "Please use apply(ReduceFunction, WindowFunction) instead.");
        }

        //clean the closure
        function = input.getExecutionEnvironment().clean(function);

        String callLocation = Utils.getCallLocationName();
        String udfName = "AllWindowedStream." + callLocation;

        SingleOutputStreamOperator<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
        if (result != null) {
            return result;
        }

        String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";

        OneInputStreamOperator<T, T> operator;

        if (evictor != null) {
            operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                    new ReduceIterableAllWindowFunction<W, T>(function),
                    trigger,
                    evictor);
        } else {
            operator = new NonKeyedWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    new ReducingWindowBuffer.Factory<>(function, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                    new ReduceIterableAllWindowFunction<W, T>(function),
                    trigger);
        }

        return input.transform(opName, input.getType(), operator).setParallelism(1);
    }

    /**
     * Applies the given fold function to each window. The window function is called for each
     * evaluation of the window for each key individually. The output of the reduce function is
     * interpreted as a regular non-windowed stream.
     *
     * @param function The fold function.
     * @return The data stream that is the result of applying the fold function to the window.
     */
    public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
                    "Please use apply(FoldFunction, WindowFunction) instead.");
        }

        TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
                Utils.getCallLocationName(), true);

        return fold(initialValue, function, resultType);
    }

    /**
     * Applies the given fold function to each window. The window function is called for each
     * evaluation of the window for each key individually. The output of the reduce function is
     * interpreted as a regular non-windowed stream.
     *
     * @param function The fold function.
     * @return The data stream that is the result of applying the fold function to the window.
     */
    public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
                    "Please use apply(FoldFunction, WindowFunction) instead.");
        }

        return apply(initialValue, function, new PassThroughAllWindowFunction<W, R>(), resultType);
    }

    /**
     * Applies a window function to the window. The window function is called for each evaluation
     * of the window for each key individually. The output of the window function is interpreted
     * as a regular non-windowed stream.
     * <p>
     * Note that this function requires that all data in the windows is buffered until the window
     * is evaluated, as the function provides no means of pre-aggregation.
     *
     * @param function The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
    public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
        @SuppressWarnings("unchecked, rawtypes")
        TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
                function, AllWindowFunction.class, true, true, getInputType(), null, false);

        return apply(function, resultType);
    }

    /**
     * Applies the given window function to each window. The window function is called for each evaluation
     * of the window for each key individually. The output of the window function is interpreted
     * as a regular non-windowed stream.
     * <p>
     * Note that this function requires that all data in the windows is buffered until the window
     * is evaluated, as the function provides no means of pre-aggregation.
     *
     * @param function The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
    public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
        //clean the closure
        function = input.getExecutionEnvironment().clean(function);

        String callLocation = Utils.getCallLocationName();
        String udfName = "AllWindowedStream." + callLocation;

        SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
        if (result != null) {
            return result;
        }

        String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";

        NonKeyedWindowOperator<T, T, R, W> operator;

        if (evictor != null) {
            operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                    function,
                    trigger,
                    evictor);
        } else {
            operator = new NonKeyedWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                    function,
                    trigger);
        }

        return input.transform(opName, resultType, operator).setParallelism(1);
    }

    /**
     * Applies the given window function to each window. The window function is called for each
     * evaluation of the window for each key individually. The output of the window function is
     * interpreted as a regular non-windowed stream.
     *
     * <p>
     * Arriving data is pre-aggregated using the given pre-aggregation reducer.
     *
     * @param preAggregator The reduce function that is used for pre-aggregation
     * @param function The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
    public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function) {
        TypeInformation<T> inType = input.getType();
        TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
                function, AllWindowFunction.class, true, true, inType, null, false);

        return apply(preAggregator, function, resultType);
    }

    /**
     * Applies the given window function to each window. The window function is called for each
     * evaluation of the window for each key individually. The output of the window function is
     * interpreted as a regular non-windowed stream.
     *
     * <p>
     * Arriving data is pre-aggregated using the given pre-aggregation reducer.
     *
     * @param preAggregator The reduce function that is used for pre-aggregation
     * @param function The window function.
     * @param resultType Type information for the result type of the window function
     * @return The data stream that is the result of applying the window function to the window.
     */
    public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
        if (preAggregator instanceof RichFunction) {
            throw new UnsupportedOperationException("Pre-aggregator of apply can not be a RichFunction.");
        }

        //clean the closures
        function = input.getExecutionEnvironment().clean(function);
  278. preAggregator = input.getExecutionEnvironment().clean(preAggregator);
  279. String callLocation = Utils.getCallLocationName();
  280. String udfName = "AllWindowedStream." + callLocation;
  281. String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
  282. OneInputStreamOperator<T, R> operator;
  283. if (evictor != null) {
  284. operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
  285. windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
  286. new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
  287. new ReduceApplyAllWindowFunction<>(preAggregator, function),
  288. trigger,
  289. evictor);
  290. } else {
  291. operator = new NonKeyedWindowOperator<>(windowAssigner,
  292. windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
  293. new ReducingWindowBuffer.Factory<>(preAggregator, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
  294. function,
  295. trigger);
  296. }
  297. return input.transform(opName, resultType, operator).setParallelism(1);
  298. }
  299. /**
  300. * Applies the given window function to each window. The window function is called for each
  301. * evaluation of the window for each key individually. The output of the window function is
  302. * interpreted as a regular non-windowed stream.
  303. *
  304. * <p>
  305. * Arriving data is incrementally aggregated using the given fold function.
  306. *
  307. * @param initialValue The initial value of the fold.
  308. * @param foldFunction The fold function that is used for incremental aggregation.
  309. * @param function The window function.
  310. * @return The data stream that is the result of applying the window function to the window.
  311. */
  312. public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) {
  313. TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
  314. Utils.getCallLocationName(), true);
  315. return apply(initialValue, foldFunction, function, resultType);
  316. }
  317. /**
  318. * Applies the given window function to each window. The window function is called for each
  319. * evaluation of the window for each key individually. The output of the window function is
  320. * interpreted as a regular non-windowed stream.
  321. *
  322. * <p>
  323. * Arriving data is incrementally aggregated using the given fold function.
  324. *
  325. * @param initialValue The initial value of the fold.
  326. * @param foldFunction The fold function that is used for incremental aggregation.
  327. * @param function The window function.
  328. * @param resultType Type information for the result type of the window function
  329. * @return The data stream that is the result of applying the window function to the window.
  330. */
  331. public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function, TypeInformation<R> resultType) {
  332. if (foldFunction instanceof RichFunction) {
  333. throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
  334. }
  335. //clean the closures
  336. function = input.getExecutionEnvironment().clean(function);
  337. foldFunction = input.getExecutionEnvironment().clean(foldFunction);
  338. String callLocation = Utils.getCallLocationName();
  339. String udfName = "AllWindowedStream." + callLocation;
  340. String opName;
  341. OneInputStreamOperator<T, R> operator;
  342. if (evictor != null) {
  343. opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + evictor + ", " + udfName + ")";
  344. operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
  345. windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
  346. new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
  347. new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
  348. trigger,
  349. evictor);
  350. } else {
  351. opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
  352. operator = new NonKeyedWindowOperator<>(windowAssigner,
  353. windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
  354. new FoldingWindowBuffer.Factory<>(foldFunction, initialValue, resultType.createSerializer(getExecutionEnvironment().getConfig())),
  355. function,
  356. trigger);
  357. }
  358. return input.transform(opName, resultType, operator).setParallelism(1);
  359. }
  360. // ------------------------------------------------------------------------
  361. // Aggregations on the windows
  362. // ------------------------------------------------------------------------
  363. /**
  364. * Applies an aggregation that sums every window of the data stream at the
  365. * given position.
  366. *
  367. * @param positionToSum The position in the tuple/array to sum
  368. * @return The transformed DataStream.
  369. */
  370. public SingleOutputStreamOperator<T> sum(int positionToSum) {
  371. return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
  372. }
  373. /**
  374. * Applies an aggregation that sums every window of the pojo data stream at
  375. * the given field for every window.
  376. *
  377. * <p>
  378. * A field expression is either
  379. * the name of a public field or a getter method with parentheses of the
  380. * stream's underlying type. A dot can be used to drill down into objects,
  381. * as in {@code "field1.getInnerField2()" }.
  382. *
  383. * @param field The field to sum
  384. * @return The transformed DataStream.
  385. */
  386. public SingleOutputStreamOperator<T> sum(String field) {
  387. return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
  388. }
  389. /**
  390. * Applies an aggregation that gives the minimum value of every window
  391. * of the data stream at the given position.
  392. *
  393. * @param positionToMin The position to minimize
  394. * @return The transformed DataStream.
  395. */
  396. public SingleOutputStreamOperator<T> min(int positionToMin) {
  397. return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
  398. }
  399. /**
  400. * Applies an aggregation that gives the minimum value of the pojo data
  401. * stream at the given field expression for every window.
  402. *
  403. * <p>
  404. * A field
  405. * expression is either the name of a public field or a getter method with
  406. * parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
  407. * to drill down into objects, as in {@code "field1.getInnerField2()" }.
  408. *
  409. * @param field The field expression based on which the aggregation will be applied.
  410. * @return The transformed DataStream.
  411. */
  412. public SingleOutputStreamOperator<T> min(String field) {
  413. return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
  414. }
  415. /**
  416. * Applies an aggregation that gives the minimum element of every window of
  417. * the data stream by the given position. If more elements have the same
  418. * minimum value the operator returns the first element by default.
  419. *
  420. * @param positionToMinBy
  421. * The position to minimize by
  422. * @return The transformed DataStream.
  423. */
  424. public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
  425. return this.minBy(positionToMinBy, true);
  426. }
  427. /**
  428. * Applies an aggregation that gives the minimum element of every window of
  429. * the data stream by the given position. If more elements have the same
  430. * minimum value the operator returns the first element by default.
  431. *
  432. * @param positionToMinBy The position to minimize by
  433. * @return The transformed DataStream.
  434. */
  435. public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
  436. return this.minBy(positionToMinBy, true);
  437. }
  438. /**
  439. * Applies an aggregation that gives the minimum element of every window of
  440. * the data stream by the given position. If more elements have the same
  441. * minimum value the operator returns either the first or last one depending
  442. * on the parameter setting.
  443. *
  444. * @param positionToMinBy The position to minimize
  445. * @param first If true, then the operator returns the first element with the minimum value, otherwise returns the last
  446. * @return The transformed DataStream.
  447. */
  448. public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
  449. return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
  450. }
  451. /**
  452. * Applies an aggregation that gives the minimum element of the pojo
  453. * data stream by the given field expression for every window. A field
  454. * expression is either the name of a public field or a getter method with
  455. * parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
  456. * to drill down into objects, as in {@code "field1.getInnerField2()" }.
  457. *
  458. * @param field The field expression based on which the aggregation will be applied.
  459. * @param first If True then in case of field equality the first object will be returned
  460. * @return The transformed DataStream.
  461. */
  462. public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
  463. return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
  464. }
  465. /**
  466. * Applies an aggregation that gives the maximum value of every window of
  467. * the data stream at the given position.
  468. *
  469. * @param positionToMax The position to maximize
  470. * @return The transformed DataStream.
  471. */
  472. public SingleOutputStreamOperator<T> max(int positionToMax) {
  473. return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
  474. }
  475. /**
  476. * Applies an aggregation that gives the maximum value of the pojo data
  477. * stream at the given field expression for every window. A field expression
  478. * is either the name of a public field or a getter method with parentheses
  479. * of the {@link DataStream DataStreams} underlying type. A dot can be used to drill
  480. * down into objects, as in {@code "field1.getInnerField2()" }.
  481. *
  482. * @param field The field expression based on which the aggregation will be applied.
  483. * @return The transformed DataStream.
  484. */
  485. public SingleOutputStreamOperator<T> max(String field) {
  486. return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
  487. }
  488. /**
  489. * Applies an aggregation that gives the maximum element of every window of
  490. * the data stream by the given position. If more elements have the same
  491. * maximum value the operator returns the first by default.
  492. *
  493. * @param positionToMaxBy
  494. * The position to maximize by
  495. * @return The transformed DataStream.
  496. */
  497. public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
  498. return this.maxBy(positionToMaxBy, true);
  499. }
  500. /**
  501. * Applies an aggregation that gives the maximum element of every window of
  502. * the data stream by the given position. If more elements have the same
  503. * maximum value the operator returns the first by default.
  504. *
  505. * @param positionToMaxBy
  506. * The position to maximize by
  507. * @return The transformed DataStream.
  508. */
  509. public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
  510. return this.maxBy(positionToMaxBy, true);
  511. }
  512. /**
  513. * Applies an aggregation that gives the maximum element of every window of
  514. * the data stream by the given position. If more elements have the same
  515. * maximum value the operator returns either the first or last one depending
  516. * on the parameter setting.
  517. *
  518. * @param positionToMaxBy The position to maximize by
  519. * @param first If true, then the operator returns the first element with the maximum value, otherwise returns the last
  520. * @return The transformed DataStream.
  521. */
  522. public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
  523. return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
  524. }
  525. /**
  526. * Applies an aggregation that gives the maximum element of the pojo
  527. * data stream by the given field expression for every window. A field
  528. * expression is either the name of a public field or a getter method with
  529. * parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
  530. * to drill down into objects, as in {@code "field1.getInnerField2()" }.
  531. *
  532. * @param field The field expression based on which the aggregation will be applied.
  533. * @param first If True then in case of field equality the first object will be returned
  534. * @return The transformed DataStream.
  535. */
  536. public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
  537. return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
  538. }
  539. private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) {
  540. return reduce(aggregator);
  541. }
  542. // ------------------------------------------------------------------------
  543. // Utilities
  544. // ------------------------------------------------------------------------
  545. private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
  546. Function function,
  547. TypeInformation<R> resultType,
  548. String functionName) {
  549. // TODO: add once non-parallel fast aligned time windows operator is ready
  550. return null;
  551. }
  552. public StreamExecutionEnvironment getExecutionEnvironment() {
  553. return input.getExecutionEnvironment();
  554. }
  555. public TypeInformation<T> getInputType() {
  556. return input.getType();
  557. }
  558. }
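The Javadoc in the listing above distinguishes between `apply(function)`, which has to buffer every element until the window is evaluated, and `apply(preAggregator, function)`, which pre-aggregates arriving data. The difference can be illustrated with a minimal sketch in plain Java. This is not Flink code: `ListBuffer` and `ReducingBuffer` are hypothetical stand-ins for the roles that `ListWindowBuffer` and `ReducingWindowBuffer` play in the listing, and they only model how much state each approach keeps per window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

public class PreAggregationSketch {

    /** Hypothetical buffer that stores every element, as a plain window function requires. */
    static final class ListBuffer<T> {
        private final List<T> elements = new ArrayList<>();

        void add(T element) {
            elements.add(element);
        }

        int storedElements() {
            return elements.size();
        }

        List<T> contents() {
            return elements;
        }
    }

    /** Hypothetical buffer that keeps only the running result of a reduce function. */
    static final class ReducingBuffer<T> {
        private final BinaryOperator<T> reducer;
        private T current; // null until the first element arrives

        ReducingBuffer(BinaryOperator<T> reducer) {
            this.reducer = reducer;
        }

        void add(T element) {
            current = (current == null) ? element : reducer.apply(current, element);
        }

        int storedElements() {
            return current == null ? 0 : 1;
        }

        T result() {
            return current;
        }
    }

    public static void main(String[] args) {
        ListBuffer<Integer> list = new ListBuffer<>();
        ReducingBuffer<Integer> reducing = new ReducingBuffer<>(Integer::sum);

        for (int i = 1; i <= 5; i++) {
            list.add(i);
            reducing.add(i);
        }

        // Without pre-aggregation the sum is computed only when the window fires.
        int bufferedSum = 0;
        for (int value : list.contents()) {
            bufferedSum += value;
        }

        System.out.println(list.storedElements() + " elements buffered, sum=" + bufferedSum);
        System.out.println(reducing.storedElements() + " element kept, sum=" + reducing.result());
    }
}
```

Both variants produce the same sum, but the reducing variant holds a single value per window no matter how many elements arrive.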


/**
 * Windows this data stream to an {@code AllWindowedStream}, which evaluates windows
 * over a non key grouped stream. Elements are put into windows by a
 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. The grouping of
 * elements is done by window only.
 *
 * <p>
 * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
 * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
 * that is used if a {@code Trigger} is not specified.
 *
 * <p>
 * Note: This operation can be inherently non-parallel since all elements have to pass through
 * the same operator instance. (Only for special cases, such as aligned time windows is
 * it possible to perform this operation in parallel).
 *
 * @param assigner The {@code WindowAssigner} that assigns elements to windows.
 * @return The trigger windows data stream.
 */
@PublicEvolving
public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
    return new AllWindowedStream<>(this, assigner);
}



package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;

import java.io.Serializable;
import java.util.Collection;

/**
 * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
 *
 * <p>
 * In a window operation, elements are grouped by their key (if available) and by the windows to
 * which they were assigned. The set of elements with the same key and window is called a pane.
 * When a {@link Trigger} decides that a certain pane should fire, the
 * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied
 * to produce output elements for that pane.
 *
 * @param <T> The type of elements that this WindowAssigner can assign windows to.
 * @param <W> The type of {@code Window} that this assigner assigns.
 */
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Returns a {@code Collection} of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     */
    public abstract Collection<W> assignWindows(T element, long timestamp);

    /**
     * Returns the default trigger associated with this {@code WindowAssigner}.
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a {@link TypeSerializer} for serializing windows that are assigned by
     * this {@code WindowAssigner}.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
}
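The `WindowAssigner` Javadoc says that an element can be assigned to zero or more windows and that elements sharing a key and a window form a pane. A minimal sketch in plain Java may make this concrete. It is not Flink code: the class `PaneSketch`, the string pane identifiers and both helper methods are hypothetical, and the sliding-window loop is an assumption about how such an assigner could enumerate windows, valid only for non-negative timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PaneSketch {

    /**
     * Returns the start timestamps of all sliding windows of the given size
     * and slide that contain the timestamp (non-negative timestamps only).
     */
    static List<Long> assignSlidingWindows(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** Adds the value to the pane of every (key, window) pair the element belongs to. */
    static void addToPanes(Map<String, List<Integer>> panes, String key, int value,
                           long timestamp, long size, long slide) {
        for (long start : assignSlidingWindows(timestamp, size, slide)) {
            String paneId = key + "@[" + start + "," + (start + size) + ")";
            panes.computeIfAbsent(paneId, id -> new ArrayList<>()).add(value);
        }
    }

    public static void main(String[] args) {
        long size = 10_000L;  // window length in milliseconds
        long slide = 5_000L;  // a new window starts every 5 seconds

        Map<String, List<Integer>> panes = new TreeMap<>();
        addToPanes(panes, "a", 1, 7_000L, size, slide);
        addToPanes(panes, "b", 2, 7_000L, size, slide);
        addToPanes(panes, "a", 3, 12_000L, size, slide);

        // Each line is one pane: a key combined with one window.
        panes.forEach((id, values) -> System.out.println(id + " -> " + values));
    }
}
```

With a slide smaller than the window size, every element lands in two windows, so a single record contributes to two panes of its key. In the non-keyed `windowAll` case the key part of the pane identifier simply disappears and a pane is identified by its window alone.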

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.windowing.windows.Window;

import java.io.Serializable;

/**
 * A {@code Trigger} determines when a pane of a window should be evaluated to emit the
 * results for that part of the window.
 *
 * <p>
 * A pane is the bucket of elements that have the same key (assigned by the
 * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
 * be in multiple panes if it was assigned to multiple windows by the
 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
 * have their own instance of the {@code Trigger}.
 *
 * <p>
 * Triggers must not maintain state internally since they can be re-created or reused for
 * different keys. All necessary state should be persisted using the state abstraction
 * available on the {@link TriggerContext}.
 *
 * @param <T> The type of elements on which this {@code Trigger} works.
 * @param <W> The type of {@link Window Windows} on which this {@code Trigger} can operate.
 */
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = -4104633972991191369L;

    /**
     * Called for every element that gets added to a pane. The result of this will determine
     * whether the pane is evaluated to emit results.
     *
     * @param element The element that arrived.
     * @param timestamp The timestamp of the element that arrived.
     * @param window The window to which this pane belongs.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

    /**
     * Called when a processing-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param window The window for which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

    /**
     * Called when an event-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param window The window for which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

    /**
     * Clears any state that the trigger might still hold for the given window. This is called
     * when a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)}
     * and {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as
     * well as state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
     *
     * <p>By default, this method does nothing.
     */
    public void clear(W window, TriggerContext ctx) throws Exception {}

    // ------------------------------------------------------------------------

    /**
     * A context object that is given to {@link Trigger} methods to allow them to register timer
     * callbacks and deal with state.
     */
    public interface TriggerContext {

        /**
         * Returns the current watermark time.
         */
        long getCurrentWatermark();

        /**
         * Register a system time callback. When the current system time passes the specified
         * time {@link Trigger#onProcessingTime(long, Window, TriggerContext)} is called with the time specified here.
         *
         * @param time The time at which to invoke {@link Trigger#onProcessingTime(long, Window, TriggerContext)}
         */
        void registerProcessingTimeTimer(long time);

        /**
         * Register an event-time callback. When the current watermark passes the specified
         * time {@link Trigger#onEventTime(long, Window, TriggerContext)} is called with the time specified here.
         *
         * @param time The watermark at which to invoke {@link Trigger#onEventTime(long, Window, TriggerContext)}
         * @see org.apache.flink.streaming.api.watermark.Watermark
         */
        void registerEventTimeTimer(long time);

        /**
         * Delete the processing time trigger for the given time.
         */
        void deleteProcessingTimeTimer(long time);

        /**
         * Delete the event-time trigger for the given time.
         */
        void deleteEventTimeTimer(long time);

        /**
         * Retrieves a {@link State} object that can be used to interact with
         * fault-tolerant state that is scoped to the window and key of the current
         * trigger invocation.
         *
         * @param stateDescriptor The StateDescriptor that contains the name and type of the
         *                        state that is being accessed.
         * @param <S> The type of the state.
         * @return The partitioned state object.
         * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
         *                                       function (function is not part of a KeyedStream).
         */
        <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

        /**
         * Retrieves a {@link ValueState} object that can be used to interact with
         * fault-tolerant state that is scoped to the window and key of the current
         * trigger invocation.
         *
         * @param name The name of the key/value state.
         * @param stateType The class of the type that is stored in the state. Used to generate
         *                  serializers for managed memory and checkpointing.
         * @param defaultState The default state value, returned when the state is accessed and
         *                     no value has yet been set for the key. May be null.
         *
         * @param <S> The type of the state.
         * @return The partitioned state object.
         * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
         *                                       function (function is not part of a KeyedStream).
         */
        @Deprecated
        <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);

        /**
         * Retrieves a {@link ValueState} object that can be used to interact with
         * fault-tolerant state that is scoped to the window and key of the current
         * trigger invocation.
         *
         * @param name The name of the key/value state.
         * @param stateType The type information for the type that is stored in the state.
         *                  Used to create serializers for managed memory and checkpoints.
         * @param defaultState The default state value, returned when the state is accessed and
         *                     no value has yet been set for the key. May be null.
         *
         * @param <S> The type of the state.
         * @return The partitioned state object.
         * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
         *                                       function (function is not part of a KeyedStream).
         */
        @Deprecated
        <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
    }
}
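The `Trigger` Javadoc requires that a trigger keep no state of its own and store everything through the `TriggerContext`, because one trigger instance may serve many panes. The following is a minimal sketch of that contract in plain Java, using a count-based trigger as the example. It is not Flink code: the `Result` enum only mirrors the four `TriggerResult` outcomes, `PaneState` is a hypothetical stand-in for the state handed out by the context, and panes are identified by plain strings.

```java
import java.util.HashMap;
import java.util.Map;

public class CountTriggerSketch {

    /** Mirrors the four outcomes a trigger can return. */
    enum Result { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    /** Hypothetical stand-in for the per-pane state exposed by a trigger context. */
    static final class PaneState {
        private final Map<String, Long> counts = new HashMap<>();

        long get(String pane) {
            return counts.getOrDefault(pane, 0L);
        }

        void put(String pane, long count) {
            counts.put(pane, count);
        }

        void clear(String pane) {
            counts.remove(pane);
        }
    }

    /** Fires and purges a pane once maxCount elements have arrived in it. */
    static final class CountTrigger {
        private final long maxCount; // configuration only, no per-pane state lives here

        CountTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        Result onElement(String pane, PaneState state) {
            long count = state.get(pane) + 1;
            if (count >= maxCount) {
                state.clear(pane); // reset so the next batch starts from zero
                return Result.FIRE_AND_PURGE;
            }
            state.put(pane, count);
            return Result.CONTINUE;
        }
    }

    public static void main(String[] args) {
        PaneState state = new PaneState();
        CountTrigger trigger = new CountTrigger(3);

        // The same trigger object serves two panes; their counters stay separate.
        System.out.println("w1: " + trigger.onElement("w1", state));
        System.out.println("w2: " + trigger.onElement("w2", state));
        System.out.println("w1: " + trigger.onElement("w1", state));
        System.out.println("w1: " + trigger.onElement("w1", state));
        System.out.println("w2: " + trigger.onElement("w2", state));
    }
}
```

Because the counter lives in `PaneState` rather than in a field of the trigger, the same trigger object can be reused or re-created without losing or mixing up the counts of different panes.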

  18. package org.apache.flink.streaming.api.windowing.assigners;
  19. import org.apache.flink.annotation.PublicEvolving;
  20. import org.apache.flink.api.common.ExecutionConfig;
  21. import org.apache.flink.api.common.typeutils.TypeSerializer;
  22. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  23. import org.apache.flink.streaming.api.windowing.time.Time;
  24. import org.apache.flink.streaming.api.windowing.triggers.Trigger;
  25. import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
  26. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  27. import java.util.Collection;
  28. import java.util.Collections;
  29. /**
  30. * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
  31. * elements. Windows cannot overlap.
  32. *
  33. * <p>
  34. * For example, in order to window into windows of 1 minute:
  35. * <pre> {@code
  36. * DataStream<Tuple2<String, Integer>> in = ...;
  37. * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
  38. * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
  39. * keyed.window(TumblingEventTimeWindows.of(Time.minutes(1)));
  40. * } </pre>
  41. */
  42. @PublicEvolving
  43. public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
  44. private static final long serialVersionUID = 1L;
  45. private long size;
  46. protected TumblingEventTimeWindows(long size) {
  47. this.size = size;
  48. }
  49. @Override
  50. public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
  51. if (timestamp > Long.MIN_VALUE) {
  52. // Long.MIN_VALUE is currently assigned when no timestamp is present
  53. long start = timestamp - (timestamp % size);
  54. return Collections.singletonList(new TimeWindow(start, start + size));
  55. } else {
  56. throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
  57. "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
  58. "'DataStream.assignTimestampsAndWatermarks(...)'?");
  59. }
  60. }
  61. public long getSize() {
  62. return size;
  63. }
  64. @Override
  65. public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
  66. return EventTimeTrigger.create();
  67. }
  68. @Override
  69. public String toString() {
  70. return "TumblingEventTimeWindows(" + size + ")";
  71. }
  72. /**
  73. * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
  74. * elements to time windows based on the element timestamp.
  75. *
  76. * @param size The size of the generated windows.
  77. * @return The time policy.
  78. */
  79. public static TumblingEventTimeWindows of(Time size) {
  80. return new TumblingEventTimeWindows(size.toMilliseconds());
  81. }
  82. @Override
  83. public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
  84. return new TimeWindow.Serializer();
  85. }
  86. }
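
The assignment rule in `assignWindows` above is plain modular arithmetic: the timestamp is rounded down to a multiple of the window size, and the window spans one size from there. The following is a minimal standalone sketch of that arithmetic, not Flink code; the class name `TumblingWindowSketch` and the method `assign` are hypothetical and exist only for illustration.

```java
// Hypothetical standalone sketch: mirrors the arithmetic of assignWindows above
// without depending on any Flink classes.
final class TumblingWindowSketch {

    /** Returns {start, end} of the tumbling window that contains the timestamp. */
    static long[] assign(long timestamp, long size) {
        // Round the timestamp down to a multiple of the window size.
        long start = timestamp - (timestamp % size);
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        long size = 60_000L; // one minute, in milliseconds
        for (long ts : new long[] {0L, 59_999L, 60_000L, 125_000L}) {
            long[] w = assign(ts, size);
            System.out.println(ts + " -> [" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

Because Java's `%` keeps the sign of the dividend, this formula rounds toward zero for negative timestamps, so the sketch is only meaningful for non-negative timestamps.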

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * A {@link Trigger} that fires once the watermark passes the end of the window
 * to which a pane belongs.
 *
 * @see org.apache.flink.streaming.api.watermark.Watermark
 */
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    /**
     * Creates an event-time trigger that fires once the watermark passes the end of the window.
     *
     * <p>
     * Once the trigger fires all elements are discarded. Elements that arrive late immediately
     * trigger window evaluation with just this one element.
     */
    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}
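
The trigger above does two things: on every element it registers an event-time timer at `window.maxTimestamp()` (that is, `end - 1`), and when that timer fires it returns `FIRE_AND_PURGE`. The sketch below simulates this with a sorted set of timers. It is not Flink's timer service: the class `EventTimeTriggerSketch` and its methods are hypothetical, and it assumes a timer fires once the watermark is greater than or equal to the timer's timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical simulation of the register-then-fire cycle; not Flink's timer service.
final class EventTimeTriggerSketch {
    // Registered event-time timers, sorted and deduplicated by timestamp.
    private final TreeSet<Long> timers = new TreeSet<>();

    /** Mirrors onElement: register a timer at the window's max timestamp (end - 1). */
    void onElement(long windowEnd) {
        timers.add(windowEnd - 1);
    }

    /** Advances the watermark and returns every timer that fires (assumed: timer <= watermark). */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTriggerSketch sketch = new EventTimeTriggerSketch();
        sketch.onElement(60_000L); // two elements land in the window [0, 60000)
        sketch.onElement(60_000L);
        System.out.println("watermark 59998 fires " + sketch.advanceWatermark(59_998L));
        System.out.println("watermark 59999 fires " + sketch.advanceWatermark(59_999L));
    }
}
```

Registering the same timestamp twice leaves a single timer in the set, so the window in this sketch is evaluated once no matter how many elements it received.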

package org.apache.flink.streaming.api.windowing.windows;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;

/**
 * A {@link Window} that represents a time interval from {@code start} (inclusive) to
 * {@code start + size} (exclusive).
 */
@PublicEvolving
public class TimeWindow extends Window {
    private final long start;
    private final long end;

    public TimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    public long getStart() {
        return start;
    }

    public long getEnd() {
        return end;
    }

    @Override
    public long maxTimestamp() {
        return end - 1;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TimeWindow window = (TimeWindow) o;
        return end == window.end && start == window.start;
    }

    @Override
    public int hashCode() {
        int result = (int) (start ^ (start >>> 32));
        result = 31 * result + (int) (end ^ (end >>> 32));
        return result;
    }

    @Override
    public String toString() {
        return "TimeWindow{" +
                "start=" + start +
                ", end=" + end +
                '}';
    }

    public static class Serializer extends TypeSerializer<TimeWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean isImmutableType() {
            return true;
        }

        @Override
        public TypeSerializer<TimeWindow> duplicate() {
            return this;
        }

        @Override
        public TimeWindow createInstance() {
            return null;
        }

        @Override
        public TimeWindow copy(TimeWindow from) {
            return from;
        }

        @Override
        public TimeWindow copy(TimeWindow from, TimeWindow reuse) {
            return from;
        }

        @Override
        public int getLength() {
            return 0;
        }

        @Override
        public void serialize(TimeWindow record, DataOutputView target) throws IOException {
            target.writeLong(record.start);
            target.writeLong(record.end);
        }

        @Override
        public TimeWindow deserialize(DataInputView source) throws IOException {
            long start = source.readLong();
            long end = source.readLong();
            return new TimeWindow(start, end);
        }

        @Override
        public TimeWindow deserialize(TimeWindow reuse, DataInputView source) throws IOException {
            long start = source.readLong();
            long end = source.readLong();
            return new TimeWindow(start, end);
        }

        @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
            target.writeLong(source.readLong());
            target.writeLong(source.readLong());
        }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof Serializer;
        }

        @Override
        public boolean canEqual(Object obj) {
            return obj instanceof Serializer;
        }

        @Override
        public int hashCode() {
            return 0;
        }
    }
}
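
The `Serializer` above writes a window as two `long` values, `start` followed by `end`, and reads them back in the same order. The sketch below reproduces that wire layout using only the JDK, with `java.io.DataOutputStream` and `java.io.DataInputStream` as stand-ins for `DataOutputView` and `DataInputView`. The class `TimeWindowCodecSketch` and its methods are hypothetical, and checked exceptions are wrapped purely to keep the example short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical JDK-only sketch of the two-long layout used by TimeWindow.Serializer.
final class TimeWindowCodecSketch {

    /** Writes start, then end, as two big-endian longs. */
    static byte[] serialize(long start, long end) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(start);
            out.writeLong(end);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the two longs back in the order they were written: {start, end}. */
    static long[] deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            long start = in.readLong();
            long end = in.readLong();
            return new long[] {start, end};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] encoded = serialize(120_000L, 180_000L);
        long[] decoded = deserialize(encoded);
        System.out.println(encoded.length + " bytes -> start=" + decoded[0] + ", end=" + decoded[1]);
    }
}
```

Each `writeLong` emits 8 bytes, so one encoded window occupies 16 bytes in this sketch.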

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * Result type for trigger methods. This determines what happens with the window,
 * for example whether the window function should be called, or the window
 * should be discarded.
 */
public enum TriggerResult {

    /**
     * No action is taken on the window.
     */
    CONTINUE(false, false),

    /**
     * {@code FIRE_AND_PURGE} evaluates the window function and emits the window
     * result.
     */
    FIRE_AND_PURGE(true, true),

    /**
     * On {@code FIRE}, the window is evaluated and results are emitted.
     * The window is not purged, though, all elements are retained.
     */
    FIRE(true, false),

    /**
     * All elements in the window are cleared and the window is discarded,
     * without evaluating the window function or emitting any elements.
     */
    PURGE(false, true);

    // ------------------------------------------------------------------------

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }

    // ------------------------------------------------------------------------

    /**
     * Merges two {@code TriggerResults}. This specifies what should happen if we have
     * two results from a Trigger, for example as a result from
     * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)} and
     * {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)}.
     *
     * <p>
     * For example, if one result says {@code CONTINUE} while the other says {@code FIRE}
     * then {@code FIRE} is the combined result;
     */
    public static TriggerResult merge(TriggerResult a, TriggerResult b) {
        if (a.purge || b.purge) {
            if (a.fire || b.fire) {
                return FIRE_AND_PURGE;
            } else {
                return PURGE;
            }
        } else if (a.fire || b.fire) {
            return FIRE;
        } else {
            return CONTINUE;
        }
    }
}
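
The `merge` method above amounts to OR-ing the two flags independently: the merged result fires if either input fires, and purges if either input purges. The standalone sketch below restates that rule and prints the full merge table. The class `TriggerResultSketch` and its nested enum `Result` are hypothetical copies made for illustration, not the Flink enum itself.

```java
// Hypothetical standalone copy of the four results, used to print the merge table.
final class TriggerResultSketch {

    enum Result {
        CONTINUE(false, false),
        FIRE_AND_PURGE(true, true),
        FIRE(true, false),
        PURGE(false, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    /** Same outcome as the nested if/else in the listing: OR each flag, then map back to a constant. */
    static Result merge(Result a, Result b) {
        boolean fire = a.fire || b.fire;
        boolean purge = a.purge || b.purge;
        if (fire && purge) {
            return Result.FIRE_AND_PURGE;
        } else if (purge) {
            return Result.PURGE;
        } else if (fire) {
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        for (Result a : Result.values()) {
            for (Result b : Result.values()) {
                System.out.println(a + " + " + b + " = " + merge(a, b));
            }
        }
    }
}
```

One consequence worth noting is that `FIRE` merged with `PURGE` yields `FIRE_AND_PURGE`, and `CONTINUE` never overrides the other operand.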

package org.apache.flink.streaming.api.functions;

import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * A timestamp assigner that assigns timestamps based on the machine's wall clock.
 *
 * <p>If this assigner is used after a stream source, it realizes "ingestion time" semantics.
 *
 * @param <T> The elements that get timestamps assigned.
 */
public class IngestionTimeExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = -4072216356049069301L;

    private long maxTimestamp;

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        // make sure timestamps are monotonously increasing, even when the system clock re-syncs
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return now;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // make sure timestamps are monotonously increasing, even when the system clock re-syncs
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return new Watermark(now - 1);
    }
}
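
The extractor above guards against the wall clock jumping backwards by never returning a value smaller than the largest one it has already handed out, and it emits a watermark one millisecond behind that value. The sketch below isolates the guard so the effect of a backward jump is visible. The class `MonotonicClockSketch` and the injectable `LongSupplier` clock are hypothetical; the listing itself reads `System.currentTimeMillis()` directly.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the max-based guard, with an injectable clock so that a
// backward jump can be simulated.
final class MonotonicClockSketch {
    private final LongSupplier clock;
    private long maxTimestamp;

    MonotonicClockSketch(LongSupplier clock) {
        this.clock = clock;
    }

    /** Mirrors extractTimestamp: never returns less than a previously returned value. */
    long extractTimestamp() {
        long now = Math.max(clock.getAsLong(), maxTimestamp);
        maxTimestamp = now;
        return now;
    }

    /** Mirrors getCurrentWatermark: one millisecond behind the guarded clock reading. */
    long currentWatermark() {
        long now = Math.max(clock.getAsLong(), maxTimestamp);
        maxTimestamp = now;
        return now - 1;
    }

    public static void main(String[] args) {
        long[] readings = {1000L, 1005L, 990L, 1010L}; // the third reading jumps backwards
        int[] next = {0};
        MonotonicClockSketch sketch = new MonotonicClockSketch(() -> readings[next[0]++]);
        System.out.println(sketch.extractTimestamp());
        System.out.println(sketch.extractTimestamp());
        System.out.println(sketch.extractTimestamp()); // clamped to the previous maximum
        System.out.println(sketch.currentWatermark());
    }
}
```

Since the watermark is always `now - 1`, an element stamped in the same millisecond as a watermark is never behind that watermark in this sketch.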



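Disclaimer: this article was contributed by a community member and does not represent the position of 【wpsshop博客】. Copyright belongs to the original author, and this site assumes no legal liability. If you find infringing content, please contact us. When reposting, please cite the source: https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/810095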