Skip to content

Commit

Permalink
[FLINK-3521] Make Iterable part of method signature for WindowFunction
Browse files Browse the repository at this point in the history
This closes apache#1723
  • Loading branch information
aljoscha authored and rmetzger committed Feb 26, 2016
1 parent 64519e1 commit 27b5c49
Show file tree
Hide file tree
Showing 41 changed files with 498 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public Watermark checkAndGetNextWatermark(Integer lastElement, long extractedTim
/**
* Builds up-to-date partial models on new training data.
*/
public static class PartialModelBuilder implements AllWindowFunction<Iterable<Integer>, Double[], TimeWindow> {
public static class PartialModelBuilder implements AllWindowFunction<Integer, Double[], TimeWindow> {
private static final long serialVersionUID = 1L;

protected Double[] buildPartialModel(Iterable<Integer> values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public Key getKey(Type value) {
}
}

public static class SummingWindowFunction implements WindowFunction<Iterable<Tuple2<Long, Long>>, Tuple2<Long, Long>, Long, Window> {
public static class SummingWindowFunction implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {

@Override
public void apply(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
Expand Down Expand Up @@ -222,11 +221,10 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<Iterable<T>, R, W> function) {
public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function) {
@SuppressWarnings("unchecked, rawtypes")
TypeInformation<Iterable<T>> iterTypeInfo = new GenericTypeInfo<>((Class) Iterable.class);
TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, AllWindowFunction.class, true, true, iterTypeInfo, null, false);
function, AllWindowFunction.class, true, true, getInputType(), null, false);

return apply(function, resultType);
}
Expand All @@ -242,7 +240,7 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<Iterable<T>, R, W> function, TypeInformation<R> resultType) {
public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
//clean the closure
function = input.getExecutionEnvironment().clean(function);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ public KEY getKey(TaggedUnion<T1, T2> value) throws Exception{

private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
extends WrappingFunction<CoGroupFunction<T1, T2, T>>
implements WindowFunction<Iterable<TaggedUnion<T1, T2>>, T, KEY, W> {
implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
Expand All @@ -52,6 +51,8 @@
import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
Expand Down Expand Up @@ -218,11 +219,9 @@ public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<Iterable<T>, R, K, W> function) {
@SuppressWarnings("unchecked, rawtypes")
TypeInformation<Iterable<T>> iterTypeInfo = new GenericTypeInfo<>((Class) Iterable.class);
public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function) {
TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, WindowFunction.class, true, true, iterTypeInfo, null, false);
function, WindowFunction.class, true, true, getInputType(), null, false);

return apply(function, resultType);
}
Expand All @@ -240,7 +239,7 @@ public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
* @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType) {
public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {

//clean the closure
function = input.getExecutionEnvironment().clean(function);
Expand Down Expand Up @@ -270,7 +269,7 @@ public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
function,
new InternalIterableWindowFunction<>(function),
trigger,
evictor);

Expand All @@ -285,7 +284,7 @@ public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
function,
new InternalIterableWindowFunction<>(function),
trigger);
}

Expand Down Expand Up @@ -350,13 +349,13 @@ public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

operator = new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new ReduceApplyWindowFunction<>(reduceFunction, function),
trigger,
evictor);
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
trigger,
evictor);

} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
Expand All @@ -370,7 +369,7 @@ public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
function,
new InternalSingleValueWindowFunction<>(function),
trigger);
}

Expand Down Expand Up @@ -441,7 +440,7 @@ public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new FoldApplyWindowFunction<>(initialValue, foldFunction, function),
new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function)),
trigger,
evictor);

Expand All @@ -458,7 +457,7 @@ public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
function,
new InternalSingleValueWindowFunction<>(function),
trigger);
}

