Skip to content

Commit

Permalink
[FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)…
Browse files Browse the repository at this point in the history
…start
  • Loading branch information
StefanRRichter committed Aug 15, 2017
1 parent b71154a commit 91a4b27
Show file tree
Hide file tree
Showing 27 changed files with 743 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,10 @@ public RocksDBKeyedStateBackend(
this.restoredKvStateMetaInfos = new HashMap<>();
this.materializedSstFiles = new TreeMap<>();
this.backendUID = UUID.randomUUID();
LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);

LOG.debug("Setting initial backend ID in RocksDBKeyedStateBackend for operator {} to {}.",
this.operatorIdentifier,
this.backendUID);
}

/**
Expand Down Expand Up @@ -883,11 +886,17 @@ private StreamStateHandle materializeMetaData() throws Exception {
void takeSnapshot() throws Exception {
assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));

final long lastCompletedCheckpoint;

// use the last completed checkpoint as the comparison base.
synchronized (stateBackend.materializedSstFiles) {
baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
}

LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);

// save meta data
for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
: stateBackend.kvStateInformation.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
Expand Down Expand Up @@ -174,8 +175,11 @@ public class CheckpointCoordinator {
@Nullable
private CheckpointStatsTracker statsTracker;

/** A factory for SharedStateRegistry objects */
private final SharedStateRegistryFactory sharedStateRegistryFactory;

/** Registry that tracks state which is shared across (incremental) checkpoints */
private final SharedStateRegistry sharedStateRegistry;
private SharedStateRegistry sharedStateRegistry;

// --------------------------------------------------------------------------------------------

Expand All @@ -192,7 +196,8 @@ public CheckpointCoordinator(
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
@Nullable String checkpointDirectory,
Executor executor) {
Executor executor,
SharedStateRegistryFactory sharedStateRegistryFactory) {

// sanity checks
checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero");
Expand Down Expand Up @@ -230,7 +235,8 @@ public CheckpointCoordinator(
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
this.checkpointDirectory = checkpointDirectory;
this.executor = checkNotNull(executor);
this.sharedStateRegistry = new SharedStateRegistry(executor);
this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);

this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.masterHooks = new HashMap<>();
Expand Down Expand Up @@ -1043,10 +1049,23 @@ public boolean restoreLatestCheckpointedState(
throw new IllegalStateException("CheckpointCoordinator is shut down");
}

// Recover the checkpoints
completedCheckpointStore.recover(sharedStateRegistry);
// We create a new shared state registry object, so that all pending async disposal requests from previous
// runs will go against the old object (were they can do no harm).
// This must happen under the checkpoint lock.
sharedStateRegistry.close();
sharedStateRegistry = sharedStateRegistryFactory.create(executor);

// Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
completedCheckpointStore.recover();

// Now, we re-register all (shared) states from the checkpoint store with the new registry
for (CompletedCheckpoint completedCheckpoint : completedCheckpointStore.getAllCheckpoints()) {
completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
}

LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);

// restore from the latest checkpoint
// Restore from the latest checkpoint
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();

