Skip to content

Commit

Permalink
[hotfix][coordination] Introduce the interface StateHandleStore and m…
Browse files Browse the repository at this point in the history
…ake ZooKeeperStateHandleStore as an implementation
  • Loading branch information
wangyang0918 authored and tillrohrmann committed Nov 7, 2020
1 parent 86d2019 commit 3dda515
Show file tree
Hide file tree
Showing 10 changed files with 716 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.persistence.IntegerResourceVersion;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
Expand Down Expand Up @@ -211,8 +212,8 @@ public void putWorker(MesosWorkerStore.Worker worker) throws Exception {
synchronized (startStopLock) {
verifyIsRunning();

int currentVersion = workersInZooKeeper.exists(path);
if (currentVersion == -1) {
final IntegerResourceVersion currentVersion = workersInZooKeeper.exists(path);
if (!currentVersion.isExisting()) {
workersInZooKeeper.addAndLock(path, worker);
LOG.debug("Added {} in ZooKeeper.", worker);
} else {
Expand All @@ -229,7 +230,7 @@ public boolean removeWorker(Protos.TaskID taskID) throws Exception {
synchronized (startStopLock) {
verifyIsRunning();

if (workersInZooKeeper.exists(path) == -1) {
if (!workersInZooKeeper.exists(path).isExisting()) {
LOG.debug("No such worker {} in ZooKeeper.", taskID);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner,
}

completedCheckpoints.clear();
checkpointsInZooKeeper.deleteChildren();
checkpointsInZooKeeper.clearEntries();
} else {
LOG.info("Suspending");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.persistence.IntegerResourceVersion;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.ExceptionUtils;
Expand Down Expand Up @@ -221,9 +222,9 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
synchronized (cacheLock) {
verifyIsRunning();

int currentVersion = jobGraphsInZooKeeper.exists(path);
final IntegerResourceVersion currentVersion = jobGraphsInZooKeeper.exists(path);

if (currentVersion == -1) {
if (!currentVersion.isExisting()) {
try {
jobGraphsInZooKeeper.addAndLock(path, jobGraph);

Expand Down Expand Up @@ -299,7 +300,7 @@ public Collection<JobID> getJobIds() throws Exception {
LOG.debug("Retrieving all stored job ids from ZooKeeper under {}.", zooKeeperFullBasePath);

try {
paths = jobGraphsInZooKeeper.getAllPaths();
paths = jobGraphsInZooKeeper.getAllHandles();
} catch (Exception e) {
throw new Exception("Failed to retrieve entry paths from ZooKeeperStateHandleStore.", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.persistence;

import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;

/**
* {@link ResourceVersion} implementation with {@link Integer} value. The resource version in
* ZooKeeper is {@link Integer}.
*/
public class IntegerResourceVersion implements ResourceVersion<IntegerResourceVersion> {

private static final long serialVersionUID = 1L;

private static final IntegerResourceVersion NOT_EXISTING = new IntegerResourceVersion(-1);

private final int value;

private IntegerResourceVersion(int value) {
this.value = value;
}

@Override
public int compareTo(@Nonnull IntegerResourceVersion other) {
return Integer.compare(value, other.getValue());
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (obj != null && obj.getClass() == IntegerResourceVersion.class) {
final IntegerResourceVersion that = (IntegerResourceVersion) obj;
return this.value == that.getValue();
} else {
return false;
}
}

@Override
public int hashCode() {
return Integer.hashCode(value);
}

@Override
public boolean isExisting() {
return this != NOT_EXISTING;
}

@Override
public String toString() {
return "IntegerResourceVersion{" + "value='" + value + '\'' + '}';
}

public int getValue() {
return this.value;
}

public static IntegerResourceVersion notExisting() {
return NOT_EXISTING;
}

/**
* Create a {@link IntegerResourceVersion} with given integer value.
*
* @param value resource version integer value. The value should not be negative.
*
* @return {@link IntegerResourceVersion} with given value.
*/
public static IntegerResourceVersion valueOf(int value) {
Preconditions.checkArgument(value >= 0);
return new IntegerResourceVersion(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.persistence;

import java.io.Serializable;

/**
* Resource version for specific state handle on the underlying storage. The implementation also needs to implement the
* {@link Comparable} interface so that we could compare the resource versions.
*
* @param <R> Type of {@link ResourceVersion}
*/
public interface ResourceVersion<R> extends Comparable<R>, Serializable {

/**
* Check whether the state handle is existing.
*
* @return true if state handle exists with current {@link ResourceVersion} on external storage. Or false
* it does not exist.
*/
boolean isExisting();
}
Loading

0 comments on commit 3dda515

Please sign in to comment.