From cf4f22ea5626c1948a69f07e4b7b3bcb4001aed7 Mon Sep 17 00:00:00 2001
From: Stephan Ewen <sewen@apache.org>
Date: Tue, 12 May 2015 22:59:48 +0200
Subject: [PATCH] [tests] Consolidate miscellaneous tests into one
 IntegrationTestCase to reuse minicluster and speed up tests

---
 .../test/misc/DisjointDataflowsITCase.java    |  37 ----
 .../flink/test/misc/GenericTypeInfoTest.java  |   8 +-
 .../test/misc/MiscellaneousIssuesITCase.java  | 177 ++++++++++++++++++
 .../flink/test/misc/NullValuesITCase.java     |  83 --------
 4 files changed, 183 insertions(+), 122 deletions(-)
 delete mode 100644 flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
 create mode 100644 flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
 delete mode 100644 flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java

diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
deleted file mode 100644
index 6f4baa3621bee..0000000000000
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.misc;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class DisjointDataflowsITCase extends JavaProgramTestBase {
-
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// generate two different flows
-		env.generateSequence(1, 10).output(new DiscardingOutputFormat<Long>());
-		env.generateSequence(1, 10).output(new DiscardingOutputFormat<Long>());
-		
-		env.execute();
-	}
-}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java b/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
index 91c6baa95ffca..fa1fcb655e7eb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
@@ -20,9 +20,9 @@
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -30,7 +30,11 @@ public class GenericTypeInfoTest {
 
 	@Test
 	public void testSerializerTree() {
-		TypeInformation<CollectionDataSets.PojoWithCollectionGeneric> ti = (TypeInformation<CollectionDataSets.PojoWithCollectionGeneric>) TypeExtractor.createTypeInfo(CollectionDataSets.PojoWithCollectionGeneric.class);
+		@SuppressWarnings("unchecked")
+		TypeInformation<CollectionDataSets.PojoWithCollectionGeneric> ti = 
+				(TypeInformation<CollectionDataSets.PojoWithCollectionGeneric>) 
+						TypeExtractor.createTypeInfo(CollectionDataSets.PojoWithCollectionGeneric.class);
+		
 		String serTree = Utils.getSerializerTree(ti);
 		// We can not test against the entire output because the fields of 'String' differ
 		// between java versions
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
new file mode 100644
index 0000000000000..01e6f624cda04
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.apache.flink.util.Collector;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the system behavior in multiple corner cases
+ *   - when null records are passed through the system.
+ *   - when disjoint dataflows are executed
+ *   - when accumulators are used chained after a non-udf operator.
+ *   
+ * The tests are bundled into one class to reuse the same test cluster. This speeds
+ * up test execution, as the majority of the test time goes usually into starting/stopping the
+ * test cluster.
+ */
+@SuppressWarnings("serial")
+public class MiscellaneousIssuesITCase {
+
+	private static ForkableFlinkMiniCluster cluster;
+	
+	@BeforeClass
+	public static void startCluster() {
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			cluster = new ForkableFlinkMiniCluster(config, false);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to start test cluster: " + e.getMessage());
+		}
+	}
+	
+	@AfterClass
+	public static void shutdownCluster() {
+		try {
+			cluster.shutdown();
+			cluster = null;
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to stop test cluster: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testNullValues() {
+		try {
+			ExecutionEnvironment env =
+					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+
+			env.setParallelism(1);
+			env.getConfig().disableSysoutLogging();
+
+			DataSet<String> data = env.fromElements("hallo")
+					.map(new MapFunction<String, String>() {
+						@Override
+						public String map(String value) throws Exception {
+							return null;
+						}
+					});
+			data.writeAsText("/tmp/myTest", FileSystem.WriteMode.OVERWRITE);
+
+			try {
+				env.execute();
+				fail("this should fail due to null values.");
+			}
+			catch (ProgramInvocationException e) {
+				assertNotNull(e.getCause());
+				assertNotNull(e.getCause().getCause());
+				assertTrue(e.getCause().getCause() instanceof NullPointerException);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testDisjointDataflows() {
+		try {
+			ExecutionEnvironment env =
+					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+
+			env.setParallelism(5);
+			env.getConfig().disableSysoutLogging();
+
+			// generate two different flows
+			env.generateSequence(1, 10).output(new DiscardingOutputFormat<Long>());
+			env.generateSequence(1, 10).output(new DiscardingOutputFormat<Long>());
+
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testAccumulatorsAfterNoOp() {
+		
+		final String ACC_NAME = "test_accumulator";
+		
+		try {
+			ExecutionEnvironment env =
+					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+
+			env.setParallelism(6);
+			env.getConfig().disableSysoutLogging();
+			
+			env.generateSequence(1, 1000000)
+					.rebalance()
+					.flatMap(new RichFlatMapFunction<Long, Long>() {
+
+						private LongCounter counter;
+
+						@Override
+						public void open(Configuration parameters) {
+							counter = getRuntimeContext().getLongCounter(ACC_NAME);
+						}
+
+						@Override
+						public void flatMap(Long value, Collector<Long> out) {
+							counter.add(1L);
+						}
+					})
+					.output(new DiscardingOutputFormat<Long>());
+
+			JobExecutionResult result = env.execute();
+			
+			assertEquals(1000000L, result.getAllAccumulatorResults().get(ACC_NAME));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
deleted file mode 100644
index 6f7d002e99a05..0000000000000
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.misc;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests how the system behaves when null records are passed through the system.
- */
-@SuppressWarnings("serial")
-public class NullValuesITCase {
-
-	@Test
-	public void testNullValues() {
-		ForkableFlinkMiniCluster cluster = null;
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 7);
-			cluster = new ForkableFlinkMiniCluster(config, false);
-
-			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
-
-			env.setParallelism(1);
-			env.getConfig().disableSysoutLogging();
-
-			DataSet<String> data = env.fromElements("hallo")
-					.map(new MapFunction<String, String>() {
-						@Override
-						public String map(String value) throws Exception {
-							return null;
-						}
-					});
-			data.writeAsText("/tmp/myTest", FileSystem.WriteMode.OVERWRITE);
-
-			try {
-				env.execute();
-				fail("this should fail due to null values.");
-			}
-			catch (ProgramInvocationException e) {
-				assertNotNull(e.getCause());
-				assertNotNull(e.getCause().getCause());
-				assertTrue(e.getCause().getCause() instanceof NullPointerException);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			if (cluster != null) {
-				cluster.shutdown();
-			}
-		}
-	}
-}