Skip to content

Commit

Permalink
[FLINK-24038] Replace ZooKeeperHaServices with ZooKeeperMultipleCompo…
Browse files Browse the repository at this point in the history
…nentLeaderElectionHaServices
  • Loading branch information
tillrohrmann committed Jan 26, 2022
1 parent bba7c41 commit 391ce7c
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperClientHAServices;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperMultipleComponentLeaderElectionHaServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rpc.AddressResolution;
Expand Down Expand Up @@ -65,13 +67,7 @@ public static HighAvailabilityServices createAvailableOrEmbeddedServices(
return new EmbeddedHaServices(executor);

case ZOOKEEPER:
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

return new ZooKeeperHaServices(
ZooKeeperUtils.startCuratorFramework(config, fatalErrorHandler),
executor,
config,
blobStoreService);
return createZooKeeperHaServices(config, executor, fatalErrorHandler);

case FACTORY_CLASS:
return createCustomHAServices(config, executor);
Expand All @@ -82,6 +78,30 @@ public static HighAvailabilityServices createAvailableOrEmbeddedServices(
}
}

private static HighAvailabilityServices createZooKeeperHaServices(
Configuration configuration, Executor executor, FatalErrorHandler fatalErrorHandler)
throws Exception {
final boolean useOldHaServices =
configuration.get(HighAvailabilityOptions.USE_OLD_HA_SERVICES);

BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper =
ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler);

if (useOldHaServices) {
return new ZooKeeperHaServices(
curatorFrameworkWrapper, executor, configuration, blobStoreService);
} else {
return new ZooKeeperMultipleComponentLeaderElectionHaServices(
curatorFrameworkWrapper,
configuration,
executor,
blobStoreService,
fatalErrorHandler);
}
}

public static HighAvailabilityServices createHighAvailabilityServices(
Configuration configuration,
Executor executor,
Expand Down Expand Up @@ -117,14 +137,7 @@ public static HighAvailabilityServices createHighAvailabilityServices(
return new StandaloneHaServices(
resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
case ZOOKEEPER:
BlobStoreService blobStoreService =
BlobUtils.createBlobStoreFromConfig(configuration);

return new ZooKeeperHaServices(
ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler),
executor,
configuration,
blobStoreService);
return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);

case FACTORY_CLASS:
return createCustomHAServices(configuration, executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@
* <p>In the case of a standalone cluster, that cluster-id needs to be configured via {@link
* HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same
* cluster and participate in the execution of the same set of jobs.
*
* @deprecated in favour of {@link ZooKeeperMultipleComponentLeaderElectionHaServices}
*/
@Deprecated
public class ZooKeeperHaServices extends AbstractZooKeeperHaServices {

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@
* {@link LeaderElectionDriver} implementation for Zookeeper. The leading JobManager is elected
* using ZooKeeper. The current leader's address as well as its leader session ID is published via
* ZooKeeper.
*
* @deprecated in favour of {@link ZooKeeperMultipleComponentLeaderElectionDriver}
*/
@Deprecated
public class ZooKeeperLeaderElectionDriver implements LeaderElectionDriver, LeaderLatchListener {

private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionDriver.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@

import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;

/** {@link LeaderElectionDriverFactory} implementation for Zookeeper. */
/**
* {@link LeaderElectionDriverFactory} implementation for Zookeeper.
*
* @deprecated in favour of {@link ZooKeeperMultipleComponentLeaderElectionDriverFactory}
*/
public class ZooKeeperLeaderElectionDriverFactory implements LeaderElectionDriverFactory {

private final CuratorFramework client;
Expand Down

0 comments on commit 391ce7c

Please sign in to comment.