Skip to content

Commit

Permalink
[FLINK-10329] Fail ZooKeeperSubmittedJobGraphStore#removeJobGraph if …
Browse files Browse the repository at this point in the history
…job cannot be removed

Fail properly with an exception if we cannot remove the JobGraph in ZooKeeperSubmittedJobGraphStore#
removeJobGraph. This is necessary in order to notify callers about the unsuccessful attempt.
  • Loading branch information
tillrohrmann committed Sep 14, 2018
1 parent 9c4c138 commit 60ed037
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,11 @@ public void removeJobGraph(JobID jobId) throws Exception {

synchronized (cacheLock) {
if (addedJobGraphs.contains(jobId)) {
jobGraphsInZooKeeper.releaseAndTryRemove(path);

addedJobGraphs.remove(jobId);
if (jobGraphsInZooKeeper.releaseAndTryRemove(path)) {
addedJobGraphs.remove(jobId);
} else {
throw new FlinkException(String.format("Could not remove job graph with job id %s from ZooKeeper.", jobId));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;

import java.io.Serializable;

/**
* {@link RetrievableStateStorageHelper} implementation for testing purposes.
*
* @param <T> type of the element to store
*/
public final class TestingRetrievableStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {

@Override
public RetrievableStateHandle<T> store(T state) {
return new TestingRetrievableStateHandle<>(state);
}

private static final class TestingRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {

private static final long serialVersionUID = 137053380713794300L;

private final T state;

private TestingRetrievableStateHandle(T state) {
this.state = state;
}

@Override
public T retrieveState() {
return state;
}

@Override
public void discardState() {
// no op
}

@Override
public long getStateSize() {
return 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.util.TestLogger;

Expand All @@ -36,8 +34,6 @@

import javax.annotation.Nonnull;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -129,36 +125,4 @@ private ZooKeeperCompletedCheckpointStore createZooKeeperCheckpointStore(Curator
Executors.directExecutor());
}

private static final class TestingRetrievableStateStorageHelper<T extends Serializable> implements RetrievableStateStorageHelper<T> {
@Override
public RetrievableStateHandle<T> store(T state) {
return new TestingRetrievableStateHandle<>(state);
}

private static class TestingRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {

private static final long serialVersionUID = 137053380713794300L;

private final T state;

private TestingRetrievableStateHandle(T state) {
this.state = state;
}

@Override
public T retrieveState() throws IOException, ClassNotFoundException {
return state;
}

@Override
public void discardState() throws Exception {
// no op
}

@Override
public long getStateSize() {
return 0;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmanager;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.util.TestLogger;

import org.apache.curator.framework.CuratorFramework;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import javax.annotation.Nonnull;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

/**
* Tests for the {@link ZooKeeperSubmittedJobGraphStore}.
*/
public class ZooKeeperSubmittedJobGraphStoreTest extends TestLogger {

@Rule
public ZooKeeperResource zooKeeperResource = new ZooKeeperResource();

private Configuration configuration;

@Before
public void setup() {
configuration = new Configuration();
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
}

/**
* Tests that we fail with an exception if the job cannot be removed from the
* ZooKeeperSubmittedJobGraphStore.
*/
@Test
public void testJobGraphRemovalFailure() throws Exception {
try (final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration)) {
final TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage = new TestingRetrievableStateStorageHelper<>();
final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage);
submittedJobGraphStore.start(null);
final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage);
otherSubmittedJobGraphStore.start(null);

final SubmittedJobGraph jobGraph = new SubmittedJobGraph(new JobGraph(), null);
submittedJobGraphStore.putJobGraph(jobGraph);

final SubmittedJobGraph recoveredJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobGraph.getJobId());

assertThat(recoveredJobGraph, is(notNullValue()));

try {
otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobId());
fail("It should not be possible to remove the JobGraph since the first store still has a lock on it.");
} catch (Exception ignored) {
// expected
}

otherSubmittedJobGraphStore.stop();
}
}

@Nonnull
public ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphStore(CuratorFramework client, TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
return new ZooKeeperSubmittedJobGraphStore(
client,
"/foobar",
stateStorage);
}

}

0 comments on commit 60ed037

Please sign in to comment.