From b0945d1d45d1e911d24151a23cf284e476203ba7 Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Wed, 19 Oct 2022 21:02:19 -0700 Subject: [PATCH] [feat] [broker] PIP-188 support blue-green cluster migration [part-1] (#17962) * [feat][PIP-188] support blue-green cluster migration [part-1] Add blue-green cluster migration Fix dependency * cleanup --- conf/broker.conf | 4 + .../bookkeeper/mledger/ManagedLedger.java | 7 + .../mledger/impl/ManagedLedgerImpl.java | 33 +- .../pulsar/broker/ServiceConfiguration.java | 7 + .../broker/admin/impl/ClustersBase.java | 62 ++++ .../service/AbstractBaseDispatcher.java | 13 + .../pulsar/broker/service/AbstractTopic.java | 31 +- .../pulsar/broker/service/BrokerService.java | 11 + .../service/BrokerServiceException.java | 10 + .../pulsar/broker/service/Consumer.java | 13 + .../pulsar/broker/service/Producer.java | 11 + .../broker/service/PulsarCommandSender.java | 3 + .../service/PulsarCommandSenderImpl.java | 13 + .../pulsar/broker/service/ServerCnx.java | 19 +- .../apache/pulsar/broker/service/Topic.java | 2 + .../nonpersistent/NonPersistentTopic.java | 40 ++- .../persistent/CompactorSubscription.java | 4 +- ...PersistentDispatcherMultipleConsumers.java | 2 +- ...sistentDispatcherSingleActiveConsumer.java | 2 +- ...tStreamingDispatcherMultipleConsumers.java | 3 +- ...reamingDispatcherSingleActiveConsumer.java | 2 +- .../persistent/PersistentSubscription.java | 5 +- .../service/persistent/PersistentTopic.java | 23 +- .../intercept/CounterBrokerInterceptor.java | 2 +- .../broker/service/ClusterMigrationTest.java | 329 ++++++++++++++++++ pulsar-client-admin-api/pom.xml | 1 + .../apache/pulsar/client/admin/Clusters.java | 37 ++ .../common/policies/data/ClusterData.java | 23 ++ .../client/admin/internal/ClustersImpl.java | 14 + .../client/api/PulsarClientException.java | 16 + .../apache/pulsar/admin/cli/CmdClusters.java | 24 ++ .../apache/pulsar/client/impl/ClientCnx.java | 23 ++ .../pulsar/client/impl/ConnectionHandler.java | 7 +- .../pulsar/client/impl/HandlerState.java | 9 + pulsar-common/pom.xml | 5 + .../common/policies/data/ClusterDataImpl.java | 27 +- .../pulsar/common/protocol/Commands.java | 11 + .../pulsar/common/protocol/PulsarDecoder.java | 10 + pulsar-common/src/main/proto/PulsarApi.proto | 17 + site2/docs/reference-pulsar-admin.md | 18 + .../jcloud/impl/MockManagedLedger.java | 12 + 41 files changed, 886 insertions(+), 19 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java diff --git a/conf/broker.conf b/conf/broker.conf index 44cf26d21707b..f5e4be9d6f390 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1461,6 +1461,10 @@ splitTopicAndPartitionLabelInPrometheus=false # Otherwise, aggregate it by list index. aggregatePublisherStatsByProducerName=false +# Interval between checks to see if cluster is migrated and marks topic migrated +# if cluster is marked migrated. Disable with value 0. (Default disabled). +clusterMigrationCheckDurationSeconds=0 + ### --- Schema storage --- ### # The schema storage implementation used by this broker schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index c5de804b1379d..7fcbfdc8b476c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -443,6 +443,8 @@ void asyncOpenCursor(String name, InitialPosition initialPosition, Map asyncMigrate(); + /** * Terminate the managed ledger and return the last committed entry. * @@ -534,6 +536,11 @@ void asyncOpenCursor(String name, InitialPosition initialPosition, Map STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state"); protected volatile State state = null; + private volatile boolean migrated = false; @Getter private final OrderedScheduler scheduledExecutor; @@ -343,7 +346,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper // Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time this.maximumRolloverTimeMs = getMaximumRolloverTimeMs(config); this.mlOwnershipChecker = mlOwnershipChecker; - this.propertiesMap = new HashMap(); + this.propertiesMap = new ConcurrentHashMap<>(); this.inactiveLedgerRollOverTimeMs = config.getInactiveLedgerRollOverTimeMs(); if (config.getManagedLedgerInterceptor() != null) { this.managedLedgerInterceptor = config.getManagedLedgerInterceptor(); @@ -367,7 +370,6 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition()); log.info("[{}] Recovering managed ledger terminated at {}", name, lastConfirmedEntry); } - for (LedgerInfo ls : mlInfo.getLedgerInfoList()) { ledgers.put(ls.getLedgerId(), ls); } @@ -379,6 +381,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { propertiesMap.put(property.getKey(), property.getValue()); } } + migrated = mlInfo.hasTerminatedPosition() && propertiesMap.containsKey(MIGRATION_STATE_PROPERTY); if (managedLedgerInterceptor != null) { managedLedgerInterceptor.onManagedLedgerPropertiesInitialize(propertiesMap); } @@ -1271,6 +1274,27 @@ private long consumedLedgerSize(long ledgerSize, long ledgerEntries, long consum } } + public CompletableFuture asyncMigrate() { + propertiesMap.put(MIGRATION_STATE_PROPERTY, Boolean.TRUE.toString()); + CompletableFuture result = new CompletableFuture<>(); + asyncTerminate(new TerminateCallback() { + + @Override + public void terminateComplete(Position lastCommittedPosition, Object ctx) { + migrated = true; + log.info("[{}] topic successfully terminated and migrated at {}", name, lastCommittedPosition); + result.complete(lastCommittedPosition); + } + + @Override + public void terminateFailed(ManagedLedgerException exception, Object ctx) { + log.info("[{}] topic failed to terminate and migrate ", name, exception); + result.completeExceptionally(exception); + } + }, null); + return result; + } + @Override public synchronized void asyncTerminate(TerminateCallback callback, Object ctx) { if (state == State.Fenced) { @@ -1363,6 +1387,11 @@ public boolean isTerminated() { return state == State.Terminated; } + @Override + public boolean isMigrated() { + return migrated; + } + @Override public void close() throws InterruptedException, ManagedLedgerException { final CountDownLatch counter = new CountDownLatch(1); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 6e327fd8b2609..8ba741801043b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2514,6 +2514,13 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se ) private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Interval between checks to see if cluster is migrated and marks topic migrated " + + " if cluster is marked migrated. Disable with value 0. (Default disabled)." + ) + private int clusterMigrationCheckDurationSeconds = 0; + @FieldContext( category = CATEGORY_SCHEMA, doc = "Enforce schema validation on following cases:\n\n" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 597e191f11cf3..9e2c7c6b06c19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -41,6 +41,7 @@ import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import javax.ws.rs.QueryParam; import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; @@ -59,6 +60,7 @@ import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.FailureDomainImpl; import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl; @@ -229,6 +231,66 @@ public void updateCluster( }); } + @POST + @Path("/{cluster}/migrate") + @ApiOperation( + value = "Update the configuration for a cluster migration.", + notes = "This operation requires Pulsar superuser privileges.") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Cluster has been updated."), + @ApiResponse(code = 400, message = "Cluster url must not be empty."), + @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."), + @ApiResponse(code = 404, message = "Cluster doesn't exist."), + @ApiResponse(code = 500, message = "Internal server error.") + }) + public void updateClusterMigration( + @Suspended AsyncResponse asyncResponse, + @ApiParam(value = "The cluster name", required = true) + @PathParam("cluster") String cluster, + @ApiParam(value = "Is cluster migrated", required = true) + @QueryParam("migrated") boolean isMigrated, + @ApiParam( + value = "The cluster url data", + required = true, + examples = @Example( + value = @ExampleProperty( + mediaType = MediaType.APPLICATION_JSON, + value = """ + { + "serviceUrl": "http://pulsar.example.com:8080", + "brokerServiceUrl": "pulsar://pulsar.example.com:6651" + } + """ + ) + ) + ) ClusterUrl clusterUrl) { + if (isMigrated && clusterUrl.isEmpty()) { + asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Cluster url must not be empty")); + return; + } + validateSuperUserAccessAsync() + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> clusterResources().updateClusterAsync(cluster, old -> { + ClusterDataImpl data = (ClusterDataImpl) old; + data.setMigrated(isMigrated); + data.setMigratedClusterUrl(clusterUrl); + return data; + })) + .thenAccept(__ -> { + log.info("[{}] Updated cluster {}", clientAppId(), cluster); + asyncResponse.resume(Response.ok().build()); + }).exceptionally(ex -> { + log.error("[{}] Failed to update cluster {}", clientAppId(), cluster, ex); + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof MetadataStoreException.NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Cluster does not exist")); + return null; + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + @POST @Path("/{cluster}/peers") @ApiOperation( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index b52f30361b192..5069f7cd44000 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -338,6 +338,19 @@ protected String getSubscriptionName() { return subscription == null ? null : subscription.getName(); } + protected void checkAndApplyReachedEndOfTopicOrTopicMigration(List consumers) { + PersistentTopic topic = (PersistentTopic) subscription.getTopic(); + checkAndApplyReachedEndOfTopicOrTopicMigration(topic, consumers); + } + + public static void checkAndApplyReachedEndOfTopicOrTopicMigration(PersistentTopic topic, List consumers) { + if (topic.isMigrated()) { + consumers.forEach(c -> c.topicMigrated(topic.getMigratedClusterUrl())); + } else { + consumers.forEach(Consumer::reachedEndOfTopic); + } + } + @Override public long getFilterProcessedMsgCount() { return this.filterProcessedMsgs.longValue(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 428c54ecc191d..a995192cead11 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -44,12 +44,14 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resourcegroup.ResourceGroup; import org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException; +import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; @@ -59,6 +61,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; @@ -686,7 +689,10 @@ public CompletableFuture> addProducer(Producer producer, lock.writeLock().lock(); try { checkTopicFenced(); - if (isTerminated()) { + if (isMigrated()) { + log.warn("[{}] Attempting to add producer to a migrated topic", topic); + throw new TopicMigratedException("Topic was already migrated"); + } else if (isTerminated()) { log.warn("[{}] Attempting to add producer to a terminated topic", topic); throw new TopicTerminatedException("Topic was already terminated"); } @@ -1180,6 +1186,8 @@ public boolean deletePartitionedTopicMetadataWhileInactive() { protected abstract boolean isTerminated(); + protected abstract boolean isMigrated(); + private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class); public InactiveTopicPolicies getInactiveTopicPolicies() { @@ -1299,4 +1307,25 @@ public void updateBrokerSubscribeRate() { topicPolicies.getSubscribeRate().updateBrokerValue( subscribeRateInBroker(brokerService.pulsar().getConfiguration())); } + + public Optional getMigratedClusterUrl() { + return getMigratedClusterUrl(brokerService.getPulsar()); + } + + public static CompletableFuture> getMigratedClusterUrlAsync(PulsarService pulsar) { + return pulsar.getPulsarResources().getClusterResources().getClusterAsync(pulsar.getConfig().getClusterName()) + .thenApply(clusterData -> (clusterData.isPresent() && clusterData.get().isMigrated()) + ? Optional.ofNullable(clusterData.get().getMigratedClusterUrl()) + : Optional.empty()); + } + + public static Optional getMigratedClusterUrl(PulsarService pulsar) { + try { + return getMigratedClusterUrlAsync(pulsar) + .get(pulsar.getPulsarResources().getClusterResources().getOperationTimeoutSec(), TimeUnit.SECONDS); + } catch (Exception e) { + log.warn("Failed to get migration cluster URL", e); + } + return Optional.empty(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 63ab3352ef200..1def78e0266ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -582,6 +582,13 @@ protected void startInactivityMonitor() { subscriptionExpiryCheckIntervalInSeconds, subscriptionExpiryCheckIntervalInSeconds, TimeUnit.SECONDS); } + + // check cluster migration + int interval = pulsar().getConfiguration().getClusterMigrationCheckDurationSeconds(); + if (interval > 0) { + inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkClusterMigration()), interval, interval, + TimeUnit.SECONDS); + } } protected void startMessageExpiryMonitor() { @@ -1851,6 +1858,10 @@ public void checkGC() { forEachTopic(Topic::checkGC); } + public void checkClusterMigration() { + forEachTopic(Topic::checkClusterMigration); + } + public void checkMessageExpiry() { forEachTopic(Topic::checkMessageExpiry); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index c6d8ffabcecaf..6b3ab99595d31 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -100,6 +100,16 @@ public TopicTerminatedException(Throwable t) { } } + public static class TopicMigratedException extends BrokerServiceException { + public TopicMigratedException(String msg) { + super(msg); + } + + public TopicMigratedException(Throwable t) { + super(t); + } + } + public static class ServerMetadataException extends BrokerServiceException { public ServerMetadataException(Throwable t) { super(t); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 767c7bb92747d..29eb5f5eb53dc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; @@ -49,10 +50,12 @@ import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; +import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; import org.apache.pulsar.common.api.proto.KeyLongValue; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; @@ -785,6 +788,16 @@ public void reachedEndOfTopic() { cnx.getCommandSender().sendReachedEndOfTopic(consumerId); } + public void topicMigrated(Optional clusterUrl) { + if (clusterUrl.isPresent()) { + ClusterUrl url = clusterUrl.get(); + cnx.getCommandSender().sendTopicMigrated(ResourceType.Consumer, consumerId, url.getBrokerServiceUrl(), + url.getBrokerServiceUrlTls()); + // disconnect consumer after sending migrated cluster url + disconnect(); + } + } + /** * Checks if consumer-blocking on unAckedMessages is allowed for below conditions:
* a. consumer must have Shared-subscription
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 62182f6e84f49..902ba3ff19ad0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -43,10 +43,12 @@ import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProducerAccessMode; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl; import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl; import org.apache.pulsar.common.protocol.Commands; @@ -665,6 +667,15 @@ public CompletableFuture disconnect() { return closeFuture; } + public void topicMigrated(Optional clusterUrl) { + if (clusterUrl.isPresent()) { + ClusterUrl url = clusterUrl.get(); + cnx.getCommandSender().sendTopicMigrated(ResourceType.Producer, producerId, url.getBrokerServiceUrl(), + url.getBrokerServiceUrlTls()); + disconnect(); + } + } + public void updateRates() { msgIn.calculateRate(); chunkedMessageRate.calculateRate(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java index dc5b97d846f53..d2775c19ab78e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java @@ -25,6 +25,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; +import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; @@ -77,6 +78,8 @@ void sendLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boo void sendReachedEndOfTopic(long consumerId); + boolean sendTopicMigrated(ResourceType type, long resourceId, String brokerUrl, String brokerUrlTls); + Future sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription, int partitionIdx, List entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java index 543739bae2760..7bc26072ffa75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java @@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; +import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.api.proto.TxnAction; @@ -219,6 +220,18 @@ public void sendReachedEndOfTopic(long consumerId) { } } + @Override + public boolean sendTopicMigrated(ResourceType type, long resourceId, String brokerUrl, String brokerUrlTls) { + // Only send notification if the client understand the command + if (cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v20.getValue()) { + log.info("[{}] Notifying {} that topic is migrated", type.name(), resourceId); + cnx.ctx().writeAndFlush(Commands.newTopicMigrated(type, resourceId, brokerUrl, brokerUrlTls), + cnx.ctx().voidPromise()); + return true; + } + return false; + } + @Override public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription, int partitionIdx, List entries, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 7e7e009562203..668998a6c5f2a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -23,6 +23,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync; import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync; +import static org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl; import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse; @@ -121,6 +122,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest; +import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; import org.apache.pulsar.common.api.proto.CommandUnsubscribe; import org.apache.pulsar.common.api.proto.CommandWatchTopicList; import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose; @@ -141,6 +143,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; +import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; @@ -1497,7 +1500,21 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ producers.remove(producerId, producerFuture); }).exceptionally(ex -> { - if (ex.getCause() instanceof BrokerServiceException.ProducerFencedException) { + if (ex.getCause() instanceof BrokerServiceException.TopicMigratedException) { + Optional clusterURL = getMigratedClusterUrl(service.getPulsar()); + if (clusterURL.isPresent()) { + log.info("[{}] redirect migrated producer to topic {}: producerId={}, {}", remoteAddress, topicName, + producerId, ex.getCause().getMessage()); + commandSender.sendTopicMigrated(ResourceType.Producer, producerId, + clusterURL.get().getBrokerServiceUrl(), clusterURL.get().getBrokerServiceUrlTls()); + closeProducer(producer); + return null; + + } else { + log.warn("[{}] failed producer because migration url not configured topic {}: producerId={}, {}", + remoteAddress, topicName, producerId, ex.getCause().getMessage()); + } + } else if (ex.getCause() instanceof BrokerServiceException.ProducerFencedException) { if (log.isDebugEnabled()) { log.debug("[{}] Failed to add producer to topic {}: producerId={}, {}", remoteAddress, topicName, producerId, ex.getCause().getMessage()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index c0f931bd6a553..5b0a8c32fa906 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -197,6 +197,8 @@ CompletableFuture createSubscription(String subscriptionName, Init void checkGC(); + CompletableFuture checkClusterMigration(); + void checkInactiveSubscriptions(); /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 05e42c1b64d6e..c15a7605ca179 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -73,6 +73,7 @@ import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; @@ -106,6 +107,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol AtomicLongFieldUpdater.newUpdater(NonPersistentTopic.class, "entriesAddedCounter"); private volatile long entriesAddedCounter = 0; + private volatile boolean migrated = false; private static final FastThreadLocal threadLocalTopicStats = new FastThreadLocal() { @Override protected TopicStats initialValue() { @@ -153,10 +155,18 @@ public NonPersistentTopic(String topic, BrokerService brokerService) { registerTopicPolicyListener(); } + private CompletableFuture updateClusterMigrated() { + return getMigratedClusterUrlAsync(brokerService.getPulsar()).thenAccept(url -> migrated = url.isPresent()); + } + + private Optional getClusterMigrationUrl() { + return getMigratedClusterUrl(brokerService.getPulsar()); + } + public CompletableFuture initialize() { return brokerService.pulsar().getPulsarResources().getNamespaceResources() .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) - .thenAccept(optPolicies -> { + .thenCompose(optPolicies -> { if (!optPolicies.isPresent()) { log.warn("[{}] Policies not present and isEncryptionRequired will be set to false", topic); isEncryptionRequired = false; @@ -168,6 +178,7 @@ public CompletableFuture initialize() { } updatePublishDispatcher(); updateResourceGroupLimiter(optPolicies); + return updateClusterMigrated(); }); } @@ -273,7 +284,6 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { final CompletableFuture future = new CompletableFuture<>(); - if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) { if (log.isDebugEnabled()) { log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName); @@ -313,6 +323,9 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, false, cnx, cnx.getAuthRole(), metadata, readCompacted, keySharedMeta, MessageId.latest, DEFAULT_CONSUMER_EPOCH); + if (isMigrated()) { + consumer.topicMigrated(getClusterMigrationUrl()); + } addConsumerToSubscription(subscription, consumer).thenRun(() -> { if (!cnx.isActive()) { @@ -925,6 +938,23 @@ public boolean isActive() { return currentUsageCount() != 0 || !subscriptions.isEmpty(); } + @Override + public CompletableFuture checkClusterMigration() { + Optional url = getClusterMigrationUrl(); + if (url.isPresent()) { + this.migrated = true; + producers.forEach((__, producer) -> { + producer.topicMigrated(url); + }); + subscriptions.forEach((__, sub) -> { + sub.getConsumers().forEach((consumer) -> { + consumer.topicMigrated(url); + }); + }); + } + return CompletableFuture.completedFuture(null); + } + @Override public void checkGC() { if (!isDeleteWhileInactive()) { @@ -1164,6 +1194,12 @@ protected boolean isTerminated() { return false; } + + @Override + protected boolean isMigrated() { + return this.migrated; + } + @Override public boolean isPersistent() { return false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java index f7279968c51bb..8427acb48b11f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java @@ -19,13 +19,13 @@ package org.apache.pulsar.broker.service.persistent; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.pulsar.broker.service.AbstractBaseDispatcher.checkAndApplyReachedEndOfTopicOrTopicMigration; import java.util.List; import java.util.Map; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; -import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.Compactor; @@ -102,7 +102,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) { // Notify all consumer that the end of topic was reached - dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic); + checkAndApplyReachedEndOfTopicOrTopicMigration(topic, dispatcher.getConsumers()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 9f09e60abb29e..24d1d702e7dae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -776,7 +776,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj if (cursor.getNumberOfEntriesInBacklog(false) == 0) { // Topic has been terminated and there are no more entries to read // Notify the consumer only if all the messages were already acknowledged - consumerList.forEach(Consumer::reachedEndOfTopic); + checkAndApplyReachedEndOfTopicOrTopicMigration(consumerList); } } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 3ba7a82aa5e35..385569cd8d8d6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -486,7 +486,7 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep if (cursor.getNumberOfEntriesInBacklog(false) == 0) { // Topic has been terminated and there are no more entries to read // Notify the consumer only if all the messages were already acknowledged - consumers.forEach(Consumer::reachedEndOfTopic); + checkAndApplyReachedEndOfTopicOrTopicMigration(consumers); } } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java index 649d19bcec091..b5cd52b7885d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java @@ -31,7 +31,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest; import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher; @@ -142,7 +141,7 @@ public void notifyConsumersEndOfTopic() { if (cursor.getNumberOfEntriesInBacklog(false) == 0) { // Topic has been terminated and there are no more entries to read // Notify the consumer only if all the messages were already acknowledged - consumerList.forEach(Consumer::reachedEndOfTopic); + checkAndApplyReachedEndOfTopicOrTopicMigration(consumerList); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java index 2048bb016b8c3..612d3f5796e73 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java @@ -97,7 +97,7 @@ public synchronized void notifyConsumersEndOfTopic() { if (cursor.getNumberOfEntriesInBacklog(false) == 0) { // Topic has been terminated and there are no more entries to read // Notify the consumer only if all the messages were already acknowledged - consumers.forEach(Consumer::reachedEndOfTopic); + checkAndApplyReachedEndOfTopicOrTopicMigration(consumers); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 855bec48527e5..681b617277543 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.broker.service.AbstractBaseDispatcher.checkAndApplyReachedEndOfTopicOrTopicMigration; import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -426,7 +427,7 @@ public void acknowledgeMessage(List positions, AckType ackType, Map disconnectProducersFuture; if (producers.size() > 0) { List> futures = new ArrayList<>(); + // send migration url metadata to producers before disconnecting them + if (isMigrated()) { + producers.forEach((__, producer) -> producer.topicMigrated(getMigratedClusterUrl())); + } producers.forEach((__, producer) -> futures.add(producer.disconnect())); disconnectProducersFuture = FutureUtil.waitForAll(futures); } else { @@ -585,7 +590,7 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx) log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage()); } - if (exception instanceof ManagedLedgerTerminatedException) { + if (exception instanceof ManagedLedgerTerminatedException && !isMigrated()) { // Signal the producer that this topic is no longer available callback.completed(new TopicTerminatedException(exception), -1, -1); } else { @@ -2352,6 +2357,17 @@ private boolean hasBacklogs() { return subscriptions.values().stream().anyMatch(sub -> sub.getNumberOfEntriesInBacklog(false) > 0); } + @Override + public CompletableFuture checkClusterMigration() { + Optional clusterUrl = getMigratedClusterUrl(); + if (!isMigrated() && clusterUrl.isPresent()) { + log.info("{} triggering topic migration", topic); + return ledger.asyncMigrate().thenCompose(r -> null); + } else { + return CompletableFuture.completedFuture(null); + } + } + @Override public void checkGC() { if (!isDeleteWhileInactive()) { @@ -3318,6 +3334,11 @@ protected boolean isTerminated() { return ledger.isTerminated(); } + @Override + public boolean isMigrated() { + return ledger.isMigrated(); + } + public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String subName) { return this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java index dd83fd2a4ce87..54db4c8ff7139 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java @@ -40,8 +40,8 @@ import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.common.api.proto.BaseCommand; -import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.TxnAction; import org.eclipse.jetty.server.Response; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java new file mode 100644 index 0000000000000..34397df924730 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import java.lang.reflect.Method; +import java.net.URL; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import com.google.common.collect.Sets; + +import lombok.Cleanup; + +@Test(groups = "broker") +public class ClusterMigrationTest { + + private static final Logger log = LoggerFactory.getLogger(ClusterMigrationTest.class); + protected String methodName; + + String namespace = "pulsar/migrationNs"; + TestBroker broker1, broker2; + URL url1; + URL urlTls1; + PulsarService pulsar1; + + PulsarAdmin admin1; + + URL url2; + URL urlTls2; + PulsarService pulsar2; + PulsarAdmin admin2; + + @DataProvider(name = "TopicsubscriptionTypes") + public Object[][] subscriptionTypes() { + return new Object[][] { + {true, SubscriptionType.Shared}, + {true, SubscriptionType.Key_Shared}, + {true, SubscriptionType.Shared}, + {true, SubscriptionType.Key_Shared}, + + {false, SubscriptionType.Shared}, + {false, SubscriptionType.Key_Shared}, + {false, SubscriptionType.Shared}, + {false, SubscriptionType.Key_Shared}, + }; + } + + @BeforeMethod(alwaysRun = true, timeOut = 300000) + public void setup() throws Exception { + + log.info("--- Starting ReplicatorTestBase::setup ---"); + + broker1 = new TestBroker(); + broker2 = new TestBroker(); + String clusterName = broker1.getClusterName(); + + pulsar1 = broker1.getPulsarService(); + url1 = new URL(pulsar1.getWebServiceAddress()); + urlTls1 = new URL(pulsar1.getWebServiceAddressTls()); + admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build(); + + pulsar2 = broker2.getPulsarService(); + url2 = new URL(pulsar2.getWebServiceAddress()); + urlTls2 = new URL(pulsar2.getWebServiceAddressTls()); + admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build(); + + // Start region 3 + + // Provision the global namespace + admin1.clusters().createCluster(clusterName, + ClusterData.builder().serviceUrl(url1.toString()).serviceUrlTls(urlTls1.toString()) + .brokerServiceUrl(pulsar1.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls()).build()); + admin2.clusters().createCluster(clusterName, + ClusterData.builder().serviceUrl(url2.toString()).serviceUrlTls(urlTls2.toString()) + .brokerServiceUrl(pulsar2.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls()).build()); + + admin1.tenants().createTenant("pulsar", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet(clusterName))); + admin1.namespaces().createNamespace(namespace, Sets.newHashSet(clusterName)); + + admin2.tenants().createTenant("pulsar", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet(clusterName))); + admin2.namespaces().createNamespace(namespace, Sets.newHashSet(clusterName)); + + assertEquals(admin1.clusters().getCluster(clusterName).getServiceUrl(), url1.toString()); + assertEquals(admin2.clusters().getCluster(clusterName).getServiceUrl(), url2.toString()); + assertEquals(admin1.clusters().getCluster(clusterName).getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl()); + assertEquals(admin2.clusters().getCluster(clusterName).getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl()); + + Thread.sleep(100); + log.info("--- ReplicatorTestBase::setup completed ---"); + + } + + @AfterMethod(alwaysRun = true, timeOut = 300000) + protected void cleanup() throws Exception { + log.info("--- Shutting down ---"); + broker1.cleanup(); + broker2.cleanup(); + } + + @BeforeMethod(alwaysRun = true) + public void beforeMethod(Method m) throws Exception { + methodName = m.getName(); + } + + /** + * Test producer/consumer migration: using persistent/non-persistent topic and all types of subscriptions + * (1) Producer1 and consumer1 connect to cluster-1 + * (2) Close consumer1 to build backlog and publish messages using producer1 + * (3) Migrate topic to cluster-2 + * (4) Validate producer-1 is connected to cluster-2 + * (5) create consumer1, drain backlog and migrate and reconnect to cluster-2 + * (6) Create new consumer2 with different subscription on cluster-1, + * which immediately migrate and reconnect to cluster-2 + * (7) Create producer-2 directly to cluster-2 + * (8) Create producer-3 on cluster-1 which should be redirected to cluster-2 + * (8) Publish messages using producer1, producer2, and producer3 + * (9) Consume all messages by both consumer1 and consumer2 + * (10) Create Producer/consumer on non-migrated cluster and verify their connection with cluster-1 + * (11) Restart Broker-1 and connect producer/consumer on cluster-1 + * @throws Exception + */ + @Test(dataProvider = "TopicsubscriptionTypes") + public void testClusterMigration(boolean persistent, SubscriptionType subType) throws Exception { + log.info("--- Starting ReplicatorTest::testClusterMigration ---"); + persistent = false; + final String topicName = BrokerTestUtil + .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic"); + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + // cluster-1 producer/consumer + Producer producer1 = client1.newProducer().topic(topicName).enableBatching(false) + .producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionType(subType) + .subscriptionName("s1").subscribe(); + AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get(); + retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500); + retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 500); + assertFalse(topic1.getProducers().isEmpty()); + assertFalse(topic1.getSubscriptions().isEmpty()); + + // build backlog + consumer1.close(); + int n = 5; + for (int i = 0; i < n; i++) { + producer1.send("test1".getBytes()); + } + + @Cleanup + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + // cluster-2 producer/consumer + Producer producer2 = client2.newProducer().topic(topicName).enableBatching(false) + .producerName("cluster2-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get(); + assertFalse(topic2.getProducers().isEmpty()); + + ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()); + admin1.clusters().updateClusterMigration(broker2.getClusterName(), true, migratedUrl); + + retryStrategically((test) -> { + try { + topic1.checkClusterMigration().get(); + return true; + } catch (Exception e) { + // ok + } + return false; + }, 10, 500); + + topic1.checkClusterMigration().get(); + + producer1.sendAsync("test1".getBytes()); + + // producer is disconnected from cluster-1 + retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500); + assertTrue(topic1.getProducers().isEmpty()); + + // create 3rd producer on cluster-1 which should be redirected to cluster-2 + Producer producer3 = client1.newProducer().topic(topicName).enableBatching(false) + .producerName("cluster1-2").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + + // producer is connected with cluster-2 + retryStrategically((test) -> topic2.getProducers().size() == 3, 10, 500); + assertTrue(topic2.getProducers().size() == 3); + + // try to consume backlog messages from cluster-1 + consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe(); + if (persistent) { + for (int i = 0; i < n; i++) { + Message msg = consumer1.receive(); + assertEquals(msg.getData(), "test1".getBytes()); + consumer1.acknowledge(msg); + } + } + // after consuming all messages, consumer should have disconnected + // from cluster-1 and reconnect with cluster-2 + retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500); + assertFalse(topic2.getSubscriptions().isEmpty()); + + // not also create a new consumer which should also reconnect to cluster-2 + Consumer consumer2 = client1.newConsumer().topic(topicName).subscriptionType(subType) + .subscriptionName("s2").subscribe(); + retryStrategically((test) -> topic2.getSubscription("s2") != null, 10, 500); + assertFalse(topic2.getSubscription("s2").getConsumers().isEmpty()); + + // publish messages to cluster-2 and consume them + for (int i = 0; i < n; i++) { + producer1.send("test2".getBytes()); + producer2.send("test2".getBytes()); + producer3.send("test2".getBytes()); + } + log.info("Successfully published messages by migrated producers"); + for (int i = 0; i < n * 3; i++) { + assertEquals(consumer1.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes()); + assertEquals(consumer2.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes()); + + } + + // create non-migrated topic which should connect to cluster-1 + String diffTopic = BrokerTestUtil + .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic"); + Consumer consumerDiff = client1.newConsumer().topic(diffTopic).subscriptionType(subType) + .subscriptionName("s1-d").subscribe(); + Producer producerDiff = client1.newProducer().topic(diffTopic).enableBatching(false) + .producerName("cluster1-d").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + AbstractTopic topicDiff = (AbstractTopic) pulsar1.getBrokerService().getTopic(diffTopic, false).getNow(null).get(); + assertNotNull(topicDiff); + for (int i = 0; i < n; i++) { + producerDiff.send("diff".getBytes()); + assertEquals(consumerDiff.receive(2, TimeUnit.SECONDS).getData(), "diff".getBytes()); + } + + // restart broker-1 + broker1.restart(); + Producer producer4 = client1.newProducer().topic(topicName).enableBatching(false) + .producerName("cluster1-4").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + Consumer consumer3 = client1.newConsumer().topic(topicName).subscriptionType(subType) + .subscriptionName("s3").subscribe(); + retryStrategically((test) -> topic2.getProducers().size() == 4, 10, 500); + assertTrue(topic2.getProducers().size() == 4); + retryStrategically((test) -> topic2.getSubscription("s3") != null, 10, 500); + assertFalse(topic2.getSubscription("s3").getConsumers().isEmpty()); + for (int i = 0; i < n; i++) { + producer4.send("test3".getBytes()); + assertEquals(consumer1.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes()); + assertEquals(consumer2.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes()); + assertEquals(consumer3.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes()); + } + + log.info("Successfully consumed messages by migrated consumers"); + } + + static class TestBroker extends MockedPulsarServiceBaseTest { + + public TestBroker() throws Exception { + setup(); + } + + @Override + protected void setup() throws Exception { + super.internalSetup(); + } + + public PulsarService getPulsarService() { + return pulsar; + } + + public String getClusterName() { + return configClusterName; + } + + @Override + protected void cleanup() throws Exception { + internalCleanup(); + } + + public void restart() throws Exception { + restartBroker(); + } + + } +} diff --git a/pulsar-client-admin-api/pom.xml b/pulsar-client-admin-api/pom.xml index 5083efc41fe0e..61b989e5050d2 100644 --- a/pulsar-client-admin-api/pom.xml +++ b/pulsar-client-admin-api/pom.xml @@ -43,6 +43,7 @@ org.slf4j slf4j-api + diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java index 4b83617e3eab5..2e8a43a826045 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java @@ -29,6 +29,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; @@ -208,6 +209,42 @@ public interface Clusters { */ CompletableFuture updatePeerClusterNamesAsync(String cluster, LinkedHashSet peerClusterNames); + /** + * Update the configuration for a cluster migration. + *

+ * This operation requires Pulsar super-user privileges. + * + * @param cluster + * Cluster name + * @param migrated + * is cluster migrated + * @param clusterUrl + * the cluster url object + * + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * @throws NotFoundException + * Cluster doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateClusterMigration(String cluster, boolean migrated, ClusterUrl clusterUrl) throws PulsarAdminException; + + /** + * Update the configuration for a cluster migration asynchronously. + *

+ * This operation requires Pulsar super-user privileges. + * + * @param cluster + * Cluster name + * @param migrated + * is cluster migrated + * @param clusterUrl + * the cluster url object + * + */ + CompletableFuture updateClusterMigrationAsync(String cluster, boolean migrated, ClusterUrl clusterUrl); + /** * Get peer-cluster names. *

diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java index 61a90a592a70d..f8cdf294d96e5 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java @@ -19,6 +19,9 @@ package org.apache.pulsar.common.policies.data; import java.util.LinkedHashSet; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; import org.apache.pulsar.client.admin.utils.ReflectionUtils; import org.apache.pulsar.client.api.ProxyProtocol; @@ -57,6 +60,10 @@ public interface ClusterData { String getListenerName(); + boolean isMigrated(); + + ClusterUrl getMigratedClusterUrl(); + interface Builder { Builder serviceUrl(String serviceUrl); @@ -92,6 +99,10 @@ interface Builder { Builder listenerName(String listenerName); + Builder migrated(boolean migrated); + + Builder migratedClusterUrl(ClusterUrl migratedClusterUrl); + ClusterData build(); } @@ -100,4 +111,16 @@ interface Builder { static Builder builder() { return ReflectionUtils.newBuilder("org.apache.pulsar.common.policies.data.ClusterDataImpl"); } + + @Data + @NoArgsConstructor + @AllArgsConstructor + class ClusterUrl { + String brokerServiceUrl; + String brokerServiceUrlTls; + + public boolean isEmpty() { + return brokerServiceUrl == null && brokerServiceUrlTls == null; + } + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java index b32e3ea684cb7..1d4e3a4f28eeb 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java @@ -34,6 +34,7 @@ import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.FailureDomainImpl; @@ -106,6 +107,19 @@ public CompletableFuture updatePeerClusterNamesAsync(String cluster, Linke return asyncPostRequest(path, Entity.entity(peerClusterNames, MediaType.APPLICATION_JSON)); } + @Override + public void updateClusterMigration(String cluster, boolean isMigrated, ClusterUrl clusterUrl) + throws PulsarAdminException { + sync(() -> updateClusterMigrationAsync(cluster, isMigrated, clusterUrl)); + } + + @Override + public CompletableFuture updateClusterMigrationAsync(String cluster, boolean isMigrated, + ClusterUrl clusterUrl) { + WebTarget path = adminClusters.path(cluster).path("migrate").queryParam("migrated", isMigrated); + return asyncPostRequest(path, Entity.entity(clusterUrl, MediaType.APPLICATION_JSON)); + } + @Override @SuppressWarnings("unchecked") public Set getPeerClusterNames(String cluster) throws PulsarAdminException { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index c68c575ec4f3b..e04d150597768 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -462,6 +462,22 @@ public TopicTerminatedException(String msg, long sequenceId) { } } + /** + * TopicMigration exception thrown by Pulsar client. + */ + public static class TopicMigrationException extends PulsarClientException { + /** + * Constructs an {@code TopicMigrationException} with the specified detail message. + * + * @param msg + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public TopicMigrationException(String msg) { + super(msg); + } + } + /** * Producer fenced exception thrown by Pulsar client. */ diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java index b412f30131eaf..12def9a9a96c1 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java @@ -31,6 +31,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.FailureDomainImpl; @@ -142,6 +143,28 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Update cluster migration") + private class UpdateClusterMigration extends CliCommand { + @Parameter(description = "cluster-name", required = true) + private java.util.List params; + + @Parameter(names = "--migrated", description = "Is cluster migrated", required = true) + private boolean migrated; + + @Parameter(names = "--broker-url", description = "New migrated cluster broker service url", required = false) + private String brokerServiceUrl; + + @Parameter(names = "--broker-url-secure", description = "New migrated cluster broker service url secure", + required = false) + private String brokerServiceUrlTls; + + void run() throws PulsarAdminException { + String cluster = getOneArgument(params); + ClusterUrl clusterUrl = new ClusterUrl(brokerServiceUrl, brokerServiceUrlTls); + getAdmin().clusters().updateClusterMigration(cluster, migrated, clusterUrl); + } + } + @Parameters(commandDescription = "Get list of peer-clusters") private class GetPeerClusters extends CliCommand { @@ -401,6 +424,7 @@ public CmdClusters(Supplier admin) { jcommander.addCommand("delete", new Delete()); jcommander.addCommand("list", new List()); jcommander.addCommand("update-peer-clusters", new UpdatePeerClusters()); + jcommander.addCommand("update-cluster-migration", new UpdateClusterMigration()); jcommander.addCommand("get-peer-clusters", new GetPeerClusters()); jcommander.addCommand("get-failure-domain", new GetFailureDomain()); jcommander.addCommand("create-failure-domain", new CreateFailureDomain()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 14a33cd3203e1..a40b80727c875 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -33,6 +33,7 @@ import io.netty.util.concurrent.Promise; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.net.URISyntaxException; import java.nio.channels.ClosedChannelException; import java.util.Arrays; import java.util.List; @@ -87,6 +88,8 @@ import org.apache.pulsar.common.api.proto.CommandSendReceipt; import org.apache.pulsar.common.api.proto.CommandSuccess; import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse; +import org.apache.pulsar.common.api.proto.CommandTopicMigrated; +import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; import org.apache.pulsar.common.api.proto.ServerError; @@ -659,6 +662,26 @@ protected void handleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEn } } + @Override + protected void handleTopicMigrated(CommandTopicMigrated commandTopicMigrated) { + final long resourceId = commandTopicMigrated.getResourceId(); + final String serviceUrl = commandTopicMigrated.getBrokerServiceUrl(); + final String serviceUrlTls = commandTopicMigrated.getBrokerServiceUrlTls(); + + HandlerState resource = commandTopicMigrated.getResourceType() == ResourceType.Producer + ? producers.get(resourceId) + : consumers.get(resourceId); + log.info("{} is migrated to {}/{}", commandTopicMigrated.getResourceType().name(), serviceUrl, serviceUrlTls); + if (resource != null) { + try { + resource.setRedirectedClusterURI(serviceUrl, serviceUrlTls); + } catch (URISyntaxException e) { + log.info("[{}] Invalid redirect url {}/{} for {}", remoteAddress, serviceUrl, serviceUrlTls, + resourceId); + } + } + } + // caller of this method needs to be protected under pendingLookupRequestSemaphore private void addPendingLookupRequests(long requestId, TimedCompletableFuture future) { pendingRequests.put(requestId, future); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java index 6c5a5be200fb5..4d74e560b463f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -71,7 +72,11 @@ protected void grabCnx() { try { CompletableFuture cnxFuture; - if (state.topic == null) { + if (state.redirectedClusterURI != null) { + InetSocketAddress address = InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(), + state.redirectedClusterURI.getPort()); + cnxFuture = state.client.getConnection(address, address); + } else if (state.topic == null) { cnxFuture = state.client.getConnectionToServiceUrl(); } else { cnxFuture = state.client.getConnection(state.topic); // diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java index 822ba411b711c..6489369ed3bed 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java @@ -18,12 +18,16 @@ */ package org.apache.pulsar.client.impl; +import java.net.URI; +import java.net.URISyntaxException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.UnaryOperator; +import org.apache.commons.lang3.StringUtils; abstract class HandlerState { protected final PulsarClientImpl client; protected final String topic; + protected volatile URI redirectedClusterURI; private static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(HandlerState.class, State.class, "state"); @@ -49,6 +53,11 @@ public HandlerState(PulsarClientImpl client, String topic) { STATE_UPDATER.set(this, State.Uninitialized); } + protected void setRedirectedClusterURI(String serviceUrl, String serviceUrlTls) throws URISyntaxException { + String url = client.conf.isUseTls() && StringUtils.isNotBlank(serviceUrlTls) ? serviceUrlTls : serviceUrl; + this.redirectedClusterURI = new URI(url); + } + // moves the state to ready if it wasn't closed protected boolean changeToReadyState() { if (STATE_UPDATER.get(this) == State.Ready) { diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index e780a84d50229..30a250423605d 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -162,6 +162,11 @@ provided true + + + com.google.protobuf + protobuf-java + diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java index 44cbfb4f35bef..006cce1a9c181 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java @@ -142,6 +142,17 @@ public final class ClusterDataImpl implements ClusterData, Cloneable { example = "" ) private String listenerName; + @ApiModelProperty( + name = "migrated", + value = "flag to check if cluster is migrated to different cluster", + example = "true/false" + ) + private boolean migrated; + @ApiModelProperty( + name = "migratedClusterUrl", + value = "url of cluster where current cluster is migrated" + ) + private ClusterUrl migratedClusterUrl; public static ClusterDataImplBuilder builder() { return new ClusterDataImplBuilder(); @@ -188,6 +199,8 @@ public static class ClusterDataImplBuilder implements ClusterData.Builder { private String brokerClientTlsTrustStorePassword; private String brokerClientTrustCertsFilePath; private String listenerName; + private boolean migrated; + private ClusterUrl migratedClusterUrl; ClusterDataImplBuilder() { } @@ -277,6 +290,16 @@ public ClusterDataImplBuilder listenerName(String listenerName) { return this; } + public ClusterDataImplBuilder migrated(boolean migrated) { + this.migrated = migrated; + return this; + } + + public ClusterDataImplBuilder migratedClusterUrl(ClusterUrl migratedClusterUrl) { + this.migratedClusterUrl = migratedClusterUrl; + return this; + } + public ClusterDataImpl build() { return new ClusterDataImpl( serviceUrl, @@ -295,7 +318,9 @@ public ClusterDataImpl build() { brokerClientTlsTrustStore, brokerClientTlsTrustStorePassword, brokerClientTrustCertsFilePath, - listenerName); + listenerName, + migrated, + migratedClusterUrl); } } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 0ebb2705d3ad3..cd88bb794e62f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -86,6 +86,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse; +import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; import org.apache.pulsar.common.api.proto.FeatureFlags; import org.apache.pulsar.common.api.proto.IntRange; import org.apache.pulsar.common.api.proto.KeySharedMeta; @@ -739,6 +740,16 @@ public static ByteBuf newReachedEndOfTopic(long consumerId) { return serializeWithSize(cmd); } + public static ByteBuf newTopicMigrated(ResourceType type, long resourceId, String brokerUrl, String brokerUrlTls) { + BaseCommand cmd = localCmd(Type.TOPIC_MIGRATED); + cmd.setTopicMigrated() + .setResourceType(type) + .setResourceId(resourceId) + .setBrokerServiceUrl(brokerUrl) + .setBrokerServiceUrlTls(brokerUrlTls); + return serializeWithSize(cmd); + } + public static ByteBuf newCloseProducer(long producerId, long requestId) { BaseCommand cmd = localCmd(Type.CLOSE_PRODUCER); cmd.setCloseProducer() diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index d0b261034610a..2bd615d2ce4b2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -76,6 +76,7 @@ import org.apache.pulsar.common.api.proto.CommandSuccess; import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest; import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse; +import org.apache.pulsar.common.api.proto.CommandTopicMigrated; import org.apache.pulsar.common.api.proto.CommandUnsubscribe; import org.apache.pulsar.common.api.proto.CommandWatchTopicList; import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose; @@ -294,6 +295,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception handleReachedEndOfTopic(cmd.getReachedEndOfTopic()); break; + case TOPIC_MIGRATED: + checkArgument(cmd.hasTopicMigrated()); + handleTopicMigrated(cmd.getTopicMigrated()); + break; + case GET_LAST_MESSAGE_ID: checkArgument(cmd.hasGetLastMessageId()); handleGetLastMessageId(cmd.getGetLastMessageId()); @@ -600,6 +606,10 @@ protected void handleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEn throw new UnsupportedOperationException(); } + protected void handleTopicMigrated(CommandTopicMigrated commandMigratedTopic) { + throw new UnsupportedOperationException(); + } + protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) { throw new UnsupportedOperationException(); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 7be65224f0cf3..acf75eab85826 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -262,6 +262,7 @@ enum ProtocolVersion { v17 = 17; // Added support ack receipt v18 = 18; // Add client support for broker entry metadata v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse + v20 = 20; // Add client support for topic migration redirection CommandTopicMigrated } message CommandConnect { @@ -620,6 +621,19 @@ message CommandReachedEndOfTopic { required uint64 consumer_id = 1; } +message CommandTopicMigrated { + enum ResourceType { + Producer = 0; + Consumer = 1; + } + required uint64 resource_id = 1; + required ResourceType resource_type = 2; + optional string brokerServiceUrl = 3; + optional string brokerServiceUrlTls = 4; + +} + + message CommandCloseProducer { required uint64 producer_id = 1; required uint64 request_id = 2; @@ -1025,6 +1039,7 @@ message BaseCommand { WATCH_TOPIC_UPDATE = 66; WATCH_TOPIC_LIST_CLOSE = 67; + TOPIC_MIGRATED = 68; } @@ -1106,4 +1121,6 @@ message BaseCommand { optional CommandWatchTopicListSuccess watchTopicListSuccess = 65; optional CommandWatchTopicUpdate watchTopicUpdate = 66; optional CommandWatchTopicListClose watchTopicListClose = 67; + + optional CommandTopicMigrated topicMigrated = 68; } diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index cbd4005eaa30a..6115182419544 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -321,6 +321,24 @@ Options |`--url`|service-url|| |`--url-secure`|service-url for secure connection|| +### `update cluster migration` +Update the configuration for a cluster + +Usage + +```bash +pulsar-admin clusters update-cluster-migration cluster-name options +``` + +Options + +|Flag|Description|Default| +|---|---|---| +|`--migrated`|Is cluster migrated.|| +|`--broker-url`|New cluster URL for the broker service.|| +|`--broker-url-secure`|New cluster service URL for a secure connection|| +|`--url`|service-url|| +|`--url-secure`|service-url for secure connection|| ### `delete` Deletes an existing cluster diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java index 730fbc90d2c8e..f838b06473a74 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java @@ -380,4 +380,16 @@ public void checkInactiveLedgerAndRollOver() { public void checkCursorsToCacheEntries() { // no-op } + + @Override + public CompletableFuture asyncMigrate() { + // no-op + return null; + } + + @Override + public boolean isMigrated() { + // no-op + return false; + } }