Skip to content

Commit

Permalink
[broker] Fix call sync method in async rest api for internalGetReplic…
Browse files Browse the repository at this point in the history
…atedSubscriptionStatus (apache#13888)
  • Loading branch information
suiyuzeng authored Apr 25, 2022
1 parent 14ba6c4 commit 43ce8ef
Showing 1 changed file with 91 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4808,108 +4808,113 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
}

// Permission to consume this topic is required
try {
validateTopicOperation(topicName, TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS, subName);
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
CompletableFuture<Void> validateFuture =
validateTopicOperationAsync(topicName, TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS, subName);

CompletableFuture<Void> resultFuture;
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative);
resultFuture = validateFuture.thenAccept(
__ -> internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse,
subName, authoritative));
} else {
getPartitionedTopicMetadataAsync(topicName,
authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Map<String, Boolean>>> futures = Lists.newArrayList();
final Map<String, Boolean> status = Maps.newHashMap();
resultFuture = validateFuture
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
.thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Map<String, Boolean>>> futures =
Lists.newArrayListWithCapacity(partitionMetadata.partitions);
final Map<String, Boolean> status = Maps.newHashMap();

for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName partition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
partition.toString(), subName).whenComplete((response, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to get replicated subscriptions on {} {}",
clientAppId(), partition, subName, throwable);
asyncResponse.resume(new RestException(throwable));
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName partition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
partition.toString(), subName).whenComplete((response, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to get replicated subscriptions on {} {}",
clientAppId(), partition, subName, throwable);
asyncResponse.resume(new RestException(throwable));
}
status.putAll(response);
}));
} catch (Exception e) {
log.warn("[{}] Failed to get replicated subscription status on {} {}",
clientAppId(), partition, subName, e);
throw new RestException(e);
}
status.putAll(response);
}));
} catch (Exception e) {
log.warn("[{}] Failed to get replicated subscription status on {} {}",
clientAppId(), partition, subName, e);
throw new RestException(e);
}
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Topic or subscription not found"));
} else if (t instanceof PreconditionFailedException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Cannot get replicated subscriptions on non-global topics"));
} else {
log.error("[{}] Failed to get replicated subscription status on {} {}",
clientAppId(), topicName, subName, t);
asyncResponse.resume(new RestException(t));
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Topic or subscription not found"));
} else if (t instanceof PreconditionFailedException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Cannot get replicated subscriptions on non-global topics"));
} else {
log.error("[{}] Failed to get replicated subscription status on {} {}",
clientAppId(), topicName, subName, t);
asyncResponse.resume(new RestException(t));
}
}
asyncResponse.resume(status);
return null;
});
} else {
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName,
authoritative);
}
asyncResponse.resume(status);
return null;
});
} else {
internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName,
authoritative);
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get replicated subscription status on {} {}", clientAppId(),
topicName, subName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

resultFuture.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get replicated subscription status on {} {}", clientAppId(),
topicName, subName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse,
private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(
AsyncResponse asyncResponse,
String subName,
boolean authoritative) {
try {
// Redirect the request to the appropriate broker if this broker is not the owner of the topic
validateTopicOwnership(topicName, authoritative);

Topic topic = getTopicReference(topicName);
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
return;
}
// Redirect the request to the appropriate broker if this broker is not the owner of the topic
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> {
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
return;
}

Subscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return;
}
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return;
}

if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
Map res = Maps.newHashMap();
res.put(topicName.toString(), sub.isReplicated());
asyncResponse.resume(res);
} else {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot get replicated subscriptions on non-persistent topics"));
}
} catch (Exception e) {
log.error("[{}] Failed to get replicated subscription status on {} {}", clientAppId(),
topicName, subName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
Map res = Maps.newHashMap();
res.put(topicName.toString(), sub.isReplicated());
asyncResponse.resume(res);
} else {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot get replicated subscriptions on non-persistent topics"));
}
})
.exceptionally(e -> {
Throwable cause = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to get replicated subscription status on {} {}", clientAppId(),
topicName, subName, cause);
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
}

protected CompletableFuture<SchemaCompatibilityStrategy> internalGetSchemaCompatibilityStrategy(boolean applied) {
Expand Down

0 comments on commit 43ce8ef

Please sign in to comment.