Skip to content

Commit

Permalink
[FLINK-33598][k8s] Watch HA configmap via name instead of lables to r…
Browse files Browse the repository at this point in the history
…educe pressure on APIserver
  • Loading branch information
Myasuka committed Nov 24, 2023
1 parent a1d4dc0 commit 608546e
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
import static org.apache.flink.kubernetes.utils.Constants.NAME_SEPARATOR;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -74,9 +73,7 @@ public class KubernetesLeaderElectionHaServices extends AbstractHaServices {
this(
kubeClient,
kubeClient.createConfigMapSharedWatcher(
KubernetesUtils.getConfigMapLabels(
configuration.get(KubernetesConfigOptions.CLUSTER_ID),
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY)),
getClusterConfigMap(configuration.get(KubernetesConfigOptions.CLUSTER_ID))),
Executors.newCachedThreadPool(
new ExecutorThreadFactory("config-map-watch-handler")),
ioExecutor,
Expand Down Expand Up @@ -202,11 +199,7 @@ public void internalCleanup() throws Exception {
exception = e;
}

kubeClient
.deleteConfigMapsByLabels(
KubernetesUtils.getConfigMapLabels(
clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
.get();
kubeClient.deleteConfigMap(getClusterConfigMap()).get();

ExceptionUtils.tryRethrowException(exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,6 @@ private CompletableFuture<Boolean> attemptCheckAndUpdateConfigMap(
kubeClientExecutorService);
}

@Override
public CompletableFuture<Void> deleteConfigMapsByLabels(Map<String, String> labels) {
// the only time, the delete method returns false is due to a 404 HTTP status which is
// returned if the underlying resource doesn't exist
return CompletableFuture.runAsync(
() -> this.internalClient.configMaps().withLabels(labels).delete(),
kubeClientExecutorService);
}

@Override
public CompletableFuture<Void> deleteConfigMap(String configMapName) {
// the only time, the delete method returns false is due to a 404 HTTP status which is
Expand All @@ -360,9 +351,9 @@ public CompletableFuture<Void> deleteConfigMap(String configMapName) {
}

@Override
public KubernetesConfigMapSharedWatcher createConfigMapSharedWatcher(
Map<String, String> labels) {
return new KubernetesConfigMapSharedInformer(this.internalClient, labels);
public KubernetesConfigMapSharedWatcher createConfigMapSharedWatcher(String name) {
LOG.info("Creating configmap shared watcher for {}.", name);
return new KubernetesConfigMapSharedInformer(this.internalClient, name);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,6 @@ CompletableFuture<Boolean> checkAndUpdateConfigMap(
String configMapName,
Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> updateFunction);

/**
* Delete the Kubernetes ConfigMaps by labels. This will be used by the HA service to clean up
* all data.
*
* @param labels labels to filter the resources. e.g. type: high-availability
* @return Return the delete future that only completes successfully, if the resources that are
* subject to deletion are actually gone.
*/
CompletableFuture<Void> deleteConfigMapsByLabels(Map<String, String> labels);

/**
* Delete a Kubernetes ConfigMap by name.
*
Expand All @@ -183,13 +173,12 @@ CompletableFuture<Boolean> checkAndUpdateConfigMap(
CompletableFuture<Void> deleteConfigMap(String configMapName);

/**
* Create a shared watcher for ConfigMaps with specified labels.
* Create a shared watcher for ConfigMaps with specified name.
*
* @param labels labels to filter ConfigMaps. If the labels is null or empty, all ConfigMaps
* will be taken in account, or it will be rejected if the implementation does not allow it.
* @param name name of the ConfigMap to watch.
* @return Return a shared watcher.
*/
KubernetesConfigMapSharedWatcher createConfigMapSharedWatcher(Map<String, String> labels);
KubernetesConfigMapSharedWatcher createConfigMapSharedWatcher(String name);

/** Close the Kubernetes client with no exception. */
void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,26 @@
package org.apache.flink.kubernetes.kubeclient.resources;

import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.dsl.Informable;

import java.util.Map;

/** The shared informer for {@link ConfigMap}, it can be used as a shared watcher. */
public class KubernetesConfigMapSharedInformer
extends KubernetesSharedInformer<ConfigMap, KubernetesConfigMap>
implements KubernetesConfigMapSharedWatcher {

public KubernetesConfigMapSharedInformer(
NamespacedKubernetesClient client, Map<String, String> labels) {
super(client, getInformableConfigMaps(client, labels), KubernetesConfigMap::new);
public KubernetesConfigMapSharedInformer(NamespacedKubernetesClient client, String name) {
super(client, getInformabaleConfigMaps(client, name), KubernetesConfigMap::new);
}

private static Informable<ConfigMap> getInformableConfigMaps(
NamespacedKubernetesClient client, Map<String, String> labels) {
private static Informable<ConfigMap> getInformabaleConfigMaps(
NamespacedKubernetesClient client, String name) {
Preconditions.checkArgument(
!CollectionUtil.isNullOrEmpty(labels), "Labels must not be null or empty");
return client.configMaps().withLabels(labels);
!StringUtils.isNullOrWhitespaceOnly(name), "Name must not be null or empty");
return client.configMaps().withName(name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -89,7 +88,6 @@ protected class Context {
final Configuration configuration;

final CompletableFuture<Void> closeKubeClientFuture;
final CompletableFuture<Map<String, String>> deleteConfigMapByLabelsFuture;

Context() throws Exception {
kubernetesTestFixture =
Expand All @@ -102,8 +100,6 @@ protected class Context {
flinkKubeClient = kubernetesTestFixture.getFlinkKubeClient();
configuration = kubernetesTestFixture.getConfiguration();
closeKubeClientFuture = kubernetesTestFixture.getCloseKubeClientFuture();
deleteConfigMapByLabelsFuture =
kubernetesTestFixture.getDeleteConfigMapByLabelsFuture();
electionEventHandler = new TestingLeaderElectionListener();
leaderElectionDriver = createLeaderElectionDriver();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.leaderelection.LeaderElectionEvent;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionListener;
Expand All @@ -43,7 +42,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
import static org.assertj.core.api.Assertions.assertThat;

/**
Expand Down Expand Up @@ -87,9 +85,7 @@ void testLeaderElectionAndRetrieval() throws Exception {
final List<AutoCloseable> closeables = new ArrayList<>();

final KubernetesConfigMapSharedWatcher configMapSharedWatcher =
flinkKubeClient.createConfigMapSharedWatcher(
KubernetesUtils.getConfigMapLabels(
clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY));
flinkKubeClient.createConfigMapSharedWatcher(configMapName);
closeables.add(configMapSharedWatcher);

final TestingLeaderElectionListener electionEventHandler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.util.concurrent.FutureUtils;

import java.util.ArrayList;
Expand All @@ -38,7 +37,6 @@
import java.util.concurrent.TimeUnit;

import static org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector.LEADER_ANNOTATION_KEY;
import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
import static org.assertj.core.api.Assertions.assertThat;

/** Test fixture for Kubernetes tests that sets up a mock {@link FlinkKubeClient}. */
Expand All @@ -58,8 +56,6 @@ class KubernetesTestFixture {
private final List<TestingFlinkKubeClient.MockKubernetesWatch> configMapWatches =
new ArrayList<>();

private final CompletableFuture<Map<String, String>> deleteConfigMapByLabelsFuture =
new CompletableFuture<>();
private final CompletableFuture<Void> closeKubeClientFuture = new CompletableFuture<>();

private final CompletableFuture<KubernetesLeaderElector.LeaderCallbackHandler>
Expand All @@ -76,10 +72,7 @@ class KubernetesTestFixture {
configuration.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);

flinkKubeClient = createFlinkKubeClient();
configMapSharedWatcher =
flinkKubeClient.createConfigMapSharedWatcher(
KubernetesUtils.getConfigMapLabels(
clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY));
configMapSharedWatcher = flinkKubeClient.createConfigMapSharedWatcher(leaderConfigmapName);
}

void close() {
Expand All @@ -94,10 +87,6 @@ CompletableFuture<Void> getCloseKubeClientFuture() {
return closeKubeClientFuture;
}

CompletableFuture<Map<String, String>> getDeleteConfigMapByLabelsFuture() {
return deleteConfigMapByLabelsFuture;
}

KubernetesConfigMapSharedWatcher getConfigMapSharedWatcher() {
return configMapSharedWatcher;
}
Expand Down Expand Up @@ -178,19 +167,6 @@ TestingFlinkKubeClient.Builder createFlinkKubeClientBuilder() {
configMapStore.remove(name);
return FutureUtils.completedVoidFuture();
})
.setDeleteConfigMapByLabelFunction(
labels -> {
if (deleteConfigMapByLabelsFuture.isDone()) {
return FutureUtils.completedExceptionally(
new KubernetesException(
"ConfigMap with labels "
+ labels
+ " has already be deleted."));
} else {
deleteConfigMapByLabelsFuture.complete(labels);
return FutureUtils.completedVoidFuture();
}
})
.setCloseConsumer(closeKubeClientFuture::complete)
.setCreateLeaderElectorFunction(
(leaderConfig, callbackHandler) -> {
Expand All @@ -199,12 +175,11 @@ TestingFlinkKubeClient.Builder createFlinkKubeClientBuilder() {
leaderConfig, callbackHandler);
})
.setCreateConfigMapSharedWatcherFunction(
(labels) -> {
name -> {
final TestingFlinkKubeClient.TestingKubernetesConfigMapSharedWatcher
watcher =
new TestingFlinkKubeClient
.TestingKubernetesConfigMapSharedWatcher(
labels);
.TestingKubernetesConfigMapSharedWatcher(name);
watcher.setWatchFunction(
(ignore, handler) -> {
final CompletableFuture<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,21 +490,6 @@ void testCreateConfigMapAlreadyExisting() throws Exception {
.containsEntry(TESTING_CONFIG_MAP_KEY, TESTING_CONFIG_MAP_VALUE);
}

@Test
void testDeleteConfigMapByLabels() throws Exception {
this.flinkKubeClient.createConfigMap(buildTestingConfigMap()).get();
assertThat(this.flinkKubeClient.getConfigMap(TESTING_CONFIG_MAP_NAME)).isPresent();
this.flinkKubeClient.deleteConfigMapsByLabels(TESTING_LABELS).get();
assertThat(this.flinkKubeClient.getConfigMap(TESTING_CONFIG_MAP_NAME)).isNotPresent();
}

@Test
void testDeleteNotExistingConfigMapByLabels() throws Exception {
assertThat(this.flinkKubeClient.getConfigMap(TESTING_CONFIG_MAP_NAME)).isNotPresent();
this.flinkKubeClient.deleteConfigMapsByLabels(TESTING_LABELS).get();
assertThat(this.flinkKubeClient.getConfigMap(TESTING_CONFIG_MAP_NAME)).isNotPresent();
}

@Test
void testDeleteConfigMapByName() throws Exception {
this.flinkKubeClient.createConfigMap(buildTestingConfigMap()).get();
Expand Down
Loading

0 comments on commit 608546e

Please sign in to comment.