Skip to content

Commit

Permalink
[hotfix] [tests] Add re-tries to the result verification via files.
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Dec 12, 2016
1 parent 609c046 commit 57f7747
Showing 1 changed file with 56 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import akka.dispatch.Futures;
import akka.pattern.Patterns;
import akka.util.Timeout;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
Expand All @@ -32,10 +34,14 @@
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.TestLogger;

import org.apache.hadoop.fs.FileSystem;

import org.junit.Assert;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
Expand Down Expand Up @@ -213,8 +219,11 @@ public static BufferedReader[] getResultReader(String resultPath) throws IOExcep
return getResultReader(resultPath, new String[]{}, false);
}

public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes,
boolean inOrderOfFiles) throws IOException {
public static BufferedReader[] getResultReader(
String resultPath,
String[] excludePrefixes,
boolean inOrderOfFiles) throws IOException {

File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);

if (inOrderOfFiles) {
Expand Down Expand Up @@ -268,8 +277,11 @@ public static void readAllResultLines(List<String> target, String resultPath, St
readAllResultLines(target, resultPath, excludePrefixes, false);
}

public static void readAllResultLines(List<String> target, String resultPath,
String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
public static void readAllResultLines(
List<String> target,
String resultPath,
String[] excludePrefixes,
boolean inOrderOfFiles) throws IOException {

final BufferedReader[] readers = getResultReader(resultPath, excludePrefixes, inOrderOfFiles);
try {
Expand All @@ -282,12 +294,7 @@ public static void readAllResultLines(List<String> target, String resultPath,
}
finally {
for (BufferedReader reader : readers) {
try {
reader.close();
}
catch (Exception e) {
// ignore, this is best-effort cleanup
}
org.apache.flink.util.IOUtils.closeQuietly(reader);
}
}
}
Expand All @@ -296,19 +303,42 @@ public static void compareResultsByLinesInMemory(String expectedResultStr, Strin
compareResultsByLinesInMemory(expectedResultStr, resultPath, new String[0]);
}

public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,
String[] excludePrefixes) throws Exception {
ArrayList<String> list = new ArrayList<>();
readAllResultLines(list, resultPath, excludePrefixes, false);
public static void compareResultsByLinesInMemory(
String expectedResultStr,
String resultPath,
String[] excludePrefixes) throws Exception {

String[] result = list.toArray(new String[list.size()]);
Arrays.sort(result);
// because of some strange I/O inconsistency effects on CI infrastructure, we need
// to retry this a few times
final int numAttempts = 5;
int attempt = 0;
while (true) {
try {
ArrayList<String> list = new ArrayList<>();
readAllResultLines(list, resultPath, excludePrefixes, false);

String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
Arrays.sort(expected);
String[] result = list.toArray(new String[list.size()]);
Arrays.sort(result);

Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length);
Assert.assertArrayEquals(expected, result);
String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
Arrays.sort(expected);

Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length);
Assert.assertArrayEquals(expected, result);

break;
}
catch (AssertionError e) {
if (++attempt > numAttempts) {
throw e;
}

// else wait, then fall through the loop and try again
// on normal setups, this should change nothing, but it seems to help the
// Travis CI container infrastructure
Thread.sleep(100);
}
}
}

public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
Expand Down Expand Up @@ -390,19 +420,17 @@ public static <X> void compareResultCollections(List<X> expected, List<X> actual
}
}

private static File[] getAllInvolvedFiles(String resultPath, String[] excludePrefixes) {
final String[] exPrefs = excludePrefixes;
File result = asFile(resultPath);
if (!result.exists()) {
Assert.fail("Result file was not written");
}
private static File[] getAllInvolvedFiles(String resultPath, final String[] excludePrefixes) {
final File result = asFile(resultPath);
assertTrue("Result file was not written", result.exists());

if (result.isDirectory()) {
return result.listFiles(new FilenameFilter() {

@Override
public boolean accept(File dir, String name) {
for(String p: exPrefs) {
if(name.startsWith(p)) {
for (String p: excludePrefixes) {
if (name.startsWith(p)) {
return false;
}
}
Expand Down

0 comments on commit 57f7747

Please sign in to comment.