From 27b5c49e76f58992fd5575959a7dea7088505e12 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 26 Feb 2016 15:19:50 +0100 Subject: [PATCH] [FLINK-3521] Make Iterable part of method signature for WindowFunction This closes #1723 --- .../ml/IncrementalLearningSkeleton.java | 2 +- .../GroupedProcessingTimeWindowExample.java | 2 +- .../api/datastream/AllWindowedStream.java | 8 +- .../api/datastream/CoGroupedStreams.java | 2 +- .../api/datastream/WindowedStream.java | 39 +++++----- .../windowing/AllWindowFunction.java | 5 +- .../windowing/FoldApplyAllWindowFunction.java | 5 +- .../windowing/FoldApplyWindowFunction.java | 5 +- .../PassThroughAllWindowFunction.java | 6 +- .../windowing/PassThroughWindowFunction.java | 6 +- .../ReduceApplyAllWindowFunction.java | 6 +- .../windowing/ReduceApplyWindowFunction.java | 6 +- .../ReduceIterableAllWindowFunction.java | 2 +- .../ReduceIterableWindowFunction.java | 2 +- .../functions/windowing/WindowFunction.java | 3 +- .../windowing/AccumulatingKeyedTimePanes.java | 8 +- ...umulatingProcessingTimeWindowOperator.java | 6 +- .../EvictingNonKeyedWindowOperator.java | 2 +- .../windowing/EvictingWindowOperator.java | 6 +- .../windowing/NonKeyedWindowOperator.java | 4 +- .../operators/windowing/WindowOperator.java | 12 +-- .../InternalIterableWindowFunction.java | 72 ++++++++++++++++++ .../InternalSingleValueWindowFunction.java | 74 +++++++++++++++++++ .../functions/InternalWindowFunction.java | 47 ++++++++++++ .../flink/streaming/api/DataStreamTest.java | 2 +- .../FoldApplyWindowFunctionTest.java | 6 +- ...ignedProcessingTimeWindowOperatorTest.java | 10 +-- .../windowing/AllWindowTranslationTest.java | 6 +- .../windowing/EvictingWindowOperatorTest.java | 7 +- .../windowing/TimeWindowTranslationTest.java | 6 +- .../windowing/WindowOperatorTest.java | 16 ++-- .../windowing/WindowTranslationTest.java | 7 +- .../api/scala/AllWindowedStream.scala | 43 +++++++---- .../streaming/api/scala/WindowedStream.scala | 47 ++++++++---- .../scala/function/AllWindowFunction.scala | 45 +++++++++++ .../api/scala/function/WindowFunction.scala | 47 ++++++++++++ .../api/scala/AllWindowTranslationTest.scala | 12 +-- .../api/scala/WindowTranslationTest.scala | 12 +-- ...EventTimeAllWindowCheckpointingITCase.java | 22 ++++-- .../EventTimeWindowCheckpointingITCase.java | 24 ++++-- .../WindowCheckpointingITCase.java | 4 +- 41 files changed, 498 insertions(+), 148 deletions(-) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java create mode 100644 flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala create mode 100644 flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java index acbc5d671a353..41084852f6486 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java @@ -167,7 +167,7 @@ public Watermark checkAndGetNextWatermark(Integer lastElement, long extractedTim /** * Builds up-to-date partial models on new training data. */ - public static class PartialModelBuilder implements AllWindowFunction, Double[], TimeWindow> { + public static class PartialModelBuilder implements AllWindowFunction { private static final long serialVersionUID = 1L; protected Double[] buildPartialModel(Iterable values) { diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java index 196b73e0d14bc..f08069b310ec9 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java @@ -104,7 +104,7 @@ public Key getKey(Type value) { } } - public static class SummingWindowFunction implements WindowFunction>, Tuple2, Long, Window> { + public static class SummingWindowFunction implements WindowFunction, Tuple2, Long, Window> { @Override public void apply(Long key, Window window, Iterable> values, Collector> out) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 56640d31b31ef..6b32880252856 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; @@ -222,11 +221,10 @@ public AllWindowedStream evictor(Evictor evictor) { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public SingleOutputStreamOperator apply(AllWindowFunction, R, W> function) { + public SingleOutputStreamOperator apply(AllWindowFunction function) { @SuppressWarnings("unchecked, rawtypes") - TypeInformation> iterTypeInfo = new GenericTypeInfo<>((Class) Iterable.class); TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, AllWindowFunction.class, true, true, iterTypeInfo, null, false); + function, AllWindowFunction.class, true, true, getInputType(), null, false); return apply(function, resultType); } @@ -242,7 +240,7 @@ public AllWindowedStream evictor(Evictor evictor) { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public SingleOutputStreamOperator apply(AllWindowFunction, R, W> function, TypeInformation resultType) { + public SingleOutputStreamOperator apply(AllWindowFunction function, TypeInformation resultType) { //clean the closure function = input.getExecutionEnvironment().clean(function); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index e921940f531c8..713433c7ce65d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -556,7 +556,7 @@ public KEY getKey(TaggedUnion value) throws Exception{ private static class CoGroupWindowFunction extends WrappingFunction> - implements WindowFunction>, T, KEY, W> { + implements WindowFunction, T, KEY, W> { private static final long serialVersionUID = 1L; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 2ced99d3283ec..5c92fe0f1008c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; @@ -52,6 +51,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; @@ -218,11 +219,9 @@ public WindowedStream evictor(Evictor evictor) { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public SingleOutputStreamOperator apply(WindowFunction, R, K, W> function) { - @SuppressWarnings("unchecked, rawtypes") - TypeInformation> iterTypeInfo = new GenericTypeInfo<>((Class) Iterable.class); + public SingleOutputStreamOperator apply(WindowFunction function) { TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, WindowFunction.class, true, true, iterTypeInfo, null, false); + function, WindowFunction.class, true, true, getInputType(), null, false); return apply(function, resultType); } @@ -240,7 +239,7 @@ public WindowedStream evictor(Evictor evictor) { * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. */ - public SingleOutputStreamOperator apply(WindowFunction, R, K, W> function, TypeInformation resultType) { + public SingleOutputStreamOperator apply(WindowFunction function, TypeInformation resultType) { //clean the closure function = input.getExecutionEnvironment().clean(function); @@ -270,7 +269,7 @@ public WindowedStream evictor(Evictor evictor) { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - function, + new InternalIterableWindowFunction<>(function), trigger, evictor); @@ -285,7 +284,7 @@ public WindowedStream evictor(Evictor evictor) { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - function, + new InternalIterableWindowFunction<>(function), trigger); } @@ -350,13 +349,13 @@ public WindowedStream evictor(Evictor evictor) { opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; operator = new EvictingWindowOperator<>(windowAssigner, - windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - keySel, - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - stateDesc, - new ReduceApplyWindowFunction<>(reduceFunction, function), - trigger, - evictor); + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)), + trigger, + evictor); } else { ReducingStateDescriptor stateDesc = new ReducingStateDescriptor<>("window-contents", @@ -370,7 +369,7 @@ public WindowedStream evictor(Evictor evictor) { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - function, + new InternalSingleValueWindowFunction<>(function), trigger); } @@ -441,7 +440,7 @@ public WindowedStream evictor(Evictor evictor) { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - new FoldApplyWindowFunction<>(initialValue, foldFunction, function), + new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function)), trigger, evictor); @@ -458,7 +457,7 @@ public WindowedStream evictor(Evictor evictor) { keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, - function, + new InternalSingleValueWindowFunction<>(function), trigger); } @@ -694,7 +693,7 @@ public WindowedStream evictor(Evictor evictor) { } else if (function instanceof WindowFunction) { @SuppressWarnings("unchecked") - WindowFunction, R, K, TimeWindow> wf = (WindowFunction, R, K, TimeWindow>) function; + WindowFunction wf = (WindowFunction) function; OneInputStreamOperator op = new AccumulatingProcessingTimeWindowOperator<>( wf, input.getKeySelector(), @@ -726,7 +725,7 @@ else if (function instanceof WindowFunction) { } else if (function instanceof WindowFunction) { @SuppressWarnings("unchecked") - WindowFunction, R, K, TimeWindow> wf = (WindowFunction, R, K, TimeWindow>) function; + WindowFunction wf = (WindowFunction) function; OneInputStreamOperator op = new AccumulatingProcessingTimeWindowOperator<>( wf, input.getKeySelector(), diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java index 62e86ca15b323..c497b4a8faf36 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java @@ -30,9 +30,10 @@ * * @param The type of the input value. * @param The type of the output value. + * @param The type of {@code Window} that this window function can be applied on. */ @Public -public interface AllWindowFunction extends Function, Serializable { +public interface AllWindowFunction extends Function, Serializable { /** * Evaluates the window and outputs none or several elements. @@ -43,5 +44,5 @@ public interface AllWindowFunction extends Function, * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ - void apply(W window, IN values, Collector out) throws Exception; + void apply(W window, Iterable values, Collector out) throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java index 76fd562c5546b..a5bc0a1b2126f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java @@ -33,11 +33,12 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Collections; @Internal public class FoldApplyAllWindowFunction extends WrappingFunction> - implements AllWindowFunction, ACC, W>, OutputTypeConfigurable { + implements AllWindowFunction, OutputTypeConfigurable { private static final long serialVersionUID = 1L; @@ -75,7 +76,7 @@ public void apply(W window, Iterable values, Collector out) throws Excep result = foldFunction.fold(result, val); } - wrappedFunction.apply(window, result, out); + wrappedFunction.apply(window, Collections.singletonList(result), out); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java index 40e8830031644..756a6833566cd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java @@ -33,11 +33,12 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Collections; @Internal public class FoldApplyWindowFunction extends WrappingFunction> - implements WindowFunction, ACC, K, W>, OutputTypeConfigurable { + implements WindowFunction, OutputTypeConfigurable { private static final long serialVersionUID = 1L; @@ -75,7 +76,7 @@ public void apply(K key, W window, Iterable values, Collector out) throw result = foldFunction.fold(result, val); } - wrappedFunction.apply(key, window, result, out); + wrappedFunction.apply(key, window, Collections.singletonList(result), out); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java index 3e3ffca052b5b..443564488f5aa 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java @@ -27,7 +27,9 @@ public class PassThroughAllWindowFunction implements AllWin private static final long serialVersionUID = 1L; @Override - public void apply(W window, T input, Collector out) throws Exception { - out.collect(input); + public void apply(W window, Iterable input, Collector out) throws Exception { + for (T in: input) { + out.collect(in); + } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java index 21709b9cb0b03..319acb68ef7f5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java @@ -27,7 +27,9 @@ public class PassThroughWindowFunction implements Window private static final long serialVersionUID = 1L; @Override - public void apply(K k, W window, T input, Collector out) throws Exception { - out.collect(input); + public void apply(K k, W window, Iterable input, Collector out) throws Exception { + for (T in: input) { + out.collect(in); + } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java index ce1615bf82080..5b8dd700f0d04 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java @@ -23,10 +23,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.util.Collections; + @Internal public class ReduceApplyAllWindowFunction extends WrappingFunction> - implements AllWindowFunction, R, W> { + implements AllWindowFunction { private static final long serialVersionUID = 1L; @@ -51,6 +53,6 @@ public void apply(W window, Iterable input, Collector out) throws Exceptio curr = reduceFunction.reduce(curr, val); } } - windowFunction.apply(window, curr, out); + windowFunction.apply(window, Collections.singletonList(curr), out); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java index 75ea2d2bc27d8..f896282e3b8ce 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java @@ -23,10 +23,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.util.Collections; + @Internal public class ReduceApplyWindowFunction extends WrappingFunction> - implements WindowFunction, R, K, W> { + implements WindowFunction { private static final long serialVersionUID = 1L; @@ -51,6 +53,6 @@ public void apply(K k, W window, Iterable input, Collector out) throws Exc curr = reduceFunction.reduce(curr, val); } } - windowFunction.apply(k, window, curr, out); + windowFunction.apply(k, window, Collections.singletonList(curr), out); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java index a3b35ae98305c..8ec5809ac93c5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableAllWindowFunction.java @@ -23,7 +23,7 @@ import org.apache.flink.util.Collector; @Internal -public class ReduceIterableAllWindowFunction implements AllWindowFunction, T, W> { +public class ReduceIterableAllWindowFunction implements AllWindowFunction { private static final long serialVersionUID = 1L; private final ReduceFunction reduceFunction; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java index e29641137b375..afb021950dcf4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceIterableWindowFunction.java @@ -23,7 +23,7 @@ import org.apache.flink.util.Collector; @Internal -public class ReduceIterableWindowFunction implements WindowFunction, T, K, W> { +public class ReduceIterableWindowFunction implements WindowFunction { private static final long serialVersionUID = 1L; private final ReduceFunction reduceFunction; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java index 83ef18e0b86f6..154fe88d13aea 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java @@ -31,6 +31,7 @@ * @param The type of the input value. * @param The type of the output value. * @param The type of the key. + * @param The type of {@code Window} that this window function can be applied on. */ @Public public interface WindowFunction extends Function, Serializable { @@ -45,5 +46,5 @@ public interface WindowFunction extends Function * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ - void apply(KEY key, W window, IN input, Collector out) throws Exception; + void apply(KEY key, W window, Iterable input, Collector out) throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java index b8307892ec6e3..9b353fead8d3c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java @@ -36,7 +36,7 @@ public class AccumulatingKeyedTimePanes extends AbstractKeyed private final KeyMap.LazyFactory> listFactory = getListFactory(); - private final WindowFunction, Result, Key, Window> function; + private final WindowFunction function; /** * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */ @@ -44,7 +44,7 @@ public class AccumulatingKeyedTimePanes extends AbstractKeyed // ------------------------------------------------------------------------ - public AccumulatingKeyedTimePanes(KeySelector keySelector, WindowFunction, Result, Key, Window> function) { + public AccumulatingKeyedTimePanes(KeySelector keySelector, WindowFunction function) { this.keySelector = keySelector; this.function = function; } @@ -86,7 +86,7 @@ public void evaluateWindow(Collector out, TimeWindow window, static final class WindowFunctionTraversal implements KeyMap.TraversalEvaluator> { - private final WindowFunction, Result, Key, Window> function; + private final WindowFunction function; private final UnionIterator unionIterator; @@ -99,7 +99,7 @@ static final class WindowFunctionTraversal implements KeyMap. private Key currentKey; - WindowFunctionTraversal(WindowFunction, Result, Key, Window> function, TimeWindow window, + WindowFunctionTraversal(WindowFunction function, TimeWindow window, Collector out, AbstractStreamOperator contextOperator) { this.function = function; this.out = out; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java index 2f0d4fe3a565b..9ea294972d002 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java @@ -33,13 +33,13 @@ @Internal public class AccumulatingProcessingTimeWindowOperator - extends AbstractAlignedProcessingTimeWindowOperator, WindowFunction, OUT, KEY, TimeWindow>> { + extends AbstractAlignedProcessingTimeWindowOperator, WindowFunction> { private static final long serialVersionUID = 7305948082830843475L; public AccumulatingProcessingTimeWindowOperator( - WindowFunction, OUT, KEY, TimeWindow> function, + WindowFunction function, KeySelector keySelector, TypeSerializer keySerializer, TypeSerializer valueSerializer, @@ -53,7 +53,7 @@ public AccumulatingProcessingTimeWindowOperator( @Override protected AccumulatingKeyedTimePanes createPanes(KeySelector keySelector, Function function) { @SuppressWarnings("unchecked") - WindowFunction, OUT, KEY, Window> windowFunction = (WindowFunction, OUT, KEY, Window>) function; + WindowFunction windowFunction = (WindowFunction) function; return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java index 510ebb2245380..221367d5b7d57 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java @@ -49,7 +49,7 @@ public class EvictingNonKeyedWindowOperator extends N public EvictingNonKeyedWindowOperator(WindowAssigner windowAssigner, TypeSerializer windowSerializer, WindowBufferFactory> windowBufferFactory, - AllWindowFunction, OUT, W> windowFunction, + AllWindowFunction windowFunction, Trigger trigger, Evictor evictor) { super(windowAssigner, windowSerializer, windowBufferFactory, windowFunction, trigger); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index cfab3d573563e..16ca488a503e8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -27,12 +27,12 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.evictors.Evictor; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.util.Collection; @@ -48,7 +48,7 @@ * * @param The type of key returned by the {@code KeySelector}. * @param The type of the incoming elements. - * @param The type of elements emitted by the {@code WindowFunction}. + * @param The type of elements emitted by the {@code InternalWindowFunction}. * @param The type of {@code Window} that the {@code WindowAssigner} assigns. */ @Internal @@ -65,7 +65,7 @@ public EvictingWindowOperator(WindowAssigner windowAssigner, KeySelector keySelector, TypeSerializer keySerializer, StateDescriptor>, ?> windowStateDescriptor, - WindowFunction, OUT, K, W> windowFunction, + InternalWindowFunction, OUT, K, W> windowFunction, Trigger trigger, Evictor evictor) { super(windowAssigner, windowSerializer, keySelector, keySerializer, null, windowFunction, trigger); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java index e42d7b40f9f8c..95feadc0de655 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java @@ -77,7 +77,7 @@ */ @Internal public class NonKeyedWindowOperator - extends AbstractUdfStreamOperator, OUT, W>> + extends AbstractUdfStreamOperator> implements OneInputStreamOperator, Triggerable, InputTypeConfigurable { private static final long serialVersionUID = 1L; @@ -146,7 +146,7 @@ public class NonKeyedWindowOperator public NonKeyedWindowOperator(WindowAssigner windowAssigner, TypeSerializer windowSerializer, WindowBufferFactory> windowBufferFactory, - AllWindowFunction, OUT, W> windowFunction, + AllWindowFunction windowFunction, Trigger trigger) { super(windowFunction); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 289492b9c23ef..9b7b347e8ea09 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -34,7 +34,6 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -47,6 +46,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; @@ -74,7 +74,7 @@ *

* Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when * the contents of the pane should be processed to emit results. When a trigger fires, - * the given {@link WindowFunction} is invoked to produce the results that are emitted for + * the given {@link InternalWindowFunction} is invoked to produce the results that are emitted for * the pane to which the {@code Trigger} belongs. * *

@@ -83,12 +83,12 @@ * * @param The type of key returned by the {@code KeySelector}. * @param The type of the incoming elements. - * @param The type of elements emitted by the {@code WindowFunction}. + * @param The type of elements emitted by the {@code InternalWindowFunction}. * @param The type of {@code Window} that the {@code WindowAssigner} assigns. */ @Internal public class WindowOperator - extends AbstractUdfStreamOperator> + extends AbstractUdfStreamOperator> implements OneInputStreamOperator, Triggerable, InputTypeConfigurable { private static final long serialVersionUID = 1L; @@ -126,7 +126,7 @@ public class WindowOperator // ------------------------------------------------------------------------ /** - * This is given to the {@code WindowFunction} for emitting elements with a given timestamp. + * This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp. */ protected transient TimestampedCollector timestampedCollector; @@ -162,7 +162,7 @@ public WindowOperator(WindowAssigner windowAssigner, KeySelector keySelector, TypeSerializer keySerializer, StateDescriptor, ?> windowStateDescriptor, - WindowFunction windowFunction, + InternalWindowFunction windowFunction, Trigger trigger) { super(windowFunction); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java new file mode 100644 index 0000000000000..32318eacd1ffa --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.functions; + +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +/** + * Internal window function for wrapping a {@link WindowFunction} that takes an {@code Iterable} + * when the window state also is an {@code Iterable}. + */ +public final class InternalIterableWindowFunction extends InternalWindowFunction, OUT, KEY, W> implements RichFunction { + private static final long serialVersionUID = 1L; + + protected WindowFunction wrappedFunction; + + public InternalIterableWindowFunction(WindowFunction wrappedFunction) { + this.wrappedFunction = wrappedFunction; + } + + @Override + public void apply(KEY key, W window, Iterable input, Collector out) throws Exception { + wrappedFunction.apply(key, window, input, out); + } + + @Override + public void open(Configuration parameters) throws Exception { + FunctionUtils.openFunction(this.wrappedFunction, parameters); + } + + @Override + public void close() throws Exception { + FunctionUtils.closeFunction(this.wrappedFunction); + } + + @Override + public void setRuntimeContext(RuntimeContext t) { + FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t); + } + + @Override + public RuntimeContext getRuntimeContext() { + throw new RuntimeException("This should never be called."); + } + + @Override + public IterationRuntimeContext getIterationRuntimeContext() { + throw new RuntimeException("This should never be called."); + + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java new file mode 100644 index 0000000000000..fd10e5c058004 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.functions; + +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.util.Collections; + +/** + * Internal window function for wrapping a {@link WindowFunction} that takes an {@code Iterable} + * when the window state is a single value. + */ +public final class InternalSingleValueWindowFunction extends InternalWindowFunction implements RichFunction { + private static final long serialVersionUID = 1L; + + protected WindowFunction wrappedFunction; + + public InternalSingleValueWindowFunction(WindowFunction wrappedFunction) { + this.wrappedFunction = wrappedFunction; + } + + @Override + public void apply(KEY key, W window, IN input, Collector out) throws Exception { + wrappedFunction.apply(key, window, Collections.singletonList(input), out); + } + + @Override + public void open(Configuration parameters) throws Exception { + FunctionUtils.openFunction(this.wrappedFunction, parameters); + } + + @Override + public void close() throws Exception { + FunctionUtils.closeFunction(this.wrappedFunction); + } + + @Override + public void setRuntimeContext(RuntimeContext t) { + FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t); + } + + @Override + public RuntimeContext getRuntimeContext() { + throw new RuntimeException("This should never be called."); + } + + @Override + public IterationRuntimeContext getIterationRuntimeContext() { + throw new RuntimeException("This should never be called."); + + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java new file mode 100644 index 0000000000000..e75f3beb024cf --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.functions; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +/** + * Internal interface for functions that are evaluated over keyed (grouped) windows. + * + * @param The type of the input value. + * @param The type of the output value. + * @param The type of the key. + */ +public abstract class InternalWindowFunction implements Function, Serializable { + private static final long serialVersionUID = 1L; + + /** + * Evaluates the window and outputs none or several elements. + * + * @param key The key for which this window is evaluated. + * @param window The window that is being evaluated. + * @param input The elements in the window being evaluated. + * @param out A collector for emitting elements. + * + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + public abstract void apply(KEY key, W window, IN input, Collector out) throws Exception; +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 502198c3a631e..7a4d6f84e6653 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -518,7 +518,7 @@ public Tuple2 map(Long value) throws Exception { DataStream window = map .windowAll(GlobalWindows.create()) .trigger(PurgingTrigger.of(CountTrigger.of(5))) - .apply(new AllWindowFunction>, String, GlobalWindow>() { + .apply(new AllWindowFunction, String, GlobalWindow>() { @Override public void apply(GlobalWindow window, Iterable> values, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java index b8d57a6e8b003..0b0ab9ee3cb63 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java @@ -70,9 +70,11 @@ public Integer fold(Integer accumulator, Integer value) throws Exception { @Override public void apply(Integer integer, TimeWindow window, - Integer input, + Iterable input, Collector out) throws Exception { - out.collect(input); + for (Integer in: input) { + out.collect(in); + } } } ); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index b6e51c6aafd54..dff118424631a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -67,7 +67,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @SuppressWarnings("unchecked") - private final WindowFunction, String, String, TimeWindow> mockFunction = mock(WindowFunction.class); + private final WindowFunction mockFunction = mock(WindowFunction.class); @SuppressWarnings("unchecked") private final KeySelector mockKeySelector = mock(KeySelector.class); @@ -79,8 +79,8 @@ public Integer getKey(Integer value) { } }; - private final WindowFunction, Integer, Integer, TimeWindow> validatingIdentityFunction = - new WindowFunction, Integer, Integer, TimeWindow>() + private final WindowFunction validatingIdentityFunction = + new WindowFunction() { @Override public void apply(Integer key, @@ -723,7 +723,7 @@ private void assertInvalidParameter(long windowSize, long windowSlide) { // ------------------------------------------------------------------------ - private static class FailingFunction implements WindowFunction, Integer, Integer, TimeWindow> { + private static class FailingFunction implements WindowFunction { private final int failAfterElements; @@ -751,7 +751,7 @@ public void apply(Integer integer, // ------------------------------------------------------------------------ - private static class StatefulFunction extends RichWindowFunction, Integer, Integer, TimeWindow> { + private static class StatefulFunction extends RichWindowFunction { // we use a concurrent map here even though there is no concurrency, to // get "volatile" style access to entries diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java index 05832905e5243..42f452c6c9a75 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java @@ -76,7 +76,7 @@ public void testEventTime() throws Exception { DataStream> window2 = source .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .apply(new AllWindowFunction>, Tuple2, TimeWindow>() { + .apply(new AllWindowFunction, Tuple2, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -123,7 +123,7 @@ public void testNonEvicting() throws Exception { DataStream> window2 = source .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .apply(new AllWindowFunction>, Tuple2, TimeWindow>() { + .apply(new AllWindowFunction, Tuple2, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -172,7 +172,7 @@ public void testEvicting() throws Exception { .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) - .apply(new AllWindowFunction>, Tuple2, TimeWindow>() { + .apply(new AllWindowFunction, Tuple2, TimeWindow>() { private static final long serialVersionUID = 1L; @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java index 2f1dce5567dea..6af7ac4ce1f44 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java @@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -70,7 +71,7 @@ public void testCountTrigger() throws Exception { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new ReduceIterableWindowFunction>(new SumReducer()), + new InternalIterableWindowFunction<>(new ReduceIterableWindowFunction>(new SumReducer())), CountTrigger.of(WINDOW_SLIDE), CountEvictor.of(WINDOW_SIZE)); @@ -141,7 +142,7 @@ public void testCountTriggerWithApply() throws Exception { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new RichSumReducer(closeCalled), + new InternalIterableWindowFunction<>(new RichSumReducer(closeCalled)), CountTrigger.of(WINDOW_SLIDE), CountEvictor.of(WINDOW_SIZE)); @@ -208,7 +209,7 @@ public Tuple2 reduce(Tuple2 value1, } } - public static class RichSumReducer extends RichWindowFunction>, Tuple2, String, W> { + public static class RichSumReducer extends RichWindowFunction, Tuple2, String, W> { private static final long serialVersionUID = 1L; private boolean openCalled = false; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java index a5a8df33178b9..f2149417691f2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java @@ -74,7 +74,7 @@ public void testFastTimeWindows() throws Exception { DataStream> window2 = source .keyBy(0) .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS)) - .apply(new WindowFunction>, Tuple2, Tuple, TimeWindow>() { + .apply(new WindowFunction, Tuple2, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -117,7 +117,7 @@ public void testEventTimeWindows() throws Exception { DataStream> window2 = source .keyBy(0) .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS)) - .apply(new WindowFunction>, Tuple2, Tuple, TimeWindow>() { + .apply(new WindowFunction, Tuple2, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -165,7 +165,7 @@ public void testNonParallelFastTimeWindows() throws Exception { DataStream> window2 = source .timeWindowAll(Time.of(1000, TimeUnit.MILLISECONDS)) - .apply(new AllWindowFunction>, Tuple2, TimeWindow>() { + .apply(new AllWindowFunction, Tuple2, TimeWindow>() { private static final long serialVersionUID = 1L; @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index c1111a08753e4..a1f08ad019b26 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -41,6 +41,8 @@ import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; @@ -140,7 +142,7 @@ public void testSlidingEventTimeWindowsReduce() throws Exception { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new PassThroughWindowFunction>(), + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()), EventTimeTrigger.create()); operator.setInputType(inputType, new ExecutionConfig()); @@ -176,7 +178,7 @@ public void testSlidingEventTimeWindowsApply() throws Exception { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new RichSumReducer(), + new InternalIterableWindowFunction<>(new RichSumReducer()), EventTimeTrigger.create()); operator.setInputType(inputType, new ExecutionConfig()); @@ -271,7 +273,7 @@ public void testTumblingEventTimeWindowsReduce() throws Exception { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new PassThroughWindowFunction>(), + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()), EventTimeTrigger.create()); operator.setInputType(TypeInfoParser.>parse("Tuple2"), new ExecutionConfig()); @@ -306,7 +308,7 @@ public void testTumblingEventTimeWindowsApply() throws Exception { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new RichSumReducer(), + new InternalIterableWindowFunction<>(new RichSumReducer()), EventTimeTrigger.create()); operator.setInputType(TypeInfoParser.>parse("Tuple2"), new ExecutionConfig()); @@ -344,7 +346,7 @@ public void testContinuousWatermarkTrigger() throws Exception { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new PassThroughWindowFunction>(), + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()), ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS))); operator.setInputType(TypeInfoParser.>parse("Tuple2"), new ExecutionConfig()); @@ -434,7 +436,7 @@ public void testCountTrigger() throws Exception { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new PassThroughWindowFunction>(), + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()), PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE))); operator.setInputType(TypeInfoParser.>parse( @@ -497,7 +499,7 @@ public Tuple2 reduce(Tuple2 value1, } - public static class RichSumReducer extends RichWindowFunction>, Tuple2, String, W> { + public static class RichSumReducer extends RichWindowFunction, Tuple2, String, W> { private static final long serialVersionUID = 1L; private boolean openCalled = false; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index c57da8a7a3ff3..30bb840db0d55 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -104,7 +104,7 @@ public void testEventTime() throws Exception { DataStream> window2 = source .keyBy(0) .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .apply(new WindowFunction>, Tuple2, Tuple, TimeWindow>() { + .apply(new WindowFunction, Tuple2, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -112,7 +112,6 @@ public void apply(Tuple tuple, TimeWindow window, Iterable> values, Collector> out) throws Exception { - } }); @@ -153,7 +152,7 @@ public void testNonEvicting() throws Exception { .keyBy(0) .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .apply(new WindowFunction>, Tuple2, Tuple, TimeWindow>() { + .apply(new WindowFunction, Tuple2, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -204,7 +203,7 @@ public void testEvicting() throws Exception { .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) - .apply(new WindowFunction>, Tuple2, Tuple, TimeWindow>() { + .apply(new WindowFunction, Tuple2, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala index e36542e027b65..3c91529bebea5 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala @@ -24,7 +24,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} -import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction +import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction => JAllWindowFunction} +import org.apache.flink.streaming.api.scala.function.AllWindowFunction import org.apache.flink.streaming.api.windowing.evictors.Evictor import org.apache.flink.streaming.api.windowing.triggers.Trigger import org.apache.flink.streaming.api.windowing.windows.Window @@ -179,10 +180,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @return The data stream that is the result of applying the window function to the window. */ def apply[R: TypeInformation]( - function: AllWindowFunction[Iterable[T], R, W]): DataStream[R] = { + function: AllWindowFunction[T, R, W]): DataStream[R] = { val cleanedFunction = clean(function) - val javaFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] { + val javaFunction = new JAllWindowFunction[T, R, W] { def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { cleanedFunction(window, elements.asScala, out) } @@ -205,7 +206,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { val cleanedFunction = clean(function) - val applyFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] { + val applyFunction = new JAllWindowFunction[T, R, W] { def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { cleanedFunction(window, elements.asScala, out) } @@ -228,8 +229,15 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { preAggregator: ReduceFunction[T], function: AllWindowFunction[T, R, W]): DataStream[R] = { + val cleanedFunction = clean(function) + val applyFunction = new JAllWindowFunction[T, R, W] { + def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { + cleanedFunction(window, elements.asScala, out) + } + } + val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] - asScalaStream(javaStream.apply(clean(preAggregator), clean(function), returnType)) + asScalaStream(javaStream.apply(clean(preAggregator), applyFunction, returnType)) } /** @@ -245,7 +253,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { */ def apply[R: TypeInformation]( preAggregator: (T, T) => T, - function: (W, T, Collector[R]) => Unit): DataStream[R] = { + function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { if (function == null) { throw new NullPointerException("Reduce function must not be null.") } @@ -259,9 +267,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { } val cleanApply = clean(function) - val applyFunction = new AllWindowFunction[T, R, W] { - def apply(window: W, input: T, out: Collector[R]): Unit = { - cleanApply(window, input, out) + val applyFunction = new JAllWindowFunction[T, R, W] { + def apply(window: W, input: java.lang.Iterable[T], out: Collector[R]): Unit = { + cleanApply(window, input.asScala, out) } } @@ -285,11 +293,18 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { initialValue: R, preAggregator: FoldFunction[T, R], function: AllWindowFunction[R, R, W]): DataStream[R] = { + + val cleanedFunction = clean(function) + val applyFunction = new JAllWindowFunction[R, R, W] { + def apply(window: W, elements: java.lang.Iterable[R], out: Collector[R]): Unit = { + cleanedFunction(window, elements.asScala, out) + } + } asScalaStream(javaStream.apply( initialValue, clean(preAggregator), - clean(function), + applyFunction, implicitly[TypeInformation[R]])) } @@ -308,7 +323,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { def apply[R: TypeInformation]( initialValue: R, preAggregator: (R, T) => R, - function: (W, R, Collector[R]) => Unit): DataStream[R] = { + function: (W, Iterable[R], Collector[R]) => Unit): DataStream[R] = { if (function == null) { throw new NullPointerException("Reduce function must not be null.") } @@ -322,9 +337,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { } val cleanApply = clean(function) - val applyFunction = new AllWindowFunction[R, R, W] { - def apply(window: W, input: R, out: Collector[R]): Unit = { - cleanApply(window, input, out) + val applyFunction = new JAllWindowFunction[R, R, W] { + def apply(window: W, input: java.lang.Iterable[R], out: Collector[R]): Unit = { + cleanApply(window, input.asScala, out) } } val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index 53f033c1e5b85..b7f9e004d1dd9 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -24,7 +24,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} -import org.apache.flink.streaming.api.functions.windowing.WindowFunction +import org.apache.flink.streaming.api.scala.function.WindowFunction +import org.apache.flink.streaming.api.functions.windowing.{WindowFunction => JWindowFunction} import org.apache.flink.streaming.api.windowing.evictors.Evictor import org.apache.flink.streaming.api.windowing.triggers.Trigger import org.apache.flink.streaming.api.windowing.windows.Window @@ -182,10 +183,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @return The data stream that is the result of applying the window function to the window. */ def apply[R: TypeInformation]( - function: WindowFunction[Iterable[T], R, K, W]): DataStream[R] = { + function: WindowFunction[T, R, K, W]): DataStream[R] = { val cleanFunction = clean(function) - val javaFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] { + val javaFunction = new JWindowFunction[T, R, K, W] { def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]) = { cleanFunction.apply(key, window, input.asScala, out) } @@ -212,7 +213,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { } val cleanedFunction = clean(function) - val applyFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] { + val applyFunction = new JWindowFunction[T, R, K, W] { def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { cleanedFunction(key, window, elements.asScala, out) } @@ -235,8 +236,16 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { preAggregator: ReduceFunction[T], function: WindowFunction[T, R, K, W]): DataStream[R] = { + val cleanedFunction = clean(function) + + val applyFunction = new JWindowFunction[T, R, K, W] { + def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { + cleanedFunction.apply(key, window, elements.asScala, out) + } + } + val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] - asScalaStream(javaStream.apply(clean(preAggregator), clean(function), resultType)) + asScalaStream(javaStream.apply(clean(preAggregator), applyFunction, resultType)) } /** @@ -252,7 +261,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { */ def apply[R: TypeInformation]( preAggregator: (T, T) => T, - function: (K, W, T, Collector[R]) => Unit): DataStream[R] = { + function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { if (function == null) { throw new NullPointerException("Reduce function must not be null.") @@ -267,9 +276,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { } val cleanApply = clean(function) - val applyFunction = new WindowFunction[T, R, K, W] { - def apply(key: K, window: W, input: T, out: Collector[R]): Unit = { - cleanApply(key, window, input, out) + val applyFunction = new JWindowFunction[T, R, K, W] { + def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]): Unit = { + cleanApply(key, window, input.asScala, out) } } @@ -292,11 +301,19 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { initialValue: R, foldFunction: FoldFunction[T, R], function: WindowFunction[R, R, K, W]): DataStream[R] = { - + + val cleanedFunction = clean(function) + + val applyFunction = new JWindowFunction[R, R, K, W] { + def apply(key: K, window: W, elements: java.lang.Iterable[R], out: Collector[R]): Unit = { + cleanedFunction.apply(key, window, elements.asScala, out) + } + } + asScalaStream(javaStream.apply( initialValue, clean(foldFunction), - clean(function), + applyFunction, implicitly[TypeInformation[R]])) } @@ -314,7 +331,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { def apply[R: TypeInformation]( initialValue: R, foldFunction: (R, T) => R, - function: (K, W, R, Collector[R]) => Unit): DataStream[R] = { + function: (K, W, Iterable[R], Collector[R]) => Unit): DataStream[R] = { if (function == null) { throw new NullPointerException("Fold function must not be null.") @@ -329,9 +346,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { } val cleanApply = clean(function) - val applyFunction = new WindowFunction[R, R, K, W] { - def apply(key: K, window: W, input: R, out: Collector[R]): Unit = { - cleanApply(key, window, input, out) + val applyFunction = new JWindowFunction[R, R, K, W] { + def apply(key: K, window: W, input: java.lang.Iterable[R], out: Collector[R]): Unit = { + cleanApply(key, window, input.asScala, out) } } val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala new file mode 100644 index 0000000000000..4e77d8366f060 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.scala.function + +import java.io.Serializable + +import org.apache.flink.annotation.Public +import org.apache.flink.api.common.functions.Function +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + +/** + * Base interface for functions that are evaluated over keyed (grouped) windows. + * + * @tparam IN The type of the input value. + * @tparam OUT The type of the output value. + */ +@Public +trait AllWindowFunction[IN, OUT, W <: Window] extends Function with Serializable { + + /** + * Evaluates the window and outputs none or several elements. + * + * @param window The window that is being evaluated. + * @param input The elements in the window being evaluated. + * @param out A collector for emitting elements. + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + def apply(window: W, input: Iterable[IN], out: Collector[OUT]) +} diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala new file mode 100644 index 0000000000000..67236b7115b97 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.scala.function + +import java.io.Serializable + +import org.apache.flink.annotation.Public +import org.apache.flink.api.common.functions.Function +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + +/** + * Base interface for functions that are evaluated over keyed (grouped) windows. + * + * @tparam IN The type of the input value. + * @tparam OUT The type of the output value. + * @tparam KEY The type of the key. + */ +@Public +trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable { + + /** + * Evaluates the window and outputs none or several elements. + * + * @param key The key for which this window is evaluated. + * @param window The window that is being evaluated. + * @param input The elements in the window being evaluated. + * @param out A collector for emitting elements. + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]) +} diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index dcdfa91826a14..f73307cb8fb56 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -25,7 +25,7 @@ import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.state.ReducingStateDescriptor import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, AllWindowFunction} +import org.apache.flink.streaming.api.scala.function.{WindowFunction, AllWindowFunction} import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingTimeWindows, SlidingTimeWindows} import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor} @@ -76,7 +76,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { .windowAll(SlidingTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), TimeWindow]() { + .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { def apply( window: TimeWindow, values: Iterable[(String, Int)], @@ -122,7 +122,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), TimeWindow]() { + .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { def apply( window: TimeWindow, values: Iterable[(String, Int)], @@ -173,7 +173,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(CountEvictor.of(1000)) - .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), TimeWindow]() { + .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { def apply( window: TimeWindow, values: Iterable[(String, Int)], @@ -211,7 +211,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { def apply( tuple: Tuple, window: TimeWindow, - values: (String, Int), + values: Iterable[(String, Int)], out: Collector[(String, Int)]) { } }) @@ -236,7 +236,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { def apply( tuple: Tuple, window: TimeWindow, - values: (String, Int), + values: Iterable[(String, Int)], out: Collector[(String, Int)]) { } }) diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index 673d7b3aa1c7a..48ff6402dedfc 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDescriptor} import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.streaming.api.functions.windowing.WindowFunction +import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingTimeWindows, SlidingTimeWindows} import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor} @@ -65,7 +65,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source .keyBy(0) .timeWindow(Time.minutes(1)) - .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, TimeWindow]() { + .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { def apply( key: Tuple, window: TimeWindow, @@ -114,7 +114,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .keyBy(0) .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, TimeWindow]() { + .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { def apply( tuple: Tuple, window: TimeWindow, @@ -168,7 +168,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(CountEvictor.of(1000)) - .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, TimeWindow]() { + .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { def apply( tuple: Tuple, window: TimeWindow, @@ -207,7 +207,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { def apply( tuple: Tuple, window: TimeWindow, - values: (String, Int), + values: Iterable[(String, Int)], out: Collector[(String, Int)]) { } }) @@ -232,7 +232,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { def apply( tuple: Tuple, window: TimeWindow, - values: (String, Int), + values: Iterable[(String, Int)], out: Collector[(String, Int)]) { } }) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 436dd0d412273..d18a45e9b63d6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -105,7 +105,7 @@ public void testTumblingTimeWindow() { NUM_ELEMENTS_PER_KEY / 3)) .rebalance() .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS)) - .apply(new RichAllWindowFunction>, Tuple4, TimeWindow>() { + .apply(new RichAllWindowFunction, Tuple4, TimeWindow>() { private boolean open = false; @@ -167,7 +167,7 @@ public void testSlidingTimeWindow() { .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) .rebalance() .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS)) - .apply(new RichAllWindowFunction>, Tuple4, TimeWindow>() { + .apply(new RichAllWindowFunction, Tuple4, TimeWindow>() { private boolean open = false; @@ -254,13 +254,18 @@ public void open(Configuration parameters) { @Override public void apply( TimeWindow window, - Tuple2 input, + Iterable> input, Collector> out) { // validate that the function has been opened properly assertTrue(open); - out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1)); + for (Tuple2 in: input) { + out.collect(new Tuple4<>(in.f0, + window.getStart(), + window.getEnd(), + in.f1)); + } } }) .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1); @@ -323,13 +328,18 @@ public void open(Configuration parameters) { @Override public void apply( TimeWindow window, - Tuple2 input, + Iterable> input, Collector> out) { // validate that the function has been opened properly assertTrue(open); - out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1)); + for (Tuple2 in: input) { + out.collect(new Tuple4<>(in.f0, + window.getStart(), + window.getEnd(), + in.f1)); + } } }) .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 6f178d849941e..ce705e1b4aa25 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -152,7 +152,7 @@ public void testTumblingTimeWindow() { .rebalance() .keyBy(0) .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS)) - .apply(new RichWindowFunction>, Tuple4, Tuple, TimeWindow>() { + .apply(new RichWindowFunction, Tuple4, Tuple, TimeWindow>() { private boolean open = false; @@ -216,7 +216,7 @@ public void testTumblingTimeWindowWithKVState() { .rebalance() .keyBy(0) .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS)) - .apply(new RichWindowFunction>, Tuple4, Tuple, TimeWindow>() { + .apply(new RichWindowFunction, Tuple4, Tuple, TimeWindow>() { private boolean open = false; @@ -285,7 +285,7 @@ public void testSlidingTimeWindow() { .rebalance() .keyBy(0) .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS)) - .apply(new RichWindowFunction>, Tuple4, Tuple, TimeWindow>() { + .apply(new RichWindowFunction, Tuple4, Tuple, TimeWindow>() { private boolean open = false; @@ -373,13 +373,18 @@ public void open(Configuration parameters) { public void apply( Tuple tuple, TimeWindow window, - Tuple2 input, + Iterable> input, Collector> out) { // validate that the function has been opened properly assertTrue(open); - out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1)); + for (Tuple2 in: input) { + out.collect(new Tuple4<>(in.f0, + window.getStart(), + window.getEnd(), + in.f1)); + } } }) .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1); @@ -443,13 +448,18 @@ public void open(Configuration parameters) { public void apply( Tuple tuple, TimeWindow window, - Tuple2 input, + Iterable> input, Collector> out) { // validate that the function has been opened properly assertTrue(open); - out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1)); + for (Tuple2 in: input) { + out.collect(new Tuple4<>(in.f0, + window.getStart(), + window.getEnd(), + in.f1)); + } } }) .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java index eb5ef5a03246b..aa5ff3b238379 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java @@ -117,7 +117,7 @@ public void testTumblingProcessingTimeWindow() { .rebalance() .keyBy(0) .timeWindow(Time.of(100, MILLISECONDS)) - .apply(new RichWindowFunction>, Tuple2, Tuple, TimeWindow>() { + .apply(new RichWindowFunction, Tuple2, Tuple, TimeWindow>() { private boolean open = false; @@ -175,7 +175,7 @@ public void testSlidingProcessingTimeWindow() { .rebalance() .keyBy(0) .timeWindow(Time.of(150, MILLISECONDS), Time.of(50, MILLISECONDS)) - .apply(new RichWindowFunction>, Tuple2, Tuple, TimeWindow>() { + .apply(new RichWindowFunction, Tuple2, Tuple, TimeWindow>() { private boolean open = false;