Skip to content

Commit

Permalink
[FLINK-3528] Add FoldingWindowBuffer for Non-Keyed Windows
Browse files Browse the repository at this point in the history
This makes AllWindowedStream.fold() take constant space, just like the
keyed WindowOperator.

This also adds a new test case to EventTimeAllWindowCheckpointingITCase
to verify that the FoldingWindowBuffer works.

This also renames the preexisting window buffers to ReducingWindowBuffer
and ListWindowBuffer to make the naming scheme consistent.
  • Loading branch information
aljoscha committed Feb 26, 2016
1 parent 27b5c49 commit 20884c0
Show file tree
Hide file tree
Showing 15 changed files with 477 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* @param <O> Type of the elements that the group/list/stream contains
*/
@Public
public interface FoldFunction<O,T> extends Function, Serializable {
public interface FoldFunction<O, T> extends Function, Serializable {
/**
* The core method of FoldFunction, combining two values into one value of the same type.
* The fold function is consecutively applied to all values of a group until only a single value remains.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.FoldingWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer;

/**
* An {@code AllWindowedStream} represents a data stream where the stream of
Expand Down Expand Up @@ -157,15 +158,15 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
if (evictor != null) {
operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new HeapWindowBuffer.Factory<T>(),
new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
new ReduceIterableAllWindowFunction<W, T>(function),
trigger,
evictor);

} else {
operator = new NonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new PreAggregatingHeapWindowBuffer.Factory<>(function),
new ReducingWindowBuffer.Factory<>(function, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
new ReduceIterableAllWindowFunction<W, T>(function),
trigger);
}
Expand Down Expand Up @@ -255,20 +256,20 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {

String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";

NonKeyedWindowOperator<T, R, W> operator;
NonKeyedWindowOperator<T, T, R, W> operator;

if (evictor != null) {
operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new HeapWindowBuffer.Factory<T>(),
new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
function,
trigger,
evictor);

} else {
operator = new NonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new HeapWindowBuffer.Factory<T>(),
new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
function,
trigger);
}
Expand Down Expand Up @@ -329,16 +330,16 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
if (evictor != null) {
operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new HeapWindowBuffer.Factory<T>(),
new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
new ReduceApplyAllWindowFunction<>(preAggregator, function),
trigger,
evictor);

} else {
operator = new NonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
new ReduceApplyAllWindowFunction<>(preAggregator, function),
new ReducingWindowBuffer.Factory<>(preAggregator, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
function,
trigger);
}

Expand Down Expand Up @@ -400,7 +401,7 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {

operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new HeapWindowBuffer.Factory<T>(),
new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
trigger,
evictor);
Expand All @@ -410,8 +411,8 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {

operator = new NonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new HeapWindowBuffer.Factory<T>(),
new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
new FoldingWindowBuffer.Factory<>(foldFunction, initialValue, resultType.createSerializer(getExecutionEnvironment().getConfig())),
function,
trigger);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,21 @@
* @see org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator
*
* @param <IN> The type of the incoming elements.
* @param <ACC> The type of elements stored in the window buffers.
* @param <OUT> The type of elements emitted by the {@code WindowFunction}.
* @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
*/
@Internal
public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends NonKeyedWindowOperator<IN, OUT, W> {
public class EvictingNonKeyedWindowOperator<IN, ACC, OUT, W extends Window> extends NonKeyedWindowOperator<IN, ACC, OUT, W> {

private static final long serialVersionUID = 1L;

private final Evictor<? super IN, ? super W> evictor;

public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
TypeSerializer<W> windowSerializer,
WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
AllWindowFunction<IN, OUT, W> windowFunction,
WindowBufferFactory<? super IN, ACC, ? extends EvictingWindowBuffer<IN, ACC>> windowBufferFactory,
AllWindowFunction<ACC, OUT, W> windowFunction,
Trigger<? super IN, ? super W> trigger,
Evictor<? super IN, ? super W> evictor) {
super(windowAssigner, windowSerializer, windowBufferFactory, windowFunction, trigger);
Expand All @@ -60,7 +61,7 @@ public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssign
@SuppressWarnings("unchecked, rawtypes")
protected void emitWindow(Context context) throws Exception {
timestampedCollector.setAbsoluteTimestamp(context.window.maxTimestamp());
EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) context.windowBuffer;
EvictingWindowBuffer<IN, ACC> windowBuffer = (EvictingWindowBuffer<IN, ACC>) context.windowBuffer;

int toEvict = 0;
if (windowBuffer.size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.InstantiationUtil;
Expand All @@ -72,12 +71,13 @@
* @see org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
*
* @param <IN> The type of the incoming elements.
* @param <ACC> The type of elements stored in the window buffers.
* @param <OUT> The type of elements emitted by the {@code WindowFunction}.
* @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
*/
@Internal
public class NonKeyedWindowOperator<IN, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, OUT, W>>
public class NonKeyedWindowOperator<IN, ACC, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, AllWindowFunction<ACC, OUT, W>>
implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {

private static final long serialVersionUID = 1L;
Expand All @@ -92,7 +92,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>

private final Trigger<? super IN, ? super W> trigger;

private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
private final WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> windowBufferFactory;

/**
* This is used to copy the incoming element because it can be put into several window
Expand Down Expand Up @@ -145,8 +145,8 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
*/
public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
TypeSerializer<W> windowSerializer,
WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
AllWindowFunction<IN, OUT, W> windowFunction,
WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> windowBufferFactory,
AllWindowFunction<ACC, OUT, W> windowFunction,
Trigger<? super IN, ? super W> trigger) {

super(windowFunction);
Expand Down Expand Up @@ -180,9 +180,6 @@ public final void open() throws Exception {
throw new IllegalStateException("Input serializer was not set.");
}

windowBufferFactory.setRuntimeContext(getRuntimeContext());
windowBufferFactory.open(getUserFunctionParameters());

// these could already be initialized from restoreState()
if (watermarkTimers == null) {
watermarkTimers = new HashMap<>();
Expand Down Expand Up @@ -221,11 +218,6 @@ public final void open() throws Exception {
public final void dispose() {
super.dispose();
windows.clear();
try {
windowBufferFactory.close();
} catch (Exception e) {
throw new RuntimeException("Error while closing WindowBufferFactory.", e);
}
}

@Override
Expand All @@ -236,7 +228,7 @@ public final void processElement(StreamRecord<IN> element) throws Exception {
for (W window: elementWindows) {
Context context = windows.get(window);
if (context == null) {
WindowBuffer<IN> windowBuffer = windowBufferFactory.create();
WindowBuffer<IN, ACC> windowBuffer = windowBufferFactory.create();
context = new Context(window, windowBuffer);
windows.put(window, context);
}
Expand Down Expand Up @@ -356,7 +348,7 @@ public final void trigger(long time) throws Exception {
protected class Context implements TriggerContext {
protected W window;

protected WindowBuffer<IN> windowBuffer;
protected WindowBuffer<IN, ACC> windowBuffer;

protected HashMap<String, Serializable> state;

Expand All @@ -369,7 +361,7 @@ protected class Context implements TriggerContext {

public Context(
W window,
WindowBuffer<IN> windowBuffer) {
WindowBuffer<IN, ACC> windowBuffer) {
this.window = window;
this.windowBuffer = windowBuffer;
state = new HashMap<>();
Expand All @@ -394,12 +386,7 @@ protected Context(DataInputView in, ClassLoader userClassloader) throws Exceptio
in.read(stateData);
state = InstantiationUtil.deserializeObject(stateData, userClassloader);

this.windowBuffer = windowBufferFactory.create();
int numElements = in.readInt();
MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
for (int i = 0; i < numElements; i++) {
windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
}
this.windowBuffer = windowBufferFactory.restoreFromSnapshot(in);
}

protected void writeToState(AbstractStateBackend.CheckpointStateOutputView out) throws IOException {
Expand All @@ -411,11 +398,7 @@ protected void writeToState(AbstractStateBackend.CheckpointStateOutputView out)
out.writeInt(serializedState.length);
out.write(serializedState, 0, serializedState.length);

MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
out.writeInt(windowBuffer.size());
for (StreamRecord<IN> element: windowBuffer.getElements()) {
recordSerializer.serialize(element, out);
}
windowBuffer.snapshot(out);
}

@Override
Expand Down Expand Up @@ -635,7 +618,7 @@ public WindowAssigner<? super IN, W> getWindowAssigner() {
}

@VisibleForTesting
public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
public WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> getWindowBufferFactory() {
return windowBufferFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
* the buffer.
*
* @param <T> The type of elements that this {@code WindowBuffer} can store.
* @param <O> The type of elements that this window buffer will return when asked for its contents.
*/
@Internal
public interface EvictingWindowBuffer<T> extends WindowBuffer<T> {
public interface EvictingWindowBuffer<T, O> extends WindowBuffer<T, O> {

/**
* Removes the given number of elements, starting from the beginning.
Expand Down
Loading

0 comments on commit 20884c0

Please sign in to comment.