Skip to content

Commit

Permalink
[FLINK-9503] Migrate integration tests for iterative aggregators
Browse files Browse the repository at this point in the history
This closes apache#6129.
  • Loading branch information
yanghua authored and tillrohrmann committed Jul 13, 2018
1 parent fc49801 commit d783c62
Showing 1 changed file with 39 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
Expand All @@ -48,6 +46,9 @@
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;

Expand All @@ -63,35 +64,24 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
private static final int parallelism = 2;
private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";

private static String testString = "Et tu, Brute?";
private static String testName = "testing_caesar";
private static String testPath;

public AggregatorsITCase(TestExecutionMode mode){
super(mode);
}

private String resultPath;
private String expected;
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();

@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@Test
public void testDistributedCacheWithIterations() throws Exception{
final String testString = "Et tu, Brute?";
final String testName = "testing_caesar";

@Before
public void before() throws Exception{
final File folder = tempFolder.newFolder();
final File resultFile = new File(folder, UUID.randomUUID().toString());
testPath = resultFile.toString();
resultPath = resultFile.toURI().toString();
}

@After
public void after() throws Exception{
compareResultsByLinesInMemory(expected, resultPath);
}
String testPath = resultFile.toString();
String resultPath = resultFile.toURI().toString();

@Test
public void testDistributedCacheWithIterations() throws Exception{
File tempFile = new File(testPath);
try (FileWriter writer = new FileWriter(tempFile)) {
writer.write(testString);
Expand All @@ -117,7 +107,6 @@ public boolean filter(Long value) throws Exception {
}
}).withBroadcastSet(solution, "SOLUTION")).output(new DiscardingOutputFormat<Long>());
env.execute();
expected = testString; // this will be a useless verification now.
}

@Test
Expand All @@ -141,12 +130,12 @@ public void testAggregatorWithoutParameterForIterate() throws Exception {
new NegativeElementsConvergenceCriterion());

DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
iteration.closeWith(updatedDs).writeAsText(resultPath);
env.execute();
List<Integer> result = iteration.closeWith(updatedDs).collect();
Collections.sort(result);

expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);

assertEquals(expected, result);
}

@Test
Expand All @@ -170,12 +159,12 @@ public void testAggregatorWithParameterForIterate() throws Exception {
new NegativeElementsConvergenceCriterion());

DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam());
iteration.closeWith(updatedDs).writeAsText(resultPath);
env.execute();
List<Integer> result = iteration.closeWith(updatedDs).collect();
Collections.sort(result);

expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);

assertEquals(expected, result);
}

@Test
Expand All @@ -199,12 +188,12 @@ public void testConvergenceCriterionWithParameterForIterate() throws Exception {
new NegativeElementsConvergenceCriterionWithParam(3));

DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
iteration.closeWith(updatedDs).writeAsText(resultPath);
env.execute();
List<Integer> result = iteration.closeWith(updatedDs).collect();
Collections.sort(result);

expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);

assertEquals(expected, result);
}

@Test
Expand All @@ -231,14 +220,12 @@ public void testAggregatorWithoutParameterForIterateDelta() throws Exception {
.where(0).equalTo(0).flatMap(new UpdateFilter());

DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
result.writeAsText(resultPath);
List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
Collections.sort(result);

env.execute();
List<Integer> expected = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5);

expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
+ "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
+ "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
assertEquals(expected, result);
}

@Test
Expand All @@ -265,14 +252,12 @@ public void testAggregatorWithParameterForIterateDelta() throws Exception {
.where(0).equalTo(0).flatMap(new UpdateFilter());

DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
result.writeAsText(resultPath);
List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
Collections.sort(result);

env.execute();
List<Integer> expected = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5);

expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
+ "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
+ "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
assertEquals(result, expected);
}

@Test
Expand Down Expand Up @@ -303,14 +288,12 @@ public void testConvergenceCriterionWithParameterForIterateDelta() throws Except
.where(0).equalTo(0).projectFirst(0, 1);

DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
result.writeAsText(resultPath);
List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
Collections.sort(result);

env.execute();
List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);

expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
assertEquals(expected, result);
}

@SuppressWarnings("serial")
Expand Down

0 comments on commit d783c62

Please sign in to comment.