Skip to content

Commit

Permalink
[FLINK-6464][streaming] Stabilize default window operator names
Browse files Browse the repository at this point in the history
This closes apache#5332.
  • Loading branch information
zentol committed Jan 30, 2018
1 parent 67ab701 commit 7d4bd4b
Showing 1 changed file with 51 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
Expand Down Expand Up @@ -67,6 +68,10 @@
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.lang.reflect.Type;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -266,10 +271,7 @@ public <R> SingleOutputStreamOperator<R> reduce(
function = input.getExecutionEnvironment().clean(function);
reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);

String callLocation = Utils.getCallLocationName();
String udfName = "WindowedStream." + callLocation;

String opName;
final String opName = generateOperatorName(windowAssigner, trigger, evictor, reduceFunction, function);
KeySelector<T, K> keySel = input.getKeySelector();

OneInputStreamOperator<T, R> operator;
Expand All @@ -282,8 +284,6 @@ public <R> SingleOutputStreamOperator<R> reduce(
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
Expand All @@ -301,8 +301,6 @@ public <R> SingleOutputStreamOperator<R> reduce(
reduceFunction,
input.getType().createSerializer(getExecutionEnvironment().getConfig()));

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";

operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
Expand Down Expand Up @@ -359,10 +357,7 @@ public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction
function = input.getExecutionEnvironment().clean(function);
reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);

String callLocation = Utils.getCallLocationName();
String udfName = "WindowedStream." + callLocation;

String opName;
final String opName = generateOperatorName(windowAssigner, trigger, evictor, reduceFunction, function);
KeySelector<T, K> keySel = input.getKeySelector();

OneInputStreamOperator<T, R> operator;
Expand All @@ -375,8 +370,6 @@ public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
Expand All @@ -394,8 +387,6 @@ public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction
reduceFunction,
input.getType().createSerializer(getExecutionEnvironment().getConfig()));

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";

operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
Expand Down Expand Up @@ -523,10 +514,7 @@ public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
function = input.getExecutionEnvironment().clean(function);
foldFunction = input.getExecutionEnvironment().clean(foldFunction);

String callLocation = Utils.getCallLocationName();
String udfName = "WindowedStream." + callLocation;

String opName;
final String opName = generateOperatorName(windowAssigner, trigger, evictor, foldFunction, function);
KeySelector<T, K> keySel = input.getKeySelector();

OneInputStreamOperator<T, R> operator;
Expand All @@ -539,8 +527,6 @@ public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

operator = new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
Expand All @@ -556,8 +542,6 @@ public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
initialValue, foldFunction, foldAccumulatorType.createSerializer(getExecutionEnvironment().getConfig()));

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";

operator = new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
Expand Down Expand Up @@ -637,10 +621,7 @@ public <R, ACC> SingleOutputStreamOperator<R> fold(
windowFunction = input.getExecutionEnvironment().clean(windowFunction);
foldFunction = input.getExecutionEnvironment().clean(foldFunction);

String callLocation = Utils.getCallLocationName();
String udfName = "WindowedStream." + callLocation;

String opName;
final String opName = generateOperatorName(windowAssigner, trigger, evictor, foldFunction, windowFunction);
KeySelector<T, K> keySel = input.getKeySelector();

OneInputStreamOperator<T, R> operator;
Expand All @@ -653,8 +634,6 @@ public <R, ACC> SingleOutputStreamOperator<R> fold(
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
Expand All @@ -673,8 +652,6 @@ public <R, ACC> SingleOutputStreamOperator<R> fold(
foldFunction,
foldResultType.createSerializer(getExecutionEnvironment().getConfig()));

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";

operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
Expand Down Expand Up @@ -832,10 +809,7 @@ public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
windowFunction = input.getExecutionEnvironment().clean(windowFunction);
aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);

String callLocation = Utils.getCallLocationName();
String udfName = "WindowedStream." + callLocation;

String opName;
final String opName = generateOperatorName(windowAssigner, trigger, evictor, aggregateFunction, windowFunction);
KeySelector<T, K> keySel = input.getKeySelector();

OneInputStreamOperator<T, R> operator;
Expand All @@ -848,8 +822,6 @@ public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

operator = new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
Expand All @@ -865,8 +837,6 @@ public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>("window-contents",
aggregateFunction, accumulatorType.createSerializer(getExecutionEnvironment().getConfig()));

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";

operator = new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
Expand Down Expand Up @@ -991,10 +961,7 @@ public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
windowFunction = input.getExecutionEnvironment().clean(windowFunction);
aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);

String callLocation = Utils.getCallLocationName();
String udfName = "WindowedStream." + callLocation;

