diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java index aeab132053b4d..7669bb5103557 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java @@ -24,9 +24,10 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction; import org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator; import org.apache.flink.streaming.api.TimeDomain; @@ -121,7 +122,8 @@ private KeyedOneInputStreamOperatorTestHarness getHarness( new KeyedOneInputStreamOperatorTestHarness<>( bootstrapOperator, id -> id, Types.LONG, 128, 1, 0); - harness.setStateBackend(new RocksDBStateBackend(folder.newFolder().toURI())); + harness.setStateBackend(new EmbeddedRocksDBStateBackend()); + harness.setCheckpointStorage(new FileSystemCheckpointStorage(folder.newFolder().toURI())); if (state != null) { harness.initializeState(state); } diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java index 1131b9b98facc..ef89c5f8d0209 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java @@ -27,7 +27,6 @@ import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.testutils.AllCallbackWrapper; import org.apache.flink.queryablestate.client.QueryableStateClient; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -44,7 +43,7 @@ import java.nio.file.Path; -/** Several integration tests for queryable state using the {@link FsStateBackend}. */ +/** Several integration tests for queryable state. */ class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestBase { // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines so that diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java index 0c4d7362c71f8..94fa34e3f18a1 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.QueryableStateOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.core.testutils.AllCallbackWrapper; import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; @@ -44,7 +43,10 @@ import java.nio.file.Path; -/** Several integration tests for queryable state using the {@link RocksDBStateBackend}. */ +/** + * Several integration tests for queryable state using the {@link + * org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend}. + */ class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableStateTestBase { // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines so that diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java index 31061e667c37f..3fcd20d22733d 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.queryablestate.client.QueryableStateClient; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.CheckpointStorageUtils; @@ -40,7 +39,7 @@ import java.nio.file.Path; -/** Several integration tests for queryable state using the {@link FsStateBackend}. */ +/** Several integration tests for queryable state. */ public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTestBase { // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines so that diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java index f27d085ea3002..5c15392a67046 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java @@ -24,7 +24,6 @@ import org.apache.flink.configuration.QueryableStateOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -40,7 +39,10 @@ import java.nio.file.Path; -/** Several integration tests for queryable state using the {@link RocksDBStateBackend}. */ +/** + * Several integration tests for queryable state using the {@link + * org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend}. + */ public class NonHAQueryableStateRocksDBBackendITCase extends AbstractQueryableStateTestBase { // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines so that diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java index 8967a91737117..659e6d044566c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java @@ -34,8 +34,7 @@ import java.net.URI; /** - * A base class for all state backends that store their metadata (and data) in files. Examples that - * inherit from this are the {@link FsStateBackend} or the {@code RocksDBStateBackend}. + * A base class for all state backends that store their metadata (and data) in files. * *

This class takes the base checkpoint- and savepoint directory paths, but also accepts null for * both of then, in which case creating externalized checkpoint is not possible, and it is not diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java deleted file mode 100644 index ec219432faae4..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ /dev/null @@ -1,608 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.filesystem; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.core.execution.SavepointFormatType; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.BackendBuildingException; -import org.apache.flink.runtime.state.CheckpointStorageAccess; -import org.apache.flink.runtime.state.ConfigurableStateBackend; -import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder; -import org.apache.flink.runtime.state.LocalRecoveryConfig; -import org.apache.flink.runtime.state.OperatorStateBackend; -import org.apache.flink.runtime.state.TaskStateManager; -import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder; -import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; -import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig; -import org.apache.flink.util.MathUtils; -import org.apache.flink.util.TernaryBoolean; - -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import java.io.IOException; -import java.net.URI; - -import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD; -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * IMPORTANT {@link FsStateBackend} is deprecated in favor of {@link - * org.apache.flink.runtime.state.hashmap.HashMapStateBackend} and {@link - * org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage}. This change does not affect - * the runtime characteristics of your Jobs and is simply an API change to help better communicate - * the ways Flink separates local state storage from fault tolerance. Jobs can be upgraded without - * loss of state. If configuring your state backend via the {@code StreamExecutionEnvironment} - * please make the following changes. - * - *

{@code
- * 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- * 		env.setStateBackend(new HashMapStateBackend());
- * 		env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");
- * }
- * - *

If you are configuring your state backend via the {@code config.yaml} please make the - * following changes set your state backend type to "hashmap" {@code state.backend.type: hashmap}. - * - *

This state backend holds the working state in the memory (JVM heap) of the TaskManagers. The - * state backend checkpoints state as files to a file system (hence the backend's name). - * - *

Each checkpoint individually will store all its files in a subdirectory that includes the - * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/chk-17/}. - * - *

State Size Considerations

- * - *

Working state is kept on the TaskManager heap. If a TaskManager executes multiple tasks - * concurrently (if the TaskManager has multiple slots, or if slot-sharing is used) then the - * aggregate state of all tasks needs to fit into that TaskManager's memory. - * - *

This state backend stores small state chunks directly with the metadata, to avoid creating - * many small files. The threshold for that is configurable. When increasing this threshold, the - * size of the checkpoint metadata increases. The checkpoint metadata of all retained completed - * checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem, - * unless the threshold {@link #getMinFileSizeThreshold()} is increased significantly. - * - *

Persistence Guarantees

- * - *

Checkpoints from this state backend are as persistent and available as filesystem that is - * written to. If the file system is a persistent distributed file system, this state backend - * supports highly available setups. The backend additionally supports savepoints and externalized - * checkpoints. - * - *

Configuration

- * - *

As for all state backends, this backend can either be configured within the application (by - * creating the backend with the respective constructor parameters and setting it on the execution - * environment) or by specifying it in the Flink configuration. - * - *

If the state backend was specified in the application, it may pick up additional configuration - * parameters from the Flink configuration. For example, if the backend if configured in the - * application without a default savepoint directory, it will pick up a default savepoint directory - * specified in the Flink configuration of the running job/cluster. That behavior is implemented via - * the {@link #configure(ReadableConfig, ClassLoader)} method. - */ -@Deprecated -@PublicEvolving -public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend { - - private static final long serialVersionUID = -8191916350224044011L; - - /** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */ - private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; - - // ------------------------------------------------------------------------ - - /** - * State below this size will be stored as part of the metadata, rather than in files. A value - * of '-1' means not yet configured, in which case the default will be used. - */ - private final int fileStateThreshold; - - /** - * The write buffer size for created checkpoint stream, this should not be less than file state - * threshold when we want state below that threshold stored as part of metadata not files. A - * value of '-1' means not yet configured, in which case the default will be used. - */ - private final int writeBufferSize; - - /** - * Switch to create checkpoint sub-directory with name of jobId. A value of 'undefined' means - * not yet configured, in which case the default will be used. - */ - private boolean createCheckpointSubDirs; - - // ----------------------------------------------------------------------- - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - *

A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or - * 'S3://') must be accessible via {@link FileSystem#get(URI)}. - * - *

For a state backend targeting HDFS, this means that the URI must either specify the - * authority (host and port), or that the Hadoop configuration that describes that information - * must be in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to the checkpoint data directory. - */ - public FsStateBackend(String checkpointDataUri) { - this(new Path(checkpointDataUri)); - } - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - *

A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or - * 'S3://') must be accessible via {@link FileSystem#get(URI)}. - * - *

For a state backend targeting HDFS, this means that the URI must either specify the - * authority (host and port), or that the Hadoop configuration that describes that information - * must be in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to the checkpoint data directory. - * @param asynchronousSnapshots This parameter is only there for API compatibility. Checkpoints - * are always asynchronous now. - */ - public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) { - this(new Path(checkpointDataUri), asynchronousSnapshots); - } - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - *

A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or - * 'S3://') must be accessible via {@link FileSystem#get(URI)}. - * - *

For a state backend targeting HDFS, this means that the URI must either specify the - * authority (host and port), or that the Hadoop configuration that describes that information - * must be in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to the checkpoint data directory. - */ - public FsStateBackend(Path checkpointDataUri) { - this(checkpointDataUri.toUri()); - } - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - *

A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or - * 'S3://') must be accessible via {@link FileSystem#get(URI)}. - * - *

For a state backend targeting HDFS, this means that the URI must either specify the - * authority (host and port), or that the Hadoop configuration that describes that information - * must be in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to the checkpoint data directory. - * @param asynchronousSnapshots This parameter is only there for API compatibility. Checkpoints - * are always asynchronous now. - */ - public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) { - this(checkpointDataUri.toUri(), asynchronousSnapshots); - } - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - *

A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or - * 'S3://') must be accessible via {@link FileSystem#get(URI)}. - * - *

For a state backend targeting HDFS, this means that the URI must either specify the - * authority (host and port), or that the Hadoop configuration that describes that information - * must be in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to the checkpoint data directory. - */ - public FsStateBackend(URI checkpointDataUri) { - this(checkpointDataUri, null, -1, -1, TernaryBoolean.UNDEFINED); - } - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. Optionally, this constructor accepts a default savepoint storage - * directory to which savepoints are stored when no custom target path is give to the savepoint - * command. - * - *

A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or - * 'S3://') must be accessible via {@link FileSystem#get(URI)}. - * - *

For a state backend targeting HDFS, this means that the URI must either specify the - * authority (host and port), or that the Hadoop configuration that describes that information - * must be in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to the checkpoint data directory. - * @param defaultSavepointDirectory The default directory to store savepoints to. May be null. - */ - public FsStateBackend(URI checkpointDataUri, @Nullable URI defaultSavepointDirectory) { - this(checkpointDataUri, defaultSavepointDirectory, -1, -1, TernaryBoolean.UNDEFINED); - } - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - *

A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or - * 'S3://') must be accessible via {@link FileSystem#get(URI)}. - * - *

For a state backend targeting HDFS, this means that the URI must either specify the - * authority (host and port), or that the Hadoop configuration that describes that information - * must be in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to the checkpoint data directory. - * @param asynchronousSnapshots This parameter is only there for API compatibility. Checkpoints - * are always asynchronous now. - */ - public FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots) { - this(checkpointDataUri, null, -1, -1, TernaryBoolean.fromBoolean(asynchronousSnapshots)); - } - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - *

A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or - * 'S3://') must be accessible via {@link FileSystem#get(URI)}. - * - *

For a state backend targeting HDFS, this means that the URI must either specify the - * authority (host and port), or that the Hadoop configuration that describes that information - * must be in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to the checkpoint data directory. - * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata, - * rather than in files - */ - public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) { - this(checkpointDataUri, null, fileStateSizeThreshold, -1, TernaryBoolean.UNDEFINED); - } - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - *

