[FLINK-35784][checkpoint] Fix the missing shared state registration of file-merging directories (apache#25051)
Zakelly authored Jul 9, 2024
1 parent 050767c commit 3d79387
Showing 4 changed files with 124 additions and 16 deletions.
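
For orientation before the per-file hunks: the first changed file (evidently OperatorSubtaskState, judging from its hunk context) now forwards shared-state registration to any FileMergingOperatorStreamStateHandle it holds, so the file-merging directories behind those handles become tracked by the SharedStateRegistry. The following minimal, self-contained sketch shows that fan-out shape; every type and method name in it is an illustrative stand-in, not Flink's actual API.

import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for Flink's SharedStateRegistry: a single registration method.
interface Registry {
    void registerReference(String key, Object artifact, long checkpointId);
}

interface Handle {}

// A handle whose payload lives inside a shared, file-merging directory.
class MergingHandle implements Handle {
    private final String directory;

    MergingHandle(String directory) {
        this.directory = directory;
    }

    void registerSharedStates(Registry registry, long checkpointId) {
        // Register the backing directory so its lifecycle is reference-counted.
        registry.registerReference("dir:" + directory, this, checkpointId);
    }
}

// Simplified analogue of the fixed registerSharedStates fan-out.
public class RegistrationFanOutSketch {
    private final List<Handle> managedOperatorState = new ArrayList<>();

    void registerSharedStates(Registry registry, long checkpointId) {
        // Keyed-state registration elided; the fix adds this loop for operator state.
        for (Handle handle : managedOperatorState) {
            if (handle instanceof MergingHandle) {
                ((MergingHandle) handle).registerSharedStates(registry, checkpointId);
            }
        }
    }

    public static void main(String[] args) {
        RegistrationFanOutSketch state = new RegistrationFanOutSketch();
        state.managedOperatorState.add(new MergingHandle("taskowned/merged-dir"));
        state.registerSharedStates(
                (key, artifact, cp) -> System.out.println("registered " + key + " for checkpoint " + cp),
                1L);
    }
}
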
@@ -30,6 +30,7 @@
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -225,6 +226,13 @@ public void discardState() {
public void registerSharedStates(SharedStateRegistry sharedStateRegistry, long checkpointID) {
registerSharedState(sharedStateRegistry, managedKeyedState, checkpointID);
registerSharedState(sharedStateRegistry, rawKeyedState, checkpointID);
registerFileMergingDirectoryHandle(
sharedStateRegistry,
managedOperatorState.stream()
.filter(e -> e instanceof FileMergingOperatorStreamStateHandle)
.map(e -> (FileMergingOperatorStreamStateHandle) e)
.collect(Collectors.toList()),
checkpointID);
}

private static void registerSharedState(
@@ -245,6 +253,17 @@ private static void registerSharedState(
}
}

private static void registerFileMergingDirectoryHandle(
SharedStateRegistry sharedStateRegistry,
Iterable<FileMergingOperatorStreamStateHandle> stateHandles,
long checkpointID) {
for (FileMergingOperatorStreamStateHandle stateHandle : stateHandles) {
if (stateHandle != null) {
stateHandle.registerSharedStates(sharedStateRegistry, checkpointID);
}
}
}

@Override
public long getCheckpointedSize() {
return checkpointedSize;
@@ -21,7 +21,6 @@
import org.apache.flink.runtime.state.CompositeStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;

@@ -83,12 +82,9 @@ public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpo
LOG.trace(
"Registering FileMergingOperatorStreamStateHandle for checkpoint {} from backend.",
checkpointId);
stateRegistry.registerReference(
new SharedStateRegistryKey(
getDelegateStateHandle().getStreamStateHandleID().getKeyString()),
getDelegateStateHandle(),
checkpointId);

// Only register the directory here and leave the delegateStateHandle unregistered, since
// the OperatorSubtaskState only takes care of the keyed state while leaving others
// unregistered.
stateRegistry.registerReference(
taskOwnedDirHandle.createStateRegistryKey(), taskOwnedDirHandle, checkpointId);
stateRegistry.registerReference(
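
The comment in the hunk above is the heart of the change: FileMergingOperatorStreamStateHandle now registers only the shared file-merging directory, and the SharedStateRegistry reference-counts that directory across checkpoints, releasing it only when no retained checkpoint refers to it any more. A minimal, hypothetical sketch of such reference counting (illustrative names and simplified behavior, not Flink's actual registry):

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a directory stays alive while any retained checkpoint references it.
public class DirectoryRefCountSketch {
    private final Map<String, Integer> refCounts = new HashMap<>();

    // Called when a completed checkpoint registers the directory it uses.
    void register(String directoryKey) {
        refCounts.merge(directoryKey, 1, Integer::sum);
    }

    // Called when a checkpoint is subsumed or discarded; the directory becomes
    // deletable only once the last reference is released.
    void release(String directoryKey) {
        Integer count = refCounts.get(directoryKey);
        if (count == null) {
            return; // never registered
        }
        if (count == 1) {
            refCounts.remove(directoryKey);
            System.out.println("last reference released, directory deletable: " + directoryKey);
        } else {
            refCounts.put(directoryKey, count - 1);
        }
    }

    public static void main(String[] args) {
        DirectoryRefCountSketch registry = new DirectoryRefCountSketch();
        registry.register("taskowned/merged-dir"); // checkpoint 1
        registry.register("taskowned/merged-dir"); // checkpoint 2 reuses the directory
        registry.release("taskowned/merged-dir");  // checkpoint 1 subsumed: directory kept
        registry.release("taskowned/merged-dir");  // checkpoint 2 discarded: directory deletable
    }
}
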
@@ -404,7 +404,7 @@ private static String runJobAndGetExternalizedCheckpoint(
throws Exception {
// complete at least two checkpoints so that the initial checkpoint can be subsumed
return runJobAndGetExternalizedCheckpoint(
backend, externalCheckpoint, cluster, restoreMode, new Configuration(), 2);
backend, externalCheckpoint, cluster, restoreMode, new Configuration(), 2, true);
}

static String runJobAndGetExternalizedCheckpoint(
@@ -413,9 +413,11 @@ static String runJobAndGetExternalizedCheckpoint(
MiniClusterWithClientResource cluster,
RestoreMode restoreMode,
Configuration jobConfig,
int consecutiveCheckpoints)
int consecutiveCheckpoints,
boolean retainCheckpoints)
throws Exception {
JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint, restoreMode, jobConfig);
JobGraph initialJobGraph =
getJobGraph(backend, externalCheckpoint, restoreMode, jobConfig, retainCheckpoints);
NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
cluster.getClusterClient().submitJob(initialJobGraph).get();

@@ -439,7 +441,8 @@ private static JobGraph getJobGraph(
StateBackend backend,
@Nullable String externalCheckpoint,
RestoreMode restoreMode,
Configuration jobConfig) {
Configuration jobConfig,
boolean retainCheckpoints) {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(jobConfig);

@@ -448,7 +451,9 @@
env.setParallelism(PARALLELISM);
env.getCheckpointConfig()
.setExternalizedCheckpointRetention(
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
retainCheckpoints
? ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION
: ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.noRestart());

env.addSource(new NotifyingInfiniteTupleSource(10_000))
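
The test helper in the hunks above gains a retainCheckpoints flag that selects RETAIN_ON_CANCELLATION versus DELETE_ON_CANCELLATION. A hedged usage sketch of the updated signature follows; the backend, cluster, and configuration setup from the surrounding test class is assumed, and the argument values are illustrative only:

// Sketch only: arguments follow the updated signature from this diff.
String retained = runJobAndGetExternalizedCheckpoint(
        backend, null, cluster, RestoreMode.NO_CLAIM, new Configuration(), 2, true);
String notRetained = runJobAndGetExternalizedCheckpoint(
        backend, retained, cluster, RestoreMode.NO_CLAIM, new Configuration(), 2, false);
// With retainCheckpoints = false, the externalized checkpoint is deleted when the job is cancelled.
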
@@ -22,6 +22,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
@@ -44,6 +45,8 @@
import java.util.Collection;
import java.util.List;

import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_SHARED_STATE_DIR;
import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR;
import static org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint;
import static org.assertj.core.api.Assertions.assertThat;

@@ -115,7 +118,8 @@ private void testSwitchingFileMerging(
firstCluster,
restoreMode,
config,
consecutiveCheckpoint);
consecutiveCheckpoint,
true);
assertThat(firstCheckpoint).isNotNull();
verifyStateHandleType(firstCheckpoint, firstFileMergingSwitch);
} finally {
@@ -142,7 +146,8 @@ private void testSwitchingFileMerging(
secondCluster,
restoreMode,
config,
consecutiveCheckpoint);
consecutiveCheckpoint,
true);
assertThat(secondCheckpoint).isNotNull();
verifyStateHandleType(secondCheckpoint, secondFileMergingSwitch);
} finally {
@@ -159,20 +164,58 @@
.setNumberSlotsPerTaskManager(2)
.build());
thirdCluster.before();
String thirdCheckpoint;
try {
String thirdCheckpoint =
thirdCheckpoint =
runJobAndGetExternalizedCheckpoint(
stateBackend3,
secondCheckpoint,
thirdCluster,
restoreMode,
config,
consecutiveCheckpoint);
consecutiveCheckpoint,
true);
assertThat(thirdCheckpoint).isNotNull();
verifyStateHandleType(thirdCheckpoint, secondFileMergingSwitch);
} finally {
thirdCluster.after();
}

// We configure ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION here.
EmbeddedRocksDBStateBackend stateBackend4 = new EmbeddedRocksDBStateBackend();
stateBackend4.configure(config, Thread.currentThread().getContextClassLoader());
MiniClusterWithClientResource fourthCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(config)
.setNumberTaskManagers(3)
.setNumberSlotsPerTaskManager(2)
.build());
fourthCluster.before();
String fourthCheckpoint;
try {
fourthCheckpoint =
runJobAndGetExternalizedCheckpoint(
stateBackend4,
thirdCheckpoint,
fourthCluster,
restoreMode,
config,
consecutiveCheckpoint,
false);
assertThat(fourthCheckpoint).isNotNull();
} finally {
fourthCluster.after();
}

waitUntilNoJobThreads();
verifyCheckpointExist(
firstCheckpoint, restoreMode != RestoreMode.CLAIM, firstFileMergingSwitch);
verifyCheckpointExist(
secondCheckpoint, restoreMode != RestoreMode.CLAIM, secondFileMergingSwitch);
verifyCheckpointExist(
thirdCheckpoint, restoreMode != RestoreMode.CLAIM, secondFileMergingSwitch);
verifyCheckpointExist(fourthCheckpoint, false, secondFileMergingSwitch);
}

private void verifyStateHandleType(String checkpointPath, boolean fileMergingEnabled)
@@ -205,4 +248,49 @@ private void verifyStateHandleType(String checkpointPath, boolean fileMergingEna
}
assertThat(hasKeyedState).isTrue();
}

private static void waitUntilNoJobThreads() throws InterruptedException {
SecurityManager securityManager = System.getSecurityManager();
ThreadGroup group =
(securityManager != null)
? securityManager.getThreadGroup()
: Thread.currentThread().getThreadGroup();

boolean jobThreads = true;
while (jobThreads) {
jobThreads = false;
Thread[] activeThreads = new Thread[group.activeCount() * 2];
group.enumerate(activeThreads);
for (Thread thread : activeThreads) {
if (thread != null
&& thread != Thread.currentThread()
&& thread.getName().contains("jobmanager")) {
jobThreads = true;
Thread.sleep(500);
break;
}
}
}
}

private void verifyCheckpointExist(
String checkpointPath, boolean exist, boolean fileMergingEnabled) throws IOException {
org.apache.flink.core.fs.Path checkpointDir =
new org.apache.flink.core.fs.Path(checkpointPath);
FileSystem fs = checkpointDir.getFileSystem();
assertThat(fs.exists(checkpointDir)).isEqualTo(exist);
org.apache.flink.core.fs.Path baseDir = checkpointDir.getParent();
assertThat(fs.exists(baseDir)).isTrue();
org.apache.flink.core.fs.Path sharedFile =
new org.apache.flink.core.fs.Path(baseDir, CHECKPOINT_SHARED_STATE_DIR);
assertThat(fs.exists(sharedFile)).isTrue();
assertThat(fs.listStatus(sharedFile) != null && fs.listStatus(sharedFile).length > 0)
.isEqualTo(exist);
org.apache.flink.core.fs.Path taskOwnedFile =
new org.apache.flink.core.fs.Path(baseDir, CHECKPOINT_TASK_OWNED_STATE_DIR);
assertThat(fs.exists(taskOwnedFile)).isTrue();
// Since there is no exclusive state, the task-owned directory only contains files when file merging is enabled.
assertThat(fs.exists(taskOwnedFile) && fs.listStatus(taskOwnedFile).length > 0)
.isEqualTo(exist && fileMergingEnabled);
}
}
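
waitUntilNoJobThreads above polls the current thread group until no live thread whose name contains "jobmanager" remains. A hypothetical variant that bounds the wait with a deadline and enumerates threads via Thread.getAllStackTraces() is sketched below; it is not part of this commit, and the helper name is made up:

import java.time.Duration;

final class BoundedThreadWaitSketch {
    // Waits until no live thread name contains the marker, or the deadline passes.
    static boolean waitForNoThreadNamed(String marker, Duration timeout) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadline) {
            boolean found = false;
            for (Thread thread : Thread.getAllStackTraces().keySet()) {
                if (thread != Thread.currentThread() && thread.getName().contains(marker)) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                return true; // no matching thread is alive any more
            }
            Thread.sleep(500);
        }
        return false; // timed out while a matching thread was still alive
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waitForNoThreadNamed("jobmanager", Duration.ofSeconds(10)));
    }
}
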
