Skip to content

Commit

Permalink
[FLINK-32086][checkpointing] Fix deletion fail of file-merging manage…
Browse files Browse the repository at this point in the history
…d dir
  • Loading branch information
zoltar9264 authored and Zakelly committed Jun 18, 2024
1 parent 6723ad7 commit 9f90e58
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -210,8 +209,7 @@ public void initFileSystem(
createManagedDirectory(managedExclusivePath);
this.managedExclusiveStateDir = managedExclusivePath;
this.managedExclusiveStateDirHandle =
DirectoryStreamStateHandle.forPathWithZeroSize(
new File(managedExclusivePath.getPath()).toPath());
DirectoryStreamStateHandle.of(managedExclusivePath);
this.writeBufferSize = writeBufferSize;
}
}
Expand All @@ -224,9 +222,7 @@ public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) {
createManagedDirectory(managedPath);
managedSharedStateDir.put(subtaskKey, managedPath);
managedSharedStateDirHandles.put(
subtaskKey,
DirectoryStreamStateHandle.forPathWithZeroSize(
new File(managedPath.getPath()).toPath()));
subtaskKey, DirectoryStreamStateHandle.of(managedPath));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand Down Expand Up @@ -675,17 +674,13 @@ OperatorStateHandle deserializeOperatorStateHandle(
Preconditions.checkArgument(stateHandle instanceof SegmentFileStateHandle);
return isEmpty
? new EmptyFileMergingOperatorStreamStateHandle(
DirectoryStreamStateHandle.forPathWithZeroSize(
new File(taskOwnedDirPathStr).toPath()),
DirectoryStreamStateHandle.forPathWithZeroSize(
new File(sharedDirPathStr).toPath()),
DirectoryStreamStateHandle.of(new Path(taskOwnedDirPathStr)),
DirectoryStreamStateHandle.of(new Path(sharedDirPathStr)),
offsetsMap,
stateHandle)
: new FileMergingOperatorStreamStateHandle(
DirectoryStreamStateHandle.forPathWithZeroSize(
new File(taskOwnedDirPathStr).toPath()),
DirectoryStreamStateHandle.forPathWithZeroSize(
new File(sharedDirPathStr).toPath()),
DirectoryStreamStateHandle.of(new Path(taskOwnedDirPathStr)),
DirectoryStreamStateHandle.of(new Path(sharedDirPathStr)),
offsetsMap,
stateHandle);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,28 @@
package org.apache.flink.runtime.state.filemerging;

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.StreamStateHandle;

import javax.annotation.Nonnull;

import java.nio.file.Path;
import java.util.Optional;

/** Wrap {@link DirectoryStateHandle} to a {@link StreamStateHandle}. */
public class DirectoryStreamStateHandle extends DirectoryStateHandle implements StreamStateHandle {
/**
* This state handle represents a directory, usually used to be registered to {@link
* org.apache.flink.runtime.state.SharedStateRegistry} to track the life cycle of the directory.
*/
public class DirectoryStreamStateHandle implements StreamStateHandle {

private static final long serialVersionUID = 1L;

public DirectoryStreamStateHandle(@Nonnull Path directory, long directorySize) {
super(directory, directorySize);
private final Path directory;

public DirectoryStreamStateHandle(@Nonnull Path directory) {
this.directory = directory;
}

@Override
Expand All @@ -50,11 +55,16 @@ public Optional<byte[]> asBytesIfInMemory() {

@Override
public PhysicalStateHandleID getStreamStateHandleID() {
return new PhysicalStateHandleID(getDirectory().toString());
return new PhysicalStateHandleID(directory.toString());
}

public SharedStateRegistryKey createStateRegistryKey() {
return new SharedStateRegistryKey(getDirectory().toUri().toString());
return new SharedStateRegistryKey(directory.toString());
}

@Override
public int hashCode() {
return directory.hashCode();
}

@Override
Expand All @@ -68,23 +78,34 @@ public boolean equals(Object o) {

DirectoryStreamStateHandle that = (DirectoryStreamStateHandle) o;

return getDirectory().equals(that.getDirectory());
return directory.equals(that.directory);
}

@Override
public String toString() {
return "DirectoryStreamStateHandle{" + "directory=" + getDirectory() + '}';
}

public Path getDirectory() {
return directory;
}

@Override
public void discardState() throws Exception {
FileSystem fs = directory.getFileSystem();
fs.delete(directory, true);
}

/**
* Return a {@link DirectoryStreamStateHandle} with zero size, which usually used to be
* registered to {@link org.apache.flink.runtime.state.SharedStateRegistry} to track the life
* cycle of the directory, therefore a fake size is provided.
*
* @param directory the directory.
* @return DirectoryStreamStateHandle with zero size.
* This handle usually used to track the life cycle of the directory, therefore a fake size is
* provided.
*/
public static DirectoryStreamStateHandle forPathWithZeroSize(@Nonnull Path directory) {
return new DirectoryStreamStateHandle(directory, 0);
@Override
public long getStateSize() {
return 0;
}

public static DirectoryStreamStateHandle of(@Nonnull Path directory) {
return new DirectoryStreamStateHandle(directory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@

import javax.annotation.Nullable;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -212,11 +211,9 @@ private static OperatorStreamStateHandle createDummyOperatorStreamStateHandle(Ra
boolean enableFileMerging = rnd.nextBoolean();
if (enableFileMerging) {
DirectoryStreamStateHandle taskOwnedDirHandle =
DirectoryStreamStateHandle.forPathWithZeroSize(
new File(String.valueOf(createRandomUUID(rnd))).toPath());
DirectoryStreamStateHandle.of(new Path(createRandomUUID(rnd).toString()));
DirectoryStreamStateHandle sharedDirHandle =
DirectoryStreamStateHandle.forPathWithZeroSize(
new File(String.valueOf(createRandomUUID(rnd))).toPath());
DirectoryStreamStateHandle.of(new Path(createRandomUUID(rnd).toString()));
return rnd.nextBoolean()
? new FileMergingOperatorStreamStateHandle(
taskOwnedDirHandle,
Expand Down

0 comments on commit 9f90e58

Please sign in to comment.