Skip to content

Commit

Permalink
[FLINK-20461][tests] Check replication factor before asking for JobRe…
Browse files Browse the repository at this point in the history
…sult

This commit hardens the YARNFileReplicationITCase by checking the replication factor before
asking for the JobResult. If done in the reverse order, then it can happen that the Flink application
has already terminated before doing the file replication check because the per-job mode has already
delivered the JobResult.

This closes apache#16917.
  • Loading branch information
tillrohrmann committed Aug 24, 2021
1 parent fb7fbd8 commit b82acf9
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

/**
* Test cases for the deployment of Yarn Flink clusters with customized file replication numbers.
Expand Down Expand Up @@ -108,6 +109,8 @@ private void deployPerJob(Configuration configuration, JobGraph jobGraph) throws

ApplicationId applicationId = clusterClient.getClusterId();

extraVerification(configuration, applicationId);

final CompletableFuture<JobResult> jobResultCompletableFuture =
clusterClient.requestJobResult(jobGraph.getJobID());

Expand All @@ -125,8 +128,6 @@ private void deployPerJob(Configuration configuration, JobGraph jobGraph) throws
.getClassLoader()));
});

extraVerification(configuration, applicationId);

waitApplicationFinishedElseKillIt(
applicationId,
yarnAppTerminateTimeout,
Expand Down Expand Up @@ -162,6 +163,13 @@ private void extraVerification(Configuration configuration, ApplicationId applic
String suffix = ".flink/" + applicationId.toString() + "/" + flinkUberjar.getName();

Path uberJarHDFSPath = new Path(fs.getHomeDirectory(), suffix);

assertTrue(
"The Flink uber jar needs to exist. If it does not exist, then this "
+ "indicates that the Flink cluster has already terminated and Yarn has "
+ "already deleted the working directory.",
fs.exists(uberJarHDFSPath));

FileStatus fsStatus = fs.getFileStatus(uberJarHDFSPath);

final int flinkFileReplication =
Expand Down

0 comments on commit b82acf9

Please sign in to comment.