diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java index 2b935af229a49..343b22aa25e71 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java @@ -264,9 +264,11 @@ public void removeJobGraph(JobID jobId) throws Exception { synchronized (cacheLock) { if (addedJobGraphs.contains(jobId)) { - jobGraphsInZooKeeper.releaseAndTryRemove(path); - - addedJobGraphs.remove(jobId); + if (jobGraphsInZooKeeper.releaseAndTryRemove(path)) { + addedJobGraphs.remove(jobId); + } else { + throw new FlinkException(String.format("Could not remove job graph with job id %s from ZooKeeper.", jobId)); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java new file mode 100644 index 0000000000000..92bf1afab68a6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; + +import java.io.Serializable; + +/** + * {@link RetrievableStateStorageHelper} implementation for testing purposes. + * + * @param type of the element to store + */ +public final class TestingRetrievableStateStorageHelper implements RetrievableStateStorageHelper { + + @Override + public RetrievableStateHandle store(T state) { + return new TestingRetrievableStateHandle<>(state); + } + + private static final class TestingRetrievableStateHandle implements RetrievableStateHandle { + + private static final long serialVersionUID = 137053380713794300L; + + private final T state; + + private TestingRetrievableStateHandle(T state) { + this.state = state; + } + + @Override + public T retrieveState() { + return state; + } + + @Override + public void discardState() { + // no op + } + + @Override + public long getStateSize() { + return 0; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index f992d3b00c06d..a9cba8861c044 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -22,10 +22,8 @@ import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.util.ZooKeeperUtils; -import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperResource; import org.apache.flink.util.TestLogger; @@ -36,8 +34,6 @@ import javax.annotation.Nonnull; -import java.io.IOException; -import java.io.Serializable; import java.util.List; import static org.junit.Assert.assertEquals; @@ -129,36 +125,4 @@ private ZooKeeperCompletedCheckpointStore createZooKeeperCheckpointStore(Curator Executors.directExecutor()); } - private static final class TestingRetrievableStateStorageHelper implements RetrievableStateStorageHelper { - @Override - public RetrievableStateHandle store(T state) { - return new TestingRetrievableStateHandle<>(state); - } - - private static class TestingRetrievableStateHandle implements RetrievableStateHandle { - - private static final long serialVersionUID = 137053380713794300L; - - private final T state; - - private TestingRetrievableStateHandle(T state) { - this.state = state; - } - - @Override - public T retrieveState() throws IOException, ClassNotFoundException { - return state; - } - - @Override - public void discardState() throws Exception { - // no op - } - - @Override - public long getStateSize() { - return 0; - } - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java new file mode 100644 index 0000000000000..dde3b7ab20fcf --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.ZooKeeperResource; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.framework.CuratorFramework; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link ZooKeeperSubmittedJobGraphStore}. + */ +public class ZooKeeperSubmittedJobGraphStoreTest extends TestLogger { + + @Rule + public ZooKeeperResource zooKeeperResource = new ZooKeeperResource(); + + private Configuration configuration; + + @Before + public void setup() { + configuration = new Configuration(); + configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString()); + } + + /** + * Tests that we fail with an exception if the job cannot be removed from the + * ZooKeeperSubmittedJobGraphStore. + */ + @Test + public void testJobGraphRemovalFailure() throws Exception { + try (final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration)) { + final TestingRetrievableStateStorageHelper stateStorage = new TestingRetrievableStateStorageHelper<>(); + final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage); + submittedJobGraphStore.start(null); + final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage); + otherSubmittedJobGraphStore.start(null); + + final SubmittedJobGraph jobGraph = new SubmittedJobGraph(new JobGraph(), null); + submittedJobGraphStore.putJobGraph(jobGraph); + + final SubmittedJobGraph recoveredJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobGraph.getJobId()); + + assertThat(recoveredJobGraph, is(notNullValue())); + + try { + otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobId()); + fail("It should not be possible to remove the JobGraph since the first store still has a lock on it."); + } catch (Exception ignored) { + // expected + } + + otherSubmittedJobGraphStore.stop(); + } + } + + @Nonnull + public ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphStore(CuratorFramework client, TestingRetrievableStateStorageHelper stateStorage) throws Exception { + return new ZooKeeperSubmittedJobGraphStore( + client, + "/foobar", + stateStorage); + } + +}