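Flink offers four kinds of windows: sliding and tumbling variants, each based either on time or on events (element counts). Let's look at how the sliding time window is implemented.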
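Flink supports three time semantics:
1: System time (ProcessingTime): the wall-clock time of the machine at the moment an operator processes a record. This is the cheapest option.
2: Ingestion time (IngestionTime): the time at which Flink first ingests a record; it is fixed as soon as the record enters the source. Performance is moderate, because the source timestamp also has to be passed downstream.
3: Event time (EventTime): a timestamp extraction logic is specified for each record, which makes true business time possible. It also carries the highest performance cost.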
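A trigger describes the firing behavior of a window in more complex situations. Its decision is expressed as a TriggerResult, which has four possible values: CONTINUE, FIRE, PURGE and FIRE_AND_PURGE.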
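As a rough illustration of the four trigger outcomes, the following plain-Java sketch models them as an enum with two flags (evaluate the window, discard its contents) and drives them from a count-based trigger. `SketchTriggerResult` and `SketchCountTrigger` are hypothetical stand-ins written for this note, not Flink classes, and the count-based policy is an assumption chosen only to keep the example small.

```java
// Simplified stand-in for the four trigger outcomes. Each value answers two
// questions: should the window be evaluated now, and should its contents be dropped?
enum SketchTriggerResult {
    CONTINUE(false, false),
    FIRE(true, false),
    PURGE(false, true),
    FIRE_AND_PURGE(true, true);

    private final boolean fire;
    private final boolean purge;

    SketchTriggerResult(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() {
        return fire;
    }

    boolean isPurge() {
        return purge;
    }
}

// Hypothetical count trigger: evaluates and clears the window every maxCount elements.
class SketchCountTrigger {
    private final int maxCount;
    private int seen;

    SketchCountTrigger(int maxCount) {
        this.maxCount = maxCount;
    }

    SketchTriggerResult onElement() {
        seen++;
        if (seen >= maxCount) {
            seen = 0; // the window is purged, so counting starts over
            return SketchTriggerResult.FIRE_AND_PURGE;
        }
        return SketchTriggerResult.CONTINUE;
    }
}

class TriggerResultDemo {
    public static void main(String[] args) {
        SketchCountTrigger trigger = new SketchCountTrigger(3);
        for (int i = 1; i <= 4; i++) {
            System.out.println("element " + i + " -> " + trigger.onElement());
        }
    }
}
```

With a count of 3, the third element yields FIRE_AND_PURGE and every other element in the loop yields CONTINUE.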
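The time semantics are selected with StreamExecutionEnvironment#setStreamTimeCharacteristic, and timestamps are attached on the DataStream. For event time, an AssignerWithPeriodicWatermarks object is injected into the stream (via assignTimestampsAndWatermarks); it specifies how a timestamp is assigned to each event and how that time is then used to produce watermarks.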
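To make the two responsibilities of a periodic assigner concrete (assign a time to each event, then derive a watermark from the times seen so far), here is a minimal plain-Java sketch. It does not use the Flink API: `SketchPeriodicAssigner` and the bounded out-of-orderness policy (watermark = largest timestamp seen minus a fixed allowance) are assumptions made for illustration.

```java
// Plain-Java sketch of a periodic timestamp/watermark assigner.
// All names are hypothetical; this is not the Flink interface itself.
class SketchPeriodicAssigner {
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    SketchPeriodicAssigner(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // "How a time is assigned to each event": the event carries its own timestamp,
    // and the assigner remembers the largest one it has seen.
    long extractTimestamp(long eventTimestampMs) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestampMs);
        return eventTimestampMs;
    }

    // "How that time is used": called periodically to report how far event time
    // has progressed, allowing for events that arrive late by a bounded amount.
    long currentWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet, no progress to report
        }
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }
}

class PeriodicAssignerDemo {
    public static void main(String[] args) {
        SketchPeriodicAssigner assigner = new SketchPeriodicAssigner(100);
        long[] eventTimes = {1000, 900, 1200}; // 900 arrives out of order
        for (long ts : eventTimes) {
            assigner.extractTimestamp(ts);
        }
        System.out.println("watermark = " + assigner.currentWatermark());
    }
}
```

The out-of-order element (900) does not move the watermark backwards, because only the maximum timestamp is tracked.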
- /**
- * Windows this {@code DataStream} into sliding time windows.
- *
- * <p>
- * This is a shortcut for either {@code .window(SlidingEventTimeWindows.of(size, slide))} or
- * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
- * set using
- * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
- *
- * <p>
- * Note: This operation can be inherently non-parallel since all elements have to pass through
- * the same operator instance. (Only for special cases, such as aligned time windows is
- * it possible to perform this operation in parallel).
- *
- * @param size The size of the window.
- */
- public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
- if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
- return windowAll(SlidingProcessingTimeWindows.of(size, slide));
- } else {
- return windowAll(SlidingEventTimeWindows.of(size, slide));
- }
- }
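The method above only picks a sliding window assigner; what such an assigner has to compute is the set of overlapping windows that contain a given timestamp. The sketch below shows one common way to do that arithmetic in plain Java. It is an illustration under assumptions (non-negative timestamps, no window offset, hypothetical class and method names), not a copy of the Flink assigner.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {

    // Returns every window [start, end) of length `size`, started every `slide`
    // time units, that contains `timestamp`. Assumes timestamp >= 0 and slide > 0.
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size = 10, slide = 5: every element belongs to size / slide = 2 windows.
        for (long[] w : assignWindows(7, 10, 5)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For timestamp 7 with size 10 and slide 5, the element falls into [5, 15) and [0, 10). When size is a multiple of slide, each element is copied into size / slide windows.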
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- package org.apache.flink.streaming.api;
-
-
- import org.apache.flink.annotation.PublicEvolving;
-
- /**
- * The time characteristic defines how the system determines time for time-dependent
- * order and operations that depend on time (such as time windows).
- */
- @PublicEvolving
- public enum TimeCharacteristic {
-
- /**
- * Processing time for operators means that the operator uses the system clock of the machine
- * to determine the current time of the data stream. Processing-time windows trigger based
- * on wall-clock time and include whatever elements happen to have arrived at the operator at
- * that point in time.
- * <p>
- * Using processing time for window operations results in general in quite non-deterministic results,
- * because the contents of the windows depends on the speed in which elements arrive. It is, however,
- * the cheapest method of forming windows and the method that introduces the least latency.
- */
- ProcessingTime,
-
- /**
- * Ingestion time means that the time of each individual element in the stream is determined
- * when the element enters the Flink streaming data flow. Operations like windows group the
- * elements based on that time, meaning that processing speed within the streaming dataflow
- * does not affect windowing, but only the speed at which sources receive elements.
- * <p>
- * Ingestion time is often a good compromise between processing time and event time.
- * It does not need any special manual form of watermark generation, and events are typically
- * not too much out-of-order when they arrive at operators; in fact, out-of-orderness can
- * only be introduced by streaming shuffles or split/join/union operations. The fact that elements
- * are not very much out-of-order means that the latency increase is moderate, compared to event
- * time.
- */
- IngestionTime,
-
- /**
- * Event time means that the time of each individual element in the stream (also called event)
- * is determined by the event's individual custom timestamp. These timestamps either exist in the
- * elements from before they entered the Flink streaming dataflow, or are user-assigned at the sources.
- * The big implication of this is that it allows for elements to arrive in the sources and in
- * all operators out of order, meaning that elements with earlier timestamps may arrive after
- * elements with later timestamps.
- * <p>
- * Operators that window or order data with respect to event time must buffer data until they can
- * be sure that all timestamps for a certain time interval have been received. This is handled by
- * the so called "time watermarks".
- * <p>
- * Operations based on event time are very predictable - the result of windowing operations
- * is typically identical no matter when the window is executed and how fast the streams operate.
- * At the same time, the buffering and tracking of event time is also costlier than operating
- * with processing time, and typically also introduces more latency. The amount of extra
- * cost depends mostly on how much out of order the elements arrive, i.e., how long the time span
- * between the arrival of early and late elements is. With respect to the "time watermarks", this
- * means that the cost typically depends on how early or late the watermarks can be generated
- * for their timestamp.
- * <p>
- * In relation to {@link #IngestionTime}, the event time is similar, but refers to the event's
- * original time, rather than the time assigned at the data source. Practically, that means that
- * event time has generally more meaning, but also that it takes longer to determine that all
- * elements for a certain time have arrived.
- */
- EventTime
- }
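The Javadoc above contrasts processing time (cheap, but the window contents depend on arrival speed) with event time (predictable results). A small plain-Java sketch can show the difference: the same three events are counted per tumbling window, once by their event timestamps and once by two different, made-up arrival times. The class name, the timestamps and the window size of 10 are all assumptions for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

class TimeSemanticsSketch {

    // Counts elements per tumbling window of the given size.
    // The map key is the window start; timestamps are assumed non-negative.
    static Map<Long, Integer> countPerWindow(long[] timestamps, long windowSize) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            long windowStart = ts - (ts % windowSize);
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] eventTimes = {1, 4, 12};   // timestamps carried by the events
        long[] fastArrival = {2, 5, 13};  // arrival times on a fast run
        long[] slowArrival = {2, 11, 14}; // same events, second one delayed

        // Event time: identical result no matter how the events arrive.
        System.out.println("event time:        " + countPerWindow(eventTimes, 10));
        // Processing time: the result depends on when elements happen to arrive.
        System.out.println("processing (fast): " + countPerWindow(fastArrival, 10));
        System.out.println("processing (slow): " + countPerWindow(slowArrival, 10));
    }
}
```

In the slow run the delayed element lands in the second window under processing time, while the event-time grouping is unchanged.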
-
- package org.apache.flink.streaming.api.operators;
-
- import org.apache.flink.annotation.Internal;
- import org.apache.flink.streaming.api.TimeCharacteristic;
- import org.apache.flink.streaming.api.functions.source.SourceFunction;
- import org.apache.flink.streaming.api.watermark.Watermark;
- import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.ScheduledFuture;
- import java.util.concurrent.TimeUnit;
-
- /**
- * {@link StreamOperator} for streaming sources.
- *
- * @param <OUT> Type of the output elements
- * @param <SRC> Type of the source function of this stream source operator
- */
- @Internal
- public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
- extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private transient SourceFunction.SourceContext<OUT> ctx;
-
- private transient volatile boolean canceledOrStopped = false;
-
-
- public StreamSource(SRC sourceFunction) {
- super(sourceFunction);
-
- this.chainingStrategy = ChainingStrategy.HEAD;
- }
-
-
- public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
- final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
- final SourceFunction.SourceContext<OUT> ctx;
-
- switch (timeCharacteristic) {
- case EventTime:
- ctx = new ManualWatermarkContext<>(this, lockingObject, collector);
- break;
- case IngestionTime:
- ctx = new AutomaticWatermarkContext<>(this, lockingObject, collector,
- getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
- break;
- case ProcessingTime:
- ctx = new NonTimestampContext<>(this, lockingObject, collector);
- break;
- default:
- throw new Exception(String.valueOf(timeCharacteristic));
- }
-
- // copy to a field to give the 'cancel()' method access
- this.ctx = ctx;
-
- try {
- userFunction.run(ctx);
-
- // if we get here, then the user function either exited after being done (finite source)
- // or the function was canceled or stopped. For the finite source case, we should emit
- // a final watermark that indicates that we reached the end of event-time
- if (!isCanceledOrStopped()) {
- ctx.emitWatermark(Watermark.MAX_WATERMARK);
- }
- } finally {
- // make sure that the context is closed in any case
- ctx.close();
- }
- }
-
- public void cancel() {
- // important: marking the source as stopped has to happen before the function is stopped.
- // the flag that tracks this status is volatile, so the memory model also guarantees
- // the happens-before relationship
- markCanceledOrStopped();
- userFunction.cancel();
-
- // the context may not be initialized if the source was never running.
- if (ctx != null) {
- ctx.close();
- }
- }
-
- /**
- * Marks this source as canceled or stopped.
- *
- * <p>This indicates that any exit of the {@link #run(Object, Output)} method
- * cannot be interpreted as the result of a finite source.
- */
- protected void markCanceledOrStopped() {
- this.canceledOrStopped = true;
- }
-
- /**
- * Checks whether the source has been canceled or stopped.
- * @return True, if the source is canceled or stopped, false if not.
- */
- protected boolean isCanceledOrStopped() {
- return canceledOrStopped;
- }
-
- /**
- * Checks whether any asynchronous thread (checkpoint trigger, timer, watermark generator, ...)
- * has caused an exception. If one of these threads caused an exception, this method will
- * throw that exception.
- */
- void checkAsyncException() {
- getContainingTask().checkTimerException();
- }
-
- // ------------------------------------------------------------------------
- // Source contexts for various stream time characteristics
- // ------------------------------------------------------------------------
-
- /**
- * A source context that attaches {@code -1} as a timestamp to all records, and that
- * does not forward watermarks.
- */
- public static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {
-
- private final StreamSource<?, ?> owner;
- private final Object lockingObject;
- private final Output<StreamRecord<T>> output;
- private final StreamRecord<T> reuse;
-
- public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
- this.owner = owner;
- this.lockingObject = lockingObject;
- this.output = output;
- this.reuse = new StreamRecord<T>(null);
- }
-
- @Override
- public void collect(T element) {
- owner.checkAsyncException();
- synchronized (lockingObject) {
- output.collect(reuse.replace(element));
- }
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- // ignore the timestamp
- collect(element);
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- owner.checkAsyncException();
- // do nothing else
- }
-
- @Override
- public Object getCheckpointLock() {
- return lockingObject;
- }
-
- @Override
- public void close() {}
- }
-
- /**
- * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
- * and watermark emission.
- */
- public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
-
- private final StreamSource<?, ?> owner;
- private final Object lockingObject;
- private final Output<StreamRecord<T>> output;
- private final StreamRecord<T> reuse;
-
- private final ScheduledExecutorService scheduleExecutor;
- private final ScheduledFuture<?> watermarkTimer;
- private final long watermarkInterval;
-
- private volatile long nextWatermarkTime;
-
- public AutomaticWatermarkContext(
- final StreamSource<?, ?> owner,
- final Object lockingObjectParam,
- final Output<StreamRecord<T>> outputParam,
- final long watermarkInterval) {
-
- if (watermarkInterval < 1L) {
- throw new IllegalArgumentException("The watermark interval cannot be smaller than one.");
- }
-
- this.owner = owner;
- this.lockingObject = lockingObjectParam;
- this.output = outputParam;
- this.watermarkInterval = watermarkInterval;
- this.reuse = new StreamRecord<T>(null);
-
- this.scheduleExecutor = Executors.newScheduledThreadPool(1);
-
- this.watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- final long currentTime = System.currentTimeMillis();
-
- if (currentTime > nextWatermarkTime) {
- // align the watermarks across all machines. this will ensure that we
- // don't have watermarks that creep along at different intervals because
- // the machine clocks are out of sync
- final long watermarkTime = currentTime - (currentTime % watermarkInterval);
-
- synchronized (lockingObjectParam) {
- if (currentTime > nextWatermarkTime) {
- outputParam.emitWatermark(new Watermark(watermarkTime));
- nextWatermarkTime += watermarkInterval;
- }
- }
- }
- }
- }, 0, watermarkInterval, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void collect(T element) {
- owner.checkAsyncException();
-
- synchronized (lockingObject) {
- final long currentTime = System.currentTimeMillis();
- output.collect(reuse.replace(element, currentTime));
-
- if (currentTime > nextWatermarkTime) {
- // in case we jumped some watermarks, recompute the next watermark time
- final long watermarkTime = currentTime - (currentTime % watermarkInterval);
- nextWatermarkTime = watermarkTime + watermarkInterval;
- output.emitWatermark(new Watermark(watermarkTime));
- }
- }
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- collect(element);
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- owner.checkAsyncException();
-
- if (mark.getTimestamp() == Long.MAX_VALUE) {
- // allow it since this is the special end-watermark that for example the Kafka source emits
- synchronized (lockingObject) {
- nextWatermarkTime = Long.MAX_VALUE;
- output.emitWatermark(mark);
- }
-
- // we can shutdown the timer now, no watermarks will be needed any more
- watermarkTimer.cancel(true);
- scheduleExecutor.shutdownNow();
- }
- }
-
- @Override
- public Object getCheckpointLock() {
- return lockingObject;
- }
-
- @Override
- public void close() {
- watermarkTimer.cancel(true);
- scheduleExecutor.shutdownNow();
- }
- }
-
- /**
- * A SourceContext for event time. Sources may directly attach timestamps and generate
- * watermarks, but if records are emitted without timestamps, no timestamps are automatically
- * generated and attached. The records will simply have no timestamp in that case.
- *
- * Streaming topologies can use timestamp assigner functions to override the timestamps
- * assigned here.
- */
- public static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
-
- private final StreamSource<?, ?> owner;
- private final Object lockingObject;
- private final Output<StreamRecord<T>> output;
- private final StreamRecord<T> reuse;
-
- public ManualWatermarkContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
- this.owner = owner;
- this.lockingObject = lockingObject;
- this.output = output;
- this.reuse = new StreamRecord<T>(null);
- }
-
- @Override
- public void collect(T element) {
- owner.checkAsyncException();
-
- synchronized (lockingObject) {
- output.collect(reuse.replace(element));
- }
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- owner.checkAsyncException();
-
- synchronized (lockingObject) {
- output.collect(reuse.replace(element, timestamp));
- }
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- owner.checkAsyncException();
-
- synchronized (lockingObject) {
- output.emitWatermark(mark);
- }
- }
-
- @Override
- public Object getCheckpointLock() {
- return lockingObject;
- }
-
- @Override
- public void close() {}
- }
- }
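The `AutomaticWatermarkContext` above aligns watermarks with the expression `currentTime - (currentTime % watermarkInterval)`, so that machines with slightly different clocks still emit the same watermark values. The following plain-Java sketch isolates just that arithmetic; the class and method names and the sample clock readings are made up for illustration.

```java
class WatermarkAlignmentSketch {

    // Rounds the current time down to the nearest multiple of the interval,
    // the same arithmetic the ingestion-time context uses for its watermarks.
    static long alignedWatermark(long currentTimeMs, long intervalMs) {
        return currentTimeMs - (currentTimeMs % intervalMs);
    }

    public static void main(String[] args) {
        long intervalMs = 200;
        // Two machines whose clocks differ by 54 ms fall into the same interval...
        System.out.println(alignedWatermark(12_345, intervalMs));
        System.out.println(alignedWatermark(12_399, intervalMs));
        // ...and only move to the next watermark once the interval boundary is crossed.
        System.out.println(alignedWatermark(12_400, intervalMs));
    }
}
```

Both 12345 and 12399 round down to 12200, so the two sources emit identical watermarks even though their clocks disagree.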
-
- package org.apache.flink.streaming.api.datastream;
-
- import org.apache.flink.annotation.PublicEvolving;
- import org.apache.flink.annotation.Public;
- import org.apache.flink.api.common.functions.FoldFunction;
- import org.apache.flink.api.common.functions.Function;
- import org.apache.flink.api.common.functions.ReduceFunction;
- import org.apache.flink.api.common.functions.RichFunction;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.api.java.Utils;
- import org.apache.flink.api.java.typeutils.TypeExtractor;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
- import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
- import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
- import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
- import org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction;
- import org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction;
- import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction;
- import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
- import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
- import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
- import org.apache.flink.streaming.api.windowing.evictors.Evictor;
- import org.apache.flink.streaming.api.windowing.triggers.Trigger;
- import org.apache.flink.streaming.api.windowing.windows.Window;
- import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
- import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
- import org.apache.flink.streaming.runtime.operators.windowing.buffers.FoldingWindowBuffer;
- import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
- import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer;
-
- /**
- * A {@code AllWindowedStream} represents a data stream where the stream of
- * elements is split into windows based on a
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
- * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
- *
- * <p>
- * If an {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} is specified it will be
- * used to evict elements from the window after
- * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
- * When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- *
- * <p>
- * Note that the {@code AllWindowedStream} is purely an API construct, during runtime
- * the {@code AllWindowedStream} will be collapsed together with the
- * operation over the window into one single operation.
- *
- * @param <T> The type of elements in the stream.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
- */
- @Public
- public class AllWindowedStream<T, W extends Window> {
-
- /** The data stream that is windowed by this stream */
- private final DataStream<T> input;
-
- /** The window assigner */
- private final WindowAssigner<? super T, W> windowAssigner;
-
- /** The trigger that is used for window evaluation/emission. */
- private Trigger<? super T, ? super W> trigger;
-
- /** The evictor that is used for evicting elements before window evaluation. */
- private Evictor<? super T, ? super W> evictor;
-
-
- @PublicEvolving
- public AllWindowedStream(DataStream<T> input,
- WindowAssigner<? super T, W> windowAssigner) {
- this.input = input;
- this.windowAssigner = windowAssigner;
- this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
- }
-
- /**
- * Sets the {@code Trigger} that should be used to trigger window emission.
- */
- @PublicEvolving
- public AllWindowedStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
- this.trigger = trigger;
- return this;
- }
-
- /**
- * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
- *
- * <p>
- * Note: When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- */
- @PublicEvolving
- public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
- this.evictor = evictor;
- return this;
- }
-
-
- // ------------------------------------------------------------------------
- // Operations on the keyed windows
- // ------------------------------------------------------------------------
-
- /**
- * Applies a reduce function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the reduce function is interpreted
- * as a regular non-windowed stream.
- * <p>
- * This window will try and pre-aggregate data as much as the window policies permit. For example,
- * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
- * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
- * so a few elements are stored per key (one per slide interval).
- * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
- * aggregation tree.
- *
- * @param function The reduce function.
- * @return The data stream that is the result of applying the reduce function to the window.
- */
- public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
- if (function instanceof RichFunction) {
- throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " +
- "Please use apply(ReduceFunction, WindowFunction) instead.");
- }
-
- //clean the closure
- function = input.getExecutionEnvironment().clean(function);
-
- String callLocation = Utils.getCallLocationName();
- String udfName = "AllWindowedStream." + callLocation;
-
- SingleOutputStreamOperator<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
- if (result != null) {
- return result;
- }
-
- String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
- OneInputStreamOperator<T, T> operator;
-
- if (evictor != null) {
- operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
- new ReduceIterableAllWindowFunction<W, T>(function),
- trigger,
- evictor);
-
- } else {
- operator = new NonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new ReducingWindowBuffer.Factory<>(function, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
- new ReduceIterableAllWindowFunction<W, T>(function),
- trigger);
- }
-
- return input.transform(opName, input.getType(), operator).setParallelism(1);
- }
-
- /**
- * Applies the given fold function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the reduce function is
- * interpreted as a regular non-windowed stream.
- *
- * @param function The fold function.
- * @return The data stream that is the result of applying the fold function to the window.
- */
- public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
- if (function instanceof RichFunction) {
- throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
- "Please use apply(FoldFunction, WindowFunction) instead.");
- }
-
- TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
- Utils.getCallLocationName(), true);
-
- return fold(initialValue, function, resultType);
- }
-
- /**
- * Applies the given fold function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the reduce function is
- * interpreted as a regular non-windowed stream.
- *
- * @param function The fold function.
- * @return The data stream that is the result of applying the fold function to the window.
- */
- public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
- if (function instanceof RichFunction) {
- throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
- "Please use apply(FoldFunction, WindowFunction) instead.");
- }
-
- return apply(initialValue, function, new PassThroughAllWindowFunction<W, R>(), resultType);
- }
-
- /**
- * Applies a window function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the window function is interpreted
- * as a regular non-windowed stream.
- * <p>
- * Note that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
- *
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
- @SuppressWarnings("unchecked, rawtypes")
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, AllWindowFunction.class, true, true, getInputType(), null, false);
-
- return apply(function, resultType);
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each evaluation
- * of the window for each key individually. The output of the window function is interpreted
- * as a regular non-windowed stream.
- * <p>
- * Note that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
- *
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
- //clean the closure
- function = input.getExecutionEnvironment().clean(function);
-
- String callLocation = Utils.getCallLocationName();
- String udfName = "AllWindowedStream." + callLocation;
-
- SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
- if (result != null) {
- return result;
- }
-
-
- String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
- NonKeyedWindowOperator<T, T, R, W> operator;
-
- if (evictor != null) {
- operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
- function,
- trigger,
- evictor);
-
- } else {
- operator = new NonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
- function,
- trigger);
- }
-
- return input.transform(opName, resultType, operator).setParallelism(1);
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * <p>
- * Arriving data is pre-aggregated using the given pre-aggregation reducer.
- *
- * @param preAggregator The reduce function that is used for pre-aggregation
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
-
- public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function) {
- TypeInformation<T> inType = input.getType();
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, AllWindowFunction.class, true, true, inType, null, false);
-
- return apply(preAggregator, function, resultType);
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * <p>
- * Arriving data is pre-aggregated using the given pre-aggregation reducer.
- *
- * @param preAggregator The reduce function that is used for pre-aggregation
- * @param function The window function.
- * @param resultType Type information for the result type of the window function
- * @return The data stream that is the result of applying the window function to the window.
- */
- public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
- if (preAggregator instanceof RichFunction) {
- throw new UnsupportedOperationException("Pre-aggregator of apply can not be a RichFunction.");
- }
-
- //clean the closures
- function = input.getExecutionEnvironment().clean(function);
- preAggregator = input.getExecutionEnvironment().clean(preAggregator);
-
- String callLocation = Utils.getCallLocationName();
- String udfName = "AllWindowedStream." + callLocation;
-
- String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
- OneInputStreamOperator<T, R> operator;
-
- if (evictor != null) {
- operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
- new ReduceApplyAllWindowFunction<>(preAggregator, function),
- trigger,
- evictor);
-
- } else {
- operator = new NonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new ReducingWindowBuffer.Factory<>(preAggregator, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
- function,
- trigger);
- }
-
- return input.transform(opName, resultType, operator).setParallelism(1);
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * <p>
- * Arriving data is incrementally aggregated using the given fold function.
- *
- * @param initialValue The initial value of the fold.
- * @param foldFunction The fold function that is used for incremental aggregation.
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) {
- TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
- Utils.getCallLocationName(), true);
-
- return apply(initialValue, foldFunction, function, resultType);
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * <p>
- * Arriving data is incrementally aggregated using the given fold function.
- *
- * @param initialValue The initial value of the fold.
- * @param foldFunction The fold function that is used for incremental aggregation.
- * @param function The window function.
- * @param resultType Type information for the result type of the window function
- * @return The data stream that is the result of applying the window function to the window.
- */
- public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function, TypeInformation<R> resultType) {
- if (foldFunction instanceof RichFunction) {
- throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
- }
-
- //clean the closures
- function = input.getExecutionEnvironment().clean(function);
- foldFunction = input.getExecutionEnvironment().clean(foldFunction);
-
- String callLocation = Utils.getCallLocationName();
- String udfName = "AllWindowedStream." + callLocation;
-
- String opName;
-
- OneInputStreamOperator<T, R> operator;
-
- if (evictor != null) {
- opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + evictor + ", " + udfName + ")";
-
- operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
- new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
- trigger,
- evictor);
-
- } else {
- opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
- operator = new NonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new FoldingWindowBuffer.Factory<>(foldFunction, initialValue, resultType.createSerializer(getExecutionEnvironment().getConfig())),
- function,
- trigger);
- }
-
- return input.transform(opName, resultType, operator).setParallelism(1);
- }
-
- // ------------------------------------------------------------------------
- // Aggregations on the windows
- // ------------------------------------------------------------------------
-
- /**
- * Applies an aggregation that sums every window of the data stream at the
- * given position.
- *
- * @param positionToSum The position in the tuple/array to sum
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T> sum(int positionToSum) {
- return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that sums every window of the pojo data stream at
- * the given field for every window.
- *
- * <p>
- * A field expression is either
- * the name of a public field or a getter method with parentheses of the
- * stream's underlying type. A dot can be used to drill down into objects,
- * as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field to sum
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T> sum(String field) {
- return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the minimum value of every window
- * of the data stream at the given position.
- *
- * @param positionToMin The position to minimize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T> min(int positionToMin) {
- return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the minimum value of the pojo data
- * stream at the given field expression for every window.
- *
- * <p>
- * A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field expression based on which the aggregation will be applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T> min(String field) {
- return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the minimum element of every window of
- * the data stream by the given position. If more elements have the same
- * minimum value the operator returns the first element by default.
- *
- * @param positionToMinBy
- * The position to minimize by
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
- return this.minBy(positionToMinBy, true);
- }
-
- /**
- * Applies an aggregation that gives the minimum element of every window of
- * the data stream by the given position. If more elements have the same
- * minimum value the operator returns the first element by default.
- *
- * @param positionToMinBy The position to minimize by
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
- return this.minBy(positionToMinBy, true);
- }
-
- /**
- * Applies an aggregation that gives the minimum element of every window of
- * the data stream by the given position. If more elements have the same
- * minimum value the operator returns either the first or last one depending
- * on the parameter setting.
- *
- * @param positionToMinBy The position to minimize
- * @param first If true, then the operator returns the first element with the minimum value, otherwise returns the last
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
- return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the minimum element of the pojo
- * data stream by the given field expression for every window. A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field expression based on which the aggregation will be applied.
- * @param first If True then in case of field equality the first object will be returned
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
- return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the maximum value of every window of
- * the data stream at the given position.
- *
- * @param positionToMax The position to maximize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T> max(int positionToMax) {
- return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the maximum value of the pojo data
- * stream at the given field expression for every window. A field expression
- * is either the name of a public field or a getter method with parentheses
- * of the {@link DataStream DataStreams} underlying type. A dot can be used to drill
- * down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field expression based on which the aggregation will be applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T> max(String field) {
- return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the maximum element of every window of
- * the data stream by the given position. If more elements have the same
- * maximum value the operator returns the first by default.
- *
- * @param positionToMaxBy
- * The position to maximize by
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
- return this.maxBy(positionToMaxBy, true);
- }
-
- /**
- * Applies an aggregation that gives the maximum element of every window of
- * the data stream by the given position. If more elements have the same
- * maximum value the operator returns the first by default.
- *
- * @param positionToMaxBy
- * The position to maximize by
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
- return this.maxBy(positionToMaxBy, true);
- }
-
- /**
- * Applies an aggregation that gives the maximum element of every window of
- * the data stream by the given position. If more elements have the same
- * maximum value the operator returns either the first or last one depending
- * on the parameter setting.
- *
- * @param positionToMaxBy The position to maximize by
- * @param first If true, then the operator returns the first element with the maximum value, otherwise returns the last
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
- return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the maximum element of the pojo
- * data stream by the given field expression for every window. A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field expression based on which the aggregation will be applied.
- * @param first If True then in case of field equality the first object will be returned
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
- return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
- }
-
- private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) {
- return reduce(aggregator);
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
-
- private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
- Function function,
- TypeInformation<R> resultType,
- String functionName) {
-
- // TODO: add once non-parallel fast aligned time windows operator is ready
- return null;
- }
-
- public StreamExecutionEnvironment getExecutionEnvironment() {
- return input.getExecutionEnvironment();
- }
-
- public TypeInformation<T> getInputType() {
- return input.getType();
- }
- }
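Because `aggregate` simply delegates to `reduce`, the `minBy`/`maxBy` family can be read as ordinary reduce functions applied element by element to each window. The block below is a minimal sketch of the documented tie-breaking rule only, not Flink's `ComparableAggregator`. The `Reading` class and the `minBy` and `reduceWindow` helpers are hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class MinBySketch {

    /** Hypothetical element type standing in for a tuple or POJO. */
    static final class Reading {
        final String name;
        final int value;

        Reading(String name, int value) {
            this.name = name;
            this.value = value;
        }
    }

    /** Builds a reduce function that keeps the element with the smallest value. */
    static BinaryOperator<Reading> minBy(boolean first) {
        return (current, next) -> {
            if (next.value < current.value) {
                return next;
            }
            // On a tie, keep the earlier element when first == true, else the later one.
            if (next.value == current.value && !first) {
                return next;
            }
            return current;
        };
    }

    /** Applies the reduce function incrementally, holding a single value per window. */
    static Reading reduceWindow(List<Reading> window, BinaryOperator<Reading> fn) {
        Reading acc = null;
        for (Reading r : window) {
            acc = (acc == null) ? r : fn.apply(acc, r);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Reading> window = Arrays.asList(
                new Reading("a", 3), new Reading("b", 1), new Reading("c", 1));
        System.out.println(reduceWindow(window, minBy(true)).name);  // prints b
        System.out.println(reduceWindow(window, minBy(false)).name); // prints c
    }
}
```

Only one accumulated value is held while the window fills, which is the same idea as the pre-aggregating `apply` variants that use a reducing buffer instead of a list buffer.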
- /**
- * Windows this data stream to an {@code AllWindowedStream}, which evaluates windows
- * over a non key grouped stream. Elements are put into windows by a
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. The grouping of
- * elements is done by window only.
- *
- * <p>
- * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
- * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
- * that is used if a {@code Trigger} is not specified.
- *
- * <p>
- * Note: This operation can be inherently non-parallel since all elements have to pass through
- * the same operator instance. (Only for special cases, such as aligned time windows is
- * it possible to perform this operation in parallel).
- *
- * @param assigner The {@code WindowAssigner} that assigns elements to windows.
- * @return The trigger windows data stream.
- */
- @PublicEvolving
- public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
- return new AllWindowedStream<>(this, assigner);
- }
- package org.apache.flink.streaming.api.windowing.assigners;
-
- import org.apache.flink.annotation.PublicEvolving;
- import org.apache.flink.api.common.ExecutionConfig;
- import org.apache.flink.api.common.typeutils.TypeSerializer;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.triggers.Trigger;
- import org.apache.flink.streaming.api.windowing.windows.Window;
- import java.io.Serializable;
-
- import java.util.Collection;
-
- /**
- * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
- *
- * <p>
- * In a window operation, elements are grouped by their key (if available) and by the windows to
- * which they were assigned. The set of elements with the same key and window is called a pane.
- * When a {@link Trigger} decides that a certain pane should fire the
- * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied
- * to produce output elements for that pane.
- *
- * @param <T> The type of elements that this WindowAssigner can assign windows to.
- * @param <W> The type of {@code Window} that this assigner assigns.
- */
- @PublicEvolving
- public abstract class WindowAssigner<T, W extends Window> implements Serializable {
- private static final long serialVersionUID = 1L;
-
- /**
- * Returns a {@code Collection} of windows that should be assigned to the element.
- *
- * @param element The element to which windows should be assigned.
- * @param timestamp The timestamp of the element.
- */
- public abstract Collection<W> assignWindows(T element, long timestamp);
-
- /**
- * Returns the default trigger associated with this {@code WindowAssigner}.
- */
- public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
-
- /**
- * Returns a {@link TypeSerializer} for serializing windows that are assigned by
- * this {@code WindowAssigner}.
- */
- public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
- }
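To make "zero or more windows" concrete: a tumbling assigner returns exactly one window per element, while a sliding assigner returns several overlapping ones. The following is a hedged sketch of how a sliding assigner could implement `assignWindows`, assuming non-negative timestamps in milliseconds. The static helper and the `long[]` window representation are illustrative stand-ins, not the Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingAssignSketch {

    /** Returns {start, end} pairs for every window [start, end) that contains the timestamp. */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size = 10, slide = 5: the element at t = 12 falls into two windows.
        for (long[] w : assignWindows(12, 10, 5)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints [10, 20) and then [5, 15)
    }
}
```

Each returned window, combined with the element's key when there is one, identifies one pane, so the same element can contribute to several panes.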
- package org.apache.flink.streaming.api.windowing.triggers;
-
- import org.apache.flink.annotation.PublicEvolving;
- import org.apache.flink.api.common.state.State;
- import org.apache.flink.api.common.state.StateDescriptor;
- import org.apache.flink.api.common.state.ValueState;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.streaming.api.windowing.windows.Window;
-
- import java.io.Serializable;
-
- /**
- * A {@code Trigger} determines when a pane of a window should be evaluated to emit the
- * results for that part of the window.
- *
- * <p>
- * A pane is the bucket of elements that have the same key (assigned by the
- * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
- * be in multiple panes if it was assigned to multiple windows by the
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
- * have their own instance of the {@code Trigger}.
- *
- * <p>
- * Triggers must not maintain state internally since they can be re-created or reused for
- * different keys. All necessary state should be persisted using the state abstraction
- * available on the {@link TriggerContext}.
- *
- * @param <T> The type of elements on which this {@code Trigger} works.
- * @param <W> The type of {@link Window Windows} on which this {@code Trigger} can operate.
- */
- @PublicEvolving
- public abstract class Trigger<T, W extends Window> implements Serializable {
-
- private static final long serialVersionUID = -4104633972991191369L;
-
- /**
- * Called for every element that gets added to a pane. The result of this will determine
- * whether the pane is evaluated to emit results.
- *
- * @param element The element that arrived.
- * @param timestamp The timestamp of the element that arrived.
- * @param window The window to which this pane belongs.
- * @param ctx A context object that can be used to register timer callbacks.
- */
- public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
-
- /**
- * Called when a processing-time timer that was set using the trigger context fires.
- *
- * @param time The timestamp at which the timer fired.
- * @param ctx A context object that can be used to register timer callbacks.
- */
- public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
-
- /**
- * Called when an event-time timer that was set using the trigger context fires.
- *
- * @param time The timestamp at which the timer fired.
- * @param ctx A context object that can be used to register timer callbacks.
- */
- public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
-
- /**
- * Clears any state that the trigger might still hold for the given window. This is called
- * when a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)}
- * and {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as
- * well as state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
- *
- * <p>By default, this method does nothing.
- */
- public void clear(W window, TriggerContext ctx) throws Exception {}
-
- // ------------------------------------------------------------------------
-
- /**
- * A context object that is given to {@link Trigger} methods to allow them to register timer
- * callbacks and deal with state.
- */
- public interface TriggerContext {
-
- /**
- * Returns the current watermark time.
- */
- long getCurrentWatermark();
-
- /**
- * Register a system time callback. When the current system time passes the specified
- * time {@link Trigger#onProcessingTime(long, Window, TriggerContext)} is called with the time specified here.
- *
- * @param time The time at which to invoke {@link Trigger#onProcessingTime(long, Window, TriggerContext)}
- */
- void registerProcessingTimeTimer(long time);
-
- /**
- * Register an event-time callback. When the current watermark passes the specified
- * time {@link Trigger#onEventTime(long, Window, TriggerContext)} is called with the time specified here.
- *
- * @param time The watermark at which to invoke {@link Trigger#onEventTime(long, Window, TriggerContext)}
- * @see org.apache.flink.streaming.api.watermark.Watermark
- */
- void registerEventTimeTimer(long time);
-
- /**
- * Delete the processing time trigger for the given time.
- */
- void deleteProcessingTimeTimer(long time);
-
- /**
- * Delete the event-time trigger for the given time.
- */
- void deleteEventTimeTimer(long time);
-
- /**
- * Retrieves an {@link State} object that can be used to interact with
- * fault-tolerant state that is scoped to the window and key of the current
- * trigger invocation.
- *
- * @param stateDescriptor The StateDescriptor that contains the name and type of the
- * state that is being accessed.
- * @param <S> The type of the state.
- * @return The partitioned state object.
- * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
- * function (function is not part of a KeyedStream).
- */
- <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
-
- /**
- * Retrieves a {@link ValueState} object that can be used to interact with
- * fault-tolerant state that is scoped to the window and key of the current
- * trigger invocation.
- *
- * @param name The name of the key/value state.
- * @param stateType The class of the type that is stored in the state. Used to generate
- * serializers for managed memory and checkpointing.
- * @param defaultState The default state value, returned when the state is accessed and
- * no value has yet been set for the key. May be null.
- *
- * @param <S> The type of the state.
- * @return The partitioned state object.
- * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
- * function (function is not part of a KeyedStream).
- */
- @Deprecated
- <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
-
-
- /**
- * Retrieves a {@link ValueState} object that can be used to interact with
- * fault-tolerant state that is scoped to the window and key of the current
- * trigger invocation.
- *
- * @param name The name of the key/value state.
- * @param stateType The type information for the type that is stored in the state.
- * Used to create serializers for managed memory and checkpoints.
- * @param defaultState The default state value, returned when the state is accessed and
- * no value has yet been set for the key. May be null.
- *
- * @param <S> The type of the state.
- * @return The partitioned state object.
- * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
- * function (function is not part of a KeyedStream).
- */
- @Deprecated
- <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
- }
- }
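The rule that a trigger must not keep state in its own fields is easier to see with a small example. Below is a rough sketch of a count-based trigger whose counter lives in state scoped to the pane rather than in the trigger object. The `TriggerResult` enum and the `PaneState` class are simplified stand-ins written for this sketch, and the pane is identified by a plain string, which is an assumption made for brevity.

```java
import java.util.HashMap;
import java.util.Map;

class CountTriggerSketch {

    /** Stand-in for the four possible trigger outcomes. */
    enum TriggerResult { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    /** Stand-in for state that the runtime scopes to one pane (key + window). */
    static final class PaneState {
        private final Map<String, Long> counts = new HashMap<>();

        long increment(String pane) {
            return counts.merge(pane, 1L, Long::sum);
        }

        void clear(String pane) {
            counts.remove(pane);
        }
    }

    // Only immutable configuration is kept in the trigger itself.
    private final long maxCount;

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Called once per element added to the pane. */
    TriggerResult onElement(String pane, PaneState state) {
        long count = state.increment(pane);
        if (count >= maxCount) {
            state.clear(pane); // reset so the next batch starts from zero
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    public static void main(String[] args) {
        CountTriggerSketch trigger = new CountTriggerSketch(2);
        PaneState state = new PaneState();
        System.out.println(trigger.onElement("k1@[0,10)", state)); // CONTINUE
        System.out.println(trigger.onElement("k2@[0,10)", state)); // CONTINUE
        System.out.println(trigger.onElement("k1@[0,10)", state)); // FIRE_AND_PURGE
    }
}
```

Because the counter is looked up per pane, the same trigger instance can be reused for different keys without their counts interfering.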
- package org.apache.flink.streaming.api.windowing.assigners;
-
- import org.apache.flink.annotation.PublicEvolving;
- import org.apache.flink.api.common.ExecutionConfig;
- import org.apache.flink.api.common.typeutils.TypeSerializer;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.triggers.Trigger;
- import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
- import java.util.Collection;
- import java.util.Collections;
-
- /**
- * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
- * elements. Windows cannot overlap.
- *
- * <p>
- * For example, in order to window into windows of 1 minute:
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> in = ...;
- * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
- * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
- * keyed.window(TumblingEventTimeWindows.of(Time.minutes(1)));
- * } </pre>
- */
- @PublicEvolving
- public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
- private static final long serialVersionUID = 1L;
-
- private long size;
-
- protected TumblingEventTimeWindows(long size) {
- this.size = size;
- }
-
- @Override
- public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
- if (timestamp > Long.MIN_VALUE) {
- // Long.MIN_VALUE is currently assigned when no timestamp is present
- long start = timestamp - (timestamp % size);
- return Collections.singletonList(new TimeWindow(start, start + size));
- } else {
- throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
- "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
- "'DataStream.assignTimestampsAndWatermarks(...)'?");
- }
- }
-
- public long getSize() {
- return size;
- }
-
- @Override
- public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
- return EventTimeTrigger.create();
- }
-
- @Override
- public String toString() {
- return "TumblingEventTimeWindows(" + size + ")";
- }
-
- /**
- * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
- * elements to time windows based on the element timestamp.
- *
- * @param size The size of the generated windows.
- * @return The time policy.
- */
- public static TumblingEventTimeWindows of(Time size) {
- return new TumblingEventTimeWindows(size.toMilliseconds());
- }
-
- @Override
- public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
- return new TimeWindow.Serializer();
- }
- }
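The window start in `assignWindows` is plain modular arithmetic, so it can be checked by hand. The sketch below repeats that formula and, as an aside, shows a variant based on `Math.floorMod`. Java's `%` keeps the sign of the dividend, so for a negative timestamp the original formula yields a window that does not contain the element. Both method names are hypothetical, and the variant is an illustration, not what the listed class does.

```java
class TumblingStartSketch {

    /** Same arithmetic as the assigner above: correct for non-negative timestamps. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Variant that also places negative timestamps inside their window. */
    static long windowStartFloor(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    public static void main(String[] args) {
        long size = 60_000L; // one minute in milliseconds
        System.out.println(windowStart(125_000L, size));  // 120000, window [120000, 180000)
        System.out.println(windowStart(-1L, size));       // 0, but [0, 60000) does not contain -1
        System.out.println(windowStartFloor(-1L, size));  // -60000, window [-60000, 0) contains -1
    }
}
```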
- package org.apache.flink.streaming.api.windowing.triggers;
-
- import org.apache.flink.annotation.PublicEvolving;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
- /**
- * A {@link Trigger} that fires once the watermark passes the end of the window
- * to which a pane belongs.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- */
- @PublicEvolving
- public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
- private static final long serialVersionUID = 1L;
-
- private EventTimeTrigger() {}
-
- @Override
- public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
- ctx.registerEventTimeTimer(window.maxTimestamp());
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
- return TriggerResult.FIRE_AND_PURGE;
- }
-
- @Override
- public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
- ctx.deleteEventTimeTimer(window.maxTimestamp());
- }
-
- @Override
- public String toString() {
- return "EventTimeTrigger()";
- }
-
- /**
- * Creates an event-time trigger that fires once the watermark passes the end of the window.
- *
- * <p>
- * Once the trigger fires all elements are discarded. Elements that arrive late immediately
- * trigger window evaluation with just this one element.
- */
- public static EventTimeTrigger create() {
- return new EventTimeTrigger();
- }
- }
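The interplay between `onElement` and `onEventTime` can be sketched without the runtime: every element registers a timer at the window's last valid timestamp (`end - 1` for a `TimeWindow`), and the timer fires once the watermark reaches that value. The class below is a simplified model under that assumption. The `TreeSet` timer store and the `advanceWatermark` method are hypothetical and only imitate what the operator would do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

class EventTimeTimerSketch {

    // Registered timers, deduplicated and sorted by timestamp.
    private final TreeSet<Long> timers = new TreeSet<>();

    /** Last timestamp that still belongs to a window ending at {@code end} (exclusive). */
    static long maxTimestamp(long end) {
        return end - 1;
    }

    /** Mirrors the trigger's onElement: register a timer, do not fire yet. */
    void onElement(long windowEnd) {
        timers.add(maxTimestamp(windowEnd));
    }

    /** Returns the timers that fire because the watermark has reached them. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch sketch = new EventTimeTimerSketch();
        sketch.onElement(60_000L);
        sketch.onElement(60_000L); // same window, same timer
        System.out.println(sketch.advanceWatermark(59_998L)); // []
        System.out.println(sketch.advanceWatermark(59_999L)); // [59999]
    }
}
```

Registering the same timestamp repeatedly is harmless in this model, which is why the trigger can call the registration on every element.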
- package org.apache.flink.streaming.api.windowing.windows;
-
- import org.apache.flink.annotation.PublicEvolving;
- import org.apache.flink.api.common.typeutils.TypeSerializer;
- import org.apache.flink.core.memory.DataInputView;
- import org.apache.flink.core.memory.DataOutputView;
-
- import java.io.IOException;
-
- /**
- * A {@link Window} that represents a time interval from {@code start} (inclusive) to
- * {@code end} (exclusive).
- */
- @PublicEvolving
- public class TimeWindow extends Window {
-
- private final long start;
- private final long end;
-
- public TimeWindow(long start, long end) {
- this.start = start;
- this.end = end;
- }
-
- public long getStart() {
- return start;
- }
-
- public long getEnd() {
- return end;
- }
-
- @Override
- public long maxTimestamp() {
- return end - 1;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- TimeWindow window = (TimeWindow) o;
-
- return end == window.end && start == window.start;
- }
-
- @Override
- public int hashCode() {
- int result = (int) (start ^ (start >>> 32));
- result = 31 * result + (int) (end ^ (end >>> 32));
- return result;
- }
-
- @Override
- public String toString() {
- return "TimeWindow{" +
- "start=" + start +
- ", end=" + end +
- '}';
- }
-
- public static class Serializer extends TypeSerializer<TimeWindow> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean isImmutableType() {
- return true;
- }
-
- @Override
- public TypeSerializer<TimeWindow> duplicate() {
- return this;
- }
-
- @Override
- public TimeWindow createInstance() {
- return null;
- }
-
- @Override
- public TimeWindow copy(TimeWindow from) {
- return from;
- }
-
- @Override
- public TimeWindow copy(TimeWindow from, TimeWindow reuse) {
- return from;
- }
-
- @Override
- public int getLength() {
- return 0;
- }
-
- @Override
- public void serialize(TimeWindow record, DataOutputView target) throws IOException {
- target.writeLong(record.start);
- target.writeLong(record.end);
- }
-
- @Override
- public TimeWindow deserialize(DataInputView source) throws IOException {
- long start = source.readLong();
- long end = source.readLong();
- return new TimeWindow(start, end);
- }
-
- @Override
- public TimeWindow deserialize(TimeWindow reuse, DataInputView source) throws IOException {
- long start = source.readLong();
- long end = source.readLong();
- return new TimeWindow(start, end);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- target.writeLong(source.readLong());
- target.writeLong(source.readLong());
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj instanceof Serializer;
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof Serializer;
- }
-
- @Override
- public int hashCode() {
- return 0;
- }
- }
-
- }
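To connect `TimeWindow` back to the sliding windows discussed at the start, here is a minimal sketch of how one timestamp could be mapped to several overlapping half-open windows. It is an illustration under stated assumptions, not the Flink assigner itself: `SlidingWindowSketch`, `Win`, and `assign` are hypothetical names, timestamps are assumed to be non-negative, and no window offset is modeled.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone illustration of sliding window assignment; not Flink code. */
final class SlidingWindowSketch {

    /** Simplified stand-in for TimeWindow: the interval [start, end). */
    static final class Win {
        final long start;
        final long end;

        Win(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Last timestamp that is still inside the window. */
        long maxTimestamp() {
            return end - 1;
        }

        boolean contains(long timestamp) {
            return timestamp >= start && timestamp < end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /**
     * Returns every window of length {@code size}, started at multiples of
     * {@code slide}, that contains {@code timestamp} (assumed non-negative).
     */
    static List<Win> assign(long timestamp, long size, long slide) {
        List<Win> windows = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new Win(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        // size = 10, slide = 5: timestamp 7 falls into two windows.
        System.out.println(assign(7L, 10L, 5L)); // [[5, 15), [0, 10)]
    }
}
```

With `size = 10` and `slide = 5`, each element belongs to `size / slide = 2` windows, which is why sliding windows multiply the amount of state kept per element.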
-
- package org.apache.flink.streaming.api.windowing.triggers;
-
- import org.apache.flink.streaming.api.windowing.windows.Window;
-
- /**
- * Result type for trigger methods. This determines what happens with the window,
- * for example whether the window function should be called, or the window
- * should be discarded.
- */
- public enum TriggerResult {
-
- /**
- * No action is taken on the window.
- */
- CONTINUE(false, false),
-
- /**
- * {@code FIRE_AND_PURGE} evaluates the window function and emits the window
- * result, then clears all elements in the window.
- */
- FIRE_AND_PURGE(true, true),
-
- /**
- * On {@code FIRE}, the window is evaluated and results are emitted.
- * The window is not purged, though; all elements are retained.
- */
- FIRE(true, false),
-
- /**
- * All elements in the window are cleared and the window is discarded,
- * without evaluating the window function or emitting any elements.
- */
- PURGE(false, true);
-
- // ------------------------------------------------------------------------
-
- private final boolean fire;
- private final boolean purge;
-
- TriggerResult(boolean fire, boolean purge) {
- this.purge = purge;
- this.fire = fire;
- }
-
- public boolean isFire() {
- return fire;
- }
-
- public boolean isPurge() {
- return purge;
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Merges two {@code TriggerResults}. This specifies what should happen if we have
- * two results from a Trigger, for example as a result from
- * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)} and
- * {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)}.
- *
- * <p>
- * For example, if one result says {@code CONTINUE} while the other says {@code FIRE},
- * then {@code FIRE} is the combined result.
- */
- public static TriggerResult merge(TriggerResult a, TriggerResult b) {
- if (a.purge || b.purge) {
- if (a.fire || b.fire) {
- return FIRE_AND_PURGE;
- } else {
- return PURGE;
- }
- } else if (a.fire || b.fire) {
- return FIRE;
- } else {
- return CONTINUE;
- }
- }
- }
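The `merge` method above amounts to a logical OR on each of the two flags: the combined result fires if either input fires, and purges if either input purges. The sketch below restates that in a stand-alone form so the combinations can be checked without Flink on the classpath. `TriggerResultSketch`, `Result`, and the helper `of` are hypothetical names introduced for this illustration.

```java
/** Stand-alone illustration of the merge rule; not the Flink enum itself. */
final class TriggerResultSketch {

    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }

        /** Maps a pair of flags back to the matching constant. */
        static Result of(boolean fire, boolean purge) {
            if (fire) {
                return purge ? FIRE_AND_PURGE : FIRE;
            }
            return purge ? PURGE : CONTINUE;
        }

        /** OR the two flags independently, as the nested if/else above does. */
        static Result merge(Result a, Result b) {
            return of(a.fire || b.fire, a.purge || b.purge);
        }
    }

    public static void main(String[] args) {
        // Print the full 4 x 4 merge table.
        for (Result a : Result.values()) {
            for (Result b : Result.values()) {
                System.out.println(a + " + " + b + " -> " + Result.merge(a, b));
            }
        }
    }
}
```

For instance, merging `FIRE` with `PURGE` yields `FIRE_AND_PURGE`, and merging `CONTINUE` with anything yields the other value unchanged.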
-
- package org.apache.flink.streaming.api.functions;
-
- import org.apache.flink.streaming.api.watermark.Watermark;
-
- /**
- * A timestamp assigner that assigns timestamps based on the machine's wall clock.
- *
- * <p>If this assigner is used after a stream source, it realizes "ingestion time" semantics.
- *
- * @param <T> The elements that get timestamps assigned.
- */
- public class IngestionTimeExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
- private static final long serialVersionUID = -4072216356049069301L;
-
- private long maxTimestamp;
-
- @Override
- public long extractTimestamp(T element, long previousElementTimestamp) {
- // make sure timestamps are monotonically increasing, even when the system clock re-syncs
- final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
- maxTimestamp = now;
- return now;
- }
-
- @Override
- public Watermark getCurrentWatermark() {
- // make sure timestamps are monotonically increasing, even when the system clock re-syncs
- final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
- maxTimestamp = now;
- return new Watermark(now - 1);
- }
- }
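The `Math.max(System.currentTimeMillis(), maxTimestamp)` idiom above keeps timestamps from moving backwards when the wall clock is adjusted. The following minimal sketch isolates that idea with an injectable clock so that a backwards jump can be simulated. `MonotonicClockSketch` and its method names are hypothetical, and the `LongSupplier` parameter is an assumption made for testability; the original class reads the system clock directly.

```java
import java.util.function.LongSupplier;

/** Stand-alone illustration of the clamping idiom; not Flink code. */
final class MonotonicClockSketch {

    private final LongSupplier clock; // hypothetical injectable time source
    private long maxTimestamp;        // highest value handed out so far

    MonotonicClockSketch(LongSupplier clock) {
        this.clock = clock;
    }

    /** Returns the clock reading, but never less than a previously returned value. */
    long nextTimestamp() {
        final long now = Math.max(clock.getAsLong(), maxTimestamp);
        maxTimestamp = now;
        return now;
    }

    /** Watermark one millisecond behind the clamped time, as in the class above. */
    long currentWatermark() {
        return nextTimestamp() - 1;
    }

    public static void main(String[] args) {
        // Simulated readings: the clock jumps back from 100 to 90.
        long[] readings = {100L, 90L, 120L};
        int[] next = {0};
        MonotonicClockSketch sketch = new MonotonicClockSketch(() -> readings[next[0]++]);
        System.out.println(sketch.nextTimestamp()); // 100
        System.out.println(sketch.nextTimestamp()); // 100 (clamped, not 90)
        System.out.println(sketch.nextTimestamp()); // 120
    }
}
```

Emitting the watermark as `now - 1` leaves room for further elements that are stamped with exactly `now`, so they are not treated as late.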