Skip to content

Commit

Permalink
[FLINK-25817] Let TaskExecutorLocalStateStoresManager only delete ow…
Browse files Browse the repository at this point in the history
…ned local state directories

This commit makes sure that the TaskExecutorLocalStateStoresManager only deletes local state directories
on shutdown if they are owned by it. If the local state is stored under the working directory, then it is
borrowed and, thus, won't be deleted when the TaskExecutorLocalStateStoresManager shuts down.
  • Loading branch information
tillrohrmann committed Feb 8, 2022
1 parent d63c48d commit 577ef90
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Reference;
import org.apache.flink.util.ShutdownHookUtil;

import org.slf4j.Logger;
Expand Down Expand Up @@ -59,7 +60,7 @@ public class TaskExecutorLocalStateStoresManager {
private final boolean localRecoveryEnabled;

/** These are the root directories for all local state of this task manager / executor. */
private final File[] localStateRootDirectories;
private final Reference<File[]> localStateRootDirectories;

/** Executor that runs the discarding of released state objects. */
private final Executor discardExecutor;
Expand All @@ -74,7 +75,7 @@ public class TaskExecutorLocalStateStoresManager {

public TaskExecutorLocalStateStoresManager(
boolean localRecoveryEnabled,
@Nonnull File[] localStateRootDirectories,
@Nonnull Reference<File[]> localStateRootDirectories,
@Nonnull Executor discardExecutor)
throws IOException {

Expand All @@ -90,7 +91,7 @@ public TaskExecutorLocalStateStoresManager(
this.lock = new Object();
this.closed = false;

for (File localStateRecoveryRootDir : localStateRootDirectories) {
for (File localStateRecoveryRootDir : localStateRootDirectories.deref()) {

if (!localStateRecoveryRootDir.exists()
&& !localStateRecoveryRootDir.mkdirs()
Expand Down Expand Up @@ -219,28 +220,30 @@ public void releaseLocalStateForAllocationId(@Nonnull AllocationID allocationID)
}

public void shutdown() {

HashMap<AllocationID, Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore>> toRelease;

synchronized (lock) {
if (closed) {
return;
}

closed = true;
toRelease = new HashMap<>(taskStateStoresByAllocationID);
taskStateStoresByAllocationID.clear();
}

ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

LOG.info("Shutting down TaskExecutorLocalStateStoresManager.");

for (Map.Entry<AllocationID, Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore>> entry :
toRelease.entrySet()) {
ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

doRelease(entry.getValue().values());
cleanupAllocationBaseDirs(entry.getKey());
if (localStateRootDirectories.isOwned()) {
for (File localStateRootDirectory : localStateRootDirectories.deref()) {
try {
FileUtils.deleteDirectory(localStateRootDirectory);
} catch (IOException ioe) {
LOG.warn(
"Could not delete local state directory {}.",
localStateRootDirectory,
ioe);
}
}
}
}

Expand All @@ -251,7 +254,7 @@ boolean isLocalRecoveryEnabled() {

@VisibleForTesting
File[] getLocalStateRootDirectories() {
return localStateRootDirectories;
return localStateRootDirectories.deref();
}

@VisibleForTesting
Expand All @@ -261,10 +264,11 @@ String allocationSubDirString(AllocationID allocationID) {

private File[] allocationBaseDirectories(AllocationID allocationID) {
final String allocationSubDirString = allocationSubDirString(allocationID);
final File[] allocationDirectories = new File[localStateRootDirectories.length];
for (int i = 0; i < localStateRootDirectories.length; ++i) {
final File[] derefLocalStateRootDirectories = localStateRootDirectories.deref();
final File[] allocationDirectories = new File[derefLocalStateRootDirectories.length];
for (int i = 0; i < derefLocalStateRootDirectories.length; ++i) {
allocationDirectories[i] =
new File(localStateRootDirectories[i], allocationSubDirString);
new File(derefLocalStateRootDirectories[i], allocationSubDirString);
}
return allocationDirectories;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,6 @@ public static TaskManagerServices fromConfiguration(
unresolvedTaskManagerLocation,
taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());

final File[] stateRootDirectoryStrings =
taskManagerServicesConfiguration.getLocalRecoveryStateDirectories();

final TaskExecutorLocalStateStoresManager taskStateManager =
new TaskExecutorLocalStateStoresManager(
taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Reference;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -66,7 +67,7 @@ public class TaskManagerServicesConfiguration {

private final String[] tmpDirPaths;

private final File[] localRecoveryStateDirectories;
private final Reference<File[]> localRecoveryStateDirectories;

private final int numberOfSlots;

Expand Down Expand Up @@ -98,7 +99,7 @@ private TaskManagerServicesConfiguration(
int externalDataPort,
boolean localCommunicationOnly,
String[] tmpDirPaths,
File[] localRecoveryStateDirectories,
Reference<File[]> localRecoveryStateDirectories,
boolean localRecoveryEnabled,
@Nullable QueryableStateConfiguration queryableStateConfig,
int numberOfSlots,
Expand Down Expand Up @@ -172,7 +173,7 @@ public String[] getTmpDirPaths() {
return tmpDirPaths;
}

File[] getLocalRecoveryStateDirectories() {
Reference<File[]> getLocalRecoveryStateDirectories() {
return localRecoveryStateDirectories;
}

Expand Down Expand Up @@ -256,17 +257,20 @@ public static TaskManagerServicesConfiguration fromConfiguration(
WorkingDirectory workingDirectory)
throws Exception {
String[] localStateRootDirs = ConfigurationUtils.parseLocalStateDirectories(configuration);
final File[] localStateDirs;
final Reference<File[]> localStateDirs;

if (localStateRootDirs.length == 0) {
localStateDirs = new File[] {workingDirectory.getLocalStateDirectory()};
localStateDirs =
Reference.borrowed(new File[] {workingDirectory.getLocalStateDirectory()});
} else {
localStateDirs = new File[localStateRootDirs.length];
File[] createdLocalStateDirs = new File[localStateRootDirs.length];
final String localStateDirectoryName = LOCAL_STATE_SUB_DIRECTORY_ROOT + resourceID;

for (int i = 0; i < localStateRootDirs.length; i++) {
localStateDirs[i] = new File(localStateRootDirs[i], localStateDirectoryName);
createdLocalStateDirs[i] = new File(localStateRootDirs[i], localStateDirectoryName);
}

localStateDirs = Reference.owned(createdLocalStateDirs);
}

boolean localRecoveryMode = configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.testutils.WorkingDirectoryResource;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Reference;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;

Expand All @@ -41,6 +42,7 @@
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Paths;

Expand Down Expand Up @@ -154,7 +156,9 @@ public void testLocalStateNoCreateDirWhenDisabledLocalRecovery() throws Exceptio
boolean localRecoveryEnabled = false;
TaskExecutorLocalStateStoresManager storesManager =
new TaskExecutorLocalStateStoresManager(
localRecoveryEnabled, rootDirs, Executors.directExecutor());
localRecoveryEnabled,
Reference.owned(rootDirs),
Executors.directExecutor());

TaskLocalStateStore taskLocalStateStore =
storesManager.localStateStoreForSubtask(
Expand Down Expand Up @@ -189,7 +193,8 @@ public void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception {
temporaryFolder.newFolder(), temporaryFolder.newFolder(), temporaryFolder.newFolder()
};
TaskExecutorLocalStateStoresManager storesManager =
new TaskExecutorLocalStateStoresManager(true, rootDirs, Executors.directExecutor());
new TaskExecutorLocalStateStoresManager(
true, Reference.owned(rootDirs), Executors.directExecutor());

TaskLocalStateStore taskLocalStateStore =
storesManager.localStateStoreForSubtask(
Expand Down Expand Up @@ -267,6 +272,52 @@ public void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception {
checkRootDirsClean(rootDirs);
}

/**
 * Verifies that local state root directories handed to the {@link
 * TaskExecutorLocalStateStoresManager} as an owned {@link Reference} no longer exist after
 * {@code shutdown()} has been called.
 *
 * @throws IOException if the temporary folders cannot be created or the manager cannot be
 *     constructed
 */
@Test
public void testOwnedLocalStateDirectoriesAreDeletedOnShutdown() throws IOException {
// Two independent root directories, so the check covers more than a single entry.
final File localStateStoreA = temporaryFolder.newFolder();
final File localStateStoreB = temporaryFolder.newFolder();

final File[] localStateDirectories = {localStateStoreA, localStateStoreB};

// Reference.owned(...) marks the directories as owned by the manager; the first argument
// enables local recovery.
final TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager =
new TaskExecutorLocalStateStoresManager(
true, Reference.owned(localStateDirectories), Executors.directExecutor());

// Precondition: the directories are present before shutdown.
for (File localStateDirectory : localStateDirectories) {
assertThat(localStateDirectory).exists();
}

taskExecutorLocalStateStoresManager.shutdown();

// Owned directories must be gone once the manager has shut down.
for (File localStateDirectory : localStateDirectories) {
assertThat(localStateDirectory).doesNotExist();
}
}

/**
 * Verifies that local state root directories handed to the {@link
 * TaskExecutorLocalStateStoresManager} as a borrowed {@link Reference} still exist after
 * {@code shutdown()} has been called.
 *
 * @throws IOException if the temporary folders cannot be created or the manager cannot be
 *     constructed
 */
@Test
public void testBorrowedLocalStateDirectoriesAreNotDeletedOnShutdown() throws IOException {
// Two independent root directories, so the check covers more than a single entry.
final File localStateStoreA = temporaryFolder.newFolder();
final File localStateStoreB = temporaryFolder.newFolder();

final File[] localStateDirectories = {localStateStoreA, localStateStoreB};

// Reference.borrowed(...) marks the directories as not owned by the manager; the first
// argument enables local recovery.
final TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager =
new TaskExecutorLocalStateStoresManager(
true,
Reference.borrowed(localStateDirectories),
Executors.directExecutor());

// Precondition: the directories are present before shutdown.
for (File localStateDirectory : localStateDirectories) {
assertThat(localStateDirectory).exists();
}

taskExecutorLocalStateStoresManager.shutdown();

// Borrowed directories must survive the manager's shutdown.
for (File localStateDirectory : localStateDirectories) {
assertThat(localStateDirectory).exists();
}
}

private void checkRootDirsClean(File[] rootDirs) {
for (File rootDir : rootDirs) {
File[] files = rootDir.listFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestFileUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.Reference;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
Expand Down Expand Up @@ -434,7 +435,9 @@ private void internalTestPartitionRelease(

final TaskExecutorLocalStateStoresManager localStateStoresManager =
new TaskExecutorLocalStateStoresManager(
false, new File[] {tmp.newFolder()}, Executors.directExecutor());
false,
Reference.owned(new File[] {tmp.newFolder()}),
Executors.directExecutor());

final TaskManagerServices taskManagerServices =
new TaskManagerServicesBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Reference;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.concurrent.Executors;
Expand Down Expand Up @@ -266,7 +267,9 @@ public void testShouldShutDownTaskManagerServicesInPostStop() throws Exception {

final TaskExecutorLocalStateStoresManager localStateStoresManager =
new TaskExecutorLocalStateStoresManager(
false, ioManager.getSpillingDirectories(), Executors.directExecutor());
false,
Reference.borrowed(ioManager.getSpillingDirectories()),
Executors.directExecutor());

nettyShuffleEnvironment.start();

Expand Down Expand Up @@ -2707,7 +2710,7 @@ public void testReleaseInactiveSlots() throws Exception {
private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager()
throws IOException {
return new TaskExecutorLocalStateStoresManager(
false, new File[] {tmp.newFolder()}, Executors.directExecutor());
false, Reference.owned(new File[] {tmp.newFolder()}), Executors.directExecutor());
}

private TaskExecutor createTaskExecutor(int numberOFSlots) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.flink.testutils.TestFileUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Reference;
import org.apache.flink.util.concurrent.Executors;

import org.junit.rules.TemporaryFolder;
Expand Down Expand Up @@ -150,7 +151,7 @@ private TaskSubmissionTestEnvironment(
TaskExecutorLocalStateStoresManager localStateStoresManager =
new TaskExecutorLocalStateStoresManager(
false,
new File[] {temporaryFolder.newFolder()},
Reference.owned(new File[] {temporaryFolder.newFolder()}),
Executors.directExecutor());

final TaskManagerServices taskManagerServices =
Expand Down

0 comments on commit 577ef90

Please sign in to comment.