Skip to content

Commit

Permalink
[FLINK-25673][tests] Wait until all slots are freed
Browse files Browse the repository at this point in the history
  • Loading branch information
dmvk authored Jan 26, 2022
1 parent 0f701b5 commit d7bc8e2
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
Expand Down Expand Up @@ -828,13 +829,26 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
jobId, operatorId, serializedRequest, rpcTimeout));
}

public CompletableFuture<ResourceOverview> getResourceOverview() {
return runResourceManagerCommand(
resourceManagerGateway ->
resourceManagerGateway.requestResourceOverview(rpcTimeout));
}

private <T> CompletableFuture<T> runDispatcherCommand(
Function<DispatcherGateway, CompletableFuture<T>> dispatcherCommand) {
return getDispatcherGatewayFuture()
.thenApply(dispatcherCommand)
.thenCompose(Function.identity());
}

private <T> CompletableFuture<T> runResourceManagerCommand(
Function<ResourceManagerGateway, CompletableFuture<T>> resourceManagerCommand) {
return getResourceManagerGatewayFuture()
.thenApply(resourceManagerCommand)
.thenCompose(Function.identity());
}

// ------------------------------------------------------------------------
// running jobs
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -955,6 +969,13 @@ protected CompletableFuture<DispatcherGateway> getDispatcherGatewayFuture() {
}
}

private CompletableFuture<ResourceManagerGateway> getResourceManagerGatewayFuture() {
synchronized (lock) {
checkState(running, "MiniCluster is not yet running or has already been shut down.");
return resourceManagerGatewayRetriever.getFuture();
}
}

private CompletableFuture<Void> uploadAndSetJobFiles(
final CompletableFuture<InetSocketAddress> blobServerAddressFuture,
final JobGraph job) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
Expand All @@ -29,6 +30,7 @@
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
Expand Down Expand Up @@ -99,15 +101,29 @@ public void before() throws Exception {
* miniClusterResourceConfiguration.getNumberTaskManagers();
}

public void cancelAllJobsAndWaitUntilSlotsAreFreed() {
final long heartbeatTimeout =
miniCluster.getConfiguration().get(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
final long shutdownTimeout =
miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds();
Preconditions.checkState(
heartbeatTimeout < shutdownTimeout,
"Heartbeat timeout (%d) needs to be lower than the shutdown timeout (%d) in order to ensure reliable job cancellation and resource cleanup.",
heartbeatTimeout,
shutdownTimeout);
cancelAllJobs(true);
}

public void cancelAllJobs() {
cancelAllJobs(false);
}

private void cancelAllJobs(boolean waitUntilSlotsAreFreed) {
try {
final long shutdownTimeout =
miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds();
final Deadline jobCancellationDeadline =
Deadline.fromNow(
Duration.ofMillis(
miniClusterResourceConfiguration
.getShutdownTimeout()
.toMilliseconds()));

Deadline.fromNow(Duration.ofMillis(shutdownTimeout));
final List<CompletableFuture<Acknowledge>> jobCancellationFutures =
miniCluster.listJobs()
.get(
Expand Down Expand Up @@ -137,6 +153,17 @@ public void cancelAllJobs() {
return unfinishedJobs == 0;
},
jobCancellationDeadline);

if (waitUntilSlotsAreFreed) {
CommonTestUtils.waitUntilCondition(
() -> {
final ResourceOverview resourceOverview =
miniCluster.getResourceOverview().get();
return resourceOverview.getNumberRegisteredSlots()
== resourceOverview.getNumberFreeSlots();
},
jobCancellationDeadline);
}
} catch (Exception e) {
log.warn("Exception while shutting down remaining jobs.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.state.FunctionInitializationContext;
Expand Down Expand Up @@ -81,9 +82,9 @@ public class AdaptiveSchedulerITCase extends TestLogger {

private static Configuration getConfiguration() {
final Configuration conf = new Configuration();

conf.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);

conf.set(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 1_000L);
conf.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 5_000L);
return conf;
}

Expand All @@ -103,7 +104,7 @@ public void ensureAdaptiveSchedulerEnabled() {

@After
public void cancelRunningJobs() {
MINI_CLUSTER_WITH_CLIENT_RESOURCE.cancelAllJobs();
MINI_CLUSTER_WITH_CLIENT_RESOURCE.cancelAllJobsAndWaitUntilSlotsAreFreed();
}

/** Tests that the adaptive scheduler can recover stateful operators. */
Expand Down

0 comments on commit d7bc8e2

Please sign in to comment.