Skip to content

Commit

Permalink
[FLINK-9402] [kinesis] Kinesis consumer configuration requires either…
Browse files Browse the repository at this point in the history
… region or endpoint.

Fix validation logic to allow either region or endpoint, but not both.

This closes apache#6045.
  • Loading branch information
tweise authored and tillrohrmann committed May 22, 2018
1 parent a273f64 commit 9ad868c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,15 @@ public static AmazonKinesis createKinesisClient(Properties configProps, ClientCo
// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
.withCredentials(AWSUtil.getCredentialsProvider(configProps))
.withClientConfiguration(awsClientConfig)
.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
.withClientConfiguration(awsClientConfig);

if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
// Set signingRegion as null, to facilitate mocking Kinesis for local tests
builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
null));
} else {
builder.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
}
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,13 @@ public static void validateAwsConfiguration(Properties config) {
}
}

if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
throw new IllegalArgumentException("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
} else {
if (!(config.containsKey(AWSConfigConstants.AWS_REGION) ^ config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT))) {
// per validation in AwsClientBuilder
throw new IllegalArgumentException(String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION));
}

if (config.containsKey(AWSConfigConstants.AWS_REGION)) {
// specified AWS Region name must be recognizable
if (!AWSUtil.isValidRegion(config.getProperty(AWSConfigConstants.AWS_REGION))) {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,10 @@ public void testCorrectlySetRegionInProducerConfiguration() {

@Test
public void testMissingAwsRegionInConfig() {
String expectedMessage = String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION);
exception.expect(IllegalArgumentException.class);
exception.expectMessage("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
exception.expectMessage(expectedMessage);

Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
Expand All @@ -154,6 +156,22 @@ public void testUnrecognizableAwsRegionInConfig() {
KinesisConfigUtil.validateAwsConfiguration(testConfig);
}

@Test
public void testAwsRegionOrEndpointInConfig() {
String expectedMessage = String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION);
exception.expect(IllegalArgumentException.class);
exception.expectMessage(expectedMessage);

Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east");
testConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT, "fake");
testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

KinesisConfigUtil.validateAwsConfiguration(testConfig);
}

@Test
public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
exception.expect(IllegalArgumentException.class);
Expand Down

0 comments on commit 9ad868c

Please sign in to comment.