Skip to content

Commit

Permalink
[FLINK-19741] Let timer service skip reading raw keyed state if it is…
Browse files Browse the repository at this point in the history
…n't the writer

This closes apache#13761.
  • Loading branch information
tzulitai committed Nov 7, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent c33e1ad commit c151abc
Showing 15 changed files with 300 additions and 54 deletions.
2 changes: 2 additions & 0 deletions docs/ops/state/state_backends.md
Original file line number Diff line number Diff line change
@@ -257,6 +257,8 @@ Set the configuration option `state.backend.rocksdb.timer-service.factory` to `h

<span class="label label-info">Note</span> *The combination RocksDB state backend with heap-based timers currently does NOT support asynchronous snapshots for the timers state. Other state like keyed state is still snapshotted asynchronously.*

<span class="label label-info">Note</span> *When using RocksDB state backend with heap-based timers, checkpointing and taking savepoints is expected to fail if there are operators in application that write to raw keyed state.*

### Enabling RocksDB Native Metrics

You can optionally access RockDB's native metrics through Flink's metrics system, by enabling certain metrics selectively.
Original file line number Diff line number Diff line change
@@ -184,7 +184,8 @@ private StreamOperatorStateContext getStreamOperatorStateContext(Environment env
operator.getKeyType().createSerializer(environment.getExecutionConfig()),
registry,
getRuntimeContext().getMetricGroup(),
1.0);
1.0,
false);
} catch (Exception e) {
throw new IOException("Failed to restore state backend", e);
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
@@ -255,14 +256,38 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana
config.getManagedMemoryFractionOperatorUseCaseOfSlot(
ManagedMemoryUseCase.STATE_BACKEND,
runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
runtimeContext.getUserCodeClassLoader()));
runtimeContext.getUserCodeClassLoader()),
isUsingCustomRawKeyedState());

stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);
timeServiceManager = context.internalTimerServiceManager();
stateHandler.initializeOperatorState(this);
runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
}

/**
* Indicates whether or not implementations of this class is writing to the raw keyed state streams
* on snapshots, using {@link #snapshotState(StateSnapshotContext)}. If yes, subclasses should
* override this method to return {@code true}.
*
* <p>Subclasses need to explicitly indicate the use of raw keyed state because, internally,
* the {@link AbstractStreamOperator} may attempt to read from it as well to restore heap-based timers and
* ultimately fail with read errors. By setting this flag to {@code true}, this allows the
* {@link AbstractStreamOperator} to know that the data written in the raw keyed states were
* not written by the timer services, and skips the timer restore attempt.
*
* <p>Please refer to FLINK-19741 for further details.
*
* <p>TODO: this method can be removed once all timers are moved to be managed by state backends.
*
* @return flag indicating whether or not this operator is writing to raw keyed
* state via {@link #snapshotState(StateSnapshotContext)}.
*/
@Internal
protected boolean isUsingCustomRawKeyedState() {
return false;
}

/**
* This method is called immediately before any elements are processed, it should contain the
* operator's initialization logic, e.g. state initialization.
@@ -322,7 +347,8 @@ public final OperatorSnapshotFutures snapshotState(
checkpointId,
timestamp,
checkpointOptions,
factory);
factory,
isUsingCustomRawKeyedState());
}

/**
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.KeyedStateStore;
@@ -205,13 +206,37 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana
config.getManagedMemoryFractionOperatorUseCaseOfSlot(
ManagedMemoryUseCase.STATE_BACKEND,
runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
runtimeContext.getUserCodeClassLoader()));
runtimeContext.getUserCodeClassLoader()),
isUsingCustomRawKeyedState());

stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables);
timeServiceManager = context.internalTimerServiceManager();
stateHandler.initializeOperatorState(this);
}

/**
* Indicates whether or not implementations of this class is writing to the raw keyed state streams
* on snapshots, using {@link #snapshotState(StateSnapshotContext)}. If yes, subclasses should
* override this method to return {@code true}.
*
* <p>Subclasses need to explicitly indicate the use of raw keyed state because, internally,
* the {@link AbstractStreamOperator} may attempt to read from it as well to restore heap-based timers and
* ultimately fail with read errors. By setting this flag to {@code true}, this allows the
* {@link AbstractStreamOperator} to know that the data written in the raw keyed states were
* not written by the timer services, and skips the timer restore attempt.
*
* <p>Please refer to FLINK-19741 for further details.
*
* <p>TODO: this method can be removed once all timers are moved to be managed by state backends.
*
* @return flag indicating whether or not this operator is writing to raw keyed
* state via {@link #snapshotState(StateSnapshotContext)}.
*/
@Internal
protected boolean isUsingCustomRawKeyedState() {
return false;
}

/**
* This method is called immediately before any elements are processed, it should contain the
* operator's initialization logic, e.g. state initialization.
@@ -268,7 +293,8 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times
checkpointId,
timestamp,
checkpointOptions,
factory);
factory,
isUsingCustomRawKeyedState());
}

/**
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

@@ -56,15 +56,25 @@ <N> InternalTimerService<N> getInternalTimerService(
void advanceWatermark(Watermark watermark) throws Exception;

/**
* Snapshots the timers to keyed state.
* Snapshots the timers to raw keyed state. This should only be called iff
* {@link #isUsingLegacyRawKeyedStateSnapshots()} returns {@code true}.
*
* <p><b>TODO:</b> This can be removed once heap-based timers are integrated with RocksDB
* incremental snapshots.
*/
void snapshotState(
StateSnapshotContext context,
void snapshotToRawKeyedState(
KeyedStateCheckpointOutputStream stateCheckpointOutputStream,
String operatorName) throws Exception;

/**
* Flag indicating whether or not the internal timer services should be checkpointed
* with legacy synchronous snapshots.
*
* <p><b>TODO:</b> This can be removed once heap-based timers are integrated with RocksDB
* incremental snapshots.
*/
boolean isUsingLegacyRawKeyedStateSnapshots();

/**
* A provider pattern for creating an instance of a {@link InternalTimeServiceManager}.
* Allows substituting the manager that will be used at the runtime.
Original file line number Diff line number Diff line change
@@ -31,7 +31,6 @@
import org.apache.flink.runtime.state.KeyGroupsList;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
@@ -46,6 +45,7 @@
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* An entity keeping all the time-related services. Right now, this is only a
@@ -187,36 +187,31 @@ public void advanceWatermark(Watermark watermark) throws Exception {
////////////////// Fault Tolerance Methods ///////////////////

@Override
public void snapshotState(StateSnapshotContext context, String operatorName) throws Exception {
//TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
if (useLegacySynchronousSnapshots) {
public boolean isUsingLegacyRawKeyedStateSnapshots() {
return useLegacySynchronousSnapshots;
}

KeyedStateCheckpointOutputStream out;
try {
out = context.getRawKeyedOperatorStateOutput();
} catch (Exception exception) {
throw new Exception("Could not open raw keyed operator state stream for " +
operatorName + '.', exception);
}
@Override
public void snapshotToRawKeyedState(KeyedStateCheckpointOutputStream out, String operatorName) throws Exception {
checkState(useLegacySynchronousSnapshots);

try {
KeyGroupsList allKeyGroups = out.getKeyGroupList();
for (int keyGroupIdx : allKeyGroups) {
out.startNewKeyGroup(keyGroupIdx);

snapshotStateForKeyGroup(
new DataOutputViewStreamWrapper(out), keyGroupIdx);
}
} catch (Exception exception) {
throw new Exception("Could not write timer service of " + operatorName +
" to checkpoint state stream.", exception);
} finally {
try {
KeyGroupsList allKeyGroups = out.getKeyGroupList();
for (int keyGroupIdx : allKeyGroups) {
out.startNewKeyGroup(keyGroupIdx);

snapshotStateForKeyGroup(
new DataOutputViewStreamWrapper(out), keyGroupIdx);
}
} catch (Exception exception) {
throw new Exception("Could not write timer service of " + operatorName +
" to checkpoint state stream.", exception);
} finally {
try {
out.close();
} catch (Exception closeException) {
LOG.warn("Could not close raw keyed operator state stream for {}. This " +
"might have prevented deleting some state data.", operatorName, closeException);
}
out.close();
} catch (Exception closeException) {
LOG.warn("Could not close raw keyed operator state stream for {}. This " +
"might have prevented deleting some state data.", operatorName, closeException);
}
}
}
Original file line number Diff line number Diff line change
@@ -141,7 +141,8 @@ public OperatorSnapshotFutures snapshotState(
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory) throws CheckpointException {
CheckpointStreamFactory factory,
boolean isUsingCustomRawKeyedState) throws CheckpointException {
KeyGroupRange keyGroupRange = null != keyedStateBackend ?
keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

@@ -163,7 +164,8 @@ public OperatorSnapshotFutures snapshotState(
checkpointOptions,
factory,
snapshotInProgress,
snapshotContext);
snapshotContext,
isUsingCustomRawKeyedState);

return snapshotInProgress;
}
@@ -178,11 +180,19 @@ void snapshotState(
CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory,
OperatorSnapshotFutures snapshotInProgress,
StateSnapshotContextSynchronousImpl snapshotContext) throws CheckpointException {
StateSnapshotContextSynchronousImpl snapshotContext,
boolean isUsingCustomRawKeyedState) throws CheckpointException {
try {
if (timeServiceManager.isPresent()) {
checkState(keyedStateBackend != null, "keyedStateBackend should be available with timeServiceManager");
timeServiceManager.get().snapshotState(snapshotContext, operatorName);
final InternalTimeServiceManager<?> manager = timeServiceManager.get();

if (manager.isUsingLegacyRawKeyedStateSnapshots()) {
checkState(
!isUsingCustomRawKeyedState,
"Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write.");
manager.snapshotToRawKeyedState(snapshotContext.getRawKeyedOperatorStateOutput(), operatorName);
}
}
streamOperator.snapshotState(snapshotContext);

Original file line number Diff line number Diff line change
@@ -46,6 +46,8 @@ public interface StreamTaskStateInitializer {
* @param streamTaskCloseableRegistry the closeable registry to which created closeable objects will be registered.
* @param metricGroup the parent metric group for all statebackend metrics
* @param managedMemoryFraction the managed memory fraction of the operator for state backend
* @param isUsingCustomRawKeyedState flag indicating whether or not the {@link AbstractStreamOperator} is writing
* custom raw keyed state.
* @return a context from which the given operator can initialize everything related to state.
* @throws Exception when something went wrong while creating the context.
*/
@@ -57,5 +59,6 @@ StreamOperatorStateContext streamOperatorStateContext(
@Nullable TypeSerializer<?> keySerializer,
@Nonnull CloseableRegistry streamTaskCloseableRegistry,
@Nonnull MetricGroup metricGroup,
double managedMemoryFraction) throws Exception;
double managedMemoryFraction,
boolean isUsingCustomRawKeyedState) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -58,6 +58,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@@ -124,7 +125,8 @@ public StreamOperatorStateContext streamOperatorStateContext(
@Nullable TypeSerializer<?> keySerializer,
@Nonnull CloseableRegistry streamTaskCloseableRegistry,
@Nonnull MetricGroup metricGroup,
double managedMemoryFraction) throws Exception {
double managedMemoryFraction,
boolean isUsingCustomRawKeyedState) throws Exception {

TaskInfo taskInfo = environment.getTaskInfo();
OperatorSubtaskDescriptionText operatorSubtaskDescription =
@@ -173,12 +175,22 @@ public StreamOperatorStateContext streamOperatorStateContext(

// -------------- Internal Timer Service Manager --------------
if (keyedStatedBackend != null) {

// if the operator indicates that it is using custom raw keyed state,
// then whatever was written in the raw keyed state snapshot was NOT written
// by the internal timer services (because there is only ever one user of raw keyed state);
// in this case, timers should not attempt to restore timers from the raw keyed state.
final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =
(prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState)
? rawKeyedStateInputs
: Collections.emptyList();

timeServiceManager = timeServiceManagerProvider.create(
keyedStatedBackend,
environment.getUserCodeClassLoader().asClassLoader(),
keyContext,
processingTimeService,
rawKeyedStateInputs);
restoredRawKeyedStateTimers);
} else {
timeServiceManager = null;
}
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
@@ -79,12 +79,17 @@ public void advanceWatermark(Watermark watermark) {
}

@Override
public void snapshotState(
StateSnapshotContext context,
public void snapshotToRawKeyedState(
KeyedStateCheckpointOutputStream context,
String operatorName) throws Exception {
throw new UnsupportedOperationException("Checkpoints are not supported in BATCH execution");
}

@Override
public boolean isUsingLegacyRawKeyedStateSnapshots() {
throw new UnsupportedOperationException("Checkpoints are not supported in BATCH execution");
}

public static <K> InternalTimeServiceManager<K> create(
CheckpointableKeyedStateBackend<K> keyedStatedBackend,
ClassLoader userClassloader,
Loading

0 comments on commit c151abc

Please sign in to comment.