String opName;
final String opName = generateOperatorName(windowAssigner, trigger, evictor, aggregateFunction, windowFunction);
KeySelector<T, K> keySel = input.getKeySelector();

OneInputStreamOperator<T, R> operator;
Expand All @@ -1007,8 +974,6 @@ public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

operator = new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
Expand All @@ -1024,8 +989,6 @@ public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>("window-contents",
aggregateFunction, accumulatorType.createSerializer(getExecutionEnvironment().getConfig()));

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";

operator = new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
Expand Down Expand Up @@ -1074,9 +1037,8 @@ public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> functi
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
String callLocation = Utils.getCallLocationName();
function = input.getExecutionEnvironment().clean(function);
return apply(new InternalIterableWindowFunction<>(function), resultType, callLocation);
return apply(new InternalIterableWindowFunction<>(function), resultType, function);
}

/**
Expand Down Expand Up @@ -1111,16 +1073,13 @@ public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K,
*/
@Internal
public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
String callLocation = Utils.getCallLocationName();
function = input.getExecutionEnvironment().clean(function);
return apply(new InternalIterableProcessWindowFunction<>(function), resultType, callLocation);
return apply(new InternalIterableProcessWindowFunction<>(function), resultType, function);
}

private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, String callLocation) {

String udfName = "WindowedStream." + callLocation;
private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {

String opName;
final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
KeySelector<T, K> keySel = input.getKeySelector();

WindowOperator<K, T, Iterable<T>, R, W> operator;
Expand All @@ -1133,8 +1092,6 @@ private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
Expand All @@ -1151,8 +1108,6 @@ private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
input.getType().createSerializer(getExecutionEnvironment().getConfig()));

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";

operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
Expand Down Expand Up @@ -1213,10 +1168,7 @@ public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction,
function = input.getExecutionEnvironment().clean(function);
reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);

String callLocation = Utils.getCallLocationName();
String udfName = "WindowedStream." + callLocation;

String opName;
final String opName = generateOperatorName(windowAssigner, trigger, evictor, reduceFunction, function);
KeySelector<T, K> keySel = input.getKeySelector();

OneInputStreamOperator<T, R> operator;
Expand All @@ -1229,8 +1181,6 @@ public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction,
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
Expand All @@ -1248,8 +1198,6 @@ public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction,
reduceFunction,
input.getType().createSerializer(getExecutionEnvironment().getConfig()));

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";

operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
Expand Down Expand Up @@ -1316,10 +1264,7 @@ public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R
function = input.getExecutionEnvironment().clean(function);
foldFunction = input.getExecutionEnvironment().clean(foldFunction);

String callLocation = Utils.getCallLocationName();
String udfName = "WindowedStream." + callLocation;

String opName;
final String opName = generateOperatorName(windowAssigner, trigger, evictor, foldFunction, function);
KeySelector<T, K> keySel = input.getKeySelector();

OneInputStreamOperator<T, R> operator;
Expand All @@ -1332,8 +1277,6 @@ public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

operator = new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
Expand All @@ -1349,8 +1292,6 @@ public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R
FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
initialValue, foldFunction, resultType.createSerializer(getExecutionEnvironment().getConfig()));

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";

operator = new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
Expand All @@ -1365,6 +1306,40 @@ public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R
return input.transform(opName, resultType, operator);
}

private static String generateFunctionName(Function function) {
Class<? extends Function> functionClass = function.getClass();
if (functionClass.isAnonymousClass()) {
// getSimpleName returns an empty String for anonymous classes
Type[] interfaces = functionClass.getInterfaces();
if (interfaces.length == 0) {
// extends an existing class (like RichMapFunction)
Class<?> functionSuperClass = functionClass.getSuperclass();
return functionSuperClass.getSimpleName() + functionClass.getName().substring(functionClass.getEnclosingClass().getName().length());
} else {
// implements a Function interface
Class<?> functionInterface = functionClass.getInterfaces()[0];
return functionInterface.getSimpleName() + functionClass.getName().substring(functionClass.getEnclosingClass().getName().length());
}
} else {
return functionClass.getSimpleName();
}
}

private static String generateOperatorName(
WindowAssigner<?, ?> assigner,
Trigger<?, ?> trigger,
@Nullable Evictor<?, ?> evictor,
Function function1,
@Nullable Function function2) {
return "Window(" +
assigner + ", " +
trigger.getClass().getSimpleName() + ", " +
(evictor == null ? "" : (evictor.getClass().getSimpleName() + ", ")) +
generateFunctionName(function1) +
(function2 == null ? "" : (", " + generateFunctionName(function2))) +
")";
}

// ------------------------------------------------------------------------
// Pre-defined aggregations on the keyed windows
// ------------------------------------------------------------------------
Expand Down

0 comments on commit 7d4bd4b

Please sign in to comment.