Skip to content

Commit

Permalink
[cleanup][admin] Remove unused methods in PersistentTopicsBase (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun authored Apr 4, 2024
1 parent 5b6f91b commit 706b588
Showing 1 changed file with 0 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1711,107 +1711,6 @@ private void internalGetSubscriptionPropertiesForNonPartitionedTopic(AsyncRespon
});
}

protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
String subName, boolean authoritative) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}

future.thenAccept(__ -> {
if (topicName.isPartitioned()) {
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative);
} else {
getPartitionedTopicMetadataAsync(topicName,
authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = new ArrayList<>();

for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, true));
} catch (Exception e) {
log.error("[{}] Failed to delete subscription forcefully {} {}",
clientAppId(), topicNamePartition, subName,
e);
asyncResponse.resume(new RestException(e));
return;
}
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName)));
return null;
} else {
log.error("[{}] Failed to delete subscription forcefully {} {}",
clientAppId(), topicName, subName, t);
asyncResponse.resume(new RestException(t));
return null;
}
}

asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName,
authoritative);
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to delete subscription forcefully {} from topic {}",
clientAppId(), subName, topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to delete subscription {} from topic {}",
clientAppId(), subName, topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse,
String subName, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.UNSUBSCRIBE, subName))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
throw new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName));
}
return sub.deleteForcefully();
}).thenRun(() -> {
log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to delete subscription forcefully {} {}",
clientAppId(), topicName, subName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) {
CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
future.thenCompose(__ -> {
Expand Down

0 comments on commit 706b588

Please sign in to comment.