Skip to content

Commit

Permalink
[FLINK-10185] Make ZooKeeperStateHandleStore#releaseAndTryRemove sync…
Browse files Browse the repository at this point in the history
…hronous

Remove the asynchronous callback from ZooKeeperStateHandleStore#releaseAndTryRemove.
Instead we can execute the callback after having executed the releaseAndTryRemove
method successfully. This separates concerns better because we don't mix storage
with business logic. Furthermore, we can still avoid blocking operations if we use a
separate thread to call into ZooKeeperStateHandleStore#releaseAndTryRemove.

This closes apache#6586.
  • Loading branch information
tillrohrmann committed Sep 12, 2018
1 parent 0a345eb commit a0b6685
Show file tree
Hide file tree
Showing 13 changed files with 539 additions and 511 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Exec

ZooKeeperStateHandleStore<MesosWorkerStore.Worker> zooKeeperStateHandleStore = zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
"/workers",
stateStorageHelper,
executor);
stateStorageHelper);

ZooKeeperSharedValue frameworkId = zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
ZooKeeperSharedCount totalTaskCount = zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.ConsumerWithException;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -86,6 +85,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
*/
private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;

private final Executor executor;

/**
* Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
*
Expand All @@ -98,7 +99,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
* start with a '/')
* @param stateStorage State storage to be used to persist the completed
* checkpoint
* @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
* @param executor to execute blocking calls
* @throws Exception
*/
public ZooKeeperCompletedCheckpointStore(
Expand All @@ -123,10 +124,12 @@ public ZooKeeperCompletedCheckpointStore(
// All operations will have the path as root
this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);

this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage);

this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);

this.executor = checkNotNull(executor);

LOG.info("Initialized in '{}'.", checkpointsPath);
}

Expand Down Expand Up @@ -236,16 +239,30 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception

// Everything worked, let's remove a previous checkpoint if necessary.
while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
try {
removeSubsumed(completedCheckpoints.removeFirst());
} catch (Exception e) {
LOG.warn("Failed to subsume the old checkpoint", e);
}
final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
tryRemoveCompletedCheckpoint(completedCheckpoint, CompletedCheckpoint::discardOnSubsume);
}

LOG.debug("Added {} to {}.", checkpoint, path);
}

/**
 * Tries to remove the given completed checkpoint and, if the removal succeeded,
 * hands the given discard callback to the executor.
 *
 * <p>This method is best effort: any exception raised while removing the checkpoint
 * or while submitting the callback is logged and not propagated. Exceptions thrown
 * by the callback itself are logged from within the submitted task.
 *
 * <p>The helper is shared by the subsume path and the shutdown path, hence the log
 * messages deliberately do not mention either of them.
 *
 * @param completedCheckpoint checkpoint to remove
 * @param discardCallback action which discards the checkpoint once it has been removed
 */
