diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java index 9a1867fd54efb..e149607a65ed7 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java @@ -19,16 +19,23 @@ package org.apache.flink.state.api.output; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.runtime.state.CheckpointStorageWorkerView; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess; +import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation; import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.util.MathUtils; + +import java.io.IOException; + +import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD; +import static org.apache.flink.configuration.CheckpointingOptions.FS_WRITE_BUFFER_SIZE; /** Takes a final snapshot of the state of an operator subtask. */ @Internal @@ -43,7 +50,7 @@ public static > TaggedOperatorSubtaskState s long timestamp, boolean isExactlyOnceMode, boolean isUnalignedCheckpoint, - CheckpointStorageWorkerView checkpointStorage, + Configuration configuration, Path savepointPath) throws Exception { @@ -57,9 +64,7 @@ public static > TaggedOperatorSubtaskState s operator.prepareSnapshotPreBarrier(CHECKPOINT_ID); - CheckpointStreamFactory storage = - checkpointStorage.resolveCheckpointStorageLocation( - CHECKPOINT_ID, options.getTargetLocation()); + CheckpointStreamFactory storage = createStreamFactory(configuration, options); OperatorSnapshotFutures snapshotInProgress = operator.snapshotState(CHECKPOINT_ID, timestamp, options, storage); @@ -70,4 +75,20 @@ public static > TaggedOperatorSubtaskState s operator.notifyCheckpointComplete(CHECKPOINT_ID); return new TaggedOperatorSubtaskState(index, state); } + + private static CheckpointStreamFactory createStreamFactory( + Configuration configuration, CheckpointOptions options) throws IOException { + final Path path = + AbstractFsCheckpointStorageAccess.decodePathFromReference( + options.getTargetLocation()); + + return new FsCheckpointStorageLocation( + path.getFileSystem(), + path, + path, + path, + options.getTargetLocation(), + MathUtils.checkedDownCast(configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes()), + configuration.get(FS_WRITE_BUFFER_SIZE)); + } } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java index 38c37eaf973bc..60ab7ffe5c447 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java @@ -78,7 +78,7 @@ public void endInput() throws Exception { timestamp, getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(), getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(), - getContainingTask().getCheckpointStorage(), + getContainingTask().getConfiguration().getConfiguration(), savepointPath); output.collect(new StreamRecord<>(state)); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java index 5e9f91de9be47..1817304143f44 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java @@ -93,7 +93,7 @@ public void endInput() throws Exception { timestamp, getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(), getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(), - getContainingTask().getCheckpointStorage(), + getContainingTask().getConfiguration().getConfiguration(), savepointPath); output.collect(new StreamRecord<>(state)); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java index ab6f11225eec4..08dace21677f7 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java @@ -74,7 +74,7 @@ public void endInput() throws Exception { timestamp, getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(), getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(), - getContainingTask().getCheckpointStorage(), + getContainingTask().getConfiguration().getConfiguration(), savepointPath); output.collect(new StreamRecord<>(state)); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java index a72fcdc6a08c5..933602abb2da9 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java @@ -196,7 +196,7 @@ public void endInput() throws Exception { operator.getContainingTask() .getConfiguration() .isUnalignedCheckpointsEnabled(), - operator.getContainingTask().getCheckpointStorage(), + operator.getContainingTask().getConfiguration().getConfiguration(), savepointPath); output.collect(new StreamRecord<>(state)); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java index d13ba03bf1728..e35e40903f197 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java @@ -30,6 +30,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -37,6 +38,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction; import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction; import org.apache.flink.state.api.functions.StateBootstrapFunction; @@ -100,6 +102,17 @@ public void testRocksDBStateBackend() throws Exception { testStateBootstrapAndModification(backend); } + @Test + public void testHashMapStateBackend() throws Exception { + testStateBootstrapAndModification(new HashMapStateBackend()); + } + + @Test + public void testEmbeddedRocksDBStateBackend() throws Exception { + StateBackend backend = new EmbeddedRocksDBStateBackend(); + testStateBootstrapAndModification(backend); + } + public void testStateBootstrapAndModification(StateBackend backend) throws Exception { final String savepointPath = getTempDirPath(new AbstractID().toHexString()); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java index 9a1d7741d7a49..7a93c12558267 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java @@ -28,10 +28,12 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.state.api.utils.MaxWatermarkSource; import org.apache.flink.streaming.api.datastream.DataStream; @@ -111,7 +113,9 @@ public class SavepointWriterWindowITCase extends AbstractTestBase { private static final List> STATE_BACKENDS = Arrays.asList( - Tuple2.of("MemoryStateBackend", new MemoryStateBackend()), + Tuple2.of("HashMap", new HashMapStateBackend()), + Tuple2.of("EmbeddedRocksDB", new EmbeddedRocksDBStateBackend()), + Tuple2.of("Memory", new MemoryStateBackend()), Tuple2.of( "RocksDB", new RocksDBStateBackend((StateBackend) new MemoryStateBackend()))); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java index f2056f367d5b6..773f80eec1543 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java @@ -18,14 +18,12 @@ package org.apache.flink.state.api.output; -import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.state.CheckpointStorageWorkerView; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.ttl.mock.MockCheckpointStorage; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; @@ -55,12 +53,9 @@ public class SnapshotUtilsTest { @Test public void testSnapshotUtilsLifecycle() throws Exception { StreamOperator operator = new LifecycleOperator(); - CheckpointStorageWorkerView storage = - new MockCheckpointStorage().createCheckpointStorage(new JobID()); - Path path = new Path(folder.newFolder().getAbsolutePath()); - SnapshotUtils.snapshot(operator, 0, 0L, true, false, storage, path); + SnapshotUtils.snapshot(operator, 0, 0L, true, false, new Configuration(), path); Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, ACTUAL_ORDER_TRACKING); }