Skip to content

Commit

Permalink
[FLINK-4651] Ensure processing-time timers are set on restore
Browse files Browse the repository at this point in the history
This test ensures that we set a low-level processing time timer in case
we have processing-time timers set.
  • Loading branch information
aljoscha committed Jan 9, 2017
1 parent 9cff8c9 commit 36cf7eb
Showing 1 changed file with 51 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,57 @@ public void testProcessingTimeTimersDontInterfere() throws Exception {
contains("ON_PROC_TIME:CIAO"));
}

/**
* Verify that a low-level timer is set for processing-time timers in case of restore.
*/
@Test
public void testEnsureProcessingTimeTimerRegisteredOnRestore() throws Exception {
TestOperator testOperator = new TestOperator();

KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);

testHarness.open();

testHarness.setProcessingTime(0L);

testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0);

testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0);

testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0);

OperatorStateHandles snapshot = testHarness.snapshot(0, 0);

TestOperator testOperator1 = new TestOperator();

KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 =
new KeyedOneInputStreamOperatorTestHarness<>(
testOperator1,
new TestKeySelector(),
BasicTypeInfo.INT_TYPE_INFO);

testHarness1.setProcessingTime(0L);

testHarness1.setup();
testHarness1.initializeState(snapshot);
testHarness1.open();

testHarness1.setProcessingTime(10L);

assertThat(
extractResult(testHarness1),
contains("ON_PROC_TIME:HELLO"));

testHarness1.setProcessingTime(20L);

assertThat(
extractResult(testHarness1),
contains("ON_PROC_TIME:CIAO"));
}


/**
* Verify that timers for the different time domains don't clash.
*/
Expand Down

0 comments on commit 36cf7eb

Please sign in to comment.