From d7bc8e29520907a2a7ac3b741d49784579d83316 Mon Sep 17 00:00:00 2001
From: David Moravek <dmvk@apache.org>
Date: Wed, 26 Jan 2022 11:57:09 +0100
Subject: [PATCH] [FLINK-25673][tests] Wait until all slots are freed

---
 .../runtime/minicluster/MiniCluster.java      | 21 ++++++++++
 .../testutils/MiniClusterResource.java        | 39 ++++++++++++++++---
 .../scheduling/AdaptiveSchedulerITCase.java   |  7 ++--
 3 files changed, 58 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 19aa299aebc06..fcbb05a24ca37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -77,6 +77,7 @@
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.ResourceOverview;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -828,6 +829,12 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                 jobId, operatorId, serializedRequest, rpcTimeout));
     }
 
+    public CompletableFuture<ResourceOverview> getResourceOverview() {
+        // Forwards to the resource manager gateway, passing this cluster's rpcTimeout.
+        return runResourceManagerCommand(
+                gateway -> gateway.requestResourceOverview(rpcTimeout));
+    }
+
     private <T> CompletableFuture<T> runDispatcherCommand(
             Function<DispatcherGateway, CompletableFuture<T>> dispatcherCommand) {
         return getDispatcherGatewayFuture()
@@ -835,6 +842,13 @@ private <T> CompletableFuture<T> runDispatcherCommand(
                 .thenCompose(Function.identity());
     }
 
+    private <T> CompletableFuture<T> runResourceManagerCommand(
+            Function<ResourceManagerGateway, CompletableFuture<T>> resourceManagerCommand) {
+        // Once the gateway future completes, apply the command to the gateway and flatten the
+        // future it returns into the result.
+        return getResourceManagerGatewayFuture().thenCompose(resourceManagerCommand);
+    }
+
     // ------------------------------------------------------------------------
     //  running jobs
     // ------------------------------------------------------------------------
@@ -955,6 +969,13 @@ protected CompletableFuture<DispatcherGateway> getDispatcherGatewayFuture() {
         }
     }
 
+    private CompletableFuture<ResourceManagerGateway> getResourceManagerGatewayFuture() {
+        synchronized (lock) { // running check and retriever read both happen under the lock
+            checkState(running, "MiniCluster is not yet running or has already been shut down.");
+            return resourceManagerGatewayRetriever.getFuture();
+        }
+    }
+
     private CompletableFuture<Void> uploadAndSetJobFiles(
             final CompletableFuture<InetSocketAddress> blobServerAddressFuture,
             final JobGraph job) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
index 06f0736dbc2d8..1b30b262a8a7c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.RestOptions;
@@ -29,6 +30,7 @@
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceOverview;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -99,15 +101,29 @@ public void before() throws Exception {
                         * miniClusterResourceConfiguration.getNumberTaskManagers();
     }
 
+    /** Cancels all jobs and then waits until the cluster reports every registered slot free. */
+    public void cancelAllJobsAndWaitUntilSlotsAreFreed() {
+        final long heartbeatTimeout = // NOTE(review): HEARTBEAT_INTERVAL, not _TIMEOUT; confirm
+                miniCluster.getConfiguration().get(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
+        final long shutdownTimeout =
+                miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds();
+        Preconditions.checkState( // the message template substitutes only %s placeholders
+                heartbeatTimeout < shutdownTimeout,
+                "Heartbeat timeout (%s) needs to be lower than the shutdown timeout (%s) in order to ensure reliable job cancellation and resource cleanup.",
+                heartbeatTimeout, shutdownTimeout);
+        cancelAllJobs(true);
+    }
+
     public void cancelAllJobs() {
+        cancelAllJobs(false); // cancel only; do not wait for slots to be freed
+    }
+
+    private void cancelAllJobs(boolean waitUntilSlotsAreFreed) {
         try {
+            final long shutdownTimeout =
+                    miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds();
             final Deadline jobCancellationDeadline =
-                    Deadline.fromNow(
-                            Duration.ofMillis(
-                                    miniClusterResourceConfiguration
-                                            .getShutdownTimeout()
-                                            .toMilliseconds()));
-
+                    Deadline.fromNow(Duration.ofMillis(shutdownTimeout));
             final List<CompletableFuture<Acknowledge>> jobCancellationFutures =
                     miniCluster.listJobs()
                             .get(
@@ -137,6 +153,17 @@ public void cancelAllJobs() {
                         return unfinishedJobs == 0;
                     },
                     jobCancellationDeadline);
+
+            if (waitUntilSlotsAreFreed) {
+                CommonTestUtils.waitUntilCondition(
+                        () -> {
+                            final ResourceOverview resourceOverview =
+                                    miniCluster.getResourceOverview().get();
+                            return resourceOverview.getNumberRegisteredSlots()
+                                    == resourceOverview.getNumberFreeSlots();
+                        },
+                        jobCancellationDeadline);
+            }
         } catch (Exception e) {
             log.warn("Exception while shutting down remaining jobs.", e);
         }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
index 13062e7e2f69e..0538d772ae30c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
@@ -26,6 +26,7 @@
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -81,9 +82,9 @@ public class AdaptiveSchedulerITCase extends TestLogger {
 
     private static Configuration getConfiguration() {
         final Configuration conf = new Configuration();
-
         conf.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
-
+        conf.set(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 1_000L); // must be < shutdown timeout
+        conf.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 5_000L);
         return conf;
     }
 
@@ -103,7 +104,7 @@ public void ensureAdaptiveSchedulerEnabled() {
 
     @After
     public void cancelRunningJobs() {
-        MINI_CLUSTER_WITH_CLIENT_RESOURCE.cancelAllJobs();
+        MINI_CLUSTER_WITH_CLIENT_RESOURCE.cancelAllJobsAndWaitUntilSlotsAreFreed(); // awaits slots
     }
 
     /** Tests that the adaptive scheduler can recover stateful operators. */