diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md index 6b0ae0ebb6659..8dff5d0deaa6c 100644 --- a/docs/ops/state/state_backends.md +++ b/docs/ops/state/state_backends.md @@ -257,6 +257,8 @@ Set the configuration option `state.backend.rocksdb.timer-service.factory` to `h Note *The combination RocksDB state backend with heap-based timers currently does NOT support asynchronous snapshots for the timers state. Other state like keyed state is still snapshotted asynchronously.* +Note *When using the RocksDB state backend with heap-based timers, checkpoints and savepoints are expected to fail if the application contains operators that write to raw keyed state.* + ### Enabling RocksDB Native Metrics You can optionally access RockDB's native metrics through Flink's metrics system, by enabling certain metrics selectively. diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java index d1c4c26df1fd7..9cea7546a5548 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java @@ -184,7 +184,8 @@ private StreamOperatorStateContext getStreamOperatorStateContext(Environment env operator.getKeyType().createSerializer(environment.getExecutionConfig()), registry, getRuntimeContext().getMetricGroup(), - 1.0); + 1.0, + false); } catch (Exception e) { throw new IOException("Failed to restore state backend", e); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 94b3868b484f7..daf82ca663673 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; @@ -255,7 +256,8 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana config.getManagedMemoryFractionOperatorUseCaseOfSlot( ManagedMemoryUseCase.STATE_BACKEND, runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(), - runtimeContext.getUserCodeClassLoader())); + runtimeContext.getUserCodeClassLoader()), + isUsingCustomRawKeyedState()); stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry); timeServiceManager = context.internalTimerServiceManager(); @@ -263,6 +265,29 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null)); } + /** + * Indicates whether or not implementations of this class are writing to the raw keyed state streams + * on snapshots, using {@link #snapshotState(StateSnapshotContext)}. If yes, subclasses should + * override this method to return {@code true}. + * + *

<p>Subclasses need to explicitly indicate the use of raw keyed state because, internally, + * the {@link AbstractStreamOperator} may attempt to read from it as well to restore heap-based timers and + * ultimately fail with read errors. Setting this flag to {@code true} lets the + * {@link AbstractStreamOperator} know that the data in the raw keyed state was + * not written by the timer services, so that it skips the timer restore attempt. + * + *

<p>Please refer to FLINK-19741 for further details. + * + *

TODO: this method can be removed once all timers are moved to be managed by state backends. + * + * @return flag indicating whether or not this operator is writing to raw keyed + * state via {@link #snapshotState(StateSnapshotContext)}. + */ + @Internal + protected boolean isUsingCustomRawKeyedState() { + return false; + } + /** * This method is called immediately before any elements are processed, it should contain the * operator's initialization logic, e.g. state initialization. @@ -322,7 +347,8 @@ public final OperatorSnapshotFutures snapshotState( checkpointId, timestamp, checkpointOptions, - factory); + factory, + isUsingCustomRawKeyedState()); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index ec659ad0337a7..fd48589fc0172 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.KeyedStateStore; @@ -205,13 +206,37 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana config.getManagedMemoryFractionOperatorUseCaseOfSlot( ManagedMemoryUseCase.STATE_BACKEND, runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(), - runtimeContext.getUserCodeClassLoader())); + runtimeContext.getUserCodeClassLoader()), + isUsingCustomRawKeyedState()); stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables); timeServiceManager = 
context.internalTimerServiceManager(); stateHandler.initializeOperatorState(this); } + /** + * Indicates whether or not implementations of this class are writing to the raw keyed state streams + * on snapshots, using {@link #snapshotState(StateSnapshotContext)}. If yes, subclasses should + * override this method to return {@code true}. + * + *

<p>Subclasses need to explicitly indicate the use of raw keyed state because, internally, + * the {@link AbstractStreamOperatorV2} may attempt to read from it as well to restore heap-based timers and + * ultimately fail with read errors. Setting this flag to {@code true} lets the + * {@link AbstractStreamOperatorV2} know that the data in the raw keyed state was + * not written by the timer services, so that it skips the timer restore attempt. + * + *

<p>Please refer to FLINK-19741 for further details. + * + *

TODO: this method can be removed once all timers are moved to be managed by state backends. + * + * @return flag indicating whether or not this operator is writing to raw keyed + * state via {@link #snapshotState(StateSnapshotContext)}. + */ + @Internal + protected boolean isUsingCustomRawKeyedState() { + return false; + } + /** * This method is called immediately before any elements are processed, it should contain the * operator's initialization logic, e.g. state initialization. @@ -268,7 +293,8 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times checkpointId, timestamp, checkpointOptions, - factory); + factory, + isUsingCustomRawKeyedState()); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java index 36dba427df71c..4daa64b91a5b1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; -import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -56,15 +56,25 @@ InternalTimerService getInternalTimerService( void advanceWatermark(Watermark watermark) throws Exception; /** - * Snapshots the timers to keyed state. + * Snapshots the timers to raw keyed state. 
This should only be called if + * {@link #isUsingLegacyRawKeyedStateSnapshots()} returns {@code true}. * *

TODO: This can be removed once heap-based timers are integrated with RocksDB * incremental snapshots. */ - void snapshotState( - StateSnapshotContext context, + void snapshotToRawKeyedState( + KeyedStateCheckpointOutputStream stateCheckpointOutputStream, String operatorName) throws Exception; + /** + * Flag indicating whether or not the internal timer services should be checkpointed + * with legacy synchronous snapshots. + * + *

TODO: This can be removed once heap-based timers are integrated with RocksDB + * incremental snapshots. + */ + boolean isUsingLegacyRawKeyedStateSnapshots(); + /** * A provider pattern for creating an instance of a {@link InternalTimeServiceManager}. * Allows substituting the manager that will be used at the runtime. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java index 3ffafb406a146..e6b0e1519467c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java @@ -31,7 +31,6 @@ import org.apache.flink.runtime.state.KeyGroupsList; import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; import org.apache.flink.runtime.state.PriorityQueueSetFactory; -import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.Preconditions; @@ -46,6 +45,7 @@ import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * An entity keeping all the time-related services. 
Right now, this is only a @@ -187,36 +187,31 @@ public void advanceWatermark(Watermark watermark) throws Exception { ////////////////// Fault Tolerance Methods /////////////////// @Override - public void snapshotState(StateSnapshotContext context, String operatorName) throws Exception { - //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots - if (useLegacySynchronousSnapshots) { + public boolean isUsingLegacyRawKeyedStateSnapshots() { + return useLegacySynchronousSnapshots; + } - KeyedStateCheckpointOutputStream out; - try { - out = context.getRawKeyedOperatorStateOutput(); - } catch (Exception exception) { - throw new Exception("Could not open raw keyed operator state stream for " + - operatorName + '.', exception); - } + @Override + public void snapshotToRawKeyedState(KeyedStateCheckpointOutputStream out, String operatorName) throws Exception { + checkState(useLegacySynchronousSnapshots); + + try { + KeyGroupsList allKeyGroups = out.getKeyGroupList(); + for (int keyGroupIdx : allKeyGroups) { + out.startNewKeyGroup(keyGroupIdx); + snapshotStateForKeyGroup( + new DataOutputViewStreamWrapper(out), keyGroupIdx); + } + } catch (Exception exception) { + throw new Exception("Could not write timer service of " + operatorName + + " to checkpoint state stream.", exception); + } finally { try { - KeyGroupsList allKeyGroups = out.getKeyGroupList(); - for (int keyGroupIdx : allKeyGroups) { - out.startNewKeyGroup(keyGroupIdx); - - snapshotStateForKeyGroup( - new DataOutputViewStreamWrapper(out), keyGroupIdx); - } - } catch (Exception exception) { - throw new Exception("Could not write timer service of " + operatorName + - " to checkpoint state stream.", exception); - } finally { - try { - out.close(); - } catch (Exception closeException) { - LOG.warn("Could not close raw keyed operator state stream for {}. 
This " + - "might have prevented deleting some state data.", operatorName, closeException); - } + out.close(); + } catch (Exception closeException) { + LOG.warn("Could not close raw keyed operator state stream for {}. This " + + "might have prevented deleting some state data.", operatorName, closeException); } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java index c9e8861be176f..ba51c06189f31 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java @@ -141,7 +141,8 @@ public OperatorSnapshotFutures snapshotState( long checkpointId, long timestamp, CheckpointOptions checkpointOptions, - CheckpointStreamFactory factory) throws CheckpointException { + CheckpointStreamFactory factory, + boolean isUsingCustomRawKeyedState) throws CheckpointException { KeyGroupRange keyGroupRange = null != keyedStateBackend ? 
keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE; @@ -163,7 +164,8 @@ public OperatorSnapshotFutures snapshotState( checkpointOptions, factory, snapshotInProgress, - snapshotContext); + snapshotContext, + isUsingCustomRawKeyedState); return snapshotInProgress; } @@ -178,11 +180,19 @@ void snapshotState( CheckpointOptions checkpointOptions, CheckpointStreamFactory factory, OperatorSnapshotFutures snapshotInProgress, - StateSnapshotContextSynchronousImpl snapshotContext) throws CheckpointException { + StateSnapshotContextSynchronousImpl snapshotContext, + boolean isUsingCustomRawKeyedState) throws CheckpointException { try { if (timeServiceManager.isPresent()) { checkState(keyedStateBackend != null, "keyedStateBackend should be available with timeServiceManager"); - timeServiceManager.get().snapshotState(snapshotContext, operatorName); + final InternalTimeServiceManager manager = timeServiceManager.get(); + + if (manager.isUsingLegacyRawKeyedStateSnapshots()) { + checkState( + !isUsingCustomRawKeyedState, + "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write."); + manager.snapshotToRawKeyedState(snapshotContext.getRawKeyedOperatorStateOutput(), operatorName); + } } streamOperator.snapshotState(snapshotContext); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java index 087be92da1841..d6c49254230d2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java @@ -46,6 +46,8 @@ public interface StreamTaskStateInitializer { * @param streamTaskCloseableRegistry the closeable registry to which created closeable objects will be registered. 
* @param metricGroup the parent metric group for all statebackend metrics * @param managedMemoryFraction the managed memory fraction of the operator for state backend + * @param isUsingCustomRawKeyedState flag indicating whether or not the {@link AbstractStreamOperator} is writing + * custom raw keyed state. * @return a context from which the given operator can initialize everything related to state. * @throws Exception when something went wrong while creating the context. */ @@ -57,5 +59,6 @@ StreamOperatorStateContext streamOperatorStateContext( @Nullable TypeSerializer keySerializer, @Nonnull CloseableRegistry streamTaskCloseableRegistry, @Nonnull MetricGroup metricGroup, - double managedMemoryFraction) throws Exception; + double managedMemoryFraction, + boolean isUsingCustomRawKeyedState) throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 5b048639a517a..8238fcd41e9a6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -58,6 +58,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -124,7 +125,8 @@ public StreamOperatorStateContext streamOperatorStateContext( @Nullable TypeSerializer keySerializer, @Nonnull CloseableRegistry streamTaskCloseableRegistry, @Nonnull MetricGroup metricGroup, - double managedMemoryFraction) throws Exception { + double managedMemoryFraction, + boolean isUsingCustomRawKeyedState) throws Exception { TaskInfo taskInfo = environment.getTaskInfo(); OperatorSubtaskDescriptionText 
operatorSubtaskDescription = @@ -173,12 +175,22 @@ public StreamOperatorStateContext streamOperatorStateContext( // -------------- Internal Timer Service Manager -------------- if (keyedStatedBackend != null) { + + // if the operator indicates that it is using custom raw keyed state, + // then whatever was written in the raw keyed state snapshot was NOT written + // by the internal timer services (because there is only ever one user of raw keyed state); + // in this case, timers should not attempt to restore timers from the raw keyed state. + final Iterable restoredRawKeyedStateTimers = + (prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState) + ? rawKeyedStateInputs + : Collections.emptyList(); + timeServiceManager = timeServiceManagerProvider.create( keyedStatedBackend, environment.getUserCodeClassLoader().asClassLoader(), keyContext, processingTimeService, - rawKeyedStateInputs); + restoredRawKeyedStateTimers); } else { timeServiceManager = null; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java index edb7ae2a8928d..4a5f17d8b35c6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyedStateBackend; -import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; import 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.KeyContext; @@ -79,12 +79,17 @@ public void advanceWatermark(Watermark watermark) { } @Override - public void snapshotState( - StateSnapshotContext context, + public void snapshotToRawKeyedState( + KeyedStateCheckpointOutputStream context, String operatorName) throws Exception { throw new UnsupportedOperationException("Checkpoints are not supported in BATCH execution"); } + @Override + public boolean isUsingLegacyRawKeyedStateSnapshots() { + throw new UnsupportedOperationException("Checkpoints are not supported in BATCH execution"); + } + public static InternalTimeServiceManager create( CheckpointableKeyedStateBackend keyedStatedBackend, ClassLoader userClassloader, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index dc3067534079f..6b035dffaeb13 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -20,12 +20,17 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; 
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -33,10 +38,16 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import static junit.framework.TestCase.assertTrue; @@ -67,6 +78,22 @@ protected KeyedOneInputStreamOperatorTestHarness KeyedOneInputStreamOperatorTestHarness createTestHarness( + int maxParalelism, + int numSubtasks, + int subtaskIndex, + OneInputStreamOperator testOperator, + KeySelector keySelector, + TypeInformation keyTypeInfo) throws Exception { + return new KeyedOneInputStreamOperatorTestHarness<>( + testOperator, + keySelector, + keyTypeInfo, + maxParalelism, + numSubtasks, + subtaskIndex); + } + @Test public void testStateDoesNotInterfere() throws Exception { try (KeyedOneInputStreamOperatorTestHarness, String> testHarness = createTestHarness()) { @@ -410,6 +437,46 @@ public void testStateAndTimerStateShufflingScalingDown() throws Exception { } } + @Test + public void testCustomRawKeyedStateSnapshotAndRestore() throws Exception { + // setup: 10 key groups, all assigned to single subtask + final int maxParallelism = 10; + final int numSubtasks = 1; + final int subtaskIndex = 0; + final KeyGroupRange keyGroupRange = KeyGroupRange.of(0, maxParallelism - 1); + + final byte[] testSnapshotData = "TEST".getBytes(); + final 
CustomRawKeyedStateTestOperator testOperator = new CustomRawKeyedStateTestOperator(testSnapshotData); + + // snapshot and then restore + OperatorSubtaskState snapshot; + try (KeyedOneInputStreamOperatorTestHarness testHarness = createTestHarness( + maxParallelism, + numSubtasks, + subtaskIndex, + testOperator, + input -> input, + BasicTypeInfo.STRING_TYPE_INFO)) { + testHarness.setup(); + testHarness.open(); + snapshot = testHarness.snapshot(0, 0); + } + + try (KeyedOneInputStreamOperatorTestHarness testHarness = createTestHarness( + maxParallelism, + numSubtasks, + subtaskIndex, + testOperator, + input -> input, + BasicTypeInfo.STRING_TYPE_INFO)) { + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + } + + assertThat(testOperator.restoredRawKeyedState, hasRestoredKeyGroupsWith(testSnapshotData, keyGroupRange)); + } + /** * Extracts the result values form the test harness and clear the output queue. */ @@ -501,6 +568,57 @@ public void onProcessingTime(InternalTimer timer) throws } } + /** + * Operator that writes arbitrary bytes to raw keyed state on snapshots. 
+ */ + private static class CustomRawKeyedStateTestOperator + extends AbstractStreamOperator + implements OneInputStreamOperator { + + private static final long serialVersionUID = 1L; + + private final byte[] snapshotBytes; + + private Map restoredRawKeyedState; + + CustomRawKeyedStateTestOperator(byte[] snapshotBytes) { + this.snapshotBytes = Arrays.copyOf(snapshotBytes, snapshotBytes.length); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + // do nothing + } + + @Override + protected boolean isUsingCustomRawKeyedState() { + return true; + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + KeyedStateCheckpointOutputStream rawKeyedStateStream = context.getRawKeyedOperatorStateOutput(); + for (int keyGroupId : rawKeyedStateStream.getKeyGroupList()) { + rawKeyedStateStream.startNewKeyGroup(keyGroupId); + rawKeyedStateStream.write(snapshotBytes); + } + rawKeyedStateStream.close(); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + restoredRawKeyedState = new HashMap<>(); + for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) { + byte[] readBuffer = new byte[snapshotBytes.length]; + int ignored = streamProvider.getStream().read(readBuffer); + restoredRawKeyedState.put(streamProvider.getKeyGroupId(), readBuffer); + } + } + } + private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) { Random rand = new Random(System.currentTimeMillis()); int result = rand.nextInt(); @@ -509,4 +627,28 @@ private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism } return result; } + + private static Matcher> hasRestoredKeyGroupsWith(byte[] testSnapshotData, KeyGroupRange range) { + return new TypeSafeMatcher>() { + @Override + protected boolean matchesSafely(Map restored) { + if (restored.size() != 
range.getNumberOfKeyGroups()) { + return false; + } + + for (int writtenKeyGroupId : range) { + if (!Arrays.equals(restored.get(writtenKeyGroupId), testSnapshotData)) { + return false; + } + } + + return true; + } + + @Override + public void describeTo(Description description) { + description.appendText("Key groups: " + range + " with snapshot data " + Arrays.toString(testSnapshotData)); + } + }; + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java index 5d8943393dd3f..9f959ec9168dd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java @@ -201,7 +201,8 @@ public InternalTimeServiceManager create( IntSerializer.INSTANCE, closableRegistry, new UnregisteredMetricsGroup(), - 1.0); + 1.0, + false); this.initializationContext = new StateInitializationContextImpl( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java index 8602093bcee52..2565d22f7bf50 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java @@ -105,7 +105,8 @@ public void testFailingBackendSnapshotMethod() throws Exception { IntSerializer.INSTANCE, closeableRegistry, new InterceptingOperatorMetricGroup(), - 1.0); + 1.0, + false); StreamOperatorStateHandler stateHandler = new StreamOperatorStateHandler(stateContext, new ExecutionConfig(), 
closeableRegistry); final String keyedStateField = "keyedStateField"; @@ -146,7 +147,8 @@ public void snapshotState(StateSnapshotContext context) throws Exception { CheckpointOptions.forCheckpointWithDefaultLocation(), new MemCheckpointStreamFactory(1024), operatorSnapshotResult, - context); + context, + false); fail("Exception expected."); } catch (CheckpointException e) { // We can not check for ExpectedTestException class directly, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java index a02630b8c9da3..b95fb07045e58 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java @@ -103,7 +103,8 @@ public void testNoRestore() throws Exception { typeSerializer, closeableRegistry, new UnregisteredMetricsGroup(), - 1.0); + 1.0, + false); OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend(); CheckpointableKeyedStateBackend keyedStateBackend = stateContext.keyedStateBackend(); @@ -212,7 +213,8 @@ public OperatorStateBackend createOperatorStateBackend( typeSerializer, closeableRegistry, new UnregisteredMetricsGroup(), - 1.0); + 1.0, + false); OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend(); CheckpointableKeyedStateBackend keyedStateBackend = stateContext.keyedStateBackend(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 378b0c9ce78ea..a8541964cd50e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -1529,7 +1529,15 @@ protected void cleanup() throws Exception {} @Override public StreamTaskStateInitializer createStreamTaskStateInitializer() { final StreamTaskStateInitializer streamTaskStateManager = super.createStreamTaskStateInitializer(); - return (operatorID, operatorClassName, processingTimeService, keyContext, keySerializer, closeableRegistry, metricGroup, fraction) -> { + return (operatorID, + operatorClassName, + processingTimeService, + keyContext, + keySerializer, + closeableRegistry, + metricGroup, + fraction, + isUsingCustomRawKeyedState) -> { final StreamOperatorStateContext controller = streamTaskStateManager.streamOperatorStateContext( operatorID, @@ -1539,7 +1547,8 @@ public StreamTaskStateInitializer createStreamTaskStateInitializer() { keySerializer, closeableRegistry, metricGroup, - fraction); + fraction, + isUsingCustomRawKeyedState); return new StreamOperatorStateContext() { @Override