Skip to content

Commit

Permalink
[FLINK-4589] [DataStream API] Fix Merging of Covering Window in Mergi…
Browse files Browse the repository at this point in the history
…ngWindowSet

This also adds two new test cases for that problem.

This closes apache#2476
  • Loading branch information
aljoscha authored and StephanEwen committed Sep 15, 2016
1 parent 0492dcc commit 7143a24
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void merge(Collection<W> toBeMerged, W mergeResult) {
}

// the new window created a new, self-contained window without merging
if (resultWindow.equals(newWindow)) {
if (resultWindow.equals(newWindow) && mergeResults.isEmpty()) {
this.windows.put(resultWindow, resultWindow);
}

Expand All @@ -225,4 +225,11 @@ public interface MergeFunction<W> {
*/
void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception;
}

@Override
public String toString() {
return "MergingWindowSet{" +
"windows=" + windows +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,70 @@ public void testLateMerging() throws Exception {
assertThat(windowSet.getStateWindow(new TimeWindow(0, 13)), anyOf(is(new TimeWindow(0, 3)), is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13))));
}

/**
* Test merging of a large new window that covers one existing windows.
*/
@Test
public void testMergeLargeWindowCoveringSingleWindow() throws Exception {
MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));

TestingMergeFunction mergeFunction = new TestingMergeFunction();

// add an initial small window

mergeFunction.reset();
assertEquals(new TimeWindow(1, 2), windowSet.addWindow(new TimeWindow(1, 2), mergeFunction));
assertFalse(mergeFunction.hasMerged());
assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new TimeWindow(1, 2)));

// add a new window that completely covers the existing window

mergeFunction.reset();
assertEquals(new TimeWindow(0, 3), windowSet.addWindow(new TimeWindow(0, 3), mergeFunction));
assertTrue(mergeFunction.hasMerged());
assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new TimeWindow(0, 3)));
}

/**
* Test merging of a large new window that covers multiple existing windows.
*/
@Test
public void testMergeLargeWindowCoveringMultipleWindows() throws Exception {
MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)));

TestingMergeFunction mergeFunction = new TestingMergeFunction();

// add several non-overlapping initial windoww

mergeFunction.reset();
assertEquals(new TimeWindow(1, 3), windowSet.addWindow(new TimeWindow(1, 3), mergeFunction));
assertFalse(mergeFunction.hasMerged());
assertEquals(new TimeWindow(1, 3), windowSet.getStateWindow(new TimeWindow(1, 3)));

mergeFunction.reset();
assertEquals(new TimeWindow(5, 8), windowSet.addWindow(new TimeWindow(5, 8), mergeFunction));
assertFalse(mergeFunction.hasMerged());
assertEquals(new TimeWindow(5, 8), windowSet.getStateWindow(new TimeWindow(5, 8)));

mergeFunction.reset();
assertEquals(new TimeWindow(10, 13), windowSet.addWindow(new TimeWindow(10, 13), mergeFunction));
assertFalse(mergeFunction.hasMerged());
assertEquals(new TimeWindow(10, 13), windowSet.getStateWindow(new TimeWindow(10, 13)));

// add a new window that completely covers the existing windows

mergeFunction.reset();
assertEquals(new TimeWindow(0, 13), windowSet.addWindow(new TimeWindow(0, 13), mergeFunction));
assertTrue(mergeFunction.hasMerged());
assertThat(mergeFunction.mergedStateWindows(), anyOf(
containsInAnyOrder(new TimeWindow(0, 3), new TimeWindow(5, 8)),
containsInAnyOrder(new TimeWindow(0, 3), new TimeWindow(10, 13)),
containsInAnyOrder(new TimeWindow(5, 8), new TimeWindow(10, 13))));
assertThat(windowSet.getStateWindow(new TimeWindow(0, 13)), anyOf(is(new TimeWindow(1, 3)), is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13))));

}


private static class TestingMergeFunction implements MergingWindowSet.MergeFunction<TimeWindow> {
private TimeWindow target = null;
private Collection<TimeWindow> sources = null;
Expand Down

0 comments on commit 7143a24

Please sign in to comment.