[FLINK-1307] Allow file input from nested directory structure
This closes apache#260
vasia authored and fhueske committed Dec 15, 2014
1 parent 13968cd commit c56be66
Showing 5 changed files with 460 additions and 26 deletions.
43 changes: 40 additions & 3 deletions docs/programming_guide.md
@@ -1584,7 +1584,7 @@ Data Sources
<div data-lang="java" markdown="1">

Data sources create the initial data sets, such as from files or from Java collections. The general
mechanism of of creating data sets is abstracted behind an
mechanism of creating data sets is abstracted behind an
{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java "InputFormat"%}.
Flink comes
with several built-in formats to create data sets from common file formats. Many of them have
@@ -1662,11 +1662,30 @@ DataSet<Tuple2<String, Integer> dbData =
// manually provide the type information as shown in the examples above.
{% endhighlight %}

#### Recursive Traversal of the Input Path Directory

For file-based inputs, when the input path is a directory, nested files are not enumerated by default. Instead, only the files inside the base directory are read, while nested files are ignored. Recursive enumeration of nested files can be enabled through the `recursive.file.enumeration` configuration parameter, like in the following example.

{% highlight java %}
// enable recursive enumeration of nested input files
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// create a configuration object
Configuration parameters = new Configuration();

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true);

// pass the configuration to the data source
DataSet<String> logs = env.readTextFile("file:///path/with.nested/files")
.withParameters(parameters);
{% endhighlight %}
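
The `Configuration` handed to `withParameters()` is forwarded to the input format's `configure()` method at runtime, which is where the `recursive.file.enumeration` flag is evaluated (see the `FileInputFormat` changes below). The same flag therefore also works when the input format is constructed explicitly; the following is a rough sketch, assuming the `TextInputFormat` and `ExecutionEnvironment.createInput()` APIs available in this release.

{% highlight java %}
// rough sketch: the same flag applied to an explicitly constructed input format
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);

// any FileInputFormat subclass reads the flag in configure(); TextInputFormat is used here
TextInputFormat format = new TextInputFormat(new Path("file:///path/with.nested/files"));

// withParameters() hands the configuration to the format's configure() method
DataSet<String> logs = env
    .createInput(format, BasicTypeInfo.STRING_TYPE_INFO)
    .withParameters(parameters);
{% endhighlight %}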

</div>
<div data-lang="scala" markdown="1">

Data sources create the initial data sets, such as from files or from Java collections. The general
mechanism of of creating data sets is abstracted behind an
mechanism of creating data sets is abstracted behind an
{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java "InputFormat"%}.
Flink comes
with several built-in formats to create data sets from common file formats. Many of them have
@@ -1730,9 +1749,27 @@ val values = env.fromElements("Foo", "bar", "foobar", "fubar")
// generate a number sequence
val numbers = env.generateSequence(1, 10000000);
{% endhighlight %}

#### Recursive Traversal of the Input Path Directory

For file-based inputs, when the input path is a directory, nested files are not enumerated by default. Instead, only the files inside the base directory are read, while nested files are ignored. Recursive enumeration of nested files can be enabled through the `recursive.file.enumeration` configuration parameter, like in the following example.

{% highlight scala %}
// enable recursive enumeration of nested input files
val env = ExecutionEnvironment.getExecutionEnvironment

// create a configuration object
val parameters = new Configuration

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true)

// pass the configuration to the data source
env.readTextFile("file:///path/with.nested/files").withParameters(parameters)
{% endhighlight %}

</div>
</div>

[Back to top](#top)

Data Sinks

flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -66,7 +66,7 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
* The timeout (in milliseconds) to wait for a filesystem stream to respond.
*/
private static long DEFAULT_OPENING_TIMEOUT;

/**
* Files with that suffix are unsplittable at a file level
* and compressed.
@@ -151,6 +151,12 @@ static final long getDefaultOpeningTimeout() {
* Therefore, the FileInputFormat can only read whole files.
*/
protected boolean unsplittable = false;

/**
* The flag to specify whether recursive traversal of the input directory
* structure is enabled.
*/
protected boolean enumerateNestedFiles = false;

// --------------------------------------------------------------------------------------------
// Constructors
@@ -231,7 +237,7 @@ public void setOpenTimeout(long openTimeout) {
}
this.openTimeout = openTimeout;
}

// --------------------------------------------------------------------------------------------
// Getting information about the split that is currently open
// --------------------------------------------------------------------------------------------
@@ -278,6 +284,9 @@ public void configure(Configuration parameters) {
else if (this.filePath == null) {
throw new IllegalArgumentException("File path was not specified in input format, or configuration.");
}

Boolean nestedFilesFlag = parameters.getBoolean(ENUMERATE_NESTED_FILES_FLAG, false);
this.enumerateNestedFiles = nestedFilesFlag;
}

/**
@@ -319,41 +328,47 @@ protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path f
// get the file info and check whether the cached statistics are still valid.
final FileStatus file = fs.getFileStatus(filePath);
long latestModTime = file.getModificationTime();
long totalLength = 0;

if(!acceptFile(file)) {
throw new IOException("The given file does not pass the file-filter");
}
// enumerate all files and check their modification time stamp.
if (file.isDir()) {
FileStatus[] fss = fs.listStatus(filePath);
files.ensureCapacity(fss.length);

for (FileStatus s : fss) {
if (!s.isDir()) {
files.add(s);
latestModTime = Math.max(s.getModificationTime(), latestModTime);
testForUnsplittable(s);
if (acceptFile(s)) {
files.add(s);
totalLength += s.getLen();
latestModTime = Math.max(s.getModificationTime(), latestModTime);
testForUnsplittable(s);
}
}
else {
if (enumerateNestedFiles) {
totalLength += addNestedFiles(s.getPath(), files, 0);
}
}
}
} else {
files.add(file);
testForUnsplittable(file);
totalLength += file.getLen();
}

// check whether the cached statistics are still valid, if we have any
if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
return cachedStats;
}

// calculate the whole length
long len = 0;
for (FileStatus s : files) {
len += s.getLen();
}

// sanity check
if (len <= 0) {
len = BaseStatistics.SIZE_UNKNOWN;
if (totalLength <= 0) {
totalLength = BaseStatistics.SIZE_UNKNOWN;
}

return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
return new FileBaseStatistics(latestModTime, totalLength, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
}

@Override
@@ -363,7 +378,7 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits

/**
* Computes the input splits for the file. By default, one file block is one split. If more splits
* are requested than blocks are available, then a split may by a fraction of a block and splits may cross
* are requested than blocks are available, then a split may be a fraction of a block and splits may cross
* block boundaries.
*
* @param minNumSplits The minimum desired number of file splits.
@@ -397,16 +412,23 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
// input is directory. list all contained files
final FileStatus[] dir = fs.listStatus(path);
for (int i = 0; i < dir.length; i++) {
if (!dir[i].isDir() && acceptFile(dir[i])) {
files.add(dir[i]);
totalLength += dir[i].getLen();
// as soon as there is one deflate file in a directory, we can not split it
testForUnsplittable(dir[i]);
if (dir[i].isDir()) {
if (enumerateNestedFiles) {
totalLength += addNestedFiles(dir[i].getPath(), files, 0);
}
}
else {
if (acceptFile(dir[i])) {
files.add(dir[i]);
totalLength += dir[i].getLen();
// as soon as there is one deflate file in a directory, we can not split it
testForUnsplittable(dir[i]);
}
}
}
} else {
testForUnsplittable(pathFile);

files.add(pathFile);
totalLength += pathFile.getLen();
}
@@ -506,6 +528,30 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
}

/**
* Recursively traverse the input directory structure
* and enumerate all accepted nested files.
* @return the total length of accepted files.
*/
private long addNestedFiles(Path path, List<FileStatus> files, long length)
throws IOException {
final FileSystem fs = path.getFileSystem();

for(FileStatus dir: fs.listStatus(path)) {
if (dir.isDir()) {
// carry forward the length accumulated in nested sub-directories
length = addNestedFiles(dir.getPath(), files, length);
}
else {
if(acceptFile(dir)) {
files.add(dir);
length += dir.getLen();
testForUnsplittable(dir);
}
}
}
return length;
}

private boolean testForUnsplittable(FileStatus pathFile) {
if(pathFile.getPath().getName().endsWith(DEFLATE_SUFFIX)) {
unsplittable = true;
@@ -807,6 +853,11 @@ private void abortWait() {
* The config parameter which defines the input file path.
*/
private static final String FILE_PARAMETER_KEY = "input.file.path";

/**
* The config parameter which defines whether input directories are recursively traversed.
*/
private static final String ENUMERATE_NESTED_FILES_FLAG = "recursive.file.enumeration";


// ----------------------------------- Config Builder -----------------------------------------
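
To see how the new pieces fit together outside of a full job, here is a rough, test-style sketch; the directory path and the requested number of splits below are made-up values used only for illustration.

{% highlight java %}
// rough sketch: drive a FileInputFormat subclass directly with the new flag
Configuration conf = new Configuration();
conf.setBoolean("recursive.file.enumeration", true);

// point the format at a base directory that contains nested sub-directories
TextInputFormat format = new TextInputFormat(new Path("file:///tmp/logs"));

// configure() reads the flag and enables enumerateNestedFiles on the format
format.configure(conf);

// createInputSplits() now also enumerates files found in nested directories
// instead of only reading the files directly inside the base directory
FileInputSplit[] splits = format.createInputSplits(4);

for (FileInputSplit split : splits) {
    System.out.println(split.getPath());
}
{% endhighlight %}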