Skip to content

Commit

Permalink
[FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes
Browse files Browse the repository at this point in the history
In order to guard against deletions of ZooKeeper nodes which are still being used
by a different ZooKeeperStateHandleStore, we have to introduce a locking mechanism.
Only after all ZooKeeperStateHandleStores have released their lock, the ZNode is
allowed to be deleted.

THe locking mechanism is implemented via ephemeral child nodes of the respective
ZooKeeper node. Whenever a ZooKeeperStateHandleStore wants to lock a ZNode, thus,
protecting it from being deleted, it creates an ephemeral child node. The node's
name is unique to the ZooKeeperStateHandleStore instance. The delete operations
will then only delete the node if it does not have any children associated.

In order to guard against oprhaned lock nodes, they are created as ephemeral nodes.
This means that they will be deleted by ZooKeeper once the connection of the
ZooKeeper client which created the node timed out.
  • Loading branch information
tillrohrmann authored and StefanRRichter committed May 19, 2017
1 parent b8f8524 commit 3d119e1
Show file tree
Hide file tree
Showing 8 changed files with 854 additions and 391 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void stop(boolean cleanup) throws Exception {
totalTaskCountInZooKeeper.close();

if(cleanup) {
workersInZooKeeper.removeAndDiscardAllState();
workersInZooKeeper.releaseAndTryRemoveAll();
}

isRunning = false;
Expand Down Expand Up @@ -169,7 +169,7 @@ public List<MesosWorkerStore.Worker> recoverWorkers() throws Exception {
synchronized (startStopLock) {
verifyIsRunning();

List<Tuple2<RetrievableStateHandle<Worker>, String>> handles = workersInZooKeeper.getAll();
List<Tuple2<RetrievableStateHandle<Worker>, String>> handles = workersInZooKeeper.getAllAndLock();

if(handles.size() != 0) {
List<MesosWorkerStore.Worker> workers = new ArrayList<>(handles.size());
Expand Down Expand Up @@ -199,7 +199,7 @@ public void putWorker(MesosWorkerStore.Worker worker) throws Exception {
int currentVersion = workersInZooKeeper.exists(path);
if (currentVersion == -1) {
try {
workersInZooKeeper.add(path, worker);
workersInZooKeeper.addAndLock(path, worker);
LOG.debug("Added {} in ZooKeeper.", worker);
} catch (KeeperException.NodeExistsException ex) {
throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", ex);
Expand Down Expand Up @@ -227,7 +227,7 @@ public boolean removeWorker(Protos.TaskID taskID) throws Exception {
return false;
}

workersInZooKeeper.removeAndDiscardState(path);
workersInZooKeeper.releaseAndTryRemove(path);
LOG.debug("Removed worker {} from ZooKeeper.", taskID);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
package org.apache.flink.runtime.checkpoint;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobStatus;
Expand All @@ -34,12 +31,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import static org.apache.flink.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -155,7 +152,7 @@ public void recover() throws Exception {
List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
while (true) {
try {
initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName();
initialCheckpoints = checkpointsInZooKeeper.getAllSortedByNameAndLock();
break;
}
catch (ConcurrentModificationException e) {
Expand All @@ -178,7 +175,7 @@ public void recover() throws Exception {
"checkpoint store.", e);

// remove the checkpoint with broken state handle
removeBrokenStateHandle(checkpointStateHandle);
removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
}

if (completedCheckpoint != null) {
Expand All @@ -201,7 +198,7 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception
final RetrievableStateHandle<CompletedCheckpoint> stateHandle;

// First add the new one. If it fails, we don't want to loose existing data.
stateHandle = checkpointsInZooKeeper.add(path, checkpoint);
stateHandle = checkpointsInZooKeeper.addAndLock(path, checkpoint);

checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path));

Expand All @@ -211,7 +208,7 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception
// Everything worked, let's remove a previous checkpoint if necessary.
while (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) {
try {
removeSubsumed(checkpointStateHandles.removeFirst(), sharedStateRegistry);
removeSubsumed(checkpointStateHandles.removeFirst().f1, sharedStateRegistry);
} catch (Exception e) {
LOG.warn("Failed to subsume the old checkpoint", e);
}
Expand All @@ -237,7 +234,8 @@ public CompletedCheckpoint getLatestCheckpoint() {

try {
// remove the checkpoint with broken state handle
removeBrokenStateHandle(checkpointStateHandles.pollLast());
Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint = checkpointStateHandles.pollLast();
removeBrokenStateHandle(checkpoint.f1, checkpoint.f0);
} catch (Exception removeException) {
LOG.warn("Could not remove the latest checkpoint with a broken state handle.", removeException);
}
Expand Down Expand Up @@ -265,7 +263,7 @@ public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {

// remove the checkpoint with broken state handle
stateHandleIterator.remove();
removeBrokenStateHandle(stateHandlePath);
removeBrokenStateHandle(stateHandlePath.f1, stateHandlePath.f0);
}
}

Expand All @@ -289,7 +287,7 @@ public void shutdown(JobStatus jobStatus) throws Exception {

for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
try {
removeShutdown(checkpoint, jobStatus, sharedStateRegistry);
removeShutdown(checkpoint.f1, jobStatus, sharedStateRegistry);
} catch (Exception e) {
LOG.error("Failed to discard checkpoint.", e);
}
Expand All @@ -306,117 +304,87 @@ public void shutdown(JobStatus jobStatus) throws Exception {

// Clear the local handles, but don't remove any state
checkpointStateHandles.clear();

// Release the state handle locks in ZooKeeper such that they can be deleted
checkpointsInZooKeeper.releaseAll();
}
}

// ------------------------------------------------------------------------

private void removeSubsumed(
final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
final String pathInZooKeeper,
final SharedStateRegistry sharedStateRegistry) throws Exception {

Callable<Void> action = new Callable<Void>() {
ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
@Override
public Void call() throws Exception {
CompletedCheckpoint completedCheckpoint = retrieveCompletedCheckpoint(stateHandleAndPath);

if (completedCheckpoint != null) {
completedCheckpoint.discardOnSubsume(sharedStateRegistry);
}
public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
if (value != null) {
final CompletedCheckpoint completedCheckpoint;
try {
completedCheckpoint = value.retrieveState();
} catch (Exception e) {
throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e);
}

return null;
if (completedCheckpoint != null) {
try {
completedCheckpoint.discardOnSubsume(sharedStateRegistry);
} catch (Exception e) {
throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
}
}
}
}
};

remove(stateHandleAndPath, action);
checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, action);
}

private void removeShutdown(
final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
final String pathInZooKeeper,
final JobStatus jobStatus,
final SharedStateRegistry sharedStateRegistry) throws Exception {

Callable<Void> action = new Callable<Void>() {
ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> removeAction = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
@Override
public Void call() throws Exception {
CompletedCheckpoint completedCheckpoint = retrieveCompletedCheckpoint(stateHandleAndPath);

if (completedCheckpoint != null) {
completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
}
public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
if (value != null) {
final CompletedCheckpoint completedCheckpoint;

try {
completedCheckpoint = value.retrieveState();
} catch (Exception e) {
throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e);
}

return null;
if (completedCheckpoint != null) {
try {
completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
} catch (Exception e) {
throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
}
}
}
}
};

remove(stateHandleAndPath, action);
}

private void removeBrokenStateHandle(final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
remove(stateHandleAndPath, null);
checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, removeAction);
}

/**
* Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle.
*/
private void remove(
final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
final Callable<Void> action) throws Exception {

BackgroundCallback callback = new BackgroundCallback() {
private void removeBrokenStateHandle(
final String pathInZooKeeper,
final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle) throws Exception {
checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
final long checkpointId = pathToCheckpointId(stateHandleAndPath.f1);

public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
try {
if (event.getType() == CuratorEventType.DELETE) {
if (event.getResultCode() == 0) {
Exception exception = null;

if (null != action) {
try {
action.call();
} catch (Exception e) {
exception = new Exception("Could not execute callable action " +
"for checkpoint " + checkpointId + '.', e);
}
}

try {
// Discard the state handle
stateHandleAndPath.f0.discardState();
} catch (Exception e) {
Exception newException = new Exception("Could not discard meta " +
"data for completed checkpoint " + checkpointId + '.', e);

if (exception == null) {
exception = newException;
} else {
exception.addSuppressed(newException);
}
}

if (exception != null) {
throw exception;
}
} else {
throw new IllegalStateException("Unexpected result code " +
event.getResultCode() + " in '" + event + "' callback.");
}
} else {
throw new IllegalStateException("Unexpected event type " +
event.getType() + " in '" + event + "' callback.");
}
retrievableStateHandle.discardState();
} catch (Exception e) {
LOG.warn("Failed to discard checkpoint {}.", checkpointId, e);
throw new FlinkException("Could not discard state handle.", e);
}
}
};

// Remove state handle from ZooKeeper first. If this fails, we can still recover, but if
// we remove a state handle and fail to remove it from ZooKeeper, we end up in an
// inconsistent state.
checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,36 +157,46 @@ public void stop() throws Exception {
@Override
public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
checkNotNull(jobId, "Job ID");
String path = getPathForJob(jobId);
final String path = getPathForJob(jobId);

LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);

synchronized (cacheLock) {
verifyIsRunning();

RetrievableStateHandle<SubmittedJobGraph> jobGraphRetrievableStateHandle;
boolean success = false;

try {
jobGraphRetrievableStateHandle = jobGraphsInZooKeeper.get(path);
} catch (KeeperException.NoNodeException ignored) {
return null;
} catch (Exception e) {
throw new Exception("Could not retrieve the submitted job graph state handle " +
"for " + path + "from the submitted job graph store.", e);
}
SubmittedJobGraph jobGraph;
RetrievableStateHandle<SubmittedJobGraph> jobGraphRetrievableStateHandle;

try {
jobGraph = jobGraphRetrievableStateHandle.retrieveState();
} catch (Exception e) {
throw new Exception("Failed to retrieve the submitted job graph from state handle.", e);
}
try {
jobGraphRetrievableStateHandle = jobGraphsInZooKeeper.getAndLock(path);
} catch (KeeperException.NoNodeException ignored) {
success = true;
return null;
} catch (Exception e) {
throw new Exception("Could not retrieve the submitted job graph state handle " +
"for " + path + "from the submitted job graph store.", e);
}
SubmittedJobGraph jobGraph;

addedJobGraphs.add(jobGraph.getJobId());
try {
jobGraph = jobGraphRetrievableStateHandle.retrieveState();
} catch (Exception e) {
throw new Exception("Failed to retrieve the submitted job graph from state handle.", e);
}

LOG.info("Recovered {}.", jobGraph);
addedJobGraphs.add(jobGraph.getJobId());

return jobGraph;
LOG.info("Recovered {}.", jobGraph);

success = true;
return jobGraph;
} finally {
if (!success) {
jobGraphsInZooKeeper.release(path);
}
}
}
}

Expand All @@ -207,7 +217,7 @@ public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {

if (currentVersion == -1) {
try {
jobGraphsInZooKeeper.add(path, jobGraph);
jobGraphsInZooKeeper.addAndLock(path, jobGraph);

addedJobGraphs.add(jobGraph.getJobId());

Expand Down Expand Up @@ -245,7 +255,7 @@ public void removeJobGraph(JobID jobId) throws Exception {

synchronized (cacheLock) {
if (addedJobGraphs.contains(jobId)) {
jobGraphsInZooKeeper.removeAndDiscardState(path);
jobGraphsInZooKeeper.releaseAndTryRemove(path);

addedJobGraphs.remove(jobId);
}
Expand Down
Loading

0 comments on commit 3d119e1

Please sign in to comment.