diff --git a/docs/ops/deployment/aws.md b/docs/ops/deployment/aws.md
index db69b2e4a6086..e62e6a3b063c6 100644
--- a/docs/ops/deployment/aws.md
+++ b/docs/ops/deployment/aws.md
@@ -105,6 +105,8 @@ To use either `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the respective
cp ./opt/flink-s3-fs-presto-{{ site.version }}.jar ./lib/
{% endhighlight %}
+Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem wrappers for URIs with the `s3://` scheme, `flink-s3-fs-hadoop` also registers for `s3a://`.
+
#### Configure Access Credentials
After setting up the S3 FileSystem wrapper, you need to make sure that Flink is allowed to access your S3 buckets.
diff --git a/flink-filesystems/flink-s3-fs-hadoop/README.md b/flink-filesystems/flink-s3-fs-hadoop/README.md
index 11980e075e34b..f65ee61d189e8 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/README.md
+++ b/flink-filesystems/flink-s3-fs-hadoop/README.md
@@ -30,7 +30,7 @@ steps are required to keep the shading correct:
2. verify the shaded jar:
- does not contain any unshaded classes except for `org.apache.flink.fs.s3hadoop.S3FileSystemFactory`
- all other classes should be under `org.apache.flink.fs.s3hadoop.shaded`
- - there should be a `META-INF/services/org.apache.flink.core.fs.FileSystemFactory` file pointing to the `org.apache.flink.fs.s3hadoop.S3FileSystemFactory` class
+ - there should be a `META-INF/services/org.apache.flink.core.fs.FileSystemFactory` file pointing to two classes: `org.apache.flink.fs.s3hadoop.S3FileSystemFactory` and `org.apache.flink.fs.s3hadoop.S3AFileSystemFactory`
- other service files under `META-INF/services` should have their names and contents in the relocated `org.apache.flink.fs.s3hadoop.shaded` package
- contains a `core-default-shaded.xml` file
- does not contain a `core-default.xml` or `core-site.xml` file
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3AFileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3AFileSystemFactory.java
new file mode 100644
index 0000000000000..7f3a068b4e9a4
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3AFileSystemFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3hadoop;
+
+/**
+ * Simple factory for the S3 file system, registered for the s3a:// scheme.
+ */
+public class S3AFileSystemFactory extends S3FileSystemFactory {
+ @Override
+ public String getScheme() {
+ return "s3a";
+ }
+}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
index f16986c7e5ddc..b0f723dba4187 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.fs.s3hadoop.S3FileSystemFactory
\ No newline at end of file
+org.apache.flink.fs.s3hadoop.S3FileSystemFactory
+org.apache.flink.fs.s3hadoop.S3AFileSystemFactory
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
index e6406035d1548..8672ca40e207a 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
@@ -31,12 +31,16 @@
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
import java.util.UUID;
import static org.apache.flink.core.fs.FileSystemTestUtils.checkPathEventualExistence;
@@ -53,12 +57,19 @@
* consistency guarantees
* and what the {@link org.apache.hadoop.fs.s3a.S3AFileSystem} offers.
*/
+@RunWith(Parameterized.class)
public class HadoopS3FileSystemITCase extends TestLogger {
+ @Parameterized.Parameter
+ public String scheme;
- private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
+ @Parameterized.Parameters(name = "Scheme = {0}")
+ public static List parameters() {
+ return Arrays.asList("s3", "s3a");
+ }
private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+ private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
@@ -117,9 +128,13 @@ public static void cleanUp() throws IOException, InterruptedException {
}
}
+ private String getBasePath() {
+ return scheme + "://" + BUCKET + '/' + TEST_DATA_DIR + "/" + scheme;
+ }
+
@Test
public void testConfigKeysForwarding() throws Exception {
- final Path path = new Path("s3://" + BUCKET + '/');
+ final Path path = new Path(getBasePath());
// access without credentials should fail
{
@@ -181,7 +196,7 @@ public void testSimpleFileWriteAndRead() throws Exception {
FileSystem.initialize(conf);
- final Path path = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR + "/test.txt");
+ final Path path = new Path(getBasePath() + "/test.txt");
final FileSystem fs = path.getFileSystem();
try {
@@ -217,7 +232,7 @@ public void testDirectoryListing() throws Exception {
FileSystem.initialize(conf);
- final Path directory = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR + "/testdir/");
+ final Path directory = new Path(getBasePath() + "/testdir/");
final FileSystem fs = directory.getFileSystem();
// directory must not yet exist
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index e08f0cc480ee0..23697d62a818e 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -412,7 +412,7 @@ check_shaded_artifacts_s3_fs() {
UNSHADED_CLASSES=`cat allClasses | grep -v -e '^META-INF' -e '^assets' -e "^org/apache/flink/fs/s3${VARIANT}/" | grep '\.class$'`
if [ "$?" == "0" ]; then
echo "=============================================================================="
- echo "Detected unshaded dependencies in fat jar:"
+ echo "${VARIANT}: Detected unshaded dependencies in fat jar:"
echo "${UNSHADED_CLASSES}"
echo "=============================================================================="
return 1
@@ -420,24 +420,33 @@ check_shaded_artifacts_s3_fs() {
if [ ! `cat allClasses | grep '^META-INF/services/org\.apache\.flink\.core\.fs\.FileSystemFactory$'` ]; then
echo "=============================================================================="
- echo "File does not exist: services/org.apache.flink.core.fs.FileSystemFactory"
+ echo "${VARIANT}: File does not exist: services/org.apache.flink.core.fs.FileSystemFactory"
echo "=============================================================================="
+ return 1
fi
UNSHADED_SERVICES=`cat allClasses | grep '^META-INF/services/' | grep -v -e '^META-INF/services/org\.apache\.flink\.core\.fs\.FileSystemFactory$' -e "^META-INF/services/org\.apache\.flink\.fs\.s3${VARIANT}\.shaded" -e '^META-INF/services/'`
if [ "$?" == "0" ]; then
echo "=============================================================================="
- echo "Detected unshaded service files in fat jar:"
+ echo "${VARIANT}: Detected unshaded service files in fat jar:"
echo "${UNSHADED_SERVICES}"
echo "=============================================================================="
return 1
fi
- FS_SERVICE_FILE_CLASS=`unzip -q -c flink-filesystems/flink-s3-fs-${VARIANT}/target/flink-s3-fs-${VARIANT}*.jar META-INF/services/org.apache.flink.core.fs.FileSystemFactory | grep -v -e '^#' -e '^$'`
- if [ "${FS_SERVICE_FILE_CLASS}" != "org.apache.flink.fs.s3${VARIANT}.S3FileSystemFactory" ]; then
+ FS_SERVICE_FILE_CLASSES=`unzip -q -c flink-filesystems/flink-s3-fs-${VARIANT}/target/flink-s3-fs-${VARIANT}*.jar META-INF/services/org.apache.flink.core.fs.FileSystemFactory | grep -v -e '^#' -e '^$'`
+ EXPECTED_FS_SERVICE_FILE_CLASSES="org.apache.flink.fs.s3${VARIANT}.S3FileSystemFactory"
+ if [ "${VARIANT}" == "hadoop" ]; then
+ read -r -d '' EXPECTED_FS_SERVICE_FILE_CLASSES <