Skip to content

Commit

Permalink
[FLINK-4777] catch IOException in ContinuousFileMonitoringFunction
Browse files Browse the repository at this point in the history
FileSystem.listStatus(path) may throw an IOException when it lists files
and then retrieves their file status. This is quite common, e.g. editors
which create temporary files and move them. The
ContinuousFileMonitoringFunction can only apply a file path filter
afterwards.

The solution is to defer file checks until no exception is caught anymore.

This closes apache#2610.
  • Loading branch information
mxm committed Oct 10, 2016
1 parent 706dc13 commit b949d42
Showing 1 changed file with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,21 @@ private Map<Long, List<FileInputSplit>> getInputSplits(List<FileStatus> eligible
* method to decide which parts of the file to be processed, and forward them downstream.
*/
private List<FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
List<FileStatus> files = new ArrayList<>();

FileStatus[] statuses = fileSystem.listStatus(new Path(path));
final FileStatus[] statuses;
try {
statuses = fileSystem.listStatus(new Path(path));
} catch (IOException e) {
// we may run into an IOException if files are moved while listing their status
// delay the check for eligible files in this case
return Collections.emptyList();
}

if (statuses == null) {
LOG.warn("Path does not exist: {}", path);
return Collections.emptyList();
} else {
List<FileStatus> files = new ArrayList<>();
// handle the new files
for (FileStatus status : statuses) {
Path filePath = status.getPath();
Expand All @@ -258,8 +267,8 @@ private List<FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOExcep
files.add(status);
}
}
return files;
}
return files;
}

/**
Expand Down

0 comments on commit b949d42

Please sign in to comment.