Skip to content

Commit

Permalink
[Issue 7949][Tiered Storage] support aws creds per offload policies (a…
Browse files Browse the repository at this point in the history
…pache#7950)

Fixes apache#7949 

### Motivation

Provide different s3 credentials per offloadpolicies on each ns.

### Modifications

Add awsId/awsSecret in OffloadPolicies.

### Does this pull request potentially affect one of the following parts:

  - The rest endpoints: yes it adds options
  - The admin cli options: yes it adds options

### Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs
  • Loading branch information
KannarFr authored Nov 6, 2020
1 parent 572de53 commit 2530177
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,7 @@ Set<Long> deletedOffloads() {
}

OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "",
null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ public void testOffloadPolicies() throws Exception {
long offloadDeletionLagInMillis = 100L;

OffloadPolicies offload1 = OffloadPolicies.create(
driver, region, bucket, endpoint, 100, 100,
offloadThresholdInBytes, offloadDeletionLagInMillis);
driver, region, bucket, endpoint, null, null,
100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis);
admin.namespaces().setOffloadPolicies(namespaceName, offload1);
OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName);
assertEquals(offload1, offload2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,7 @@ public void testSetOffloadThreshold() throws Exception {
// the ledger config should have the expected value
ManagedLedgerConfig ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
MockLedgerOffloader offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
Expand All @@ -1254,6 +1255,7 @@ public void testSetOffloadThreshold() throws Exception {
ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
admin.namespaces().getOffloadPolicies(namespace);
offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
Expand All @@ -1267,6 +1269,7 @@ public void testSetOffloadThreshold() throws Exception {
assertEquals(-2, admin.namespaces().getOffloadThreshold(namespace));
ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
Expand All @@ -1280,6 +1283,7 @@ public void testSetOffloadThreshold() throws Exception {
assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace));
ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,8 @@ public void namespaces() throws Exception {
namespaces.run(split("set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s"));
verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
OffloadPolicies.create("aws-s3", "test-region", "test-bucket",
"http://test.endpoint", 32 * 1024 * 1024, 5 * 1024 * 1024,
10L * 1024 * 1024, 10000L));
"http://test.endpoint", null, null, 32 * 1024 * 1024, 5 * 1024 * 1024,
10 * 1024 * 1024L, 10000L));

namespaces.run(split("remove-offload-policies myprop/clust/ns1"));
verify(mockNamespaces).removeOffloadPolicies("myprop/clust/ns1");
Expand Down Expand Up @@ -753,7 +753,7 @@ public void topics() throws Exception {

cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 -rb 9 -t 10"));
OffloadPolicies offloadPolicies = OffloadPolicies.create("s3", "region", "bucket"
, "endpoint", 8, 9, 10L, null);
, "endpoint", null, null, 8, 9, 10L, null);
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1", offloadPolicies);

cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1594,6 +1594,18 @@ private class SetOffloadPolicies extends CliCommand {
required = false)
private String endpoint;

@Parameter(
names = {"--aws-id", "-i"},
description = "AWS Credential Id to use when using driver S3 or aws-s3",
required = false)
private String awsId;

@Parameter(
names = {"--aws-secret", "-s"},
description = "AWS Credential Secret to use when using driver S3 or aws-s3",
required = false)
private String awsSecret;

@Parameter(
names = {"--maxBlockSize", "-mbs"},
description = "Max block size (eg: 32M, 64M), default is 64MB",
Expand Down Expand Up @@ -1697,7 +1709,7 @@ && maxValueCheck("OffloadAfterThreshold", offloadAfterThreshold, Long.MAX_VALUE)
}
}

OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint,
OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, awsId, awsSecret,
maxBlockSizeInBytes, readBufferSizeInBytes, offloadAfterThresholdInBytes,
offloadAfterElapsedInMillis);
admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,14 @@ private class SetOffloadPolicies extends CliCommand {
, description = "ManagedLedger offload service endpoint, only s3 requires this parameter")
private String endpoint;

@Parameter(names = {"-i", "--aws-id"}
, description = "AWS Credential Id to use when using driver S3 or aws-s3")
private String awsId;

