Skip to content

Commit

Permalink
[hotfix][runtime] Adds shutdown logic for the watchExecutorService to…
Browse files Browse the repository at this point in the history
… the k8s HA services

Signed-off-by: Matthias Pohl <[email protected]>
  • Loading branch information
XComp committed Jul 6, 2023
1 parent a46cf26 commit 08d0e6b
Showing 1 changed file with 12 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand All @@ -50,6 +53,9 @@
/** Kubernetes HA services that use a single leader election service per JobManager. */
public class KubernetesMultipleComponentLeaderElectionHaServices extends AbstractHaServices {

private static final Logger LOG =
LoggerFactory.getLogger(KubernetesMultipleComponentLeaderElectionHaServices.class);

private final String clusterId;

private final FlinkKubeClient kubeClient;
Expand Down Expand Up @@ -178,6 +184,12 @@ public void internalClose() throws Exception {

private void closeK8sServices() {
configMapSharedWatcher.close();
final int outstandingTaskCount = watchExecutorService.shutdownNow().size();
if (outstandingTaskCount != 0) {
LOG.debug(
"The k8s HA services were closed with {} event(s) still not being processed. No further action necessary.",
outstandingTaskCount);
}
}

@Override
Expand Down

0 comments on commit 08d0e6b

Please sign in to comment.