
Commit 80bd85d

[FLINK-23728][state-processor-api] State bootstrapping fails on new state backend factory stack

This closes apache#16849
sjwiesman committed Aug 17, 2021
1 parent 7c3e742 commit 80bd85d
Showing 8 changed files with 50 additions and 17 deletions.
File 1 of 8: SnapshotUtils.java (org.apache.flink.state.api.output)
@@ -19,16 +19,23 @@
package org.apache.flink.state.api.output;

import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.MathUtils;

+import java.io.IOException;

+import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
+import static org.apache.flink.configuration.CheckpointingOptions.FS_WRITE_BUFFER_SIZE;

/** Takes a final snapshot of the state of an operator subtask. */
@Internal
@@ -43,7 +50,7 @@ public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState s
long timestamp,
boolean isExactlyOnceMode,
boolean isUnalignedCheckpoint,
-CheckpointStorageWorkerView checkpointStorage,
+Configuration configuration,
Path savepointPath)
throws Exception {

@@ -57,9 +64,7 @@ public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState s

operator.prepareSnapshotPreBarrier(CHECKPOINT_ID);

-CheckpointStreamFactory storage =
-checkpointStorage.resolveCheckpointStorageLocation(
-CHECKPOINT_ID, options.getTargetLocation());
+CheckpointStreamFactory storage = createStreamFactory(configuration, options);

OperatorSnapshotFutures snapshotInProgress =
operator.snapshotState(CHECKPOINT_ID, timestamp, options, storage);
@@ -70,4 +75,20 @@ public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState s
operator.notifyCheckpointComplete(CHECKPOINT_ID);
return new TaggedOperatorSubtaskState(index, state);
}

+private static CheckpointStreamFactory createStreamFactory(
+Configuration configuration, CheckpointOptions options) throws IOException {
+final Path path =
+AbstractFsCheckpointStorageAccess.decodePathFromReference(
+options.getTargetLocation());
+
+return new FsCheckpointStorageLocation(
+path.getFileSystem(),
+path,
+path,
+path,
+options.getTargetLocation(),
+MathUtils.checkedDownCast(configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes()),
+configuration.get(FS_WRITE_BUFFER_SIZE));
+}
}
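For context, the createStreamFactory helper above sizes its streams from two standard checkpointing options. Below is a minimal, illustrative sketch — not part of this commit, with made-up values — of setting those options on a Flink Configuration like the one the writer operators now forward. When they are left unset, their defaults apply, which is why the updated SnapshotUtilsTest further down can simply pass new Configuration().

    import org.apache.flink.configuration.CheckpointingOptions;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.MemorySize;

    public final class StreamFactoryOptionsExample {
        public static void main(String[] args) {
            Configuration configuration = new Configuration();
            // State below this threshold is inlined into the checkpoint metadata instead of
            // being written as a separate file (read via FS_SMALL_FILE_THRESHOLD above).
            configuration.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.parse("20kb"));
            // Buffer size for the filesystem checkpoint output streams (FS_WRITE_BUFFER_SIZE above).
            configuration.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 4096);
            System.out.println(configuration);
        }
    }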
File 2 of 8
@@ -78,7 +78,7 @@ public void endInput() throws Exception {
timestamp,
getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
-getContainingTask().getCheckpointStorage(),
+getContainingTask().getConfiguration().getConfiguration(),
savepointPath);

output.collect(new StreamRecord<>(state));
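The call-site change in this file repeats in the next three changed files: instead of the task's checkpoint storage, SnapshotUtils.snapshot now receives the task's configuration. A hedged sketch of what the chained getConfiguration() calls resolve to (the helper class and method name here are mine, purely for illustration):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.graph.StreamConfig;
    import org.apache.flink.streaming.runtime.tasks.StreamTask;

    final class TaskConfigAccess {
        // getContainingTask() on an operator returns the StreamTask; its getConfiguration()
        // yields the operator's StreamConfig wrapper, and StreamConfig.getConfiguration()
        // unwraps the underlying Flink Configuration that snapshot(...) now consumes.
        static Configuration taskConfiguration(StreamTask<?, ?> task) {
            StreamConfig streamConfig = task.getConfiguration();
            return streamConfig.getConfiguration();
        }
    }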
File 3 of 8
@@ -93,7 +93,7 @@ public void endInput() throws Exception {
timestamp,
getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
-getContainingTask().getCheckpointStorage(),
+getContainingTask().getConfiguration().getConfiguration(),
savepointPath);

output.collect(new StreamRecord<>(state));
File 4 of 8
@@ -74,7 +74,7 @@ public void endInput() throws Exception {
timestamp,
getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
-getContainingTask().getCheckpointStorage(),
+getContainingTask().getConfiguration().getConfiguration(),
savepointPath);

output.collect(new StreamRecord<>(state));
File 5 of 8
@@ -196,7 +196,7 @@ public void endInput() throws Exception {
operator.getContainingTask()
.getConfiguration()
.isUnalignedCheckpointsEnabled(),
-operator.getContainingTask().getCheckpointStorage(),
+operator.getContainingTask().getConfiguration().getConfiguration(),
savepointPath);

output.collect(new StreamRecord<>(state));
File 6 of 8
@@ -30,13 +30,15 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.StateBootstrapFunction;
@@ -100,6 +102,17 @@ public void testRocksDBStateBackend() throws Exception {
testStateBootstrapAndModification(backend);
}

+@Test
+public void testHashMapStateBackend() throws Exception {
+testStateBootstrapAndModification(new HashMapStateBackend());
+}
+
+@Test
+public void testEmbeddedRocksDBStateBackend() throws Exception {
+StateBackend backend = new EmbeddedRocksDBStateBackend();
+testStateBootstrapAndModification(backend);
+}

public void testStateBootstrapAndModification(StateBackend backend) throws Exception {
final String savepointPath = getTempDirPath(new AbstractID().toHexString());

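The two new tests cover the backends from the new state backend factory stack that FLINK-23728 targets, next to the legacy backends the surrounding tests already exercise. A small illustrative sketch; the successor pairing in the comments is my reading of the backend migration, not something stated in this diff:

    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;

    final class NewStackBackends {
        static StateBackend heap() {
            return new HashMapStateBackend();         // heap successor of MemoryStateBackend/FsStateBackend
        }

        static StateBackend rocksDb() {
            return new EmbeddedRocksDBStateBackend(); // successor of RocksDBStateBackend
        }
    }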
File 7 of 8: SavepointWriterWindowITCase.java
@@ -28,10 +28,12 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.utils.MaxWatermarkSource;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -111,7 +113,9 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {

private static final List<Tuple2<String, StateBackend>> STATE_BACKENDS =
Arrays.asList(
Tuple2.of("MemoryStateBackend", new MemoryStateBackend()),
Tuple2.of("HashMap", new HashMapStateBackend()),
Tuple2.of("EmbeddedRocksDB", new EmbeddedRocksDBStateBackend()),
Tuple2.of("Memory", new MemoryStateBackend()),
Tuple2.of(
"RocksDB",
new RocksDBStateBackend((StateBackend) new MemoryStateBackend())));
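The parameterized list above also shows why the two RocksDB entries look different: the legacy RocksDBStateBackend wraps another StateBackend for checkpoint storage, whereas EmbeddedRocksDBStateBackend from the new stack stands alone and leaves checkpoint storage to separate configuration. An illustrative sketch (the factory method names are mine):

    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;

    final class BackendConstruction {
        static StateBackend legacyRocksDb() {
            // Legacy API: the RocksDB backend delegates checkpoint storage to the wrapped backend.
            return new RocksDBStateBackend((StateBackend) new MemoryStateBackend());
        }

        static StateBackend newRocksDb() {
            // New factory stack: no wrapper; checkpoint storage is configured independently.
            return new EmbeddedRocksDBStateBackend();
        }
    }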
File 8 of 8: SnapshotUtilsTest.java (org.apache.flink.state.api.output)
@@ -18,14 +18,12 @@

package org.apache.flink.state.api.output;

-import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ttl.mock.MockCheckpointStorage;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
@@ -55,12 +53,9 @@ public class SnapshotUtilsTest {
@Test
public void testSnapshotUtilsLifecycle() throws Exception {
StreamOperator<Void> operator = new LifecycleOperator();
-CheckpointStorageWorkerView storage =
-new MockCheckpointStorage().createCheckpointStorage(new JobID());

Path path = new Path(folder.newFolder().getAbsolutePath());

-SnapshotUtils.snapshot(operator, 0, 0L, true, false, storage, path);
+SnapshotUtils.snapshot(operator, 0, 0L, true, false, new Configuration(), path);

Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, ACTUAL_ORDER_TRACKING);
}
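Finally, the test above hands the raw savepoint Path straight to SnapshotUtils.snapshot. A hedged sketch of the round trip I assume connects that Path to the target location which createStreamFactory later decodes; the encode step is not visible in this diff, and the path below is hypothetical:

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
    import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;

    final class PathReferenceRoundTrip {
        public static void main(String[] args) {
            Path savepointPath = new Path("file:///tmp/bootstrap-savepoint"); // hypothetical location
            CheckpointStorageLocationReference reference =
                    AbstractFsCheckpointStorageAccess.encodePathAsReference(savepointPath);
            // createStreamFactory(configuration, options) recovers the same path from
            // options.getTargetLocation() via decodePathFromReference.
            Path decoded = AbstractFsCheckpointStorageAccess.decodePathFromReference(reference);
            System.out.println(decoded); // prints the decoded savepoint location
        }
    }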
