diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index eb8d9bcc513c0..2a4be06f418a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1045,11 +1045,13 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut } else { future = CompletableFuture.completedFuture(null); } - future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) - .thenAccept(unused -> { + future.thenCompose(__ -> + validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS) + .thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenAccept(unused1 -> { // If the topic name is a partition name, no need to get partition topic metadata again if (topicName.isPartitioned()) { - internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative); + internalGetSubscriptionsForNonPartitionedTopic(asyncResponse); } else { getPartitionedTopicMetadataAsync(topicName, authoritative, false) .thenAccept(partitionMetadata -> { @@ -1067,7 +1069,7 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut topicResources().persistentTopicExists(topicName.getPartition(i))); } FutureUtil.waitForAll(Lists.newArrayList(existsFutures.values())) - .thenApply(__ -> + .thenApply(unused2 -> existsFutures.entrySet().stream().filter(e -> e.getValue().join()) .map(item -> topicName.getPartition(item.getKey()).toString()) .collect(Collectors.toList()) @@ -1086,7 +1088,7 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut throw new RestException(e); } }); - }).thenAccept(__ -> resumeAsyncResponse(asyncResponse, + }).thenAccept(unused3 -> resumeAsyncResponse(asyncResponse, subscriptions, subscriptionFutures)); } else { for (int i = 0; i < partitionMetadata.partitions; i++) { @@ -1104,7 +1106,7 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut asyncResponse.resume(e); } } else { - internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative); + internalGetSubscriptionsForNonPartitionedTopic(asyncResponse); } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. @@ -1124,7 +1126,8 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; - }); + }) + ); } private void resumeAsyncResponse(AsyncResponse asyncResponse, Set subscriptions, @@ -1153,10 +1156,8 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set subscr }); } - private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { - validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS)) - .thenCompose(__ -> getTopicReferenceAsync(topicName)) + private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse) { + getTopicReferenceAsync(topicName) .thenAccept(topic -> asyncResponse.resume(Lists.newArrayList(topic.getSubscriptions().keys()))) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. @@ -1731,11 +1732,7 @@ protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subNa private CompletableFuture internalSkipAllMessagesForNonPartitionedTopicAsync(AsyncResponse asyncResponse, String subName, boolean authoritative) { - return validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> - validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName)) - .thenCompose(__ -> - getTopicReferenceAsync(topicName).thenCompose(t -> { + return getTopicReferenceAsync(topicName).thenCompose(t -> { PersistentTopic topic = (PersistentTopic) t; BiConsumer biConsumer = (v, ex) -> { if (ex != null) { @@ -1764,8 +1761,7 @@ private CompletableFuture internalSkipAllMessagesForNonPartitionedTopicAsy } return sub.clearBacklog().whenComplete(biConsumer); } - }) - .exceptionally(ex -> { + }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. if (!isRedirectException(ex)) { log.error("[{}] Failed to skip all messages for subscription {} on topic {}", @@ -1773,7 +1769,7 @@ private CompletableFuture internalSkipAllMessagesForNonPartitionedTopicAsy } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; - })); + }); } protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, int numMessages, @@ -1936,7 +1932,7 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy for (int i = 0; i < subNames.size(); i++) { try { futures.add(internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, - subNames.get(i), expireTimeInSeconds, authoritative)); + subNames.get(i), expireTimeInSeconds)); } catch (Exception e) { log.error("[{}] Failed to expire messages for all subscription up to {} on {}", clientAppId(), expireTimeInSeconds, topicName, e); @@ -3447,61 +3443,68 @@ protected void internalExpireMessagesByTimestamp(AsyncResponse asyncResponse, St future = CompletableFuture.completedFuture(null); } future.thenCompose(__ -> - // If the topic name is a partition name, no need to get partition topic metadata again - getPartitionedTopicMetadataAsync(topicName, authoritative, false) - .thenCompose(partitionMetadata -> { - if (topicName.isPartitioned()) { - return internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, subName, - expireTimeInSeconds, authoritative) - .thenAccept(unused -> asyncResponse.resume(Response.noContent().build())); - } else { - if (partitionMetadata.partitions > 0) { - return CompletableFuture.completedFuture(null).thenAccept(unused -> { - final List> futures = Lists.newArrayList(); - - // expire messages for each partition topic - for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - futures.add(pulsar() - .getAdminClient() - .topics() - .expireMessagesAsync(topicNamePartition.toString(), - subName, expireTimeInSeconds)); - } catch (Exception e) { - log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), - expireTimeInSeconds, topicNamePartition, e); - asyncResponse.resume(new RestException(e)); - return; + validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES) + .thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(unused2 -> + // If the topic name is a partition name, no need to get partition topic metadata again + getPartitionedTopicMetadataAsync(topicName, authoritative, false) + .thenCompose(partitionMetadata -> { + if (topicName.isPartitioned()) { + return internalExpireMessagesByTimestampForSinglePartitionAsync + (partitionMetadata, subName, expireTimeInSeconds) + .thenAccept(unused3 -> + asyncResponse.resume(Response.noContent().build())); + } else { + if (partitionMetadata.partitions > 0) { + return CompletableFuture.completedFuture(null).thenAccept(unused -> { + final List> futures = Lists.newArrayList(); + + // expire messages for each partition topic + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar() + .getAdminClient() + .topics() + .expireMessagesAsync(topicNamePartition.toString(), + subName, expireTimeInSeconds)); + } catch (Exception e) { + log.error("[{}] Failed to expire messages up to {} on {}", + clientAppId(), + expireTimeInSeconds, topicNamePartition, e); + asyncResponse.resume(new RestException(e)); + return; + } + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + "Subscription not found")); + return null; + } else { + log.error("[{}] Failed to expire messages up " + + "to {} on {}", clientAppId(), + expireTimeInSeconds, topicName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + } + asyncResponse.resume(Response.noContent().build()); + return null; + }); + }); + } else { + return internalExpireMessagesByTimestampForSinglePartitionAsync + (partitionMetadata, subName, expireTimeInSeconds) + .thenAccept(unused -> + asyncResponse.resume(Response.noContent().build())); } } + })) - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable t = exception.getCause(); - if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Subscription not found")); - return null; - } else { - log.error("[{}] Failed to expire messages up to {} on {}", - clientAppId(), expireTimeInSeconds, - topicName, t); - asyncResponse.resume(new RestException(t)); - return null; - } - } - asyncResponse.resume(Response.noContent().build()); - return null; - }); - }); - } else { - return internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, - subName, expireTimeInSeconds, authoritative) - .thenAccept(unused -> asyncResponse.resume(Response.noContent().build())); - } - } - }) ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. if (!isRedirectException(ex)) { @@ -3514,69 +3517,65 @@ protected void internalExpireMessagesByTimestamp(AsyncResponse asyncResponse, St } private CompletableFuture internalExpireMessagesByTimestampForSinglePartitionAsync( - PartitionedTopicMetadata partitionMetadata, String subName, int expireTimeInSeconds, - boolean authoritative) { + PartitionedTopicMetadata partitionMetadata, String subName, int expireTimeInSeconds) { if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) { String msg = "This method should not be called for partitioned topic"; return FutureUtil.failedFuture(new IllegalStateException(msg)); } else { final CompletableFuture resultFuture = new CompletableFuture<>(); - validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES) - .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) - .thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(t -> { - if (t == null) { - resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Topic not found")); - return; - } - if (!(t instanceof PersistentTopic)) { - resultFuture.completeExceptionally(new RestException(Status.METHOD_NOT_ALLOWED, - "Expire messages on a non-persistent topic is not allowed")); - return; - } - PersistentTopic topic = (PersistentTopic) t; - - boolean issued; - if (subName.startsWith(topic.getReplicatorPrefix())) { - String remoteCluster = PersistentReplicator.getRemoteCluster(subName); - PersistentReplicator repl = (PersistentReplicator) topic - .getPersistentReplicator(remoteCluster); - if (repl == null) { - resultFuture.completeExceptionally( - new RestException(Status.NOT_FOUND, "Replicator not found")); - return; - } - issued = repl.expireMessages(expireTimeInSeconds); - } else { - PersistentSubscription sub = topic.getSubscription(subName); - if (sub == null) { - resultFuture.completeExceptionally( - new RestException(Status.NOT_FOUND, "Subscription not found")); - return; - } - issued = sub.expireMessages(expireTimeInSeconds); - } - if (issued) { - log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), - expireTimeInSeconds, topicName, subName); - resultFuture.complete(__); - } else { - if (log.isDebugEnabled()) { - log.debug("Expire message by timestamp not issued on topic {} for subscription {} " - + "due to ongoing message expiration not finished or subscription almost" - + " catch up. If it's performed on a partitioned topic operation might " - + "succeeded on other partitions, please check stats of individual " - + "partition.", topicName, subName); - } - resultFuture.completeExceptionally(new RestException(Status.CONFLICT, "Expire message " - + "by timestamp not issued on topic " + topicName + " for subscription " - + subName + " due to ongoing message expiration not finished or subscription " - + "almost catch up. If it's performed on a partitioned topic operation might" - + " succeeded on other partitions, please check stats of individual partition." - )); - return; - } - }) - ).exceptionally(e -> { + getTopicReferenceAsync(topicName).thenAccept(t -> { + if (t == null) { + resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Topic not found")); + return; + } + if (!(t instanceof PersistentTopic)) { + resultFuture.completeExceptionally(new RestException(Status.METHOD_NOT_ALLOWED, + "Expire messages on a non-persistent topic is not allowed")); + return; + } + PersistentTopic topic = (PersistentTopic) t; + + boolean issued; + if (subName.startsWith(topic.getReplicatorPrefix())) { + String remoteCluster = PersistentReplicator.getRemoteCluster(subName); + PersistentReplicator repl = (PersistentReplicator) topic + .getPersistentReplicator(remoteCluster); + if (repl == null) { + resultFuture.completeExceptionally( + new RestException(Status.NOT_FOUND, "Replicator not found")); + return; + } + issued = repl.expireMessages(expireTimeInSeconds); + } else { + PersistentSubscription sub = topic.getSubscription(subName); + if (sub == null) { + resultFuture.completeExceptionally( + new RestException(Status.NOT_FOUND, "Subscription not found")); + return; + } + issued = sub.expireMessages(expireTimeInSeconds); + } + if (issued) { + log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), + expireTimeInSeconds, topicName, subName); + resultFuture.complete(null); + } else { + if (log.isDebugEnabled()) { + log.debug("Expire message by timestamp not issued on topic {} for subscription {} " + + "due to ongoing message expiration not finished or subscription almost" + + " catch up. If it's performed on a partitioned topic operation might " + + "succeeded on other partitions, please check stats of individual " + + "partition.", topicName, subName); + } + resultFuture.completeExceptionally(new RestException(Status.CONFLICT, "Expire message " + + "by timestamp not issued on topic " + topicName + " for subscription " + + subName + " due to ongoing message expiration not finished or subscription " + + "almost catch up. If it's performed on a partitioned topic operation might" + + " succeeded on other partitions, please check stats of individual partition." + )); + return; + } + }).exceptionally(e -> { resultFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e)); return null; });