if (latest == null) {
Expand Down Expand Up @@ -1120,7 +1139,6 @@ public boolean restoreSavepoint(
CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
job, tasks, savepointPath, userClassLoader, allowNonRestored);

savepoint.registerSharedStatesAfterRestored(sharedStateRegistry);
completedCheckpointStore.addCheckpoint(savepoint);

// Reset the checkpoint ID counter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ public boolean discardOnShutdown(JobStatus jobStatus) throws Exception {

private void doDiscard() throws Exception {

LOG.trace("Executing discard procedure for {}.", this);

try {
// collect exceptions and continue cleanup
Exception exception = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.checkpoint;

import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.SharedStateRegistry;

import java.util.List;

Expand All @@ -33,10 +32,8 @@ public interface CompletedCheckpointStore {
*
* <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
* available checkpoint.
*
* @param sharedStateRegistry the shared state registry to register recovered states.
*/
void recover(SharedStateRegistry sharedStateRegistry) throws Exception;
void recover() throws Exception;

/**
* Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.SharedStateRegistry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -57,7 +57,7 @@ public StandaloneCompletedCheckpointStore(int maxNumberOfCheckpointsToRetain) {
}

@Override
public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
public void recover() throws Exception {
// Nothing to do
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@

package org.apache.flink.runtime.checkpoint;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.FlinkException;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -138,14 +139,13 @@ public boolean requiresExternalizedCheckpoints() {
* that the history of checkpoints is consistent.
*/
@Override
public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
public void recover() throws Exception {
LOG.info("Recovering checkpoints from ZooKeeper.");

// Clear local handles in order to prevent duplicates on
// recovery. The local handles should reflect the state
// of ZooKeeper.
completedCheckpoints.clear();
sharedStateRegistry.clear();

// Get all there is first
List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
Expand All @@ -170,8 +170,6 @@ public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
try {
completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
if (completedCheckpoint != null) {
// Re-register all shared states in the checkpoint.
completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
completedCheckpoints.add(completedCheckpoint);
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.SerializedThrowable;
Expand All @@ -69,8 +70,8 @@
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

import org.apache.flink.util.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -456,7 +457,8 @@ public void enableCheckpointing(
checkpointIDCounter,
checkpointStore,
checkpointDir,
ioExecutor);
ioExecutor,
SharedStateRegistry.DEFAULT_FACTORY);

// register the master hooks on the checkpoint coordinator
for (MasterTriggerRestoreHook<?> hook : masterHooks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
Expand All @@ -39,7 +40,6 @@
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;

import org.slf4j.Logger;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,27 +65,27 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
private final UUID backendIdentifier;

/**
* The key-group range covered by this state handle
* The key-group range covered by this state handle.
*/
private final KeyGroupRange keyGroupRange;

/**
* The checkpoint Id
* The checkpoint Id.
*/
private final long checkpointId;

/**
* Shared state in the incremental checkpoint. This i
* Shared state in the incremental checkpoint.
*/
private final Map<StateHandleID, StreamStateHandle> sharedState;

/**
* Private state in the incremental checkpoint
* Private state in the incremental checkpoint.
*/
private final Map<StateHandleID, StreamStateHandle> privateState;

/**
* Primary meta data state of the incremental checkpoint
* Primary meta data state of the incremental checkpoint.
*/
private final StreamStateHandle metaStateHandle;

Expand Down Expand Up @@ -143,16 +143,21 @@ public UUID getBackendIdentifier() {

@Override
public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
return this;
} else {
return null;
}
return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange)) ?
null : this;
}

@Override
public void discardState() throws Exception {

SharedStateRegistry registry = this.sharedStateRegistry;
final boolean isRegistered = (registry != null);

LOG.trace("Discarding IncrementalKeyedStateHandle (registered = {}) for checkpoint {} from backend with id {}.",
isRegistered,
checkpointId,
backendIdentifier);

try {
metaStateHandle.discardState();
} catch (Exception e) {
Expand All @@ -168,19 +173,20 @@ public void discardState() throws Exception {
// If this was not registered, we can delete the shared state. We can simply apply this
// to all handles, because all handles that have not been created for the first time for this
// are only placeholders at this point (disposing them is a NOP).
if (sharedStateRegistry == null) {
try {
StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
} catch (Exception e) {
LOG.warn("Could not properly discard new sst file states.", e);
}
} else {
if (isRegistered) {
// If this was registered, we only unregister all our referenced shared states
// from the registry.
for (StateHandleID stateHandleID : sharedState.keySet()) {
sharedStateRegistry.unregisterReference(
registry.unregisterReference(
createSharedStateRegistryKeyFromFileName(stateHandleID));
}
} else {
// Otherwise, we assume to own those handles and dispose them directly.
try {
StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
} catch (Exception e) {
LOG.warn("Could not properly discard new sst file states.", e);
}
}
}

Expand All @@ -202,10 +208,21 @@ public long getStateSize() {
@Override
public void registerSharedStates(SharedStateRegistry stateRegistry) {

Preconditions.checkState(sharedStateRegistry == null, "The state handle has already registered its shared states.");
// This is a quick check to avoid that we register twice with the same registry. However, the code allows to
// register again with a different registry. The implication is that ownership is transferred to this new
// registry. This should only happen in case of a restart, when the CheckpointCoordinator creates a new
// SharedStateRegistry for the current attempt and the old registry becomes meaningless. We also assume that
// an old registry object from a previous run is due to be GCed and will never be used for registration again.
Preconditions.checkState(
sharedStateRegistry != stateRegistry,
"The state handle has already registered its shared states to the given registry.");

sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);

LOG.trace("Registering IncrementalKeyedStateHandle for checkpoint {} from backend with id {}.",
checkpointId,
backendIdentifier);

for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) {
SharedStateRegistryKey registryKey =
createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
Expand Down Expand Up @@ -284,5 +301,18 @@ public int hashCode() {
result = 31 * result + getMetaStateHandle().hashCode();
return result;
}

@Override
public String toString() {
return "IncrementalKeyedStateHandle{" +
"backendIdentifier=" + backendIdentifier +
", keyGroupRange=" + keyGroupRange +
", checkpointId=" + checkpointId +
", sharedState=" + sharedState +
", privateState=" + privateState +
", metaStateHandle=" + metaStateHandle +
", registered=" + (sharedStateRegistry != null) +
'}';
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public int hashCode() {
public String toString() {
return "KeyGroupsStateHandle{" +
"groupRangeOffsets=" + groupRangeOffsets +
", data=" + stateHandle +
", stateHandle=" + stateHandle +
'}';
}
}
Loading

0 comments on commit 91a4b27

Please sign in to comment.