Skip to content

Commit

Permalink
[FLINK-9305] [s3] Also register flink-s3-fs-hadoop's factory for the …
Browse files Browse the repository at this point in the history
…s3a:// scheme

This closes apache#5963
  • Loading branch information
Nico Kruber authored and StephanEwen committed May 24, 2018
1 parent 92b0756 commit 33aee80
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 13 deletions.
2 changes: 2 additions & 0 deletions docs/ops/deployment/aws.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ To use either `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the respective
cp ./opt/flink-s3-fs-presto-{{ site.version }}.jar ./lib/
{% endhighlight %}

Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem wrappers for URIs with the `s3://` scheme, `flink-s3-fs-hadoop` also registers for `s3a://`.

#### Configure Access Credentials

After setting up the S3 FileSystem wrapper, you need to make sure that Flink is allowed to access your S3 buckets.
Expand Down
2 changes: 1 addition & 1 deletion flink-filesystems/flink-s3-fs-hadoop/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ steps are required to keep the shading correct:
2. verify the shaded jar:
- does not contain any unshaded classes except for `org.apache.flink.fs.s3hadoop.S3FileSystemFactory`
- all other classes should be under `org.apache.flink.fs.s3hadoop.shaded`
- there should be a `META-INF/services/org.apache.flink.core.fs.FileSystemFactory` file pointing to the `org.apache.flink.fs.s3hadoop.S3FileSystemFactory` class
- there should be a `META-INF/services/org.apache.flink.core.fs.FileSystemFactory` file pointing to two classes: `org.apache.flink.fs.s3hadoop.S3FileSystemFactory` and `org.apache.flink.fs.s3hadoop.S3AFileSystemFactory`
- other service files under `META-INF/services` should have their names and contents in the relocated `org.apache.flink.fs.s3hadoop.shaded` package
- contains a `core-default-shaded.xml` file
- does not contain a `core-default.xml` or `core-site.xml` file
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.s3hadoop;

/**
* Simple factory for the S3 file system, registered for the <tt>s3a://</tt> scheme.
*/
public class S3AFileSystemFactory extends S3FileSystemFactory {
@Override
public String getScheme() {
return "s3a";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.fs.s3hadoop.S3FileSystemFactory
org.apache.flink.fs.s3hadoop.S3FileSystemFactory
org.apache.flink.fs.s3hadoop.S3AFileSystemFactory
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

import static org.apache.flink.core.fs.FileSystemTestUtils.checkPathEventualExistence;
Expand All @@ -53,12 +57,19 @@
* <a href="https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel">consistency guarantees</a>
* and what the {@link org.apache.hadoop.fs.s3a.S3AFileSystem} offers.
*/
@RunWith(Parameterized.class)
public class HadoopS3FileSystemITCase extends TestLogger {
@Parameterized.Parameter
public String scheme;

private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
@Parameterized.Parameters(name = "Scheme = {0}")
public static List<String> parameters() {
return Arrays.asList("s3", "s3a");
}

private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();

private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");

Expand Down Expand Up @@ -117,9 +128,13 @@ public static void cleanUp() throws IOException, InterruptedException {
}
}

private String getBasePath() {
return scheme + "://" + BUCKET + '/' + TEST_DATA_DIR + "/" + scheme;
}

@Test
public void testConfigKeysForwarding() throws Exception {
final Path path = new Path("s3://" + BUCKET + '/');
final Path path = new Path(getBasePath());

// access without credentials should fail
{
Expand Down Expand Up @@ -181,7 +196,7 @@ public void testSimpleFileWriteAndRead() throws Exception {

FileSystem.initialize(conf);

final Path path = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR + "/test.txt");
final Path path = new Path(getBasePath() + "/test.txt");
final FileSystem fs = path.getFileSystem();

try {
Expand Down Expand Up @@ -217,7 +232,7 @@ public void testDirectoryListing() throws Exception {

FileSystem.initialize(conf);

final Path directory = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR + "/testdir/");
final Path directory = new Path(getBasePath() + "/testdir/");
final FileSystem fs = directory.getFileSystem();

// directory must not yet exist
Expand Down
23 changes: 16 additions & 7 deletions tools/travis_mvn_watchdog.sh
Original file line number Diff line number Diff line change
Expand Up @@ -412,32 +412,41 @@ check_shaded_artifacts_s3_fs() {
UNSHADED_CLASSES=`cat allClasses | grep -v -e '^META-INF' -e '^assets' -e "^org/apache/flink/fs/s3${VARIANT}/" | grep '\.class$'`
if [ "$?" == "0" ]; then
echo "=============================================================================="
echo "Detected unshaded dependencies in fat jar:"
echo "${VARIANT}: Detected unshaded dependencies in fat jar:"
echo "${UNSHADED_CLASSES}"
echo "=============================================================================="
return 1
fi

if [ ! `cat allClasses | grep '^META-INF/services/org\.apache\.flink\.core\.fs\.FileSystemFactory$'` ]; then
echo "=============================================================================="
echo "File does not exist: services/org.apache.flink.core.fs.FileSystemFactory"
echo "${VARIANT}: File does not exist: services/org.apache.flink.core.fs.FileSystemFactory"
echo "=============================================================================="
return 1
fi

UNSHADED_SERVICES=`cat allClasses | grep '^META-INF/services/' | grep -v -e '^META-INF/services/org\.apache\.flink\.core\.fs\.FileSystemFactory$' -e "^META-INF/services/org\.apache\.flink\.fs\.s3${VARIANT}\.shaded" -e '^META-INF/services/'`
if [ "$?" == "0" ]; then
echo "=============================================================================="
echo "Detected unshaded service files in fat jar:"
echo "${VARIANT}: Detected unshaded service files in fat jar:"
echo "${UNSHADED_SERVICES}"
echo "=============================================================================="
return 1
fi

FS_SERVICE_FILE_CLASS=`unzip -q -c flink-filesystems/flink-s3-fs-${VARIANT}/target/flink-s3-fs-${VARIANT}*.jar META-INF/services/org.apache.flink.core.fs.FileSystemFactory | grep -v -e '^#' -e '^$'`
if [ "${FS_SERVICE_FILE_CLASS}" != "org.apache.flink.fs.s3${VARIANT}.S3FileSystemFactory" ]; then
FS_SERVICE_FILE_CLASSES=`unzip -q -c flink-filesystems/flink-s3-fs-${VARIANT}/target/flink-s3-fs-${VARIANT}*.jar META-INF/services/org.apache.flink.core.fs.FileSystemFactory | grep -v -e '^#' -e '^$'`
EXPECTED_FS_SERVICE_FILE_CLASSES="org.apache.flink.fs.s3${VARIANT}.S3FileSystemFactory"
if [ "${VARIANT}" == "hadoop" ]; then
read -r -d '' EXPECTED_FS_SERVICE_FILE_CLASSES <<EOF
org.apache.flink.fs.s3${VARIANT}.S3FileSystemFactory
org.apache.flink.fs.s3${VARIANT}.S3AFileSystemFactory
EOF
fi

if [ "${FS_SERVICE_FILE_CLASSES}" != "${EXPECTED_FS_SERVICE_FILE_CLASSES}" ]; then
echo "=============================================================================="
echo "Detected wrong content in services/org.apache.flink.core.fs.FileSystemFactory:"
echo "${FS_SERVICE_FILE_CLASS}"
echo "${VARIANT}: Detected wrong content in services/org.apache.flink.core.fs.FileSystemFactory:"
echo "${FS_SERVICE_FILE_CLASSES}"
echo "=============================================================================="
return 1
fi
Expand Down

0 comments on commit 33aee80

Please sign in to comment.