Skip to content

Commit

Permalink
[FLINK-25817] Move local state directory creation into TaskManagerSer…
Browse files Browse the repository at this point in the history
…vicesConfiguration.fromConfiguration

This commit also adds the resource id to the localState directory in order to avoid clashes with other
processes running on the same machine.
  • Loading branch information
tillrohrmann committed Feb 8, 2022
1 parent 45c661e commit d63c48d
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.taskexecutor;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.PermanentBlobService;
Expand Down Expand Up @@ -67,8 +66,6 @@
public class TaskManagerServices {
private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class);

@VisibleForTesting public static final String LOCAL_STATE_SUB_DIRECTORY_ROOT = "localState";

/** TaskManager services. */
private final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation;

Expand Down Expand Up @@ -326,20 +323,13 @@ public static TaskManagerServices fromConfiguration(
unresolvedTaskManagerLocation,
taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());

final String[] stateRootDirectoryStrings =
taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
stateRootDirectoryFiles[i] =
new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
}
final File[] stateRootDirectoryStrings =
taskManagerServicesConfiguration.getLocalRecoveryStateDirectories();

final TaskExecutorLocalStateStoresManager taskStateManager =
new TaskExecutorLocalStateStoresManager(
taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
stateRootDirectoryFiles,
taskManagerServicesConfiguration.getLocalRecoveryStateDirectories(),
ioExecutor);

final TaskExecutorStateChangelogStoragesManager changelogStoragesManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
*/
public class TaskManagerServicesConfiguration {

private static final String LOCAL_STATE_SUB_DIRECTORY_ROOT = "localState_";

private final Configuration configuration;

private final ResourceID resourceID;
Expand All @@ -64,7 +66,7 @@ public class TaskManagerServicesConfiguration {

private final String[] tmpDirPaths;

private final String[] localRecoveryStateRootDirectories;
private final File[] localRecoveryStateDirectories;

private final int numberOfSlots;

Expand Down Expand Up @@ -96,7 +98,7 @@ private TaskManagerServicesConfiguration(
int externalDataPort,
boolean localCommunicationOnly,
String[] tmpDirPaths,
String[] localRecoveryStateRootDirectories,
File[] localRecoveryStateDirectories,
boolean localRecoveryEnabled,
@Nullable QueryableStateConfiguration queryableStateConfig,
int numberOfSlots,
Expand All @@ -116,7 +118,7 @@ private TaskManagerServicesConfiguration(
this.externalDataPort = externalDataPort;
this.localCommunicationOnly = localCommunicationOnly;
this.tmpDirPaths = checkNotNull(tmpDirPaths);
this.localRecoveryStateRootDirectories = checkNotNull(localRecoveryStateRootDirectories);
this.localRecoveryStateDirectories = checkNotNull(localRecoveryStateDirectories);
this.localRecoveryEnabled = checkNotNull(localRecoveryEnabled);
this.queryableStateConfig = queryableStateConfig;
this.numberOfSlots = checkNotNull(numberOfSlots);
Expand Down Expand Up @@ -170,8 +172,8 @@ public String[] getTmpDirPaths() {
return tmpDirPaths;
}

String[] getLocalRecoveryStateRootDirectories() {
return localRecoveryStateRootDirectories;
File[] getLocalRecoveryStateDirectories() {
return localRecoveryStateDirectories;
}

boolean isLocalRecoveryEnabled() {
Expand Down Expand Up @@ -253,12 +255,18 @@ public static TaskManagerServicesConfiguration fromConfiguration(
TaskExecutorResourceSpec taskExecutorResourceSpec,
WorkingDirectory workingDirectory)
throws Exception {
String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);

if (localStateRootDir.length == 0) {
final File localStateDir = workingDirectory.getLocalStateDirectory();

localStateRootDir = new String[] {localStateDir.getAbsolutePath()};
String[] localStateRootDirs = ConfigurationUtils.parseLocalStateDirectories(configuration);
final File[] localStateDirs;

if (localStateRootDirs.length == 0) {
localStateDirs = new File[] {workingDirectory.getLocalStateDirectory()};
} else {
localStateDirs = new File[localStateRootDirs.length];
final String localStateDirectoryName = LOCAL_STATE_SUB_DIRECTORY_ROOT + resourceID;

for (int i = 0; i < localStateRootDirs.length; i++) {
localStateDirs[i] = new File(localStateRootDirs[i], localStateDirectoryName);
}
}

boolean localRecoveryMode = configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY);
Expand Down Expand Up @@ -298,7 +306,7 @@ public static TaskManagerServicesConfiguration fromConfiguration(
externalDataPort,
localCommunicationOnly,
tmpDirs,
localStateRootDir,
localStateDirs,
localRecoveryMode,
queryableStateConfig,
ConfigurationParserUtils.getSlot(configuration),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@

import java.io.File;
import java.net.InetAddress;
import java.nio.file.Paths;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link TaskExecutorLocalStateStoresManager}. */
public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
Expand Down Expand Up @@ -89,15 +92,12 @@ public void testCreationFromConfig() throws Exception {
String[] split = rootDirString.split(",");
File[] rootDirectories = taskStateManager.getLocalStateRootDirectories();
for (int i = 0; i < split.length; ++i) {
Assert.assertEquals(
new File(split[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
rootDirectories[i]);
assertThat(rootDirectories[i].toPath()).startsWith(Paths.get(split[i]));
}

// verify local recovery mode
Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled());

Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
for (File rootDirectory : rootDirectories) {
FileUtils.deleteFileOrDirectory(rootDirectory);
}
Expand Down Expand Up @@ -131,10 +131,7 @@ public void testCreationFromConfigDefault() throws Exception {

for (int i = 0; i < localStateRootDirectories.length; ++i) {
Assert.assertEquals(
new File(
workingDirectory.getLocalStateDirectory(),
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
localStateRootDirectories[i]);
workingDirectory.getLocalStateDirectory(), localStateRootDirectories[i]);
}

Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled());
Expand Down

0 comments on commit d63c48d

Please sign in to comment.