Skip to content

Commit

Permalink
[kinesis-source] Improvements to kinesis-source (apache#5180)
Browse files Browse the repository at this point in the history
* Move to using the KCL v2

This moves to using KCL v2 with the new API which supports
"enhanced fan-out". Enhanced fan-out is beneficial as we get more
guranteed capacity and it won't compete with other consumers.

However, support for ehanced fan-out requires pulling in v2 of AWS SDKs.

This isn't a huge problem, as they can be shared, but it does mean
implemting a few shim APIs around credentials.

Most of the business logic remains largely the same, just using the
newer interfaces. Enhanced fan-out can also be disabled and fall back to
a polling model.

Note, this also bumps the kinesis-producer to v0.13, which moves to an
APL license (but has no other changes) to make this nicer to use in
an Apache project without license concerns.

* Add included providers for kinesis auth

This adds two providers for Kinesis auth. First, the
AwsDefaultProviderChain, which just uses the default provider chain
which will pull from env vars, system configuration files, and the
metadata API.

Second, the STSAssumeRole plugin allows us to assume a role (which first
relies on the default provider chain) to run the code under a given role

* add some docs and minor cleanup
  • Loading branch information
addisonj authored and jiazhai committed Sep 28, 2019
1 parent 07bb845 commit 87ccdb0
Show file tree
Hide file tree
Showing 19 changed files with 549 additions and 177 deletions.
25 changes: 22 additions & 3 deletions pulsar-io/kinesis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>

<!-- add cbor for kinesis-client to fix dep conflict -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
Expand All @@ -77,15 +84,27 @@
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.9.0</version>
<version>2.2.3</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.12.9</version>
<version>0.13.1</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>1.11.619</version>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>2.8.5</version>
</dependency>
<!-- /kinesis dependencies -->

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotBlank;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Map;

Expand All @@ -38,7 +39,7 @@ public abstract class AbstractKinesisConnector {
public static final String ACCESS_KEY_NAME = "accessKey";
public static final String SECRET_KEY_NAME = "secretKey";

protected AWSCredentialsProvider createCredentialProvider(String awsCredentialPluginName,
protected AwsCredentialProviderPlugin createCredentialProvider(String awsCredentialPluginName,
String awsCredentialPluginParam) {
if (isNotBlank(awsCredentialPluginName)) {
return createCredentialProviderWithPlugin(awsCredentialPluginName, awsCredentialPluginParam);
Expand All @@ -56,14 +57,14 @@ protected AWSCredentialsProvider createCredentialProvider(String awsCredentialPl
* @return
* @throws IllegalArgumentException
*/
public static AWSCredentialsProvider createCredentialProviderWithPlugin(String pluginFQClassName, String param)
public static AwsCredentialProviderPlugin createCredentialProviderWithPlugin(String pluginFQClassName, String param)
throws IllegalArgumentException {
try {
Class<?> clazz = Class.forName(pluginFQClassName);
Constructor<?> ctor = clazz.getConstructor();
final AwsCredentialProviderPlugin plugin = (AwsCredentialProviderPlugin) ctor.newInstance(new Object[] {});
plugin.init(param);
return plugin.getCredentialProvider();
return plugin;
} catch (Exception e) {
log.error("Failed to initialize AwsCredentialProviderPlugin {}", pluginFQClassName, e);
throw new IllegalArgumentException(
Expand All @@ -78,7 +79,7 @@ public static AWSCredentialsProvider createCredentialProviderWithPlugin(String p
* @param awsCredentialPluginParam
* @return
*/
protected AWSCredentialsProvider defaultCredentialProvider(String awsCredentialPluginParam) {
protected AwsCredentialProviderPlugin defaultCredentialProvider(String awsCredentialPluginParam) {
Map<String, String> credentialMap = new Gson().fromJson(awsCredentialPluginParam,
new TypeToken<Map<String, String>>() {
}.getType());
Expand All @@ -88,7 +89,23 @@ protected AWSCredentialsProvider defaultCredentialProvider(String awsCredentialP
String.format(
"Default %s and %s must be present into json-map if AwsCredentialProviderPlugin not provided",
ACCESS_KEY_NAME, SECRET_KEY_NAME));
return defaultCredentialProvider(accessKey, secretKey);
return new AwsCredentialProviderPlugin() {
@Override
public void init(String param) {
// noop

}

@Override
public AWSCredentialsProvider getCredentialProvider() {
return defaultCredentialProvider(accessKey, secretKey);
}

@Override
public void close() throws IOException {

}
};
}

private AWSCredentialsProvider defaultCredentialProvider(String accessKey, String secretKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import java.io.Closeable;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.auth.BasicSessionCredentials;

/**
Expand All @@ -47,4 +49,28 @@ public interface AwsCredentialProviderPlugin extends Closeable {
*/
AWSCredentialsProvider getCredentialProvider();

/**
* Returns a V2 credential provider for use with the v2 SDK.
*
* Defaults to an implementation that pulls credentials from a v1 provider
*/
default software.amazon.awssdk.auth.credentials.AwsCredentialsProvider getV2CredentialsProvider() {
// make a small wrapper to forward requests to v1, this allows
// for this interface to not "break" for implementers
AWSCredentialsProvider v1Provider = getCredentialProvider();
return () -> {
AWSCredentials creds = v1Provider.getCredentials();
if (creds instanceof AWSSessionCredentials) {
return software.amazon.awssdk.auth.credentials.AwsSessionCredentials.create(
creds.getAWSAccessKeyId(),
creds.getAWSSecretKey(),
((AWSSessionCredentials) creds).getSessionToken());
} else {
return software.amazon.awssdk.auth.credentials.AwsBasicCredentials.create(
creds.getAWSAccessKeyId(),
creds.getAWSSecretKey());
}
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.kinesis;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

import java.io.IOException;

public class AwsDefaultProviderChainPlugin implements AwsCredentialProviderPlugin {
@Override
public void init(String param) {

}

@Override
public AWSCredentialsProvider getCredentialProvider() {
return new DefaultAWSCredentialsProviderChain();
}

@Override
public software.amazon.awssdk.auth.credentials.AwsCredentialsProvider getV2CredentialsProvider() {
return DefaultCredentialsProvider.create();
}

@Override
public void close() throws IOException {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import org.apache.pulsar.io.core.annotations.FieldDoc;

import software.amazon.awssdk.regions.Region;

import lombok.Data;

@Data
Expand All @@ -34,35 +36,38 @@ public abstract class BaseKinesisConfig implements Serializable {
defaultValue = "",
help = "Kinesis end-point url. It can be found at https://docs.aws.amazon.com/general/latest/gr/rande.html"
)
private String awsEndpoint;
private String awsEndpoint = "";

@FieldDoc(
required = false,
defaultValue = "",
help = "Appropriate aws region. E.g. us-west-1, us-west-2"
)
private String awsRegion;
private String awsRegion = "";

@FieldDoc(
required = true,
defaultValue = "",
help = "Kinesis stream name"
)
private String awsKinesisStreamName;
private String awsKinesisStreamName = "";

@FieldDoc(
required = false,
defaultValue = "",
help = "Fully-Qualified class name of implementation of AwsCredentialProviderPlugin."
+ " It is a factory class which creates an AWSCredentialsProvider that will be used by Kinesis Sink."
+ " If it is empty then KinesisSink will create a default AWSCredentialsProvider which accepts json-map"
+ " of credentials in `awsCredentialPluginParam`")
private String awsCredentialPluginName;
private String awsCredentialPluginName = "";

@FieldDoc(
required = false,
defaultValue = "",
help = "json-parameters to initialize `AwsCredentialsProviderPlugin`")
private String awsCredentialPluginParam;

private String awsCredentialPluginParam = "";

protected Region regionAsV2Region() {
return Region.of(this.getAwsRegion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import java.util.Optional;

import org.apache.pulsar.functions.api.Record;
import org.inferred.freebuilder.shaded.org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.services.kinesis.model.EncryptionType;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

public class KinesisRecord implements Record<byte[]> {

Expand All @@ -40,21 +41,30 @@ public class KinesisRecord implements Record<byte[]> {
private final byte[] value;
private final HashMap<String, String> userProperties = new HashMap<String, String> ();

public KinesisRecord(com.amazonaws.services.kinesis.model.Record record) {
this.key = Optional.of(record.getPartitionKey());
setProperty(ARRIVAL_TIMESTAMP, record.getApproximateArrivalTimestamp().toString());
setProperty(ENCRYPTION_TYPE, record.getEncryptionType());
setProperty(PARTITION_KEY, record.getPartitionKey());
setProperty(SEQUENCE_NUMBER, record.getSequenceNumber());

if (StringUtils.isBlank(record.getEncryptionType())) {
public KinesisRecord(KinesisClientRecord record) {
this.key = Optional.of(record.partitionKey());
// encryption type can (annoyingly) be null, so we default to NONE
EncryptionType encType = EncryptionType.NONE;
if (record.encryptionType() != null) {
encType = record.encryptionType();
}
setProperty(ARRIVAL_TIMESTAMP, record.approximateArrivalTimestamp().toString());
setProperty(ENCRYPTION_TYPE, encType.toString());
setProperty(PARTITION_KEY, record.partitionKey());
setProperty(SEQUENCE_NUMBER, record.sequenceNumber());

if (encType == EncryptionType.NONE) {
String s = null;
try {
s = decoder.decode(record.getData()).toString();
s = decoder.decode(record.data()).toString();
} catch (CharacterCodingException e) {
// Ignore
}
this.value = (s != null) ? s.getBytes() : null;
} else if (encType == EncryptionType.KMS) {
// use the raw encrypted value, let them handle it downstream
// TODO: support decoding KMS data here... should be fairly simple
this.value = record.data().array();
} else {
// Who knows?
this.value = null;
Expand Down
Loading

0 comments on commit 87ccdb0

Please sign in to comment.