diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStore.java
deleted file mode 100644
index f265fb14e9c23..0000000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStore.java
+++ /dev/null
@@ -1,452 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.persistence.IntegerResourceVersion;
-import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
-
-import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.flink.shaded.curator4.org.apache.curator.utils.ZKPaths;
-import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * {@link JobGraph} instances for JobManagers running in {@link HighAvailabilityMode#ZOOKEEPER}.
- *
- *
Each job graph creates ZNode:
- *
- * +----O /flink/jobgraphs/<job-id> 1 [persistent]
- * .
- * .
- * .
- * +----O /flink/jobgraphs/<job-id> N [persistent]
- *
- *
- * The root path is watched to detect concurrent modifications in corner situations where
- * multiple instances operate concurrently. The job manager acts as a {@link JobGraphListener}
- * to react to such situations.
- */
-public class ZooKeeperJobGraphStore implements JobGraphStore {
-
- private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperJobGraphStore.class);
-
- /** Lock to synchronize with the {@link JobGraphListener}. */
- private final Object cacheLock = new Object();
-
- /** The set of IDs of all added job graphs. */
- private final Set addedJobGraphs = new HashSet<>();
-
- /** Submitted job graphs in ZooKeeper. */
- private final ZooKeeperStateHandleStore jobGraphsInZooKeeper;
-
- /**
- * Cache to monitor all children. This is used to detect races with other instances working
- * on the same state.
- */
- private final PathChildrenCache pathCache;
-
- /** The full configured base path including the namespace. */
- private final String zooKeeperFullBasePath;
-
- /** The external listener to be notified on races. */
- private JobGraphListener jobGraphListener;
-
- /** Flag indicating whether this instance is running. */
- private boolean isRunning;
-
- /**
- * Submitted job graph store backed by ZooKeeper.
- *
- * @param zooKeeperFullBasePath ZooKeeper path for current job graphs
- * @param zooKeeperStateHandleStore State storage used to persist the submitted jobs
- */
- public ZooKeeperJobGraphStore(
- String zooKeeperFullBasePath,
- ZooKeeperStateHandleStore zooKeeperStateHandleStore,
- PathChildrenCache pathCache) {
-
- checkNotNull(zooKeeperFullBasePath, "Current jobs path");
-
- this.zooKeeperFullBasePath = zooKeeperFullBasePath;
- this.jobGraphsInZooKeeper = checkNotNull(zooKeeperStateHandleStore);
-
- this.pathCache = checkNotNull(pathCache);
- pathCache.getListenable().addListener(new JobGraphsPathCacheListener());
- }
-
- @Override
- public void start(JobGraphListener jobGraphListener) throws Exception {
- synchronized (cacheLock) {
- if (!isRunning) {
- this.jobGraphListener = jobGraphListener;
-
- pathCache.start();
-
- isRunning = true;
- }
- }
- }
-
- @Override
- public void stop() throws Exception {
- synchronized (cacheLock) {
- if (isRunning) {
- jobGraphListener = null;
-
- try {
- Exception exception = null;
-
- try {
- jobGraphsInZooKeeper.releaseAll();
- } catch (Exception e) {
- exception = e;
- }
-
- try {
- pathCache.close();
- } catch (Exception e) {
- exception = ExceptionUtils.firstOrSuppressed(e, exception);
- }
-
- if (exception != null) {
- throw new FlinkException("Could not properly stop the ZooKeeperJobGraphStore.", exception);
- }
- } finally {
- isRunning = false;
- }
- }
- }
- }
-
- @Override
- @Nullable
- public JobGraph recoverJobGraph(JobID jobId) throws Exception {
- checkNotNull(jobId, "Job ID");
- final String path = getPathForJob(jobId);
-
- LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
-
- synchronized (cacheLock) {
- verifyIsRunning();
-
- boolean success = false;
-
- try {
- RetrievableStateHandle jobGraphRetrievableStateHandle;
-
- try {
- jobGraphRetrievableStateHandle = jobGraphsInZooKeeper.getAndLock(path);
- } catch (KeeperException.NoNodeException ignored) {
- success = true;
- return null;
- } catch (Exception e) {
- throw new FlinkException("Could not retrieve the submitted job graph state handle " +
- "for " + path + " from the submitted job graph store.", e);
- }
- JobGraph jobGraph;
-
- try {
- jobGraph = jobGraphRetrievableStateHandle.retrieveState();
- } catch (ClassNotFoundException cnfe) {
- throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + path +
- ". This indicates that you are trying to recover from state written by an " +
- "older Flink version which is not compatible. Try cleaning the state handle store.", cnfe);
- } catch (IOException ioe) {
- throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + path +
- ". This indicates that the retrieved state handle is broken. Try cleaning the state handle " +
- "store.", ioe);
- }
-
- addedJobGraphs.add(jobGraph.getJobID());
-
- LOG.info("Recovered {}.", jobGraph);
-
- success = true;
- return jobGraph;
- } finally {
- if (!success) {
- jobGraphsInZooKeeper.release(path);
- }
- }
- }
- }
-
- @Override
- public void putJobGraph(JobGraph jobGraph) throws Exception {
- checkNotNull(jobGraph, "Job graph");
- String path = getPathForJob(jobGraph.getJobID());
-
- LOG.debug("Adding job graph {} to {}{}.", jobGraph.getJobID(), zooKeeperFullBasePath, path);
-
- boolean success = false;
-
- while (!success) {
- synchronized (cacheLock) {
- verifyIsRunning();
-
- final IntegerResourceVersion currentVersion = jobGraphsInZooKeeper.exists(path);
-
- if (!currentVersion.isExisting()) {
- try {
- jobGraphsInZooKeeper.addAndLock(path, jobGraph);
-
- addedJobGraphs.add(jobGraph.getJobID());
-
- success = true;
- }
- catch (KeeperException.NodeExistsException ignored) {
- }
- }
- else if (addedJobGraphs.contains(jobGraph.getJobID())) {
- try {
- jobGraphsInZooKeeper.replace(path, currentVersion, jobGraph);
- LOG.info("Updated {} in ZooKeeper.", jobGraph);
-
- success = true;
- }
- catch (KeeperException.NoNodeException ignored) {
- }
- }
- else {
- throw new IllegalStateException("Oh, no. Trying to update a graph you didn't " +
- "#getAllSubmittedJobGraphs() or #putJobGraph() yourself before.");
- }
- }
- }
-
- LOG.info("Added {} to ZooKeeper.", jobGraph);
- }
-
- @Override
- public void removeJobGraph(JobID jobId) throws Exception {
- checkNotNull(jobId, "Job ID");
- String path = getPathForJob(jobId);
-
- LOG.debug("Removing job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
-
- synchronized (cacheLock) {
- if (addedJobGraphs.contains(jobId)) {
- if (jobGraphsInZooKeeper.releaseAndTryRemove(path)) {
- addedJobGraphs.remove(jobId);
- } else {
- throw new FlinkException(String.format("Could not remove job graph with job id %s from ZooKeeper.", jobId));
- }
- }
- }
-
- LOG.info("Removed job graph {} from ZooKeeper.", jobId);
- }
-
- @Override
- public void releaseJobGraph(JobID jobId) throws Exception {
- checkNotNull(jobId, "Job ID");
- final String path = getPathForJob(jobId);
-
- LOG.debug("Releasing locks of job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
-
- synchronized (cacheLock) {
- if (addedJobGraphs.contains(jobId)) {
- jobGraphsInZooKeeper.release(path);
-
- addedJobGraphs.remove(jobId);
- }
- }
-
- LOG.info("Released locks of job graph {} from ZooKeeper.", jobId);
- }
-
- @Override
- public Collection getJobIds() throws Exception {
- Collection paths;
-
- LOG.debug("Retrieving all stored job ids from ZooKeeper under {}.", zooKeeperFullBasePath);
-
- try {
- paths = jobGraphsInZooKeeper.getAllHandles();
- } catch (Exception e) {
- throw new Exception("Failed to retrieve entry paths from ZooKeeperStateHandleStore.", e);
- }
-
- List jobIds = new ArrayList<>(paths.size());
-
- for (String path : paths) {
- try {
- jobIds.add(jobIdFromPath(path));
- } catch (Exception exception) {
- LOG.warn("Could not parse job id from {}. This indicates a malformed path.", path, exception);
- }
- }
-
- return jobIds;
- }
-
- /**
- * Monitors ZooKeeper for changes.
- *
- * Detects modifications from other job managers in corner situations. The event
- * notifications fire for changes from this job manager as well.
- */
- private final class JobGraphsPathCacheListener implements PathChildrenCacheListener {
-
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
- throws Exception {
-
- if (LOG.isDebugEnabled()) {
- if (event.getData() != null) {
- LOG.debug("Received {} event (path: {})", event.getType(), event.getData().getPath());
- }
- else {
- LOG.debug("Received {} event", event.getType());
- }
- }
-
- switch (event.getType()) {
- case CHILD_ADDED: {
- JobID jobId = fromEvent(event);
-
- LOG.debug("Received CHILD_ADDED event notification for job {}", jobId);
-
- synchronized (cacheLock) {
- try {
- if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
- try {
- // Whoa! This has been added by someone else. Or we were fast
- // to remove it (false positive).
- jobGraphListener.onAddedJobGraph(jobId);
- } catch (Throwable t) {
- LOG.error("Error in callback", t);
- }
- }
- } catch (Exception e) {
- LOG.error("Error in JobGraphsPathCacheListener", e);
- }
- }
- }
- break;
-
- case CHILD_UPDATED: {
- // Nothing to do
- }
- break;
-
- case CHILD_REMOVED: {
- JobID jobId = fromEvent(event);
-
- LOG.debug("Received CHILD_REMOVED event notification for job {}", jobId);
-
- synchronized (cacheLock) {
- try {
- if (jobGraphListener != null && addedJobGraphs.contains(jobId)) {
- try {
- // Oh oh. Someone else removed one of our job graphs. Mean!
- jobGraphListener.onRemovedJobGraph(jobId);
- } catch (Throwable t) {
- LOG.error("Error in callback", t);
- }
- }
-
- break;
- } catch (Exception e) {
- LOG.error("Error in JobGraphsPathCacheListener", e);
- }
- }
- }
- break;
-
- case CONNECTION_SUSPENDED: {
- LOG.warn("ZooKeeper connection SUSPENDING. Changes to the submitted job " +
- "graphs are not monitored (temporarily).");
- }
- break;
-
- case CONNECTION_LOST: {
- LOG.warn("ZooKeeper connection LOST. Changes to the submitted job " +
- "graphs are not monitored (permanently).");
- }
- break;
-
- case CONNECTION_RECONNECTED: {
- LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " +
- "graphs are monitored again.");
- }
- break;
-
- case INITIALIZED: {
- LOG.info("JobGraphsPathCacheListener initialized");
- }
- break;
- }
- }
-
- /**
- * Returns a JobID for the event's path.
- */
- private JobID fromEvent(PathChildrenCacheEvent event) {
- return JobID.fromHexString(ZKPaths.getNodeFromPath(event.getData().getPath()));
- }
- }
-
- /**
- * Verifies that the state is running.
- */
- private void verifyIsRunning() {
- checkState(isRunning, "Not running. Forgot to call start()?");
- }
-
- /**
- * Returns the JobID as a String (with leading slash).
- */
- public static String getPathForJob(JobID jobId) {
- checkNotNull(jobId, "Job ID");
- return String.format("/%s", jobId);
- }
-
- /**
- * Returns the JobID from the given path in ZooKeeper.
- *
- * @param path in ZooKeeper
- * @return JobID associated with the given path
- */
- public static JobID jobIdFromPath(final String path) {
- return JobID.fromHexString(path);
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreUtil.java
new file mode 100644
index 0000000000000..44eba71a3beb8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreUtil.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+/**
+ * Singleton {@link JobGraphStoreUtil} implementation for ZooKeeper.
+ */
+public enum ZooKeeperJobGraphStoreUtil implements JobGraphStoreUtil {
+ INSTANCE;
+
+ @Override
+ public String jobIDToName(JobID jobId) {
+ return ZooKeeperUtils.getPathForJob(jobId);
+ }
+
+ @Override
+ public JobID nameToJobID(String name) {
+ return JobID.fromHexString(name);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreWatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreWatcher.java
new file mode 100644
index 0000000000000..0b317618f67a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreWatcher.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.utils.ZKPaths;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link JobGraphStoreWatcher} implementation for ZooKeeper.
+ *
+ *
Each job graph creates ZNode:
+ *
+ * +----O /flink/jobgraphs/<job-id> 1 [persistent]
+ * .
+ * .
+ * .
+ * +----O /flink/jobgraphs/<job-id> N [persistent]
+ *
+ *
+ * The root path is watched to detect concurrent modifications in corner situations where
+ * multiple instances operate concurrently. The job manager acts as a {@link JobGraphStore.JobGraphListener}
+ * to react to such situations.
+ */
+public class ZooKeeperJobGraphStoreWatcher implements JobGraphStoreWatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperJobGraphStoreWatcher.class);
+
+ /**
+ * Cache to monitor all children. This is used to detect races with other instances working
+ * on the same state.
+ */
+ private final PathChildrenCache pathCache;
+
+ private JobGraphStore.JobGraphListener jobGraphListener;
+
+ private volatile boolean running;
+
+ public ZooKeeperJobGraphStoreWatcher(PathChildrenCache pathCache) {
+ this.pathCache = checkNotNull(pathCache);
+ this.pathCache.getListenable().addListener(new JobGraphsPathCacheListener());
+ running = false;
+ }
+
+ @Override
+ public void start(JobGraphStore.JobGraphListener jobGraphListener) throws Exception {
+ this.jobGraphListener = checkNotNull(jobGraphListener);
+ running = true;
+ pathCache.start();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ if (!running) {
+ return;
+ }
+ running = false;
+
+ LOG.info("Stopping ZooKeeperJobGraphStoreWatcher ");
+ pathCache.close();
+ }
+
+ /**
+ * Monitors ZooKeeper for changes.
+ *
+ *
Detects modifications from other job managers in corner situations. The event
+ * notifications fire for changes from this job manager as well.
+ */
+ private final class JobGraphsPathCacheListener implements PathChildrenCacheListener {
+
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
+
+ if (LOG.isDebugEnabled()) {
+ if (event.getData() != null) {
+ LOG.debug("Received {} event (path: {})", event.getType(), event.getData().getPath());
+ }
+ else {
+ LOG.debug("Received {} event", event.getType());
+ }
+ }
+
+ switch (event.getType()) {
+ case CHILD_ADDED: {
+ JobID jobId = fromEvent(event);
+
+ LOG.debug("Received CHILD_ADDED event notification for job {}", jobId);
+
+ jobGraphListener.onAddedJobGraph(jobId);
+ }
+ break;
+
+ case CHILD_UPDATED: {
+ // Nothing to do
+ }
+ break;
+
+ case CHILD_REMOVED: {
+ JobID jobId = fromEvent(event);
+
+ LOG.debug("Received CHILD_REMOVED event notification for job {}", jobId);
+
+ jobGraphListener.onRemovedJobGraph(jobId);
+ }
+ break;
+
+ case CONNECTION_SUSPENDED: {
+ LOG.warn("ZooKeeper connection SUSPENDING. Changes to the submitted job " +
+ "graphs are not monitored (temporarily).");
+ }
+ break;
+
+ case CONNECTION_LOST: {
+ LOG.warn("ZooKeeper connection LOST. Changes to the submitted job " +
+ "graphs are not monitored (permanently).");
+ }
+ break;
+
+ case CONNECTION_RECONNECTED: {
+ LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " +
+ "graphs are monitored again.");
+ }
+ break;
+
+ case INITIALIZED: {
+ LOG.info("JobGraphsPathCacheListener initialized");
+ }
+ break;
+ }
+ }
+
+ /**
+ * Returns a JobID for the event's path.
+ */
+ private JobID fromEvent(PathChildrenCacheEvent event) {
+ return JobID.fromHexString(ZKPaths.getNodeFromPath(event.getData().getPath()));
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 538d0554daf1e..18ca98738bc86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -30,8 +30,11 @@
import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStoreUtil;
+import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStoreWatcher;
import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriverFactory;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver;
@@ -294,14 +297,15 @@ public static ZooKeeperLeaderElectionDriverFactory createLeaderElectionDriverFac
}
/**
- * Creates a {@link ZooKeeperJobGraphStore} instance.
+ * Creates a {@link DefaultJobGraphStore} instance with {@link ZooKeeperStateHandleStore},
+ * {@link ZooKeeperJobGraphStoreWatcher} and {@link ZooKeeperJobGraphStoreUtil}.
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
* @param configuration {@link Configuration} object
- * @return {@link ZooKeeperJobGraphStore} instance
+ * @return {@link DefaultJobGraphStore} instance
* @throws Exception if the submitted job graph store cannot be created
*/
- public static ZooKeeperJobGraphStore createJobGraphs(
+ public static JobGraphStore createJobGraphs(
CuratorFramework client,
Configuration configuration) throws Exception {
@@ -326,10 +330,10 @@ public static ZooKeeperJobGraphStore createJobGraphs(
final PathChildrenCache pathCache = new PathChildrenCache(facade, "/", false);
- return new ZooKeeperJobGraphStore(
- zooKeeperFullJobsPath,
+ return new DefaultJobGraphStore<>(
zooKeeperStateHandleStore,
- pathCache);
+ new ZooKeeperJobGraphStoreWatcher(pathCache),
+ ZooKeeperJobGraphStoreUtil.INSTANCE);
}
/**
@@ -359,7 +363,7 @@ public static CompletedCheckpointStore createCompletedCheckpoints(
configuration,
HA_STORAGE_COMPLETED_CHECKPOINT);
- checkpointsPath += ZooKeeperJobGraphStore.getPathForJob(jobId);
+ checkpointsPath += getPathForJob(jobId);
final ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
maxNumberOfCheckpointsToRetain,
@@ -370,6 +374,15 @@ public static CompletedCheckpointStore createCompletedCheckpoints(
return zooKeeperCompletedCheckpointStore;
}
+
+ /**
+ * Returns the JobID as a String (with leading slash).
+ */
+ public static String getPathForJob(JobID jobId) {
+ checkNotNull(jobId, "Job ID");
+ return String.format("/%s", jobId);
+ }
+
/**
* Creates an instance of {@link ZooKeeperStateHandleStore}.
*
@@ -404,7 +417,7 @@ public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter(
String checkpointIdCounterPath = configuration.getString(
HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
- checkpointIdCounterPath += ZooKeeperJobGraphStore.getPathForJob(jobId);
+ checkpointIdCounterPath += getPathForJob(jobId);
return new ZooKeeperCheckpointIDCounter(client, checkpointIdCounterPath, new DefaultLastStateConnectionStateListener());
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index c27a3d5145269..2a9dbed910ccf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -40,7 +40,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.UUID;
@@ -159,7 +158,8 @@ public RetrievableStateHandle addAndLock(
return storeHandle;
}
catch (KeeperException.NodeExistsException e) {
- throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", e);
+ // We wrap the exception here so that it could be caught in DefaultJobGraphStore
+ throw new AlreadyExistException("ZooKeeper node " + path + " already exists.", e);
}
finally {
if (!success) {
@@ -202,7 +202,8 @@ public void replace(String pathInZooKeeper, IntegerResourceVersion expectedVersi
.forPath(path, serializedStateHandle);
success = true;
} catch (KeeperException.NoNodeException e) {
- throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", e);
+ // We wrap the exception here so that it could be caught in DefaultJobGraphStore
+ throw new NotExistException("ZooKeeper node " + path + " does not exist.", e);
} finally {
if (success) {
oldStateHandle.discardState();
@@ -492,6 +493,10 @@ private RetrievableStateHandle get(String pathInZooKeeper, boolean lock) thro
client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path));
} catch (KeeperException.NodeExistsException ignored) {
// we have already created the lock
+ } catch (KeeperException.NoNodeException ex) {
+ // We could run into this exception because the parent node does not exist when we are trying to lock.
+ // We wrap the exception here so that it could be caught in DefaultJobGraphStore
+ throw new NotExistException("ZooKeeper node " + path + " does not exist.", ex);
}
}
@@ -500,18 +505,19 @@ private RetrievableStateHandle get(String pathInZooKeeper, boolean lock) thro
try {
byte[] data = client.getData().forPath(path);
- try {
- RetrievableStateHandle retrievableStateHandle = InstantiationUtil.deserializeObject(
- data,
- Thread.currentThread().getContextClassLoader());
+ RetrievableStateHandle retrievableStateHandle = InstantiationUtil.deserializeObject(
+ data,
+ Thread.currentThread().getContextClassLoader());
- success = true;
+ success = true;
- return retrievableStateHandle;
- } catch (IOException | ClassNotFoundException e) {
- throw new IOException("Failed to deserialize state handle from ZooKeeper data from " +
- path + '.', e);
- }
+ return retrievableStateHandle;
+ } catch (KeeperException.NoNodeException ex) {
+ // We wrap the exception here so that it could be caught in DefaultJobGraphStore
+ throw new NotExistException("ZooKeeper node " + path + " does not exist.", ex);
+ } catch (IOException | ClassNotFoundException e) {
+ throw new IOException("Failed to deserialize state handle from ZooKeeper data from " +
+ path + '.', e);
} finally {
if (!success && lock) {
// release the lock
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
index a1fa441a87660..936fe1cec91cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
@@ -39,8 +39,8 @@
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperRunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory;
-import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
@@ -192,7 +192,7 @@ public void testResourceCleanupUnderLeadershipChange() throws Exception {
dispatcherLeaderElectionService.notLeader();
// check that the job has been removed from ZooKeeper
- final ZooKeeperJobGraphStore submittedJobGraphStore = createZooKeeperJobGraphStore(client);
+ final JobGraphStore submittedJobGraphStore = createZooKeeperJobGraphStore(client);
CommonTestUtils.waitUntilCondition(() -> submittedJobGraphStore.getJobIds().isEmpty(), Deadline.fromNow(VERIFICATION_TIMEOUT), 20L);
}
@@ -217,7 +217,7 @@ private DispatcherRunner createDispatcherRunner(
partialDispatcherServices);
}
- private ZooKeeperJobGraphStore createZooKeeperJobGraphStore(CuratorFramework client) {
+ private JobGraphStore createZooKeeperJobGraphStore(CuratorFramework client) {
try {
return ZooKeeperUtils.createJobGraphs(client, configuration);
} catch (Exception e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreTest.java
deleted file mode 100644
index b9c72a8db37c0..0000000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
-import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.PathChildrenCache;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import javax.annotation.Nonnull;
-
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the {@link ZooKeeperJobGraphStore}.
- */
-public class ZooKeeperJobGraphStoreTest extends TestLogger {
-
- @Rule
- public ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
-
- private Configuration configuration;
-
- @Before
- public void setup() {
- configuration = new Configuration();
- configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
- }
-
- /**
- * Tests that we fail with an exception if the job cannot be removed from the
- * ZooKeeperJobGraphStore.
- *
- * Tests that a close ZooKeeperJobGraphStore no longer holds any locks.
- */
- @Test
- public void testJobGraphRemovalFailureAndLockRelease() throws Exception {
- try (final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration)) {
- final TestingRetrievableStateStorageHelper stateStorage = new TestingRetrievableStateStorageHelper<>();
- final ZooKeeperJobGraphStore submittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage);
- submittedJobGraphStore.start(null);
- final ZooKeeperJobGraphStore otherSubmittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage);
- otherSubmittedJobGraphStore.start(null);
-
- final JobGraph jobGraph = new JobGraph();
- submittedJobGraphStore.putJobGraph(jobGraph);
-
- final JobGraph recoveredJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobGraph.getJobID());
-
- assertThat(recoveredJobGraph, is(notNullValue()));
-
- try {
- otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobID());
- fail("It should not be possible to remove the JobGraph since the first store still has a lock on it.");
- } catch (Exception ignored) {
- // expected
- }
-
- submittedJobGraphStore.stop();
-
- // now we should be able to delete the job graph
- otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobID());
-
- assertThat(otherSubmittedJobGraphStore.recoverJobGraph(recoveredJobGraph.getJobID()), is(nullValue()));
-
- otherSubmittedJobGraphStore.stop();
- }
- }
-
- @Nonnull
- private ZooKeeperJobGraphStore createSubmittedJobGraphStore(
- CuratorFramework client,
- TestingRetrievableStateStorageHelper stateStorage
- ) {
- return new ZooKeeperJobGraphStore(
- client.getNamespace(),
- new ZooKeeperStateHandleStore<>(client, stateStorage),
- new PathChildrenCache(client, "/", false));
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreWatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreWatcherTest.java
new file mode 100644
index 0000000000000..8639a2d26d8eb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreWatcherTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.PathChildrenCache;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.time.Duration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link ZooKeeperJobGraphStoreWatcher}.
+ */
+public class ZooKeeperJobGraphStoreWatcherTest extends TestLogger {
+
+ @Rule
+ public ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static final Duration TIMEOUT = Duration.ofMillis(30 * 1000);
+
+ private Configuration configuration;
+
+ private TestingJobGraphListener testingJobGraphListener;
+
+ @Before
+ public void setup() throws Exception {
+ configuration = new Configuration();
+ configuration.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
+ configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath());
+ testingJobGraphListener = new TestingJobGraphListener();
+ }
+
+ @Test
+ public void testJobGraphAddedAndRemovedShouldNotifyGraphStoreListener() throws Exception {
+ try (final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration)) {
+ final JobGraphStoreWatcher jobGraphStoreWatcher = createAndStartJobGraphStoreWatcher(client);
+
+ final ZooKeeperStateHandleStore stateHandleStore = createStateHandleStore(client);
+
+ final JobGraph jobGraph = new JobGraph();
+ final JobID jobID = jobGraph.getJobID();
+ stateHandleStore.addAndLock("/" + jobID, jobGraph);
+
+ CommonTestUtils.waitUntilCondition(
+ () -> testingJobGraphListener.getAddedJobGraphs().size() > 0,
+ Deadline.fromNow(TIMEOUT));
+
+ assertThat(testingJobGraphListener.getAddedJobGraphs(), contains(jobID));
+
+ stateHandleStore.releaseAndTryRemove("/" + jobID);
+
+ CommonTestUtils.waitUntilCondition(
+ () -> testingJobGraphListener.getRemovedJobGraphs().size() > 0,
+ Deadline.fromNow(TIMEOUT));
+ assertThat(testingJobGraphListener.getRemovedJobGraphs(), contains(jobID));
+
+ jobGraphStoreWatcher.stop();
+ }
+ }
+
+ private JobGraphStoreWatcher createAndStartJobGraphStoreWatcher(CuratorFramework client) throws Exception {
+ final ZooKeeperJobGraphStoreWatcher jobGraphStoreWatcher = new ZooKeeperJobGraphStoreWatcher(
+ new PathChildrenCache(client, "/", false));
+ jobGraphStoreWatcher.start(testingJobGraphListener);
+ return jobGraphStoreWatcher;
+ }
+
+ private ZooKeeperStateHandleStore createStateHandleStore(CuratorFramework client) throws Exception {
+ final RetrievableStateStorageHelper stateStorage = ZooKeeperUtils.createFileSystemStateStorage(
+ configuration, "test_jobgraph");
+ return new ZooKeeperStateHandleStore<>(client, stateStorage);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java
index bdebcc026354f..82c6489192b74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.NoOpJobGraphListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.JobGraphStore.JobGraphListener;
@@ -46,8 +47,13 @@
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
@@ -57,7 +63,8 @@
import static org.mockito.Mockito.verify;
/**
- * Tests for basic {@link JobGraphStore} contract.
+ * IT tests for {@link DefaultJobGraphStore} with all ZooKeeper components(e.g. {@link ZooKeeperStateHandleStore},
+ * {@link ZooKeeperJobGraphStoreWatcher}, {@link ZooKeeperJobGraphStoreUtil}).
*/
public class ZooKeeperJobGraphsStoreITCase extends TestLogger {
@@ -82,7 +89,7 @@ public void cleanUp() throws Exception {
@Test
public void testPutAndRemoveJobGraph() throws Exception {
- ZooKeeperJobGraphStore jobGraphs = createZooKeeperJobGraphStore("/testPutAndRemoveJobGraph");
+ JobGraphStore jobGraphs = createZooKeeperJobGraphStore("/testPutAndRemoveJobGraph");
try {
JobGraphStore.JobGraphListener listener = mock(JobGraphStore.JobGraphListener.class);
@@ -136,24 +143,26 @@ public void testPutAndRemoveJobGraph() throws Exception {
}
@Nonnull
- private ZooKeeperJobGraphStore createZooKeeperJobGraphStore(String fullPath) throws Exception {
+ private JobGraphStore createZooKeeperJobGraphStore(String fullPath) throws Exception {
final CuratorFramework client = ZooKeeper.getClient();
// Ensure that the job graphs path exists
client.newNamespaceAwareEnsurePath(fullPath).ensure(client.getZookeeperClient());
// All operations will have the path as root
CuratorFramework facade = client.usingNamespace(client.getNamespace() + fullPath);
- return new ZooKeeperJobGraphStore(
- fullPath,
- new ZooKeeperStateHandleStore<>(
- facade,
- localStateStorage),
- new PathChildrenCache(facade, "/", false));
+ final ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore<>(
+ facade,
+ localStateStorage);
+ return new DefaultJobGraphStore<>(
+ zooKeeperStateHandleStore,
+ new ZooKeeperJobGraphStoreWatcher(new PathChildrenCache(facade, "/", false)),
+ ZooKeeperJobGraphStoreUtil.INSTANCE
+ );
}
@Test
public void testRecoverJobGraphs() throws Exception {
- ZooKeeperJobGraphStore jobGraphs = createZooKeeperJobGraphStore("/testRecoverJobGraphs");
+ JobGraphStore jobGraphs = createZooKeeperJobGraphStore("/testRecoverJobGraphs");
try {
JobGraphStore.JobGraphListener listener = mock(JobGraphStore.JobGraphListener.class);
@@ -199,8 +208,8 @@ public void testRecoverJobGraphs() throws Exception {
@Test
public void testConcurrentAddJobGraph() throws Exception {
- ZooKeeperJobGraphStore jobGraphs = null;
- ZooKeeperJobGraphStore otherJobGraphs = null;
+ JobGraphStore jobGraphs = null;
+ JobGraphStore otherJobGraphs = null;
try {
jobGraphs = createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
@@ -227,7 +236,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
// Test
jobGraphs.start(listener);
- otherJobGraphs.start(null);
+ otherJobGraphs.start(NoOpJobGraphListener.INSTANCE);
jobGraphs.putJobGraph(jobGraph);
@@ -259,12 +268,12 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
@Test(expected = IllegalStateException.class)
public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
- ZooKeeperJobGraphStore jobGraphs = createZooKeeperJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
+ JobGraphStore jobGraphs = createZooKeeperJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
- ZooKeeperJobGraphStore otherJobGraphs = createZooKeeperJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
+ JobGraphStore otherJobGraphs = createZooKeeperJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
- jobGraphs.start(null);
- otherJobGraphs.start(null);
+ jobGraphs.start(NoOpJobGraphListener.INSTANCE);
+ otherJobGraphs.start(NoOpJobGraphListener.INSTANCE);
JobGraph jobGraph = createJobGraph(new JobID());
@@ -273,6 +282,47 @@ public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
otherJobGraphs.putJobGraph(jobGraph);
}
+ /**
+ * Tests that we fail with an exception if the job cannot be removed from the
+ * ZooKeeperJobGraphStore.
+ *
+ * Tests that a close ZooKeeperJobGraphStore no longer holds any locks.
+ */
+ @Test
+ public void testJobGraphRemovalFailureAndLockRelease() throws Exception {
+ final JobGraphStore submittedJobGraphStore =
+ createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
+ final JobGraphStore otherSubmittedJobGraphStore =
+ createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
+
+ final TestingJobGraphListener listener = new TestingJobGraphListener();
+ submittedJobGraphStore.start(listener);
+ otherSubmittedJobGraphStore.start(listener);
+
+ final JobGraph jobGraph = new JobGraph();
+ submittedJobGraphStore.putJobGraph(jobGraph);
+
+ final JobGraph recoveredJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobGraph.getJobID());
+
+ assertThat(recoveredJobGraph, is(notNullValue()));
+
+ try {
+ otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobID());
+ fail("It should not be possible to remove the JobGraph since the first store still has a lock on it.");
+ } catch (Exception ignored) {
+ // expected
+ }
+
+ submittedJobGraphStore.stop();
+
+ // now we should be able to delete the job graph
+ otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobID());
+
+ assertThat(otherSubmittedJobGraphStore.recoverJobGraph(recoveredJobGraph.getJobID()), is(nullValue()));
+
+ otherSubmittedJobGraphStore.stop();
+ }
+
// ---------------------------------------------------------------------------------------------
private JobGraph createJobGraph(JobID jobId) {