Skip to content

Commit

Permalink
Break out TriggerTester from ReduceFnTester
Browse files Browse the repository at this point in the history
----Release Notes----

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=112009014
  • Loading branch information
kennknowles authored and lukecwik committed Jan 15, 2016
1 parent 951723f commit 44e3e98
Show file tree
Hide file tree
Showing 13 changed files with 1,151 additions and 942 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public void prefetchOnElement(StateContext state) {
state.access(DELAYED_UNTIL_TAG).get();
}


@Override
public TriggerResult onElement(OnElementContext c)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.dataflow.sdk.util;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
import com.google.common.base.MoreObjects;

Expand Down Expand Up @@ -125,9 +127,9 @@ public static class TimerData implements Comparable<TimerData> {
private final TimeDomain domain;

private TimerData(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
this.namespace = namespace;
this.timestamp = timestamp;
this.domain = domain;
this.namespace = checkNotNull(namespace);
this.timestamp = checkNotNull(timestamp);
this.domain = checkNotNull(domain);
}

public StateNamespace getNamespace() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,21 @@

package com.google.cloud.dataflow.sdk.transforms.windowing;

import static com.google.cloud.dataflow.sdk.WindowMatchers.isSingleWindowedValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;

import com.google.cloud.dataflow.sdk.WindowMatchers;
import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger.MergeResult;
import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger.OnceTrigger;
import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger.TriggerResult;
import com.google.cloud.dataflow.sdk.util.ReduceFnTester;
import com.google.cloud.dataflow.sdk.util.TimeDomain;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;
import com.google.cloud.dataflow.sdk.util.TriggerTester;
import com.google.cloud.dataflow.sdk.util.TriggerTester.SimpleTriggerTester;

import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
Expand All @@ -50,16 +48,14 @@ public class AfterAllTest {
@Mock private OnceTrigger<IntervalWindow> mockTrigger1;
@Mock private OnceTrigger<IntervalWindow> mockTrigger2;

private ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester;
private SimpleTriggerTester<IntervalWindow> tester;
private IntervalWindow firstWindow;

public void setUp(WindowFn<?, IntervalWindow> windowFn) throws Exception {
MockitoAnnotations.initMocks(this);
tester = ReduceFnTester.nonCombining(
windowFn,
tester = TriggerTester.forTrigger(
AfterAll.of(mockTrigger1, mockTrigger2),
AccumulationMode.DISCARDING_FIRED_PANES,
Duration.millis(100));
windowFn);
firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
}

Expand All @@ -75,36 +71,35 @@ private void injectElement(int element, TriggerResult result1, TriggerResult res
Mockito.<Trigger<IntervalWindow>.OnElementContext>any()))
.thenReturn(result2);
}
tester.injectElements(TimestampedValue.of(element, new Instant(element)));
tester.injectElements(element);
}

@Test
public void testOnElementT1FiresFirst() throws Exception {
setUp(FixedWindows.of(Duration.millis(10)));

injectElement(1, TriggerResult.CONTINUE, TriggerResult.CONTINUE);
assertThat(tester.extractOutput(), Matchers.emptyIterable());
assertThat(tester.getLatestResult(), equalTo(TriggerResult.CONTINUE));

injectElement(2, TriggerResult.FIRE_AND_FINISH, TriggerResult.CONTINUE);
assertThat(tester.getLatestResult(), equalTo(TriggerResult.CONTINUE));

injectElement(3, null, TriggerResult.FIRE_AND_FINISH);
assertThat(tester.extractOutput(), Matchers.contains(
isSingleWindowedValue(Matchers.containsInAnyOrder(1, 2, 3), 1, 0, 10)));
assertThat(tester.getLatestResult(), equalTo(TriggerResult.FIRE_AND_FINISH));
assertTrue(tester.isMarkedFinished(firstWindow));

tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
}