@Parameter(names = {"-s", "--aws-secret"}
, description = "AWS Credential Secret to use when using driver S3 or aws-s3")
private String awsSecret;

@Parameter(names = {"-m", "--maxBlockSizeInBytes"}
, description = "ManagedLedger offload max block Size in bytes, s3 and google-cloud-storage requires this parameter")
private int maxBlockSizeInBytes;
Expand All @@ -1244,7 +1252,7 @@ private class SetOffloadPolicies extends CliCommand {
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, maxBlockSizeInBytes
OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, awsId, awsSecret, maxBlockSizeInBytes
, readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis);
admin.topics().setOffloadPolicies(persistentTopic, offloadPolicies);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public class OffloadPolicies implements Serializable {
private Integer managedLedgerOffloadReadBufferSizeInBytes;

public static OffloadPolicies create(String driver, String region, String bucket, String endpoint,
String credentialId, String credentialSecret,
Integer maxBlockSizeInBytes, Integer readBufferSizeInBytes,
Long offloadThresholdInBytes, Long offloadDeletionLagInMillis) {
OffloadPolicies offloadPolicies = new OffloadPolicies();
Expand All @@ -148,6 +149,12 @@ public static OffloadPolicies create(String driver, String region, String bucket
offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes);

if (driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1])) {
if (credentialId != null) {
offloadPolicies.setS3ManagedLedgerOffloadRole(credentialId);
}
if (credentialSecret != null) {
offloadPolicies.setS3ManagedLedgerOffloadRoleSessionName(credentialSecret);
}
offloadPolicies.setS3ManagedLedgerOffloadRegion(region);
offloadPolicies.setS3ManagedLedgerOffloadBucket(bucket);
offloadPolicies.setS3ManagedLedgerOffloadServiceEndpoint(endpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public void testS3Configuration() {
final String driver = "aws-s3";
final String region = "test-region";
final String bucket = "test-bucket";
final String credentialId = "test-credential-id";
final String credentialSecret = "test-credential-secret";
final String endPoint = "test-endpoint";
final Integer maxBlockSizeInBytes = 5 * M;
final Integer readBufferSizeInBytes = 2 * M;
Expand All @@ -52,6 +54,8 @@ public void testS3Configuration() {
region,
bucket,
endPoint,
credentialId,
credentialSecret,
maxBlockSizeInBytes,
readBufferSizeInBytes,
offloadThresholdInBytes,
Expand All @@ -76,6 +80,8 @@ public void testGcsConfiguration() {
final String region = "test-region";
final String bucket = "test-bucket";
final String endPoint = "test-endpoint";
final String credentialId = "test-credential-id";
final String credentialSecret = "test-credential-secret";
final Integer maxBlockSizeInBytes = 5 * M;
final Integer readBufferSizeInBytes = 2 * M;
final Long offloadThresholdInBytes = 0L;
Expand All @@ -86,6 +92,8 @@ public void testGcsConfiguration() {
region,
bucket,
endPoint,
credentialId,
credentialSecret,
maxBlockSizeInBytes,
readBufferSizeInBytes,
offloadThresholdInBytes,
Expand Down
2 changes: 2 additions & 0 deletions site2/docs/reference-pulsar-admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -2555,6 +2555,8 @@ Options
|`-r`, `--region`|The long term storage region||
|`-b`, `--bucket`|Bucket to place offloaded ledger into||
|`-e`, `--endpoint`|Alternative endpoint to connect to||
|`-i`, `--aws-id`|AWS Credential Id to use when using driver S3 or aws-s3||
|`-s`, `--aws-secret`|AWS Credential Secret to use when using driver S3 or aws-s3||
|`-mbs`, `--maxBlockSize`|Max block size|64MB|
|`-rbs`, `--readBufferSize`|Read buffer size|1MB|
|`-oat`, `--offloadAfterThreshold`|Offload after threshold size (eg: 1M, 5M)||
Expand Down

0 comments on commit 2530177

Please sign in to comment.