From 69cbd26eafb87b8e84b3abfe4d0f0b915fd75320 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Thu, 6 Aug 2020 19:02:42 +0900 Subject: [PATCH] [broker] Make resetting cursor in REST API asynchronous (#7744) ### Motivation As mentioned in #7478, `PersistentSubscription#resetCursor()` may not complete for unknown reasons. This blocks `pulsar-web` threads, so it can cause the broker server to be unable to respond to HTTP requests. ### Modifications Get the result of `PersistentSubscription#resetCursor()` asynchronously so as not to block `pulsar-web` threads. --- .../pulsar/broker/admin/AdminResource.java | 2 +- .../admin/impl/PersistentTopicsBase.java | 140 +++++++++++------- .../broker/admin/v1/PersistentTopics.java | 11 +- .../broker/admin/v2/PersistentTopics.java | 9 +- .../broker/admin/PersistentTopicsTest.java | 1 - 5 files changed, 103 insertions(+), 60 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 590c03282d50b..1e28dc41facc9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -873,7 +873,7 @@ protected CompletableFuture checkTopicExistsAsync(TopicName topicName) protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) { if (throwable instanceof WebApplicationException) { - asyncResponse.resume(throwable); + asyncResponse.resume((WebApplicationException) throwable); } else { asyncResponse.resume(new RestException(throwable)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 004ca6faaf5b0..866180fb7f3fc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -42,6 +42,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -1566,7 +1567,8 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName, try { validateGlobalNamespaceOwnership(namespaceName); } catch (Exception e) { - log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e); + log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}: {}", clientAppId(), topicName, + subName, timestamp, e.getMessage()); resumeAsyncResponseExceptionally(asyncResponse, e); return; } @@ -1667,22 +1669,32 @@ private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncRespon asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); return; } - sub.resetCursor(timestamp).get(); - log.info("[{}] [{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName, - timestamp); - asyncResponse.resume(Response.noContent().build()); + sub.resetCursor(timestamp).thenRun(() -> { + log.info("[{}][{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName, + timestamp); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex); + log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName, + subName, timestamp, t); + if (t instanceof SubscriptionInvalidCursorPosition) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Unable to find position for timestamp specified: " + t.getMessage())); + } else if (t instanceof SubscriptionBusyException) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Failed for Subscription Busy: " + t.getMessage())); + } else { + resumeAsyncResponseExceptionally(asyncResponse, t); + } + return null; + }); } catch (Exception e) { - log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName, - subName, timestamp, e); + log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName, subName, + timestamp, e); if (e instanceof NotAllowedException) { asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage())); - } else if (e instanceof SubscriptionInvalidCursorPosition) { - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, - "Unable to find position for timestamp specified -" + e.getMessage())); - } else if (e instanceof WebApplicationException) { - asyncResponse.resume(e); } else { - asyncResponse.resume(new RestException(e)); + resumeAsyncResponseExceptionally(asyncResponse, e); } } } @@ -1796,69 +1808,91 @@ private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyn // Mark the cursor as "inactive" as it was created without a real consumer connected subscription.deactivateCursor(); subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId())) - .get(); + .thenRun(() -> { + log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), + topicName, subscriptionName, targetMessageId); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex); + log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, + subscriptionName, targetMessageId, t); + if (t instanceof SubscriptionInvalidCursorPosition) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Unable to find position for position specified: " + t.getMessage())); + } else if (t instanceof SubscriptionBusyException) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Failed for Subscription Busy: " + t.getMessage())); + } else { + resumeAsyncResponseExceptionally(asyncResponse, t); + } + return null; + }); } catch (Throwable e) { - Throwable t = e.getCause(); - if (t instanceof SubscriptionInvalidCursorPosition) { - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, - "Unable to find position for position specified: " + t.getMessage())); - } else if (e instanceof WebApplicationException) { - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Failed to create subscription {} at message id {}, redirecting to other brokers.", clientAppId(), topicName, - subscriptionName, targetMessageId, e); - } - asyncResponse.resume(e); - } else if (t instanceof SubscriptionBusyException) { - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, - "Failed for Subscription Busy: " + t.getMessage())); - } else { - asyncResponse.resume(new RestException(e)); - } + log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, + subscriptionName, targetMessageId, e); + resumeAsyncResponseExceptionally(asyncResponse, e); } - - log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), topicName, - subscriptionName, targetMessageId); - asyncResponse.resume(Response.noContent().build()); } - protected void internalResetCursorOnPosition(String subName, boolean authoritative, MessageIdImpl messageId) { + protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative, + MessageIdImpl messageId) { if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); + try { + validateGlobalNamespaceOwnership(namespaceName); + } catch (Exception e) { + log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}: {}", clientAppId(), + topicName, subName, messageId, e.getMessage()); + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } } + log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), topicName, subName, messageId); + // If the topic name is a partition name, no need to get partition topic metadata again if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) { log.warn("[{}] Not supported operation on partitioned-topic {} {}", clientAppId(), topicName, subName); - throw new RestException(Status.METHOD_NOT_ALLOWED, - "Reset-cursor at position is not allowed for partitioned-topic"); + asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, + "Reset-cursor at position is not allowed for partitioned-topic")); + return; } else { validateAdminAccessForSubscriber(subName, authoritative); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); if (topic == null) { - throw new RestException(Status.NOT_FOUND, "Topic not found"); + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); + return; } try { PersistentSubscription sub = topic.getSubscription(subName); Preconditions.checkNotNull(sub); - sub.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get(); - log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(), - topicName, subName, messageId); + sub.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).thenRun(() -> { + log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(), + topicName, subName, messageId); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex); + log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}", clientAppId(), + topicName, subName, messageId, t); + if (t instanceof SubscriptionInvalidCursorPosition) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Unable to find position for position specified: " + t.getMessage())); + } else if (t instanceof SubscriptionBusyException) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Failed for Subscription Busy: " + t.getMessage())); + } else { + resumeAsyncResponseExceptionally(asyncResponse, t); + } + return null; + }); } catch (Exception e) { - Throwable t = e.getCause(); - log.warn("[{}] [{}] Failed to reset cursor on subscription {} to position {}", clientAppId(), - topicName, subName, messageId, e); + log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}", clientAppId(), topicName, + subName, messageId, e); if (e instanceof NullPointerException) { - throw new RestException(Status.NOT_FOUND, "Subscription not found"); - } else if (t instanceof SubscriptionInvalidCursorPosition) { - throw new RestException(Status.PRECONDITION_FAILED, - "Unable to find position for position specified: " + t.getMessage()); - } else if (t instanceof SubscriptionBusyException) { - throw new RestException(Status.PRECONDITION_FAILED, - "Failed for SubscriptionBusy: " + t.getMessage()); + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); } else { - throw new RestException(e); + resumeAsyncResponseExceptionally(asyncResponse, e); } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 8bdef8b7b3527..36ed69cf7d02c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -504,12 +504,17 @@ public void resetCursor(@Suspended final AsyncResponse asyncResponse, @PathParam @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), @ApiResponse(code = 405, message = "Not supported for partitioned topics") }) - public void resetCursorOnPosition(@PathParam("property") String property, @PathParam("cluster") String cluster, + public void resetCursorOnPosition(@Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) { - validateTopicName(property, cluster, namespace, encodedTopic); - internalResetCursorOnPosition(decode(encodedSubName), authoritative, messageId); + try { + validateTopicName(property, cluster, namespace, encodedTopic); + internalResetCursorOnPosition(asyncResponse, decode(encodedSubName), authoritative, messageId); + } catch (Exception e) { + resumeAsyncResponseExceptionally(asyncResponse, e); + } } @PUT diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index cd0dae6d10123..1273bfb7fee21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -878,6 +878,7 @@ public void resetCursor( @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") }) public void resetCursorOnPosition( + @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @@ -890,8 +891,12 @@ public void resetCursorOnPosition( @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)") MessageIdImpl messageId) { - validateTopicName(tenant, namespace, encodedTopic); - internalResetCursorOnPosition(decode(encodedSubName), authoritative, messageId); + try { + validateTopicName(tenant, namespace, encodedTopic); + internalResetCursorOnPosition(asyncResponse, decode(encodedSubName), authoritative, messageId); + } catch (Exception e) { + resumeAsyncResponseExceptionally(asyncResponse, e); + } } @GET diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index d6e60458d0172..92f7693d758a5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -232,7 +232,6 @@ public void testTerminatePartitionedTopic() { @Test public void testNonPartitionedTopics() { - pulsar.getConfiguration().setAllowAutoTopicCreation(false); final String nonPartitionTopic = "non-partitioned-topic"; AsyncResponse response = mock(AsyncResponse.class); persistentTopics.createSubscription(response, testTenant, testNamespace, nonPartitionTopic, "test", true,