Skip to content

Commit

Permalink
[FLINK-8539] [checkpointing] (part 1) Introduce CompletedCheckpointSt…
Browse files Browse the repository at this point in the history
…orageLocation to properly handle disposal of checkpoints.

That concept allows us to properly handle the deletion of a checkpoint's storage, for example deleting
checkpoint directories or dropping a checkpoint-specific table.

This replaces the current workaround for file systems, where every file disposal checks if the parent
directory is now empty, and deletes it if that is the case. That is not only inefficient, but
prohibitively expensive on some systems, like Amazon S3.
  • Loading branch information
StephanEwen committed Feb 1, 2018
1 parent b4675f2 commit f9dd19b
Show file tree
Hide file tree
Showing 19 changed files with 396 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.util.AbstractID;

Expand Down Expand Up @@ -374,7 +374,7 @@ private File getNextStoragePath() {
// ------------------------------------------------------------------------

@Override
public StreamStateHandle resolveCheckpoint(String pointer) throws IOException {
public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
return checkpointStreamBackend.resolveCheckpoint(pointer);
}

Expand Down
19 changes: 19 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,25 @@ else if (t instanceof Exception) {
}
}

/**
 * Rethrows the given {@code Throwable} from a context whose signature permits
 * throwing an {@code Exception}. An {@code Exception} or an {@code Error} is
 * thrown unchanged; any other ("exotic") subclass of {@code Throwable} is wrapped
 * in a new {@code Exception} that carries the original message and has the
 * original throwable as its cause.
 *
 * @param t The throwable to be thrown.
 * @throws Exception The given throwable itself if it is an {@code Exception},
 *                   or a wrapping {@code Exception} if it is neither an
 *                   {@code Exception} nor an {@code Error}.
 */
public static void rethrowException(Throwable t) throws Exception {
	// Exception and Error are disjoint branches of Throwable, so the order of
	// these two guards does not matter.
	if (t instanceof Exception) {
		throw (Exception) t;
	}
	if (t instanceof Error) {
		throw (Error) t;
	}

	// neither Exception nor Error: a direct/custom Throwable subclass
	throw new Exception(t.getMessage(), t);
}

