From 98ad39ffa51239e389c73411dfb8df7f5592a5aa Mon Sep 17 00:00:00 2001 From: wangyufan Date: Tue, 29 Dec 2020 17:08:16 +0800 Subject: [PATCH] [Issue 8887][tiered-storage-jcloud] support tiered-storage provider by aliyun OSS (#8985) [Issue 8887][tiered-storage-jcloud] support tiered-storage provider by aliyun OSS --- .../common/policies/data/OffloadPolicies.java | 4 +- .../provider/JCloudBlobStoreProvider.java | 81 ++++++++++++++++++- 2 files changed, 83 insertions(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java index 422bf24875e36..4c5058b88cf5e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java @@ -60,7 +60,9 @@ public class OffloadPolicies implements Serializable { public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2; public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1; - public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob"}; + public final static String[] DRIVER_NAMES = { + "S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob", "aliyun-oss" + }; public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders"; public final static Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null; public final static Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null; diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java index 9d0871e1d9d66..ba7065e5da390 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Properties; import java.util.UUID; import lombok.extern.slf4j.Slf4j; @@ -57,6 +58,8 @@ import org.jclouds.googlecloudstorage.GoogleCloudStorageProviderMetadata; import org.jclouds.providers.AnonymousProviderMetadata; import org.jclouds.providers.ProviderMetadata; +import org.jclouds.s3.S3ApiMetadata; +import org.jclouds.s3.reference.S3Constants; /** * Enumeration of the supported JCloud Blob Store Providers. @@ -162,6 +165,28 @@ public void buildCredentials(TieredStorageConfiguration config) { } }, + + /** + * Aliyun OSS is compatible with the S3 API + * https://www.alibabacloud.com/help/doc-detail/64919.htm + */ + ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) { + @Override + public void validate(TieredStorageConfiguration config) throws IllegalArgumentException { + ALIYUN_OSS_VALIDATION.validate(config); + } + + @Override + public BlobStore getBlobStore(TieredStorageConfiguration config) { + return ALIYUN_OSS_BLOB_STORE_BUILDER.getBlobStore(config); + } + + @Override + public void buildCredentials(TieredStorageConfiguration config) { + ALIYUN_OSS_CREDENTIAL_BUILDER.buildCredentials(config); + } + }, + TRANSIENT("transient", new AnonymousProviderMetadata(new TransientApiMetadata(), "")) { @Override public void validate(TieredStorageConfiguration config) throws IllegalArgumentException { @@ -177,7 +202,7 @@ public BlobStore getBlobStore(TieredStorageConfiguration config) { ContextBuilder builder = ContextBuilder.newBuilder("transient"); BlobStoreContext ctx = builder .buildView(BlobStoreContext.class); - + BlobStore bs = ctx.getBlobStore(); if (!bs.containerExists(config.getBucket())) { @@ -312,4 +337,58 @@ public ProviderMetadata getProviderMetadata() { } } }; + + static final BlobStoreBuilder ALIYUN_OSS_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> { + ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata()); + Properties overrides = config.getOverrides(); + // For security reasons, OSS supports only virtual hosted style access. + overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true"); + contextBuilder.overrides(overrides); + contextBuilder.endpoint(config.getServiceEndpoint()); + + if (config.getProviderCredentials() != null) { + return contextBuilder + .credentialsSupplier(config.getCredentials()) + .buildView(BlobStoreContext.class) + .getBlobStore(); + } else { + log.warn("The credentials is null. driver: {}, bucket: {}", config.getDriver(), config.getBucket()); + return contextBuilder + .buildView(BlobStoreContext.class) + .getBlobStore(); + } + }; + + static final ConfigValidation ALIYUN_OSS_VALIDATION = (TieredStorageConfiguration config) -> { + if (Strings.isNullOrEmpty(config.getServiceEndpoint())) { + throw new IllegalArgumentException( + "ServiceEndpoint must specified for " + config.getDriver() + " offload"); + } + + if (Strings.isNullOrEmpty(config.getBucket())) { + throw new IllegalArgumentException( + "Bucket cannot be empty for " + config.getDriver() + " offload"); + } + + if (config.getMaxBlockSizeInBytes() < (5 * 1024 * 1024)) { + throw new IllegalArgumentException( + "ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB for " + + config.getDriver() + " offload"); + } + }; + + static final CredentialBuilder ALIYUN_OSS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> { + String accountName = System.getenv("ALIYUN_OSS_ACCESS_KEY_ID"); + if (StringUtils.isEmpty(accountName)) { + throw new IllegalArgumentException("Couldn't get the aliyun oss access key id."); + } + String accountKey = System.getenv("ALIYUN_OSS_ACCESS_KEY_SECRET"); + if (StringUtils.isEmpty(accountKey)) { + throw new IllegalArgumentException("Couldn't get the aliyun oss access key secret."); + } + Credentials credentials = new Credentials( + accountName, accountKey); + config.setProviderCredentials(() -> credentials); + }; + } \ No newline at end of file