[FLINK-22483][runtime] Remove CompletedCheckpointStore#recover() method and change the contract of CheckpointRecoveryFactory#createCompletedCheckpointStore, so that the newly constructed CheckpointStore is already recovered.

It's enough to recover the CompletedCheckpointStore only once, right after the JobMasterRunner gains leadership. This also ensures that we fetch checkpoints from the external store in a "jobmaster-future-thread", without potentially blocking RPC threads.

This closes apache#16652.
dmvk authored and tillrohrmann committed Aug 15, 2021
1 parent 94ae3dc commit f64261c
Showing 43 changed files with 809 additions and 652 deletions.
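
To make the new contract concrete, here is a minimal, hypothetical sketch; it is not part of this commit, and the ExampleCheckpointRecoveryFactory class with its constructor-injected stateHandleStore, storeUtil, and ioExecutor exists only for illustration. It shows a high-availability-style factory that fetches previously completed checkpoints exactly once and seeds the store's constructor with them, so the returned store is already recovered and no caller ever needs the removed recover() method. The types and the DefaultCompletedCheckpointStore constructor shape mirror what the diff below introduces.

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointStoreUtil;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.persistence.ResourceVersion;
import org.apache.flink.runtime.persistence.StateHandleStore;

import java.util.Collection;
import java.util.concurrent.Executor;

/** Illustrative sketch only -- not part of this commit or of Flink. */
public class ExampleCheckpointRecoveryFactory<R extends ResourceVersion<R>>
        implements CheckpointRecoveryFactory {

    // Assumed to be provided by the surrounding HA services; names are hypothetical.
    private final StateHandleStore<CompletedCheckpoint, R> stateHandleStore;
    private final CheckpointStoreUtil storeUtil;
    private final Executor ioExecutor;

    public ExampleCheckpointRecoveryFactory(
            StateHandleStore<CompletedCheckpoint, R> stateHandleStore,
            CheckpointStoreUtil storeUtil,
            Executor ioExecutor) {
        this.stateHandleStore = stateHandleStore;
        this.storeUtil = storeUtil;
        this.ioExecutor = ioExecutor;
    }

    @Override
    public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
            JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
            throws Exception {
        // Fetch all previously completed checkpoints from the external store up front,
        // exactly once, after leadership has been acquired ...
        Collection<CompletedCheckpoint> recoveredCheckpoints =
                DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
                        stateHandleStore, storeUtil);
        // ... and seed the new store with them, so it is already recovered on construction.
        return new DefaultCompletedCheckpointStore<>(
                maxNumberOfCheckpointsToRetain,
                stateHandleStore,
                storeUtil,
                recoveredCheckpoints,
                ioExecutor);
    }

    @Override
    public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
        // Counter creation is untouched by this change; a standalone counter keeps the sketch compilable.
        return new StandaloneCheckpointIDCounter();
    }
}

Because the JobMaster calls this factory method from its own future/IO thread once it has gained leadership, any slow reads against the external checkpoint store happen there instead of on an RPC thread, which is exactly the motivation stated in the commit message.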
@@ -70,7 +70,7 @@ public KubernetesCheckpointRecoveryFactory(
}

@Override
public CompletedCheckpointStore createCheckpointStore(
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobID, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
throws Exception {

@@ -33,6 +33,7 @@
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore;
@@ -299,6 +300,8 @@ public static CompletedCheckpointStore createCompletedCheckpointStore(
maxNumberOfCheckpointsToRetain,
stateHandleStore,
KubernetesCheckpointStoreUtil.INSTANCE,
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
stateHandleStore, KubernetesCheckpointStoreUtil.INSTANCE),
executor);
}

@@ -1521,16 +1521,11 @@ private OptionalLong restoreLatestCheckpointedStateInternal(
}

// We create a new shared state registry object, so that all pending async disposal
// requests from previous
// runs will go against the old object (were they can do no harm).
// This must happen under the checkpoint lock.
// requests from previous runs will go against the old object (were they can do no
// harm). This must happen under the checkpoint lock.
sharedStateRegistry.close();
sharedStateRegistry = sharedStateRegistryFactory.create(executor);

// Recover the checkpoints, TODO this could be done only when there is a new leader, not
// on each recovery
completedCheckpointStore.recover();

// Now, we re-register all (shared) states from the checkpoint store with the new
// registry
for (CompletedCheckpoint completedCheckpoint :
@@ -24,14 +24,16 @@
public interface CheckpointRecoveryFactory {

/**
* Creates a {@link CompletedCheckpointStore} instance for a job.
* Creates a RECOVERED {@link CompletedCheckpointStore} instance for a job. In this context,
* RECOVERED means, that if we already have completed checkpoints from previous runs, we should
* use them as the initial state.
*
* @param jobId Job ID to recover checkpoints for
* @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to retain
* @param userClassLoader User code class loader of the job
* @return {@link CompletedCheckpointStore} instance for the job
*/
CompletedCheckpointStore createCheckpointStore(
CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
throws Exception;

@@ -32,14 +32,6 @@ public interface CompletedCheckpointStore {

Logger LOG = LoggerFactory.getLogger(CompletedCheckpointStore.class);

/**
* Recover available {@link CompletedCheckpoint} instances.
*
* <p>After a call to this method, {@link #getLatestCheckpoint(boolean)} returns the latest
* available checkpoint.
*/
void recover() throws Exception;

/**
* Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.
*
@@ -107,7 +99,7 @@ default long getLatestCheckpointId() {
* or kept.
*
* @param jobStatus Job state on shut down
* @param checkpointsCleaner that will cleanup copmpleted checkpoints if needed
* @param checkpointsCleaner that will cleanup completed checkpoints if needed
*/
void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) throws Exception;

@@ -30,11 +30,6 @@
public enum DeactivatedCheckpointCompletedCheckpointStore implements CompletedCheckpointStore {
INSTANCE;

@Override
public void recover() throws Exception {
throw unsupportedOperationException();
}

@Override
public void addCheckpoint(
CompletedCheckpoint checkpoint,
@@ -19,24 +19,20 @@
package org.apache.flink.runtime.checkpoint;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.persistence.ResourceVersion;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -61,9 +57,6 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
private static final Logger LOG =
LoggerFactory.getLogger(DefaultCompletedCheckpointStore.class);

private static final Comparator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>
STRING_COMPARATOR = Comparator.comparing(o -> o.f1);

/** Completed checkpoints state handle store. */
private final StateHandleStore<CompletedCheckpoint, R> checkpointStateHandleStore;

@@ -81,6 +74,9 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>

private final CheckpointStoreUtil completedCheckpointStoreUtil;

/** False if store has been shutdown. */
private final AtomicBoolean running = new AtomicBoolean(true);

/**
* Creates a {@link DefaultCompletedCheckpointStore} instance.
*
@@ -95,18 +91,14 @@ public DefaultCompletedCheckpointStore(
int maxNumberOfCheckpointsToRetain,
StateHandleStore<CompletedCheckpoint, R> stateHandleStore,
CheckpointStoreUtil completedCheckpointStoreUtil,
Collection<CompletedCheckpoint> completedCheckpoints,
Executor executor) {

checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");

this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;

this.checkpointStateHandleStore = checkNotNull(stateHandleStore);

this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);

this.completedCheckpoints.addAll(completedCheckpoints);
this.ioExecutor = checkNotNull(executor);

this.completedCheckpointStoreUtil = checkNotNull(completedCheckpointStoreUtil);
}

@@ -115,55 +107,13 @@ public boolean requiresExternalizedCheckpoints() {
return true;
}

/**
* Recover all the valid checkpoints from state handle store. All the successfully recovered
* checkpoints will be added to {@link #completedCheckpoints} sorted by checkpoint id.
*/
@Override
public void recover() throws Exception {
LOG.info("Recovering checkpoints from {}.", checkpointStateHandleStore);

// Get all there is first
final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints =
checkpointStateHandleStore.getAllAndLock();

initialCheckpoints.sort(STRING_COMPARATOR);

final int numberOfInitialCheckpoints = initialCheckpoints.size();

LOG.info(
"Found {} checkpoints in {}.",
numberOfInitialCheckpoints,
checkpointStateHandleStore);
if (haveAllDownloaded(initialCheckpoints)) {
LOG.info(
"All {} checkpoints found are already downloaded.", numberOfInitialCheckpoints);
return;
}

final List<CompletedCheckpoint> retrievedCheckpoints =
new ArrayList<>(numberOfInitialCheckpoints);
LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);

for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
initialCheckpoints) {
retrievedCheckpoints.add(
checkNotNull(retrieveCompletedCheckpoint(checkpointStateHandle)));
}

// Clear local handles in order to prevent duplicates on recovery. The local handles should
// reflect the state handle store contents.
completedCheckpoints.clear();
completedCheckpoints.addAll(retrievedCheckpoints);
}

/**
* Synchronously writes the new checkpoints to state handle store and asynchronously removes
* older ones.
*
* @param checkpoint Completed checkpoint to add.
* @throws PossibleInconsistentStateException if adding the checkpoint failed and leaving the
* system in an possibly inconsistent state, i.e. it's uncertain whether the checkpoint
* system in a possibly inconsistent state, i.e. it's uncertain whether the checkpoint
* metadata was fully written to the underlying systems or not.
*/
@Override
@@ -172,13 +122,13 @@ public void addCheckpoint(
CheckpointsCleaner checkpointsCleaner,
Runnable postCleanup)
throws Exception {

Preconditions.checkState(running.get(), "Checkpoint store has already been shutdown.");
checkNotNull(checkpoint, "Checkpoint");

final String path =
completedCheckpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID());

// Now add the new one. If it fails, we don't want to loose existing data.
// Now add the new one. If it fails, we don't want to lose existing data.
checkpointStateHandleStore.addAndLock(path, checkpoint);

completedCheckpoints.addLast(checkpoint);
@@ -214,30 +164,28 @@ public int getMaxNumberOfRetainedCheckpoints() {
@Override
public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner)
throws Exception {
if (jobStatus.isGloballyTerminalState()) {
LOG.info("Shutting down");

for (CompletedCheckpoint checkpoint : completedCheckpoints) {
try {
tryRemoveCompletedCheckpoint(
checkpoint,
checkpoint.shouldBeDiscardedOnShutdown(jobStatus),
checkpointsCleaner,
() -> {});
} catch (Exception e) {
LOG.warn("Fail to remove checkpoint during shutdown.", e);
if (running.compareAndSet(true, false)) {
if (jobStatus.isGloballyTerminalState()) {
LOG.info("Shutting down");
for (CompletedCheckpoint checkpoint : completedCheckpoints) {
try {
tryRemoveCompletedCheckpoint(
checkpoint,
checkpoint.shouldBeDiscardedOnShutdown(jobStatus),
checkpointsCleaner,
() -> {});
} catch (Exception e) {
LOG.warn("Fail to remove checkpoint during shutdown.", e);
}
}
completedCheckpoints.clear();
checkpointStateHandleStore.clearEntries();
} else {
LOG.info("Suspending");
// Clear the local handles, but don't remove any state
completedCheckpoints.clear();
checkpointStateHandleStore.releaseAll();
}

completedCheckpoints.clear();
checkpointStateHandleStore.clearEntries();
} else {
LOG.info("Suspending");

// Clear the local handles, but don't remove any state
completedCheckpoints.clear();

checkpointStateHandleStore.releaseAll();
}
}

@@ -257,25 +205,6 @@ private void tryRemoveCompletedCheckpoint(
}
}

private boolean haveAllDownloaded(
List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointPointers) {
if (completedCheckpoints.size() != checkpointPointers.size()) {
return false;
}
Set<Long> localIds =
completedCheckpoints.stream()
.map(CompletedCheckpoint::getCheckpointID)
.collect(Collectors.toSet());
for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> initialCheckpoint :
checkpointPointers) {
if (!localIds.contains(
completedCheckpointStoreUtil.nameToCheckpointID(initialCheckpoint.f1))) {
return false;
}
}
return true;
}

/**
* Tries to remove the checkpoint identified by the given checkpoint id.
*
@@ -286,34 +215,4 @@ private boolean tryRemove(long checkpointId) throws Exception {
return checkpointStateHandleStore.releaseAndTryRemove(
completedCheckpointStoreUtil.checkpointIDToName(checkpointId));
}

private CompletedCheckpoint retrieveCompletedCheckpoint(
Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandle)
throws FlinkException {
long checkpointId = completedCheckpointStoreUtil.nameToCheckpointID(stateHandle.f1);

LOG.info("Trying to retrieve checkpoint {}.", checkpointId);

try {
return stateHandle.f0.retrieveState();
} catch (ClassNotFoundException cnfe) {
throw new FlinkException(
"Could not retrieve checkpoint "
+ checkpointId
+ " from state handle under "
+ stateHandle.f1
+ ". This indicates that you are trying to recover from state written by an "
+ "older Flink version which is not compatible. Try cleaning the state handle store.",
cnfe);
} catch (IOException ioe) {
throw new FlinkException(
"Could not retrieve checkpoint "
+ checkpointId
+ " from state handle under "
+ stateHandle.f1
+ ". This indicates that the retrieved state handle is broken. Try cleaning the "
+ "state handle store.",
ioe);
}
}
}