@Test
public void testOnElementT2FiresFirst() throws Exception {
setUp(FixedWindows.of(Duration.millis(10)));

injectElement(1, TriggerResult.CONTINUE, TriggerResult.FIRE_AND_FINISH);
assertThat(tester.extractOutput(), Matchers.emptyIterable());
assertThat(tester.getLatestResult(), equalTo(TriggerResult.CONTINUE));

injectElement(2, TriggerResult.FIRE_AND_FINISH, null);
assertThat(tester.extractOutput(), Matchers.contains(
isSingleWindowedValue(Matchers.containsInAnyOrder(1, 2), 1, 0, 10)));
assertTrue(tester.isMarkedFinished(firstWindow));
assertThat(tester.getLatestResult(), equalTo(TriggerResult.FIRE_AND_FINISH));

tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
assertTrue(tester.isMarkedFinished(firstWindow));
}

@SuppressWarnings("unchecked")
Expand All @@ -113,25 +108,22 @@ public void testOnTimerFire() throws Exception {
setUp(FixedWindows.of(Duration.millis(10)));

injectElement(1, TriggerResult.CONTINUE, TriggerResult.FIRE_AND_FINISH);
assertThat(tester.getLatestResult(), equalTo(TriggerResult.CONTINUE));

when(mockTrigger1.onTimer(Mockito.<Trigger<IntervalWindow>.OnTimerContext>any()))
.thenReturn(TriggerResult.FIRE_AND_FINISH);
tester.fireTimer(firstWindow, new Instant(11), TimeDomain.EVENT_TIME);
tester.advanceInputWatermark(new Instant(12));

assertThat(tester.extractOutput(), Matchers.contains(
isSingleWindowedValue(Matchers.containsInAnyOrder(1), 1, 0, 10)));
assertThat(tester.getLatestResult(), equalTo(TriggerResult.FIRE_AND_FINISH));
assertTrue(tester.isMarkedFinished(firstWindow));

tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
}

@SuppressWarnings("unchecked")
@Test
public void testOnTimerFireAndFinish() throws Exception {
setUp(FixedWindows.of(Duration.millis(10)));

injectElement(1, TriggerResult.CONTINUE, TriggerResult.CONTINUE);
assertThat(tester.getLatestResult(), equalTo(TriggerResult.CONTINUE));

when(mockTrigger1.onTimer(Mockito.<Trigger<IntervalWindow>.OnTimerContext>any()))
.thenReturn(TriggerResult.CONTINUE);
Expand All @@ -140,16 +132,12 @@ public void testOnTimerFireAndFinish() throws Exception {
tester.fireTimer(firstWindow, new Instant(11), TimeDomain.EVENT_TIME);

tester.advanceInputWatermark(new Instant(12));
assertThat(tester.extractOutput(), Matchers.emptyIterable());
assertThat(tester.getLatestResult(), equalTo(TriggerResult.CONTINUE));
assertFalse(tester.isMarkedFinished(firstWindow));

injectElement(2, TriggerResult.FIRE_AND_FINISH, null);
assertThat(tester.extractOutput(), Matchers.contains(
isSingleWindowedValue(Matchers.containsInAnyOrder(1, 2), 1, 0, 10)));

assertThat(tester.getLatestResult(), equalTo(TriggerResult.FIRE_AND_FINISH));
assertTrue(tester.isMarkedFinished(firstWindow));

tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
}

@Test
Expand All @@ -160,22 +148,17 @@ public void testOnMergeFires() throws Exception {
.thenReturn(TriggerResult.CONTINUE);
when(mockTrigger2.onElement(Mockito.<Trigger<IntervalWindow>.OnElementContext>any()))
.thenReturn(TriggerResult.CONTINUE);
tester.injectElements(
TimestampedValue.of(1, new Instant(1)),
TimestampedValue.of(5, new Instant(12)));
tester.injectElements(1, 5);
assertThat(tester.getResultSequence(), everyItem(equalTo(TriggerResult.CONTINUE)));

when(mockTrigger1.onMerge(Mockito.<Trigger<IntervalWindow>.OnMergeContext>any()))
.thenReturn(MergeResult.ALREADY_FINISHED);
when(mockTrigger2.onMerge(Mockito.<Trigger<IntervalWindow>.OnMergeContext>any()))
.thenReturn(MergeResult.FIRE_AND_FINISH);
tester.injectElements(
TimestampedValue.of(12, new Instant(5)));
tester.mergeWindows();

assertThat(tester.extractOutput(), Matchers.contains(
isSingleWindowedValue(Matchers.containsInAnyOrder(1, 5, 12), 1, 1, 22)));
assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(22))));
tester.assertHasOnlyGlobalAndFinishedSetsFor(
new IntervalWindow(new Instant(1), new Instant(22)));
assertThat(tester.getLatestMergeResult(), equalTo(MergeResult.FIRE_AND_FINISH));
assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(15))));
}

