Skip to content

Commit

Permalink
[FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout
Browse files Browse the repository at this point in the history

This commit hardens all CancelingTestBase tests by using the configured Akka ask timeout of
200s as the rpc timeout, replacing the fixed 1000 ms GET_FUTURE_TIMEOUT previously used when
polling the job status.

This closes apache#12531.
  • Loading branch information
tillrohrmann committed Jun 10, 2020
1 parent 20e82af commit 1839fa5
Showing 1 changed file with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
Expand All @@ -56,14 +57,14 @@ public abstract class CancelingTestBase extends TestLogger {

protected static final int PARALLELISM = 4;

protected static final long GET_FUTURE_TIMEOUT = 1000; // 1000 milliseconds
private static final Configuration configuration = getConfiguration();

// --------------------------------------------------------------------------------------------

@ClassRule
public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfiguration())
.setConfiguration(configuration)
.setNumberTaskManagers(2)
.setNumberSlotsPerTaskManager(4)
.build());
Expand Down Expand Up @@ -93,15 +94,17 @@ protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxT
// submit job
final JobGraph jobGraph = getJobGraph(plan);

final long rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds();

ClusterClient<?> client = CLUSTER.getClusterClient();
JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);

Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();

JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
Thread.sleep(50);
jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
}
if (jobStatus != JobStatus.RUNNING) {
Assert.fail("Job not in state RUNNING.");
Expand All @@ -113,10 +116,10 @@ protected void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxT

Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow();

JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) {
Thread.sleep(50);
jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS);
}
if (jobStatusAfterCancel != JobStatus.CANCELED) {
Assert.fail("Failed to cancel job with ID " + jobSubmissionResult.getJobID() + '.');
Expand Down

0 comments on commit 1839fa5

Please sign in to comment.