From ef96054d18ca1fb35f933fd0c715bb1ecc39b8c9 Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Wed, 25 Jan 2017 09:24:21 +0000 Subject: [PATCH] [FLINK-5612] [code] Make GlobPathFilter serializable --- .../flink/api/common/io/FileInputFormat.java | 2 +- .../api/common/io/GlobFilePathFilter.java | 35 ++++++++++--- .../api/common/io/GlobFilePathFilterTest.java | 50 +++++++++++++++---- 3 files changed, 68 insertions(+), 19 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index 785fb3b899826..4e81dab008e13 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -481,7 +481,7 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { totalLength += pathFile.getLen(); } // returns if unsplittable - if(unsplittable) { + if (unsplittable) { int splitNum = 0; for (final FileStatus file : files) { final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen()); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java index 0ee6f03175927..748ed284248be 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; import java.nio.file.FileSystem; import java.nio.file.FileSystems; @@ -52,8 +53,13 @@ public class GlobFilePathFilter extends FilePathFilter { private static final long serialVersionUID = 1L; - private final ArrayList includeMatchers; - private final ArrayList excludeMatchers; + private final List includePatterns; + private final List excludePatterns; + + // Path matchers are not serializable so we are delaying their + // creation until they are used + private transient ArrayList includeMatchers; + private transient ArrayList excludeMatchers; /** * Constructor for GlobFilePathFilter that will match all files @@ -69,8 +75,8 @@ public GlobFilePathFilter() { * @param excludePatterns glob patterns for files to exclude */ public GlobFilePathFilter(List includePatterns, List excludePatterns) { - includeMatchers = buildPatterns(includePatterns); - excludeMatchers = buildPatterns(excludePatterns); + this.includePatterns = Preconditions.checkNotNull(includePatterns); + this.excludePatterns = Preconditions.checkNotNull(excludePatterns); } private ArrayList buildPatterns(List patterns) { @@ -86,7 +92,7 @@ private ArrayList buildPatterns(List patterns) { @Override public boolean filterPath(Path filePath) { - if (includeMatchers.isEmpty() && excludeMatchers.isEmpty()) { + if (getIncludeMatchers().isEmpty() && getExcludeMatchers().isEmpty()) { return false; } @@ -97,7 +103,7 @@ public boolean filterPath(Path filePath) { final java.nio.file.Path nioPath = Paths.get(path); - for (PathMatcher matcher : includeMatchers) { + for (PathMatcher matcher : getIncludeMatchers()) { if (matcher.matches(nioPath)) { return shouldExclude(nioPath); } @@ -106,12 +112,27 @@ public boolean filterPath(Path filePath) { return true; } + private ArrayList getIncludeMatchers() { + if (includeMatchers == null) { + includeMatchers = buildPatterns(includePatterns); + } + return includeMatchers; + } + + private ArrayList getExcludeMatchers() { + if (excludeMatchers == null) { + excludeMatchers = buildPatterns(excludePatterns); + } + return excludeMatchers; + } + private boolean shouldExclude(java.nio.file.Path nioPath) { - for (PathMatcher matcher : excludeMatchers) { + for (PathMatcher matcher : getExcludeMatchers()) { if (matcher.matches(nioPath)) { return true; } } return false; } + } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java index bced076d353a0..c9f8da4921ec1 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java @@ -18,8 +18,10 @@ package org.apache.flink.api.common.io; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; +import java.io.IOException; import java.util.Collections; import static org.junit.Assert.assertFalse; @@ -27,13 +29,13 @@ public class GlobFilePathFilterTest { @Test - public void defaultConstructorCreateMatchAllFilter() { + public void testDefaultConstructorCreateMatchAllFilter() { GlobFilePathFilter matcher = new GlobFilePathFilter(); assertFalse(matcher.filterPath(new Path("dir/file.txt"))); } @Test - public void matchAllFilesByDefault() { + public void testMatchAllFilesByDefault() { GlobFilePathFilter matcher = new GlobFilePathFilter( Collections.emptyList(), Collections.emptyList()); @@ -42,7 +44,7 @@ public void matchAllFilesByDefault() { } @Test - public void excludeFilesNotInIncludePatterns() { + public void testExcludeFilesNotInIncludePatterns() { GlobFilePathFilter matcher = new GlobFilePathFilter( Collections.singletonList("dir/*"), Collections.emptyList()); @@ -52,7 +54,7 @@ public void excludeFilesNotInIncludePatterns() { } @Test - public void excludeFilesIfMatchesExclude() { + public void testExcludeFilesIfMatchesExclude() { GlobFilePathFilter matcher = new GlobFilePathFilter( Collections.singletonList("dir/*"), Collections.singletonList("dir/file.txt")); @@ -61,7 +63,7 @@ public void excludeFilesIfMatchesExclude() { } @Test - public void includeFileWithAnyCharacterMatcher() { + public void testIncludeFileWithAnyCharacterMatcher() { GlobFilePathFilter matcher = new GlobFilePathFilter( Collections.singletonList("dir/?.txt"), Collections.emptyList()); @@ -71,7 +73,7 @@ public void includeFileWithAnyCharacterMatcher() { } @Test - public void includeFileWithCharacterSetMatcher() { + public void testIncludeFileWithCharacterSetMatcher() { GlobFilePathFilter matcher = new GlobFilePathFilter( Collections.singletonList("dir/[acd].txt"), Collections.emptyList()); @@ -83,7 +85,7 @@ public void includeFileWithCharacterSetMatcher() { } @Test - public void includeFileWithCharacterRangeMatcher() { + public void testIncludeFileWithCharacterRangeMatcher() { GlobFilePathFilter matcher = new GlobFilePathFilter( Collections.singletonList("dir/[a-d].txt"), Collections.emptyList()); @@ -96,7 +98,7 @@ public void includeFileWithCharacterRangeMatcher() { } @Test - public void excludeHDFSFile() { + public void testExcludeHDFSFile() { GlobFilePathFilter matcher = new GlobFilePathFilter( Collections.singletonList("**"), Collections.singletonList("/dir/file2.txt")); @@ -107,7 +109,7 @@ public void excludeHDFSFile() { } @Test - public void excludeFilenameWithStart() { + public void testExcludeFilenameWithStart() { GlobFilePathFilter matcher = new GlobFilePathFilter( Collections.singletonList("**"), Collections.singletonList("\\*")); @@ -118,7 +120,7 @@ public void excludeFilenameWithStart() { } @Test - public void singleStarPattern() { + public void testSingleStarPattern() { GlobFilePathFilter matcher = new GlobFilePathFilter( Collections.singletonList("*"), Collections.emptyList()); @@ -129,7 +131,7 @@ public void singleStarPattern() { } @Test - public void doubleStarPattern() { + public void testDoubleStarPattern() { GlobFilePathFilter matcher = new GlobFilePathFilter( Collections.singletonList("**"), Collections.emptyList()); @@ -138,4 +140,30 @@ public void doubleStarPattern() { assertFalse(matcher.filterPath(new Path("a/b"))); assertFalse(matcher.filterPath(new Path("a/b/c"))); } + + @Test(expected = NullPointerException.class) + public void testIncluePatternIsNull() { + new GlobFilePathFilter( + null, + Collections.emptyList()); + } + + @Test(expected = NullPointerException.class) + public void testExcludePatternIsNull() { + new GlobFilePathFilter( + Collections.singletonList("**"), + null); + } + + @Test + public void testGlobFilterSerializable() throws IOException { + GlobFilePathFilter matcher = new GlobFilePathFilter( + Collections.singletonList("**"), + Collections.emptyList()); + + GlobFilePathFilter matcherCopy = CommonTestUtils.createCopySerializable(matcher); + assertFalse(matcher.filterPath(new Path("a"))); + assertFalse(matcher.filterPath(new Path("a/b"))); + assertFalse(matcher.filterPath(new Path("a/b/c"))); + } }