Skip to content

Commit

Permalink
[FLINK-24038] Replace old KubernetesHaServices with KubernetesMultipl…
Browse files Browse the repository at this point in the history
…eComponentLeaderElectionHaServices
  • Loading branch information
tillrohrmann committed Jan 26, 2022
1 parent 0afe353 commit bba7c41
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@
* k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
*
* <p>Note that underline("_") is not allowed in Kubernetes ConfigMap name.
*
* @deprecated in favour of {@link KubernetesMultipleComponentLeaderElectionHaServices}
*/
@Deprecated
public class KubernetesHaServices extends AbstractHaServices {

private final String clusterId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
package org.apache.flink.kubernetes.highavailability;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
import org.apache.flink.util.FatalExitExceptionHandler;

import java.util.concurrent.Executor;

Expand All @@ -32,11 +34,26 @@ public class KubernetesHaServicesFactory implements HighAvailabilityServicesFact
@Override
public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor)
throws Exception {
return new KubernetesHaServices(
FlinkKubeClientFactory.getInstance()
.fromConfiguration(configuration, "kubernetes-ha-services"),
executor,
configuration,
BlobUtils.createBlobStoreFromConfig(configuration));
final boolean useOldHaServices =
configuration.get(HighAvailabilityOptions.USE_OLD_HA_SERVICES);

if (useOldHaServices) {
return new KubernetesHaServices(
FlinkKubeClientFactory.getInstance()
.fromConfiguration(configuration, "kubernetes-ha-services"),
executor,
configuration,
BlobUtils.createBlobStoreFromConfig(configuration));
} else {
return new KubernetesMultipleComponentLeaderElectionHaServices(
FlinkKubeClientFactory.getInstance()
.fromConfiguration(configuration, "kubernetes-ha-services"),
executor,
configuration,
BlobUtils.createBlobStoreFromConfig(configuration),
error ->
FatalExitExceptionHandler.INSTANCE.uncaughtException(
Thread.currentThread(), error));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@
* Kubernetes ConfigMap. Note that the contending lock and leader storage are using the same
* ConfigMap. And every component(e.g. ResourceManager, Dispatcher, RestEndpoint, JobManager for
* each job) will have a separate ConfigMap.
*
* @deprecated in favour of {@link KubernetesMultipleComponentLeaderElectionDriver}
*/
@Deprecated
public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {

private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderElectionDriver.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@

import java.util.concurrent.ExecutorService;

/** {@link LeaderElectionDriverFactory} implementation for Kubernetes. */
/**
* {@link LeaderElectionDriverFactory} implementation for Kubernetes.
*
* @deprecated in favour of {@link KubernetesMultipleComponentLeaderElectionDriverFactory}
*/
public class KubernetesLeaderElectionDriverFactory implements LeaderElectionDriverFactory {

private final FlinkKubeClient kubeClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@

import java.util.concurrent.Executor;

/** {@link LeaderRetrievalDriverFactory} implementation for Kubernetes. */
/**
* {@link LeaderRetrievalDriverFactory} implementation for Kubernetes.
*
* @deprecated in favour of {@link KubernetesMultipleComponentLeaderRetrievalDriverFactory}
*/
@Deprecated
public class KubernetesLeaderRetrievalDriverFactory implements LeaderRetrievalDriverFactory {

private final FlinkKubeClient kubeClient;
Expand Down

0 comments on commit bba7c41

Please sign in to comment.