From 36cf7eb2de5be992f1716360b7ca4f010a7c2a58 Mon Sep 17 00:00:00 2001
From: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Date: Mon, 9 Jan 2017 16:01:23 +0100
Subject: [PATCH] [FLINK-4651] Ensure processing-time timers are set on restore

This test ensures that we set a low-level processing time timer in case
we have processing-time timers set.
---
 .../operators/AbstractStreamOperatorTest.java | 51 +++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 2fb0089ac2012..2844fbb6808a6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -140,6 +140,57 @@ public void testProcessingTimeTimersDontInterfere() throws Exception {
 				contains("ON_PROC_TIME:CIAO"));
 	}
 
+	/**
+	 * Verify that a low-level timer is set for processing-time timers in case of restore.
+	 */
+	@Test
+	public void testEnsureProcessingTimeTimerRegisteredOnRestore() throws Exception {
+		TestOperator testOperator = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.open();
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.processElement(new Tuple2<>(1, "SET_PROC_TIME_TIMER:20"), 0);
+
+		testHarness.processElement(new Tuple2<>(0, "SET_STATE:HELLO"), 0);
+		testHarness.processElement(new Tuple2<>(1, "SET_STATE:CIAO"), 0);
+
+		testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0);
+
+		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+		TestOperator testOperator1 = new TestOperator();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness1 =
+				new KeyedOneInputStreamOperatorTestHarness<>(
+						testOperator1,
+						new TestKeySelector(),
+						BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness1.setProcessingTime(0L);
+
+		testHarness1.setup();
+		testHarness1.initializeState(snapshot);
+		testHarness1.open();
+
+		testHarness1.setProcessingTime(10L);
+
+		assertThat(
+				extractResult(testHarness1),
+				contains("ON_PROC_TIME:HELLO"));
+
+		testHarness1.setProcessingTime(20L);
+
+		assertThat(
+				extractResult(testHarness1),
+				contains("ON_PROC_TIME:CIAO"));
+	}
+
+
 	/**
 	 * Verify that timers for the different time domains don't clash.
 	 */