Skip to content

Commit

Permalink
[FLINK-32438][runtime] Merges AbstractZooKeeperHaServices with ZooKee…
Browse files Browse the repository at this point in the history
…perMultipleComponentLeaderElectionHaServices

Signed-off-by: Matthias Pohl <[email protected]>
  • Loading branch information
XComp committed Jul 3, 2023
1 parent 310d859 commit a7bd6de
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 186 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
import org.apache.flink.runtime.highavailability.AbstractHaServices;
import org.apache.flink.runtime.highavailability.FileSystemJobResultStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.leaderelection.DefaultMultipleComponentLeaderElectionService;
import org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverFactory;
import org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionService;
Expand All @@ -32,12 +37,17 @@
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.utils.ZKPaths;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import java.io.IOException;
import java.util.concurrent.Executor;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* ZooKeeper HA services that only use a single leader election per process.
*
Expand All @@ -59,13 +69,12 @@
* | | /checkpoint_id_counter
* </pre>
*/
public class ZooKeeperMultipleComponentLeaderElectionHaServices
extends AbstractZooKeeperHaServices {
public class ZooKeeperMultipleComponentLeaderElectionHaServices extends AbstractHaServices {
/** The curator resource to use. */
private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper;

private final Object lock = new Object();

private final CuratorFramework leaderNamespacedCuratorFramework;

private final FatalErrorHandler fatalErrorHandler;

@Nullable
Expand All @@ -74,18 +83,129 @@ public class ZooKeeperMultipleComponentLeaderElectionHaServices

public ZooKeeperMultipleComponentLeaderElectionHaServices(
CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper,
Configuration config,
Executor ioExecutor,
Configuration configuration,
Executor executor,
BlobStoreService blobStoreService,
FatalErrorHandler fatalErrorHandler)
throws Exception {
super(curatorFrameworkWrapper, ioExecutor, config, blobStoreService);
this.leaderNamespacedCuratorFramework =
ZooKeeperUtils.useNamespaceAndEnsurePath(
getCuratorFramework(), ZooKeeperUtils.getLeaderPath());
throws IOException {
super(
configuration,
executor,
blobStoreService,
FileSystemJobResultStore.fromConfiguration(configuration));
this.curatorFrameworkWrapper = checkNotNull(curatorFrameworkWrapper);

this.fatalErrorHandler = fatalErrorHandler;
}

/**
 * Creates the {@link CheckpointRecoveryFactory} backed by ZooKeeper.
 *
 * <p>The factory receives a Curator client obtained from {@code
 * ZooKeeperUtils.useNamespaceAndEnsurePath} for the path returned by {@code
 * ZooKeeperUtils.getJobsPath()}, together with this instance's configuration and I/O executor.
 *
 * @return a new {@link ZooKeeperCheckpointRecoveryFactory}
 * @throws Exception if deriving the jobs-scoped client fails
 */
@Override
public CheckpointRecoveryFactory createCheckpointRecoveryFactory() throws Exception {
    final CuratorFramework jobsScopedClient =
            ZooKeeperUtils.useNamespaceAndEnsurePath(
                    curatorFrameworkWrapper.asCuratorFramework(), ZooKeeperUtils.getJobsPath());

    return new ZooKeeperCheckpointRecoveryFactory(jobsScopedClient, configuration, ioExecutor);
}

/**
 * Creates the {@link JobGraphStore} by delegating to {@code ZooKeeperUtils.createJobGraphs} with
 * the wrapped Curator client and this instance's configuration.
 *
 * @return the job graph store produced by {@code ZooKeeperUtils.createJobGraphs}
 * @throws Exception if the store cannot be created
 */
@Override
public JobGraphStore createJobGraphStore() throws Exception {
    final CuratorFramework client = curatorFrameworkWrapper.asCuratorFramework();
    return ZooKeeperUtils.createJobGraphs(client, configuration);
}

/**
 * Closes the resources held by this instance.
 *
 * <p>First, while holding {@code lock}, the leader election service is closed (if one is set)
 * and its reference is cleared. Afterwards the Curator wrapper is closed. A failure of the first
 * step does not prevent the second step; failures are merged via {@code
 * ExceptionUtils.firstOrSuppressed} and rethrown at the end.
 *
 * @throws Exception if closing the leader election service or the Curator wrapper failed
 */
@Override
protected void internalClose() throws Exception {
    Exception collectedFailure = null;

    synchronized (lock) {
        if (multipleComponentLeaderElectionService != null) {
            try {
                multipleComponentLeaderElectionService.close();
            } catch (Exception electionCloseFailure) {
                // remember the failure but keep going so that the Curator client is still closed
                collectedFailure = electionCloseFailure;
            }
            // the reference is dropped regardless of whether close() succeeded
            multipleComponentLeaderElectionService = null;
        }
    }

    try {
        curatorFrameworkWrapper.close();
    } catch (Exception curatorCloseFailure) {
        collectedFailure =
                ExceptionUtils.firstOrSuppressed(curatorCloseFailure, collectedFailure);
    }

    ExceptionUtils.tryRethrowException(collectedFailure);
}

/**
* Cleans up the data owned by this HA services instance by delegating to {@link
* #cleanupZooKeeperPaths()}.
*
* @throws Exception if cleaning up the ZooKeeper paths fails
*/
@Override
protected void internalCleanup() throws Exception {
cleanupZooKeeperPaths();
}

/**
 * Removes the znode that {@code ZooKeeperUtils.getLeaderPathForJob} yields for the given job.
 *
 * @param jobID the job whose leader path should be deleted
 * @throws Exception if the znode deletion fails
 */
@Override
protected void internalCleanupJobData(JobID jobID) throws Exception {
    final String jobLeaderPath = ZooKeeperUtils.getLeaderPathForJob(jobID);
    deleteZNode(jobLeaderPath);
}

/**
* Cleans up leftover ZooKeeper paths.
*
* <p>The owned znode is deleted first; only afterwards are the parent znodes above it removed,
* as far as they are empty.
*
* @throws Exception if deleting the owned znode or one of its parents fails
*/
private void cleanupZooKeeperPaths() throws Exception {
deleteOwnedZNode();
tryDeleteEmptyParentZNodes();
}

/**
* Deletes the znode at path {@code "/"} via {@link #deleteZNode(String)}.
*
* <p>NOTE(review): presumably {@code "/"} is resolved relative to the namespace of the wrapped
* Curator client, i.e. it denotes the root of the data owned by this instance rather than the
* ZooKeeper root - confirm against {@code ZooKeeperUtils.deleteZNode}.
*
* @throws Exception if the deletion fails
*/
private void deleteOwnedZNode() throws Exception {
deleteZNode("/");
}

/**
 * Deletes the znode at the given path through {@code ZooKeeperUtils.deleteZNode}, using the
 * wrapped Curator client.
 *
 * @param path the path of the znode to delete
 * @throws Exception if the deletion fails
 */
protected void deleteZNode(String path) throws Exception {
    final CuratorFramework client = curatorFrameworkWrapper.asCuratorFramework();
    ZooKeeperUtils.deleteZNode(client, path);
}

/**
 * Walks upwards from the parent of this client's namespace and deletes each znode on the way
 * until either the root path is reached or a znode turns out to be non-empty.
 *
 * <p>IMPORTANT: This method can be removed once all supported ZooKeeper versions support the
 * container {@code CreateMode}.
 *
 * @throws Exception if a deletion fails for any reason other than {@link
 *     KeeperException.NotEmptyException}
 */
private void tryDeleteEmptyParentZNodes() throws Exception {
    final CuratorFramework namespacedClient = curatorFrameworkWrapper.asCuratorFramework();

    // start one level above the namespace itself
    final String firstCandidate =
            getParentPath(getNormalizedPath(namespacedClient.getNamespace()));

    // paths handled below are absolute, hence a client without any namespace is required
    final CuratorFramework rootClient = namespacedClient.usingNamespace(null);

    for (String candidate = firstCandidate;
            !isRootPath(candidate);
            candidate = getParentPath(candidate)) {
        try {
            rootClient.delete().forPath(candidate);
        } catch (KeeperException.NotEmptyException ignored) {
            // only empty znodes can be deleted; a non-empty znode ends the upward walk
            break;
        }
    }
}

/**
 * Checks whether the given path equals {@code ZKPaths.PATH_SEPARATOR}, which this class treats
 * as the root path.
 *
 * @param remainingPath the path to check; {@code null} yields {@code false}
 * @return {@code true} if the path is the root path
 */
private static boolean isRootPath(String remainingPath) {
    final String rootPath = ZKPaths.PATH_SEPARATOR;
    return rootPath.equals(remainingPath);
}

/**
 * Normalizes the given path by passing it to {@code ZKPaths.makePath} together with an empty
 * child name.
 *
 * @param path the path (here: a Curator namespace) to normalize
 * @return the result of {@code ZKPaths.makePath(path, "")}
 */
private static String getNormalizedPath(String path) {
    final String emptyChild = "";
    return ZKPaths.makePath(path, emptyChild);
}

/**
 * Returns the parent portion of the given path as determined by {@code
 * ZKPaths.getPathAndNode}.
 *
 * @param path the path to split
 * @return the path component of the split, i.e. everything above the last node
 */
private static String getParentPath(String path) {
    final ZKPaths.PathAndNode split = ZKPaths.getPathAndNode(path);
    return split.getPath();
}

// ///////////////////////////////////////////////
// LeaderElection/-Retrieval-related methods
// ///////////////////////////////////////////////

@Override
protected MultipleComponentLeaderElectionDriverFactory createLeaderElectionDriverFactory(
String leaderName) {
Expand All @@ -100,7 +220,9 @@ private MultipleComponentLeaderElectionService getOrInitializeSingleLeaderElecti
new DefaultMultipleComponentLeaderElectionService(
fatalErrorHandler,
new ZooKeeperMultipleComponentLeaderElectionDriverFactory(
leaderNamespacedCuratorFramework));
ZooKeeperUtils.useNamespaceAndEnsurePath(
curatorFrameworkWrapper.asCuratorFramework(),
ZooKeeperUtils.getLeaderPath())));
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format(
Expand All @@ -119,30 +241,9 @@ private MultipleComponentLeaderElectionService getOrInitializeSingleLeaderElecti
protected LeaderRetrievalService createLeaderRetrievalService(String leaderPath) {
// Maybe use a single service for leader retrieval
return ZooKeeperUtils.createLeaderRetrievalService(
leaderNamespacedCuratorFramework, leaderPath, configuration);
}

@Override
protected void internalClose() throws Exception {
Exception exception = null;
synchronized (lock) {
if (multipleComponentLeaderElectionService != null) {
try {
multipleComponentLeaderElectionService.close();
} catch (Exception e) {
exception = e;
}
multipleComponentLeaderElectionService = null;
}
}

try {
super.internalClose();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

ExceptionUtils.tryRethrowException(exception);
curatorFrameworkWrapper.asCuratorFramework(),
ZooKeeperUtils.generateZookeeperPath(ZooKeeperUtils.getLeaderPath(), leaderPath),
configuration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.leaderelection;

import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperMultipleComponentLeaderElectionHaServices;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.ExceptionUtils;
Expand Down Expand Up @@ -264,8 +265,7 @@ private LeaderInformation tryReadingLeaderInformation(ChildData childData, Strin

/**
* This selector finds all connection info nodes. See {@link
* org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperMultipleComponentLeaderElectionHaServices}
* for more details on the Znode layout.
* ZooKeeperMultipleComponentLeaderElectionHaServices} for more details on the Znode layout.
*/
private static class ConnectionInfoNodeSelector implements TreeCacheSelector {
@Override
Expand Down

0 comments on commit a7bd6de

Please sign in to comment.