Skip to content

Commit

Permalink
[FLINK-34485] Add URI/Configuration constructor to DynamicTemporaryAW…
Browse files Browse the repository at this point in the history
…SCredentialsProvider

Required such that this provider can be used with the presto S3 filesystem.
  • Loading branch information
zentol committed Feb 22, 2024
1 parent 6a938c9 commit cf5bb80
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 5 deletions.
9 changes: 9 additions & 0 deletions docs/content.zh/docs/deployment/filesystems/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ s3.access-key: your-access-key
s3.secret-key: your-secret-key
```
You can limit this configuration to JobManagers by using [Flink configuration file]({{< ref "docs/deployment/security/security-delegation-token" >}}).
```yaml
# flink-s3-fs-hadoop
fs.s3a.aws.credentials.provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
# flink-s3-fs-presto
presto.s3.credential-provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
```
## 配置非 S3 访问点
S3 文件系统还支持兼容 S3 的对象存储服务,如 [IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) 和 [Minio](https://min.io/)。可在 [Flink 配置文件]({{< ref "docs/deployment/config#flink-配置文件" >}}) 中配置使用的访问点:
Expand Down
9 changes: 9 additions & 0 deletions docs/content/docs/deployment/filesystems/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,15 @@ s3.access-key: your-access-key
s3.secret-key: your-secret-key
```
You can limit this configuration to JobManagers by using [Flink configuration file]({{< ref "docs/deployment/security/security-delegation-token" >}}).
```yaml
# flink-s3-fs-hadoop
fs.s3a.aws.credentials.provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
# flink-s3-fs-presto
presto.s3.credential-provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
```
## Configure Non-S3 Endpoint
The S3 Filesystems also support using S3 compliant object stores such as [IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) and [MinIO](https://min.io/).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.services.securitytoken.model.Credentials;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;

/**
* Support dynamic session credentials for authenticating with AWS. Please note that users may
* reference this class name from configuration property fs.s3a.aws.credentials.provider. Therefore,
Expand All @@ -45,6 +48,10 @@ public class DynamicTemporaryAWSCredentialsProvider implements AWSCredentialsPro
private static final Logger LOG =
LoggerFactory.getLogger(DynamicTemporaryAWSCredentialsProvider.class);

public DynamicTemporaryAWSCredentialsProvider() {}

public DynamicTemporaryAWSCredentialsProvider(URI uri, Configuration conf) {}

@Override
public AWSCredentials getCredentials() throws SdkBaseException {
Credentials credentials = AbstractS3DelegationTokenReceiver.getCredentials();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
import org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider;
import org.apache.flink.runtime.util.HadoopConfigLoader;

import com.amazonaws.auth.AWSCredentialsProvider;
Expand All @@ -32,6 +33,7 @@
import java.lang.reflect.Field;
import java.net.URI;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Expand All @@ -53,6 +55,21 @@ public void testConfigPropagation() throws Exception {
validateBasicCredentials(fs);
}

@Test
public void testDynamicConfigProvider() throws Exception {
final Configuration conf = new Configuration();

conf.setString(
"presto.s3.credentials-provider",
DynamicTemporaryAWSCredentialsProvider.class.getName());

FileSystem.initialize(conf);

FileSystem fs = FileSystem.get(new URI("s3://test"));
assertThat(getAwsCredentialsProvider(getPrestoFileSystem(fs)))
.isInstanceOf(DynamicTemporaryAWSCredentialsProvider.class);
}

@Test
public void testConfigPropagationWithPrestoPrefix() throws Exception {
final Configuration conf = new Configuration();
Expand Down Expand Up @@ -98,15 +115,18 @@ public void testShadingOfAwsCredProviderConfig() {
// ------------------------------------------------------------------------

private static void validateBasicCredentials(FileSystem fs) throws Exception {
try (PrestoS3FileSystem prestoFs = getPrestoFileSystem(fs)) {
AWSCredentialsProvider provider = getAwsCredentialsProvider(prestoFs);
assertTrue(provider instanceof AWSStaticCredentialsProvider);
}
}

private static PrestoS3FileSystem getPrestoFileSystem(FileSystem fs) {
assertTrue(fs instanceof FlinkS3FileSystem);

org.apache.hadoop.fs.FileSystem hadoopFs = ((FlinkS3FileSystem) fs).getHadoopFileSystem();
assertTrue(hadoopFs instanceof PrestoS3FileSystem);

try (PrestoS3FileSystem prestoFs = (PrestoS3FileSystem) hadoopFs) {
AWSCredentialsProvider provider = getAwsCredentialsProvider(prestoFs);
assertTrue(provider instanceof AWSStaticCredentialsProvider);
}
return (PrestoS3FileSystem) hadoopFs;
}

private static AWSCredentialsProvider getAwsCredentialsProvider(PrestoS3FileSystem fs)
Expand Down

0 comments on commit cf5bb80

Please sign in to comment.