Skip to content

Commit

Permalink
[FLINK-6633] Register shared state before adding to CompletedCheckpoi…
Browse files Browse the repository at this point in the history
…ntStore
  • Loading branch information
StefanRRichter committed May 19, 2017
1 parent 3d119e1 commit 0162543
Show file tree
Hide file tree
Showing 28 changed files with 747 additions and 776 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
Expand Down Expand Up @@ -170,8 +171,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/** True if incremental checkpointing is enabled */
private final boolean enableIncrementalCheckpointing;

/** The sst files materialized in pending checkpoints */
private final SortedMap<Long, Map<StateHandleID, StreamStateHandle>> materializedSstFiles = new TreeMap<>();
/** The state handle ids of all sst files materialized in snapshots for previous checkpoints */
private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>();

/** The identifier of the last completed checkpoint */
private long lastCompletedCheckpointId = -1;
Expand Down Expand Up @@ -720,7 +721,7 @@ private static final class RocksDBIncrementalSnapshotOperation<K> {
private final long checkpointTimestamp;

/** All sst files that were part of the last previously completed checkpoint */
private Map<StateHandleID, StreamStateHandle> baseSstFiles;
private Set<StateHandleID> baseSstFiles;

/** The state meta data */
private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();
Expand All @@ -732,10 +733,7 @@ private static final class RocksDBIncrementalSnapshotOperation<K> {
private final CloseableRegistry closeableRegistry = new CloseableRegistry();

// new sst files since the last completed checkpoint
private final Map<StateHandleID, StreamStateHandle> newSstFiles = new HashMap<>();

// old sst files which have been materialized in previous completed checkpoints
private final Map<StateHandleID, StreamStateHandle> oldSstFiles = new HashMap<>();
private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();

// handles to the misc files in the current snapshot
private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
Expand Down Expand Up @@ -830,7 +828,6 @@ void takeSnapshot() throws Exception {
// use the last completed checkpoint as the comparison base.
baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);


// save meta data
for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
: stateBackend.kvStateInformation.entrySet()) {
Expand Down Expand Up @@ -867,18 +864,17 @@ KeyedStateHandle materializeSnapshot() throws Exception {
final StateHandleID stateHandleID = new StateHandleID(fileName);

if (fileName.endsWith(SST_FILE_SUFFIX)) {
StreamStateHandle fileHandle =
baseSstFiles == null ? null : baseSstFiles.get(fileName);
final boolean existsAlready =
baseSstFiles == null ? false : baseSstFiles.contains(stateHandleID);

if (fileHandle == null) {
fileHandle = materializeStateData(filePath);
newSstFiles.put(stateHandleID, fileHandle);
} else {
if (existsAlready) {
// we introduce a placeholder state handle, that is replaced with the
// original from the shared state registry (created from a previous checkpoint)
oldSstFiles.put(
sstFiles.put(
stateHandleID,
new PlaceholderStreamStateHandle(fileHandle.getStateSize()));
new PlaceholderStreamStateHandle());
} else {
sstFiles.put(stateHandleID, materializeStateData(filePath));
}
} else {
StreamStateHandle fileHandle = materializeStateData(filePath);
Expand All @@ -887,22 +883,17 @@ KeyedStateHandle materializeSnapshot() throws Exception {
}
}

Map<StateHandleID, StreamStateHandle> sstFiles =
new HashMap<>(newSstFiles.size() + oldSstFiles.size());

sstFiles.putAll(newSstFiles);
sstFiles.putAll(oldSstFiles);

synchronized (stateBackend.asyncSnapshotLock) {
stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
}

return new IncrementalKeyedStateHandle(
stateBackend.operatorIdentifier,
stateBackend.keyGroupRange,
checkpointId,
newSstFiles,
oldSstFiles,
sstFiles,
miscFiles,
metaStateHandle);
}
Expand Down Expand Up @@ -933,7 +924,7 @@ void releaseResources(boolean canceled) {

statesToDiscard.add(metaStateHandle);
statesToDiscard.addAll(miscFiles.values());
statesToDiscard.addAll(newSstFiles.values());
statesToDiscard.addAll(sstFiles.values());

try {
StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
Expand Down Expand Up @@ -1308,15 +1299,12 @@ private void restoreInstance(
UUID.randomUUID().toString());

try {
final Map<StateHandleID, StreamStateHandle> newSstFiles =
restoreStateHandle.getCreatedSharedState();
final Map<StateHandleID, StreamStateHandle> oldSstFiles =
restoreStateHandle.getReferencedSharedState();
final Map<StateHandleID, StreamStateHandle> sstFiles =
restoreStateHandle.getSharedState();
final Map<StateHandleID, StreamStateHandle> miscFiles =
restoreStateHandle.getPrivateState();

readAllStateData(newSstFiles, restoreInstancePath);
readAllStateData(oldSstFiles, restoreInstancePath);
readAllStateData(sstFiles, restoreInstancePath);
readAllStateData(miscFiles, restoreInstancePath);

// read meta data
Expand Down Expand Up @@ -1409,8 +1397,7 @@ private void restoreInstance(
throw new IOException("Could not create RocksDB data directory.");
}

createFileHardLinksInRestorePath(newSstFiles, restoreInstancePath);
createFileHardLinksInRestorePath(oldSstFiles, restoreInstancePath);
createFileHardLinksInRestorePath(sstFiles, restoreInstancePath);
createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);

List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
Expand All @@ -1437,10 +1424,7 @@ private void restoreInstance(


// use the restore sst files as the base for succeeding checkpoints
Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
sstFiles.putAll(newSstFiles);
sstFiles.putAll(oldSstFiles);
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles);
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet());

stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,22 @@
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateBackendTestBase;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand All @@ -58,7 +62,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.RunnableFuture;

import static junit.framework.TestCase.assertNotNull;
Expand All @@ -67,6 +75,7 @@
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import static org.powermock.api.mockito.PowerMockito.mock;
Expand Down Expand Up @@ -351,6 +360,83 @@ public void testDisposeDeletesAllDirectories() throws Exception {
assertEquals(1, allFilesInDbDir.size());
}

@Test
public void testSharedIncrementalStateDeRegistration() throws Exception {
if (enableIncrementalCheckpointing) {
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
ValueStateDescriptor<String> kvId =
new ValueStateDescriptor<>("id", String.class, null);

kvId.initializeSerializerUnlessSet(new ExecutionConfig());

ValueState<String> state =
backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);


Queue<IncrementalKeyedStateHandle> previousStateHandles = new LinkedList<>();
SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
for (int checkpointId = 0; checkpointId < 3; ++checkpointId) {

reset(sharedStateRegistry);

backend.setCurrentKey(checkpointId);
state.update("Hello-" + checkpointId);

RunnableFuture<KeyedStateHandle> snapshot = backend.snapshot(
checkpointId,
checkpointId,
createStreamFactory(),
CheckpointOptions.forFullCheckpoint());

snapshot.run();

IncrementalKeyedStateHandle stateHandle = (IncrementalKeyedStateHandle) snapshot.get();
Map<StateHandleID, StreamStateHandle> sharedState =
new HashMap<>(stateHandle.getSharedState());

stateHandle.registerSharedStates(sharedStateRegistry);

for (Map.Entry<StateHandleID, StreamStateHandle> e : sharedState.entrySet()) {
verify(sharedStateRegistry).registerReference(
stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()),
e.getValue());
}

previousStateHandles.add(stateHandle);
backend.notifyCheckpointComplete(checkpointId);

//-----------------------------------------------------------------

if (previousStateHandles.size() > 1) {
checkRemove(previousStateHandles.remove(), sharedStateRegistry);
}
}

while (!previousStateHandles.isEmpty()) {

reset(sharedStateRegistry);

checkRemove(previousStateHandles.remove(), sharedStateRegistry);
}

backend.close();
backend.dispose();
}
}

private void checkRemove(IncrementalKeyedStateHandle remove, SharedStateRegistry registry) throws Exception {
for (StateHandleID id : remove.getSharedState().keySet()) {
verify(registry, times(0)).unregisterReference(
remove.createSharedStateRegistryKeyFromFileName(id));
}

remove.discardState();

for (StateHandleID id : remove.getSharedState().keySet()) {
verify(registry).unregisterReference(
remove.createSharedStateRegistryKeyFromFileName(id));
}
}

private void runStateUpdates() throws Exception{
for (int i = 50; i < 150; ++i) {
Expand Down
Loading

0 comments on commit 0162543

Please sign in to comment.