diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java index bd42ac287a5c3..e449ab8d205c3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java @@ -35,10 +35,8 @@ import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; +import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; @@ -48,6 +46,9 @@ import java.io.File; import java.io.FileReader; import java.io.FileWriter; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Random; import java.util.UUID; @@ -63,35 +64,24 @@ public class AggregatorsITCase extends MultipleProgramsTestBase { private static final int parallelism = 2; private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements"; - private static String testString = "Et tu, Brute?"; - private static String testName = "testing_caesar"; - private static String testPath; - public AggregatorsITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; - private String expected; + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); + @Test + public void testDistributedCacheWithIterations() throws Exception{ + final String testString = "Et tu, Brute?"; + final String testName = "testing_caesar"; - @Before - public void before() throws Exception{ final File folder = tempFolder.newFolder(); final File resultFile = new File(folder, UUID.randomUUID().toString()); - testPath = resultFile.toString(); - resultPath = resultFile.toURI().toString(); - } - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } + String testPath = resultFile.toString(); + String resultPath = resultFile.toURI().toString(); - @Test - public void testDistributedCacheWithIterations() throws Exception{ File tempFile = new File(testPath); try (FileWriter writer = new FileWriter(tempFile)) { writer.write(testString); @@ -117,7 +107,6 @@ public boolean filter(Long value) throws Exception { } }).withBroadcastSet(solution, "SOLUTION")).output(new DiscardingOutputFormat()); env.execute(); - expected = testString; // this will be a useless verification now. } @Test @@ -141,12 +130,12 @@ public void testAggregatorWithoutParameterForIterate() throws Exception { new NegativeElementsConvergenceCriterion()); DataSet updatedDs = iteration.map(new SubtractOneMap()); - iteration.closeWith(updatedDs).writeAsText(resultPath); - env.execute(); + List result = iteration.closeWith(updatedDs).collect(); + Collections.sort(result); - expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n" - + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n" - + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"; + List expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1); + + assertEquals(expected, result); } @Test @@ -170,12 +159,12 @@ public void testAggregatorWithParameterForIterate() throws Exception { new NegativeElementsConvergenceCriterion()); DataSet updatedDs = iteration.map(new SubtractOneMapWithParam()); - iteration.closeWith(updatedDs).writeAsText(resultPath); - env.execute(); + List result = iteration.closeWith(updatedDs).collect(); + Collections.sort(result); - expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n" - + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n" - + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"; + List expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1); + + assertEquals(expected, result); } @Test @@ -199,12 +188,12 @@ public void testConvergenceCriterionWithParameterForIterate() throws Exception { new NegativeElementsConvergenceCriterionWithParam(3)); DataSet updatedDs = iteration.map(new SubtractOneMap()); - iteration.closeWith(updatedDs).writeAsText(resultPath); - env.execute(); + List result = iteration.closeWith(updatedDs).collect(); + Collections.sort(result); - expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n" - + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n" - + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"; + List expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1); + + assertEquals(expected, result); } @Test @@ -231,14 +220,12 @@ public void testAggregatorWithoutParameterForIterateDelta() throws Exception { .where(0).equalTo(0).flatMap(new UpdateFilter()); DataSet> iterationRes = iteration.closeWith(newElements, newElements); - DataSet result = iterationRes.map(new ProjectSecondMapper()); - result.writeAsText(resultPath); + List result = iterationRes.map(new ProjectSecondMapper()).collect(); + Collections.sort(result); - env.execute(); + List expected = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5); - expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n" - + "3\n" + "4\n" + "4\n" + "4\n" + "4\n" - + "5\n" + "5\n" + "5\n" + "5\n" + "5\n"; + assertEquals(expected, result); } @Test @@ -265,14 +252,12 @@ public void testAggregatorWithParameterForIterateDelta() throws Exception { .where(0).equalTo(0).flatMap(new UpdateFilter()); DataSet> iterationRes = iteration.closeWith(newElements, newElements); - DataSet result = iterationRes.map(new ProjectSecondMapper()); - result.writeAsText(resultPath); + List result = iterationRes.map(new ProjectSecondMapper()).collect(); + Collections.sort(result); - env.execute(); + List expected = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5); - expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n" - + "3\n" + "4\n" + "4\n" + "4\n" + "4\n" - + "5\n" + "5\n" + "5\n" + "5\n" + "5\n"; + assertEquals(result, expected); } @Test @@ -303,14 +288,12 @@ public void testConvergenceCriterionWithParameterForIterateDelta() throws Except .where(0).equalTo(0).projectFirst(0, 1); DataSet> iterationRes = iteration.closeWith(newElements, newElements); - DataSet result = iterationRes.map(new ProjectSecondMapper()); - result.writeAsText(resultPath); + List result = iterationRes.map(new ProjectSecondMapper()).collect(); + Collections.sort(result); - env.execute(); + List expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1); - expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n" - + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n" - + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"; + assertEquals(expected, result); } @SuppressWarnings("serial")