From 7143a24542fa67b905258be841e5dde32016a687 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 7 Sep 2016 13:51:53 +0200 Subject: [PATCH] [FLINK-4589] [DataStream API] Fix Merging of Covering Window in MergingWindowSet This also adds two new test cases for that problem. This closes #2476 --- .../operators/windowing/MergingWindowSet.java | 9 ++- .../windowing/MergingWindowSetTest.java | 64 +++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java index c806d2d7a7180..4e19c31c8d393 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java @@ -201,7 +201,7 @@ public void merge(Collection toBeMerged, W mergeResult) { } // the new window created a new, self-contained window without merging - if (resultWindow.equals(newWindow)) { + if (resultWindow.equals(newWindow) && mergeResults.isEmpty()) { this.windows.put(resultWindow, resultWindow); } @@ -225,4 +225,11 @@ public interface MergeFunction { */ void merge(W mergeResult, Collection mergedWindows, W stateWindowResult, Collection mergedStateWindows) throws Exception; } + + @Override + public String toString() { + return "MergingWindowSet{" + + "windows=" + windows + + '}'; + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java index 939f13fd807ad..e2cb6c8ba25ae 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java @@ -205,6 +205,70 @@ public void testLateMerging() throws Exception { assertThat(windowSet.getStateWindow(new TimeWindow(0, 13)), anyOf(is(new TimeWindow(0, 3)), is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13)))); } + /** + * Test merging of a large new window that covers one existing windows. + */ + @Test + public void testMergeLargeWindowCoveringSingleWindow() throws Exception { + MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3))); + + TestingMergeFunction mergeFunction = new TestingMergeFunction(); + + // add an initial small window + + mergeFunction.reset(); + assertEquals(new TimeWindow(1, 2), windowSet.addWindow(new TimeWindow(1, 2), mergeFunction)); + assertFalse(mergeFunction.hasMerged()); + assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new TimeWindow(1, 2))); + + // add a new window that completely covers the existing window + + mergeFunction.reset(); + assertEquals(new TimeWindow(0, 3), windowSet.addWindow(new TimeWindow(0, 3), mergeFunction)); + assertTrue(mergeFunction.hasMerged()); + assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new TimeWindow(0, 3))); + } + + /** + * Test merging of a large new window that covers multiple existing windows. + */ + @Test + public void testMergeLargeWindowCoveringMultipleWindows() throws Exception { + MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3))); + + TestingMergeFunction mergeFunction = new TestingMergeFunction(); + + // add several non-overlapping initial windoww + + mergeFunction.reset(); + assertEquals(new TimeWindow(1, 3), windowSet.addWindow(new TimeWindow(1, 3), mergeFunction)); + assertFalse(mergeFunction.hasMerged()); + assertEquals(new TimeWindow(1, 3), windowSet.getStateWindow(new TimeWindow(1, 3))); + + mergeFunction.reset(); + assertEquals(new TimeWindow(5, 8), windowSet.addWindow(new TimeWindow(5, 8), mergeFunction)); + assertFalse(mergeFunction.hasMerged()); + assertEquals(new TimeWindow(5, 8), windowSet.getStateWindow(new TimeWindow(5, 8))); + + mergeFunction.reset(); + assertEquals(new TimeWindow(10, 13), windowSet.addWindow(new TimeWindow(10, 13), mergeFunction)); + assertFalse(mergeFunction.hasMerged()); + assertEquals(new TimeWindow(10, 13), windowSet.getStateWindow(new TimeWindow(10, 13))); + + // add a new window that completely covers the existing windows + + mergeFunction.reset(); + assertEquals(new TimeWindow(0, 13), windowSet.addWindow(new TimeWindow(0, 13), mergeFunction)); + assertTrue(mergeFunction.hasMerged()); + assertThat(mergeFunction.mergedStateWindows(), anyOf( + containsInAnyOrder(new TimeWindow(0, 3), new TimeWindow(5, 8)), + containsInAnyOrder(new TimeWindow(0, 3), new TimeWindow(10, 13)), + containsInAnyOrder(new TimeWindow(5, 8), new TimeWindow(10, 13)))); + assertThat(windowSet.getStateWindow(new TimeWindow(0, 13)), anyOf(is(new TimeWindow(1, 3)), is(new TimeWindow(5, 8)), is(new TimeWindow(10, 13)))); + + } + + private static class TestingMergeFunction implements MergingWindowSet.MergeFunction { private TimeWindow target = null; private Collection sources = null;