forked from apache/pulsar
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Pulsar-IO: Added Kinesis Source Connector (apache#3784)
- Loading branch information
1 parent
16c15ed
commit 53fb055
Showing
14 changed files
with
941 additions
and
127 deletions.
There are no files selected for viewing
116 changes: 116 additions & 0 deletions
116
pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AbstractKinesisConnector.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.io.kinesis; | ||
|
||
import static com.google.common.base.Preconditions.checkArgument; | ||
import static org.apache.commons.lang3.StringUtils.isNotBlank; | ||
|
||
import java.lang.reflect.Constructor; | ||
import java.util.Map; | ||
|
||
import com.amazonaws.auth.AWSCredentials; | ||
import com.amazonaws.auth.AWSCredentialsProvider; | ||
import com.amazonaws.auth.BasicAWSCredentials; | ||
import com.google.gson.Gson; | ||
import com.google.gson.reflect.TypeToken; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
|
||
@Slf4j | ||
public abstract class AbstractKinesisConnector { | ||
|
||
public static final String ACCESS_KEY_NAME = "accessKey"; | ||
public static final String SECRET_KEY_NAME = "secretKey"; | ||
|
||
protected AWSCredentialsProvider createCredentialProvider(String awsCredentialPluginName, | ||
String awsCredentialPluginParam) { | ||
if (isNotBlank(awsCredentialPluginName)) { | ||
return createCredentialProviderWithPlugin(awsCredentialPluginName, awsCredentialPluginParam); | ||
} else { | ||
return defaultCredentialProvider(awsCredentialPluginParam); | ||
} | ||
} | ||
|
||
/** | ||
* Creates a instance of credential provider which can return {@link AWSCredentials} or {@link BasicAWSCredentials} | ||
* based on IAM user/roles. | ||
* | ||
* @param pluginFQClassName | ||
* @param param | ||
* @return | ||
* @throws IllegalArgumentException | ||
*/ | ||
public static AWSCredentialsProvider createCredentialProviderWithPlugin(String pluginFQClassName, String param) | ||
throws IllegalArgumentException { | ||
try { | ||
Class<?> clazz = Class.forName(pluginFQClassName); | ||
Constructor<?> ctor = clazz.getConstructor(); | ||
final AwsCredentialProviderPlugin plugin = (AwsCredentialProviderPlugin) ctor.newInstance(new Object[] {}); | ||
plugin.init(param); | ||
return plugin.getCredentialProvider(); | ||
} catch (Exception e) { | ||
log.error("Failed to initialize AwsCredentialProviderPlugin {}", pluginFQClassName, e); | ||
throw new IllegalArgumentException( | ||
String.format("invalid authplugin name %s , failed to init %s", pluginFQClassName, e.getMessage())); | ||
} | ||
} | ||
|
||
/** | ||
* It creates a default credential provider which takes accessKey and secretKey form configuration and creates | ||
* {@link AWSCredentials} | ||
* | ||
* @param awsCredentialPluginParam | ||
* @return | ||
*/ | ||
protected AWSCredentialsProvider defaultCredentialProvider(String awsCredentialPluginParam) { | ||
Map<String, String> credentialMap = new Gson().fromJson(awsCredentialPluginParam, | ||
new TypeToken<Map<String, String>>() { | ||
}.getType()); | ||
String accessKey = credentialMap.get(ACCESS_KEY_NAME); | ||
String secretKey = credentialMap.get(SECRET_KEY_NAME); | ||
checkArgument(isNotBlank(accessKey) && isNotBlank(secretKey), | ||
String.format( | ||
"Default %s and %s must be present into json-map if AwsCredentialProviderPlugin not provided", | ||
ACCESS_KEY_NAME, SECRET_KEY_NAME)); | ||
return defaultCredentialProvider(accessKey, secretKey); | ||
} | ||
|
||
private AWSCredentialsProvider defaultCredentialProvider(String accessKey, String secretKey) { | ||
return new AWSCredentialsProvider() { | ||
@Override | ||
public AWSCredentials getCredentials() { | ||
return new AWSCredentials() { | ||
@Override | ||
public String getAWSAccessKeyId() { | ||
return accessKey; | ||
} | ||
|
||
@Override | ||
public String getAWSSecretKey() { | ||
return secretKey; | ||
} | ||
}; | ||
} | ||
@Override | ||
public void refresh() { | ||
// no-op | ||
} | ||
}; | ||
} | ||
} |
68 changes: 68 additions & 0 deletions
68
pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.io.kinesis; | ||
|
||
import java.io.Serializable; | ||
|
||
import org.apache.pulsar.io.core.annotations.FieldDoc; | ||
|
||
import lombok.Data; | ||
|
||
@Data | ||
public abstract class BaseKinesisConfig implements Serializable { | ||
|
||
private static final long serialVersionUID = 1L; | ||
|
||
@FieldDoc( | ||
required = false, | ||
defaultValue = "", | ||
help = "Kinesis end-point url. It can be found at https://docs.aws.amazon.com/general/latest/gr/rande.html" | ||
) | ||
private String awsEndpoint; | ||
|
||
@FieldDoc( | ||
required = false, | ||
defaultValue = "", | ||
help = "Appropriate aws region. E.g. us-west-1, us-west-2" | ||
) | ||
private String awsRegion; | ||
|
||
@FieldDoc( | ||
required = true, | ||
defaultValue = "", | ||
help = "Kinesis stream name" | ||
) | ||
private String awsKinesisStreamName; | ||
|
||
@FieldDoc( | ||
required = false, | ||
defaultValue = "", | ||
help = "Fully-Qualified class name of implementation of AwsCredentialProviderPlugin." | ||
+ " It is a factory class which creates an AWSCredentialsProvider that will be used by Kinesis Sink." | ||
+ " If it is empty then KinesisSink will create a default AWSCredentialsProvider which accepts json-map" | ||
+ " of credentials in `awsCredentialPluginParam`") | ||
private String awsCredentialPluginName; | ||
|
||
@FieldDoc( | ||
required = false, | ||
defaultValue = "", | ||
help = "json-parameters to initialize `AwsCredentialsProviderPlugin`") | ||
private String awsCredentialPluginParam; | ||
|
||
} |
81 changes: 81 additions & 0 deletions
81
pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecord.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.io.kinesis; | ||
|
||
import java.nio.charset.CharacterCodingException; | ||
import java.nio.charset.Charset; | ||
import java.nio.charset.CharsetDecoder; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
|
||
import org.apache.pulsar.functions.api.Record; | ||
import org.inferred.freebuilder.shaded.org.apache.commons.lang3.StringUtils; | ||
|
||
public class KinesisRecord implements Record<byte[]> { | ||
|
||
public static final String ARRIVAL_TIMESTAMP = ""; | ||
public static final String ENCRYPTION_TYPE = ""; | ||
public static final String PARTITION_KEY = ""; | ||
public static final String SEQUENCE_NUMBER = ""; | ||
|
||
private static final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); | ||
private final Optional<String> key; | ||
private final byte[] value; | ||
private final HashMap<String, String> userProperties = new HashMap<String, String> (); | ||
|
||
public KinesisRecord(com.amazonaws.services.kinesis.model.Record record) { | ||
this.key = Optional.of(record.getPartitionKey()); | ||
setProperty(ARRIVAL_TIMESTAMP, record.getApproximateArrivalTimestamp().toString()); | ||
setProperty(ENCRYPTION_TYPE, record.getEncryptionType()); | ||
setProperty(PARTITION_KEY, record.getPartitionKey()); | ||
setProperty(SEQUENCE_NUMBER, record.getSequenceNumber()); | ||
|
||
if (StringUtils.isBlank(record.getEncryptionType())) { | ||
String s = null; | ||
try { | ||
s = decoder.decode(record.getData()).toString(); | ||
} catch (CharacterCodingException e) { | ||
// Ignore | ||
} | ||
this.value = (s != null) ? s.getBytes() : null; | ||
} else { | ||
// Who knows? | ||
this.value = null; | ||
} | ||
} | ||
|
||
@Override | ||
public Optional<String> getKey() { | ||
return key; | ||
} | ||
|
||
@Override | ||
public byte[] getValue() { | ||
return value; | ||
} | ||
|
||
public Map<String, String> getProperties() { | ||
return userProperties; | ||
} | ||
|
||
public void setProperty(String key, String value) { | ||
userProperties.put(key, value); | ||
} | ||
} |
115 changes: 115 additions & 0 deletions
115
pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.io.kinesis; | ||
|
||
import java.util.List; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
|
||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; | ||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException; | ||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; | ||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; | ||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; | ||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; | ||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; | ||
import com.amazonaws.services.kinesis.model.Record; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
|
||
@Slf4j | ||
public class KinesisRecordProcessor implements IRecordProcessor { | ||
|
||
private final int numRetries; | ||
private final long checkpointInterval; | ||
private final long backoffTime; | ||
|
||
private final LinkedBlockingQueue<KinesisRecord> queue; | ||
private long nextCheckpointTimeInNanos; | ||
private String kinesisShardId; | ||
|
||
public KinesisRecordProcessor(LinkedBlockingQueue<KinesisRecord> queue, KinesisSourceConfig config) { | ||
this.queue = queue; | ||
this.backoffTime = config.getBackoffTime(); | ||
this.checkpointInterval = config.getCheckpointInterval(); | ||
this.numRetries = config.getNumRetries(); | ||
} | ||
|
||
@Override | ||
public void initialize(String shardId) { | ||
kinesisShardId = shardId; | ||
} | ||
|
||
@Override | ||
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) { | ||
log.info("Processing " + records.size() + " records from " + kinesisShardId); | ||
|
||
for (Record record : records) { | ||
try { | ||
queue.put(new KinesisRecord(record)); | ||
} catch (InterruptedException e) { | ||
log.warn("unable to create KinesisRecord ", e); | ||
} | ||
} | ||
|
||
// Checkpoint once every checkpoint interval. | ||
if (System.nanoTime() > nextCheckpointTimeInNanos) { | ||
checkpoint(checkpointer); | ||
nextCheckpointTimeInNanos = System.nanoTime() + checkpointInterval; | ||
} | ||
} | ||
|
||
@Override | ||
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { | ||
log.info("Shutting down record processor for shard: " + kinesisShardId); | ||
if (reason == ShutdownReason.TERMINATE) { | ||
checkpoint(checkpointer); | ||
} | ||
} | ||
|
||
private void checkpoint(IRecordProcessorCheckpointer checkpointer) { | ||
log.info("Checkpointing shard " + kinesisShardId); | ||
|
||
for (int i = 0; i < numRetries; i++) { | ||
try { | ||
checkpointer.checkpoint(); | ||
break; | ||
} catch (ShutdownException se) { | ||
// Ignore checkpoint if the processor instance has been shutdown. | ||
log.info("Caught shutdown exception, skipping checkpoint.", se); | ||
break; | ||
} catch (InvalidStateException e) { | ||
log.error("Cannot save checkpoint to the DynamoDB table.", e); | ||
break; | ||
} catch (ThrottlingException e) { | ||
// Back off and re-attempt checkpoint upon transient failures | ||
if (i >= (numRetries - 1)) { | ||
log.error("Checkpoint failed after " + (i + 1) + "attempts.", e); | ||
break; | ||
} | ||
} | ||
|
||
try { | ||
Thread.sleep(backoffTime); | ||
} catch (InterruptedException e) { | ||
log.debug("Interrupted sleep", e); | ||
} | ||
} | ||
} | ||
|
||
} |
Oops, something went wrong.