Skip to content

Commit

Permalink
[broker] Make resetting cursor in REST API asynchronous (apache#7744)
Browse files Browse the repository at this point in the history
### Motivation

As mentioned in apache#7478, `PersistentSubscription#resetCursor()` may not complete for unknown reasons. This blocks `pulsar-web` threads, so it can cause the broker server to be unable to respond to HTTP requests.

### Modifications

Get the result of `PersistentSubscription#resetCursor()` asynchronously so as not to block `pulsar-web` threads.
  • Loading branch information
Masahiro Sakamoto authored Aug 6, 2020
1 parent 894e078 commit 69cbd26
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ protected CompletableFuture<Boolean> checkTopicExistsAsync(TopicName topicName)

protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
if (throwable instanceof WebApplicationException) {
asyncResponse.resume(throwable);
asyncResponse.resume((WebApplicationException) throwable);
} else {
asyncResponse.resume(new RestException(throwable));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -1566,7 +1567,8 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName,
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e);
log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}: {}", clientAppId(), topicName,
subName, timestamp, e.getMessage());
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
Expand Down Expand Up @@ -1667,22 +1669,32 @@ private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncRespon
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return;
}
sub.resetCursor(timestamp).get();
log.info("[{}] [{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
timestamp);
asyncResponse.resume(Response.noContent().build());
sub.resetCursor(timestamp).thenRun(() -> {
log.info("[{}][{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
timestamp);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
subName, timestamp, t);
if (t instanceof SubscriptionInvalidCursorPosition) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for timestamp specified: " + t.getMessage()));
} else if (t instanceof SubscriptionBusyException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Failed for Subscription Busy: " + t.getMessage()));
} else {
resumeAsyncResponseExceptionally(asyncResponse, t);
}
return null;
});
} catch (Exception e) {
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
subName, timestamp, e);
log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
timestamp, e);
if (e instanceof NotAllowedException) {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));
} else if (e instanceof SubscriptionInvalidCursorPosition) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for timestamp specified -" + e.getMessage()));
} else if (e instanceof WebApplicationException) {
asyncResponse.resume(e);
} else {
asyncResponse.resume(new RestException(e));
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
}
Expand Down Expand Up @@ -1796,69 +1808,91 @@ private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyn
// Mark the cursor as "inactive" as it was created without a real consumer connected
subscription.deactivateCursor();
subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId()))
.get();
.thenRun(() -> {
log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(),
topicName, subscriptionName, targetMessageId);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
subscriptionName, targetMessageId, t);
if (t instanceof SubscriptionInvalidCursorPosition) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for position specified: " + t.getMessage()));
} else if (t instanceof SubscriptionBusyException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Failed for Subscription Busy: " + t.getMessage()));
} else {
resumeAsyncResponseExceptionally(asyncResponse, t);
}
return null;
});
} catch (Throwable e) {
Throwable t = e.getCause();
if (t instanceof SubscriptionInvalidCursorPosition) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for position specified: " + t.getMessage()));
} else if (e instanceof WebApplicationException) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Failed to create subscription {} at message id {}, redirecting to other brokers.", clientAppId(), topicName,
subscriptionName, targetMessageId, e);
}
asyncResponse.resume(e);
} else if (t instanceof SubscriptionBusyException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Failed for Subscription Busy: " + t.getMessage()));
} else {
asyncResponse.resume(new RestException(e));
}
log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
subscriptionName, targetMessageId, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}

log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), topicName,
subscriptionName, targetMessageId);
asyncResponse.resume(Response.noContent().build());
}

protected void internalResetCursorOnPosition(String subName, boolean authoritative, MessageIdImpl messageId) {
protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
MessageIdImpl messageId) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}: {}", clientAppId(),
topicName, subName, messageId, e.getMessage());
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}

log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), topicName,
subName, messageId);

// If the topic name is a partition name, no need to get partition topic metadata again
if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
log.warn("[{}] Not supported operation on partitioned-topic {} {}", clientAppId(), topicName,
subName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Reset-cursor at position is not allowed for partitioned-topic");
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Reset-cursor at position is not allowed for partitioned-topic"));
return;
} else {
validateAdminAccessForSubscriber(subName, authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
if (topic == null) {
throw new RestException(Status.NOT_FOUND, "Topic not found");
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
return;
}
try {
PersistentSubscription sub = topic.getSubscription(subName);
Preconditions.checkNotNull(sub);
sub.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(),
topicName, subName, messageId);
sub.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).thenRun(() -> {
log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(),
topicName, subName, messageId);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}", clientAppId(),
topicName, subName, messageId, t);
if (t instanceof SubscriptionInvalidCursorPosition) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for position specified: " + t.getMessage()));
} else if (t instanceof SubscriptionBusyException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Failed for Subscription Busy: " + t.getMessage()));
} else {
resumeAsyncResponseExceptionally(asyncResponse, t);
}
return null;
});
} catch (Exception e) {
Throwable t = e.getCause();
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to position {}", clientAppId(),
topicName, subName, messageId, e);
log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}", clientAppId(), topicName,
subName, messageId, e);
if (e instanceof NullPointerException) {
throw new RestException(Status.NOT_FOUND, "Subscription not found");
} else if (t instanceof SubscriptionInvalidCursorPosition) {
throw new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for position specified: " + t.getMessage());
} else if (t instanceof SubscriptionBusyException) {
throw new RestException(Status.PRECONDITION_FAILED,
"Failed for SubscriptionBusy: " + t.getMessage());
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
} else {
throw new RestException(e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,12 +504,17 @@ public void resetCursor(@Suspended final AsyncResponse asyncResponse, @PathParam
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
@ApiResponse(code = 405, message = "Not supported for partitioned topics") })
public void resetCursorOnPosition(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void resetCursorOnPosition(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@PathParam("subName") String encodedSubName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalResetCursorOnPosition(decode(encodedSubName), authoritative, messageId);
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalResetCursorOnPosition(asyncResponse, decode(encodedSubName), authoritative, messageId);
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}

@PUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,7 @@ public void resetCursor(
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void resetCursorOnPosition(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -890,8 +891,12 @@ public void resetCursorOnPosition(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
MessageIdImpl messageId) {
validateTopicName(tenant, namespace, encodedTopic);
internalResetCursorOnPosition(decode(encodedSubName), authoritative, messageId);
try {
validateTopicName(tenant, namespace, encodedTopic);
internalResetCursorOnPosition(asyncResponse, decode(encodedSubName), authoritative, messageId);
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ public void testTerminatePartitionedTopic() {

@Test
public void testNonPartitionedTopics() {
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
final String nonPartitionTopic = "non-partitioned-topic";
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, nonPartitionTopic, "test", true,
Expand Down

0 comments on commit 69cbd26

Please sign in to comment.