@Test
Expand All @@ -186,22 +169,18 @@ public void testOnMergeFiresNotAlreadyFinished() throws Exception {
.thenReturn(TriggerResult.CONTINUE);
when(mockTrigger2.onElement(Mockito.<Trigger<IntervalWindow>.OnElementContext>any()))
.thenReturn(TriggerResult.CONTINUE);
tester.injectElements(
TimestampedValue.of(1, new Instant(1)),
TimestampedValue.of(5, new Instant(12)));
tester.injectElements(1, 12);
assertThat(tester.getResultSequence(), everyItem(equalTo(TriggerResult.CONTINUE)));

when(mockTrigger1.onMerge(Mockito.<Trigger<IntervalWindow>.OnMergeContext>any()))
.thenReturn(MergeResult.ALREADY_FINISHED);
when(mockTrigger2.onMerge(Mockito.<Trigger<IntervalWindow>.OnMergeContext>any()))
.thenReturn(MergeResult.ALREADY_FINISHED);
tester.injectElements(
TimestampedValue.of(12, new Instant(5)));
tester.injectElements(5);
tester.mergeWindows();

assertThat(tester.extractOutput(), Matchers.contains(
isSingleWindowedValue(Matchers.containsInAnyOrder(1, 5, 12), 1, 1, 22)));
assertThat(tester.getLatestMergeResult(), equalTo(MergeResult.FIRE_AND_FINISH));
assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(22))));
tester.assertHasOnlyGlobalAndFinishedSetsFor(
new IntervalWindow(new Instant(1), new Instant(22)));
}

@Test
Expand All @@ -219,83 +198,58 @@ public void testFireDeadline() throws Exception {

@Test
public void testAfterAllRealTriggersFixedWindow() throws Exception {
tester = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(50)),
Repeatedly.<IntervalWindow>forever(
AfterAll.<IntervalWindow>of(
AfterPane.<IntervalWindow>elementCountAtLeast(5),
AfterProcessingTime.<IntervalWindow>pastFirstElementInPane()
.plusDelayOf(Duration.millis(5)))),
AccumulationMode.DISCARDING_FIRED_PANES,
Duration.millis(100));
tester = TriggerTester.forTrigger(Repeatedly.<IntervalWindow>forever(
AfterAll.<IntervalWindow>of(
AfterPane.<IntervalWindow>elementCountAtLeast(5),
AfterProcessingTime.<IntervalWindow>pastFirstElementInPane()
.plusDelayOf(Duration.millis(5)))),
FixedWindows.of(Duration.millis(50)));

tester.advanceProcessingTime(new Instant(0));
// 6 elements -> after pane fires
tester.injectElements(
TimestampedValue.of(0, new Instant(0)),
TimestampedValue.of(1, new Instant(0)),
TimestampedValue.of(2, new Instant(1)),
TimestampedValue.of(3, new Instant(1)),
TimestampedValue.of(4, new Instant(1)),
TimestampedValue.of(5, new Instant(2)));

