From 34ae881adb267d7cf7348335b576e281a22bfb72 Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Tue, 10 Oct 2017 10:49:27 -0700 Subject: [PATCH] Admin-api to reset cursor by position (#785) * Admin-api to reset cursor by position * Fix: api notes for global topic * reset cursor using messageId --- .../pulsar/broker/admin/PersistentTopics.java | 70 +++++-- .../pulsar/broker/service/Subscription.java | 4 +- .../NonPersistentSubscription.java | 6 + .../persistent/PersistentSubscription.java | 125 ++++++------ .../pulsar/broker/admin/AdminApiTest.java | 58 ------ .../pulsar/broker/admin/AdminApiTest2.java | 181 +++++++++++++++++- .../pulsar/client/admin/PersistentTopics.java | 33 ++++ .../admin/internal/PersistentTopicsImpl.java | 22 +++ .../pulsar/admin/cli/CmdPersistentTopics.java | 39 +++- 9 files changed, 404 insertions(+), 134 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java index 9ac0672ed3bca..a0ca6a1066009 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.admin; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.common.util.Codec.decode; import java.io.IOException; @@ -79,6 +80,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.util.FutureUtil; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; @@ -87,8 +89,6 @@ import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.naming.DestinationDomain; import org.apache.pulsar.common.naming.DestinationName; -import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.AuthPolicies; @@ -113,7 +113,6 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; /** */ @@ -906,11 +905,9 @@ public void expireMessagesForAllSubscriptions(@PathParam("property") String prop @POST @Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/resetcursor/{timestamp}") - @ApiOperation(value = "Reset subscription to message position closest to absolute timestamp (in ms).", notes = "There should not be any active consumers on the subscription.") + @ApiOperation(value = "Reset subscription to message position closest to absolute timestamp (in ms).", notes = "It fence cursor and disconnects all active consumers before reseting cursor.") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 404, message = "Topic does not exist"), - @ApiResponse(code = 405, message = "Not supported for global and non-persistent topics"), - @ApiResponse(code = 412, message = "Subscription has active consumers") }) + @ApiResponse(code = 404, message = "Topic/Subscription does not exist") }) public void resetCursor(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination, @PathParam("subName") String subName, @PathParam("timestamp") long timestamp, @@ -944,8 +941,8 @@ public void resetCursor(@PathParam("property") String property, @PathParam("clus log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), dn, subName, timestamp, partitionException); throw new RestException(Status.PRECONDITION_FAILED, partitionException.getMessage()); - } else if (numPartException > 0 && log.isDebugEnabled()) { - log.debug("[{}][{}] partial errors for reset cursor on subscription {} to time {} - ", clientAppId(), + } else if (numPartException > 0) { + log.warn("[{}][{}] partial errors for reset cursor on subscription {} to time {} - ", clientAppId(), destination, subName, timestamp, partitionException); } @@ -954,6 +951,9 @@ public void resetCursor(@PathParam("property") String property, @PathParam("clus log.info("[{}][{}] received reset cursor on subscription {} to time {}", clientAppId(), destination, subName, timestamp); PersistentTopic topic = (PersistentTopic) getTopicReference(dn); + if (topic == null) { + throw new RestException(Status.NOT_FOUND, "Topic not found"); + } try { PersistentSubscription sub = topic.getSubscription(subName); checkNotNull(sub); @@ -967,8 +967,6 @@ public void resetCursor(@PathParam("property") String property, @PathParam("clus throw new RestException(Status.NOT_FOUND, "Subscription not found"); } else if (e instanceof NotAllowedException) { throw new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()); - } else if (t instanceof SubscriptionBusyException) { - throw new RestException(Status.PRECONDITION_FAILED, "Subscription has active connected consumers"); } else if (t instanceof SubscriptionInvalidCursorPosition) { throw new RestException(Status.PRECONDITION_FAILED, "Unable to find position for timestamp specified -" + t.getMessage()); @@ -979,6 +977,56 @@ public void resetCursor(@PathParam("property") String property, @PathParam("clus } } + @POST + @Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/resetcursor") + @ApiOperation(value = "Reset subscription to message position closest to given position.", notes = "It fence cursor and disconnects all active consumers before reseting cursor.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), + @ApiResponse(code = 405, message = "Not supported for partitioned topics") }) + public void resetCursorOnPosition(@PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination, + @PathParam("subName") String subName, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) { + destination = decode(destination); + DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); + log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), destination, + subName, messageId); + + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace, + destination, authoritative); + + if (partitionMetadata.partitions > 0) { + log.warn("[{}] Not supported operation on partitioned-topic {} {}", clientAppId(), dn, subName); + throw new RestException(Status.METHOD_NOT_ALLOWED, + "Reset-cursor at position is not allowed for partitioned-topic"); + } else { + validateAdminOperationOnDestination(dn, authoritative); + PersistentTopic topic = (PersistentTopic) getTopicReference(dn); + if (topic == null) { + throw new RestException(Status.NOT_FOUND, "Topic not found"); + } + try { + PersistentSubscription sub = topic.getSubscription(subName); + checkNotNull(sub); + sub.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get(); + log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(), dn, + subName, messageId); + } catch (Exception e) { + Throwable t = e.getCause(); + log.warn("[{}] [{}] Failed to reset cursor on subscription {} to position {}", clientAppId(), dn, + subName, messageId, e); + if (e instanceof NullPointerException) { + throw new RestException(Status.NOT_FOUND, "Subscription not found"); + } else if (t instanceof SubscriptionInvalidCursorPosition) { + throw new RestException(Status.PRECONDITION_FAILED, + "Unable to find position for position specified: " + t.getMessage()); + } else { + throw new RestException(e); + } + } + } + } + @GET @Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/position/{messagePosition}") @ApiOperation(value = "Peek nth message on a topic subscription.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 1b346fff3d054..1e4a4187128a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -22,8 +22,8 @@ import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; -import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; @@ -59,6 +59,8 @@ public interface Subscription { CompletableFuture skipMessages(int numMessagesToSkip); CompletableFuture resetCursor(long timestamp); + + CompletableFuture resetCursor(Position position); CompletableFuture peekNthMessage(int messagePosition); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 25ef976824f37..83d5ccaa2031c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; @@ -347,6 +348,11 @@ public void markTopicWithBatchMessagePublished() { topic.markBatchMessagePublished(); } + @Override + public CompletableFuture resetCursor(Position position) { + return CompletableFuture.completedFuture(null); + } + private static final Logger log = LoggerFactory.getLogger(NonPersistentSubscription.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index f873e7d41458f..cff492f3d9cbd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.ResetCursorCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries; @@ -343,62 +344,7 @@ public void findEntryComplete(Position position, Object ctx) { } else { finalPosition = position; } - - if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) { - future.completeExceptionally(new SubscriptionBusyException("Failed to fence subscription")); - return; - } - - final CompletableFuture disconnectFuture; - if (dispatcher != null && dispatcher.isConsumerConnected()) { - disconnectFuture = dispatcher.disconnectAllConsumers(); - } else { - disconnectFuture = CompletableFuture.completedFuture(null); - } - - disconnectFuture.whenComplete((aVoid, throwable) -> { - if (throwable != null) { - log.error("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable); - IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); - future.completeExceptionally(new SubscriptionBusyException("Failed to disconnect consumers from subscription")); - return; - } - log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset", topicName, subName); - - try { - cursor.asyncResetCursor(finalPosition, new AsyncCallbacks.ResetCursorCallback() { - @Override - public void resetComplete(Object ctx) { - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Successfully reset subscription to timestamp {}", topicName, subName, - timestamp); - } - IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); - future.complete(null); - } - - @Override - public void resetFailed(ManagedLedgerException exception, Object ctx) { - log.error("[{}][{}] Failed to reset subscription to timestamp {}", topicName, subName, timestamp, - exception); - IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); - // todo - retry on InvalidCursorPositionException - // or should we just ask user to retry one more time? - if (exception instanceof InvalidCursorPositionException) { - future.completeExceptionally(new SubscriptionInvalidCursorPosition(exception.getMessage())); - } else if (exception instanceof ConcurrentFindCursorPositionException) { - future.completeExceptionally(new SubscriptionBusyException(exception.getMessage())); - } else { - future.completeExceptionally(new BrokerServiceException(exception)); - } - } - }); - } catch (Exception e) { - log.error("[{}][{}] Error while resetting cursor", topicName, subName, e); - IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); - future.completeExceptionally(new BrokerServiceException(e)); - } - }); + resetCursor(finalPosition, future); } @Override @@ -415,6 +361,73 @@ public void findEntryFailed(ManagedLedgerException exception, Object ctx) { return future; } + @Override + public CompletableFuture resetCursor(Position position) { + CompletableFuture future = new CompletableFuture<>(); + resetCursor(position, future); + return future; + } + + private void resetCursor(Position finalPosition, CompletableFuture future) { + if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) { + future.completeExceptionally(new SubscriptionBusyException("Failed to fence subscription")); + return; + } + + final CompletableFuture disconnectFuture; + if (dispatcher != null && dispatcher.isConsumerConnected()) { + disconnectFuture = dispatcher.disconnectAllConsumers(); + } else { + disconnectFuture = CompletableFuture.completedFuture(null); + } + + disconnectFuture.whenComplete((aVoid, throwable) -> { + if (throwable != null) { + log.error("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable); + IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + future.completeExceptionally( + new SubscriptionBusyException("Failed to disconnect consumers from subscription")); + return; + } + log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset", + topicName, subName); + + try { + cursor.asyncResetCursor(finalPosition, new AsyncCallbacks.ResetCursorCallback() { + @Override + public void resetComplete(Object ctx) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Successfully reset subscription to position {}", topicName, subName, + finalPosition); + } + IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + future.complete(null); + } + + @Override + public void resetFailed(ManagedLedgerException exception, Object ctx) { + log.error("[{}][{}] Failed to reset subscription to position {}", topicName, subName, + finalPosition, exception); + IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + // todo - retry on InvalidCursorPositionException + // or should we just ask user to retry one more time? + if (exception instanceof InvalidCursorPositionException) { + future.completeExceptionally(new SubscriptionInvalidCursorPosition(exception.getMessage())); + } else if (exception instanceof ConcurrentFindCursorPositionException) { + future.completeExceptionally(new SubscriptionBusyException(exception.getMessage())); + } else { + future.completeExceptionally(new BrokerServiceException(exception)); + } + } + }); + } catch (Exception e) { + log.error("[{}][{}] Error while resetting cursor", topicName, subName, e); + IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + future.completeExceptionally(new BrokerServiceException(e)); + } + }); + } + @Override public CompletableFuture peekNthMessage(int messagePosition) { CompletableFuture future = new CompletableFuture<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 82d01fcfc4b4a..25c76904e04e9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -21,8 +21,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -48,8 +46,6 @@ import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.Topic; -import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; @@ -86,7 +82,6 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; -import org.apache.pulsar.common.policies.data.NonPersistentTopicStats; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; @@ -1753,57 +1748,4 @@ public void testDestinationBundleRangeLookup() throws PulsarAdminException, Puls pulsar.getNamespaceService().getBundle(DestinationName.get(topicName)).getBundleRange()); } - // TODO: move to AdminApiTest2.java - /** - * Verify unloading topic - * - * @throws Exception - */ - @Test(dataProvider = "topicType") - public void testUnloadTopic(final String topicType) throws Exception { - - final String namespace = "prop-xyz/use/ns2"; - final String topicName = topicType + "://" + namespace + "/topic1"; - admin.namespaces().createNamespace(namespace); - - // create a topic by creating a producer - Producer producer = pulsarClient.createProducer(topicName); - producer.close(); - - Topic topic = pulsar.getBrokerService().getTopicReference(topicName); - assertNotNull(topic); - final boolean isPersistentTopic = topic instanceof PersistentTopic; - - // (1) unload the topic - unloadTopic(topicName, isPersistentTopic); - topic = pulsar.getBrokerService().getTopicReference(topicName); - // topic must be removed - assertNull(topic); - - // recreation of producer will load the topic again - producer = pulsarClient.createProducer(topicName); - topic = pulsar.getBrokerService().getTopicReference(topicName); - assertNotNull(topic); - // unload the topic - unloadTopic(topicName, isPersistentTopic); - // producer will retry and recreate the topic - for (int i = 0; i < 5; i++) { - topic = pulsar.getBrokerService().getTopicReference(topicName); - if (topic == null || i != 4) { - Thread.sleep(200); - } - } - // topic should be loaded by this time - topic = pulsar.getBrokerService().getTopicReference(topicName); - assertNotNull(topic); - } - - private void unloadTopic(String topicName, boolean isPersistentTopic) throws Exception { - if (isPersistentTopic) { - admin.persistentTopics().unload(topicName); - } else { - admin.nonPersistentTopics().unload(topicName); - } - } - } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index 11e10ecbb7e17..8278f11b3af45 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -19,10 +19,13 @@ package org.apache.pulsar.broker.admin; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.net.URL; +import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -31,17 +34,21 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.DestinationDomain; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.policies.data.ClusterData; @@ -51,10 +58,12 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.PersistentTopicStats; import org.apache.pulsar.common.policies.data.PropertyAdmin; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import com.google.common.collect.Lists; @@ -103,6 +112,17 @@ public void cleanup() throws Exception { mockPulsarSetup.cleanup(); } + @DataProvider(name = "topicType") + public Object[][] topicTypeProvider() { + return new Object[][] { { DestinationDomain.persistent.value() }, + { DestinationDomain.non_persistent.value() } }; + } + + @DataProvider(name = "namespaceNames") + public Object[][] namespaceNameProvider() { + return new Object[][] { { "ns1" }, { "global" } }; + } + /** *
      * It verifies increasing partitions for partitioned-topic.
@@ -345,4 +365,163 @@ public void testUpdatePersistencePolicyUpdateManagedCursor() throws Exception {
 
     }
 
+    /**
+     * Verify unloading topic
+     * 
+     * @throws Exception
+     */
+    @Test(dataProvider = "topicType")
+    public void testUnloadTopic(final String topicType) throws Exception {
+
+        final String namespace = "prop-xyz/use/ns2";
+        final String topicName = topicType + "://" + namespace + "/topic1";
+        admin.namespaces().createNamespace(namespace);
+
+        // create a topic by creating a producer
+        Producer producer = pulsarClient.createProducer(topicName);
+        producer.close();
+
+        Topic topic = pulsar.getBrokerService().getTopicReference(topicName);
+        assertNotNull(topic);
+        final boolean isPersistentTopic = topic instanceof PersistentTopic;
+
+        // (1) unload the topic
+        unloadTopic(topicName, isPersistentTopic);
+        topic = pulsar.getBrokerService().getTopicReference(topicName);
+        // topic must be removed
+        assertNull(topic);
+
+        // recreation of producer will load the topic again
+        producer = pulsarClient.createProducer(topicName);
+        topic = pulsar.getBrokerService().getTopicReference(topicName);
+        assertNotNull(topic);
+        // unload the topic
+        unloadTopic(topicName, isPersistentTopic);
+        // producer will retry and recreate the topic
+        for (int i = 0; i < 5; i++) {
+            topic = pulsar.getBrokerService().getTopicReference(topicName);
+            if (topic == null || i != 4) {
+                Thread.sleep(200);
+            }
+        }
+        // topic should be loaded by this time
+        topic = pulsar.getBrokerService().getTopicReference(topicName);
+        assertNotNull(topic);
+    }
+
+    private void unloadTopic(String topicName, boolean isPersistentTopic) throws Exception {
+        if (isPersistentTopic) {
+            admin.persistentTopics().unload(topicName);
+        } else {
+            admin.nonPersistentTopics().unload(topicName);
+        }
+    }
+
+    /**
+     * Verifies reset-cursor at specific position using admin-api.
+     * 
+     * 
+     * 1. Publish 50 messages
+     * 2. Consume 20 messages
+     * 3. reset cursor position on 10th message
+     * 4. consume 40 messages from reset position
+     * 
+ * + * @param namespaceName + * @throws Exception + */ + @Test(dataProvider = "namespaceNames", timeOut = 10000) + public void testResetCursorOnPosition(String namespaceName) throws Exception { + final String topicName = "persistent://prop-xyz/use/" + namespaceName + "/resetPosition"; + final int totalProducedMessages = 50; + + // set retention + admin.namespaces().setRetention("prop-xyz/use/ns1", new RetentionPolicies(10, 10)); + + // create consumer and subscription + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setSubscriptionType(SubscriptionType.Shared); + Consumer consumer = pulsarClient.subscribe(topicName, "my-sub", conf); + + assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub")); + + publishMessagesOnPersistentTopic(topicName, totalProducedMessages, 0); + + List messages = admin.persistentTopics().peekMessages(topicName, "my-sub", 10); + assertEquals(messages.size(), 10); + + Message message = null; + MessageIdImpl resetMessageId = null; + int resetPositionId = 10; + for (int i = 0; i < 20; i++) { + message = consumer.receive(1, TimeUnit.SECONDS); + consumer.acknowledge(message); + if (i == resetPositionId) { + resetMessageId = (MessageIdImpl) message.getMessageId(); + } + } + + // close consumer which will clean up intenral-receive-queue + consumer.close(); + + // messages should still be available due to retention + MessageIdImpl messageId = new MessageIdImpl(resetMessageId.getLedgerId(), resetMessageId.getEntryId(), -1); + // reset position at resetMessageId + admin.persistentTopics().resetCursor(topicName, "my-sub", messageId); + + consumer = pulsarClient.subscribe(topicName, "my-sub", conf); + MessageIdImpl msgId2 = (MessageIdImpl) consumer.receive(1, TimeUnit.SECONDS).getMessageId(); + assertEquals(resetMessageId, msgId2); + + int receivedAfterReset = 1; // start with 1 because we have already received 1 msg + + for (int i = 0; i < totalProducedMessages; i++) { + message = consumer.receive(500, TimeUnit.MILLISECONDS); + if (message == null) { + break; + } + consumer.acknowledge(message); + ++receivedAfterReset; + } + assertEquals(receivedAfterReset, totalProducedMessages - resetPositionId); + + // invalid topic name + try { + admin.persistentTopics().resetCursor(topicName + "invalid", "my-sub", messageId); + fail("It should have failed due to invalid topic name"); + } catch (PulsarAdminException.NotFoundException e) { + // Ok + } + + // invalid cursor name + try { + admin.persistentTopics().resetCursor(topicName, "invalid-sub", messageId); + fail("It should have failed due to invalid subscription name"); + } catch (PulsarAdminException.NotFoundException e) { + // Ok + } + + // invalid position + try { + messageId = new MessageIdImpl(0, 0, -1); + admin.persistentTopics().resetCursor(topicName, "my-sub", messageId); + fail("It should have failed due to invalid subscription name"); + } catch (PulsarAdminException.PreconditionFailedException e) { + // Ok + } + + consumer.close(); + } + + private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception { + Producer producer = pulsarClient.createProducer(topicName); + + for (int i = startIdx; i < (messages + startIdx); i++) { + String message = "message-" + i; + producer.send(message.getBytes()); + } + + producer.close(); + } + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java index 64572ecb91377..093ae3e91253a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java @@ -829,4 +829,37 @@ public interface PersistentTopics { * reset subscription to position closest to time in ms since epoch */ CompletableFuture resetCursorAsync(String destination, String subName, long timestamp); + + /** + * Reset cursor position on a topic subscription + * + * @param destination + * Destination name + * @param subName + * Subscription name + * @param messageId + * reset subscription to messageId (or previous nearest messageId if given messageId is not valid) + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Topic or subscription does not exist + * @throws NotAllowedException + * Command disallowed for requested resource + * @throws PulsarAdminException + * Unexpected error + */ + void resetCursor(String destination, String subName, MessageId messageId) throws PulsarAdminException; + + /** + * Reset cursor position on a topic subscription + * + * @param destination + * Destination name + * @param subName + * Subscription name + * @param MessageId + * reset subscription to messageId (or previous nearest messageId if given messageId is not valid) + */ + CompletableFuture resetCursorAsync(String destination, String subName, MessageId messageId); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java index 0b673a4cc7724..48f57fe1ef1da 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java @@ -640,6 +640,28 @@ public CompletableFuture resetCursorAsync(String destination, String subNa Entity.entity("", MediaType.APPLICATION_JSON)); } + @Override + public void resetCursor(String destination, String subName, MessageId messageId) throws PulsarAdminException { + try { + DestinationName ds = validateTopic(destination); + String encodedSubName = Codec.encode(subName); + request(persistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("subscription") + .path(encodedSubName).path("resetcursor")).post(Entity.entity(messageId, MediaType.APPLICATION_JSON), + ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public CompletableFuture resetCursorAsync(String destination, String subName, MessageId messageId) { + DestinationName ds = validateTopic(destination); + String encodedSubName = Codec.encode(subName); + return asyncPostRequest(persistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()) + .path("subscription").path(encodedSubName).path("resetcursor"), + Entity.entity(messageId, MediaType.APPLICATION_JSON)); + } + @Override public CompletableFuture terminateTopicAsync(String destination) { DestinationName ds = validateTopic(destination); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index 8977ef1f0a98c..d7f7d51d82a5c 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.pulsar.client.admin.PersistentTopics; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -33,6 +34,7 @@ import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; import com.beust.jcommander.converters.CommaParameterSplitter; +import static com.google.common.base.Preconditions.checkArgument; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonObject; @@ -416,7 +418,7 @@ void run() throws PulsarAdminException { } } - @Parameters(commandDescription = "Reset position for subscription to position closest to timestamp") + @Parameters(commandDescription = "Reset position for subscription to position closest to timestamp or messageId") private class ResetCursor extends CliCommand { @Parameter(description = "persistent://property/cluster/namespace/destination", required = true) private java.util.List params; @@ -426,17 +428,29 @@ private class ResetCursor extends CliCommand { private String subName; @Parameter(names = { "--time", - "-t" }, description = "time in minutes to reset back to (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w)", required = true) + "-t" }, description = "time in minutes to reset back to (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w)", required = false) private String resetTimeStr; + + @Parameter(names = { "--messageId", + "-m" }, description = "messageId to reset back to (ledgerId:entryId)", required = false) + private String resetMessageIdStr; @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - int resetBackTimeInMin = validateTimeString(resetTimeStr); - long resetTimeInMillis = TimeUnit.MILLISECONDS.convert(resetBackTimeInMin, TimeUnit.MINUTES); - // now - go back time - long timestamp = System.currentTimeMillis() - resetTimeInMillis; - persistentTopics.resetCursor(persistentTopic, subName, timestamp); + if (isNotBlank(resetMessageIdStr)) { + MessageId messageId = validateMessageIdString(resetMessageIdStr); + persistentTopics.resetCursor(persistentTopic, subName, messageId); + } else if (isNotBlank(resetTimeStr)) { + int resetBackTimeInMin = validateTimeString(resetTimeStr); + long resetTimeInMillis = TimeUnit.MILLISECONDS.convert(resetBackTimeInMin, TimeUnit.MINUTES); + // now - go back time + long timestamp = System.currentTimeMillis() - resetTimeInMillis; + persistentTopics.resetCursor(persistentTopic, subName, timestamp); + } else { + throw new PulsarAdminException( + "Either Timestamp (--time) or Position (--position) has to be provided to reset cursor"); + } } } @@ -520,4 +534,15 @@ private static int validateTimeString(String s) { return Integer.parseInt(s); } } + + private MessageId validateMessageIdString(String resetMessageIdStr) throws PulsarAdminException { + String[] messageId = resetMessageIdStr.split(":"); + try { + checkArgument(messageId.length == 2); + return new MessageIdImpl(Long.parseLong(messageId[0]), Long.parseLong(messageId[1]), -1); + } catch (Exception e) { + throw new PulsarAdminException( + "Invalid reset-position (must be in format: ledgerId:entryId) value " + resetMessageIdStr); + } + } }