Expand Down Expand Up @@ -694,7 +693,7 @@ public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
}
else if (function instanceof WindowFunction) {
@SuppressWarnings("unchecked")
WindowFunction<Iterable<T>, R, K, TimeWindow> wf = (WindowFunction<Iterable<T>, R, K, TimeWindow>) function;
WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;

OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
wf, input.getKeySelector(),
Expand Down Expand Up @@ -726,7 +725,7 @@ else if (function instanceof WindowFunction) {
}
else if (function instanceof WindowFunction) {
@SuppressWarnings("unchecked")
WindowFunction<Iterable<T>, R, K, TimeWindow> wf = (WindowFunction<Iterable<T>, R, K, TimeWindow>) function;
WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;

OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
wf, input.getKeySelector(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
*
* @param <IN> The type of the input value.
* @param <OUT> The type of the output value.
* @param <W> The type of {@code Window} that this window function can be applied on.
*/
@Public
public interface AllWindowFunction<IN, OUT, W extends Window> extends Function, Serializable {
public interface AllWindowFunction<IN, OUT, W extends Window> extends Function, Serializable {

/**
* Evaluates the window and outputs none or several elements.
Expand All @@ -43,5 +44,5 @@ public interface AllWindowFunction<IN, OUT, W extends Window> extends Function,
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
void apply(W window, IN values, Collector<OUT> out) throws Exception;
void apply(W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;

@Internal
public class FoldApplyAllWindowFunction<W extends Window, T, ACC>
extends WrappingFunction<AllWindowFunction<ACC, ACC, W>>
implements AllWindowFunction<Iterable<T>, ACC, W>, OutputTypeConfigurable<ACC> {
implements AllWindowFunction<T, ACC, W>, OutputTypeConfigurable<ACC> {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -75,7 +76,7 @@ public void apply(W window, Iterable<T> values, Collector<ACC> out) throws Excep
result = foldFunction.fold(result, val);
}

wrappedFunction.apply(window, result, out);
wrappedFunction.apply(window, Collections.singletonList(result), out);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;

@Internal
public class FoldApplyWindowFunction<K, W extends Window, T, ACC>
extends WrappingFunction<WindowFunction<ACC, ACC, K, W>>
implements WindowFunction<Iterable<T>, ACC, K, W>, OutputTypeConfigurable<ACC> {
implements WindowFunction<T, ACC, K, W>, OutputTypeConfigurable<ACC> {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -75,7 +76,7 @@ public void apply(K key, W window, Iterable<T> values, Collector<ACC> out) throw
result = foldFunction.fold(result, val);
}

wrappedFunction.apply(key, window, result, out);
wrappedFunction.apply(key, window, Collections.singletonList(result), out);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public class PassThroughAllWindowFunction<W extends Window, T> implements AllWin
private static final long serialVersionUID = 1L;

@Override
public void apply(W window, T input, Collector<T> out) throws Exception {
out.collect(input);
public void apply(W window, Iterable<T> input, Collector<T> out) throws Exception {
for (T in: input) {
out.collect(in);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public class PassThroughWindowFunction<K, W extends Window, T> implements Window
private static final long serialVersionUID = 1L;

@Override
public void apply(K k, W window, T input, Collector<T> out) throws Exception {
out.collect(input);
public void apply(K k, W window, Iterable<T> input, Collector<T> out) throws Exception {
for (T in: input) {
out.collect(in);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

import java.util.Collections;

@Internal
public class ReduceApplyAllWindowFunction<W extends Window, T, R>
extends WrappingFunction<AllWindowFunction<T, R, W>>
implements AllWindowFunction<Iterable<T>, R, W> {
implements AllWindowFunction<T, R, W> {

private static final long serialVersionUID = 1L;

Expand All @@ -51,6 +53,6 @@ public void apply(W window, Iterable<T> input, Collector<R> out) throws Exceptio
curr = reduceFunction.reduce(curr, val);
}
}
windowFunction.apply(window, curr, out);
windowFunction.apply(window, Collections.singletonList(curr), out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

import java.util.Collections;

@Internal
public class ReduceApplyWindowFunction<K, W extends Window, T, R>
extends WrappingFunction<WindowFunction<T, R, K, W>>
implements WindowFunction<Iterable<T>, R, K, W> {
implements WindowFunction<T, R, K, W> {

private static final long serialVersionUID = 1L;

Expand All @@ -51,6 +53,6 @@ public void apply(K k, W window, Iterable<T> input, Collector<R> out) throws Exc
curr = reduceFunction.reduce(curr, val);
}
}
windowFunction.apply(k, window, curr, out);
windowFunction.apply(k, window, Collections.singletonList(curr), out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.util.Collector;

@Internal
public class ReduceIterableAllWindowFunction<W extends Window, T> implements AllWindowFunction<Iterable<T>, T, W> {
public class ReduceIterableAllWindowFunction<W extends Window, T> implements AllWindowFunction<T, T, W> {
private static final long serialVersionUID = 1L;

private final ReduceFunction<T> reduceFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.util.Collector;

@Internal
public class ReduceIterableWindowFunction<K, W extends Window, T> implements WindowFunction<Iterable<T>, T, K, W> {
public class ReduceIterableWindowFunction<K, W extends Window, T> implements WindowFunction<T, T, K, W> {
private static final long serialVersionUID = 1L;

private final ReduceFunction<T> reduceFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* @param <IN> The type of the input value.
* @param <OUT> The type of the output value.
* @param <KEY> The type of the key.
* @param <W> The type of {@code Window} that this window function can be applied on.
*/
@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
Expand All @@ -45,5 +46,5 @@ public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception;
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed

private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();

private final WindowFunction<Iterable<Type>, Result, Key, Window> function;
private final WindowFunction<Type, Result, Key, Window> function;

/**
* IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
private long evaluationPass = 1L;

// ------------------------------------------------------------------------

public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Iterable<Type>, Result, Key, Window> function) {
public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Type, Result, Key, Window> function) {
this.keySelector = keySelector;
this.function = function;
}
Expand Down Expand Up @@ -86,7 +86,7 @@ public void evaluateWindow(Collector<Result> out, TimeWindow window,

static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {

private final WindowFunction<Iterable<Type>, Result, Key, Window> function;
private final WindowFunction<Type, Result, Key, Window> function;

private final UnionIterator<Type> unionIterator;

Expand All @@ -99,7 +99,7 @@ static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.
private Key currentKey;


WindowFunctionTraversal(WindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window,
WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window,
Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
this.function = function;
this.out = out;
Expand Down
Loading

0 comments on commit 27b5c49

Please sign in to comment.