Skip to content

Commit

Permalink
[FLINK-3730] Fix RocksDB Local Directory Initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Apr 13, 2016
1 parent db85f38 commit db6528b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.FoldingState;
Expand Down Expand Up @@ -181,13 +182,15 @@ public void initializeForJob(

for (Path path : configuredDbBasePaths) {
File f = new File(path.toUri().getPath());
if (!f.exists() && !f.mkdirs()) {
String msg = "Local DB files directory '" + f.getAbsolutePath()
File testDir = new File(f, UUID.randomUUID().toString());
if (!testDir.mkdirs()) {
String msg = "Local DB files directory '" + path
+ "' does not exist and cannot be created. ";
LOG.error(msg);
errorMessage += msg;
} else {
dirs.add(f);
}
dirs.add(f);
}

if (dirs.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.AbstractStateBackend;
Expand Down Expand Up @@ -133,14 +135,17 @@ public void testFailWhenNoLocalStorageDir() throws Exception {

RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath());


boolean hasFailure = false;
try {
rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE);
}
catch (Exception e) {
assertTrue(e.getMessage().contains("No local storage directories available"));
assertTrue(e.getMessage().contains(targetDir.getAbsolutePath()));
hasFailure = true;
}
assertTrue("We must see a failure because no storaged directory is feasible.", hasFailure);
}
finally {
//noinspection ResultOfMethodCallIgnored
Expand Down Expand Up @@ -168,6 +173,9 @@ public void testContinueOnSomeDbDirectoriesMissing() throws Exception {

try {
rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE);

// actually get a state to see whether we can write to the storage directory
rocksDbBackend.getPartitionedState(null, VoidSerializer.INSTANCE, new ValueStateDescriptor<>("test", String.class, ""));
}
catch (Exception e) {
e.printStackTrace();
Expand Down

0 comments on commit db6528b

Please sign in to comment.