/**
* Tries to throw the given {@code Throwable} in scenarios where the signatures allow only IOExceptions
* (and RuntimeException and Error). Throws this exception directly, if it is an IOException,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
Expand Down Expand Up @@ -1080,11 +1080,11 @@ public boolean restoreSavepoint(
LOG.info("Starting job from savepoint {} ({})",
savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));

final StreamStateHandle metadataHandle = checkpointStorage.resolveCheckpoint(savepointPointer);
final CompletedCheckpointStorageLocation checkpointLocation = checkpointStorage.resolveCheckpoint(savepointPointer);

// Load the savepoint as a checkpoint into the system
CompletedCheckpoint savepoint = Checkpoints.loadAndValidateCheckpoint(
job, tasks, savepointPointer, metadataHandle, userClassLoader, allowNonRestored);
job, tasks, checkpointLocation, userClassLoader, allowNonRestored);

completedCheckpointStore.addCheckpoint(savepoint);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,30 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -125,20 +123,21 @@ public static Savepoint loadCheckpointMetadata(DataInputStream in, ClassLoader c
public static CompletedCheckpoint loadAndValidateCheckpoint(
JobID jobId,
Map<JobVertexID, ExecutionJobVertex> tasks,
String checkpointPointer,
StreamStateHandle metadataHandle,
CompletedCheckpointStorageLocation location,
ClassLoader classLoader,
boolean allowNonRestoredState) throws IOException {

checkNotNull(jobId, "jobId");
checkNotNull(tasks, "tasks");
checkNotNull(checkpointPointer, "checkpointPointer");
checkNotNull(metadataHandle, "metadataHandle");
checkNotNull(location, "location");
checkNotNull(classLoader, "classLoader");

final StreamStateHandle metadataHandle = location.getMetadataHandle();
final String checkpointPointer = location.getExternalPointer();

// (1) load the savepoint
final Savepoint rawCheckpointMetadata;
try (FSDataInputStream in = metadataHandle.openInputStream()) {
try (InputStream in = metadataHandle.openInputStream()) {
DataInputStream dis = new DataInputStream(in);
rawCheckpointMetadata = loadCheckpointMetadata(dis, classLoader);
}
Expand Down Expand Up @@ -221,8 +220,7 @@ public static CompletedCheckpoint loadAndValidateCheckpoint(
operatorStates,
checkpointMetadata.getMasterStates(),
props,
metadataHandle,
checkpointPointer);
location);
}

// ------------------------------------------------------------------------
Expand All @@ -238,12 +236,14 @@ public static void disposeSavepoint(
checkNotNull(stateBackend, "stateBackend");
checkNotNull(classLoader, "classLoader");

final StreamStateHandle metadataHandle = stateBackend.resolveCheckpoint(pointer);
final CompletedCheckpointStorageLocation checkpointLocation = stateBackend.resolveCheckpoint(pointer);

final StreamStateHandle metadataHandle = checkpointLocation.getMetadataHandle();

// load the savepoint object (the metadata) to have all the state handles that we need
// to dispose of all state
final Savepoint savepoint;
try (FSDataInputStream in = metadataHandle.openInputStream();
try (InputStream in = metadataHandle.openInputStream();
DataInputStream dis = new DataInputStream(in)) {

savepoint = loadCheckpointMetadata(dis, classLoader);
Expand All @@ -268,13 +268,15 @@ public static void disposeSavepoint(
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

// until we have the proper hooks to delete full directories via the checkpoint storage,
// we need to have a special case here to remove the empty directory
if (stateBackend instanceof AbstractFileStateBackend && metadataHandle instanceof FileStateHandle) {
Path dir = ((FileStateHandle) metadataHandle).getFilePath().getParent();
FileUtils.deletePathIfEmpty(dir.getFileSystem(), dir);
// now dispose the location (directory, table, whatever)
try {
checkpointLocation.disposeStorageLocation();
}
catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

// forward exceptions caught in the process
if (exception != null) {
ExceptionUtils.rethrowIOException(exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
Expand Down Expand Up @@ -92,6 +93,9 @@ public class CompletedCheckpoint implements Serializable {
/** States that were created by a hook on the master (in the checkpoint coordinator). */
private final Collection<MasterState> masterHookStates;

/** The location where the checkpoint is stored. */
private final CompletedCheckpointStorageLocation storageLocation;

/** The state handle to the externalized meta data. */
private final StreamStateHandle metadataHandle;

