diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 19aa299aebc06..fcbb05a24ca37 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -77,6 +77,7 @@ import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; +import org.apache.flink.runtime.resourcemanager.ResourceOverview; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -828,6 +829,12 @@ public CompletableFuture deliverCoordinationRequestToCoord jobId, operatorId, serializedRequest, rpcTimeout)); } + public CompletableFuture getResourceOverview() { + return runResourceManagerCommand( + resourceManagerGateway -> + resourceManagerGateway.requestResourceOverview(rpcTimeout)); + } + private CompletableFuture runDispatcherCommand( Function> dispatcherCommand) { return getDispatcherGatewayFuture() @@ -835,6 +842,13 @@ private CompletableFuture runDispatcherCommand( .thenCompose(Function.identity()); } + private CompletableFuture runResourceManagerCommand( + Function> resourceManagerCommand) { + return getResourceManagerGatewayFuture() + .thenApply(resourceManagerCommand) + .thenCompose(Function.identity()); + } + // ------------------------------------------------------------------------ // running jobs // ------------------------------------------------------------------------ @@ -955,6 +969,13 @@ protected CompletableFuture getDispatcherGatewayFuture() { } } + private CompletableFuture getResourceManagerGatewayFuture() { + synchronized (lock) { + checkState(running, "MiniCluster is not yet running or has already been shut down."); + return resourceManagerGatewayRetriever.getFuture(); + } + } + private CompletableFuture uploadAndSetJobFiles( final CompletableFuture blobServerAddressFuture, final JobGraph job) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java index 06f0736dbc2d8..1b30b262a8a7c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.time.Deadline; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.HeartbeatManagerOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.RestOptions; @@ -29,6 +30,7 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceOverview; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.FutureUtils; @@ -99,15 +101,29 @@ public void before() throws Exception { * miniClusterResourceConfiguration.getNumberTaskManagers(); } + public void cancelAllJobsAndWaitUntilSlotsAreFreed() { + final long heartbeatTimeout = + miniCluster.getConfiguration().get(HeartbeatManagerOptions.HEARTBEAT_INTERVAL); + final long shutdownTimeout = + miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds(); + Preconditions.checkState( + heartbeatTimeout < shutdownTimeout, + "Heartbeat timeout (%d) needs to be lower than the shutdown timeout (%d) in order to ensure reliable job cancellation and resource cleanup.", + heartbeatTimeout, + shutdownTimeout); + cancelAllJobs(true); + } + public void cancelAllJobs() { + cancelAllJobs(false); + } + + private void cancelAllJobs(boolean waitUntilSlotsAreFreed) { try { + final long shutdownTimeout = + miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds(); final Deadline jobCancellationDeadline = - Deadline.fromNow( - Duration.ofMillis( - miniClusterResourceConfiguration - .getShutdownTimeout() - .toMilliseconds())); - + Deadline.fromNow(Duration.ofMillis(shutdownTimeout)); final List> jobCancellationFutures = miniCluster.listJobs() .get( @@ -137,6 +153,17 @@ public void cancelAllJobs() { return unfinishedJobs == 0; }, jobCancellationDeadline); + + if (waitUntilSlotsAreFreed) { + CommonTestUtils.waitUntilCondition( + () -> { + final ResourceOverview resourceOverview = + miniCluster.getResourceOverview().get(); + return resourceOverview.getNumberRegisteredSlots() + == resourceOverview.getNumberFreeSlots(); + }, + jobCancellationDeadline); + } } catch (Exception e) { log.warn("Exception while shutting down remaining jobs.", e); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java index 13062e7e2f69e..0538d772ae30c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.time.Deadline; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -81,9 +82,9 @@ public class AdaptiveSchedulerITCase extends TestLogger { private static Configuration getConfiguration() { final Configuration conf = new Configuration(); - conf.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive); - + conf.set(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 1_000L); + conf.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 5_000L); return conf; } @@ -103,7 +104,7 @@ public void ensureAdaptiveSchedulerEnabled() { @After public void cancelRunningJobs() { - MINI_CLUSTER_WITH_CLIENT_RESOURCE.cancelAllJobs(); + MINI_CLUSTER_WITH_CLIENT_RESOURCE.cancelAllJobsAndWaitUntilSlotsAreFreed(); } /** Tests that the adaptive scheduler can recover stateful operators. */