Skip to content

Commit

Permalink
[FLINK-34286][k8s] Attach cluster config map labels at creation time
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jan 31, 2024
1 parent 0ba3e76 commit b5a2ee4
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
import static org.apache.flink.kubernetes.utils.KubernetesUtils.getOnlyConfigMap;

/** {@link LeaderElectionDriver} for Kubernetes. */
Expand All @@ -63,9 +62,6 @@ public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {

private final KubernetesLeaderElector leaderElector;

// Labels will be used to clean up the ha related ConfigMaps.
private final Map<String, String> configMapLabels;

private final KubernetesSharedWatcher.Watch kubernetesWatch;

private final AtomicBoolean running = new AtomicBoolean(true);
Expand All @@ -89,11 +85,6 @@ public KubernetesLeaderElectionDriver(
kubeClient.createLeaderElector(
leaderElectionConfiguration, new LeaderCallbackHandlerImpl());

this.configMapLabels =
KubernetesUtils.getConfigMapLabels(
leaderElectionConfiguration.getClusterId(),
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);

kubernetesWatch =
configMapSharedWatcher.watch(
configMapName, new ConfigMapCallbackHandlerImpl(), watchExecutor);
Expand Down Expand Up @@ -174,7 +165,6 @@ public void deleteLeaderInformation(String componentId) {
KubernetesUtils.encodeLeaderInformation(leaderInformation));
}

kubernetesConfigMap.getLabels().putAll(configMapLabels);
return Optional.of(kubernetesConfigMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
Expand All @@ -34,6 +36,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;

/**
* Represent {@link KubernetesLeaderElector} in kubernetes. {@link LeaderElector#run()} is a
* blocking call. It should be run in the IO executor, not the main thread. The lifecycle is bound
Expand Down Expand Up @@ -71,8 +75,16 @@ public KubernetesLeaderElector(
.withLeaseDuration(leaderConfig.getLeaseDuration())
.withLock(
new ConfigMapLock(
kubernetesClient.getNamespace(),
leaderConfig.getConfigMapName(),
new ObjectMetaBuilder()
.withNamespace(kubernetesClient.getNamespace())
.withName(leaderConfig.getConfigMapName())
// Labels will be used to clean up the ha related
// ConfigMaps.
.withLabels(
KubernetesUtils.getConfigMapLabels(
leaderConfig.getClusterId(),
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
.build(),
leaderConfig.getLockIdentity()))
.withRenewDeadline(leaderConfig.getRenewDeadline())
.withRetryPeriod(leaderConfig.getRetryPeriod())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_KEY;
import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -254,26 +251,6 @@ void testErrorForwarding() throws Exception {
};
}

@Test
void testHighAvailabilityLabelsCorrectlySet() throws Exception {
new Context() {
{
runTest(
() -> {
leaderCallbackGrantLeadership();

final Map<String, String> leaderLabels =
getLeaderConfigMap().getLabels();
assertThat(leaderLabels).hasSize(3);
assertThat(leaderLabels)
.containsEntry(
LABEL_CONFIGMAP_TYPE_KEY,
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
});
}
};
}

@Test
void testToStringContainingConfigMap() throws Exception {
new Context() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import java.util.UUID;

import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_KEY;
import static org.assertj.core.api.Assertions.assertThat;

/**
Expand Down Expand Up @@ -110,4 +112,44 @@ void testMultipleKubernetesLeaderElectors() throws Exception {
kubernetesExtension.getFlinkKubeClient().deleteConfigMap(leaderConfigMapName).get();
}
}

@Test
void testClusterConfigMapLabelsAreSet() throws Exception {
final Configuration configuration = kubernetesExtension.getConfiguration();

final String leaderConfigMapName =
LEADER_CONFIGMAP_NAME_PREFIX + System.currentTimeMillis();

final TestingLeaderCallbackHandler leaderCallbackHandler =
new TestingLeaderCallbackHandler(UUID.randomUUID().toString());
final KubernetesLeaderElectionConfiguration leaderConfig =
new KubernetesLeaderElectionConfiguration(
leaderConfigMapName,
leaderCallbackHandler.getLockIdentity(),
configuration);

try (FlinkKubeClient kubeClient =
kubeClientFactory.fromConfiguration(configuration, "testing")) {
final KubernetesLeaderElector leaderElector =
kubeClient.createLeaderElector(leaderConfig, leaderCallbackHandler);
try {
leaderElector.run();

TestingLeaderCallbackHandler.waitUntilNewLeaderAppears();

assertThat(kubeClient.getConfigMap(leaderConfigMapName))
.hasValueSatisfying(
configMap ->
assertThat(configMap.getLabels())
.hasSize(3)
.containsEntry(
LABEL_CONFIGMAP_TYPE_KEY,
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY));
} finally {
leaderElector.stop();
}
} finally {
kubernetesExtension.getFlinkKubeClient().deleteConfigMap(leaderConfigMapName).get();
}
}
}

0 comments on commit b5a2ee4

Please sign in to comment.