A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or - * 'S3://') must be accessible via {@link FileSystem#get(URI)}. - * - *

For a state backend targeting HDFS, this means that the URI must either specify the - * authority (host and port), or that the Hadoop configuration that describes that information - * must be in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to the checkpoint data directory. - * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata, - * rather than in files (-1 for default value). - * @param asynchronousSnapshots This parameter is only there for API compatibility. Checkpoints - * are always asynchronous now. - */ - public FsStateBackend( - URI checkpointDataUri, int fileStateSizeThreshold, boolean asynchronousSnapshots) { - - this( - checkpointDataUri, - null, - fileStateSizeThreshold, - -1, - TernaryBoolean.fromBoolean(asynchronousSnapshots)); - } - - /** - * Creates a new state backend that stores its checkpoint data in the file system and location - * defined by the given URI. - * - *

A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or - * 'S3://') must be accessible via {@link FileSystem#get(URI)}. - * - *

For a state backend targeting HDFS, this means that the URI must either specify the - * authority (host and port), or that the Hadoop configuration that describes that information - * must be in the classpath. - * - * @param checkpointDirectory The path to write checkpoint metadata to. - * @param defaultSavepointDirectory The path to write savepoints to. If null, the value from the - * runtime configuration will be used, or savepoint target locations need to be passed when - * triggering a savepoint. - * @param fileStateSizeThreshold State below this size will be stored as part of the metadata, - * rather than in files. If -1, the value configured in the runtime configuration will be - * used, or the default value (1KB) if nothing is configured. - * @param writeBufferSize Write buffer size used to serialize state. If -1, the value configured - * in the runtime configuration will be used, or the default value (4KB) if nothing is - * configured. - * @param asynchronousSnapshots This parameter is only there for API compatibility. Checkpoints - * are always asynchronous now. - */ - public FsStateBackend( - URI checkpointDirectory, - @Nullable URI defaultSavepointDirectory, - int fileStateSizeThreshold, - int writeBufferSize, - @SuppressWarnings("unused") TernaryBoolean asynchronousSnapshots) { - - super( - checkNotNull(checkpointDirectory, "checkpoint directory is null"), - defaultSavepointDirectory); - - checkArgument( - fileStateSizeThreshold >= -1 && fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD, - "The threshold for file state size must be in [-1, %s], where '-1' means to use " - + "the value from the deployment's configuration.", - MAX_FILE_STATE_THRESHOLD); - checkArgument( - writeBufferSize >= -1, - "The write buffer size must be not less than '-1', where '-1' means to use " - + "the value from the deployment's configuration."); - - this.fileStateThreshold = fileStateSizeThreshold; - this.writeBufferSize = writeBufferSize; - this.createCheckpointSubDirs = - CheckpointingOptions.CREATE_CHECKPOINT_SUB_DIR.defaultValue(); - } - - /** - * Private constructor that creates a re-configured copy of the state backend. - * - * @param original The state backend to re-configure - * @param configuration The configuration - */ - private FsStateBackend( - FsStateBackend original, ReadableConfig configuration, ClassLoader classLoader) { - super(original.getCheckpointPath(), original.getSavepointPath(), configuration); - - if (getValidFileStateThreshold(original.fileStateThreshold) >= 0) { - this.fileStateThreshold = original.fileStateThreshold; - } else { - final int configuredStateThreshold = - getValidFileStateThreshold( - configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes()); - - if (configuredStateThreshold >= 0) { - this.fileStateThreshold = configuredStateThreshold; - } else { - this.fileStateThreshold = - MathUtils.checkedDownCast( - FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes()); - - // because this is the only place we (unlikely) ever log, we lazily - // create the logger here - LoggerFactory.getLogger(AbstractFileStateBackend.class) - .warn( - "Ignoring invalid file size threshold value ({}): {} - using default value {} instead.", - FS_SMALL_FILE_THRESHOLD.key(), - configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes(), - FS_SMALL_FILE_THRESHOLD.defaultValue()); - } - } - - final int bufferSize = - original.writeBufferSize >= 0 - ? original.writeBufferSize - : configuration.get(CheckpointingOptions.FS_WRITE_BUFFER_SIZE); - - this.writeBufferSize = Math.max(bufferSize, this.fileStateThreshold); - this.createCheckpointSubDirs = - configuration - .getOptional(CheckpointingOptions.CREATE_CHECKPOINT_SUB_DIR) - .orElse(original.createCheckpointSubDirs); - // configure latency tracking - latencyTrackingConfigBuilder = - original.latencyTrackingConfigBuilder.configure(configuration); - } - - private int getValidFileStateThreshold(long fileStateThreshold) { - if (fileStateThreshold >= 0 && fileStateThreshold <= MAX_FILE_STATE_THRESHOLD) { - return (int) fileStateThreshold; - } - return -1; - } - - // ------------------------------------------------------------------------ - // Properties - // ------------------------------------------------------------------------ - - /** - * Gets the base directory where all the checkpoints are stored. The job-specific checkpoint - * directory is created inside this directory. - * - * @return The base directory for checkpoints. - * @deprecated Deprecated in favor of {@link #getCheckpointPath()}. - */ - @Deprecated - public Path getBasePath() { - return getCheckpointPath(); - } - - /** - * Gets the base directory where all the checkpoints are stored. The job-specific checkpoint - * directory is created inside this directory. - * - * @return The base directory for checkpoints. - */ - @Nonnull - @Override - public Path getCheckpointPath() { - // we know that this can never be null by the way of constructor checks - //noinspection ConstantConditions - return super.getCheckpointPath(); - } - - /** - * Gets the threshold below which state is stored as part of the metadata, rather than in files. - * This threshold ensures that the backend does not create a large amount of very small files, - * where potentially the file pointers are larger than the state itself. - * - *

If not explicitly configured, this is the default value of {@link - * CheckpointingOptions#FS_SMALL_FILE_THRESHOLD}. - * - * @return The file size threshold, in bytes. - */ - public int getMinFileSizeThreshold() { - return fileStateThreshold >= 0 - ? fileStateThreshold - : MathUtils.checkedDownCast(FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes()); - } - - /** - * Gets the write buffer size for created checkpoint stream. - * - *

If not explicitly configured, this is the default value of {@link - * CheckpointingOptions#FS_WRITE_BUFFER_SIZE}. - * - * @return The write buffer size, in bytes. - */ - public int getWriteBufferSize() { - return writeBufferSize >= 0 - ? writeBufferSize - : CheckpointingOptions.FS_WRITE_BUFFER_SIZE.defaultValue(); - } - - /** - * Gets whether the key/value data structures are asynchronously snapshotted, which is always - * true for this state backend. - */ - public boolean isUsingAsynchronousSnapshots() { - return true; - } - - @Override - public boolean supportsNoClaimRestoreMode() { - // we never share any files, all snapshots are full - return true; - } - - @Override - public boolean supportsSavepointFormat(SavepointFormatType formatType) { - return true; - } - - // ------------------------------------------------------------------------ - // Reconfiguration - // ------------------------------------------------------------------------ - - /** - * Creates a copy of this state backend that uses the values defined in the configuration for - * fields where that were not specified in this state backend. - * - * @param config the configuration - * @return The re-configured variant of the state backend - */ - @Override - public FsStateBackend configure(ReadableConfig config, ClassLoader classLoader) { - return new FsStateBackend(this, config, classLoader); - } - - // ------------------------------------------------------------------------ - // initialization and cleanup - // ------------------------------------------------------------------------ - - @Override - public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException { - checkNotNull(jobId, "jobId"); - return new FsCheckpointStorageAccess( - getCheckpointPath(), - getSavepointPath(), - createCheckpointSubDirs, - jobId, - getMinFileSizeThreshold(), - getWriteBufferSize()); - } - - // ------------------------------------------------------------------------ - // state holding structures - // ------------------------------------------------------------------------ - - @Override - public AbstractKeyedStateBackend createKeyedStateBackend( - KeyedStateBackendParameters parameters) throws BackendBuildingException { - - TaskStateManager taskStateManager = parameters.getEnv().getTaskStateManager(); - LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig(); - HeapPriorityQueueSetFactory priorityQueueSetFactory = - new HeapPriorityQueueSetFactory( - parameters.getKeyGroupRange(), parameters.getNumberOfKeyGroups(), 128); - - LatencyTrackingStateConfig latencyTrackingStateConfig = - latencyTrackingConfigBuilder.setMetricGroup(parameters.getMetricGroup()).build(); - return new HeapKeyedStateBackendBuilder<>( - parameters.getKvStateRegistry(), - parameters.getKeySerializer(), - parameters.getEnv().getUserCodeClassLoader().asClassLoader(), - parameters.getNumberOfKeyGroups(), - parameters.getKeyGroupRange(), - parameters.getEnv().getExecutionConfig(), - parameters.getTtlTimeProvider(), - latencyTrackingStateConfig, - parameters.getStateHandles(), - AbstractStateBackend.getCompressionDecorator( - parameters.getEnv().getExecutionConfig()), - localRecoveryConfig, - priorityQueueSetFactory, - isUsingAsynchronousSnapshots(), - parameters.getCancelStreamRegistry()) - .build(); - } - - @Override - public OperatorStateBackend createOperatorStateBackend( - OperatorStateBackendParameters parameters) throws BackendBuildingException { - - return new DefaultOperatorStateBackendBuilder( - parameters.getEnv().getUserCodeClassLoader().asClassLoader(), - parameters.getEnv().getExecutionConfig(), - isUsingAsynchronousSnapshots(), - parameters.getStateHandles(), - parameters.getCancelStreamRegistry()) - .build(); - } - - // ------------------------------------------------------------------------ - // utilities - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "File State Backend (" - + "checkpoints: '" - + getCheckpointPath() - + "', savepoints: '" - + getSavepointPath() - + ", fileStateThreshold: " - + fileStateThreshold - + ")"; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java deleted file mode 100644 index b74314f8b9c29..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.filesystem; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.runtime.state.StateBackendFactory; - -/** A factory that creates an {@link FsStateBackend} from a configuration. */ -@PublicEvolving -public class FsStateBackendFactory implements StateBackendFactory { - - @Override - public FsStateBackend createFromConfig(ReadableConfig config, ClassLoader classLoader) - throws IllegalConfigurationException { - // we need to explicitly read the checkpoint directory here, because that - // is a required constructor parameter - final String checkpointDir = config.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY); - if (checkpointDir == null) { - throw new IllegalConfigurationException( - "Cannot create the file system state backend: The configuration does not specify the " - + "checkpoint directory '" - + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() - + '\''); - } - - try { - return new FsStateBackend(checkpointDir).configure(config, classLoader); - } catch (IllegalArgumentException e) { - throw new IllegalConfigurationException( - "Invalid configuration for the state backend", e); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/JobManagerCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/JobManagerCheckpointStorage.java index 64785466365f8..b304c11980da8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/JobManagerCheckpointStorage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/JobManagerCheckpointStorage.java @@ -170,20 +170,6 @@ public JobManagerCheckpointStorage(Path checkpointPath, int maxStateSize) { .build(); } - /** - * Creates a new JobManagerCheckpointStorage, setting optionally the paths to persist checkpoint - * and savepoint - */ - public JobManagerCheckpointStorage(Path checkpointPath, Path savepointPath, int maxStateSize) { - checkArgument(maxStateSize > 0, "maxStateSize must be > 0"); - this.maxStateSize = maxStateSize; - this.location = - ExternalizedSnapshotLocation.newBuilder() - .withCheckpointPath(checkpointPath) - .withSavepointPath(savepointPath) - .build(); - } - /** * Private constructor that creates a re-configured copy of the checkpoint storage. * diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java deleted file mode 100644 index e2fd216be3971..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; -import org.apache.flink.testutils.junit.utils.TempDirUtils; - -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.io.TempDir; - -import java.io.File; -import java.nio.file.Path; -import java.util.Arrays; -import java.util.List; - -/** - * Tests for the keyed state backend and operator state backend, as created by the {@link - * FsStateBackend}. - */ -@ExtendWith(ParameterizedTestExtension.class) -public class FileStateBackendTest extends StateBackendTestBase { - - @Parameters - public static List modes() { - return Arrays.asList(true, false); - } - - @Parameter public boolean useAsyncMode; - - @TempDir private Path tempFolder; - - @Override - protected ConfigurableStateBackend getStateBackend() throws Exception { - File checkpointPath = TempDirUtils.newFolder(tempFolder); - return new FsStateBackend(checkpointPath.toURI(), useAsyncMode); - } - - @Override - protected boolean isSerializerPresenceRequiredOnRestore() { - return true; - } - - @Override - protected boolean supportsAsynchronousSnapshots() { - return useAsyncMode; - } - - // disable these because the verification does not work for this state backend - @Override - @TestTemplate - void testValueStateRestoreWithWrongSerializers() {} - - @Override - @TestTemplate - void testListStateRestoreWithWrongSerializers() {} - - @Override - @TestTemplate - void testReducingStateRestoreWithWrongSerializers() {} - - @Override - @TestTemplate - void testMapStateRestoreWithWrongSerializers() {} - - @Disabled - @TestTemplate - void testConcurrentMapIfQueryable() throws Exception { - super.testConcurrentMapIfQueryable(); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HashMapStateBackendWithFsStorageMigrationTest.java similarity index 69% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendMigrationTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/HashMapStateBackendWithFsStorageMigrationTest.java index 3e5ab151056d8..09c6094bde779 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendMigrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HashMapStateBackendWithFsStorageMigrationTest.java @@ -17,7 +17,8 @@ package org.apache.flink.runtime.state; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension; import org.apache.flink.testutils.junit.utils.TempDirUtils; @@ -27,14 +28,20 @@ /** * Tests for the keyed state backend and operator state backend, as created by the {@link - * FsStateBackend}. + * HashMapStateBackend}. */ @ExtendWith(NoOpTestExtension.class) -public class FileStateBackendMigrationTest extends StateBackendMigrationTestBase { +public class HashMapStateBackendWithFsStorageMigrationTest + extends StateBackendMigrationTestBase { @Override - protected FsStateBackend getStateBackend() throws Exception { + protected HashMapStateBackend getStateBackend() throws Exception { + return new HashMapStateBackend(); + } + + @Override + protected CheckpointStorage getCheckpointStorage() throws Exception { File checkpointPath = TempDirUtils.newFolder(tempFolder); - return new FsStateBackend(checkpointPath.toURI(), false); + return new FileSystemCheckpointStorage(checkpointPath.toURI()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java deleted file mode 100644 index bae693d35e952..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.MemorySize; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.configuration.StateBackendOptions; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; -import org.apache.flink.runtime.state.hashmap.HashMapStateBackendFactory; -import org.apache.flink.testutils.junit.utils.TempDirUtils; -import org.apache.flink.util.DynamicCodeLoadingException; -import org.apache.flink.util.TernaryBoolean; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; -import org.mockito.Mockito; - -import java.io.IOException; -import java.net.URI; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -/** This test validates that state backends are properly loaded from configuration. */ -class StateBackendLoadingTest { - - @TempDir private java.nio.file.Path tmp; - - private final ClassLoader cl = getClass().getClassLoader(); - - private final String backendKey = StateBackendOptions.STATE_BACKEND.key(); - - // ------------------------------------------------------------------------ - // defaults - // ------------------------------------------------------------------------ - - @Test - void testDefaultStateBackend() throws Exception { - assertThat(StateBackendLoader.loadStateBackendFromConfig(new Configuration(), cl, null)) - .isInstanceOf(HashMapStateBackend.class); - } - - @Test - void testInstantiateHashMapStateBackendBackendByDefault() throws Exception { - StateBackend backend = - StateBackendLoader.fromApplicationOrConfigOrDefault( - null, new Configuration(), new Configuration(), cl, null); - - assertThat(backend).isInstanceOf(HashMapStateBackend.class); - } - - @Test - void testApplicationDefinedHasPrecedence() throws Exception { - final StateBackend appBackend = Mockito.mock(StateBackend.class); - - final Configuration config = new Configuration(); - config.setString(backendKey, "hashmap"); - - StateBackend backend = - StateBackendLoader.fromApplicationOrConfigOrDefault( - appBackend, config, config, cl, null); - assertThat(backend).isEqualTo(appBackend); - } - - // ------------------------------------------------------------------------ - // HashMap State Backend - // ------------------------------------------------------------------------ - - /** Validates loading a HashMapStateBackend from the cluster configuration. */ - @Test - void testLoadHashMapStateBackendNoParameters() throws Exception { - // we configure with the explicit string (rather than - // AbstractStateBackend#X_STATE_BACKEND_NAME) - // to guard against config-breaking changes of the name - - final Configuration config = new Configuration(); - config.setString(backendKey, HashMapStateBackendFactory.class.getName()); - - StateBackend backend = StateBackendLoader.loadStateBackendFromConfig(config, cl, null); - - assertThat(backend).isInstanceOf(HashMapStateBackend.class); - } - - /** - * Validates loading a hashMap state backend with additional parameters from the cluster - * configuration. - */ - @Test - void testLoadHashMapStateBackendWithParameters() throws Exception { - // we configure with the explicit string (rather than - // AbstractStateBackend#X_STATE_BACKEND_NAME) - // to guard against config-breaking changes of the name - - final Configuration config1 = new Configuration(); - config1.setString(backendKey, "hashmap"); - - final Configuration config2 = new Configuration(); - config2.setString(backendKey, HashMapStateBackendFactory.class.getName()); - - HashMapStateBackend backend1 = - (HashMapStateBackend) - StateBackendLoader.loadStateBackendFromConfig(config1, cl, null); - HashMapStateBackend backend2 = - (HashMapStateBackend) - StateBackendLoader.loadStateBackendFromConfig(config2, cl, null); - - assertThat(backend1).isNotNull(); - assertThat(backend2).isNotNull(); - } - - // ------------------------------------------------------------------------ - // File System State Backend - // ------------------------------------------------------------------------ - - /** - * Validates taking the application-defined file system state backend and adding with additional - * parameters from configuration, but giving precedence to application-defined parameters over - * configuration-defined parameters. - */ - @Test - void testLoadFileSystemStateBackendMixed() throws Exception { - final String appCheckpointDir = new Path(TempDirUtils.newFolder(tmp).toURI()).toString(); - final String checkpointDir = new Path(TempDirUtils.newFolder(tmp).toURI()).toString(); - final String savepointDir = new Path(TempDirUtils.newFolder(tmp).toURI()).toString(); - - final Path expectedCheckpointsPath = new Path(new URI(appCheckpointDir)); - final Path expectedSavepointsPath = new Path(savepointDir); - - final int threshold = 1000000; - final int writeBufferSize = 4000000; - - final FsStateBackend backend = - new FsStateBackend( - new URI(appCheckpointDir), - null, - threshold, - writeBufferSize, - TernaryBoolean.TRUE); - - final Configuration config = new Configuration(); - config.setString(backendKey, "hashmap"); // this should not be picked up - config.set( - CheckpointingOptions.CHECKPOINTS_DIRECTORY, - checkpointDir); // this should not be picked up - config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir); - config.set( - CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, - MemorySize.parse("20")); // this should not be picked up - config.set( - CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 3000000); // this should not be picked up - - final StateBackend loadedBackend = - StateBackendLoader.fromApplicationOrConfigOrDefault( - backend, config, config, cl, null); - assertThat(loadedBackend).isInstanceOf(FsStateBackend.class); - - final FsStateBackend fs = (FsStateBackend) loadedBackend; - assertThat(fs.getCheckpointPath()).isEqualTo(expectedCheckpointsPath); - assertThat(fs.getSavepointPath()).isEqualTo(expectedSavepointsPath); - assertThat(fs.getMinFileSizeThreshold()).isEqualTo(threshold); - assertThat(fs.getWriteBufferSize()).isEqualTo(writeBufferSize); - } - - // ------------------------------------------------------------------------ - // Failures - // ------------------------------------------------------------------------ - - /** - * This test makes sure that failures properly manifest when the state backend could not be - * loaded. - */ - @Test - void testLoadingFails() throws Exception { - final Configuration config = new Configuration(); - - // try a value that is neither recognized as a name, nor corresponds to a class - config.setString(backendKey, "does.not.exist"); - assertThatThrownBy( - () -> - StateBackendLoader.fromApplicationOrConfigOrDefault( - null, config, new Configuration(), cl, null)) - .isInstanceOf(DynamicCodeLoadingException.class); - - // try a class that is not a factory - config.setString(backendKey, java.io.File.class.getName()); - assertThatThrownBy( - () -> - StateBackendLoader.fromApplicationOrConfigOrDefault( - null, config, new Configuration(), cl, null)) - .isInstanceOf(DynamicCodeLoadingException.class); - - // a factory that fails - config.setString(backendKey, FailingFactory.class.getName()); - assertThatThrownBy( - () -> - StateBackendLoader.fromApplicationOrConfigOrDefault( - null, config, new Configuration(), cl, null)) - .isInstanceOf(IOException.class); - } - - // ------------------------------------------------------------------------ - - static final class FailingFactory implements StateBackendFactory { - - @Override - public StateBackend createFromConfig(ReadableConfig config, ClassLoader classLoader) - throws IOException { - throw new IOException("fail!"); - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java index 7486293d89735..b6ac09dc2dad8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.testutils.statemigration.TestType; import org.apache.flink.util.IOUtils; @@ -79,15 +80,7 @@ public abstract class StateBackendMigrationTestBase { protected abstract B getStateBackend() throws Exception; protected CheckpointStorage getCheckpointStorage() throws Exception { - StateBackend stateBackend = getStateBackend(); - if (stateBackend instanceof CheckpointStorage) { - return (CheckpointStorage) stateBackend; - } - - throw new IllegalStateException( - "The state backend under test does not implement CheckpointStorage." - + "Please override 'createCheckpointStorage' and provide an appropriate" - + "checkpoint storage instance"); + return new JobManagerCheckpointStorage(); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java index 306fed51c6cf2..8d5be6a72bf8a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java @@ -48,10 +48,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; -/** - * Tests for the {@link FsCheckpointStorageAccess}, which implements the checkpoint storage aspects - * of the {@link FsStateBackend}. - */ +/** Tests for the {@link FsCheckpointStorageAccess}, which implements the checkpoint storage. */ class FsCheckpointStorageAccessTest extends AbstractFileCheckpointStorageAccessTestBase { private static final int FILE_SIZE_THRESHOLD = 1024; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java index 7f7ada6271867..9b7f8be2fbe37 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java @@ -93,8 +93,8 @@ void testSharedStateHasAbsolutePathHandles() throws IOException { @Test void testEntropyMakesExclusiveStateAbsolutePaths() throws IOException { - final FsStateBackendEntropyTest.TestEntropyAwareFs fs = - new FsStateBackendEntropyTest.TestEntropyAwareFs(); + final FsStorageEntropyTest.TestEntropyAwareFs fs = + new FsStorageEntropyTest.TestEntropyAwareFs(); final FsCheckpointStreamFactory factory = createFactory(fs, 0); final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream = @@ -136,7 +136,7 @@ void testFlushAboveThreshold() throws IOException { private void flushAndVerify(int minFileSize, int bytesToFlush, boolean expectEmpty) throws IOException { FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream = - createFactory(new FsStateBackendEntropyTest.TestEntropyAwareFs(), minFileSize) + createFactory(new FsStorageEntropyTest.TestEntropyAwareFs(), minFileSize) .createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); stream.write(new byte[bytesToFlush], 0, bytesToFlush); @@ -160,7 +160,7 @@ private FsCheckpointStreamFactory createFactory(FileSystem fs, int fileSizeThres } private FsCheckpointStreamFactory createFactory( - FsStateBackendEntropyTest.TestEntropyAwareFs fs, int fileSizeThreshold) { + FsStorageEntropyTest.TestEntropyAwareFs fs, int fileSizeThreshold) { final Path exclusiveStateDirWithEntropy = exclusiveStateDir.resolve(Objects.requireNonNull(fs.getEntropyInjectionKey())); return new FsCheckpointStreamFactory( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStorageEntropyTest.java similarity index 97% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStorageEntropyTest.java index 1aa4317b407f0..020a3ea955579 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStorageEntropyTest.java @@ -34,10 +34,10 @@ import static org.assertj.core.api.Assertions.assertThat; /** - * Tests verifying that the FsStateBackend passes the entropy injection option to the FileSystem for + * Tests verifying that the FsStorage passes the entropy injection option to the FileSystem for * state payload files, but not for metadata files. */ -class FsStateBackendEntropyTest { +class FsStorageEntropyTest { static final String ENTROPY_MARKER = "__ENTROPY__"; static final String RESOLVED_MARKER = "+RESOLVED+"; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java index d83341fc0c77f..5061aaf8369fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.java @@ -96,21 +96,16 @@ void testParametrizationDefault() throws Exception { @Test void testParametrizationDirectories() throws Exception { final JobID jid = new JobID(); - final Path checkpointPath = new Path(TempDirUtils.newFolder(tmp).toURI().toString()); - final Path savepointPath = new Path(TempDirUtils.newFolder(tmp).toURI().toString()); + final String checkpointPath = TempDirUtils.newFolder(tmp).toURI().toString(); JobManagerCheckpointStorage jobManagerCheckpointStorage = - new JobManagerCheckpointStorage(checkpointPath, savepointPath, 1000); + new JobManagerCheckpointStorage(checkpointPath); MemoryBackendCheckpointStorageAccess storage = (MemoryBackendCheckpointStorageAccess) jobManagerCheckpointStorage.createCheckpointStorage(jid); assertThat(storage.supportsHighlyAvailableStorage()).isTrue(); - assertThat(storage.hasDefaultSavepointLocation()).isTrue(); - assertThat(storage.getDefaultSavepointDirectory()).isNotNull(); - - assertThat(storage.getDefaultSavepointDirectory()).isEqualTo(savepointPath); } @Test diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateFileStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateFileStateBackendTest.java deleted file mode 100644 index 55c1752c31861..0000000000000 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateFileStateBackendTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.state.changelog; - -import org.apache.flink.api.common.state.StateTtlConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.StateLatencyTrackOptions; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.state.CheckpointStorage; -import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; -import org.apache.flink.runtime.state.ConfigurableStateBackend; -import org.apache.flink.runtime.state.FileStateBackendTest; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.TestTaskStateManager; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; -import org.apache.flink.testutils.junit.utils.TempDirUtils; - -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.io.TempDir; - -import java.io.IOException; -import java.nio.file.Path; -import java.time.Duration; - -/** Tests for {@link ChangelogStateBackend} delegating {@link FsStateBackend}. */ -public class ChangelogDelegateFileStateBackendTest extends FileStateBackendTest { - - @TempDir private Path temp; - - @Override - protected TestTaskStateManager getTestTaskStateManager() throws IOException { - return ChangelogStateBackendTestUtils.createTaskStateManager(TempDirUtils.newFolder(temp)); - } - - @Override - protected boolean snapshotUsesStreamFactory() { - return false; - } - - @Override - protected boolean supportsMetaInfoVerification() { - return false; - } - - @Override - protected CheckpointableKeyedStateBackend createKeyedBackend( - TypeSerializer keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - Environment env) - throws Exception { - - return ChangelogStateBackendTestUtils.createKeyedBackend( - new ChangelogStateBackend(super.getStateBackend()), - keySerializer, - numberOfKeyGroups, - keyGroupRange, - env); - } - - @Override - protected ConfigurableStateBackend getStateBackend() throws Exception { - return new ChangelogStateBackend(super.getStateBackend()); - } - - @Override - protected CheckpointStorage getCheckpointStorage() { - return new JobManagerCheckpointStorage(); - } - - @TestTemplate - public void testMaterializedRestore() throws Exception { - CheckpointStreamFactory streamFactory = createStreamFactory(); - - ChangelogStateBackendTestUtils.testMaterializedRestore( - getStateBackend(), StateTtlConfig.DISABLED, env, streamFactory); - } - - @TestTemplate - public void testMaterializedRestoreWithWrappedState() throws Exception { - CheckpointStreamFactory streamFactory = createStreamFactory(); - - Configuration configuration = new Configuration(); - configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true); - StateBackend stateBackend = - getStateBackend() - .configure(configuration, Thread.currentThread().getContextClassLoader()); - ChangelogStateBackendTestUtils.testMaterializedRestore( - stateBackend, - StateTtlConfig.newBuilder(Duration.ofMinutes(1)).build(), - env, - streamFactory); - } - - @TestTemplate - public void testMaterializedRestorePriorityQueue() throws Exception { - CheckpointStreamFactory streamFactory = createStreamFactory(); - - ChangelogStateBackendTestUtils.testMaterializedRestoreForPriorityQueue( - getStateBackend(), env, streamFactory); - } -} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/LegacyEnumBridge.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/LegacyEnumBridge.java deleted file mode 100644 index b6272f55513f1..0000000000000 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/LegacyEnumBridge.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -/** Bridge utility between the old and new enum type. */ -@SuppressWarnings("deprecation") -class LegacyEnumBridge { - - private LegacyEnumBridge() {} - - static RocksDBStateBackend.PriorityQueueStateType convert( - EmbeddedRocksDBStateBackend.PriorityQueueStateType t) { - switch (t) { - case HEAP: - return RocksDBStateBackend.PriorityQueueStateType.HEAP; - case ROCKSDB: - return RocksDBStateBackend.PriorityQueueStateType.ROCKSDB; - default: - throw new IllegalStateException("Unknown enum type " + t); - } - } - - static EmbeddedRocksDBStateBackend.PriorityQueueStateType convert( - RocksDBStateBackend.PriorityQueueStateType t) { - switch (t) { - case HEAP: - return EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP; - case ROCKSDB: - return EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB; - default: - throw new IllegalStateException("Unknown enum type " + t); - } - } -} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java deleted file mode 100644 index 5f341a2707c40..0000000000000 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ /dev/null @@ -1,530 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.core.execution.SavepointFormatType; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractManagedMemoryStateBackend; -import org.apache.flink.runtime.state.CheckpointStorage; -import org.apache.flink.runtime.state.CheckpointStorageAccess; -import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; -import org.apache.flink.runtime.state.ConfigurableStateBackend; -import org.apache.flink.runtime.state.OperatorStateBackend; -import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.util.TernaryBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import java.io.IOException; -import java.net.URI; -import java.util.Arrays; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * IMPORTANT {@link RocksDBStateBackend} is deprecated in favor of {@link - * EmbeddedRocksDBStateBackend}. and {@link - * org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage}. This change does not affect - * the runtime characteristics of your Jobs and is simply an API change to help better communicate - * the ways Flink separates local state storage from fault tolerance. Jobs can be upgraded without - * loss of state. If configuring your state backend via the {@code StreamExecutionEnvironment} - * please make the following changes. - * - *

{@code
- * 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- * 		env.setStateBackend(new EmbeddedRocksDBStateBackend());
- * 		env.getCheckpointConfig().setCheckpointStorage("hdfs://checkpoints");
- * }
- * - *

If you are configuring your state backend via the {@code config.yaml} no changes are required. - * - *

A State Backend that stores its state in {@code RocksDB}. This state backend can store very - * large state that exceeds memory and spills to disk. - * - *

All key/value state (including windows) is stored in the key/value index of RocksDB. For - * persistence against loss of machines, checkpoints take a snapshot of the RocksDB database, and - * persist that snapshot in a file system (by default) or another configurable state backend. - * - *

The behavior of the RocksDB instances can be parametrized by setting RocksDB Options using the - * methods {@link #setPredefinedOptions(PredefinedOptions)} and {@link - * #setRocksDBOptions(RocksDBOptionsFactory)}. - */ -@Deprecated -public class RocksDBStateBackend extends AbstractManagedMemoryStateBackend - implements CheckpointStorage, ConfigurableStateBackend { - - /** The options to chose for the type of priority queue state. */ - @Deprecated - public enum PriorityQueueStateType { - HEAP, - ROCKSDB - } - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class); - - // ------------------------------------------------------------------------ - - // -- configuration values, set in the application / configuration - - private final EmbeddedRocksDBStateBackend rocksDBStateBackend; - - /** The checkpoint storage that we use for creating checkpoint streams. */ - private final StateBackend checkpointStreamBackend; - - // ------------------------------------------------------------------------ - - /** - * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the file system - * and location defined by the given URI. - * - *

A state backend that stores checkpoints in HDFS or S3 must specify the file system host - * and port in the URI, or have the Hadoop configuration that describes the file system (host / - * high-availability group / possibly credentials) either referenced from the Flink config, or - * included in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data - * directory. - * @throws IOException Thrown, if no file system can be found for the scheme in the URI. - */ - public RocksDBStateBackend(String checkpointDataUri) throws IOException { - this(new Path(checkpointDataUri).toUri()); - } - - /** - * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the file system - * and location defined by the given URI. - * - *

A state backend that stores checkpoints in HDFS or S3 must specify the file system host - * and port in the URI, or have the Hadoop configuration that describes the file system (host / - * high-availability group / possibly credentials) either referenced from the Flink config, or - * included in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data - * directory. - * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled. - * @throws IOException Thrown, if no file system can be found for the scheme in the URI. - */ - public RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCheckpointing) - throws IOException { - this(new Path(checkpointDataUri).toUri(), enableIncrementalCheckpointing); - } - - /** - * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the file system - * and location defined by the given URI. - * - *

A state backend that stores checkpoints in HDFS or S3 must specify the file system host - * and port in the URI, or have the Hadoop configuration that describes the file system (host / - * high-availability group / possibly credentials) either referenced from the Flink config, or - * included in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data - * directory. - * @throws IOException Thrown, if no file system can be found for the scheme in the URI. - */ - public RocksDBStateBackend(URI checkpointDataUri) throws IOException { - this(new FsStateBackend(checkpointDataUri)); - } - - /** - * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the file system - * and location defined by the given URI. - * - *

A state backend that stores checkpoints in HDFS or S3 must specify the file system host - * and port in the URI, or have the Hadoop configuration that describes the file system (host / - * high-availability group / possibly credentials) either referenced from the Flink config, or - * included in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data - * directory. - * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled. - * @throws IOException Thrown, if no file system can be found for the scheme in the URI. - */ - public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing) - throws IOException { - this( - new FsStateBackend(checkpointDataUri), - TernaryBoolean.fromBoolean(enableIncrementalCheckpointing)); - } - - /** - * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its - * checkpoint data streams. Typically, one would supply a filesystem or database state backend - * here where the snapshots from RocksDB would be stored. - * - *

The snapshots of the RocksDB state will be stored using the given backend's {@link - * CheckpointStorage#createCheckpointStorage(JobID)}. - * - * @param checkpointStreamBackend The backend write the checkpoint streams to. - */ - public RocksDBStateBackend(StateBackend checkpointStreamBackend) { - this(checkpointStreamBackend, TernaryBoolean.UNDEFINED); - } - - /** - * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its - * checkpoint data streams. Typically, one would supply a filesystem or database state backend - * here where the snapshots from RocksDB would be stored. - * - *

The snapshots of the RocksDB state will be stored using the given backend's {@code - * StateBackend#createCheckpointStorage(JobID)}. - * - * @param checkpointStreamBackend The backend write the checkpoint streams to. - * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled. - */ - public RocksDBStateBackend( - StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) { - if (!(checkpointStreamBackend instanceof CheckpointStorage)) { - throw new IllegalStateException( - "RocksDBStateBackend can only checkpoint" - + "to state backends that also implement CheckpointStorage."); - } - this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); - this.rocksDBStateBackend = new EmbeddedRocksDBStateBackend(enableIncrementalCheckpointing); - } - - /** - * Private constructor that creates a re-configured copy of the state backend. - * - * @param original The state backend to re-configure. - * @param config The configuration. - * @param classLoader The class loader. - */ - private RocksDBStateBackend( - RocksDBStateBackend original, ReadableConfig config, ClassLoader classLoader) { - // reconfigure the state backend backing the streams - final StateBackend originalStreamBackend = original.checkpointStreamBackend; - this.checkpointStreamBackend = - originalStreamBackend instanceof ConfigurableStateBackend - ? ((ConfigurableStateBackend) originalStreamBackend) - .configure(config, classLoader) - : originalStreamBackend; - this.rocksDBStateBackend = original.rocksDBStateBackend.configure(config, classLoader); - } - - // ------------------------------------------------------------------------ - // Reconfiguration - // ------------------------------------------------------------------------ - - /** - * Creates a copy of this state backend that uses the values defined in the configuration for - * fields where that were not yet specified in this state backend. - * - * @param config The configuration. - * @param classLoader The class loader. - * @return The re-configured variant of the state backend - */ - @Override - public RocksDBStateBackend configure(ReadableConfig config, ClassLoader classLoader) { - return new RocksDBStateBackend(this, config, classLoader); - } - - // ------------------------------------------------------------------------ - // State backend methods - // ------------------------------------------------------------------------ - - /** - * Gets the state backend that this RocksDB state backend uses to persist its bytes to. - * - *

This RocksDB state backend only implements the RocksDB specific parts, it relies on the - * 'CheckpointBackend' to persist the checkpoint and savepoint bytes streams. - */ - public StateBackend getCheckpointBackend() { - return checkpointStreamBackend; - } - - @Override - public boolean supportsNoClaimRestoreMode() { - // We are able to create CheckpointType#FULL_CHECKPOINT. (we might potentially reupload some - // shared files when taking incremental snapshots) - return true; - } - - @Override - public boolean supportsSavepointFormat(SavepointFormatType formatType) { - return true; - } - - // ------------------------------------------------------------------------ - // Checkpoint initialization and persistent storage - // ------------------------------------------------------------------------ - - @Override - public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException { - return ((CheckpointStorage) checkpointStreamBackend).resolveCheckpoint(pointer); - } - - @Override - public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException { - return ((CheckpointStorage) checkpointStreamBackend).createCheckpointStorage(jobId); - } - - // ------------------------------------------------------------------------ - // State holding data structures - // ------------------------------------------------------------------------ - - @Override - public AbstractKeyedStateBackend createKeyedStateBackend( - KeyedStateBackendParameters parameters) throws IOException { - return rocksDBStateBackend.createKeyedStateBackend(parameters); - } - - @Override - public OperatorStateBackend createOperatorStateBackend( - OperatorStateBackendParameters parameters) throws Exception { - return rocksDBStateBackend.createOperatorStateBackend(parameters); - } - - // ------------------------------------------------------------------------ - // Parameters - // ------------------------------------------------------------------------ - - /** - * Sets the path where the RocksDB local database files should be stored on the local file - * system. Setting this path overrides the default behavior, where the files are stored across - * the configured temp directories. - * - *

Passing {@code null} to this function restores the default behavior, where the configured - * temp directories will be used. - * - * @param path The path where the local RocksDB database files are stored. - */ - public void setDbStoragePath(String path) { - setDbStoragePaths(path == null ? null : new String[] {path}); - } - - /** - * Sets the directories in which the local RocksDB database puts its files (like SST and - * metadata files). These directories do not need to be persistent, they can be ephemeral, - * meaning that they are lost on a machine failure, because state in RocksDB is persisted in - * checkpoints. - * - *

If nothing is configured, these directories default to the TaskManager's local temporary - * file directories. - * - *

Each distinct state will be stored in one path, but when the state backend creates - * multiple states, they will store their files on different paths. - * - *

Passing {@code null} to this function restores the default behavior, where the configured - * temp directories will be used. - * - * @param paths The paths across which the local RocksDB database files will be spread. - */ - public void setDbStoragePaths(String... paths) { - rocksDBStateBackend.setDbStoragePaths(paths); - } - - /** - * Gets the configured local DB storage paths, or null, if none were configured. - * - *

Under these directories on the TaskManager, RocksDB stores its SST files and metadata - * files. These directories do not need to be persistent, they can be ephermeral, meaning that - * they are lost on a machine failure, because state in RocksDB is persisted in checkpoints. - * - *

If nothing is configured, these directories default to the TaskManager's local temporary - * file directories. - */ - public String[] getDbStoragePaths() { - return rocksDBStateBackend.getDbStoragePaths(); - } - - /** Gets whether incremental checkpoints are enabled for this state backend. */ - public boolean isIncrementalCheckpointsEnabled() { - return rocksDBStateBackend.isIncrementalCheckpointsEnabled(); - } - - /** - * Gets the type of the priority queue state. It will fallback to the default value, if it is - * not explicitly set. - * - * @return The type of the priority queue state. - */ - public PriorityQueueStateType getPriorityQueueStateType() { - return LegacyEnumBridge.convert(rocksDBStateBackend.getPriorityQueueStateType()); - } - - /** - * Sets the type of the priority queue state. It will fallback to the default value, if it is - * not explicitly set. - */ - public void setPriorityQueueStateType(PriorityQueueStateType priorityQueueStateType) { - rocksDBStateBackend.setPriorityQueueStateType( - LegacyEnumBridge.convert(priorityQueueStateType)); - } - - // ------------------------------------------------------------------------ - // Parametrize with RocksDB Options - // ------------------------------------------------------------------------ - - /** - * Sets the predefined options for RocksDB. - * - *

If user-configured options within {@link RocksDBConfigurableOptions} is set (through - * config.yaml) or a user-defined options factory is set (via {@link - * #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the factory are applied on - * top of the here specified predefined options and customized options. - * - * @param options The options to set (must not be null). - */ - public void setPredefinedOptions(@Nonnull PredefinedOptions options) { - rocksDBStateBackend.setPredefinedOptions(options); - } - - /** - * Gets the currently set predefined options for RocksDB. The default options (if nothing was - * set via {@link #setPredefinedOptions(PredefinedOptions)}) are {@link - * PredefinedOptions#DEFAULT}. - * - *

If user-configured options within {@link RocksDBConfigurableOptions} is set (through - * config.yaml) of a user-defined options factory is set (via {@link - * #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the factory are applied on - * top of the predefined and customized options. - * - * @return The currently set predefined options for RocksDB. - */ - @VisibleForTesting - public PredefinedOptions getPredefinedOptions() { - return rocksDBStateBackend.getPredefinedOptions(); - } - - /** @return The underlying {@link EmbeddedRocksDBStateBackend} instance. */ - @VisibleForTesting - EmbeddedRocksDBStateBackend getEmbeddedRocksDBStateBackend() { - return rocksDBStateBackend; - } - - /** - * Sets {@link org.rocksdb.Options} for the RocksDB instances. Because the options are not - * serializable and hold native code references, they must be specified through a factory. - * - *

The options created by the factory here are applied on top of the pre-defined options - * profile selected via {@link #setPredefinedOptions(PredefinedOptions)}. If the pre-defined - * options profile is the default ({@link PredefinedOptions#DEFAULT}), then the factory fully - * controls the RocksDB options. - * - * @param optionsFactory The options factory that lazily creates the RocksDB options. - */ - public void setRocksDBOptions(RocksDBOptionsFactory optionsFactory) { - rocksDBStateBackend.setRocksDBOptions(optionsFactory); - } - - /** - * Gets {@link org.rocksdb.Options} for the RocksDB instances. - * - *

The options created by the factory here are applied on top of the pre-defined options - * profile selected via {@link #setPredefinedOptions(PredefinedOptions)}. If the pre-defined - * options profile is the default ({@link PredefinedOptions#DEFAULT}), then the factory fully - * controls the RocksDB options. - */ - @Nullable - public RocksDBOptionsFactory getRocksDBOptions() { - return rocksDBStateBackend.getRocksDBOptions(); - } - - /** Gets the number of threads used to transfer files while snapshotting/restoring. */ - public int getNumberOfTransferThreads() { - return rocksDBStateBackend.getNumberOfTransferThreads(); - } - - /** - * Sets the number of threads used to transfer files while snapshotting/restoring. - * - * @param numberOfTransferThreads The number of threads used to transfer files while - * snapshotting/restoring. - */ - public void setNumberOfTransferThreads(int numberOfTransferThreads) { - rocksDBStateBackend.setNumberOfTransferThreads(numberOfTransferThreads); - } - - /** @deprecated Typo in method name. Use {@link #getNumberOfTransferThreads} instead. */ - @Deprecated - public int getNumberOfTransferingThreads() { - return getNumberOfTransferThreads(); - } - - /** @deprecated Typo in method name. Use {@link #setNumberOfTransferThreads(int)} instead. */ - @Deprecated - public void setNumberOfTransferingThreads(int numberOfTransferingThreads) { - setNumberOfTransferThreads(numberOfTransferingThreads); - } - - /** Gets the max batch size will be used in {@link RocksDBWriteBatchWrapper}. */ - public long getWriteBatchSize() { - return rocksDBStateBackend.getWriteBatchSize(); - } - - /** - * Sets the max batch size will be used in {@link RocksDBWriteBatchWrapper}, no positive value - * will disable memory size controller, just use item count controller. - * - * @param writeBatchSize The size will used to be used in {@link RocksDBWriteBatchWrapper}. - */ - public void setWriteBatchSize(long writeBatchSize) { - rocksDBStateBackend.setWriteBatchSize(writeBatchSize); - } - - // ------------------------------------------------------------------------ - // utilities - // ------------------------------------------------------------------------ - - @VisibleForTesting - RocksDBResourceContainer createOptionsAndResourceContainer() { - return rocksDBStateBackend.createOptionsAndResourceContainer(null); - } - - @Override - public String toString() { - return "RocksDBStateBackend{" - + "checkpointStreamBackend=" - + checkpointStreamBackend - + ", localRocksDbDirectories=" - + Arrays.toString(rocksDBStateBackend.getDbStoragePaths()) - + ", enableIncrementalCheckpointing=" - + rocksDBStateBackend.isIncrementalCheckpointsEnabled() - + ", numberOfTransferThreads=" - + rocksDBStateBackend.getNumberOfTransferThreads() - + ", writeBatchSize=" - + rocksDBStateBackend.getWriteBatchSize() - + '}'; - } - - // ------------------------------------------------------------------------ - // static library loading utilities - // ------------------------------------------------------------------------ - - @VisibleForTesting - static void ensureRocksDBIsLoaded(String tempDirectory) throws IOException { - EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(tempDirectory); - } - - @VisibleForTesting - static void resetRocksDBLoadedFlag() throws Exception { - EmbeddedRocksDBStateBackend.resetRocksDBLoadedFlag(); - } -} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java deleted file mode 100644 index 35fc4425c542c..0000000000000 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.runtime.state.StateBackendFactory; - -import java.io.IOException; - -/** - * A factory that creates an {@link org.apache.flink.contrib.streaming.state.RocksDBStateBackend} - * from a configuration. - */ -@Deprecated -public class RocksDBStateBackendFactory implements StateBackendFactory { - - @Override - public RocksDBStateBackend createFromConfig(ReadableConfig config, ClassLoader classLoader) - throws IllegalConfigurationException, IOException { - - // we need to explicitly read the checkpoint directory here, because that - // is a required constructor parameter - final String checkpointDirURI = config.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY); - if (checkpointDirURI == null) { - throw new IllegalConfigurationException( - "Cannot create the RocksDB state backend: The configuration does not specify the " - + "checkpoint directory '" - + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() - + '\''); - } - - return new RocksDBStateBackend(checkpointDirURI).configure(config, classLoader); - } -} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java index 29c7f48b854c9..7b9aab789c2e2 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HeapTimersSnapshottingTest.java @@ -20,11 +20,12 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.PriorityQueueStateType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.SavepointType; +import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; @@ -51,10 +52,11 @@ public class HeapTimersSnapshottingTest { public void testNotSerializingTimersInRawStateForSavepoints() throws Exception { try (KeyedOneInputStreamOperatorTestHarness testHarness = getTestHarness()) { - RocksDBStateBackend backend = - new RocksDBStateBackend(temporaryFolder.newFolder().toURI()); + EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(); backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP); testHarness.setStateBackend(backend); + testHarness.setCheckpointStorage( + new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())); testHarness.open(); testHarness.processElement(0, 0L); @@ -71,10 +73,11 @@ public void testNotSerializingTimersInRawStateForSavepoints() throws Exception { public void testSerializingTimersInRawStateForCheckpoints() throws Exception { try (KeyedOneInputStreamOperatorTestHarness testHarness = getTestHarness()) { - RocksDBStateBackend backend = - new RocksDBStateBackend(temporaryFolder.newFolder().toURI()); + EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(); backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP); testHarness.setStateBackend(backend); + testHarness.setCheckpointStorage( + new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI())); testHarness.open(); testHarness.processElement(0, 0L); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index dda75d5cf48fe..b30bcbd409e53 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -48,7 +48,6 @@ import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.SnapshotResult; -import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.TestTaskStateManager; @@ -330,13 +329,12 @@ public CheckpointStateOutputStream createCheckpointStateOutputStream( // to avoid serialization of the above factory instance, we need to pass it in // through a static variable - StateBackend stateBackend = - new BackendForTestStream(new StaticForwardFactory(blockerCheckpointStreamFactory)); - - RocksDBStateBackend backend = new RocksDBStateBackend(stateBackend); + EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(); backend.setDbStoragePath(dbDir.getAbsolutePath()); streamConfig.setStateBackend(backend); + streamConfig.setCheckpointStorage( + new BackendForTestStream(new StaticForwardFactory(blockerCheckpointStreamFactory))); streamConfig.setStreamOperator(new AsyncCheckpointOperator()); streamConfig.setOperatorID(new OperatorID()); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendTestFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendTestFactory.java index 58462741ac22f..d3324609acadf 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendTestFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendTestFactory.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.IOUtils; import org.apache.flink.util.TernaryBoolean; @@ -48,7 +47,7 @@ public class RocksDBKeyedStateBackendTestFactory implements AutoCloseable { public RocksDBKeyedStateBackend create( TemporaryFolder tmp, TypeSerializer keySerializer, int maxKeyGroupNumber) throws Exception { - RocksDBStateBackend backend = getRocksDBStateBackend(tmp); + EmbeddedRocksDBStateBackend backend = getRocksDBStateBackend(tmp); env = MockEnvironment.builder().build(); JobID jobID = new JobID(); KeyGroupRange keyGroupRange = new KeyGroupRange(0, maxKeyGroupNumber - 1); @@ -82,11 +81,11 @@ public void close() { IOUtils.closeQuietly(env); } - private RocksDBStateBackend getRocksDBStateBackend(TemporaryFolder tmp) throws IOException { + private EmbeddedRocksDBStateBackend getRocksDBStateBackend(TemporaryFolder tmp) + throws IOException { String dbPath = tmp.newFolder().getAbsolutePath(); String checkpointPath = tmp.newFolder().toURI().toString(); - RocksDBStateBackend backend = - new RocksDBStateBackend(new FsStateBackend(checkpointPath), TernaryBoolean.TRUE); + EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(TernaryBoolean.TRUE); backend.setDbStoragePath(dbPath); return backend; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 78fc9a0c65214..7e70cf0979ff5 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -41,14 +41,12 @@ import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser; import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; -import org.apache.flink.util.TernaryBoolean; import org.apache.commons.lang3.RandomUtils; import org.junit.Assert; @@ -83,12 +81,10 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; /** Tests for configuring the RocksDB State Backend. */ @SuppressWarnings("serial") @@ -268,7 +264,7 @@ public void testConfigureTimerServiceLoadingFromApplication() throws Exception { final Configuration configFromConfFile = new Configuration(); configFromConfFile.setString( RocksDBOptions.TIMER_SERVICE_FACTORY.key(), - RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString()); + EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString()); // configure final backend from job and cluster config final EmbeddedRocksDBStateBackend configuredRocksDBStateBackend = @@ -374,29 +370,6 @@ private void testLocalDbPaths(String configuredPath, File expectedPath) throws E } } - /** Validates that empty arguments for the local DB path are invalid. */ - @Test(expected = IllegalArgumentException.class) - public void testSetEmptyPaths() throws Exception { - String checkpointPath = tempFolder.newFolder().toURI().toString(); - RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); - rocksDbBackend.setDbStoragePaths(); - } - - /** Validates that schemes other than 'file:/' are not allowed. */ - @Test(expected = IllegalArgumentException.class) - public void testNonFileSchemePath() throws Exception { - String checkpointPath = tempFolder.newFolder().toURI().toString(); - RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); - rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition"); - } - - @Test(expected = IllegalArgumentException.class) - public void testDbPathRelativePaths() throws Exception { - RocksDBStateBackend rocksDbBackend = - new RocksDBStateBackend(tempFolder.newFolder().toURI().toString()); - rocksDbBackend.setDbStoragePath("relative/path"); - } - @Test @Timeout(value = 60) public void testCleanRelocatedDbLogs() throws Exception { @@ -458,8 +431,7 @@ public void testCleanRelocatedDbLogs() throws Exception { */ @Test public void testUseTempDirectories() throws Exception { - String checkpointPath = tempFolder.newFolder().toURI().toString(); - RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend(); File dir1 = tempFolder.newFolder(); @@ -508,7 +480,7 @@ public void testFailWhenNoLocalStorageDir() throws Exception { "Cannot mark directory non-writable", targetDir.setWritable(false, false)); String checkpointPath = tempFolder.newFolder().toURI().toString(); - RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend(); try (MockEnvironment env = getMockEnvironment(tempFolder.newFolder())) { rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath()); @@ -554,7 +526,7 @@ public void testContinueOnSomeDbDirectoriesMissing() throws Exception { "Cannot mark directory non-writable", targetDir1.setWritable(false, false)); String checkpointPath = tempFolder.newFolder().toURI().toString(); - RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend(); try (MockEnvironment env = getMockEnvironment(tempFolder.newFolder())) { rocksDbBackend.setDbStoragePaths( @@ -600,7 +572,7 @@ public void testContinueOnSomeDbDirectoriesMissing() throws Exception { @Test public void testPredefinedOptions() throws Exception { String checkpointPath = tempFolder.newFolder().toURI().toString(); - RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend(); // verify that we would use PredefinedOptions.DEFAULT by default. assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions()); @@ -609,7 +581,7 @@ public void testPredefinedOptions() throws Exception { Configuration configuration = new Configuration(); configuration.set( RocksDBOptions.PREDEFINED_OPTIONS, PredefinedOptions.FLASH_SSD_OPTIMIZED.name()); - rocksDbBackend = new RocksDBStateBackend(checkpointPath); + rocksDbBackend = new EmbeddedRocksDBStateBackend(); rocksDbBackend = rocksDbBackend.configure(configuration, getClass().getClassLoader()); assertEquals(PredefinedOptions.FLASH_SSD_OPTIMIZED, rocksDbBackend.getPredefinedOptions()); @@ -719,7 +691,7 @@ public void testConfigurableOptionsFromConfig() throws Exception { @Test public void testOptionsFactory() throws Exception { String checkpointPath = tempFolder.newFolder().toURI().toString(); - RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend(); // verify that user-defined options factory could be configured via config.yaml Configuration config = new Configuration(); @@ -731,7 +703,7 @@ public void testOptionsFactory() throws Exception { assertTrue(rocksDbBackend.getRocksDBOptions() instanceof TestOptionsFactory); try (RocksDBResourceContainer optionsContainer = - rocksDbBackend.createOptionsAndResourceContainer()) { + rocksDbBackend.createOptionsAndResourceContainer(null)) { DBOptions dbOptions = optionsContainer.getDbOptions(); assertEquals(4, dbOptions.maxBackgroundJobs()); } @@ -755,7 +727,7 @@ public ColumnFamilyOptions createColumnOptions( }); try (RocksDBResourceContainer optionsContainer = - rocksDbBackend.createOptionsAndResourceContainer()) { + rocksDbBackend.createOptionsAndResourceContainer(null)) { ColumnFamilyOptions colCreated = optionsContainer.getColumnOptions(); assertEquals(CompactionStyle.FIFO, colCreated.compactionStyle()); } @@ -822,51 +794,6 @@ public ColumnFamilyOptions createColumnOptions( } } - // ------------------------------------------------------------------------ - // Reconfiguration - // ------------------------------------------------------------------------ - - @Test - public void testRocksDbReconfigurationCopiesExistingValues() throws Exception { - final FsStateBackend checkpointBackend = - new FsStateBackend(tempFolder.newFolder().toURI().toString()); - final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue(); - - final RocksDBStateBackend original = - new RocksDBStateBackend(checkpointBackend, TernaryBoolean.fromBoolean(incremental)); - - // these must not be the default options - final PredefinedOptions predOptions = PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM; - assertNotEquals(predOptions, original.getPredefinedOptions()); - original.setPredefinedOptions(predOptions); - - final RocksDBOptionsFactory optionsFactory = mock(RocksDBOptionsFactory.class); - original.setRocksDBOptions(optionsFactory); - - final String[] localDirs = - new String[] { - tempFolder.newFolder().getAbsolutePath(), - tempFolder.newFolder().getAbsolutePath() - }; - original.setDbStoragePaths(localDirs); - - RocksDBStateBackend copy = - original.configure( - new Configuration(), Thread.currentThread().getContextClassLoader()); - - assertEquals( - original.isIncrementalCheckpointsEnabled(), copy.isIncrementalCheckpointsEnabled()); - assertArrayEquals(original.getDbStoragePaths(), copy.getDbStoragePaths()); - assertEquals(original.getRocksDBOptions(), copy.getRocksDBOptions()); - assertEquals(original.getPredefinedOptions(), copy.getPredefinedOptions()); - - FsStateBackend copyCheckpointBackend = (FsStateBackend) copy.getCheckpointBackend(); - assertEquals( - checkpointBackend.getCheckpointPath(), copyCheckpointBackend.getCheckpointPath()); - assertEquals( - checkpointBackend.getSavepointPath(), copyCheckpointBackend.getSavepointPath()); - } - // ------------------------------------------------------------------------ // RocksDB Memory Control // ------------------------------------------------------------------------ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java deleted file mode 100644 index 7eb0583192eba..0000000000000 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.StateBackendOptions; -import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.StateBackendLoader; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.util.Arrays; -import java.util.HashSet; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -/** Tests for the RocksDBStateBackendFactory. */ -public class RocksDBStateBackendFactoryTest { - - @Rule public final TemporaryFolder tmp = new TemporaryFolder(); - - private final ClassLoader cl = getClass().getClassLoader(); - - private final String backendKey = StateBackendOptions.STATE_BACKEND.key(); - - // ------------------------------------------------------------------------ - - @Test - public void testFactoryName() { - // construct the name such that it will not be automatically adjusted on refactorings - String factoryName = "org.apache.flink.contrib.streaming.state.Roc"; - factoryName += "ksDBStateBackendFactory"; - - // !!! if this fails, the code in StateBackendLoader must be adjusted - assertEquals(factoryName, RocksDBStateBackendFactory.class.getName()); - } - - @Test - public void testEmbeddedFactoryName() { - // construct the name such that it will not be automatically adjusted on refactorings - String factoryName = "org.apache.flink.contrib.streaming.state.EmbeddedRoc"; - factoryName += "ksDBStateBackendFactory"; - - // !!! if this fails, the code in StateBackendLoader must be adjusted - assertEquals(factoryName, EmbeddedRocksDBStateBackendFactory.class.getName()); - } - - /** - * Validates loading a file system state backend with additional parameters from the cluster - * configuration. - */ - @Test - public void testLoadRocksDBStateBackend() throws Exception { - final String localDir1 = tmp.newFolder().getAbsolutePath(); - final String localDir2 = tmp.newFolder().getAbsolutePath(); - final String localDirs = localDir1 + File.pathSeparator + localDir2; - final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue(); - - // we configure with the explicit string (rather than - // AbstractStateBackend#X_STATE_BACKEND_NAME) - // to guard against config-breaking changes of the name - final Configuration config1 = new Configuration(); - config1.setString(backendKey, "rocksdb"); - config1.set(RocksDBOptions.LOCAL_DIRECTORIES, localDirs); - config1.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental); - - final Configuration config2 = new Configuration(); - config2.setString(backendKey, EmbeddedRocksDBStateBackendFactory.class.getName()); - config2.set(RocksDBOptions.LOCAL_DIRECTORIES, localDirs); - config2.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental); - - StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null); - StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null); - - assertTrue(backend1 instanceof EmbeddedRocksDBStateBackend); - assertTrue(backend2 instanceof EmbeddedRocksDBStateBackend); - - EmbeddedRocksDBStateBackend fs1 = (EmbeddedRocksDBStateBackend) backend1; - EmbeddedRocksDBStateBackend fs2 = (EmbeddedRocksDBStateBackend) backend2; - - assertEquals(incremental, fs1.isIncrementalCheckpointsEnabled()); - assertEquals(incremental, fs2.isIncrementalCheckpointsEnabled()); - checkPaths(fs1.getDbStoragePaths(), localDir1, localDir2); - checkPaths(fs2.getDbStoragePaths(), localDir1, localDir2); - } - - /** - * Validates taking the application-defined rocksdb state backend and adding with additional - * parameters from the cluster configuration, but giving precedence to application-defined - * parameters over configuration-defined parameters. - */ - @Test - public void testLoadRocksDBStateBackendMixed() throws Exception { - final String localDir1 = tmp.newFolder().getAbsolutePath(); - final String localDir2 = tmp.newFolder().getAbsolutePath(); - final String localDir3 = tmp.newFolder().getAbsolutePath(); - final String localDir4 = tmp.newFolder().getAbsolutePath(); - - final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue(); - - final EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(incremental); - backend.setDbStoragePaths(localDir1, localDir2); - - final Configuration config = new Configuration(); - config.setString(backendKey, "hashmap"); // this should not be picked up - config.set( - CheckpointingOptions.INCREMENTAL_CHECKPOINTS, - !incremental); // this should not be picked up - config.set( - RocksDBOptions.LOCAL_DIRECTORIES, - localDir3 + ":" + localDir4); // this should not be picked up - - final StateBackend loadedBackend = - StateBackendLoader.fromApplicationOrConfigOrDefault( - backend, new Configuration(), config, cl, null); - assertTrue(loadedBackend instanceof EmbeddedRocksDBStateBackend); - - final EmbeddedRocksDBStateBackend loadedRocks = (EmbeddedRocksDBStateBackend) loadedBackend; - - assertEquals(incremental, loadedRocks.isIncrementalCheckpointsEnabled()); - checkPaths(loadedRocks.getDbStoragePaths(), localDir1, localDir2); - } - - // ------------------------------------------------------------------------ - - private static void checkPaths(String[] pathsArray, String... paths) { - assertNotNull(pathsArray); - assertNotNull(paths); - - assertEquals(pathsArray.length, paths.length); - - HashSet pathsSet = new HashSet<>(Arrays.asList(pathsArray)); - - for (String path : paths) { - assertTrue(pathsSet.contains(path)); - } - } -} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendMigrationTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendMigrationTest.java index bd44c9e1be75f..3218b36a677c7 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendMigrationTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendMigrationTest.java @@ -19,8 +19,9 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackendMigrationTestBase; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; @@ -33,10 +34,10 @@ import java.util.Arrays; import java.util.Collection; -/** Tests for the partitioned state part of {@link RocksDBStateBackend}. */ +/** Tests for the partitioned state part of {@link EmbeddedRocksDBStateBackend}. */ @ExtendWith(ParameterizedTestExtension.class) public class RocksDBStateBackendMigrationTest - extends StateBackendMigrationTestBase { + extends StateBackendMigrationTestBase { @Parameters(name = "Incremental checkpointing: {0}") public static Collection parameters() { @@ -49,12 +50,10 @@ public static Collection parameters() { private String dbPath; @Override - protected RocksDBStateBackend getStateBackend() throws IOException { + protected EmbeddedRocksDBStateBackend getStateBackend() throws IOException { dbPath = TempDirUtils.newFolder(tempFolder).getAbsolutePath(); - String checkpointPath = TempDirUtils.newFolder(tempFolder).toURI().toString(); - RocksDBStateBackend backend = - new RocksDBStateBackend( - new FsStateBackend(checkpointPath), + EmbeddedRocksDBStateBackend backend = + new EmbeddedRocksDBStateBackend( TernaryBoolean.fromBoolean(enableIncrementalCheckpointing)); Configuration configuration = new Configuration(); @@ -65,4 +64,10 @@ protected RocksDBStateBackend getStateBackend() throws IOException { backend.setDbStoragePath(dbPath); return backend; } + + @Override + protected CheckpointStorage getCheckpointStorage() throws Exception { + String checkpointPath = TempDirUtils.newFolder(tempFolder).toURI().toString(); + return new FileSystemCheckpointStorage(checkpointPath); + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateOptionTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateOptionTest.java index 39f5c31d6e9f5..5f233c1991d93 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateOptionTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateOptionTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.PriorityQueueStateType; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; import org.apache.flink.runtime.state.VoidNamespace; @@ -70,10 +71,11 @@ public class RocksDBStateOptionTest { */ @Test public void testUseOptimizePointLookupWithMapState() throws Exception { - RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup(); + EmbeddedRocksDBStateBackend rocksDBStateBackend = + createStateBackendWithOptimizePointLookup(); RocksDBKeyedStateBackend keyedStateBackend = createKeyedStateBackend( - rocksDBStateBackend.getEmbeddedRocksDBStateBackend(), + rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE); try { @@ -113,10 +115,11 @@ public void testUseOptimizePointLookupWithMapState() throws Exception { */ @Test public void testUseOptimizePointLookupWithPriorityQueue() throws IOException { - RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup(); + EmbeddedRocksDBStateBackend rocksDBStateBackend = + createStateBackendWithOptimizePointLookup(); RocksDBKeyedStateBackend keyedStateBackend = createKeyedStateBackend( - rocksDBStateBackend.getEmbeddedRocksDBStateBackend(), + rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE); try { @@ -159,11 +162,10 @@ public void testUseOptimizePointLookupWithPriorityQueue() throws IOException { } } - private RocksDBStateBackend createStateBackendWithOptimizePointLookup() throws IOException { - RocksDBStateBackend rocksDBStateBackend = - new RocksDBStateBackend(tempFolder.newFolder().toURI(), true); - rocksDBStateBackend.setPriorityQueueStateType( - RocksDBStateBackend.PriorityQueueStateType.ROCKSDB); + private EmbeddedRocksDBStateBackend createStateBackendWithOptimizePointLookup() + throws IOException { + EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true); + rocksDBStateBackend.setPriorityQueueStateType(PriorityQueueStateType.ROCKSDB); rocksDBStateBackend.setRocksDBOptions( new RocksDBOptionsFactory() { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java index f03410c7baca8..359f567690fa8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java @@ -45,7 +45,10 @@ public void testTwoSeparateClassLoaders() throws Exception { // collect the libraries / class folders with RocksDB related code: the state backend and // RocksDB itself final URL codePath1 = - RocksDBStateBackend.class.getProtectionDomain().getCodeSource().getLocation(); + EmbeddedRocksDBStateBackend.class + .getProtectionDomain() + .getCodeSource() + .getLocation(); final URL codePath2 = RocksDB.class.getProtectionDomain().getCodeSource().getLocation(); final ClassLoader parent = getClass().getClassLoader(); @@ -64,17 +67,15 @@ public void testTwoSeparateClassLoaders() throws Exception { NOOP_EXCEPTION_HANDLER, true); - final String className = RocksDBStateBackend.class.getName(); + final String className = EmbeddedRocksDBStateBackend.class.getName(); final Class clazz1 = Class.forName(className, false, loader1); final Class clazz2 = Class.forName(className, false, loader2); assertNotEquals( "Test broken - the two reflectively loaded classes are equal", clazz1, clazz2); - final Object instance1 = - clazz1.getConstructor(String.class).newInstance(tmp.newFolder().toURI().toString()); - final Object instance2 = - clazz2.getConstructor(String.class).newInstance(tmp.newFolder().toURI().toString()); + final Object instance1 = clazz1.getConstructor().newInstance(); + final Object instance2 = clazz2.getConstructor().newInstance(); final String tempDir = tmp.newFolder().getAbsolutePath(); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java index b4e0eff83b8e3..403c184cb4e1a 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -144,6 +145,9 @@ public void testScalingUp() throws Exception { try (KeyedOneInputStreamOperatorTestHarness harness = getHarnessTest(keySelector, maxParallelism, 1, 0)) { harness.setStateBackend(getStateBackend()); + harness.setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); harness.open(); validHarnessResult(harness, 1, records); @@ -178,6 +182,9 @@ public void testScalingUp() throws Exception { Assert.assertEquals(new KeyGroupRange(0, 4), localKeyGroupRange20); harness2[0] = getHarnessTest(keySelector, maxParallelism, 2, 0); harness2[0].setStateBackend(getStateBackend()); + harness2[0].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); harness2[0].setup(); harness2[0].initializeState(initState1); harness2[0].open(); @@ -187,6 +194,9 @@ public void testScalingUp() throws Exception { Assert.assertEquals(new KeyGroupRange(5, 9), localKeyGroupRange21); harness2[1] = getHarnessTest(keySelector, maxParallelism, 2, 1); harness2[1].setStateBackend(getStateBackend()); + harness2[1].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); harness2[1].setup(); harness2[1].initializeState(initState2); harness2[1].open(); @@ -235,6 +245,9 @@ public void testScalingUp() throws Exception { Assert.assertEquals(new KeyGroupRange(0, 3), localKeyGroupRange30); harness3[0] = getHarnessTest(keySelector, maxParallelism, 3, 0); harness3[0].setStateBackend(getStateBackend()); + harness3[0].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); harness3[0].setup(); harness3[0].initializeState(initState1); harness3[0].open(); @@ -244,6 +257,9 @@ public void testScalingUp() throws Exception { Assert.assertEquals(new KeyGroupRange(4, 6), localKeyGroupRange31); harness3[1] = getHarnessTest(keySelector, maxParallelism, 3, 1); harness3[1].setStateBackend(getStateBackend()); + harness3[1].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); harness3[1].setup(); harness3[1].initializeState(initState2); harness3[1].open(); @@ -253,6 +269,9 @@ public void testScalingUp() throws Exception { Assert.assertEquals(new KeyGroupRange(7, 9), localKeyGroupRange32); harness3[2] = getHarnessTest(keySelector, maxParallelism, 3, 2); harness3[2].setStateBackend(getStateBackend()); + harness3[2].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); harness3[2].setup(); harness3[2].initializeState(initState3); harness3[2].open(); @@ -285,6 +304,9 @@ public void testScalingDown() throws Exception { Assert.assertEquals(new KeyGroupRange(0, 3), localKeyGroupRange30); harness3[0] = getHarnessTest(keySelector, maxParallelism, 3, 0); harness3[0].setStateBackend(getStateBackend()); + harness3[0].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); harness3[0].open(); // task's key-group [4, 6] @@ -292,6 +314,9 @@ public void testScalingDown() throws Exception { Assert.assertEquals(new KeyGroupRange(4, 6), localKeyGroupRange31); harness3[1] = getHarnessTest(keySelector, maxParallelism, 3, 1); harness3[1].setStateBackend(getStateBackend()); + harness3[1].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); harness3[1].open(); // task's key-group [7, 9] @@ -299,6 +324,9 @@ public void testScalingDown() throws Exception { Assert.assertEquals(new KeyGroupRange(7, 9), localKeyGroupRange32); harness3[2] = getHarnessTest(keySelector, maxParallelism, 3, 2); harness3[2].setStateBackend(getStateBackend()); + harness3[2].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); harness3[2].open(); validHarnessResult(harness3[0], 1, records[0], records[1], records[2], records[3]); @@ -344,6 +372,9 @@ public void testScalingDown() throws Exception { Assert.assertEquals(new KeyGroupRange(0, 4), localKeyGroupRange20); harness2[0] = getHarnessTest(keySelector, maxParallelism, 2, 0); harness2[0].setStateBackend(getStateBackend()); + harness2[0].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); harness2[0].setup(); harness2[0].initializeState(initState1); harness2[0].open(); @@ -354,6 +385,9 @@ public void testScalingDown() throws Exception { Assert.assertEquals(new KeyGroupRange(5, 9), localKeyGroupRange21); harness2[1] = getHarnessTest(keySelector, maxParallelism, 2, 1); harness2[1].setStateBackend(getStateBackend()); + harness2[1].setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); harness2[1].setup(); harness2[1].initializeState(initState2); harness2[1].open(); @@ -385,6 +419,9 @@ public void testScalingDown() throws Exception { // this will choose the state handle generated by harness2[0] to init the target db // without any clipping. harness.setStateBackend(getStateBackend()); + harness.setCheckpointStorage( + new FileSystemCheckpointStorage( + "file://" + rootFolder.newFolder().getAbsolutePath())); harness.setup(); harness.initializeState(initState1); harness.open(); @@ -432,8 +469,7 @@ private KeyedOneInputStreamOperatorTestHarness getHarne } private StateBackend getStateBackend() throws Exception { - RocksDBStateBackend rocksDBStateBackend = - new RocksDBStateBackend("file://" + rootFolder.newFolder().getAbsolutePath(), true); + EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true); Configuration configuration = new Configuration(); configuration.set( RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDbRestoreMode); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java index 14bfc4e838f13..891e325c81473 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java @@ -21,10 +21,11 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.runtime.state.ttl.StateBackendTestContext; import org.apache.flink.runtime.state.ttl.TtlStateTestBase; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; @@ -52,6 +53,17 @@ protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider protected StateBackend createStateBackend() { return RocksDBTtlStateTestBase.this.createStateBackend(); } + + @Override + protected CheckpointStorage createCheckpointStorage() { + String checkpointPath; + try { + checkpointPath = TempDirUtils.newFolder(tempFolder).toURI().toString(); + } catch (IOException e) { + throw new FlinkRuntimeException("Failed to init rocksdb test state backend"); + } + return new FileSystemCheckpointStorage(checkpointPath); + } }; } @@ -59,16 +71,13 @@ protected StateBackend createStateBackend() { StateBackend createStateBackend(TernaryBoolean enableIncrementalCheckpointing) { String dbPath; - String checkpointPath; try { dbPath = TempDirUtils.newFolder(tempFolder).getAbsolutePath(); - checkpointPath = TempDirUtils.newFolder(tempFolder).toURI().toString(); } catch (IOException e) { throw new FlinkRuntimeException("Failed to init rocksdb test state backend"); } - RocksDBStateBackend backend = - new RocksDBStateBackend( - new FsStateBackend(checkpointPath), enableIncrementalCheckpointing); + EmbeddedRocksDBStateBackend backend = + new EmbeddedRocksDBStateBackend(enableIncrementalCheckpointing); Configuration config = new Configuration(); backend = backend.configure(config, Thread.currentThread().getContextClassLoader()); backend.setDbStoragePath(dbPath); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala index f59e0bff129c3..77a12a1e2b72f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala @@ -21,7 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.dag.Transformation import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.configuration.Configuration -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend import org.apache.flink.runtime.state.CheckpointStorage import org.apache.flink.runtime.state.StateBackend import org.apache.flink.runtime.state.hashmap.HashMapStateBackend @@ -54,7 +54,7 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase { new HashMapStateBackend().configure(conf, classLoader) case ROCKSDB_BACKEND => - new RocksDBStateBackend("file://" + TempDirUtils.newFolder(tempFolder).getAbsoluteFile) + new EmbeddedRocksDBStateBackend() } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java index 3226ab1601193..ad74cad679229 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java @@ -48,6 +48,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.state.BackendBuildingException; +import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder; @@ -60,7 +61,8 @@ import org.apache.flink.runtime.state.SnapshotStrategy; import org.apache.flink.runtime.state.SnapshotStrategyRunner; import org.apache.flink.runtime.state.StateBackendFactory; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -395,11 +397,13 @@ public DeclineSinkFailingStateBackend createFromConfig( * The state backend to create {@link DeclineSinkFailingOperatorStateBackend} at {@link * DeclineSink}. */ - private static class DeclineSinkFailingStateBackend extends FsStateBackend { + private static class DeclineSinkFailingStateBackend extends HashMapStateBackend { private static final long serialVersionUID = 1L; + private final CheckpointStorage checkpointStorage; public DeclineSinkFailingStateBackend(Path checkpointDataUri) { - super(checkpointDataUri); + super(); + checkpointStorage = new FileSystemCheckpointStorage(checkpointDataUri); } @Override diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java index 344e401dc0293..51e4fd501da22 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java @@ -26,7 +26,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; @@ -49,7 +49,8 @@ import org.apache.flink.runtime.state.StatePartitionStreamProvider; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.TestTaskStateManager; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.KeyContext; @@ -72,7 +73,6 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; @@ -156,16 +156,16 @@ private void testOperatorStatesSnapshotRestoreInternal(final int mode) throws Ex // -------------------------------------------------------------------------- snapshot StateBackend stateBackend; - FsStateBackend fsstateBackend = createStateBackendInternal(); + HashMapStateBackend hashMapStateBackend = createStateBackendInternal(); switch (stateBackendEnum) { case FILE: - stateBackend = fsstateBackend; + stateBackend = hashMapStateBackend; break; case ROCKSDB_FULLY_ASYNC: - stateBackend = new RocksDBStateBackend(fsstateBackend, TernaryBoolean.FALSE); + stateBackend = new EmbeddedRocksDBStateBackend(TernaryBoolean.FALSE); break; case ROCKSDB_INCREMENTAL: - stateBackend = new RocksDBStateBackend(fsstateBackend, TernaryBoolean.TRUE); + stateBackend = new EmbeddedRocksDBStateBackend(TernaryBoolean.TRUE); break; default: throw new IllegalStateException( @@ -211,6 +211,7 @@ private void testOperatorStatesSnapshotRestoreInternal(final int mode) throws Ex mockEnvironment); testHarness.setStateBackend(stateBackend); + testHarness.setCheckpointStorage(new JobManagerCheckpointStorage()); testHarness.open(); for (int i = 0; i < 10; ++i) { @@ -276,9 +277,8 @@ public InternalTimeServiceManager create( testHarness.close(); } - private FsStateBackend createStateBackendInternal() throws IOException { - File checkpointDir = temporaryFolder.newFolder(); - return new FsStateBackend(checkpointDir.toURI()); + private HashMapStateBackend createStateBackendInternal() throws IOException { + return new HashMapStateBackend(); } static class TestOneInputStreamOperator extends AbstractStreamOperator