Skip to content

Commit

Permalink
[FLINK-25445] No need to create local recovery dirs when disabled local-recovery in TaskExecutorLocalStateStoresManager
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston authored and XComp committed Feb 1, 2022
1 parent 9050fd5 commit 9f61e12
Show file tree
Hide file tree
Showing 16 changed files with 123 additions and 140 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -81,8 +80,7 @@ public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID opera
@Nonnull
@Override
public LocalRecoveryConfig createLocalRecoveryConfig() {
LocalRecoveryDirectoryProvider provider = new SavepointLocalRecoveryProvider();
return new LocalRecoveryConfig(false, provider);
return new LocalRecoveryConfig(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

package org.apache.flink.runtime.state;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Optional;
import java.util.function.Supplier;

/**
* This class encapsulates the completed configuration for local recovery, i.e. the root directories
Expand All @@ -27,35 +30,29 @@
*/
public class LocalRecoveryConfig {

/** The local recovery mode. */
private final boolean localRecoveryEnabled;

/** Encapsulates the root directories and the subtask-specific path. */
@Nonnull private final LocalRecoveryDirectoryProvider localStateDirectories;
@Nullable private final LocalRecoveryDirectoryProvider localStateDirectories;

public LocalRecoveryConfig(
boolean localRecoveryEnabled,
@Nonnull LocalRecoveryDirectoryProvider directoryProvider) {
this.localRecoveryEnabled = localRecoveryEnabled;
public LocalRecoveryConfig(@Nullable LocalRecoveryDirectoryProvider directoryProvider) {
this.localStateDirectories = directoryProvider;
}

public boolean isLocalRecoveryEnabled() {
return localRecoveryEnabled;
return localStateDirectories != null;
}

@Nonnull
public LocalRecoveryDirectoryProvider getLocalStateDirectoryProvider() {
return localStateDirectories;
public Optional<LocalRecoveryDirectoryProvider> getLocalStateDirectoryProvider() {
return Optional.ofNullable(localStateDirectories);
}

@Override
public String toString() {
return "LocalRecoveryConfig{"
+ "localRecoveryMode="
+ localRecoveryEnabled
+ ", localStateDirectories="
+ localStateDirectories
+ '}';
return "LocalRecoveryConfig{" + "localStateDirectories=" + localStateDirectories + '}';
}

/**
 * Provides the exception to raise when a {@link LocalRecoveryDirectoryProvider} is requested
 * but cannot be supplied. Intended to be passed to {@code Optional#orElseThrow}.
 *
 * @return a supplier that creates a new {@link IllegalStateException} on every invocation
 */
public static Supplier<IllegalStateException> localRecoveryNotEnabled() {
    final String message =
            "Getting a LocalRecoveryDirectoryProvider is only supported with the local recovery enabled. This is a bug and should be reported.";
    // The exception is instantiated lazily, only when the supplier is actually invoked.
    return () -> new IllegalStateException(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,17 @@ public TaskLocalStateStore localStateStoreForSubtask(

if (taskLocalStateStore == null) {

// create the allocation base dirs, one inside each root dir.
File[] allocationBaseDirectories = allocationBaseDirectories(allocationID);

LocalRecoveryDirectoryProviderImpl directoryProvider =
new LocalRecoveryDirectoryProviderImpl(
allocationBaseDirectories, jobId, jobVertexID, subtaskIndex);
LocalRecoveryDirectoryProviderImpl directoryProvider = null;
if (localRecoveryEnabled) {
// create the allocation base dirs, one inside each root dir.
File[] allocationBaseDirectories = allocationBaseDirectories(allocationID);
directoryProvider =
new LocalRecoveryDirectoryProviderImpl(
allocationBaseDirectories, jobId, jobVertexID, subtaskIndex);
}

LocalRecoveryConfig localRecoveryConfig =
new LocalRecoveryConfig(localRecoveryEnabled, directoryProvider);
new LocalRecoveryConfig(directoryProvider);

taskLocalStateStore =
localRecoveryConfig.isLocalRecoveryEnabled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -274,7 +275,10 @@ public CompletableFuture<Void> dispose() {

// delete the local state subdirectory that belong to this subtask.
LocalRecoveryDirectoryProvider directoryProvider =
localRecoveryConfig.getLocalStateDirectoryProvider();
localRecoveryConfig
.getLocalStateDirectoryProvider()
.orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());

for (int i = 0; i < directoryProvider.allocationBaseDirsCount(); ++i) {
File subtaskBaseDirectory = directoryProvider.selectSubtaskBaseDirectory(i);
try {
Expand Down Expand Up @@ -341,28 +345,34 @@ private void discardLocalStateForCheckpoint(long checkpointID, TaskStateSnapshot
discardEx);
}

LocalRecoveryDirectoryProvider directoryProvider =
Optional<LocalRecoveryDirectoryProvider> directoryProviderOptional =
localRecoveryConfig.getLocalStateDirectoryProvider();
File checkpointDir = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointID);

LOG.debug(
"Deleting local state directory {} of checkpoint {} for subtask ({} - {} - {}).",
checkpointDir,
checkpointID,
jobID,
jobVertexID,
subtaskIndex);
if (directoryProviderOptional.isPresent()) {
File checkpointDir =
directoryProviderOptional
.get()
.subtaskSpecificCheckpointDirectory(checkpointID);

try {
deleteDirectory(checkpointDir);
} catch (IOException ex) {
LOG.warn(
"Exception while deleting local state directory of checkpoint {} in subtask ({} - {} - {}).",
LOG.debug(
"Deleting local state directory {} of checkpoint {} for subtask ({} - {} - {}).",
checkpointDir,
checkpointID,
jobID,
jobVertexID,
subtaskIndex,
ex);
subtaskIndex);

try {
deleteDirectory(checkpointDir);
} catch (IOException ex) {
LOG.warn(
"Exception while deleting local state directory of checkpoint {} in subtask ({} - {} - {}).",
checkpointID,
jobID,
jobVertexID,
subtaskIndex,
ex);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
CheckpointedStateScope.EXCLUSIVE,
streamFactory,
localRecoveryConfig
.getLocalStateDirectoryProvider())
.getLocalStateDirectoryProvider()
.orElseThrow(
LocalRecoveryConfig
.localRecoveryNotEnabled()))
: () ->
createSimpleStream(
CheckpointedStateScope.EXCLUSIVE, streamFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.io.File;
import java.net.InetAddress;

/** Test for {@link TaskExecutorLocalStateStoresManager}. */
public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {

@ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
Expand Down Expand Up @@ -140,6 +141,38 @@ public void testCreationFromConfigDefault() throws Exception {
}
}

/**
 * Verifies that a {@link TaskExecutorLocalStateStoresManager} constructed with local recovery
 * disabled hands out a store whose config reports local recovery as disabled, exposes no
 * directory provider, and leaves the configured root directories empty.
 */
@Test
public void testLocalStateNoCreateDirWhenDisabledLocalRecovery() throws Exception {
    JobID jobID = new JobID();
    JobVertexID jobVertexID = new JobVertexID();
    AllocationID allocationID = new AllocationID();
    int subtaskIdx = 23;

    File[] rootDirs = {
        temporaryFolder.newFolder(), temporaryFolder.newFolder(), temporaryFolder.newFolder()
    };

    boolean localRecoveryEnabled = false;
    TaskExecutorLocalStateStoresManager storesManager =
            new TaskExecutorLocalStateStoresManager(
                    localRecoveryEnabled, rootDirs, Executors.directExecutor());

    TaskLocalStateStore taskLocalStateStore =
            storesManager.localStateStoreForSubtask(
                    jobID, allocationID, jobVertexID, subtaskIdx);

    Assert.assertFalse(taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled());
    // Check the Optional directly instead of unwrapping it to null first.
    Assert.assertFalse(
            taskLocalStateStore
                    .getLocalRecoveryConfig()
                    .getLocalStateDirectoryProvider()
                    .isPresent());

    for (File recoveryDir : rootDirs) {
        // File#listFiles returns null if the path is not a directory or an I/O error occurs;
        // fail with a descriptive message rather than a bare NullPointerException.
        File[] children = recoveryDir.listFiles();
        Assert.assertNotNull("Unable to list contents of " + recoveryDir, children);
        Assert.assertEquals(0, children.length);
    }
}

/**
* This tests that the {@link TaskExecutorLocalStateStoresManager} creates {@link
* TaskLocalStateStoreImpl} that have a properly initialized local state base directory. It also
Expand All @@ -164,7 +197,10 @@ public void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception {
jobID, allocationID, jobVertexID, subtaskIdx);

LocalRecoveryDirectoryProvider directoryProvider =
taskLocalStateStore.getLocalRecoveryConfig().getLocalStateDirectoryProvider();
taskLocalStateStore
.getLocalRecoveryConfig()
.getLocalStateDirectoryProvider()
.orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());

for (int i = 0; i < 10; ++i) {
Assert.assertEquals(
Expand Down Expand Up @@ -217,7 +253,10 @@ public void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception {
jobID, otherAllocationID, jobVertexID, subtaskIdx);

directoryProvider =
taskLocalStateStore.getLocalRecoveryConfig().getLocalStateDirectoryProvider();
taskLocalStateStore
.getLocalRecoveryConfig()
.getLocalStateDirectoryProvider()
.orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());

File chkDir = directoryProvider.subtaskSpecificCheckpointDirectory(23L);
Assert.assertTrue(chkDir.mkdirs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void before() throws Exception {
new LocalRecoveryDirectoryProviderImpl(
allocationBaseDirs, jobID, jobVertexID, subtaskIdx);

LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(false, directoryProvider);
LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(directoryProvider);

this.taskLocalStateStore =
new TaskLocalStateStoreImpl(
Expand All @@ -94,13 +94,14 @@ public void getLocalRecoveryRootDirectoryProvider() {
LocalRecoveryConfig directoryProvider = taskLocalStateStore.getLocalRecoveryConfig();
Assert.assertEquals(
allocationBaseDirs.length,
directoryProvider.getLocalStateDirectoryProvider().allocationBaseDirsCount());
directoryProvider.getLocalStateDirectoryProvider().get().allocationBaseDirsCount());

for (int i = 0; i < allocationBaseDirs.length; ++i) {
Assert.assertEquals(
allocationBaseDirs[i],
directoryProvider
.getLocalStateDirectoryProvider()
.get()
.selectAllocationBaseDirectory(i));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,7 @@ public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() throws I
LocalRecoveryDirectoryProviderImpl directoryProvider =
new LocalRecoveryDirectoryProviderImpl(allocBaseDirs, jobID, jobVertexID, 0);

LocalRecoveryConfig localRecoveryConfig =
new LocalRecoveryConfig(true, directoryProvider);
LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(directoryProvider);

TaskLocalStateStore taskLocalStateStore =
new TaskLocalStateStoreImpl(
Expand Down Expand Up @@ -251,11 +250,13 @@ public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() throws I
allocBaseDirs[i % allocBaseDirs.length],
localRecoveryConfFromTaskLocalStateStore
.getLocalStateDirectoryProvider()
.get()
.allocationBaseDirectory(i));
Assert.assertEquals(
allocBaseDirs[i % allocBaseDirs.length],
localRecoveryConfFromTaskStateManager
.getLocalStateDirectoryProvider()
.get()
.allocationBaseDirectory(i));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@
/** Helper methods to easily create a {@link LocalRecoveryConfig} for tests. */
public class TestLocalRecoveryConfig {

private static final LocalRecoveryDirectoryProvider INSTANCE =
new TestDummyLocalDirectoryProvider();

public static LocalRecoveryConfig disabled() {
return new LocalRecoveryConfig(false, INSTANCE);
return new LocalRecoveryConfig(null);
}

public static class TestDummyLocalDirectoryProvider implements LocalRecoveryDirectoryProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ public void close() {
checkpointId,
CheckpointedStateScope.EXCLUSIVE,
primaryStreamFactory,
localRecoveryConfig.getLocalStateDirectoryProvider())
localRecoveryConfig
.getLocalStateDirectoryProvider()
.orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled()))
: () ->
CheckpointStreamWithResultProvider.createSimpleStream(
CheckpointedStateScope.EXCLUSIVE, primaryStreamFactory);
Expand Down
Loading

0 comments on commit 9f61e12

Please sign in to comment.