private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ConsumerWithException<CompletedCheckpoint, Exception> discardCallback) {
    try {
        if (tryRemove(completedCheckpoint.getCheckpointID())) {
            // Run the discard on the executor so that the caller is not blocked by it.
            executor.execute(() -> {
                try {
                    discardCallback.accept(completedCheckpoint);
                } catch (Exception e) {
                    LOG.warn("Could not discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
                }
            });
        }
    } catch (Exception e) {
        // Log the checkpoint itself (not its id) so that logging cannot fail here.
        LOG.warn("Failed to remove completed checkpoint {}.", completedCheckpoint, e);
    }
}

@Override
public CompletedCheckpoint getLatestCheckpoint() {
if (completedCheckpoints.isEmpty()) {
Expand Down Expand Up @@ -278,11 +295,9 @@ public void shutdown(JobStatus jobStatus) throws Exception {
LOG.info("Shutting down");

for (CompletedCheckpoint checkpoint : completedCheckpoints) {
try {
removeShutdown(checkpoint, jobStatus);
} catch (Exception e) {
LOG.error("Failed to discard checkpoint.", e);
}
tryRemoveCompletedCheckpoint(
checkpoint,
completedCheckpoint -> completedCheckpoint.discardOnShutdown(jobStatus));
}

completedCheckpoints.clear();
Expand All @@ -305,59 +320,13 @@ public void shutdown(JobStatus jobStatus) throws Exception {
// ------------------------------------------------------------------------

/**
 * Releases and tries to remove the entry of a subsumed checkpoint and drops its state.
 *
 * <p>A {@code null} checkpoint is ignored. The state is only discarded when the
 * remove callback is invoked with a non-null state handle.
 *
 * @param completedCheckpoint subsumed checkpoint to remove, may be {@code null}
 * @throws Exception if releasing or removing the checkpoint entry fails
 */
private void removeSubsumed(
        final CompletedCheckpoint completedCheckpoint) throws Exception {

    if (completedCheckpoint != null) {
        final ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> discardOnRemoval =
            new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
                @Override
                public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
                    // Nothing to discard if no state handle was passed to the callback.
                    if (value == null) {
                        return;
                    }

                    try {
                        completedCheckpoint.discardOnSubsume();
                    } catch (Exception e) {
                        throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
                    }
                }
            };

        checkpointsInZooKeeper.releaseAndTryRemove(
            checkpointIdToPath(completedCheckpoint.getCheckpointID()),
            discardOnRemoval);
    }
}

/**
* Removes a checkpoint from ZooKeeper because of Job shutdown and drops the state.
* Tries to remove the checkpoint identified by the given checkpoint id.
*
* @param checkpointId identifying the checkpoint to remove
* @return true if the checkpoint could be removed
*/
private void removeShutdown(
final CompletedCheckpoint completedCheckpoint,
final JobStatus jobStatus) throws Exception {

if (completedCheckpoint == null) {
return;
}

ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> removeAction = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
@Override
public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
try {
completedCheckpoint.discardOnShutdown(jobStatus);
} catch (Exception e) {
throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
}
}
};

checkpointsInZooKeeper.releaseAndTryRemove(
checkpointIdToPath(completedCheckpoint.getCheckpointID()),
removeAction);
private boolean tryRemove(long checkpointId) throws Exception {
return checkpointsInZooKeeper.releaseAndTryRemove(checkpointIdToPath(checkpointId));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {

@Override
public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor);
return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
Expand Down Expand Up @@ -99,14 +98,12 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
* @param client ZooKeeper client
* @param currentJobsPath ZooKeeper path for current job graphs
* @param stateStorage State storage used to persist the submitted jobs
* @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
* @throws Exception
*/
public ZooKeeperSubmittedJobGraphStore(
CuratorFramework client,
String currentJobsPath,
RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage,
Executor executor) throws Exception {
RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {

checkNotNull(currentJobsPath, "Current jobs path");
checkNotNull(stateStorage, "State storage");
Expand All @@ -123,7 +120,7 @@ public ZooKeeperSubmittedJobGraphStore(
CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);

this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage);

this.pathCache = new PathChildrenCache(facade, "/", false);
pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,12 @@ public static ZooKeeperLeaderElectionService createLeaderElectionService(
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
* @param configuration {@link Configuration} object
* @param executor to run ZooKeeper callbacks
* @return {@link ZooKeeperSubmittedJobGraphStore} instance
* @throws Exception if the submitted job graph store cannot be created
*/
public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
CuratorFramework client,
Configuration configuration,
Executor executor) throws Exception {
Configuration configuration) throws Exception {

checkNotNull(configuration, "Configuration");

Expand All @@ -244,7 +242,9 @@ public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
String zooKeeperSubmittedJobsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);

return new ZooKeeperSubmittedJobGraphStore(
client, zooKeeperSubmittedJobsPath, stateStorage, executor);
client,
zooKeeperSubmittedJobsPath,
stateStorage);
}

/**
Expand Down
Loading

0 comments on commit a0b6685

Please sign in to comment.