assertThat(tester.extractOutput(), Matchers.emptyIterable());
tester.injectElements(0, 0, 1, 1, 1, 2);

assertThat(tester.getResultSequence(), everyItem(equalTo(TriggerResult.CONTINUE)));
tester.advanceProcessingTime(new Instant(6));

assertThat(tester.extractOutput(), Matchers.contains(
isSingleWindowedValue(Matchers.containsInAnyOrder(0, 1, 2, 3, 4, 5), 0, 0, 50)));
assertThat(tester.getLatestResult(), equalTo(TriggerResult.FIRE));

// 4 elements, advance processing time, then deliver the last elem
tester.clearResultSequence();
tester.advanceProcessingTime(new Instant(15));
tester.injectElements(
TimestampedValue.of(6, new Instant(2)),
TimestampedValue.of(7, new Instant(3)),
TimestampedValue.of(8, new Instant(4)),
TimestampedValue.of(9, new Instant(5)));
tester.injectElements(2, 3, 4, 5);
tester.advanceProcessingTime(new Instant(21));
assertThat(tester.extractOutput(), Matchers.emptyIterable());
tester.injectElements(
TimestampedValue.of(10, new Instant(6)));
assertThat(tester.extractOutput(), Matchers.contains(
isSingleWindowedValue(Matchers.containsInAnyOrder(6, 7, 8, 9, 10), 2, 0, 50)));
assertThat(tester.getResultSequence(), everyItem(equalTo(TriggerResult.CONTINUE)));
tester.injectElements(6);
assertThat(tester.getLatestResult(), equalTo(TriggerResult.FIRE));

assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(0), new Instant(50))));
// We're holding some finished bits, but that should be it.
tester.assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(
new IntervalWindow(new Instant(0), new Instant(50)));
}

@Test
public void testAfterAllMergingWindowSomeFinished() throws Exception {
Duration windowDuration = Duration.millis(10);
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
Sessions.withGapDuration(windowDuration),
tester = TriggerTester.forTrigger(
AfterAll.<IntervalWindow>of(
AfterProcessingTime.<IntervalWindow>pastFirstElementInPane()
.plusDelayOf(Duration.millis(5)),
AfterPane.<IntervalWindow>elementCountAtLeast(5)),
AccumulationMode.ACCUMULATING_FIRED_PANES,
Duration.millis(100));
Sessions.withGapDuration(windowDuration));

tester.advanceProcessingTime(new Instant(10));
tester.injectElements(
TimestampedValue.of(1, new Instant(1))); // in [1, 11), timer for 15
tester.injectElements(1); // in [1, 11), timer for 15
tester.advanceProcessingTime(new Instant(16));
tester.injectElements(
TimestampedValue.of(2, new Instant(1)), // in [1, 11) count = 1
TimestampedValue.of(3, new Instant(2))); // in [2, 12), timer for 16
1, // in [1, 11) count = 1
2); // in [2, 12), timer for 16

// Enough data comes in for 2 that combined, we should fire
tester.injectElements(
TimestampedValue.of(4, new Instant(2)),
TimestampedValue.of(5, new Instant(2)));
tester.injectElements(2, 2);
tester.mergeWindows();

// This fires, because the earliest element in [1, 12) arrived at time 10
assertThat(tester.extractOutput(), Matchers.contains(WindowMatchers.isSingleWindowedValue(
Matchers.containsInAnyOrder(1, 2, 3, 4, 5), 1, 1, 12)));

assertThat(tester.getLatestMergeResult(), equalTo(MergeResult.FIRE_AND_FINISH));
assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
tester.assertHasOnlyGlobalAndFinishedSetsFor(
new IntervalWindow(new Instant(1), new Instant(12)));
}

@Test
Expand Down
Loading

0 comments on commit 44e3e98

Please sign in to comment.