Skip to content

Commit

Permalink
[FLINK-35843][test] Check only JM acknowledged files in file-merging …
Browse files Browse the repository at this point in the history
…IT cases (apache#25090)
  • Loading branch information
Zakelly authored Jul 16, 2024
1 parent 0050e42 commit 48f3522
Showing 1 changed file with 127 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
Expand All @@ -45,6 +49,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_SHARED_STATE_DIR;
import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR;
Expand All @@ -57,7 +62,7 @@
*/
public class SnapshotFileMergingCompatibilityITCase extends TestLogger {

private static final long DELETE_TIMEOUT_MILLS = 60000;
private static final long DELETE_TIMEOUT_MILLS = 120000;

public static Collection<Object[]> parameters() {
return Arrays.asList(
Expand Down Expand Up @@ -113,6 +118,7 @@ private void testSwitchingFileMerging(
stateBackend1.configure(config, Thread.currentThread().getContextClassLoader());
firstCluster.before();
String firstCheckpoint;
CheckpointMetadata firstMetadata;
try {
firstCheckpoint =
runJobAndGetExternalizedCheckpoint(
Expand All @@ -124,7 +130,8 @@ private void testSwitchingFileMerging(
consecutiveCheckpoint,
true);
assertThat(firstCheckpoint).isNotNull();
verifyStateHandleType(firstCheckpoint, firstFileMergingSwitch);
firstMetadata = TestUtils.loadCheckpointMetadata(firstCheckpoint);
verifyStateHandleType(firstMetadata, firstFileMergingSwitch);
} finally {
firstCluster.after();
}
Expand All @@ -141,6 +148,7 @@ private void testSwitchingFileMerging(
.build());
secondCluster.before();
String secondCheckpoint;
CheckpointMetadata secondMetadata;
try {
secondCheckpoint =
runJobAndGetExternalizedCheckpoint(
Expand All @@ -152,12 +160,14 @@ private void testSwitchingFileMerging(
consecutiveCheckpoint,
true);
assertThat(secondCheckpoint).isNotNull();
verifyStateHandleType(secondCheckpoint, secondFileMergingSwitch);
secondMetadata = TestUtils.loadCheckpointMetadata(secondCheckpoint);
verifyStateHandleType(secondMetadata, secondFileMergingSwitch);
verifyCheckpointExistOrWaitDeleted(
firstCheckpoint,
determineFileExist(
restoreMode, firstFileMergingSwitch, secondFileMergingSwitch),
firstFileMergingSwitch);
firstFileMergingSwitch,
firstMetadata);
} finally {
secondCluster.after();
}
Expand All @@ -173,6 +183,7 @@ private void testSwitchingFileMerging(
.build());
thirdCluster.before();
String thirdCheckpoint;
CheckpointMetadata thirdMetadata;
try {
thirdCheckpoint =
runJobAndGetExternalizedCheckpoint(
Expand All @@ -184,12 +195,14 @@ private void testSwitchingFileMerging(
consecutiveCheckpoint,
true);
assertThat(thirdCheckpoint).isNotNull();
verifyStateHandleType(thirdCheckpoint, secondFileMergingSwitch);
thirdMetadata = TestUtils.loadCheckpointMetadata(thirdCheckpoint);
verifyStateHandleType(thirdMetadata, secondFileMergingSwitch);
verifyCheckpointExistOrWaitDeleted(
secondCheckpoint,
determineFileExist(
restoreMode, secondFileMergingSwitch, secondFileMergingSwitch),
secondFileMergingSwitch);
secondFileMergingSwitch,
secondMetadata);
} finally {
thirdCluster.after();
}
Expand Down Expand Up @@ -221,17 +234,17 @@ private void testSwitchingFileMerging(
thirdCheckpoint,
determineFileExist(
restoreMode, secondFileMergingSwitch, secondFileMergingSwitch),
secondFileMergingSwitch);
secondFileMergingSwitch,
thirdMetadata);
verifyCheckpointExistOrWaitDeleted(
fourthCheckpoint, TernaryBoolean.FALSE, secondFileMergingSwitch);
fourthCheckpoint, TernaryBoolean.FALSE, secondFileMergingSwitch, null);
} finally {
fourthCluster.after();
}
}

private void verifyStateHandleType(String checkpointPath, boolean fileMergingEnabled)
private void verifyStateHandleType(CheckpointMetadata metadata, boolean fileMergingEnabled)
throws IOException {
CheckpointMetadata metadata = TestUtils.loadCheckpointMetadata(checkpointPath);
boolean hasKeyedState = false;
for (OperatorState operatorState : metadata.getOperatorStates()) {
for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
Expand Down Expand Up @@ -275,7 +288,10 @@ private static TernaryBoolean determineFileExist(
}

private static void verifyCheckpointExistOrWaitDeleted(
String checkpointPath, TernaryBoolean exist, boolean fileMergingEnabled)
String checkpointPath,
TernaryBoolean exist,
boolean fileMergingEnabled,
CheckpointMetadata metadata)
throws Exception {
org.apache.flink.core.fs.Path checkpointDir =
new org.apache.flink.core.fs.Path(checkpointPath);
Expand Down Expand Up @@ -306,10 +322,8 @@ private static void verifyCheckpointExistOrWaitDeleted(
try {
fileExist =
(fs.exists(checkpointDir)
|| (fs.listStatus(sharedFile) != null
&& fs.listStatus(sharedFile).length > 0)
|| (fs.listStatus(taskOwnedFile) != null
&& fs.listStatus(taskOwnedFile).length > 0));
|| (metadata != null && !verifyCheckpointDisposed(metadata))
|| !verifyCheckpointNoDirectory(fs, sharedFile, taskOwnedFile));
} catch (IOException e) {
// Sometimes it may happen that the files are being deleted while we list them,
// thus an IOException is raised.
Expand All @@ -319,9 +333,106 @@ private static void verifyCheckpointExistOrWaitDeleted(
Thread.sleep(500L);
waited += 500L;
// Or timeout
assertThat(waited).isLessThan(DELETE_TIMEOUT_MILLS);
if (waited >= DELETE_TIMEOUT_MILLS) {
assertThat(fs.exists(checkpointDir)).isFalse();
assertThat(fs.listStatus(sharedFile)).isNullOrEmpty();
assertThat(fs.listStatus(taskOwnedFile)).isNullOrEmpty();
}
}
}
}
}

/**
 * Traverses the checkpoint metadata and verifies that the files and directories referenced by
 * its state handles have been deleted.
 *
 * <p>For managed keyed state, every handle is asserted to be an {@link
 * IncrementalRemoteKeyedStateHandle}, and each of its sub-handles that is a {@link
 * FileStateHandle} is probed for existence. For managed operator state, the shared and
 * task-owned directories of every {@link FileMergingOperatorStreamStateHandle} are probed. An
 * {@link IOException} raised while probing a path is counted as "not disposed".
 *
 * <p>The traversal stops at the first path that is found to still exist, so no further
 * file system calls are issued once the result is known to be {@code false}.
 *
 * @param metadata the metadata to traverse.
 * @return true if all corresponding files are deleted.
 */
private static boolean verifyCheckpointDisposed(CheckpointMetadata metadata) {
    for (OperatorState operatorState : metadata.getOperatorStates()) {
        for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
            // Check keyed state handles.
            for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
                assertThat(keyedStateHandle)
                        .isInstanceOf(IncrementalRemoteKeyedStateHandle.class);
                // The lambda cannot return from the enclosing method, so the outcome of
                // probing the sub-handles is carried out of it through this flag.
                AtomicBoolean disposed = new AtomicBoolean(true);
                ((IncrementalRemoteKeyedStateHandle) keyedStateHandle)
                        .streamSubHandles()
                        .forEach(
                                handle -> {
                                    // Skip further file system calls once a leftover file
                                    // has been found; only file-backed handles are probed.
                                    if (!disposed.get()
                                            || !(handle instanceof FileStateHandle)) {
                                        return;
                                    }
                                    try {
                                        org.apache.flink.core.fs.Path p =
                                                ((FileStateHandle) handle).getFilePath();
                                        if (p.getFileSystem().exists(p)) {
                                            disposed.set(false);
                                        }
                                    } catch (IOException e) {
                                        // Existence could not be determined: report the
                                        // checkpoint as not disposed.
                                        disposed.set(false);
                                    }
                                });
                if (!disposed.get()) {
                    return false;
                }
            }

            // Check the directories referenced by file-merging operator state handles.
            for (OperatorStateHandle handle : subtaskState.getManagedOperatorState()) {
                if (!(handle instanceof FileMergingOperatorStreamStateHandle)) {
                    continue;
                }
                FileMergingOperatorStreamStateHandle fileMergingHandle =
                        (FileMergingOperatorStreamStateHandle) handle;
                try {
                    org.apache.flink.core.fs.Path p =
                            fileMergingHandle.getSharedDirHandle().getDirectory();
                    if (p.getFileSystem().exists(p)) {
                        return false;
                    }
                    p = fileMergingHandle.getTaskOwnedDirHandle().getDirectory();
                    if (p.getFileSystem().exists(p)) {
                        return false;
                    }
                } catch (IOException e) {
                    // Existence could not be determined: report the checkpoint as not
                    // disposed.
                    return false;
                }
            }
        }
    }
    return true;
}

/**
 * Checks that neither the shared directory nor the task-owned directory contains a
 * subdirectory.
 *
 * <p>The shared directory is listed first; the task-owned directory is only listed when no
 * subdirectory was found under the shared one. A {@code null} listing is treated the same as an
 * empty one.
 *
 * @param fs the file system used to list both directories.
 * @param sharedFile the shared state directory.
 * @param taskOwnedFile the task-owned state directory.
 * @return true if no listed entry is a directory, false as soon as one is found.
 * @throws IOException if listing either directory fails.
 */
private static boolean verifyCheckpointNoDirectory(
        FileSystem fs,
        org.apache.flink.core.fs.Path sharedFile,
        org.apache.flink.core.fs.Path taskOwnedFile)
        throws IOException {
    org.apache.flink.core.fs.Path[] parents = {sharedFile, taskOwnedFile};
    for (org.apache.flink.core.fs.Path parent : parents) {
        FileStatus[] entries = fs.listStatus(parent);
        if (entries == null) {
            continue;
        }
        for (FileStatus entry : entries) {
            if (entry.isDir()) {
                return false;
            }
        }
    }
    return true;
}
}

0 comments on commit 48f3522

Please sign in to comment.