Skip to content

Commit

Permalink
[FLINK-2257] [streaming] Properly forward rich window function calls …
Browse files Browse the repository at this point in the history
…to wrapped functions

Closes apache#855
  • Loading branch information
mbalassi committed Jun 25, 2015
1 parent ba2796a commit 658a078
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
package org.apache.flink.streaming.api.operators.windowing;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.windowing.StreamWindow;
Expand Down Expand Up @@ -69,11 +70,31 @@ public StreamWindow<OUT> map(StreamWindow<IN> window) throws Exception {
return outputWindow;
}

// --------------------------------------------------------------------------------------------
// Forwarding calls to the wrapped folder
// --------------------------------------------------------------------------------------------

@Override
public void open(Configuration parameters) throws Exception {
FunctionUtils.openFunction(folder, parameters);
}

@Override
public void close() throws Exception {
FunctionUtils.closeFunction(folder);
}

@Override
public void setRuntimeContext(RuntimeContext t) {
FunctionUtils.setFunctionRuntimeContext(folder, t);
}

@Override
public RuntimeContext getRuntimeContext() {
return FunctionUtils.getFunctionRuntimeContext(folder, getRuntimeContext());
}

// streaming does not use iteration runtime context, so that is omitted
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.functions.WindowMapFunction;
import org.apache.flink.streaming.api.operators.StreamMap;
Expand Down Expand Up @@ -63,11 +64,31 @@ public StreamWindow<R> map(StreamWindow<T> window) throws Exception {
return outputWindow;
}

// --------------------------------------------------------------------------------------------
// Forwarding calls to the wrapped mapper
// --------------------------------------------------------------------------------------------

@Override
public void open(Configuration parameters) throws Exception {
FunctionUtils.openFunction(mapper, parameters);
}

@Override
public void close() throws Exception {
FunctionUtils.closeFunction(mapper);
}

@Override
public void setRuntimeContext(RuntimeContext t) {
FunctionUtils.setFunctionRuntimeContext(mapper, t);
}

@Override
public RuntimeContext getRuntimeContext() {
return FunctionUtils.getFunctionRuntimeContext(mapper, getRuntimeContext());
}

// streaming does not use iteration runtime context, so that is omitted
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.windowing.StreamWindow;
Expand Down Expand Up @@ -67,11 +68,32 @@ public StreamWindow<T> map(StreamWindow<T> window) throws Exception {
return outputWindow;
}

// --------------------------------------------------------------------------------------------
// Forwarding calls to the wrapped reducer
// --------------------------------------------------------------------------------------------


@Override
public void open(Configuration parameters) throws Exception {
FunctionUtils.openFunction(reducer, parameters);
}

@Override
public void close() throws Exception {
FunctionUtils.closeFunction(reducer);
}

@Override
public void setRuntimeContext(RuntimeContext t) {
FunctionUtils.setFunctionRuntimeContext(reducer, t);
}

@Override
public RuntimeContext getRuntimeContext() {
return FunctionUtils.getFunctionRuntimeContext(reducer, getRuntimeContext());
}

// streaming does not use iteration runtime context, so that is omitted
}

}

0 comments on commit 658a078

Please sign in to comment.