diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java index c85fe9fd79107..9402e82ff2215 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java @@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.pulsar.common.policies.data.OffloadPolicies; /** * Interface for offloading ledgers to long-term storage @@ -112,5 +113,17 @@ CompletableFuture readOffloaded(long ledgerId, UUID uid, */ CompletableFuture deleteOffloaded(long ledgerId, UUID uid, Map offloadDriverMetadata); + + /** + * Get offload policies of this LedgerOffloader + * + * @return offload policies + */ + OffloadPolicies getOffloadPolicies(); + + /** + * Close the resources if necessary + */ + void close(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java index f0a6890aeb272..7a0e6dc38d0d9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java @@ -24,6 +24,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate; import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.pulsar.common.policies.data.OffloadPolicies; /** * Factory to create {@link LedgerOffloader} to offload ledgers into long-term storage. @@ -43,13 +44,13 @@ public interface LedgerOffloaderFactory { /** * Create a ledger offloader with the provided configuration, user-metadata and scheduler. * - * @param properties service configuration + * @param offloadPolicies offload policies * @param userMetadata user metadata * @param scheduler scheduler * @return the offloader instance * @throws IOException when fail to create an offloader */ - T create(Properties properties, + T create(OffloadPolicies offloadPolicies, Map userMetadata, OrderedScheduler scheduler) throws IOException; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java index 3401f1bbbb647..aa3aae71d1db7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java @@ -24,6 +24,7 @@ import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.LedgerOffloader; +import org.apache.pulsar.common.policies.data.OffloadPolicies; /** * Null implementation that throws an error on any invokation. 
@@ -60,4 +61,14 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uid, promise.completeExceptionally(new UnsupportedOperationException()); return promise; } + + @Override + public OffloadPolicies getOffloadPolicies() { + return null; + } + + @Override + public void close() { + + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java index 17d54fa6e71f3..0a96a1fdc894e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -54,6 +54,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.testng.Assert; import org.testng.annotations.Test; @@ -145,6 +146,16 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uuid, offloads.remove(uuid); return CompletableFuture.completedFuture(null); }; + + @Override + public OffloadPolicies getOffloadPolicies() { + return null; + } + + @Override + public void close() { + + } } static class MockOffloadReadHandle implements ReadHandle { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index bf2f2e845d66b..997f3f66f3310 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -51,6 +51,7 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1025,6 +1026,16 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uuid, } return promise; }; + + @Override + public OffloadPolicies getOffloadPolicies() { + return null; + } + + @Override + public void close() { + + } } static class ErroringMockLedgerOffloader extends MockLedgerOffloader { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index ff7ba47728a9c..fe848b5e8b466 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -38,7 +38,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -98,6 +100,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -109,7 +112,6 @@ import 
org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.WorkerUtils; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; -import org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider; import org.apache.pulsar.websocket.WebSocketConsumerServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; @@ -162,7 +164,8 @@ public class PulsarService implements AutoCloseable { private ScheduledExecutorService compactorExecutor; private OrderedScheduler offloaderScheduler; private Offloaders offloaderManager = new Offloaders(); - private LedgerOffloader offloader; + private LedgerOffloader defaultOffloader; + private Map ledgerOffloaderMap = new ConcurrentHashMap<>(); private ScheduledFuture loadReportTask = null; private ScheduledFuture loadSheddingTask = null; private ScheduledFuture loadResourceQuotaTask = null; @@ -398,7 +401,8 @@ public void start() throws PulsarServerException { // Start load management service (even if load balancing is disabled) this.loadManager.set(LoadManager.create(this)); - this.offloader = createManagedLedgerOffloader(this.getConfiguration()); + this.defaultOffloader = createManagedLedgerOffloader( + OffloadPolicies.create(this.getConfiguration().getProperties())); brokerService.start(); @@ -764,28 +768,54 @@ public ManagedLedgerClientFactory getManagedLedgerClientFactory() { return managedLedgerClientFactory; } - public LedgerOffloader getManagedLedgerOffloader() { - return offloader; + /** + * First, get LedgerOffloader from local map cache, create new LedgerOffloader if not in cache or + * the OffloadPolicies changed, return the LedgerOffloader directly if exist in cache + * and the OffloadPolicies not changed. 
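+ * <p/>
+ * A minimal caller sketch, mirroring the BrokerService change in this patch (names such as
+ * policies, managedLedgerConfig and topicName are taken from that caller and are illustrative):
+ * <pre>
+ * OffloadPolicies offloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
+ * managedLedgerConfig.setLedgerOffloader(
+ *         pulsar.getManagedLedgerOffloader(topicName.getNamespaceObject(), offloadPolicies));
+ * </pre>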
+ * + * @param namespaceName NamespaceName + * @param offloadPolicies the OffloadPolicies + * @return LedgerOffloader + */ + public LedgerOffloader getManagedLedgerOffloader(NamespaceName namespaceName, OffloadPolicies offloadPolicies) { + if (offloadPolicies == null) { + return getDefaultOffloader(); + } + return ledgerOffloaderMap.compute(namespaceName, (ns, offloader) -> { + try { + if (offloader != null && Objects.equals(offloader.getOffloadPolicies(), offloadPolicies)) { + return offloader; + } else { + if (offloader != null) { + offloader.close(); + } + return createManagedLedgerOffloader(offloadPolicies); + } + } catch (PulsarServerException e) { + LOG.error("create ledgerOffloader failed for namespace {}", namespaceName.toString(), e); + return new NullLedgerOffloader(); + } + }); } - public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf) + public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPolicies offloadPolicies) throws PulsarServerException { try { - if (StringUtils.isNotBlank(conf.getManagedLedgerOffloadDriver())) { - checkNotNull(conf.getOffloadersDirectory(), + if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) { + checkNotNull(offloadPolicies.getOffloadersDirectory(), "Offloader driver is configured to be '%s' but no offloaders directory is configured.", - conf.getManagedLedgerOffloadDriver()); - this.offloaderManager = OffloaderUtils.searchForOffloaders(conf.getOffloadersDirectory()); + offloadPolicies.getManagedLedgerOffloadDriver()); + this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory()); LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory( - conf.getManagedLedgerOffloadDriver()); + offloadPolicies.getManagedLedgerOffloadDriver()); try { return offloaderFactory.create( - conf.getProperties(), + offloadPolicies, ImmutableMap.of( LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() ), - getOffloaderScheduler(conf)); + getOffloaderScheduler(offloadPolicies)); } catch (IOException ioe) { throw new PulsarServerException(ioe.getMessage(), ioe.getCause()); } @@ -862,10 +892,10 @@ public synchronized Compactor getCompactor() throws PulsarServerException { return this.compactor; } - protected synchronized OrderedScheduler getOffloaderScheduler(ServiceConfiguration conf) { + protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPolicies offloadPolicies) { if (this.offloaderScheduler == null) { this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder() - .numThreads(conf.getManagedLedgerOffloadMaxThreads()) + .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads()) .name("offloader").build(); } return this.offloaderScheduler; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index f28205130ec17..d2368d7c7bd20 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -77,6 +77,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.LocalPolicies; +import 
org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -2276,5 +2277,74 @@ private void mutatePolicy(Function policyTransformation, } } + protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPolicies offloadPolicies) { + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + validateOffloadPolicies(offloadPolicies); + + try { + Stat nodeStat = new Stat(); + final String path = path(POLICIES, namespaceName.toString()); + byte[] content = globalZk().getData(path, null, nodeStat); + Policies policies = jsonMapper().readValue(content, Policies.class); + policies.offload_policies = offloadPolicies; + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion(), + (rc, path1, ctx, stat) -> { + if (rc == KeeperException.Code.OK.intValue()) { + policiesCache().invalidate(path(POLICIES, namespaceName.toString())); + } else { + String errorMsg = String.format( + "[%s] Failed to update offload configuration for namespace %s", + clientAppId(), namespaceName); + if (rc == KeeperException.Code.NONODE.intValue()) { + log.warn("{} : does not exist", errorMsg); + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); + } else if (rc == KeeperException.Code.BADVERSION.intValue()) { + log.warn("{} : concurrent modification", errorMsg); + asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification")); + } else { + asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg)); + } + } + }, null); + log.info("[{}] Successfully updated offload configuration: namespace={}, map={}", clientAppId(), + namespaceName, jsonMapper().writeValueAsString(policies.offload_policies)); + asyncResponse.resume(Response.noContent().build()); + } catch (Exception e) { + log.error("[{}] Failed to update offload configuration for namespace {}", clientAppId(), namespaceName, + e); + asyncResponse.resume(new RestException(e)); + } + } + + private void validateOffloadPolicies(OffloadPolicies offloadPolicies) { + if (offloadPolicies == null) { + log.warn("[{}] Failed to update offload configuration for namespace {}: offloadPolicies is null", + clientAppId(), namespaceName); + throw new RestException(Status.PRECONDITION_FAILED, + "The offloadPolicies must be specified for namespace offload."); + } + if (!offloadPolicies.driverSupported()) { + log.warn("[{}] Failed to update offload configuration for namespace {}: " + + "driver is not supported, support value: {}", + clientAppId(), namespaceName, OffloadPolicies.getSupportedDriverNames()); + throw new RestException(Status.PRECONDITION_FAILED, + "The driver is not supported, support value: " + OffloadPolicies.getSupportedDriverNames()); + } + if (!offloadPolicies.bucketValid()) { + log.warn("[{}] Failed to update offload configuration for namespace {}: bucket must be specified", + clientAppId(), namespaceName); + throw new RestException(Status.PRECONDITION_FAILED, + "The bucket must be specified for namespace offload."); + } + } + + protected OffloadPolicies internalGetOffloadPolicies() { + validateAdminAccessForTenant(namespaceName.getTenant()); + + Policies policies = getNamespacePolicies(namespaceName); + return policies.offload_policies; + } + private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class); } diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 0d497404b2e78..8efcfa19b87f8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -51,6 +51,7 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -1142,5 +1143,37 @@ public void setSchemaValidtionEnforced(@PathParam("tenant") String tenant, internalSetSchemaValidationEnforced(schemaValidationEnforced); } + @POST + @Path("/{tenant}/{namespace}/offloadPolicies") + @ApiOperation(value = " Set offload configuration on a namespace.") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace does not exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "OffloadPolicies is empty or driver is not supported or bucket is not valid") }) + public void setOffloadPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + OffloadPolicies offload, @Suspended final AsyncResponse asyncResponse) { + try { + validateNamespaceName(tenant, namespace); + internalSetOffloadPolicies(asyncResponse, offload); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } + + @GET + @Path("/{tenant}/{namespace}/offloadPolicies") + @ApiOperation(value = "Get offload configuration on a namespace.") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace does not exist") }) + public OffloadPolicies getOffloadPolicies(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + return internalGetOffloadPolicies(); + } + private static final Logger log = LoggerFactory.getLogger(Namespaces.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 21a7d74e15a1e..46f322ff3729b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -121,6 +121,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.LocalPolicies; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.apache.pulsar.common.policies.data.Policies; @@ -982,7 +983,8 @@ public CompletableFuture getManagedLedgerConfig(TopicName t managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES); 
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB()); - managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader()); + OffloadPolicies offloadPolicies = policies.map(p -> p.offload_policies).orElse(null); + managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); policies.ifPresent(p -> { long lag = serviceConfig.getManagedLedgerOffloadDeletionLagMs(); if (p.offload_deletion_lag_ms != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index b54e4c1fd402b..f624ae8011268 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -99,7 +99,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; -import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; @@ -1800,6 +1799,9 @@ public CompletableFuture onPoliciesUpdate(Policies data) { if (this.subscribeRateLimiter.isPresent()) { subscribeRateLimiter.get().onPoliciesUpdate(data); } + getManagedLedger().getConfig().setLedgerOffloader( + brokerService.pulsar().getManagedLedgerOffloader( + TopicName.get(topic).getNamespaceObject(), data.offload_policies)); return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 7f56a3fc02e9b..25c45dfd29a7d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -71,7 +72,7 @@ private void testOffload(String topicName, String mlName) throws Exception { LedgerOffloader offloader = mock(LedgerOffloader.class); when(offloader.getOffloadDriverName()).thenReturn("mock"); - doReturn(offloader).when(pulsar).getManagedLedgerOffloader(); + doReturn(offloader).when(pulsar).getManagedLedgerOffloader(any(), any()); CompletableFuture promise = new CompletableFuture<>(); doReturn(promise).when(offloader).offload(any(), any(), any()); @@ -138,4 +139,20 @@ public void testOffloadV1() throws Exception { String mlName = "prop-xyz/test/ns1/persistent/topic2"; testOffload(topicName, mlName); } + + @Test + public void testOffloadPolicies() throws Exception { + String namespaceName = "prop-xyz/ns1"; + String driver = "aws-s3"; + String region = "test-region"; + String bucket = "test-bucket"; + String endpoint = "test-endpoint"; + + OffloadPolicies offload1 = OffloadPolicies.create( + 
driver, region, bucket, endpoint, 100, 100); + admin.namespaces().setOffloadPolicies(namespaceName, offload1); + OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName); + Assert.assertEquals(offload1, offload2); + } + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 0d69ce42ff581..3ffbf8971edc5 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -34,6 +34,7 @@ import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -1742,4 +1743,74 @@ boolean getIsAllowAutoUpdateSchema(String namespace) */ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchema) throws PulsarAdminException; + + /** + * Set the offload configuration for all the topics in a namespace. + *

+     * Set the offload configuration in a namespace. This operation requires pulsar tenant access.
+     * <p/>
+     * Request parameter example:
+     * <p/>
+     * <pre>
+     * <code>
+     * {
+     *     "region" : "us-east-2",                   // The long term storage region
+     *     "bucket" : "bucket",                      // Bucket to place offloaded ledger into
+     *     "endpoint" : "endpoint",                  // Alternative endpoint to connect to
+     *     "maxBlockSize" : 1024,                    // Max Block Size, default 64MB
+     *     "readBufferSize" : 1024,                  // Read Buffer Size, default 1MB
+     * }
+     * </code>
+     * </pre>
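+     * <p/>
+     * The same configuration can be applied programmatically; a minimal sketch (the namespace
+     * and the values are illustrative):
+     * <pre>
+     * <code>
+     * OffloadPolicies offloadPolicies = OffloadPolicies.create("aws-s3", "us-east-2", "bucket",
+     *         "endpoint", 64 * 1024 * 1024, 1024 * 1024);
+     * admin.namespaces().setOffloadPolicies("my-tenant/my-namespace", offloadPolicies);
+     * </code>
+     * </pre>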
+ * + * @param namespace + * Namespace name + * @param offloadPolicies + * Offload configuration + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws ConflictException + * Concurrent modification + * @throws PulsarAdminException + * Unexpected error + */ + void setOffloadPolicies(String namespace, OffloadPolicies offloadPolicies) throws PulsarAdminException; + + /** + * Get the offload configuration for a namespace. + *

+     * Get the offload configuration for a namespace.
+     * <p/>
+     * Response example:
+     * <p/>
+     * <pre>
+     * <code>
+     * {
+     *     "region" : "us-east-2",                   // The long term storage region
+     *     "bucket" : "bucket",                      // Bucket to place offloaded ledger into
+     *     "endpoint" : "endpoint",                  // Alternative endpoint to connect to
+     *     "maxBlockSize" : 1024,                    // Max Block Size, default 64MB
+     *     "readBufferSize" : 1024,                  // Read Buffer Size, default 1MB
+     * }
+     * 
+     * </code>
+     * </pre>
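+     * <p/>
+     * A minimal usage sketch (the namespace is illustrative; the result is null when no offload
+     * policies have been set on the namespace):
+     * <pre>
+     * <code>
+     * OffloadPolicies offloadPolicies = admin.namespaces().getOffloadPolicies("my-tenant/my-namespace");
+     * </code>
+     * </pre>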
+ * + * @param namespace + * Namespace name + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws ConflictException + * Concurrent modification + * @throws PulsarAdminException + * Unexpected error + */ + OffloadPolicies getOffloadPolicies(String namespace) throws PulsarAdminException; + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 7832d0510072a..bd29057e65178 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -44,6 +44,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -1073,6 +1074,28 @@ public void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpda } } + @Override + public void setOffloadPolicies(String namespace, OffloadPolicies offloadPolicies) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "offloadPolicies"); + request(path).post(Entity.entity(offloadPolicies, MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public OffloadPolicies getOffloadPolicies(String namespace) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "offloadPolicies"); + return request(path).get(OffloadPolicies.class); + } catch (Exception e) { + throw getApiException(e); + } + } + private WebTarget namespacePath(NamespaceName namespace, String... parts) { final WebTarget base = namespace.isV2() ? 
adminV2Namespaces : adminNamespaces; WebTarget namespacePath = base.path(namespace.toString()); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 2bbe32281ec59..453f543b1803d 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -64,6 +64,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.FailureDomain; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -499,6 +500,14 @@ void namespaces() throws Exception { namespaces.run(split("clear-offload-deletion-lag myprop/clust/ns1")); verify(mockNamespaces).clearOffloadDeleteLag("myprop/clust/ns1"); + + namespaces.run(split("set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M")); + verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1", + OffloadPolicies.create("aws-s3", "test-region", "test-bucket", + "http://test.endpoint", 32 * 1024 * 1024, 5 * 1024 * 1024)); + + namespaces.run(split("get-offload-policies myprop/clust/ns1")); + verify(mockNamespaces).getOffloadPolicies("myprop/clust/ns1"); } @Test diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 6159fa90e28ac..ab8b2264aeb86 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -22,6 +22,7 @@ import com.beust.jcommander.ParameterException; import com.beust.jcommander.Parameters; import com.beust.jcommander.converters.CommaParameterSplitter; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -29,9 +30,9 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.admin.cli.utils.IOUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -41,6 +42,7 @@ import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -1341,6 +1343,129 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Set the offload policies for a namespace") + private class SetOffloadPolicies extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List params; + + @Parameter( + names = {"--driver", "-d"}, + description = "Driver to use to offload old 
data to long term storage, " + + "(Possible values: S3, aws-s3, google-cloud-storage)", + required = true) + private String driver; + + @Parameter( + names = {"--region", "-r"}, + description = "The long term storage region, " + + "default is s3ManagedLedgerOffloadRegion or gcsManagedLedgerOffloadRegion in broker.conf", + required = false) + private String region; + + @Parameter( + names = {"--bucket", "-b"}, + description = "Bucket to place offloaded ledger into", + required = true) + private String bucket; + + @Parameter( + names = {"--endpoint", "-e"}, + description = "Alternative endpoint to connect to, " + + "s3 default is s3ManagedLedgerOffloadServiceEndpoint in broker.conf", + required = false) + private String endpoint; + + @Parameter( + names = {"--maxBlockSize", "-mbs"}, + description = "Max block size (eg: 32M, 64M), default is 64MB", + required = false) + private String maxBlockSizeStr; + + @Parameter( + names = {"--readBufferSize", "-rbs"}, + description = "Read buffer size (eg: 1M, 5M), default is 1MB", + required = false) + private String readBufferSizeStr; + + private final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage"}; + + public boolean driverSupported(String driver) { + return Arrays.stream(DRIVER_NAMES).anyMatch(d -> d.equalsIgnoreCase(driver)); + } + + public boolean isS3Driver(String driver) { + if (StringUtils.isEmpty(driver)) { + return false; + } + return driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1]); + } + + public boolean positiveCheck(String paramName, long value) { + if (value <= 0) { + throw new ParameterException(paramName + " is not be negative or 0!"); + } + return true; + } + + public boolean maxValueCheck(String paramName, long value, long maxValue) { + if (value > maxValue) { + throw new ParameterException(paramName + " is not bigger than " + maxValue + "!"); + } + return true; + } + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + + if (!driverSupported(driver)) { + throw new ParameterException( + "The driver " + driver + " is not supported, " + + "(Possible values: S3, aws-s3, google-cloud-storage)."); + } + + if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) { + throw new ParameterException( + "Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set" + + " if s3 offload enabled"); + } + + int maxBlockSizeInBytes = OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES; + if (StringUtils.isNotEmpty(maxBlockSizeStr)) { + long maxBlockSize = validateSizeString(maxBlockSizeStr); + if (positiveCheck("MaxBlockSize", maxBlockSize) + && maxValueCheck("MaxBlockSize", maxBlockSize, Integer.MAX_VALUE)) { + maxBlockSizeInBytes = new Long(maxBlockSize).intValue(); + } + } + + int readBufferSizeInBytes = OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES; + if (StringUtils.isNotEmpty(readBufferSizeStr) ) { + long readBufferSize = validateSizeString(readBufferSizeStr); + if (positiveCheck("ReadBufferSize", readBufferSize) + && maxValueCheck("ReadBufferSize", readBufferSize, Integer.MAX_VALUE)) { + readBufferSizeInBytes = new Long(readBufferSize).intValue(); + } + } + + OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, + maxBlockSizeInBytes, readBufferSizeInBytes); + admin.namespaces().setOffloadPolicies(namespace, offloadPolicies); + } + } + + @Parameters(commandDescription = "Get the offload policies for a namespace") + private class 
GetOffloadPolicies extends CliCommand { + @Parameter(description = "tenant/namespace\n", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + print(admin.namespaces().getOffloadPolicies(namespace)); + } + } + public CmdNamespaces(PulsarAdmin admin) { super("namespaces", admin); jcommander.addCommand("list", new GetNamespacesPerProperty()); @@ -1447,5 +1572,8 @@ public CmdNamespaces(PulsarAdmin admin) { jcommander.addCommand("get-schema-validation-enforce", new GetSchemaValidationEnforced()); jcommander.addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced()); + + jcommander.addCommand("set-offload-policies", new SetOffloadPolicies()); + jcommander.addCommand("get-offload-policies", new GetOffloadPolicies()); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java new file mode 100644 index 0000000000000..f46b44f031fbe --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import static org.apache.pulsar.common.util.FieldParser.value; + +import com.google.common.base.MoreObjects; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Objects; +import java.util.Properties; +import lombok.Data; +import org.apache.commons.lang3.StringUtils; + +/** + * Definition of the offload policies. 
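+ * <p/>
+ * A policies object can be built either from explicit values or from configuration properties
+ * (as PulsarService does with the broker's ServiceConfiguration); a minimal sketch with
+ * illustrative values:
+ * <pre>
+ * OffloadPolicies fromValues = OffloadPolicies.create("aws-s3", "us-east-2", "my-bucket",
+ *         null, DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, DEFAULT_READ_BUFFER_SIZE_IN_BYTES);
+ * OffloadPolicies fromProperties = OffloadPolicies.create(serviceConfiguration.getProperties());
+ * </pre>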
+ */ +@Data +public class OffloadPolicies { + + public final static int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024; // 64MB + public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB + public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2; + public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem"}; + public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders"; + + // common config + private String offloadersDirectory = DEFAULT_OFFLOADER_DIRECTORY; + private String managedLedgerOffloadDriver = null; + private int managedLedgerOffloadMaxThreads = DEFAULT_OFFLOAD_MAX_THREADS; + + // s3 config, set by service configuration or cli + private String s3ManagedLedgerOffloadRegion = null; + private String s3ManagedLedgerOffloadBucket = null; + private String s3ManagedLedgerOffloadServiceEndpoint = null; + private int s3ManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_MAX_BLOCK_SIZE_IN_BYTES; + private int s3ManagedLedgerOffloadReadBufferSizeInBytes = DEFAULT_READ_BUFFER_SIZE_IN_BYTES; + // s3 config, set by service configuration + private String s3ManagedLedgerOffloadRole = null; + private String s3ManagedLedgerOffloadRoleSessionName = "pulsar-s3-offload"; + + // gcs config, set by service configuration or cli + private String gcsManagedLedgerOffloadRegion = null; + private String gcsManagedLedgerOffloadBucket = null; + private int gcsManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_MAX_BLOCK_SIZE_IN_BYTES; + private int gcsManagedLedgerOffloadReadBufferSizeInBytes = DEFAULT_READ_BUFFER_SIZE_IN_BYTES; + // gcs config, set by service configuration + private String gcsManagedLedgerOffloadServiceAccountKeyFile = null; + + // file system config, set by service configuration + private String fileSystemProfilePath = null; + private String fileSystemURI = null; + + public static OffloadPolicies create(String driver, String region, String bucket, String endpoint, + int maxBlockSizeInBytes, int readBufferSizeInBytes) { + OffloadPolicies offloadPolicies = new OffloadPolicies(); + offloadPolicies.setManagedLedgerOffloadDriver(driver); + if (driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1])) { + offloadPolicies.setS3ManagedLedgerOffloadRegion(region); + offloadPolicies.setS3ManagedLedgerOffloadBucket(bucket); + offloadPolicies.setS3ManagedLedgerOffloadServiceEndpoint(endpoint); + offloadPolicies.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes); + offloadPolicies.setS3ManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes); + } else if (driver.equalsIgnoreCase(DRIVER_NAMES[2])) { + offloadPolicies.setGcsManagedLedgerOffloadRegion(region); + offloadPolicies.setGcsManagedLedgerOffloadBucket(bucket); + offloadPolicies.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes); + offloadPolicies.setGcsManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes); + } + return offloadPolicies; + } + + public static OffloadPolicies create(Properties properties) { + OffloadPolicies data = new OffloadPolicies(); + Field[] fields = OffloadPolicies.class.getDeclaredFields(); + Arrays.stream(fields).forEach(f -> { + if (properties.containsKey(f.getName())) { + try { + f.setAccessible(true); + f.set(data, value((String) properties.get(f.getName()), f)); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format("failed to initialize %s field while setting value %s", + f.getName(), properties.get(f.getName())), e); + } + } + }); + return data; + 
} + + public boolean driverSupported() { + return Arrays.stream(DRIVER_NAMES).anyMatch(d -> d.equalsIgnoreCase(this.managedLedgerOffloadDriver)); + } + + public static String getSupportedDriverNames() { + return StringUtils.join(DRIVER_NAMES, ","); + } + + public boolean isS3Driver() { + if (managedLedgerOffloadDriver == null) { + return false; + } + return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[0]) + || managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[1]); + } + + public boolean isGcsDriver() { + if (managedLedgerOffloadDriver == null) { + return false; + } + return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[2]); + } + + public boolean isFileSystemDriver() { + if (managedLedgerOffloadDriver == null) { + return false; + } + return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[3]); + } + + public boolean bucketValid() { + if (managedLedgerOffloadDriver == null) { + return false; + } + if (isS3Driver()) { + return StringUtils.isNotEmpty(s3ManagedLedgerOffloadBucket); + } else if (isGcsDriver()) { + return StringUtils.isNotEmpty(gcsManagedLedgerOffloadBucket); + } else if (isFileSystemDriver()) { + return true; + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash( + managedLedgerOffloadDriver, + managedLedgerOffloadMaxThreads, + s3ManagedLedgerOffloadRegion, + s3ManagedLedgerOffloadBucket, + s3ManagedLedgerOffloadServiceEndpoint, + s3ManagedLedgerOffloadMaxBlockSizeInBytes, + s3ManagedLedgerOffloadReadBufferSizeInBytes, + s3ManagedLedgerOffloadRole, + s3ManagedLedgerOffloadRoleSessionName, + gcsManagedLedgerOffloadRegion, + gcsManagedLedgerOffloadBucket, + gcsManagedLedgerOffloadMaxBlockSizeInBytes, + gcsManagedLedgerOffloadReadBufferSizeInBytes, + gcsManagedLedgerOffloadServiceAccountKeyFile, + fileSystemProfilePath, + fileSystemURI); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + OffloadPolicies other = (OffloadPolicies) obj; + return Objects.equals(managedLedgerOffloadDriver, other.getManagedLedgerOffloadDriver()) + && Objects.equals(managedLedgerOffloadMaxThreads, other.getManagedLedgerOffloadMaxThreads()) + && Objects.equals(s3ManagedLedgerOffloadRegion, other.getS3ManagedLedgerOffloadRegion()) + && Objects.equals(s3ManagedLedgerOffloadBucket, other.getS3ManagedLedgerOffloadBucket()) + && Objects.equals(s3ManagedLedgerOffloadServiceEndpoint, + other.getS3ManagedLedgerOffloadServiceEndpoint()) + && Objects.equals(s3ManagedLedgerOffloadMaxBlockSizeInBytes, + other.getS3ManagedLedgerOffloadMaxBlockSizeInBytes()) + && Objects.equals(s3ManagedLedgerOffloadReadBufferSizeInBytes, + other.getS3ManagedLedgerOffloadReadBufferSizeInBytes()) + && Objects.equals(s3ManagedLedgerOffloadRole, other.getS3ManagedLedgerOffloadRole()) + && Objects.equals(s3ManagedLedgerOffloadRoleSessionName, + other.getS3ManagedLedgerOffloadRoleSessionName()) + && Objects.equals(gcsManagedLedgerOffloadRegion, other.getGcsManagedLedgerOffloadRegion()) + && Objects.equals(gcsManagedLedgerOffloadBucket, other.getGcsManagedLedgerOffloadBucket()) + && Objects.equals(gcsManagedLedgerOffloadMaxBlockSizeInBytes, + other.getGcsManagedLedgerOffloadMaxBlockSizeInBytes()) + && Objects.equals(gcsManagedLedgerOffloadReadBufferSizeInBytes, + other.getGcsManagedLedgerOffloadReadBufferSizeInBytes()) + && Objects.equals(gcsManagedLedgerOffloadServiceAccountKeyFile, + other.getGcsManagedLedgerOffloadServiceAccountKeyFile()) + && 
Objects.equals(fileSystemProfilePath, other.getFileSystemProfilePath()) + && Objects.equals(fileSystemURI, other.getFileSystemURI()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("managedLedgerOffloadDriver", managedLedgerOffloadDriver) + .add("managedLedgerOffloadMaxThreads", managedLedgerOffloadMaxThreads) + .add("s3ManagedLedgerOffloadRegion", s3ManagedLedgerOffloadRegion) + .add("s3ManagedLedgerOffloadBucket", s3ManagedLedgerOffloadBucket) + .add("s3ManagedLedgerOffloadServiceEndpoint", s3ManagedLedgerOffloadServiceEndpoint) + .add("s3ManagedLedgerOffloadMaxBlockSizeInBytes", s3ManagedLedgerOffloadMaxBlockSizeInBytes) + .add("s3ManagedLedgerOffloadReadBufferSizeInBytes", s3ManagedLedgerOffloadReadBufferSizeInBytes) + .add("s3ManagedLedgerOffloadRole", s3ManagedLedgerOffloadRole) + .add("s3ManagedLedgerOffloadRoleSessionName", s3ManagedLedgerOffloadRoleSessionName) + .add("gcsManagedLedgerOffloadRegion", gcsManagedLedgerOffloadRegion) + .add("gcsManagedLedgerOffloadBucket", gcsManagedLedgerOffloadBucket) + .add("gcsManagedLedgerOffloadMaxBlockSizeInBytes", gcsManagedLedgerOffloadMaxBlockSizeInBytes) + .add("gcsManagedLedgerOffloadReadBufferSizeInBytes", gcsManagedLedgerOffloadReadBufferSizeInBytes) + .add("gcsManagedLedgerOffloadServiceAccountKeyFile", gcsManagedLedgerOffloadServiceAccountKeyFile) + .add("fileSystemProfilePath", fileSystemProfilePath) + .add("fileSystemURI", fileSystemURI) + .toString(); + } + +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 175503adab466..4b2029b36f129 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -102,6 +102,9 @@ public class Policies { @SuppressWarnings("checkstyle:MemberName") public boolean schema_validation_enforced = false; + @SuppressWarnings("checkstyle:MemberName") + public OffloadPolicies offload_policies = null; + @Override public int hashCode() { return Objects.hash(auth_policies, replication_clusters, @@ -120,7 +123,8 @@ public int hashCode() { schema_auto_update_compatibility_strategy, schema_validation_enforced, schema_compatibility_strategy, - is_allow_auto_update_schema); + is_allow_auto_update_schema, + offload_policies); } @Override @@ -157,7 +161,8 @@ public boolean equals(Object obj) { && schema_auto_update_compatibility_strategy == other.schema_auto_update_compatibility_strategy && schema_validation_enforced == other.schema_validation_enforced && schema_compatibility_strategy == other.schema_compatibility_strategy - && is_allow_auto_update_schema == other.is_allow_auto_update_schema; + && is_allow_auto_update_schema == other.is_allow_auto_update_schema + && Objects.equals(offload_policies, other.offload_policies); } return false; @@ -209,6 +214,7 @@ public String toString() { .add("schema_auto_update_compatibility_strategy", schema_auto_update_compatibility_strategy) .add("schema_validation_enforced", schema_validation_enforced) .add("schema_compatibility_Strategy", schema_compatibility_strategy) - .add("is_allow_auto_update_Schema", is_allow_auto_update_schema).toString(); + .add("is_allow_auto_update_Schema", is_allow_auto_update_schema) + .add("offload_policies", offload_policies).toString(); } } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index 1af86a2225a06..a29aabd155879 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -25,6 +25,9 @@ import io.airlift.log.Logger; import java.io.IOException; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.LedgerOffloader; @@ -37,8 +40,11 @@ import org.apache.bookkeeper.mledger.offload.OffloaderUtils; import org.apache.bookkeeper.mledger.offload.Offloaders; import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.policies.data.OffloadPolicies; /** * Implementation of a cache for the Pulsar connector. @@ -55,7 +61,8 @@ public class PulsarConnectorCache { private final StatsProvider statsProvider; private OrderedScheduler offloaderScheduler; private Offloaders offloaderManager; - private LedgerOffloader offloader; + private LedgerOffloader defaultOffloader; + private Map offloaderMap = new ConcurrentHashMap<>(); private static final String OFFLOADERS_DIRECTOR = "offloadersDirectory"; private static final String MANAGED_LEDGER_OFFLOAD_DRIVER = "managedLedgerOffloadDriver"; @@ -74,7 +81,9 @@ private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws this.statsProvider.start(clientConfiguration); - this.offloader = initManagedLedgerOffloader(pulsarConnectorConfig); + OffloadPolicies offloadPolicies = new OffloadPolicies(); + BeanUtils.copyProperties(offloadPolicies, pulsarConnectorConfig); + this.defaultOffloader = initManagedLedgerOffloader(offloadPolicies); } public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { @@ -108,46 +117,55 @@ private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConf return new ManagedLedgerFactoryImpl(bkClientConfiguration, managedLedgerFactoryConfig); } - public ManagedLedgerConfig getManagedLedgerConfig() { - - return new ManagedLedgerConfig() - .setLedgerOffloader(this.offloader); + public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, OffloadPolicies offloadPolicies) { + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + if (offloadPolicies == null) { + managedLedgerConfig.setLedgerOffloader(this.defaultOffloader); + } else { + LedgerOffloader ledgerOffloader = offloaderMap.compute(namespaceName, + (ns, offloader) -> { + if (offloader != null && Objects.equals(offloader.getOffloadPolicies(), offloadPolicies)) { + return offloader; + } else { + if (offloader != null) { + offloader.close(); + } + return initManagedLedgerOffloader(offloadPolicies); + } + }); + managedLedgerConfig.setLedgerOffloader(ledgerOffloader); + } + return managedLedgerConfig; } - private synchronized OrderedScheduler getOffloaderScheduler(PulsarConnectorConfig pulsarConnectorConfig) { + private synchronized OrderedScheduler getOffloaderScheduler(OffloadPolicies offloadPolicies) { if (this.offloaderScheduler == null) { this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder() - 
.numThreads(pulsarConnectorConfig.getManagedLedgerOffloadMaxThreads()) + .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads()) .name("pulsar-offloader").build(); } return this.offloaderScheduler; } - private LedgerOffloader initManagedLedgerOffloader(PulsarConnectorConfig conf) { + private LedgerOffloader initManagedLedgerOffloader(OffloadPolicies offloadPolicies) { try { - if (StringUtils.isNotBlank(conf.getManagedLedgerOffloadDriver())) { - checkNotNull(conf.getOffloadersDirectory(), + if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) { + checkNotNull(offloadPolicies.getOffloadersDirectory(), "Offloader driver is configured to be '%s' but no offloaders directory is configured.", - conf.getManagedLedgerOffloadDriver()); - this.offloaderManager = OffloaderUtils.searchForOffloaders(conf.getOffloadersDirectory()); + offloadPolicies.getManagedLedgerOffloadDriver()); + this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory()); LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory( - conf.getManagedLedgerOffloadDriver()); - - Map offloaderProperties = conf.getOffloaderProperties(); - offloaderProperties.put(OFFLOADERS_DIRECTOR, conf.getOffloadersDirectory()); - offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_DRIVER, conf.getManagedLedgerOffloadDriver()); - offloaderProperties - .put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(conf.getManagedLedgerOffloadMaxThreads())); + offloadPolicies.getManagedLedgerOffloadDriver()); try { return offloaderFactory.create( - PulsarConnectorUtils.getProperties(offloaderProperties), + offloadPolicies, ImmutableMap.of( LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() ), - getOffloaderScheduler(conf)); + getOffloaderScheduler(offloadPolicies)); } catch (IOException ioe) { log.error("Failed to create offloader: ", ioe); throw new RuntimeException(ioe.getMessage(), ioe.getCause()); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java index 4d5a25db836d6..eca99af4dbb4b 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java @@ -57,6 +57,7 @@ import org.apache.pulsar.common.api.raw.RawMessage; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.sql.presto.PulsarInternalColumn.PartitionColumn; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.SpscArrayQueue; @@ -110,8 +111,18 @@ public PulsarRecordCursor(List columnHandles, PulsarSplit pu close(); throw new RuntimeException(e); } + + OffloadPolicies offloadPolicies = pulsarSplit.getOffloadPolicies(); + if (offloadPolicies != null) { + offloadPolicies.setOffloadersDirectory(pulsarConnectorConfig.getOffloadersDirectory()); + offloadPolicies.setManagedLedgerOffloadMaxThreads( + pulsarConnectorConfig.getManagedLedgerOffloadMaxThreads()); + } initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, - pulsarConnectorCache.getManagedLedgerFactory(), pulsarConnectorCache.getManagedLedgerConfig(), + 
pulsarConnectorCache.getManagedLedgerFactory(), + pulsarConnectorCache.getManagedLedgerConfig( + TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()), + pulsarSplit.getTableName()).getNamespaceObject(), offloadPolicies), new PulsarConnectorMetricsTracker(pulsarConnectorCache.getStatsProvider())); } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java index eeebbd174dd3f..dbebbf54a3eba 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -55,6 +56,8 @@ public class PulsarSplit implements ConnectorSplit { private final PositionImpl startPosition; private final PositionImpl endPosition; + private final OffloadPolicies offloadPolicies; + @JsonCreator public PulsarSplit( @JsonProperty("splitId") long splitId, @@ -69,7 +72,8 @@ public PulsarSplit( @JsonProperty("startPositionLedgerId") long startPositionLedgerId, @JsonProperty("endPositionLedgerId") long endPositionLedgerId, @JsonProperty("tupleDomain") TupleDomain tupleDomain, - @JsonProperty("properties") Map schemaInfoProperties) { + @JsonProperty("properties") Map schemaInfoProperties, + @JsonProperty("offloadPolicies") OffloadPolicies offloadPolicies) { this.splitId = splitId; requireNonNull(schemaName, "schema name is null"); this.schemaInfo = SchemaInfo.builder() @@ -91,6 +95,7 @@ public PulsarSplit( this.tupleDomain = requireNonNull(tupleDomain, "tupleDomain is null"); this.startPosition = PositionImpl.get(startPositionLedgerId, startPositionEntryId); this.endPosition = PositionImpl.get(endPositionLedgerId, endPositionEntryId); + this.offloadPolicies = offloadPolicies; } @JsonProperty @@ -161,6 +166,11 @@ public PositionImpl getEndPosition() { return endPosition; } + @JsonProperty + public OffloadPolicies getOffloadPolicies() { + return offloadPolicies; + } + @Override public boolean isRemotelyAccessible() { return true; @@ -190,6 +200,7 @@ public String toString() { + ", endPositionEntryId=" + endPositionEntryId + ", startPositionLedgerId=" + startPositionLedgerId + ", endPositionLedgerId=" + endPositionLedgerId + + (offloadPolicies == null ? 
"" : offloadPolicies.toString()) + '}'; } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java index 06bc1200146f7..848b9ccd1789c 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java @@ -59,6 +59,7 @@ import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.schema.SchemaInfo; /** @@ -122,11 +123,15 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand Collection splits; try { + OffloadPolicies offloadPolicies = this.pulsarAdmin.namespaces() + .getOffloadPolicies(topicName.getNamespace()); if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) { - splits = getSplitsNonPartitionedTopic(numSplits, topicName, tableHandle, schemaInfo, tupleDomain); + splits = getSplitsNonPartitionedTopic( + numSplits, topicName, tableHandle, schemaInfo, tupleDomain, offloadPolicies); log.debug("Splits for non-partitioned topic %s: %s", topicName, splits); } else { - splits = getSplitsPartitionedTopic(numSplits, topicName, tableHandle, schemaInfo, tupleDomain); + splits = getSplitsPartitionedTopic( + numSplits, topicName, tableHandle, schemaInfo, tupleDomain, offloadPolicies); log.debug("Splits for partitioned topic %s: %s", topicName, splits); } } catch (Exception e) { @@ -138,7 +143,8 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand @VisibleForTesting Collection getSplitsPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle - tableHandle, SchemaInfo schemaInfo, TupleDomain tupleDomain) throws Exception { + tableHandle, SchemaInfo schemaInfo, TupleDomain tupleDomain, + OffloadPolicies offloadPolicies) throws Exception { List predicatedPartitions = getPredicatedPartitions(topicName, tupleDomain); if (log.isDebugEnabled()) { @@ -165,7 +171,8 @@ Collection getSplitsPartitionedTopic(int numSplits, TopicName topic tableHandle, schemaInfo, topicName.getPartition(predicatedPartitions.get(i)).getLocalName(), - tupleDomain)); + tupleDomain, + offloadPolicies)); } return splits; } @@ -219,8 +226,8 @@ private List getPredicatedPartitions(TopicName topicName, TupleDomain getSplitsNonPartitionedTopic(int numSplits, TopicName topicName, - PulsarTableHandle tableHandle, SchemaInfo schemaInfo, TupleDomain tupleDomain) - throws Exception { + PulsarTableHandle tableHandle, SchemaInfo schemaInfo, TupleDomain tupleDomain, + OffloadPolicies offloadPolicies) throws Exception { ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig) .getManagedLedgerFactory(); @@ -230,7 +237,9 @@ Collection getSplitsNonPartitionedTopic(int numSplits, TopicName to numSplits, tableHandle, schemaInfo, - tableHandle.getTableName(), tupleDomain); + tableHandle.getTableName(), + tupleDomain, + offloadPolicies); } @VisibleForTesting @@ -239,7 +248,8 @@ Collection getSplitsForTopic(String topicNamePersistenceEncoding, int numSplits, PulsarTableHandle tableHandle, SchemaInfo schemaInfo, String tableName, - TupleDomain tupleDomain) + TupleDomain tupleDomain, + OffloadPolicies offloadPolicies) throws ManagedLedgerException, 
InterruptedException { ReadOnlyCursor readOnlyCursor = null; @@ -296,7 +306,8 @@ Collection getSplitsForTopic(String topicNamePersistenceEncoding, startPosition.getLedgerId(), endPosition.getLedgerId(), tupleDomain, - schemaInfo.getProperties())); + schemaInfo.getProperties(), + offloadPolicies)); } return splits; } finally { diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java index cd81442db6b8b..6c911a407541a 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java @@ -561,7 +561,7 @@ public static class Boo { new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()), topicsToSchemas.get(topicName.getSchemaName()).getType(), 0, topicsToNumEntries.get(topicName.getSchemaName()), - 0, 0, TupleDomain.all(), new HashMap<>())); + 0, 0, TupleDomain.all(), new HashMap<>(), null)); } } diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java index ef361487a054f..5ecbffc307764 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java @@ -87,7 +87,7 @@ public void testTopic(String delimiter) throws Exception { PulsarTableLayoutHandle pulsarTableLayoutHandle = new PulsarTableLayoutHandle(pulsarTableHandle, TupleDomain.all()); final ResultCaptor> resultCaptor = new ResultCaptor<>(); - doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any()); + doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); ConnectorSplitSource connectorSplitSource = this.pulsarSplitManager.getSplits( @@ -95,7 +95,7 @@ public void testTopic(String delimiter) throws Exception { pulsarTableLayoutHandle, null); verify(this.pulsarSplitManager, times(1)) - .getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any()); + .getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); int totalSize = 0; for (PulsarSplit pulsarSplit : resultCaptor.getResult()) { @@ -134,13 +134,13 @@ public void testPartitionedTopic(String delimiter) throws Exception { PulsarTableLayoutHandle pulsarTableLayoutHandle = new PulsarTableLayoutHandle(pulsarTableHandle, TupleDomain.all()); final ResultCaptor> resultCaptor = new ResultCaptor<>(); - doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any()); + doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); this.pulsarSplitManager.getSplits(mock(ConnectorTransactionHandle.class), mock(ConnectorSession.class), pulsarTableLayoutHandle, null); verify(this.pulsarSplitManager, times(1)) - .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any()); + .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); int partitions = partitionedTopicsToPartitions.get(topicName.toString()); @@ -200,8 +200,8 @@ public void testPublishTimePredicatePushdown(String delimiter) throws Exception PulsarTableLayoutHandle 
pulsarTableLayoutHandle = new PulsarTableLayoutHandle(pulsarTableHandle, tupleDomain); final ResultCaptor> resultCaptor = new ResultCaptor<>(); - doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsNonPartitionedTopic(anyInt(), any(), any(), any - (), any()); + doAnswer(resultCaptor).when(this.pulsarSplitManager) + .getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); ConnectorSplitSource connectorSplitSource = this.pulsarSplitManager.getSplits( mock(ConnectorTransactionHandle.class), mock(ConnectorSession.class), @@ -209,7 +209,7 @@ public void testPublishTimePredicatePushdown(String delimiter) throws Exception verify(this.pulsarSplitManager, times(1)) - .getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any()); + .getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); int totalSize = 0; int initalStart = 1; @@ -258,7 +258,7 @@ public void testPublishTimePredicatePushdownPartitionedTopic(String delimiter) t final ResultCaptor> resultCaptor = new ResultCaptor<>(); doAnswer(resultCaptor).when(this.pulsarSplitManager) - .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any()); + .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); ConnectorSplitSource connectorSplitSource = this.pulsarSplitManager.getSplits( mock(ConnectorTransactionHandle.class), mock(ConnectorSession.class), @@ -266,7 +266,7 @@ public void testPublishTimePredicatePushdownPartitionedTopic(String delimiter) t verify(this.pulsarSplitManager, times(1)) - .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any()); + .getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any()); int partitions = partitionedTopicsToPartitions.get(topicName.toString()); @@ -315,7 +315,7 @@ public void testPartitionFilter(String delimiter) throws Exception { domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain); TupleDomain tupleDomain = TupleDomain.withColumnDomains(domainMap); Collection splits = this.pulsarSplitManager.getSplitsPartitionedTopic(2, topicName, pulsarTableHandle, - schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain); + schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null); if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) { Assert.assertEquals(splits.size(), 2); } @@ -332,7 +332,7 @@ public void testPartitionFilter(String delimiter) throws Exception { domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain); tupleDomain = TupleDomain.withColumnDomains(domainMap); splits = this.pulsarSplitManager.getSplitsPartitionedTopic(1, topicName, pulsarTableHandle, - schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain); + schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null); if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) { Assert.assertEquals(splits.size(), 2); } @@ -348,7 +348,7 @@ public void testPartitionFilter(String delimiter) throws Exception { domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain); tupleDomain = TupleDomain.withColumnDomains(domainMap); splits = this.pulsarSplitManager.getSplitsPartitionedTopic(2, topicName, pulsarTableHandle, - schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain); + schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null); if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) { Assert.assertEquals(splits.size(), 3); } @@ -367,7 +367,7 @@ public 
void testPartitionFilter(String delimiter) throws Exception { domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain); tupleDomain = TupleDomain.withColumnDomains(domainMap); splits = this.pulsarSplitManager.getSplitsPartitionedTopic(2, topicName, pulsarTableHandle, - schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain); + schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null); if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) { Assert.assertEquals(splits.size(), 4); } diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index 3eb1d5cf08998..c52f87bad31f3 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -882,6 +882,8 @@ Subcommands * `clear-offload-deletion-lag` * `get-schema-autoupdate-strategy` * `set-schema-autoupdate-strategy` +* `set-offload-policies` +* `get-offload-policies` ### `list` @@ -2292,3 +2294,28 @@ Options |`-t`, `--type`|The type of the schema (avro or json)|| +### `get-offload-policies` +Get the offload policy for a namespace + +Usage +```bash +$ pulsar-admin namespaces get-offload-policies tenant/namespace +``` + +### `set-offload-policies` +Set the offload policy for a namespace + +Usage +```bash +$ pulsar-admin namespaces set-offload-policies tenant/namespace +``` + +Options +|Flag|Description|Default| +|----|---|---| +|`-d`, `--driver`|Driver to use to offload old data to long term storage,(Possible values: S3, aws-s3, google-cloud-storage)|| +|`-r`, `--region`|The long term storage region|| +|`-b`, `--bucket`|Bucket to place offloaded ledger into|| +|`-e`, `--endpoint`|Alternative endpoint to connect to|| +|`-mbs`, `--maxBlockSize`|Max block size|64MB| +|`-rbs`, `--readBufferSize`|Read buffer size|1MB| diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java deleted file mode 100644 index 899887b6f8fbf..0000000000000 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.bookkeeper.mledger.offload.filesystem; - -import lombok.Data; - -import java.io.Serializable; -import java.lang.reflect.Field; -import java.util.Arrays; -import java.util.Properties; - -import static org.apache.pulsar.common.util.FieldParser.value; - -/** - * Configuration for file system. 
- */ -@Data -public class FileSystemConfigurationData implements Serializable, Cloneable { - - /**** --- Ledger Offloading --- ****/ - // Driver to use to offload old data to long term storage - private String managedLedgerOffloadDriver = null; - - private String fileSystemProfilePath = null; - - private String fileSystemURI = null; - - private int managedLedgerOffloadMaxThreads = 2; - - /** - * Create a tiered storage configuration from the provided properties. - * - * @param properties the configuration properties - * @return tiered storage configuration - */ - public static FileSystemConfigurationData create(Properties properties) { - FileSystemConfigurationData data = new FileSystemConfigurationData(); - Field[] fields = FileSystemConfigurationData.class.getDeclaredFields(); - Arrays.stream(fields).forEach(f -> { - if (properties.containsKey(f.getName())) { - try { - f.setAccessible(true); - f.set(data, value((String) properties.get(f.getName()), f)); - } catch (Exception e) { - throw new IllegalArgumentException(String.format("failed to initialize %s field while setting value %s", - f.getName(), properties.get(f.getName())), e); - } - } - }); - return data; - } -} diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java index cd52197a59bb5..cf86f38e1bedb 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java @@ -21,6 +21,7 @@ import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import java.io.IOException; import java.util.Map; @@ -33,8 +34,8 @@ public boolean isDriverSupported(String driverName) { } @Override - public FileSystemManagedLedgerOffloader create(Properties properties, Map userMetadata, OrderedScheduler scheduler) throws IOException { - FileSystemConfigurationData data = FileSystemConfigurationData.create(properties); - return FileSystemManagedLedgerOffloader.create(data, scheduler); + public FileSystemManagedLedgerOffloader create(OffloadPolicies offloadPolicies, + Map userMetadata, OrderedScheduler scheduler) throws IOException { + return FileSystemManagedLedgerOffloader.create(offloadPolicies, scheduler); } } diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java index 309076c4c3605..eda747ded8712 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java @@ -29,7 +29,6 @@ import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.offload.filesystem.FileSystemLedgerOffloaderFactory; -import 
org.apache.bookkeeper.mledger.offload.filesystem.FileSystemConfigurationData; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.DataFormats; import org.apache.hadoop.conf.Configuration; @@ -39,6 +38,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapFile; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +69,8 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader { private OrderedScheduler scheduler; private static final long ENTRIES_PER_READ = 100; private OrderedScheduler assignmentScheduler; + private OffloadPolicies offloadPolicies; + public static boolean driverSupported(String driver) { return DRIVER_NAMES.equals(driver); } @@ -77,11 +79,12 @@ public String getOffloadDriverName() { return driverName; } - public static FileSystemManagedLedgerOffloader create(FileSystemConfigurationData conf, OrderedScheduler scheduler) throws IOException { + public static FileSystemManagedLedgerOffloader create(OffloadPolicies conf, OrderedScheduler scheduler) throws IOException { return new FileSystemManagedLedgerOffloader(conf, scheduler); } - private FileSystemManagedLedgerOffloader(FileSystemConfigurationData conf, OrderedScheduler scheduler) throws IOException { + private FileSystemManagedLedgerOffloader(OffloadPolicies conf, OrderedScheduler scheduler) throws IOException { + this.offloadPolicies = conf; this.configuration = new Configuration(); if (conf.getFileSystemProfilePath() != null) { String[] paths = conf.getFileSystemProfilePath().split(","); @@ -110,7 +113,8 @@ private FileSystemManagedLedgerOffloader(FileSystemConfigurationData conf, Order .name("offload-assignment").build(); } @VisibleForTesting - public FileSystemManagedLedgerOffloader(FileSystemConfigurationData conf, OrderedScheduler scheduler, String testHDFSPath, String baseDir) throws IOException { + public FileSystemManagedLedgerOffloader(OffloadPolicies conf, OrderedScheduler scheduler, String testHDFSPath, String baseDir) throws IOException { + this.offloadPolicies = conf; this.configuration = new Configuration(); this.configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); this.configuration.set("fs.defaultFS", testHDFSPath); @@ -328,4 +332,20 @@ private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) { return builder.build().toByteArray(); } + + @Override + public OffloadPolicies getOffloadPolicies() { + return offloadPolicies; + } + + @Override + public void close() { + if (fileSystem != null) { + try { + fileSystem.close(); + } catch (Exception e) { + log.error("FileSystemManagedLedgerOffloader close failed!", e); + } + } + } } diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java index 4abff16443090..b014fd23860b6 100644 --- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java +++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.testng.annotations.AfterMethod; import 
org.testng.annotations.BeforeMethod; @@ -48,7 +49,7 @@ public void start() throws Exception { hdfsURI = "hdfs://localhost:"+ hdfsCluster.getNameNodePort() + "/"; Properties properties = new Properties(); fileSystemManagedLedgerOffloader = new FileSystemManagedLedgerOffloader( - FileSystemConfigurationData.create(properties), + OffloadPolicies.create(properties), scheduler, hdfsURI, basePath); } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/CredentialsUtil.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/CredentialsUtil.java new file mode 100644 index 0000000000000..4971a6d0e1b70 --- /dev/null +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/CredentialsUtil.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.offload.jcloud; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.google.common.base.Strings; +import org.apache.pulsar.common.policies.data.OffloadPolicies; + +public class CredentialsUtil { + + /** + * Builds an AWS credential provider based on the offload options + * @return aws credential provider + */ + public static AWSCredentialsProvider getAWSCredentialProvider(OffloadPolicies offloadPolicies) { + if (Strings.isNullOrEmpty(offloadPolicies.getS3ManagedLedgerOffloadRole())) { + return DefaultAWSCredentialsProviderChain.getInstance(); + } else { + String roleName = offloadPolicies.getS3ManagedLedgerOffloadRole(); + String roleSessionName = offloadPolicies.getS3ManagedLedgerOffloadRoleSessionName(); + return new STSAssumeRoleSessionCredentialsProvider.Builder(roleName, roleSessionName).build(); + } + } + +} diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java index dffe253f94a98..20fac68f1f822 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java @@ -20,10 +20,10 @@ import java.io.IOException; import java.util.Map; -import java.util.Properties; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader; +import org.apache.pulsar.common.policies.data.OffloadPolicies; /** * A 
jcloud based offloader factory. @@ -42,10 +42,9 @@ public boolean isDriverSupported(String driverName) { } @Override - public BlobStoreManagedLedgerOffloader create(Properties properties, + public BlobStoreManagedLedgerOffloader create(OffloadPolicies offloadPolicies, Map userMetadata, OrderedScheduler scheduler) throws IOException { - TieredStorageConfigurationData data = TieredStorageConfigurationData.create(properties); - return BlobStoreManagedLedgerOffloader.create(data, userMetadata, scheduler); + return BlobStoreManagedLedgerOffloader.create(offloadPolicies, userMetadata, scheduler); } } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java deleted file mode 100644 index a4c5cf4fa8b2e..0000000000000 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.bookkeeper.mledger.offload.jcloud; - -import static org.apache.pulsar.common.util.FieldParser.value; - -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; -import com.google.common.base.Strings; -import java.io.Serializable; -import java.lang.reflect.Field; -import java.util.Arrays; -import java.util.Properties; -import lombok.Data; - -/** - * Configuration for tiered storage. - */ -@Data -public class TieredStorageConfigurationData implements Serializable, Cloneable { - - /**** --- Ledger Offloading --- ****/ - // Driver to use to offload old data to long term storage - private String managedLedgerOffloadDriver = null; - - // Maximum number of thread pool threads for ledger offloading - private int managedLedgerOffloadMaxThreads = 2; - - // For Amazon S3 ledger offload, AWS region - private String s3ManagedLedgerOffloadRegion = null; - - // For Amazon S3 ledger offload, Bucket to place offloaded ledger into - private String s3ManagedLedgerOffloadBucket = null; - - // For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing) - private String s3ManagedLedgerOffloadServiceEndpoint = null; - - // For Amazon S3 ledger offload, Max block size in bytes. - private int s3ManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024; // 64MB - - // For Amazon S3 ledger offload, Read buffer size in bytes. 
- private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB - - // For Amazon S3 ledger offload, provide a role to assume before writing to s3 - private String s3ManagedLedgerOffloadRole = null; - - // For Amazon S3 ledger offload, provide a role session name when using a role - private String s3ManagedLedgerOffloadRoleSessionName = "pulsar-s3-offload"; - - // For Google Cloud Storage ledger offload, region where offload bucket is located. - // reference this page for more details: https://cloud.google.com/storage/docs/bucket-locations - private String gcsManagedLedgerOffloadRegion = null; - - // For Google Cloud Storage ledger offload, Bucket to place offloaded ledger into - private String gcsManagedLedgerOffloadBucket = null; - - // For Google Cloud Storage ledger offload, Max block size in bytes. - private int gcsManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024; // 64MB - - // For Google Cloud Storage ledger offload, Read buffer size in bytes. - private int gcsManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB - - // For Google Cloud Storage, path to json file containing service account credentials. - // For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849 - private String gcsManagedLedgerOffloadServiceAccountKeyFile = null; - - /** - * Builds an AWS credential provider based on the offload options - * @return aws credential provider - */ - public AWSCredentialsProvider getAWSCredentialProvider() { - if (Strings.isNullOrEmpty(this.getS3ManagedLedgerOffloadRole())) { - return DefaultAWSCredentialsProviderChain.getInstance(); - } else { - String roleName = this.getS3ManagedLedgerOffloadRole(); - String roleSessionName = this.getS3ManagedLedgerOffloadRoleSessionName(); - return new STSAssumeRoleSessionCredentialsProvider.Builder(roleName, roleSessionName).build(); - } - } - - /** - * Create a tiered storage configuration from the provided properties. 
- * - * @param properties the configuration properties - * @return tiered storage configuration - */ - public static TieredStorageConfigurationData create(Properties properties) { - TieredStorageConfigurationData data = new TieredStorageConfigurationData(); - Field[] fields = TieredStorageConfigurationData.class.getDeclaredFields(); - Arrays.stream(fields).forEach(f -> { - if (properties.containsKey(f.getName())) { - try { - f.setAccessible(true); - f.set(data, value((String) properties.get(f.getName()), f)); - } catch (Exception e) { - throw new IllegalArgumentException(String.format("failed to initialize %s field while setting value %s", - f.getName(), properties.get(f.getName())), e); - } - } - }); - return data; - } - -} diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index 7f7acaf25ccd6..0c92b96695e72 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -44,10 +44,11 @@ import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream; +import org.apache.bookkeeper.mledger.offload.jcloud.CredentialsUtil; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock; -import org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier; import org.jclouds.Constants; import org.jclouds.ContextBuilder; @@ -181,16 +182,19 @@ private static Pair createBlobStore(String driver, // offload driver metadata to be stored as part of the original ledger metadata private final String offloadDriverName; + private static OffloadPolicies offloadPolicies; + @VisibleForTesting - static BlobStoreManagedLedgerOffloader create(TieredStorageConfigurationData conf, + static BlobStoreManagedLedgerOffloader create(OffloadPolicies conf, OrderedScheduler scheduler) throws IOException { return create(conf, Maps.newHashMap(), scheduler); } - public static BlobStoreManagedLedgerOffloader create(TieredStorageConfigurationData conf, + public static BlobStoreManagedLedgerOffloader create(OffloadPolicies conf, Map userMetadata, OrderedScheduler scheduler) throws IOException { + offloadPolicies = conf; String driver = conf.getManagedLedgerOffloadDriver(); if (!driverSupported(driver)) { throw new IOException( @@ -232,7 +236,8 @@ public static BlobStoreManagedLedgerOffloader create(TieredStorageConfigurationD maxBlockSize, readBufferSize, endpoint, region, credentials, userMetadata); } - public static Supplier getCredentials(String driver, TieredStorageConfigurationData conf) throws IOException { + public static Supplier getCredentials(String driver, + OffloadPolicies conf) throws IOException { // credentials: // for s3, get by DefaultAWSCredentialsProviderChain. 
// for gcs, use downloaded file 'google_creds.json', which contains service account key by @@ -252,7 +257,7 @@ public static Supplier getCredentials(String driver, TieredStorageC throw new IOException(ioe); } } else if (isS3Driver(driver)) { - AWSCredentialsProvider credsChain = conf.getAWSCredentialProvider(); + AWSCredentialsProvider credsChain = CredentialsUtil.getAWSCredentialProvider(conf); // try and get creds before starting... if we can't fetch // creds on boot, we want to fail try { @@ -587,6 +592,23 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uid, public interface VersionCheck { void check(String key, Blob blob) throws IOException; } + + @Override + public OffloadPolicies getOffloadPolicies() { + return offloadPolicies; + } + + @Override + public void close() { + if (writeBlobStore != null) { + writeBlobStore.getContext().close(); + } + for (BlobStore readBlobStore : readBlobStores.values()) { + if (readBlobStore != null) { + readBlobStore.getContext().close(); + } + } + } } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java index 16784a553be9e..baefb95a13b00 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java @@ -44,7 +44,7 @@ import org.testng.annotations.Test; @Slf4j -class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { +public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedInputStreamTest.class); class RandomInputStream extends InputStream { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreTestBase.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreTestBase.java index 183424177e9b7..2725c051d1269 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreTestBase.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreTestBase.java @@ -21,11 +21,21 @@ import org.jclouds.ContextBuilder; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStoreContext; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.IObjectFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.ObjectFactory; + +@PrepareForTest({CredentialsUtil.class}) +@PowerMockIgnore({ + "org.apache.logging.log4j.*", + "org.apache.pulsar.jcloud.shade.com.google.common.*", + "org.jclouds.*"}) public class BlobStoreTestBase { private static final Logger log = LoggerFactory.getLogger(BlobStoreTestBase.class); @@ -79,4 +89,10 @@ public void tearDown() { } } + @ObjectFactory + // Necessary to make PowerMockito.mockStatic work with TestNG. 
+ public IObjectFactory getObjectFactory() { + return new org.powermock.modules.testng.PowerMockObjectFactory(); + } + } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index a3c4972365007..8aa977a0e8366 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -31,7 +31,6 @@ import com.google.common.util.concurrent.MoreExecutors; import java.io.File; import java.io.IOException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -53,8 +52,9 @@ import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.offload.jcloud.BlobStoreTestBase; -import org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData; +import org.apache.bookkeeper.mledger.offload.jcloud.CredentialsUtil; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.MockZooKeeper; @@ -64,13 +64,14 @@ import org.jclouds.blobstore.options.CopyOptions; import org.jclouds.domain.Credentials; import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.Test; import org.testng.collections.Maps; -class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase { +public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase { private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderTest.class); private static MockZooKeeper createMockZooKeeper() throws Exception { @@ -90,7 +91,7 @@ private static MockZooKeeper createMockZooKeeper() throws Exception { final OrderedScheduler scheduler; final PulsarMockBookKeeper bk; - BlobStoreManagedLedgerOffloaderTest() throws Exception { + public BlobStoreManagedLedgerOffloaderTest() throws Exception { scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build(); bk = new PulsarMockBookKeeper(createMockZooKeeper(), scheduler.chooseThread(this)); } @@ -154,7 +155,7 @@ public void testBucketDoesNotExist() throws Exception { @Test public void testNoRegionConfigured() throws Exception { - TieredStorageConfigurationData conf = new TieredStorageConfigurationData(); + OffloadPolicies conf = new OffloadPolicies(); conf.setManagedLedgerOffloadDriver("s3"); conf.setS3ManagedLedgerOffloadBucket(BUCKET); @@ -168,7 +169,7 @@ public void testNoRegionConfigured() throws Exception { @Test public void testNoBucketConfigured() throws Exception { - TieredStorageConfigurationData conf = new TieredStorageConfigurationData(); + OffloadPolicies conf = new OffloadPolicies(); conf.setManagedLedgerOffloadDriver("s3"); conf.setS3ManagedLedgerOffloadRegion("eu-west-1"); @@ -182,7 +183,7 @@ public void testNoBucketConfigured() throws Exception { @Test public void testSmallBlockSizeConfigured() throws Exception { - 
TieredStorageConfigurationData conf = new TieredStorageConfigurationData(); + OffloadPolicies conf = new OffloadPolicies(); conf.setManagedLedgerOffloadDriver("s3"); conf.setS3ManagedLedgerOffloadRegion("eu-west-1"); conf.setS3ManagedLedgerOffloadBucket(BUCKET); @@ -198,7 +199,7 @@ public void testSmallBlockSizeConfigured() throws Exception { @Test public void testGcsNoKeyPath() throws Exception { - TieredStorageConfigurationData conf = new TieredStorageConfigurationData(); + OffloadPolicies conf = new OffloadPolicies(); conf.setManagedLedgerOffloadDriver("google-cloud-storage"); conf.setGcsManagedLedgerOffloadBucket(BUCKET); @@ -213,7 +214,7 @@ public void testGcsNoKeyPath() throws Exception { @Test public void testGcsNoBucketConfigured() throws Exception { - TieredStorageConfigurationData conf = new TieredStorageConfigurationData(); + OffloadPolicies conf = new OffloadPolicies(); conf.setManagedLedgerOffloadDriver("google-cloud-storage"); File tmpKeyFile = File.createTempFile("gcsOffload", "json"); conf.setGcsManagedLedgerOffloadServiceAccountKeyFile(tmpKeyFile.getAbsolutePath()); @@ -229,7 +230,7 @@ public void testGcsNoBucketConfigured() throws Exception { @Test public void testGcsSmallBlockSizeConfigured() throws Exception { - TieredStorageConfigurationData conf = new TieredStorageConfigurationData(); + OffloadPolicies conf = new OffloadPolicies(); conf.setManagedLedgerOffloadDriver("google-cloud-storage"); File tmpKeyFile = File.createTempFile("gcsOffload", "json"); conf.setGcsManagedLedgerOffloadServiceAccountKeyFile(tmpKeyFile.getAbsolutePath()); @@ -247,37 +248,35 @@ public void testGcsSmallBlockSizeConfigured() throws Exception { @Test public void testS3DriverConfiguredWell() throws Exception { - TieredStorageConfigurationData conf = new TieredStorageConfigurationData() { + PowerMockito.mockStatic(CredentialsUtil.class); + PowerMockito.when(CredentialsUtil.getAWSCredentialProvider(any())).thenReturn(new AWSCredentialsProvider() { @Override - public AWSCredentialsProvider getAWSCredentialProvider() { - return new AWSCredentialsProvider() { + public AWSCredentials getCredentials() { + return new AWSSessionCredentials() { @Override - public AWSCredentials getCredentials() { - return new AWSSessionCredentials() { - @Override - public String getSessionToken() { - return "token"; - } - - @Override - public String getAWSAccessKeyId() { - return "access"; - } - - @Override - public String getAWSSecretKey() { - return "secret"; - } - }; + public String getSessionToken() { + return "token"; } @Override - public void refresh() { + public String getAWSAccessKeyId() { + return "access"; + } + @Override + public String getAWSSecretKey() { + return "secret"; } }; } - }; + + @Override + public void refresh() { + + } + }); + + OffloadPolicies conf = new OffloadPolicies(); conf.setManagedLedgerOffloadDriver("s3"); conf.setS3ManagedLedgerOffloadBucket(BUCKET); conf.setS3ManagedLedgerOffloadServiceEndpoint("http://fake.s3.end.point"); @@ -644,8 +643,8 @@ public void testReadUnknownIndexVersion() throws Exception { @Test public void testSessionCredentialSupplier() throws Exception { - TieredStorageConfigurationData mock = mock(TieredStorageConfigurationData.class); - Mockito.when(mock.getAWSCredentialProvider()).thenReturn(new AWSCredentialsProvider() { + PowerMockito.mockStatic(CredentialsUtil.class); + PowerMockito.when(CredentialsUtil.getAWSCredentialProvider(any())).thenReturn(new AWSCredentialsProvider() { @Override public AWSCredentials getCredentials() { return new AWSSessionCredentials() { @@ 
-672,7 +671,7 @@ public void refresh() { } }); - Supplier creds = BlobStoreManagedLedgerOffloader.getCredentials("aws-s3", mock); + Supplier creds = BlobStoreManagedLedgerOffloader.getCredentials("aws-s3", any()); Assert.assertTrue(creds.get() instanceof SessionCredentials); SessionCredentials sessCreds = (SessionCredentials) creds.get();
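
Taken together, the changes above make every offloader carry the OffloadPolicies it was created with and release its resources explicitly, as the FileSystemManagedLedgerOffloader and BlobStoreManagedLedgerOffloader hunks show. Below is a minimal, illustrative sketch of what a driver now has to provide; ExampleLedgerOffloader, its no-op futures, and the "example" driver name are invented, and it assumes the LedgerOffloader methods visible in this patch are the complete abstract set.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.common.policies.data.OffloadPolicies;

public class ExampleLedgerOffloader implements LedgerOffloader {

    private final OffloadPolicies offloadPolicies;

    public ExampleLedgerOffloader(OffloadPolicies offloadPolicies) {
        // Keep the policies handed over by the factory so callers can read them back later.
        this.offloadPolicies = offloadPolicies;
    }

    @Override
    public String getOffloadDriverName() {
        return "example";
    }

    @Override
    public CompletableFuture<Void> offload(ReadHandle ledger, UUID uid,
                                           Map<String, String> extraMetadata) {
        // A real driver would copy the ledger to long-term storage here.
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
                                                       Map<String, String> offloadDriverMetadata) {
        // A real driver would return a ReadHandle backed by the offloaded data.
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
                                                   Map<String, String> offloadDriverMetadata) {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public OffloadPolicies getOffloadPolicies() {
        // New in this patch: expose the policies the offloader was created with.
        return offloadPolicies;
    }

    @Override
    public void close() {
        // New in this patch: release driver resources (filesystem handles, blob store
        // contexts, schedulers); nothing to release in this in-memory example.
    }
}
```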
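
The factory side changes in the same direction: FileSystemLedgerOffloaderFactory and JCloudLedgerOffloaderFactory now accept OffloadPolicies instead of a raw Properties object, with OffloadPolicies.create(Properties) still available for property-based setups, as the FileStoreTestBase change shows. The following is a hedged sketch of the new wiring; the bucket, region, and driver values are invented, and the factory instance is left as a parameter rather than constructed.

```java
import java.io.IOException;
import java.util.Collections;

import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.pulsar.common.policies.data.OffloadPolicies;

public class OffloadPoliciesWiring {

    public static LedgerOffloader createOffloader(LedgerOffloaderFactory<?> factory)
            throws IOException {
        // OffloadPolicies replaces the removed TieredStorageConfigurationData /
        // FileSystemConfigurationData holders used by the old create(Properties) path.
        OffloadPolicies policies = new OffloadPolicies();
        policies.setManagedLedgerOffloadDriver("aws-s3");
        policies.setS3ManagedLedgerOffloadBucket("my-offload-bucket");
        policies.setS3ManagedLedgerOffloadRegion("eu-west-1");

        // Size the offloader thread pool from the policies, as the connector code above does.
        OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
                .numThreads(policies.getManagedLedgerOffloadMaxThreads())
                .name("pulsar-offloader").build();

        // Factories now receive the policies object directly instead of raw Properties.
        return factory.create(policies, Collections.emptyMap(), scheduler);
    }
}
```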
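
On the SQL connector side, PulsarSplitManager now asks the admin API for the namespace-level policies before building splits, and PulsarRecordCursor null-checks the result carried on the split. A small usage sketch of that read path follows; the service URL and namespace are placeholders, and the call may return null when no offload policies are configured for the namespace.

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.OffloadPolicies;

public class ReadNamespaceOffloadPolicies {
    public static void main(String[] args) throws Exception {
        // Placeholder broker admin URL; adjust for a real deployment.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            // Same call the split manager uses: namespaces().getOffloadPolicies(namespace).
            OffloadPolicies policies = admin.namespaces().getOffloadPolicies("public/default");
            // May be null when the namespace has no offload policies set.
            System.out.println(policies);
        }
    }
}
```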