Expand All @@ -112,8 +116,7 @@ public CompletedCheckpoint(
Map<OperatorID, OperatorState> operatorStates,
@Nullable Collection<MasterState> masterHookStates,
CheckpointProperties props,
StreamStateHandle metadataHandle,
String externalPointer) {
CompletedCheckpointStorageLocation storageLocation) {

checkArgument(checkpointID >= 0);
checkArgument(timestamp >= 0);
Expand All @@ -132,8 +135,9 @@ public CompletedCheckpoint(
new ArrayList<>(masterHookStates);

this.props = checkNotNull(props);
this.metadataHandle = checkNotNull(metadataHandle);
this.externalPointer = checkNotNull(externalPointer);
this.storageLocation = checkNotNull(storageLocation);
this.metadataHandle = storageLocation.getMetadataHandle();
this.externalPointer = storageLocation.getExternalPointer();
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -253,6 +257,14 @@ private void doDiscard() throws Exception {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

// discard location as a whole
try {
storageLocation.disposeStorageLocation();
}
catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

if (exception != null) {
throw exception;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -253,15 +253,13 @@ public CompletedCheckpoint finalizeCheckpoint() throws IOException {
try {
// write out the metadata
final Savepoint savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterState);
final StreamStateHandle metadataHandle;
final CompletedCheckpointStorageLocation finalizedLocation;

try (CheckpointStateOutputStream out = targetLocation.createMetadataOutputStream()) {
try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
Checkpoints.storeCheckpointMetadata(savepoint, out);
metadataHandle = out.closeAndGetHandle();
finalizedLocation = out.closeAndFinalizeCheckpoint();
}

final String externalPointer = targetLocation.markCheckpointAsFinished();

CompletedCheckpoint completed = new CompletedCheckpoint(
jobId,
checkpointId,
Expand All @@ -270,8 +268,7 @@ public CompletedCheckpoint finalizeCheckpoint() throws IOException {
operatorStates,
masterState,
props,
metadataHandle,
externalPointer);
finalizedLocation);

onCompletionPromise.complete(completed);

Expand All @@ -281,7 +278,7 @@ public CompletedCheckpoint finalizeCheckpoint() throws IOException {
// Finalize the statsCallback and give the completed checkpoint a
// callback for discards.
CompletedCheckpointStats.DiscardCallback discardCallback =
statsCallback.reportCompletedCheckpoint(externalPointer);
statsCallback.reportCompletedCheckpoint(finalizedLocation.getExternalPointer());
completed.setDiscardCallback(discardCallback);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

import org.apache.flink.core.fs.FSDataOutputStream;

import java.io.IOException;

/**
* An output stream for checkpoint metadata.
*
* <p>The stream distinguishes two ways of being closed: {@link #closeAndFinalizeCheckpoint()}
* for the successful case, and {@link #close()} for the unsuccessful case (cancellation
* or failure).
*/
public abstract class CheckpointMetadataOutputStream extends FSDataOutputStream {

/**
* Closes the stream and finalizes the checkpoint. This is the "successful close":
* it is meant to be called once the checkpoint metadata has been written completely.
*
* @return The storage location of the completed checkpoint.
* @throws IOException Thrown, if closing the stream or finalizing the checkpoint fails.
*/
public abstract CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException;

/**
* This method should close the stream, if it has not been closed before.
* If this method actually closes the stream, it should delete/release the
* resource behind the stream, such as the file that the stream writes to.
*
* <p>The above implies that this method is intended to be the "unsuccessful close",
* such as when cancelling the stream writing, or when an exception occurs.
* Closing the stream for the successful case must go through {@link #closeAndFinalizeCheckpoint()}.
*
* @throws IOException Thrown, if the stream cannot be closed.
*/
@Override
public abstract void close() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,19 @@ public interface CheckpointStorage {
boolean hasDefaultSavepointLocation();

/**
* Resolves the given pointer to a checkpoint/savepoint into a state handle from which the
* checkpoint metadata can be read. If the state backend cannot understand the format of
* the pointer (for example because it was created by a different state backend) this method
* should throw an {@code IOException}.
* Resolves the given pointer to a checkpoint/savepoint into a checkpoint location. The location
* supports reading the checkpoint metadata, or disposing the checkpoint storage location.
*
* @param pointer The pointer to resolve.
* @return The state handler from which one can read the checkpoint metadata.
* <p>If the state backend cannot understand the format of the pointer (for example because it
* was created by a different state backend) this method should throw an {@code IOException}.
*
* @param externalPointer The external checkpoint pointer to resolve.
* @return The checkpoint location handle.
*
* @throws IOException Thrown, if the state backend does not understand the pointer, or if
* the pointer could not be resolved due to an I/O error.
*/
StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

/**
* Initializes a storage location for new checkpoint with the given ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,7 @@ public interface CheckpointStorageLocation extends CheckpointStreamFactory {
* @return The output stream to persist the checkpoint metadata to.
* @throws IOException Thrown, if the stream cannot be opened due to an I/O error.
*/
CheckpointStateOutputStream createMetadataOutputStream() throws IOException;

/**
* Finalizes the checkpoint, marking the location as a finished checkpoint.
* This method returns the external checkpoint pointer that can be used to resolve
* the checkpoint upon recovery.
*
* @return The external pointer to the checkpoint at this location.
* @throws IOException Thrown, if finalizing / marking as finished fails due to an I/O error.
*/
String markCheckpointAsFinished() throws IOException;
CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException;

/**
* Disposes the checkpoint location in case the checkpoint has failed.
Expand Down
Loading

0 comments on commit f9dd19b

Please sign in to comment.