From f9dd19b584fca5932594392b0afc9f1d0eec7f1a Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Wed, 31 Jan 2018 16:54:58 +0100 Subject: [PATCH] [FLINK-8539] [checkpointing] (part 1) Introduce CompletedCheckpointStorageLocation to properly handle disposal of checkpoints. That concept allows us to properly handle deletion of a checkpoint storage, for example deleting checkpoint directories, or the dropping of a checkpoint specific table. This replaces the current workaround for file systems, where every file disposal checks if the parent directory is now empty, and deletes it if that is the case. That is not only inefficient, but prohibitively expensive on some systems, like Amazon S3. --- .../streaming/state/RocksDBStateBackend.java | 4 +- .../org/apache/flink/util/ExceptionUtils.java | 19 ++++ .../checkpoint/CheckpointCoordinator.java | 6 +- .../flink/runtime/checkpoint/Checkpoints.java | 40 +++---- .../checkpoint/CompletedCheckpoint.java | 20 +++- .../runtime/checkpoint/PendingCheckpoint.java | 17 ++- .../state/CheckpointMetadataOutputStream.java | 42 +++++++ .../runtime/state/CheckpointStorage.java | 15 +-- .../state/CheckpointStorageLocation.java | 12 +- .../CompletedCheckpointStorageLocation.java | 48 ++++++++ .../state/IncrementalKeyedStateHandle.java | 4 + .../flink/runtime/state/StateBackend.java | 15 +-- .../filesystem/AbstractFileStateBackend.java | 4 +- .../AbstractFsCheckpointStorage.java | 20 +++- .../FixFileFsStateOutputStream.java | 40 ++++--- .../FsCheckpointStorageLocation.java | 10 +- .../FsCompletedCheckpointStorageLocation.java | 72 ++++++++++++ ...tentMetadataCheckpointStorageLocation.java | 106 ++++++++++++++++-- ...tentMetadataCheckpointStorageLocation.java | 10 +- 19 files changed, 396 insertions(+), 108 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/CompletedCheckpointStorageLocation.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCompletedCheckpointStorageLocation.java diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 5c389485119c5..b7e47948c7435 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -29,12 +29,12 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.util.AbstractID; @@ -374,7 +374,7 @@ private File getNextStoragePath() { // ------------------------------------------------------------------------ @Override - public StreamStateHandle resolveCheckpoint(String pointer) throws IOException { + public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException { return checkpointStreamBackend.resolveCheckpoint(pointer); } diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index ca55c5b7fc14c..42deb69cf3632 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -236,6 +236,25 @@ else if (t instanceof Exception) { } } + /** + * Throws the given {@code Throwable} in scenarios where the signatures do allow to + * throw a Exception. Errors and Exceptions are thrown directly, other "exotic" + * subclasses of Throwable are wrapped in an Exception. + * + * @param t The throwable to be thrown. + */ + public static void rethrowException(Throwable t) throws Exception { + if (t instanceof Error) { + throw (Error) t; + } + else if (t instanceof Exception) { + throw (Exception) t; + } + else { + throw new Exception(t.getMessage(), t); + } + } + /** * Tries to throw the given {@code Throwable} in scenarios where the signatures allows only IOExceptions * (and RuntimeException and Error). Throws this exception directly, if it is an IOException, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 18787f7cf7f8d..ed3570ac18776 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -36,10 +36,10 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryFactory; import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -1080,11 +1080,11 @@ public boolean restoreSavepoint( LOG.info("Starting job from savepoint {} ({})", savepointPointer, (allowNonRestored ? "allowing non restored state" : "")); - final StreamStateHandle metadataHandle = checkpointStorage.resolveCheckpoint(savepointPointer); + final CompletedCheckpointStorageLocation checkpointLocation = checkpointStorage.resolveCheckpoint(savepointPointer); // Load the savepoint as a checkpoint into the system CompletedCheckpoint savepoint = Checkpoints.loadAndValidateCheckpoint( - job, tasks, savepointPointer, metadataHandle, userClassLoader, allowNonRestored); + job, tasks, checkpointLocation, userClassLoader, allowNonRestored); completedCheckpointStore.addCheckpoint(savepoint); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java index bfa7d453d3079..47efa6facf65c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java @@ -20,8 +20,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer; import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers; @@ -29,23 +27,23 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackendLoader; import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; -import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.HashMap; import java.util.Map; @@ -125,20 +123,21 @@ public static Savepoint loadCheckpointMetadata(DataInputStream in, ClassLoader c public static CompletedCheckpoint loadAndValidateCheckpoint( JobID jobId, Map tasks, - String checkpointPointer, - StreamStateHandle metadataHandle, + CompletedCheckpointStorageLocation location, ClassLoader classLoader, boolean allowNonRestoredState) throws IOException { checkNotNull(jobId, "jobId"); checkNotNull(tasks, "tasks"); - checkNotNull(checkpointPointer, "checkpointPointer"); - checkNotNull(metadataHandle, "metadataHandle"); + checkNotNull(location, "location"); checkNotNull(classLoader, "classLoader"); + final StreamStateHandle metadataHandle = location.getMetadataHandle(); + final String checkpointPointer = location.getExternalPointer(); + // (1) load the savepoint final Savepoint rawCheckpointMetadata; - try (FSDataInputStream in = metadataHandle.openInputStream()) { + try (InputStream in = metadataHandle.openInputStream()) { DataInputStream dis = new DataInputStream(in); rawCheckpointMetadata = loadCheckpointMetadata(dis, classLoader); } @@ -221,8 +220,7 @@ public static CompletedCheckpoint loadAndValidateCheckpoint( operatorStates, checkpointMetadata.getMasterStates(), props, - metadataHandle, - checkpointPointer); + location); } // ------------------------------------------------------------------------ @@ -238,12 +236,14 @@ public static void disposeSavepoint( checkNotNull(stateBackend, "stateBackend"); checkNotNull(classLoader, "classLoader"); - final StreamStateHandle metadataHandle = stateBackend.resolveCheckpoint(pointer); + final CompletedCheckpointStorageLocation checkpointLocation = stateBackend.resolveCheckpoint(pointer); + + final StreamStateHandle metadataHandle = checkpointLocation.getMetadataHandle(); // load the savepoint object (the metadata) to have all the state handles that we need // to dispose of all state final Savepoint savepoint; - try (FSDataInputStream in = metadataHandle.openInputStream(); + try (InputStream in = metadataHandle.openInputStream(); DataInputStream dis = new DataInputStream(in)) { savepoint = loadCheckpointMetadata(dis, classLoader); @@ -268,13 +268,15 @@ public static void disposeSavepoint( exception = ExceptionUtils.firstOrSuppressed(e, exception); } - // until we have the proper hooks to delete full directories via the checkpoint storage, - // we need to have a special case here to remove the empty directory - if (stateBackend instanceof AbstractFileStateBackend && metadataHandle instanceof FileStateHandle) { - Path dir = ((FileStateHandle) metadataHandle).getFilePath().getParent(); - FileUtils.deletePathIfEmpty(dir.getFileSystem(), dir); + // now dispose the location (directory, table, whatever) + try { + checkpointLocation.disposeStorageLocation(); + } + catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); } + // forward exceptions caught in the process if (exception != null) { ExceptionUtils.rethrowIOException(exception); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index 207c9179ca77d..df8c233ada527 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; @@ -92,6 +93,9 @@ public class CompletedCheckpoint implements Serializable { /** States that were created by a hook on the master (in the checkpoint coordinator). */ private final Collection masterHookStates; + /** The location where the checkpoint is stored. */ + private final CompletedCheckpointStorageLocation storageLocation; + /** The state handle to the externalized meta data. */ private final StreamStateHandle metadataHandle; @@ -112,8 +116,7 @@ public CompletedCheckpoint( Map operatorStates, @Nullable Collection masterHookStates, CheckpointProperties props, - StreamStateHandle metadataHandle, - String externalPointer) { + CompletedCheckpointStorageLocation storageLocation) { checkArgument(checkpointID >= 0); checkArgument(timestamp >= 0); @@ -132,8 +135,9 @@ public CompletedCheckpoint( new ArrayList<>(masterHookStates); this.props = checkNotNull(props); - this.metadataHandle = checkNotNull(metadataHandle); - this.externalPointer = checkNotNull(externalPointer); + this.storageLocation = checkNotNull(storageLocation); + this.metadataHandle = storageLocation.getMetadataHandle(); + this.externalPointer = storageLocation.getExternalPointer(); } // ------------------------------------------------------------------------ @@ -253,6 +257,14 @@ private void doDiscard() throws Exception { exception = ExceptionUtils.firstOrSuppressed(e, exception); } + // discard location as a whole + try { + storageLocation.disposeStorageLocation(); + } + catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + if (exception != null) { throw exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 5e8559d225d1e..1b51ac4bf8d3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -24,10 +24,10 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; import org.apache.flink.runtime.state.CheckpointStorageLocation; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.StateUtil; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; @@ -253,15 +253,13 @@ public CompletedCheckpoint finalizeCheckpoint() throws IOException { try { // write out the metadata final Savepoint savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterState); - final StreamStateHandle metadataHandle; + final CompletedCheckpointStorageLocation finalizedLocation; - try (CheckpointStateOutputStream out = targetLocation.createMetadataOutputStream()) { + try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) { Checkpoints.storeCheckpointMetadata(savepoint, out); - metadataHandle = out.closeAndGetHandle(); + finalizedLocation = out.closeAndFinalizeCheckpoint(); } - final String externalPointer = targetLocation.markCheckpointAsFinished(); - CompletedCheckpoint completed = new CompletedCheckpoint( jobId, checkpointId, @@ -270,8 +268,7 @@ public CompletedCheckpoint finalizeCheckpoint() throws IOException { operatorStates, masterState, props, - metadataHandle, - externalPointer); + finalizedLocation); onCompletionPromise.complete(completed); @@ -281,7 +278,7 @@ public CompletedCheckpoint finalizeCheckpoint() throws IOException { // Finalize the statsCallback and give the completed checkpoint a // callback for discards. CompletedCheckpointStats.DiscardCallback discardCallback = - statsCallback.reportCompletedCheckpoint(externalPointer); + statsCallback.reportCompletedCheckpoint(finalizedLocation.getExternalPointer()); completed.setDiscardCallback(discardCallback); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java new file mode 100644 index 0000000000000..4180e884ece33 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.core.fs.FSDataOutputStream; + +import java.io.IOException; + +public abstract class CheckpointMetadataOutputStream extends FSDataOutputStream { + + public abstract CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException; + + /** + * This method should close the stream, if has not been closed before. + * If this method actually closes the stream, it should delete/release the + * resource behind the stream, such as the file that the stream writes to. + * + *

The above implies that this method is intended to be the "unsuccessful close", + * such as when cancelling the stream writing, or when an exception occurs. + * Closing the stream for the successful case must go through {@link #closeAndFinalizeCheckpoint()}. + * + * @throws IOException Thrown, if the stream cannot be closed. + */ + @Override + public abstract void close() throws IOException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java index 4a1e906d9512e..0f8aa692340ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java @@ -45,18 +45,19 @@ public interface CheckpointStorage { boolean hasDefaultSavepointLocation(); /** - * Resolves the given pointer to a checkpoint/savepoint into a state handle from which the - * checkpoint metadata can be read. If the state backend cannot understand the format of - * the pointer (for example because it was created by a different state backend) this method - * should throw an {@code IOException}. + * Resolves the given pointer to a checkpoint/savepoint into a checkpoint location. The location + * supports reading the checkpoint metadata, or disposing the checkpoint storage location. * - * @param pointer The pointer to resolve. - * @return The state handler from which one can read the checkpoint metadata. + *

If the state backend cannot understand the format of the pointer (for example because it + * was created by a different state backend) this method should throw an {@code IOException}. + * + * @param externalPointer The external checkpoint pointer to resolve. + * @return The checkpoint location handle. * * @throws IOException Thrown, if the state backend does not understand the pointer, or if * the pointer could not be resolved due to an I/O error. */ - StreamStateHandle resolveCheckpoint(String pointer) throws IOException; + CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException; /** * Initializes a storage location for new checkpoint with the given ID. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java index 246717a2937e1..74e7b08186f17 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java @@ -36,17 +36,7 @@ public interface CheckpointStorageLocation extends CheckpointStreamFactory { * @return The output stream to persist the checkpoint metadata to. * @throws IOException Thrown, if the stream cannot be opened due to an I/O error. */ - CheckpointStateOutputStream createMetadataOutputStream() throws IOException; - - /** - * Finalizes the checkpoint, marking the location as a finished checkpoint. - * This method returns the external checkpoint pointer that can be used to resolve - * the checkpoint upon recovery. - * - * @return The external pointer to the checkpoint at this location. - * @throws IOException Thrown, if finalizing / marking as finished fails due to an I/O error. - */ - String markCheckpointAsFinished() throws IOException; + CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException; /** * Disposes the checkpoint location in case the checkpoint has failed. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompletedCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompletedCheckpointStorageLocation.java new file mode 100644 index 0000000000000..6fa303db6a1c2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompletedCheckpointStorageLocation.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import java.io.IOException; + +/** + * The CompletedCheckpointStorageLocation describes the storage aspect of a completed checkpoint. + * It can be used to obtain access to the metadata, get a reference pointer to the checkpoint, + * or to dispose the storage location. + */ +public interface CompletedCheckpointStorageLocation extends java.io.Serializable { + + /** + * Gets the external pointer to the checkpoint. The pointer can be used to resume + * a program from the savepoint or checkpoint, and is typically passed as a command + * line argument, an HTTP request parameter, or stored in a system like ZooKeeper. + */ + String getExternalPointer(); + + /** + * Gets the state handle to the checkpoint's metadata. + */ + StreamStateHandle getMetadataHandle(); + + /** + * Disposes the storage location. This method should be called after all state objects have + * been released. It typically disposes the base structure of the checkpoint storage, + * like the checkpoint directory. + */ + void disposeStorageLocation() throws IOException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java index 0268b102be885..01d4ac0132ed7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java @@ -141,6 +141,10 @@ public UUID getBackendIdentifier() { return backendIdentifier; } + public SharedStateRegistry getSharedStateRegistry() { + return sharedStateRegistry; + } + @Override public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange)) ? diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java index 8d17b6420f35f..f34cd9b63a58f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java @@ -86,18 +86,19 @@ public interface StateBackend extends java.io.Serializable { // ------------------------------------------------------------------------ /** - * Resolves the given pointer to a checkpoint/savepoint into a state handle from which the - * checkpoint metadata can be read. If the state backend cannot understand the format of - * the pointer (for example because it was created by a different state backend) this method - * should throw an {@code IOException}. + * Resolves the given pointer to a checkpoint/savepoint into a checkpoint location. The location + * supports reading the checkpoint metadata, or disposing the checkpoint storage location. * - * @param pointer The pointer to resolve. - * @return The state handler from which one can read the checkpoint metadata. + *

If the state backend cannot understand the format of the pointer (for example because it + * was created by a different state backend) this method should throw an {@code IOException}. + * + * @param externalPointer The external checkpoint pointer to resolve. + * @return The checkpoint location handle. * * @throws IOException Thrown, if the state backend does not understand the pointer, or if * the pointer could not be resolved due to an I/O error. */ - StreamStateHandle resolveCheckpoint(String pointer) throws IOException; + CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException; /** * Creates a storage for checkpoints for the given job. The checkpoint storage is diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java index 0b622c9dc8b60..0929504b2b0e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java @@ -25,7 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import javax.annotation.Nullable; @@ -161,7 +161,7 @@ public Path getSavepointPath() { // ------------------------------------------------------------------------ @Override - public StreamStateHandle resolveCheckpoint(String pointer) throws IOException { + public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException { return AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java index 054e636250472..3e57de6a52f1e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java @@ -25,7 +25,7 @@ import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; -import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.util.FileUtils; import javax.annotation.Nullable; @@ -105,7 +105,7 @@ public boolean hasDefaultSavepointLocation() { } @Override - public StreamStateHandle resolveCheckpoint(String checkpointPointer) throws IOException { + public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException { return resolveCheckpointPointer(checkpointPointer); } @@ -208,7 +208,7 @@ protected static Path createCheckpointDirectory(Path baseDirectory, long checkpo * @throws IOException Thrown, if the pointer cannot be resolved, the file system not accessed, or * the pointer points to a location that does not seem to be a checkpoint/savepoint. */ - protected static StreamStateHandle resolveCheckpointPointer(String checkpointPointer) throws IOException { + protected static CompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException { checkNotNull(checkpointPointer, "checkpointPointer"); checkArgument(!checkpointPointer.isEmpty(), "empty checkpoint pointer"); @@ -242,10 +242,12 @@ protected static StreamStateHandle resolveCheckpointPointer(String checkpointPoi } // if we are here, the file / directory exists + final Path checkpointDir; final FileStatus metadataFileStatus; // If this is a directory, we need to find the meta data file if (status.isDir()) { + checkpointDir = status.getPath(); final Path metadataFilePath = new Path(path, METADATA_FILE_NAME); try { metadataFileStatus = fs.getFileStatus(metadataFilePath); @@ -260,9 +262,19 @@ protected static StreamStateHandle resolveCheckpointPointer(String checkpointPoi // this points to a file and we either do no name validation, or // the name is actually correct, so we can return the path metadataFileStatus = status; + checkpointDir = status.getPath().getParent(); } - return new FileStateHandle(metadataFileStatus.getPath(), metadataFileStatus.getLen()); + final FileStateHandle metaDataFileHandle = new FileStateHandle( + metadataFileStatus.getPath(), metadataFileStatus.getLen()); + + final String pointer = checkpointDir.makeQualified(fs).toString(); + + return new FsCompletedCheckpointStorageLocation( + fs, + checkpointDir, + metaDataFileHandle, + pointer); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FixFileFsStateOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FixFileFsStateOutputStream.java index bc1ca014067bc..046863bb3179f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FixFileFsStateOutputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FixFileFsStateOutputStream.java @@ -22,12 +22,14 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; + import java.io.IOException; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -38,9 +40,9 @@ * *

Unlike the {@link org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream}, * this stream does not have a threshold below which it returns a memory byte stream handle, - * and does not create random files, but writes to a specified file. + * and does not create random files, but writes to a specified file. */ -public final class FixFileFsStateOutputStream extends CheckpointStateOutputStream { +public final class FixFileFsStateOutputStream extends CheckpointMetadataOutputStream { private static final Logger LOG = LoggerFactory.getLogger(FixFileFsStateOutputStream.class); @@ -48,18 +50,24 @@ public final class FixFileFsStateOutputStream extends CheckpointStateOutputStrea private final FSDataOutputStream out; - private final Path path; + private final Path metadataFilePath; + + private final Path exclusiveCheckpointDir; private final FileSystem fileSystem; private volatile boolean closed; + public FixFileFsStateOutputStream( + FileSystem fileSystem, + Path metadataFilePath, + Path exclusiveCheckpointDir) throws IOException { - public FixFileFsStateOutputStream(FileSystem fileSystem, Path path) throws IOException { this.fileSystem = checkNotNull(fileSystem); - this.path = checkNotNull(path); + this.metadataFilePath = checkNotNull(metadataFilePath); + this.exclusiveCheckpointDir = checkNotNull(exclusiveCheckpointDir); - this.out = fileSystem.create(path, WriteMode.NO_OVERWRITE); + this.out = fileSystem.create(metadataFilePath, WriteMode.NO_OVERWRITE); } // ------------------------------------------------------------------------ @@ -106,16 +114,16 @@ public void close() { try { out.close(); - fileSystem.delete(path, false); + fileSystem.delete(metadataFilePath, false); } catch (Throwable t) { - LOG.warn("Could not close the state stream for {}.", path, t); + LOG.warn("Could not close the state stream for {}.", metadataFilePath, t); } } } @Override - public FileStateHandle closeAndGetHandle() throws IOException { + public FsCompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException { synchronized (this) { if (!closed) { try { @@ -125,21 +133,23 @@ public FileStateHandle closeAndGetHandle() throws IOException { size = out.getPos(); } catch (Exception ignored) {} - // close and return out.close(); - return new FileStateHandle(path, size); + FileStateHandle metaDataHandle = new FileStateHandle(metadataFilePath, size); + + return new FsCompletedCheckpointStorageLocation( + fileSystem, exclusiveCheckpointDir, metaDataHandle, exclusiveCheckpointDir.toString()); } catch (Exception e) { try { - fileSystem.delete(path, false); + fileSystem.delete(metadataFilePath, false); } catch (Exception deleteException) { - LOG.warn("Could not delete the checkpoint stream file {}.", path, deleteException); + LOG.warn("Could not delete the checkpoint stream file {}.", metadataFilePath, deleteException); } throw new IOException("Could not flush and close the file system " + - "output stream to " + path + " in order to obtain the " + + "output stream to " + metadataFilePath + " in order to obtain the " + "stream state handle", e); } finally { @@ -151,4 +161,4 @@ public FileStateHandle closeAndGetHandle() throws IOException { } } } -} \ No newline at end of file +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java index c05ae5ee032d2..48d0e8b136b92 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java @@ -20,6 +20,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; @@ -94,13 +95,8 @@ public Path getMetadataFilePath() { // ------------------------------------------------------------------------ @Override - public CheckpointStateOutputStream createMetadataOutputStream() throws IOException { - return new FixFileFsStateOutputStream(fileSystem, metadataFilePath); - } - - @Override - public String markCheckpointAsFinished() throws IOException { - return checkpointDirectory.toString(); + public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException { + return new FixFileFsStateOutputStream(fileSystem, metadataFilePath, checkpointDirectory); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCompletedCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCompletedCheckpointStorageLocation.java new file mode 100644 index 0000000000000..8682860c6a2dd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCompletedCheckpointStorageLocation.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; + +import java.io.IOException; + +/** + * A {@link CompletedCheckpointStorageLocation} that resides on a file system. + * This locattion is internally represented through the checkpoint directory plus the metadata file. + */ +public class FsCompletedCheckpointStorageLocation implements CompletedCheckpointStorageLocation { + + private static final long serialVersionUID = 1L; + + private final Path exclusiveCheckpointDir; + + private final FileStateHandle metadataFileHandle; + + private final String externalPointer; + + private transient FileSystem fs; + + public FsCompletedCheckpointStorageLocation( + FileSystem fs, + Path exclusiveCheckpointDir, + FileStateHandle metadataFileHandle, + String externalPointer) { + + this.fs = fs; + this.exclusiveCheckpointDir = exclusiveCheckpointDir; + this.metadataFileHandle = metadataFileHandle; + this.externalPointer = externalPointer; + } + + @Override + public String getExternalPointer() { + return externalPointer; + } + + @Override + public FileStateHandle getMetadataHandle() { + return metadataFileHandle; + } + + @Override + public void disposeStorageLocation() throws IOException { + if (fs == null) { + fs = exclusiveCheckpointDir.getFileSystem(); + } + fs.delete(exclusiveCheckpointDir, false); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java index 0582aefa26138..3112b5b9fd6ab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java @@ -18,10 +18,15 @@ package org.apache.flink.runtime.state.memory; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; +import org.apache.flink.runtime.state.StreamStateHandle; import java.io.IOException; +import java.util.UUID; /** * A checkpoint storage location for the {@link MemoryStateBackend} in case no durable persistence @@ -34,21 +39,13 @@ public class NonPersistentMetadataCheckpointStorageLocation /** The external pointer returned for checkpoints that are not externally addressable. */ public static final String EXTERNAL_POINTER = ""; - /** The maximum serialized state size for the checkpoint metadata. */ - private static final int MAX_METADATA_STATE_SIZE = Integer.MAX_VALUE; - public NonPersistentMetadataCheckpointStorageLocation(int maxStateSize) { super(maxStateSize); } @Override - public CheckpointStateOutputStream createMetadataOutputStream() throws IOException { - return new MemoryCheckpointOutputStream(MAX_METADATA_STATE_SIZE); - } - - @Override - public String markCheckpointAsFinished() { - return EXTERNAL_POINTER; + public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException { + return new MetadataOutputStream(); } @Override @@ -58,4 +55,93 @@ public void disposeOnFailure() {} public CheckpointStorageLocationReference getLocationReference() { return CheckpointStorageLocationReference.getDefault(); } + + // ------------------------------------------------------------------------ + // CompletedCheckpointStorageLocation + // ------------------------------------------------------------------------ + + /** + * A {@link CompletedCheckpointStorageLocation} that is not persistent and only holds the + * metadata in an internal byte array. + */ + private static class NonPersistentCompletedCheckpointStorageLocation implements CompletedCheckpointStorageLocation { + + private static final long serialVersionUID = 1L; + + private final ByteStreamStateHandle metaDataHandle; + + NonPersistentCompletedCheckpointStorageLocation(ByteStreamStateHandle metaDataHandle) { + this.metaDataHandle = metaDataHandle; + } + + @Override + public String getExternalPointer() { + return EXTERNAL_POINTER; + } + + @Override + public StreamStateHandle getMetadataHandle() { + return metaDataHandle; + } + + @Override + public void disposeStorageLocation() {} + } + + // ------------------------------------------------------------------------ + // CheckpointMetadataOutputStream + // ------------------------------------------------------------------------ + + private static class MetadataOutputStream extends CheckpointMetadataOutputStream { + + private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos(); + + private boolean closed; + + @Override + public void write(int b) throws IOException { + os.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + os.write(b, off, len); + } + + @Override + public void flush() throws IOException { + os.flush(); + } + + @Override + public long getPos() throws IOException { + return os.getPosition(); + } + + @Override + public void sync() throws IOException { } + + @Override + public CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException { + synchronized (this) { + if (!closed) { + closed = true; + + byte[] bytes = os.toByteArray(); + ByteStreamStateHandle handle = new ByteStreamStateHandle(UUID.randomUUID().toString(), bytes); + return new NonPersistentCompletedCheckpointStorageLocation(handle); + } else { + throw new IOException("Already closed"); + } + } + } + + @Override + public void close() { + if (!closed) { + closed = true; + os.reset(); + } + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java index 6da8191aa9878..c9b5068479b3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java @@ -20,6 +20,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; import org.apache.flink.runtime.state.CheckpointStorageLocation; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage; @@ -65,13 +66,8 @@ public PersistentMetadataCheckpointStorageLocation( // ------------------------------------------------------------------------ @Override - public CheckpointStateOutputStream createMetadataOutputStream() throws IOException { - return new FixFileFsStateOutputStream(fileSystem, metadataFilePath); - } - - @Override - public String markCheckpointAsFinished() throws IOException { - return checkpointDirectory.toString(); + public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException { + return new FixFileFsStateOutputStream(fileSystem, metadataFilePath, checkpointDirectory); } @Override